1use super::def::*;
2use super::output_request::*;
3use super::processor::*;
4use crate::base::*;
5use crate::non::NONRequestorHelper;
6use crate::requestor::*;
7use crate::stack::SharedObjectStackDecID;
8use cyfs_base::*;
9
10use http_types::{Method, Request, Response, Url};
11use std::sync::Arc;
12
13#[derive(Clone)]
14pub struct GlobalStateRequestor {
15 category: GlobalStateCategory,
16 dec_id: Option<SharedObjectStackDecID>,
17 requestor: HttpRequestorRef,
18 service_url: Url,
19}
20
21impl GlobalStateRequestor {
22 pub fn new_root_state(
23 dec_id: Option<SharedObjectStackDecID>,
24 requestor: HttpRequestorRef,
25 ) -> Self {
26 Self::new(GlobalStateCategory::RootState, dec_id, requestor)
27 }
28
29 pub fn new_local_cache(
30 dec_id: Option<SharedObjectStackDecID>,
31 requestor: HttpRequestorRef,
32 ) -> Self {
33 Self::new(GlobalStateCategory::LocalCache, dec_id, requestor)
34 }
35
36 pub fn new(
37 category: GlobalStateCategory,
38 dec_id: Option<SharedObjectStackDecID>,
39 requestor: HttpRequestorRef,
40 ) -> Self {
41 let addr = requestor.remote_addr();
42
43 let url = format!("http://{}/{}/", addr, category.as_str());
44 let url = Url::parse(&url).unwrap();
45
46 let ret = Self {
47 category,
48 dec_id,
49 requestor,
50 service_url: url,
51 };
52
53 ret
54 }
55
56 pub fn category(&self) -> &GlobalStateCategory {
57 &self.category
58 }
59
60 pub fn into_processor(self) -> GlobalStateOutputProcessorRef {
61 Arc::new(Box::new(self))
62 }
63
64 pub fn clone_processor(&self) -> GlobalStateOutputProcessorRef {
65 self.clone().into_processor()
66 }
67
68 fn encode_common_headers(
70 &self,
71 action: RootStateAction,
72 com_req: &RootStateOutputRequestCommon,
73 http_req: &mut Request,
74 ) {
75 if let Some(dec_id) = &com_req.dec_id {
76 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
77 } else if let Some(dec_id) = &self.dec_id {
78 if let Some(dec_id) = dec_id.get() {
79 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
80 }
81 }
82
83 if let Some(target_dec_id) = &com_req.target_dec_id {
84 http_req.insert_header(cyfs_base::CYFS_TARGET_DEC_ID, target_dec_id.to_string());
85 }
86
87 if let Some(target) = &com_req.target {
88 http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
89 }
90
91 http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
92
93 http_req.insert_header(cyfs_base::CYFS_ROOT_STATE_ACTION, action.to_string());
94 }
95
96 fn encode_get_current_root_request(
98 &self,
99 req: &RootStateGetCurrentRootOutputRequest,
100 ) -> Request {
101 let url = self.service_url.join("root").unwrap();
102 let mut http_req = Request::new(Method::Post, url);
103 self.encode_common_headers(RootStateAction::GetCurrentRoot, &req.common, &mut http_req);
104
105 http_req.set_body(req.encode_string());
106 http_req
107 }
108
109 async fn get_current_root(
110 &self,
111 req: RootStateGetCurrentRootOutputRequest,
112 ) -> BuckyResult<RootStateGetCurrentRootOutputResponse> {
113 let http_req = self.encode_get_current_root_request(&req);
114 let mut resp = self.requestor.request(http_req).await?;
115
116 if resp.status().is_success() {
117 let resp: RootStateGetCurrentRootOutputResponse =
118 RequestorHelper::decode_json_body(&mut resp).await?;
119 info!(
120 "get current root from root state success: root={}",
121 resp.root
122 );
123 Ok(resp)
124 } else {
125 let e = RequestorHelper::error_from_resp(&mut resp).await;
126 error!("get current root from root state error! {}", e);
127 Err(e)
128 }
129 }
130
131 fn encode_create_op_env_request(&self, req: &RootStateCreateOpEnvOutputRequest) -> Request {
133 let url = self.service_url.join("op-env").unwrap();
134
135 let mut http_req = Request::new(Method::Post, url);
136 self.encode_common_headers(RootStateAction::CreateOpEnv, &req.common, &mut http_req);
137
138 http_req.set_body(req.encode_string());
139
140 http_req
141 }
142
143 async fn create_op_env(
144 &self,
145 req: RootStateCreateOpEnvOutputRequest,
146 ) -> BuckyResult<RootStateCreateOpEnvOutputResponse> {
147 let http_req = self.encode_create_op_env_request(&req);
148
149 let mut resp = self.requestor.request(http_req).await?;
150
151 if resp.status().is_success() {
152 let ret: RootStateCreateOpEnvOutputResponse =
153 RequestorHelper::decode_json_body(&mut resp).await?;
154 info!("create op_env from root state success: sid={}", ret.sid);
155 Ok(ret)
156 } else {
157 let e = RequestorHelper::error_from_resp(&mut resp).await;
158 error!("create op_env from root state error! {}", e);
159 Err(e)
160 }
161 }
162}
163
164#[async_trait::async_trait]
165impl GlobalStateOutputProcessor for GlobalStateRequestor {
166 fn get_category(&self) -> GlobalStateCategory {
167 self.category
168 }
169
170 async fn get_current_root(
171 &self,
172 req: RootStateGetCurrentRootOutputRequest,
173 ) -> BuckyResult<RootStateGetCurrentRootOutputResponse> {
174 GlobalStateRequestor::get_current_root(&self, req).await
175 }
176
177 async fn create_op_env(
178 &self,
179 req: RootStateCreateOpEnvOutputRequest,
180 ) -> BuckyResult<OpEnvOutputProcessorRef> {
181 let op_env_type = req.op_env_type.clone();
182 let resp = GlobalStateRequestor::create_op_env(&self, req).await?;
183
184 let requestor = OpEnvRequestor::new(
185 self.category.clone(),
186 op_env_type,
187 resp.sid,
188 self.dec_id.clone(),
189 self.requestor.clone(),
190 );
191 Ok(requestor.into_processor())
192 }
193}
194
195#[derive(Clone)]
196pub struct OpEnvRequestor {
197 category: GlobalStateCategory,
198 op_env_type: ObjectMapOpEnvType,
199 sid: u64,
200
201 dec_id: Option<SharedObjectStackDecID>,
202 requestor: HttpRequestorRef,
203 service_url: Url,
204}
205
206impl OpEnvRequestor {
207 pub fn new(
208 category: GlobalStateCategory,
209 op_env_type: ObjectMapOpEnvType,
210 sid: u64,
211 dec_id: Option<SharedObjectStackDecID>,
212 requestor: HttpRequestorRef,
213 ) -> Self {
214 assert!(sid > 0);
215
216 let addr = requestor.remote_addr();
217
218 let url = format!("http://{}/{}/op-env/", addr, category.as_str());
219 let url = Url::parse(&url).unwrap();
220
221 let ret = Self {
222 category,
223 op_env_type,
224 sid,
225 dec_id,
226 requestor,
227 service_url: url,
228 };
229
230 ret
231 }
232
233 pub fn category(&self) -> &GlobalStateCategory {
234 &self.category
235 }
236
237 pub fn into_processor(self) -> OpEnvOutputProcessorRef {
238 Arc::new(Box::new(self))
239 }
240
241 pub fn clone_processor(&self) -> OpEnvOutputProcessorRef {
242 self.clone().into_processor()
243 }
244
245 fn encode_common_headers(
246 &self,
247 action: OpEnvAction,
248 com_req: &OpEnvOutputRequestCommon,
249 http_req: &mut Request,
250 ) {
251 if let Some(dec_id) = &com_req.dec_id {
252 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
253 } else if let Some(dec_id) = &self.dec_id {
254 if let Some(dec_id) = dec_id.get() {
255 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
256 }
257 }
258
259 http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
260
261 http_req.insert_header(cyfs_base::CYFS_OP_ENV_ACTION, action.to_string());
262
263 if let Some(target_dec_id) = &com_req.target_dec_id {
264 http_req.insert_header(cyfs_base::CYFS_TARGET_DEC_ID, target_dec_id.to_string());
265 }
266
267 if let Some(target) = &com_req.target {
268 http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
269 }
270
271 if com_req.sid > 0 {
272 http_req.insert_header(cyfs_base::CYFS_OP_ENV_SID, com_req.sid.to_string());
273 } else {
274 http_req.insert_header(cyfs_base::CYFS_OP_ENV_SID, self.sid.to_string());
275 }
276 }
277
278 fn encode_load_request(&self, req: &OpEnvLoadOutputRequest) -> Request {
281 let url = self.service_url.join("init/target").unwrap();
282
283 let mut http_req = Request::new(Method::Post, url);
284 self.encode_common_headers(OpEnvAction::Load, &req.common, &mut http_req);
285
286 http_req.set_body(req.encode_string());
287
288 http_req
289 }
290
291 async fn load(&self, req: OpEnvLoadOutputRequest) -> BuckyResult<()> {
292 if self.op_env_type != ObjectMapOpEnvType::Single
293 && self.op_env_type != ObjectMapOpEnvType::IsolatePath
294 {
295 let msg = format!(
296 "load method only valid for single_op_env and isolate_path_op_env! sid={}",
297 self.sid
298 );
299 error!("{}", msg);
300 return Err(BuckyError::new(BuckyErrorCode::UnSupport, msg));
301 }
302
303 let http_req = self.encode_load_request(&req);
304 let mut resp = self.requestor.request(http_req).await?;
305
306 if resp.status().is_success() {
307 info!(
308 "load objectmap for single_op_env success: target={}, sid={}",
309 req.target, self.sid,
310 );
311 Ok(())
312 } else {
313 let e = RequestorHelper::error_from_resp(&mut resp).await;
314 error!(
315 "load objectmap for single_op_env error! target={}, sid={}, {}",
316 req.target, self.sid, e
317 );
318 Err(e)
319 }
320 }
321
322 fn encode_load_by_path_request(&self, req: &OpEnvLoadByPathOutputRequest) -> Request {
324 let url = self.service_url.join("init/path").unwrap();
325
326 let mut http_req = Request::new(Method::Post, url);
327 self.encode_common_headers(OpEnvAction::LoadByPath, &req.common, &mut http_req);
328
329 http_req.set_body(req.encode_string());
330
331 http_req
332 }
333
334 async fn load_by_path(&self, req: OpEnvLoadByPathOutputRequest) -> BuckyResult<()> {
335 if self.op_env_type != ObjectMapOpEnvType::Single
336 && self.op_env_type != ObjectMapOpEnvType::IsolatePath
337 {
338 let msg = format!(
339 "load_by_path method only valid for single_op_env and isolate_path_op_env! sid={}",
340 self.sid
341 );
342 error!("{}", msg);
343 return Err(BuckyError::new(BuckyErrorCode::UnSupport, msg));
344 }
345
346 let http_req = self.encode_load_by_path_request(&req);
347 let mut resp = self.requestor.request(http_req).await?;
348
349 if resp.status().is_success() {
350 info!(
351 "load_by_path for single_op_env success: path={}, sid={}",
352 req.path, self.sid,
353 );
354 Ok(())
355 } else {
356 let e = RequestorHelper::error_from_resp(&mut resp).await;
357 error!(
358 "load_by_path for single_op_env error! path={}, sid={}, {}",
359 req.path, self.sid, e
360 );
361 Err(e)
362 }
363 }
364
365 fn encode_create_new_request(&self, req: &OpEnvCreateNewOutputRequest) -> Request {
367 let url = self.service_url.join("init/new").unwrap();
368
369 let mut http_req = Request::new(Method::Post, url);
370 self.encode_common_headers(OpEnvAction::CreateNew, &req.common, &mut http_req);
371
372 http_req.set_body(req.encode_string());
373
374 http_req
375 }
376
377 async fn create_new(&self, req: OpEnvCreateNewOutputRequest) -> BuckyResult<()> {
378 let http_req = self.encode_create_new_request(&req);
379 let mut resp = self.requestor.request(http_req).await?;
380
381 if resp.status().is_success() {
382 info!("create_new for op_env success: sid={}", self.sid,);
383 Ok(())
384 } else {
385 let e = RequestorHelper::error_from_resp(&mut resp).await;
386 error!("create_new for op_env error! sid={}, {}", self.sid, e);
387 Err(e)
388 }
389 }
390
391 fn encode_lock_request(&self, req: &OpEnvLockOutputRequest) -> Request {
394 let url = self.service_url.join("lock").unwrap();
395 let mut http_req = Request::new(Method::Post, url);
396 self.encode_common_headers(OpEnvAction::Lock, &req.common, &mut http_req);
397
398 http_req.set_body(req.encode_string());
399
400 http_req
401 }
402
403 async fn lock(&self, req: OpEnvLockOutputRequest) -> BuckyResult<()> {
404 if self.op_env_type != ObjectMapOpEnvType::Path {
405 let msg = format!("lock method only valid for path_op_env! sid={}", self.sid);
406 error!("{}", msg);
407 return Err(BuckyError::new(BuckyErrorCode::UnSupport, msg));
408 }
409
410 let http_req = self.encode_lock_request(&req);
411 let mut resp = self.requestor.request(http_req).await?;
412
413 if resp.status().is_success() {
414 info!(
415 "lock for path_op_env success: path_list={:?}, sid={}",
416 req.path_list, self.sid,
417 );
418 Ok(())
419 } else {
420 let e = RequestorHelper::error_from_resp(&mut resp).await;
421 error!("lock for path_op_env error! sid={}, {}", self.sid, e);
422 Err(e)
423 }
424 }
425
426 fn encode_get_current_root_request(&self, req: &OpEnvGetCurrentRootOutputRequest) -> Request {
428 let url = self.service_url.join("root").unwrap();
429 let mut http_req = Request::new(Method::Get, url);
430 self.encode_common_headers(OpEnvAction::GetCurrentRoot, &req.common, &mut http_req);
431
432 http_req
433 }
434
435 async fn get_current_root(
436 &self,
437 req: OpEnvGetCurrentRootOutputRequest,
438 ) -> BuckyResult<OpEnvGetCurrentRootOutputResponse> {
439 let http_req = self.encode_get_current_root_request(&req);
440 let mut resp = self.requestor.request(http_req).await?;
441
442 if resp.status().is_success() {
443 let resp: OpEnvGetCurrentRootOutputResponse =
444 RequestorHelper::decode_json_body(&mut resp).await?;
445
446 info!("get_current_root for op_env success: sid={}", self.sid,);
447 Ok(resp)
448 } else {
449 let e = RequestorHelper::error_from_resp(&mut resp).await;
450 error!("get_current_root for op_env error! sid={}, {}", self.sid, e);
451 Err(e)
452 }
453 }
454
455 fn encode_commit_request(&self, req: &OpEnvCommitOutputRequest) -> Request {
457 let url = self.service_url.join("transaction").unwrap();
458 let mut http_req = Request::new(Method::Post, url);
459 self.encode_common_headers(OpEnvAction::Commit, &req.common, &mut http_req);
460
461 http_req.set_body(req.encode_string());
462
463 http_req
464 }
465
466 async fn commit(
467 &self,
468 req: OpEnvCommitOutputRequest,
469 ) -> BuckyResult<OpEnvCommitOutputResponse> {
470 let http_req = self.encode_commit_request(&req);
471 let mut resp = self.requestor.request(http_req).await?;
472
473 if resp.status().is_success() {
474 let resp: OpEnvCommitOutputResponse =
475 RequestorHelper::decode_json_body(&mut resp).await?;
476
477 info!("commit for op_env success: sid={}", self.sid,);
478 Ok(resp)
479 } else {
480 let e = RequestorHelper::error_from_resp(&mut resp).await;
481 error!("commit for op_env error! sid={}, {}", self.sid, e);
482 Err(e)
483 }
484 }
485
486 fn encode_abort_request(&self, req: &OpEnvAbortOutputRequest) -> Request {
488 let url = self.service_url.join("transaction").unwrap();
489 let mut http_req = Request::new(Method::Delete, url);
490 self.encode_common_headers(OpEnvAction::Abort, &req.common, &mut http_req);
491
492 http_req.set_body(req.encode_string());
493
494 http_req
495 }
496
497 async fn abort(&self, req: OpEnvAbortOutputRequest) -> BuckyResult<()> {
498 let http_req = self.encode_abort_request(&req);
499 let mut resp = self.requestor.request(http_req).await?;
500
501 if resp.status().is_success() {
502 info!("abort for op_env success: sid={}", self.sid,);
503 Ok(())
504 } else {
505 let e = RequestorHelper::error_from_resp(&mut resp).await;
506 error!("abort for op_env error! sid={}, {}", self.sid, e);
507 Err(e)
508 }
509 }
510
511 fn encode_metadata_request(&self, req: &OpEnvMetadataOutputRequest) -> Request {
513 let url = self.service_url.join("metadata").unwrap();
514 let mut http_req = Request::new(Method::Get, url);
515 self.encode_common_headers(OpEnvAction::Metadata, &req.common, &mut http_req);
516 RequestorHelper::encode_opt_header_with_encoding(
517 &mut http_req,
518 cyfs_base::CYFS_OP_ENV_PATH,
519 req.path.as_deref(),
520 );
521
522 http_req
523 }
524
525 async fn metadata(
526 &self,
527 req: OpEnvMetadataOutputRequest,
528 ) -> BuckyResult<OpEnvMetadataOutputResponse> {
529 let http_req = self.encode_metadata_request(&req);
530 let mut resp = self.requestor.request(http_req).await?;
531
532 if resp.status().is_success() {
533 let resp: OpEnvMetadataOutputResponse =
534 RequestorHelper::decode_json_body(&mut resp).await?;
535 info!(
536 "get metadata of op_env success: sid={}, resp={:?}",
537 self.sid, resp
538 );
539 Ok(resp)
540 } else {
541 let e = RequestorHelper::error_from_resp(&mut resp).await;
542 error!("get metadata of op_env error! sid={}, {}", self.sid, e);
543 Err(e)
544 }
545 }
546
547 fn encode_get_by_key_request(&self, req: &OpEnvGetByKeyOutputRequest) -> Request {
549 let url = self.service_url.join("map").unwrap();
550 let mut http_req = Request::new(Method::Get, url);
551 self.encode_common_headers(OpEnvAction::GetByKey, &req.common, &mut http_req);
552
553 RequestorHelper::encode_opt_header_with_encoding(
554 &mut http_req,
555 cyfs_base::CYFS_OP_ENV_PATH,
556 req.path.as_deref(),
557 );
558 RequestorHelper::encode_header_with_encoding(
559 &mut http_req,
560 cyfs_base::CYFS_OP_ENV_KEY,
561 &req.key,
562 );
563
564 http_req
567 }
568
569 async fn get_by_key(
570 &self,
571 req: OpEnvGetByKeyOutputRequest,
572 ) -> BuckyResult<OpEnvGetByKeyOutputResponse> {
573 let http_req = self.encode_get_by_key_request(&req);
574 let mut resp = self.requestor.request(http_req).await?;
575
576 if resp.status().is_success() {
577 let resp: OpEnvGetByKeyOutputResponse =
578 RequestorHelper::decode_json_body(&mut resp).await?;
579
580 info!("get_by_key for op_env success: sid={}", self.sid,);
581 Ok(resp)
582 } else {
583 let e = RequestorHelper::error_from_resp(&mut resp).await;
584 error!("get_by_key for op_env error! sid={}, {}", self.sid, e);
585 Err(e)
586 }
587 }
588
589 fn encode_insert_with_key_request(&self, req: &OpEnvInsertWithKeyOutputRequest) -> Request {
591 let url = self.service_url.join("map").unwrap();
592 let mut http_req = Request::new(Method::Post, url);
593 self.encode_common_headers(OpEnvAction::InsertWithKey, &req.common, &mut http_req);
594 http_req.set_body(req.encode_string());
595 http_req
596 }
597 async fn insert_with_key(&self, req: OpEnvInsertWithKeyOutputRequest) -> BuckyResult<()> {
598 let http_req = self.encode_insert_with_key_request(&req);
599 let mut resp = self.requestor.request(http_req).await?;
600 if resp.status().is_success() {
601 info!("insert_with_key for op_env success: sid={}", self.sid,);
602 Ok(())
603 } else {
604 let e = RequestorHelper::error_from_resp(&mut resp).await;
605 error!("insert_with_key for op_env error! sid={}, {}", self.sid, e);
606 Err(e)
607 }
608 }
609
610 fn encode_set_with_key_request(&self, req: &OpEnvSetWithKeyOutputRequest) -> Request {
612 let url = self.service_url.join("map").unwrap();
613 let mut http_req = Request::new(Method::Put, url);
614 self.encode_common_headers(OpEnvAction::SetWithKey, &req.common, &mut http_req);
615 http_req.set_body(req.encode_string());
616 http_req
617 }
618
619 async fn set_with_key(
620 &self,
621 req: OpEnvSetWithKeyOutputRequest,
622 ) -> BuckyResult<OpEnvSetWithKeyOutputResponse> {
623 let http_req = self.encode_set_with_key_request(&req);
624 let mut resp = self.requestor.request(http_req).await?;
625 if resp.status().is_success() {
626 let resp: OpEnvSetWithKeyOutputResponse =
627 RequestorHelper::decode_json_body(&mut resp).await?;
628 info!("set_with_key for op_env success: sid={}", self.sid,);
629 Ok(resp)
630 } else {
631 let e = RequestorHelper::error_from_resp(&mut resp).await;
632 error!("set_with_key for op_env error! sid={}, {}", self.sid, e);
633 Err(e)
634 }
635 }
636
637 fn encode_remove_with_key_request(&self, req: &OpEnvRemoveWithKeyOutputRequest) -> Request {
639 let url = self.service_url.join("map").unwrap();
640 let mut http_req = Request::new(Method::Delete, url);
641 self.encode_common_headers(OpEnvAction::RemoveWithKey, &req.common, &mut http_req);
642 http_req.set_body(req.encode_string());
643 http_req
644 }
645
646 async fn remove_with_key(
647 &self,
648 req: OpEnvRemoveWithKeyOutputRequest,
649 ) -> BuckyResult<OpEnvRemoveWithKeyOutputResponse> {
650 let http_req = self.encode_remove_with_key_request(&req);
651 let mut resp = self.requestor.request(http_req).await?;
652 if resp.status().is_success() {
653 let resp: OpEnvRemoveWithKeyOutputResponse =
654 RequestorHelper::decode_json_body(&mut resp).await?;
655 info!("remove_with_key for op_env success: sid={}", self.sid,);
656 Ok(resp)
657 } else {
658 let e = RequestorHelper::error_from_resp(&mut resp).await;
659 error!("remove_with_key for op_env error! sid={}, {}", self.sid, e);
660 Err(e)
661 }
662 }
663
664 fn encode_contains_request(&self, req: &OpEnvContainsOutputRequest) -> Request {
666 let url = self.service_url.join("set").unwrap();
667 let mut http_req = Request::new(Method::Get, url);
668 self.encode_common_headers(OpEnvAction::Contains, &req.common, &mut http_req);
669
670 RequestorHelper::encode_opt_header_with_encoding(
671 &mut http_req,
672 cyfs_base::CYFS_OP_ENV_PATH,
673 req.path.as_deref(),
674 );
675 RequestorHelper::encode_header(&mut http_req, cyfs_base::CYFS_OP_ENV_VALUE, &req.value);
676
677 http_req
679 }
680
681 async fn contains(
682 &self,
683 req: OpEnvContainsOutputRequest,
684 ) -> BuckyResult<OpEnvContainsOutputResponse> {
685 let http_req = self.encode_contains_request(&req);
686 let mut resp = self.requestor.request(http_req).await?;
687 if resp.status().is_success() {
688 let resp: OpEnvContainsOutputResponse =
689 RequestorHelper::decode_json_body(&mut resp).await?;
690 info!("contains for op_env success: sid={}", self.sid,);
691 Ok(resp)
692 } else {
693 let e = RequestorHelper::error_from_resp(&mut resp).await;
694 error!("contains for op_env error! sid={}, {}", self.sid, e);
695 Err(e)
696 }
697 }
698
699 fn encode_insert_request(&self, req: &OpEnvInsertOutputRequest) -> Request {
701 let url = self.service_url.join("set").unwrap();
702 let mut http_req = Request::new(Method::Post, url);
703 self.encode_common_headers(OpEnvAction::Insert, &req.common, &mut http_req);
704 http_req.set_body(req.encode_string());
705 http_req
706 }
707
708 async fn insert(
709 &self,
710 req: OpEnvInsertOutputRequest,
711 ) -> BuckyResult<OpEnvInsertOutputResponse> {
712 let http_req = self.encode_insert_request(&req);
713 let mut resp = self.requestor.request(http_req).await?;
714 if resp.status().is_success() {
715 let resp: OpEnvInsertOutputResponse =
716 RequestorHelper::decode_json_body(&mut resp).await?;
717 info!("insert for op_env success: sid={}", self.sid,);
718 Ok(resp)
719 } else {
720 let e = RequestorHelper::error_from_resp(&mut resp).await;
721 error!("insert for op_env error! sid={}, {}", self.sid, e);
722 Err(e)
723 }
724 }
725
726 fn encode_remove_request(&self, req: &OpEnvRemoveOutputRequest) -> Request {
728 let url = self.service_url.join("set").unwrap();
729 let mut http_req = Request::new(Method::Delete, url);
730 self.encode_common_headers(OpEnvAction::Remove, &req.common, &mut http_req);
731 http_req.set_body(req.encode_string());
732 http_req
733 }
734
735 async fn remove(
736 &self,
737 req: OpEnvRemoveOutputRequest,
738 ) -> BuckyResult<OpEnvRemoveOutputResponse> {
739 let http_req = self.encode_remove_request(&req);
740 let mut resp = self.requestor.request(http_req).await?;
741 if resp.status().is_success() {
742 let resp: OpEnvRemoveOutputResponse =
743 RequestorHelper::decode_json_body(&mut resp).await?;
744 info!("remove for op_env success: sid={}", self.sid,);
745 Ok(resp)
746 } else {
747 let e = RequestorHelper::error_from_resp(&mut resp).await;
748 error!("remove for op_env error! sid={}, {}", self.sid, e);
749 Err(e)
750 }
751 }
752
753 fn encode_next_request(&self, req: &OpEnvNextOutputRequest) -> Request {
755 let url = self.service_url.join("iterator").unwrap();
756 let mut http_req = Request::new(Method::Post, url);
757 self.encode_common_headers(OpEnvAction::Next, &req.common, &mut http_req);
758 http_req.set_body(req.encode_string());
759 http_req
760 }
761
762 async fn next(&self, req: OpEnvNextOutputRequest) -> BuckyResult<OpEnvNextOutputResponse> {
763 let http_req = self.encode_next_request(&req);
764 let mut resp = self.requestor.request(http_req).await?;
765 if resp.status().is_success() {
766 let resp: OpEnvNextOutputResponse =
767 RequestorHelper::decode_json_body(&mut resp).await?;
768 info!("next for op_env success: sid={}", self.sid,);
769 Ok(resp)
770 } else {
771 let e = RequestorHelper::error_from_resp(&mut resp).await;
772 error!("next for op_env error! sid={}, {}", self.sid, e);
773 Err(e)
774 }
775 }
776
777 fn encode_reset_request(&self, req: &OpEnvResetOutputRequest) -> Request {
779 let url = self.service_url.join("iterator").unwrap();
780 let mut http_req = Request::new(Method::Delete, url);
781 self.encode_common_headers(OpEnvAction::Reset, &req.common, &mut http_req);
782 http_req
783 }
784
785 async fn reset(&self, req: OpEnvResetOutputRequest) -> BuckyResult<()> {
786 let http_req = self.encode_reset_request(&req);
787 let mut resp = self.requestor.request(http_req).await?;
788 if resp.status().is_success() {
789 info!("reset for op_env success: sid={}", self.sid,);
790 Ok(())
791 } else {
792 let e = RequestorHelper::error_from_resp(&mut resp).await;
793 error!("reset for op_env error! sid={}, {}", self.sid, e);
794 Err(e)
795 }
796 }
797
798 fn encode_list_request(&self, req: &OpEnvListOutputRequest) -> Request {
800 let url = self.service_url.join("list").unwrap();
801 let mut http_req = Request::new(Method::Get, url);
802
803 self.encode_common_headers(OpEnvAction::List, &req.common, &mut http_req);
804 RequestorHelper::encode_opt_header_with_encoding(
805 &mut http_req,
806 cyfs_base::CYFS_OP_ENV_PATH,
807 req.path.as_deref(),
808 );
809
810 http_req
811 }
812
813 async fn list(&self, req: OpEnvListOutputRequest) -> BuckyResult<OpEnvListOutputResponse> {
814 let http_req = self.encode_list_request(&req);
815 let mut resp = self.requestor.request(http_req).await?;
816 if resp.status().is_success() {
817 let resp: OpEnvListOutputResponse =
818 RequestorHelper::decode_json_body(&mut resp).await?;
819 info!("list for op_env success: sid={}", self.sid,);
820 Ok(resp)
821 } else {
822 let e = RequestorHelper::error_from_resp(&mut resp).await;
823 error!("list for op_env error! sid={}, {}", self.sid, e);
824 Err(e)
825 }
826 }
827}
828
829#[async_trait::async_trait]
830impl OpEnvOutputProcessor for OpEnvRequestor {
831 fn get_sid(&self) -> u64 {
832 self.sid
833 }
834
835 fn get_category(&self) -> GlobalStateCategory {
836 self.category
837 }
838
839 async fn load(&self, req: OpEnvLoadOutputRequest) -> BuckyResult<()> {
840 Self::load(&self, req).await
841 }
842
843 async fn load_by_path(&self, req: OpEnvLoadByPathOutputRequest) -> BuckyResult<()> {
844 Self::load_by_path(&self, req).await
845 }
846
847 async fn create_new(&self, req: OpEnvCreateNewOutputRequest) -> BuckyResult<()> {
848 Self::create_new(&self, req).await
849 }
850
851 async fn lock(&self, req: OpEnvLockOutputRequest) -> BuckyResult<()> {
852 Self::lock(&self, req).await
853 }
854
855 async fn get_current_root(
856 &self,
857 req: OpEnvGetCurrentRootOutputRequest,
858 ) -> BuckyResult<OpEnvGetCurrentRootOutputResponse> {
859 Self::get_current_root(&self, req).await
860 }
861
862 async fn commit(
863 &self,
864 req: OpEnvCommitOutputRequest,
865 ) -> BuckyResult<OpEnvCommitOutputResponse> {
866 Self::commit(&self, req).await
867 }
868 async fn abort(&self, req: OpEnvAbortOutputRequest) -> BuckyResult<()> {
869 Self::abort(&self, req).await
870 }
871
872 async fn metadata(
873 &self,
874 req: OpEnvMetadataOutputRequest,
875 ) -> BuckyResult<OpEnvMetadataOutputResponse> {
876 Self::metadata(&self, req).await
877 }
878
879 async fn get_by_key(
881 &self,
882 req: OpEnvGetByKeyOutputRequest,
883 ) -> BuckyResult<OpEnvGetByKeyOutputResponse> {
884 Self::get_by_key(&self, req).await
885 }
886
887 async fn insert_with_key(&self, req: OpEnvInsertWithKeyOutputRequest) -> BuckyResult<()> {
888 Self::insert_with_key(&self, req).await
889 }
890
891 async fn set_with_key(
892 &self,
893 req: OpEnvSetWithKeyOutputRequest,
894 ) -> BuckyResult<OpEnvSetWithKeyOutputResponse> {
895 Self::set_with_key(&self, req).await
896 }
897
898 async fn remove_with_key(
899 &self,
900 req: OpEnvRemoveWithKeyOutputRequest,
901 ) -> BuckyResult<OpEnvRemoveWithKeyOutputResponse> {
902 Self::remove_with_key(&self, req).await
903 }
904
905 async fn contains(
907 &self,
908 req: OpEnvContainsOutputRequest,
909 ) -> BuckyResult<OpEnvContainsOutputResponse> {
910 Self::contains(&self, req).await
911 }
912
913 async fn insert(
914 &self,
915 req: OpEnvInsertOutputRequest,
916 ) -> BuckyResult<OpEnvInsertOutputResponse> {
917 Self::insert(&self, req).await
918 }
919
920 async fn remove(
921 &self,
922 req: OpEnvRemoveOutputRequest,
923 ) -> BuckyResult<OpEnvRemoveOutputResponse> {
924 Self::remove(&self, req).await
925 }
926
927 async fn next(&self, req: OpEnvNextOutputRequest) -> BuckyResult<OpEnvNextOutputResponse> {
929 Self::next(&self, req).await
930 }
931
932 async fn reset(&self, req: OpEnvResetOutputRequest) -> BuckyResult<()> {
933 Self::reset(&self, req).await
934 }
935
936 async fn list(&self, req: OpEnvListOutputRequest) -> BuckyResult<OpEnvListOutputResponse> {
937 Self::list(&self, req).await
938 }
939}
940
941#[derive(Clone)]
942pub struct GlobalStateAccessorRequestor {
943 category: GlobalStateCategory,
944 dec_id: Option<SharedObjectStackDecID>,
945 requestor: HttpRequestorRef,
946 service_url: Url,
947}
948
949impl GlobalStateAccessorRequestor {
950 pub fn new_root_state(
951 dec_id: Option<SharedObjectStackDecID>,
952 requestor: HttpRequestorRef,
953 ) -> Self {
954 Self::new(GlobalStateCategory::RootState, dec_id, requestor)
955 }
956
957 pub fn new_local_cache(
958 dec_id: Option<SharedObjectStackDecID>,
959 requestor: HttpRequestorRef,
960 ) -> Self {
961 Self::new(GlobalStateCategory::LocalCache, dec_id, requestor)
962 }
963
964 pub fn new(
965 category: GlobalStateCategory,
966 dec_id: Option<SharedObjectStackDecID>,
967 requestor: HttpRequestorRef,
968 ) -> Self {
969 let addr = requestor.remote_addr();
970
971 let url = format!("http://{}/{}/", addr, category.as_str());
972 let url = Url::parse(&url).unwrap();
973
974 let ret = Self {
975 category,
976 dec_id,
977 requestor,
978 service_url: url,
979 };
980
981 ret
982 }
983
984 pub fn category(&self) -> &GlobalStateCategory {
985 &self.category
986 }
987
988 pub fn into_processor(self) -> GlobalStateAccessorOutputProcessorRef {
989 Arc::new(Box::new(self))
990 }
991
992 pub fn clone_processor(&self) -> GlobalStateAccessorOutputProcessorRef {
993 self.clone().into_processor()
994 }
995
996 fn encode_common_headers(
998 &self,
999 com_req: &RootStateOutputRequestCommon,
1000 http_req: &mut Request,
1001 ) {
1002 if let Some(dec_id) = &com_req.dec_id {
1003 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
1004 } else if let Some(dec_id) = &self.dec_id {
1005 if let Some(dec_id) = dec_id.get() {
1006 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
1007 }
1008 }
1009
1010 if let Some(target_dec_id) = &com_req.target_dec_id {
1011 http_req.insert_header(cyfs_base::CYFS_TARGET_DEC_ID, target_dec_id.to_string());
1012 }
1013
1014 if let Some(target) = &com_req.target {
1015 http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
1016 }
1017
1018 http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
1019 }
1020
1021 fn gen_url(&self, inner_path: &str) -> Url {
1024 self.service_url
1025 .join(&inner_path.trim_start_matches("/"))
1026 .unwrap()
1027 }
1028
1029 fn encode_get_object_by_path_request(
1031 &self,
1032 req: &RootStateAccessorGetObjectByPathOutputRequest,
1033 ) -> Request {
1034 let url = self.gen_url(&req.inner_path);
1035
1036 let mut http_req = Request::new(Method::Get, url);
1037 self.encode_common_headers(&req.common, &mut http_req);
1038
1039 http_req
1040 }
1041
1042 pub async fn decode_get_object_by_path_response(
1043 resp: &mut Response,
1044 ) -> BuckyResult<RootStateAccessorGetObjectByPathOutputResponse> {
1045 let object = NONRequestorHelper::decode_get_object_response(resp).await?;
1046 let root = RequestorHelper::decode_header(resp, cyfs_base::CYFS_ROOT)?;
1047 let revision = RequestorHelper::decode_header(resp, cyfs_base::CYFS_REVISION)?;
1048
1049 Ok(RootStateAccessorGetObjectByPathOutputResponse {
1050 object,
1051 root,
1052 revision,
1053 })
1054 }
1055
1056 async fn get_object_by_path(
1057 &self,
1058 req: RootStateAccessorGetObjectByPathOutputRequest,
1059 ) -> BuckyResult<RootStateAccessorGetObjectByPathOutputResponse> {
1060 debug!("access get_object_by_path: {}", req);
1061
1062 let http_req = self.encode_get_object_by_path_request(&req);
1063
1064 let mut resp = self.requestor.request(http_req).await?;
1065
1066 if resp.status().is_success() {
1067 let info = Self::decode_get_object_by_path_response(&mut resp).await?;
1068 info!(
1069 "get_object_by_path from global state success: category={}, inner_path={}, {}",
1070 self.category, req.inner_path, info
1071 );
1072 Ok(info)
1073 } else {
1074 let e = RequestorHelper::error_from_resp(&mut resp).await;
1075 error!(
1076 "get_object_by_path from global state error: category={}, inner_path={}, {}",
1077 self.category, req.inner_path, e
1078 );
1079 Err(e)
1080 }
1081 }
1082
1083 fn encode_list_request(&self, req: &RootStateAccessorListOutputRequest) -> Request {
1085 let mut url = self.gen_url(&req.inner_path);
1086 debug!("list url: {}, {}", url, req.inner_path);
1087
1088 {
1089 let mut querys = url.query_pairs_mut();
1090 querys.append_pair("action", &GlobalStateAccessorAction::List.to_string());
1091
1092 if let Some(page_index) = &req.page_index {
1093 querys.append_pair("page_index", &page_index.to_string());
1094 }
1095
1096 if let Some(page_size) = &req.page_size {
1097 querys.append_pair("page_size", &page_size.to_string());
1098 }
1099 }
1100
1101 let mut http_req = Request::new(Method::Get, url);
1102 self.encode_common_headers(&req.common, &mut http_req);
1103
1104 http_req
1105 }
1106
1107 pub async fn decode_list_response(
1108 resp: &mut Response,
1109 ) -> BuckyResult<RootStateAccessorListOutputResponse> {
1110 let list = RequestorHelper::decode_json_body(resp).await?;
1111 let root = RequestorHelper::decode_header(resp, cyfs_base::CYFS_ROOT)?;
1112 let revision = RequestorHelper::decode_header(resp, cyfs_base::CYFS_REVISION)?;
1113
1114 Ok(RootStateAccessorListOutputResponse {
1115 list,
1116 root,
1117 revision,
1118 })
1119 }
1120
1121 async fn list(
1122 &self,
1123 req: RootStateAccessorListOutputRequest,
1124 ) -> BuckyResult<RootStateAccessorListOutputResponse> {
1125 debug!("access list: {}", req);
1126
1127 let http_req = self.encode_list_request(&req);
1128
1129 let mut resp = self.requestor.request(http_req).await?;
1130
1131 if resp.status().is_success() {
1132 let resp = Self::decode_list_response(&mut resp).await?;
1133
1134 info!(
1135 "list from global state success: category={}, req={}, count={}, root={}, revision={}",
1136 self.category,
1137 req,
1138 resp.list.len(),
1139 resp.root,
1140 resp.revision,
1141 );
1142
1143 Ok(resp)
1144 } else {
1145 let e = RequestorHelper::error_from_resp(&mut resp).await;
1146 error!(
1147 "list from global state error: category={}, req={}, {}",
1148 self.category, req, e
1149 );
1150 Err(e)
1151 }
1152 }
1153}
1154
1155#[async_trait::async_trait]
1156impl GlobalStateAccessorOutputProcessor for GlobalStateAccessorRequestor {
1157 async fn get_object_by_path(
1158 &self,
1159 req: RootStateAccessorGetObjectByPathOutputRequest,
1160 ) -> BuckyResult<RootStateAccessorGetObjectByPathOutputResponse> {
1161 Self::get_object_by_path(self, req).await
1162 }
1163
1164 async fn list(
1165 &self,
1166 req: RootStateAccessorListOutputRequest,
1167 ) -> BuckyResult<RootStateAccessorListOutputResponse> {
1168 Self::list(self, req).await
1169 }
1170}
1171
/// Checks header encode/decode round trips for non-ASCII and path-like values,
/// percent-decoding of equivalent inputs, and joining an inner path onto a
/// base URL that contains non-ASCII characters.
#[test]
fn test_url() {
    let url = Url::parse("http://www.cyfs.com").unwrap();
    let mut http_req = Request::new(Method::Get, url);

    RequestorHelper::encode_header(&mut http_req, &"Content-Type", &"text/html; charset=utf-8");

    // A non-ASCII value must survive the encode -> decode round trip unchanged.
    let value = "新建文件夹";

    RequestorHelper::encode_header_with_encoding(
        &mut http_req,
        cyfs_base::CYFS_OP_ENV_PATH,
        &value,
    );
    let header: String =
        RequestorHelper::decode_header_with_utf8_decoding(&http_req, cyfs_base::CYFS_OP_ENV_PATH)
            .unwrap();
    assert_eq!(header, value);

    // A plain path-like value must round trip unchanged as well.
    let value = "/article/standby";
    RequestorHelper::encode_header_with_encoding(
        &mut http_req,
        cyfs_base::CYFS_OP_ENV_PATH,
        &value,
    );
    let header: String =
        RequestorHelper::decode_header_with_utf8_decoding(&http_req, cyfs_base::CYFS_OP_ENV_PATH)
            .unwrap();
    assert_eq!(header, value);

    let value = RequestorHelper::decode_utf8("test", "%2Farticle%2Fstandby").unwrap();
    println!("{}", value);

    // The same path with and without percent-encoded slashes must decode to
    // the same string.
    let v1 = "%2Fa%2Fb%2F%E6%88%91%E7%9A%84%2F%20%2F**";
    let v2 = "/a/b/%E6%88%91%E7%9A%84/%20/**";
    let value = RequestorHelper::decode_utf8("test", v1).unwrap();
    println!("{}", value);
    let value2 = RequestorHelper::decode_utf8("test", v2).unwrap();
    // Print the second decoded value (the original printed `value` twice).
    println!("{}", value2);
    assert_eq!(value, value2);

    // Joining an inner path (with its leading '/' stripped) onto a base URL
    // that ends with '/' must append to the base path rather than replace it.
    let url = format!("http://{}/{}/", "addr", "category/新建文件夹");
    let url = Url::parse(&url).unwrap();
    let inner_path = "/test/it";
    let url = url.join(inner_path.trim_start_matches('/')).unwrap();

    println!("{}", url);
    assert!(url.path().starts_with("/category/"));
    assert!(url.path().ends_with("/test/it"));
}