cyfs_lib/root_state/
requestor.rs

1use super::def::*;
2use super::output_request::*;
3use super::processor::*;
4use crate::base::*;
5use crate::non::NONRequestorHelper;
6use crate::requestor::*;
7use crate::stack::SharedObjectStackDecID;
8use cyfs_base::*;
9
10use http_types::{Method, Request, Response, Url};
11use std::sync::Arc;
12
13#[derive(Clone)]
14pub struct GlobalStateRequestor {
15    category: GlobalStateCategory,
16    dec_id: Option<SharedObjectStackDecID>,
17    requestor: HttpRequestorRef,
18    service_url: Url,
19}
20
21impl GlobalStateRequestor {
22    pub fn new_root_state(
23        dec_id: Option<SharedObjectStackDecID>,
24        requestor: HttpRequestorRef,
25    ) -> Self {
26        Self::new(GlobalStateCategory::RootState, dec_id, requestor)
27    }
28
29    pub fn new_local_cache(
30        dec_id: Option<SharedObjectStackDecID>,
31        requestor: HttpRequestorRef,
32    ) -> Self {
33        Self::new(GlobalStateCategory::LocalCache, dec_id, requestor)
34    }
35
36    pub fn new(
37        category: GlobalStateCategory,
38        dec_id: Option<SharedObjectStackDecID>,
39        requestor: HttpRequestorRef,
40    ) -> Self {
41        let addr = requestor.remote_addr();
42
43        let url = format!("http://{}/{}/", addr, category.as_str());
44        let url = Url::parse(&url).unwrap();
45
46        let ret = Self {
47            category,
48            dec_id,
49            requestor,
50            service_url: url,
51        };
52
53        ret
54    }
55
56    pub fn category(&self) -> &GlobalStateCategory {
57        &self.category
58    }
59
60    pub fn into_processor(self) -> GlobalStateOutputProcessorRef {
61        Arc::new(Box::new(self))
62    }
63
64    pub fn clone_processor(&self) -> GlobalStateOutputProcessorRef {
65        self.clone().into_processor()
66    }
67
68    // TODO: 目前和request的body部分编码一部分冗余的信息
69    fn encode_common_headers(
70        &self,
71        action: RootStateAction,
72        com_req: &RootStateOutputRequestCommon,
73        http_req: &mut Request,
74    ) {
75        if let Some(dec_id) = &com_req.dec_id {
76            http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
77        } else if let Some(dec_id) = &self.dec_id {
78            if let Some(dec_id) = dec_id.get() {
79                http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
80            }
81        }
82
83        if let Some(target_dec_id) = &com_req.target_dec_id {
84            http_req.insert_header(cyfs_base::CYFS_TARGET_DEC_ID, target_dec_id.to_string());
85        }
86
87        if let Some(target) = &com_req.target {
88            http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
89        }
90
91        http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
92
93        http_req.insert_header(cyfs_base::CYFS_ROOT_STATE_ACTION, action.to_string());
94    }
95
96    // /root_state/root GET
97    fn encode_get_current_root_request(
98        &self,
99        req: &RootStateGetCurrentRootOutputRequest,
100    ) -> Request {
101        let url = self.service_url.join("root").unwrap();
102        let mut http_req = Request::new(Method::Post, url);
103        self.encode_common_headers(RootStateAction::GetCurrentRoot, &req.common, &mut http_req);
104
105        http_req.set_body(req.encode_string());
106        http_req
107    }
108
109    async fn get_current_root(
110        &self,
111        req: RootStateGetCurrentRootOutputRequest,
112    ) -> BuckyResult<RootStateGetCurrentRootOutputResponse> {
113        let http_req = self.encode_get_current_root_request(&req);
114        let mut resp = self.requestor.request(http_req).await?;
115
116        if resp.status().is_success() {
117            let resp: RootStateGetCurrentRootOutputResponse =
118                RequestorHelper::decode_json_body(&mut resp).await?;
119            info!(
120                "get current root from root state success: root={}",
121                resp.root
122            );
123            Ok(resp)
124        } else {
125            let e = RequestorHelper::error_from_resp(&mut resp).await;
126            error!("get current root from root state error! {}", e);
127            Err(e)
128        }
129    }
130
131    // root_state/op_env POST
132    fn encode_create_op_env_request(&self, req: &RootStateCreateOpEnvOutputRequest) -> Request {
133        let url = self.service_url.join("op-env").unwrap();
134
135        let mut http_req = Request::new(Method::Post, url);
136        self.encode_common_headers(RootStateAction::CreateOpEnv, &req.common, &mut http_req);
137
138        http_req.set_body(req.encode_string());
139
140        http_req
141    }
142
143    async fn create_op_env(
144        &self,
145        req: RootStateCreateOpEnvOutputRequest,
146    ) -> BuckyResult<RootStateCreateOpEnvOutputResponse> {
147        let http_req = self.encode_create_op_env_request(&req);
148
149        let mut resp = self.requestor.request(http_req).await?;
150
151        if resp.status().is_success() {
152            let ret: RootStateCreateOpEnvOutputResponse =
153                RequestorHelper::decode_json_body(&mut resp).await?;
154            info!("create op_env from root state success: sid={}", ret.sid);
155            Ok(ret)
156        } else {
157            let e = RequestorHelper::error_from_resp(&mut resp).await;
158            error!("create op_env from root state error! {}", e);
159            Err(e)
160        }
161    }
162}
163
164#[async_trait::async_trait]
165impl GlobalStateOutputProcessor for GlobalStateRequestor {
166    fn get_category(&self) -> GlobalStateCategory {
167        self.category
168    }
169
170    async fn get_current_root(
171        &self,
172        req: RootStateGetCurrentRootOutputRequest,
173    ) -> BuckyResult<RootStateGetCurrentRootOutputResponse> {
174        GlobalStateRequestor::get_current_root(&self, req).await
175    }
176
177    async fn create_op_env(
178        &self,
179        req: RootStateCreateOpEnvOutputRequest,
180    ) -> BuckyResult<OpEnvOutputProcessorRef> {
181        let op_env_type = req.op_env_type.clone();
182        let resp = GlobalStateRequestor::create_op_env(&self, req).await?;
183
184        let requestor = OpEnvRequestor::new(
185            self.category.clone(),
186            op_env_type,
187            resp.sid,
188            self.dec_id.clone(),
189            self.requestor.clone(),
190        );
191        Ok(requestor.into_processor())
192    }
193}
194
195#[derive(Clone)]
196pub struct OpEnvRequestor {
197    category: GlobalStateCategory,
198    op_env_type: ObjectMapOpEnvType,
199    sid: u64,
200
201    dec_id: Option<SharedObjectStackDecID>,
202    requestor: HttpRequestorRef,
203    service_url: Url,
204}
205
206impl OpEnvRequestor {
207    pub fn new(
208        category: GlobalStateCategory,
209        op_env_type: ObjectMapOpEnvType,
210        sid: u64,
211        dec_id: Option<SharedObjectStackDecID>,
212        requestor: HttpRequestorRef,
213    ) -> Self {
214        assert!(sid > 0);
215
216        let addr = requestor.remote_addr();
217
218        let url = format!("http://{}/{}/op-env/", addr, category.as_str());
219        let url = Url::parse(&url).unwrap();
220
221        let ret = Self {
222            category,
223            op_env_type,
224            sid,
225            dec_id,
226            requestor,
227            service_url: url,
228        };
229
230        ret
231    }
232
233    pub fn category(&self) -> &GlobalStateCategory {
234        &self.category
235    }
236
237    pub fn into_processor(self) -> OpEnvOutputProcessorRef {
238        Arc::new(Box::new(self))
239    }
240
241    pub fn clone_processor(&self) -> OpEnvOutputProcessorRef {
242        self.clone().into_processor()
243    }
244
245    fn encode_common_headers(
246        &self,
247        action: OpEnvAction,
248        com_req: &OpEnvOutputRequestCommon,
249        http_req: &mut Request,
250    ) {
251        if let Some(dec_id) = &com_req.dec_id {
252            http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
253        } else if let Some(dec_id) = &self.dec_id {
254            if let Some(dec_id) = dec_id.get() {
255                http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
256            }
257        }
258
259        http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
260
261        http_req.insert_header(cyfs_base::CYFS_OP_ENV_ACTION, action.to_string());
262
263        if let Some(target_dec_id) = &com_req.target_dec_id {
264            http_req.insert_header(cyfs_base::CYFS_TARGET_DEC_ID, target_dec_id.to_string());
265        }
266
267        if let Some(target) = &com_req.target {
268            http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
269        }
270
271        if com_req.sid > 0 {
272            http_req.insert_header(cyfs_base::CYFS_OP_ENV_SID, com_req.sid.to_string());
273        } else {
274            http_req.insert_header(cyfs_base::CYFS_OP_ENV_SID, self.sid.to_string());
275        }
276    }
277
278    // load
279    // op_env/init/target
280    fn encode_load_request(&self, req: &OpEnvLoadOutputRequest) -> Request {
281        let url = self.service_url.join("init/target").unwrap();
282
283        let mut http_req = Request::new(Method::Post, url);
284        self.encode_common_headers(OpEnvAction::Load, &req.common, &mut http_req);
285
286        http_req.set_body(req.encode_string());
287
288        http_req
289    }
290
291    async fn load(&self, req: OpEnvLoadOutputRequest) -> BuckyResult<()> {
292        if self.op_env_type != ObjectMapOpEnvType::Single
293            && self.op_env_type != ObjectMapOpEnvType::IsolatePath
294        {
295            let msg = format!(
296                "load method only valid for single_op_env and isolate_path_op_env! sid={}",
297                self.sid
298            );
299            error!("{}", msg);
300            return Err(BuckyError::new(BuckyErrorCode::UnSupport, msg));
301        }
302
303        let http_req = self.encode_load_request(&req);
304        let mut resp = self.requestor.request(http_req).await?;
305
306        if resp.status().is_success() {
307            info!(
308                "load objectmap for single_op_env success: target={}, sid={}",
309                req.target, self.sid,
310            );
311            Ok(())
312        } else {
313            let e = RequestorHelper::error_from_resp(&mut resp).await;
314            error!(
315                "load objectmap for single_op_env error! target={}, sid={}, {}",
316                req.target, self.sid, e
317            );
318            Err(e)
319        }
320    }
321
322    // load_by_path
323    fn encode_load_by_path_request(&self, req: &OpEnvLoadByPathOutputRequest) -> Request {
324        let url = self.service_url.join("init/path").unwrap();
325
326        let mut http_req = Request::new(Method::Post, url);
327        self.encode_common_headers(OpEnvAction::LoadByPath, &req.common, &mut http_req);
328
329        http_req.set_body(req.encode_string());
330
331        http_req
332    }
333
334    async fn load_by_path(&self, req: OpEnvLoadByPathOutputRequest) -> BuckyResult<()> {
335        if self.op_env_type != ObjectMapOpEnvType::Single
336            && self.op_env_type != ObjectMapOpEnvType::IsolatePath
337        {
338            let msg = format!(
339                "load_by_path method only valid for single_op_env and isolate_path_op_env! sid={}",
340                self.sid
341            );
342            error!("{}", msg);
343            return Err(BuckyError::new(BuckyErrorCode::UnSupport, msg));
344        }
345
346        let http_req = self.encode_load_by_path_request(&req);
347        let mut resp = self.requestor.request(http_req).await?;
348
349        if resp.status().is_success() {
350            info!(
351                "load_by_path for single_op_env success: path={}, sid={}",
352                req.path, self.sid,
353            );
354            Ok(())
355        } else {
356            let e = RequestorHelper::error_from_resp(&mut resp).await;
357            error!(
358                "load_by_path for single_op_env error! path={}, sid={}, {}",
359                req.path, self.sid, e
360            );
361            Err(e)
362        }
363    }
364
365    // create_new
366    fn encode_create_new_request(&self, req: &OpEnvCreateNewOutputRequest) -> Request {
367        let url = self.service_url.join("init/new").unwrap();
368
369        let mut http_req = Request::new(Method::Post, url);
370        self.encode_common_headers(OpEnvAction::CreateNew, &req.common, &mut http_req);
371
372        http_req.set_body(req.encode_string());
373
374        http_req
375    }
376
377    async fn create_new(&self, req: OpEnvCreateNewOutputRequest) -> BuckyResult<()> {
378        let http_req = self.encode_create_new_request(&req);
379        let mut resp = self.requestor.request(http_req).await?;
380
381        if resp.status().is_success() {
382            info!("create_new for op_env success: sid={}", self.sid,);
383            Ok(())
384        } else {
385            let e = RequestorHelper::error_from_resp(&mut resp).await;
386            error!("create_new for op_env error! sid={}, {}", self.sid, e);
387            Err(e)
388        }
389    }
390
391    // lock
392    // op_env/{op_env_type}/lock
393    fn encode_lock_request(&self, req: &OpEnvLockOutputRequest) -> Request {
394        let url = self.service_url.join("lock").unwrap();
395        let mut http_req = Request::new(Method::Post, url);
396        self.encode_common_headers(OpEnvAction::Lock, &req.common, &mut http_req);
397
398        http_req.set_body(req.encode_string());
399
400        http_req
401    }
402
403    async fn lock(&self, req: OpEnvLockOutputRequest) -> BuckyResult<()> {
404        if self.op_env_type != ObjectMapOpEnvType::Path {
405            let msg = format!("lock method only valid for path_op_env! sid={}", self.sid);
406            error!("{}", msg);
407            return Err(BuckyError::new(BuckyErrorCode::UnSupport, msg));
408        }
409
410        let http_req = self.encode_lock_request(&req);
411        let mut resp = self.requestor.request(http_req).await?;
412
413        if resp.status().is_success() {
414            info!(
415                "lock for path_op_env success: path_list={:?}, sid={}",
416                req.path_list, self.sid,
417            );
418            Ok(())
419        } else {
420            let e = RequestorHelper::error_from_resp(&mut resp).await;
421            error!("lock for path_op_env error! sid={}, {}", self.sid, e);
422            Err(e)
423        }
424    }
425
426    // get_current_root
427    fn encode_get_current_root_request(&self, req: &OpEnvGetCurrentRootOutputRequest) -> Request {
428        let url = self.service_url.join("root").unwrap();
429        let mut http_req = Request::new(Method::Get, url);
430        self.encode_common_headers(OpEnvAction::GetCurrentRoot, &req.common, &mut http_req);
431
432        http_req
433    }
434
435    async fn get_current_root(
436        &self,
437        req: OpEnvGetCurrentRootOutputRequest,
438    ) -> BuckyResult<OpEnvGetCurrentRootOutputResponse> {
439        let http_req = self.encode_get_current_root_request(&req);
440        let mut resp = self.requestor.request(http_req).await?;
441
442        if resp.status().is_success() {
443            let resp: OpEnvGetCurrentRootOutputResponse =
444                RequestorHelper::decode_json_body(&mut resp).await?;
445
446            info!("get_current_root for op_env success: sid={}", self.sid,);
447            Ok(resp)
448        } else {
449            let e = RequestorHelper::error_from_resp(&mut resp).await;
450            error!("get_current_root for op_env error! sid={}, {}", self.sid, e);
451            Err(e)
452        }
453    }
454
455    // commit
456    fn encode_commit_request(&self, req: &OpEnvCommitOutputRequest) -> Request {
457        let url = self.service_url.join("transaction").unwrap();
458        let mut http_req = Request::new(Method::Post, url);
459        self.encode_common_headers(OpEnvAction::Commit, &req.common, &mut http_req);
460
461        http_req.set_body(req.encode_string());
462
463        http_req
464    }
465
466    async fn commit(
467        &self,
468        req: OpEnvCommitOutputRequest,
469    ) -> BuckyResult<OpEnvCommitOutputResponse> {
470        let http_req = self.encode_commit_request(&req);
471        let mut resp = self.requestor.request(http_req).await?;
472
473        if resp.status().is_success() {
474            let resp: OpEnvCommitOutputResponse =
475                RequestorHelper::decode_json_body(&mut resp).await?;
476
477            info!("commit for op_env success: sid={}", self.sid,);
478            Ok(resp)
479        } else {
480            let e = RequestorHelper::error_from_resp(&mut resp).await;
481            error!("commit for op_env error! sid={}, {}", self.sid, e);
482            Err(e)
483        }
484    }
485
486    // abort
487    fn encode_abort_request(&self, req: &OpEnvAbortOutputRequest) -> Request {
488        let url = self.service_url.join("transaction").unwrap();
489        let mut http_req = Request::new(Method::Delete, url);
490        self.encode_common_headers(OpEnvAction::Abort, &req.common, &mut http_req);
491
492        http_req.set_body(req.encode_string());
493
494        http_req
495    }
496
497    async fn abort(&self, req: OpEnvAbortOutputRequest) -> BuckyResult<()> {
498        let http_req = self.encode_abort_request(&req);
499        let mut resp = self.requestor.request(http_req).await?;
500
501        if resp.status().is_success() {
502            info!("abort for op_env success: sid={}", self.sid,);
503            Ok(())
504        } else {
505            let e = RequestorHelper::error_from_resp(&mut resp).await;
506            error!("abort for op_env error! sid={}, {}", self.sid, e);
507            Err(e)
508        }
509    }
510
511    // metadata
512    fn encode_metadata_request(&self, req: &OpEnvMetadataOutputRequest) -> Request {
513        let url = self.service_url.join("metadata").unwrap();
514        let mut http_req = Request::new(Method::Get, url);
515        self.encode_common_headers(OpEnvAction::Metadata, &req.common, &mut http_req);
516        RequestorHelper::encode_opt_header_with_encoding(
517            &mut http_req,
518            cyfs_base::CYFS_OP_ENV_PATH,
519            req.path.as_deref(),
520        );
521
522        http_req
523    }
524
525    async fn metadata(
526        &self,
527        req: OpEnvMetadataOutputRequest,
528    ) -> BuckyResult<OpEnvMetadataOutputResponse> {
529        let http_req = self.encode_metadata_request(&req);
530        let mut resp = self.requestor.request(http_req).await?;
531
532        if resp.status().is_success() {
533            let resp: OpEnvMetadataOutputResponse =
534                RequestorHelper::decode_json_body(&mut resp).await?;
535            info!(
536                "get metadata of op_env success: sid={}, resp={:?}",
537                self.sid, resp
538            );
539            Ok(resp)
540        } else {
541            let e = RequestorHelper::error_from_resp(&mut resp).await;
542            error!("get metadata of op_env error! sid={}, {}", self.sid, e);
543            Err(e)
544        }
545    }
546
547    // get_by_key
548    fn encode_get_by_key_request(&self, req: &OpEnvGetByKeyOutputRequest) -> Request {
549        let url = self.service_url.join("map").unwrap();
550        let mut http_req = Request::new(Method::Get, url);
551        self.encode_common_headers(OpEnvAction::GetByKey, &req.common, &mut http_req);
552
553        RequestorHelper::encode_opt_header_with_encoding(
554            &mut http_req,
555            cyfs_base::CYFS_OP_ENV_PATH,
556            req.path.as_deref(),
557        );
558        RequestorHelper::encode_header_with_encoding(
559            &mut http_req,
560            cyfs_base::CYFS_OP_ENV_KEY,
561            &req.key,
562        );
563
564        // http_req.set_body(req.encode_string());
565
566        http_req
567    }
568
569    async fn get_by_key(
570        &self,
571        req: OpEnvGetByKeyOutputRequest,
572    ) -> BuckyResult<OpEnvGetByKeyOutputResponse> {
573        let http_req = self.encode_get_by_key_request(&req);
574        let mut resp = self.requestor.request(http_req).await?;
575
576        if resp.status().is_success() {
577            let resp: OpEnvGetByKeyOutputResponse =
578                RequestorHelper::decode_json_body(&mut resp).await?;
579
580            info!("get_by_key for op_env success: sid={}", self.sid,);
581            Ok(resp)
582        } else {
583            let e = RequestorHelper::error_from_resp(&mut resp).await;
584            error!("get_by_key for op_env error! sid={}, {}", self.sid, e);
585            Err(e)
586        }
587    }
588
589    // insert_with_key
590    fn encode_insert_with_key_request(&self, req: &OpEnvInsertWithKeyOutputRequest) -> Request {
591        let url = self.service_url.join("map").unwrap();
592        let mut http_req = Request::new(Method::Post, url);
593        self.encode_common_headers(OpEnvAction::InsertWithKey, &req.common, &mut http_req);
594        http_req.set_body(req.encode_string());
595        http_req
596    }
597    async fn insert_with_key(&self, req: OpEnvInsertWithKeyOutputRequest) -> BuckyResult<()> {
598        let http_req = self.encode_insert_with_key_request(&req);
599        let mut resp = self.requestor.request(http_req).await?;
600        if resp.status().is_success() {
601            info!("insert_with_key for op_env success: sid={}", self.sid,);
602            Ok(())
603        } else {
604            let e = RequestorHelper::error_from_resp(&mut resp).await;
605            error!("insert_with_key for op_env error! sid={}, {}", self.sid, e);
606            Err(e)
607        }
608    }
609
610    // set_with_key
611    fn encode_set_with_key_request(&self, req: &OpEnvSetWithKeyOutputRequest) -> Request {
612        let url = self.service_url.join("map").unwrap();
613        let mut http_req = Request::new(Method::Put, url);
614        self.encode_common_headers(OpEnvAction::SetWithKey, &req.common, &mut http_req);
615        http_req.set_body(req.encode_string());
616        http_req
617    }
618
619    async fn set_with_key(
620        &self,
621        req: OpEnvSetWithKeyOutputRequest,
622    ) -> BuckyResult<OpEnvSetWithKeyOutputResponse> {
623        let http_req = self.encode_set_with_key_request(&req);
624        let mut resp = self.requestor.request(http_req).await?;
625        if resp.status().is_success() {
626            let resp: OpEnvSetWithKeyOutputResponse =
627                RequestorHelper::decode_json_body(&mut resp).await?;
628            info!("set_with_key for op_env success: sid={}", self.sid,);
629            Ok(resp)
630        } else {
631            let e = RequestorHelper::error_from_resp(&mut resp).await;
632            error!("set_with_key for op_env error! sid={}, {}", self.sid, e);
633            Err(e)
634        }
635    }
636
637    // remove_with_key
638    fn encode_remove_with_key_request(&self, req: &OpEnvRemoveWithKeyOutputRequest) -> Request {
639        let url = self.service_url.join("map").unwrap();
640        let mut http_req = Request::new(Method::Delete, url);
641        self.encode_common_headers(OpEnvAction::RemoveWithKey, &req.common, &mut http_req);
642        http_req.set_body(req.encode_string());
643        http_req
644    }
645
646    async fn remove_with_key(
647        &self,
648        req: OpEnvRemoveWithKeyOutputRequest,
649    ) -> BuckyResult<OpEnvRemoveWithKeyOutputResponse> {
650        let http_req = self.encode_remove_with_key_request(&req);
651        let mut resp = self.requestor.request(http_req).await?;
652        if resp.status().is_success() {
653            let resp: OpEnvRemoveWithKeyOutputResponse =
654                RequestorHelper::decode_json_body(&mut resp).await?;
655            info!("remove_with_key for op_env success: sid={}", self.sid,);
656            Ok(resp)
657        } else {
658            let e = RequestorHelper::error_from_resp(&mut resp).await;
659            error!("remove_with_key for op_env error! sid={}, {}", self.sid, e);
660            Err(e)
661        }
662    }
663
664    // contains
665    fn encode_contains_request(&self, req: &OpEnvContainsOutputRequest) -> Request {
666        let url = self.service_url.join("set").unwrap();
667        let mut http_req = Request::new(Method::Get, url);
668        self.encode_common_headers(OpEnvAction::Contains, &req.common, &mut http_req);
669
670        RequestorHelper::encode_opt_header_with_encoding(
671            &mut http_req,
672            cyfs_base::CYFS_OP_ENV_PATH,
673            req.path.as_deref(),
674        );
675        RequestorHelper::encode_header(&mut http_req, cyfs_base::CYFS_OP_ENV_VALUE, &req.value);
676
677        // http_req.set_body(req.encode_string());
678        http_req
679    }
680
681    async fn contains(
682        &self,
683        req: OpEnvContainsOutputRequest,
684    ) -> BuckyResult<OpEnvContainsOutputResponse> {
685        let http_req = self.encode_contains_request(&req);
686        let mut resp = self.requestor.request(http_req).await?;
687        if resp.status().is_success() {
688            let resp: OpEnvContainsOutputResponse =
689                RequestorHelper::decode_json_body(&mut resp).await?;
690            info!("contains for op_env success: sid={}", self.sid,);
691            Ok(resp)
692        } else {
693            let e = RequestorHelper::error_from_resp(&mut resp).await;
694            error!("contains for op_env error! sid={}, {}", self.sid, e);
695            Err(e)
696        }
697    }
698
699    // insert
700    fn encode_insert_request(&self, req: &OpEnvInsertOutputRequest) -> Request {
701        let url = self.service_url.join("set").unwrap();
702        let mut http_req = Request::new(Method::Post, url);
703        self.encode_common_headers(OpEnvAction::Insert, &req.common, &mut http_req);
704        http_req.set_body(req.encode_string());
705        http_req
706    }
707
708    async fn insert(
709        &self,
710        req: OpEnvInsertOutputRequest,
711    ) -> BuckyResult<OpEnvInsertOutputResponse> {
712        let http_req = self.encode_insert_request(&req);
713        let mut resp = self.requestor.request(http_req).await?;
714        if resp.status().is_success() {
715            let resp: OpEnvInsertOutputResponse =
716                RequestorHelper::decode_json_body(&mut resp).await?;
717            info!("insert for op_env success: sid={}", self.sid,);
718            Ok(resp)
719        } else {
720            let e = RequestorHelper::error_from_resp(&mut resp).await;
721            error!("insert for op_env error! sid={}, {}", self.sid, e);
722            Err(e)
723        }
724    }
725
726    // remove
727    fn encode_remove_request(&self, req: &OpEnvRemoveOutputRequest) -> Request {
728        let url = self.service_url.join("set").unwrap();
729        let mut http_req = Request::new(Method::Delete, url);
730        self.encode_common_headers(OpEnvAction::Remove, &req.common, &mut http_req);
731        http_req.set_body(req.encode_string());
732        http_req
733    }
734
735    async fn remove(
736        &self,
737        req: OpEnvRemoveOutputRequest,
738    ) -> BuckyResult<OpEnvRemoveOutputResponse> {
739        let http_req = self.encode_remove_request(&req);
740        let mut resp = self.requestor.request(http_req).await?;
741        if resp.status().is_success() {
742            let resp: OpEnvRemoveOutputResponse =
743                RequestorHelper::decode_json_body(&mut resp).await?;
744            info!("remove for op_env success: sid={}", self.sid,);
745            Ok(resp)
746        } else {
747            let e = RequestorHelper::error_from_resp(&mut resp).await;
748            error!("remove for op_env error! sid={}, {}", self.sid, e);
749            Err(e)
750        }
751    }
752
753    // next
754    fn encode_next_request(&self, req: &OpEnvNextOutputRequest) -> Request {
755        let url = self.service_url.join("iterator").unwrap();
756        let mut http_req = Request::new(Method::Post, url);
757        self.encode_common_headers(OpEnvAction::Next, &req.common, &mut http_req);
758        http_req.set_body(req.encode_string());
759        http_req
760    }
761
762    async fn next(&self, req: OpEnvNextOutputRequest) -> BuckyResult<OpEnvNextOutputResponse> {
763        let http_req = self.encode_next_request(&req);
764        let mut resp = self.requestor.request(http_req).await?;
765        if resp.status().is_success() {
766            let resp: OpEnvNextOutputResponse =
767                RequestorHelper::decode_json_body(&mut resp).await?;
768            info!("next for op_env success: sid={}", self.sid,);
769            Ok(resp)
770        } else {
771            let e = RequestorHelper::error_from_resp(&mut resp).await;
772            error!("next for op_env error! sid={}, {}", self.sid, e);
773            Err(e)
774        }
775    }
776
777    // reset
778    fn encode_reset_request(&self, req: &OpEnvResetOutputRequest) -> Request {
779        let url = self.service_url.join("iterator").unwrap();
780        let mut http_req = Request::new(Method::Delete, url);
781        self.encode_common_headers(OpEnvAction::Reset, &req.common, &mut http_req);
782        http_req
783    }
784
785    async fn reset(&self, req: OpEnvResetOutputRequest) -> BuckyResult<()> {
786        let http_req = self.encode_reset_request(&req);
787        let mut resp = self.requestor.request(http_req).await?;
788        if resp.status().is_success() {
789            info!("reset for op_env success: sid={}", self.sid,);
790            Ok(())
791        } else {
792            let e = RequestorHelper::error_from_resp(&mut resp).await;
793            error!("reset for op_env error! sid={}, {}", self.sid, e);
794            Err(e)
795        }
796    }
797
798    // list
799    fn encode_list_request(&self, req: &OpEnvListOutputRequest) -> Request {
800        let url = self.service_url.join("list").unwrap();
801        let mut http_req = Request::new(Method::Get, url);
802
803        self.encode_common_headers(OpEnvAction::List, &req.common, &mut http_req);
804        RequestorHelper::encode_opt_header_with_encoding(
805            &mut http_req,
806            cyfs_base::CYFS_OP_ENV_PATH,
807            req.path.as_deref(),
808        );
809
810        http_req
811    }
812
813    async fn list(&self, req: OpEnvListOutputRequest) -> BuckyResult<OpEnvListOutputResponse> {
814        let http_req = self.encode_list_request(&req);
815        let mut resp = self.requestor.request(http_req).await?;
816        if resp.status().is_success() {
817            let resp: OpEnvListOutputResponse =
818                RequestorHelper::decode_json_body(&mut resp).await?;
819            info!("list for op_env success: sid={}", self.sid,);
820            Ok(resp)
821        } else {
822            let e = RequestorHelper::error_from_resp(&mut resp).await;
823            error!("list for op_env error! sid={}, {}", self.sid, e);
824            Err(e)
825        }
826    }
827}
828
829#[async_trait::async_trait]
830impl OpEnvOutputProcessor for OpEnvRequestor {
831    fn get_sid(&self) -> u64 {
832        self.sid
833    }
834
835    fn get_category(&self) -> GlobalStateCategory {
836        self.category
837    }
838
839    async fn load(&self, req: OpEnvLoadOutputRequest) -> BuckyResult<()> {
840        Self::load(&self, req).await
841    }
842
843    async fn load_by_path(&self, req: OpEnvLoadByPathOutputRequest) -> BuckyResult<()> {
844        Self::load_by_path(&self, req).await
845    }
846
847    async fn create_new(&self, req: OpEnvCreateNewOutputRequest) -> BuckyResult<()> {
848        Self::create_new(&self, req).await
849    }
850
851    async fn lock(&self, req: OpEnvLockOutputRequest) -> BuckyResult<()> {
852        Self::lock(&self, req).await
853    }
854
855    async fn get_current_root(
856        &self,
857        req: OpEnvGetCurrentRootOutputRequest,
858    ) -> BuckyResult<OpEnvGetCurrentRootOutputResponse> {
859        Self::get_current_root(&self, req).await
860    }
861
862    async fn commit(
863        &self,
864        req: OpEnvCommitOutputRequest,
865    ) -> BuckyResult<OpEnvCommitOutputResponse> {
866        Self::commit(&self, req).await
867    }
868    async fn abort(&self, req: OpEnvAbortOutputRequest) -> BuckyResult<()> {
869        Self::abort(&self, req).await
870    }
871
872    async fn metadata(
873        &self,
874        req: OpEnvMetadataOutputRequest,
875    ) -> BuckyResult<OpEnvMetadataOutputResponse> {
876        Self::metadata(&self, req).await
877    }
878
879    // map methods
880    async fn get_by_key(
881        &self,
882        req: OpEnvGetByKeyOutputRequest,
883    ) -> BuckyResult<OpEnvGetByKeyOutputResponse> {
884        Self::get_by_key(&self, req).await
885    }
886
887    async fn insert_with_key(&self, req: OpEnvInsertWithKeyOutputRequest) -> BuckyResult<()> {
888        Self::insert_with_key(&self, req).await
889    }
890
891    async fn set_with_key(
892        &self,
893        req: OpEnvSetWithKeyOutputRequest,
894    ) -> BuckyResult<OpEnvSetWithKeyOutputResponse> {
895        Self::set_with_key(&self, req).await
896    }
897
898    async fn remove_with_key(
899        &self,
900        req: OpEnvRemoveWithKeyOutputRequest,
901    ) -> BuckyResult<OpEnvRemoveWithKeyOutputResponse> {
902        Self::remove_with_key(&self, req).await
903    }
904
905    // set methods
906    async fn contains(
907        &self,
908        req: OpEnvContainsOutputRequest,
909    ) -> BuckyResult<OpEnvContainsOutputResponse> {
910        Self::contains(&self, req).await
911    }
912
913    async fn insert(
914        &self,
915        req: OpEnvInsertOutputRequest,
916    ) -> BuckyResult<OpEnvInsertOutputResponse> {
917        Self::insert(&self, req).await
918    }
919
920    async fn remove(
921        &self,
922        req: OpEnvRemoveOutputRequest,
923    ) -> BuckyResult<OpEnvRemoveOutputResponse> {
924        Self::remove(&self, req).await
925    }
926
927    // iterator methods
928    async fn next(&self, req: OpEnvNextOutputRequest) -> BuckyResult<OpEnvNextOutputResponse> {
929        Self::next(&self, req).await
930    }
931
932    async fn reset(&self, req: OpEnvResetOutputRequest) -> BuckyResult<()> {
933        Self::reset(&self, req).await
934    }
935
936    async fn list(&self, req: OpEnvListOutputRequest) -> BuckyResult<OpEnvListOutputResponse> {
937        Self::list(&self, req).await
938    }
939}
940
941#[derive(Clone)]
942pub struct GlobalStateAccessorRequestor {
943    category: GlobalStateCategory,
944    dec_id: Option<SharedObjectStackDecID>,
945    requestor: HttpRequestorRef,
946    service_url: Url,
947}
948
949impl GlobalStateAccessorRequestor {
950    pub fn new_root_state(
951        dec_id: Option<SharedObjectStackDecID>,
952        requestor: HttpRequestorRef,
953    ) -> Self {
954        Self::new(GlobalStateCategory::RootState, dec_id, requestor)
955    }
956
957    pub fn new_local_cache(
958        dec_id: Option<SharedObjectStackDecID>,
959        requestor: HttpRequestorRef,
960    ) -> Self {
961        Self::new(GlobalStateCategory::LocalCache, dec_id, requestor)
962    }
963
964    pub fn new(
965        category: GlobalStateCategory,
966        dec_id: Option<SharedObjectStackDecID>,
967        requestor: HttpRequestorRef,
968    ) -> Self {
969        let addr = requestor.remote_addr();
970
971        let url = format!("http://{}/{}/", addr, category.as_str());
972        let url = Url::parse(&url).unwrap();
973
974        let ret = Self {
975            category,
976            dec_id,
977            requestor,
978            service_url: url,
979        };
980
981        ret
982    }
983
984    pub fn category(&self) -> &GlobalStateCategory {
985        &self.category
986    }
987
988    pub fn into_processor(self) -> GlobalStateAccessorOutputProcessorRef {
989        Arc::new(Box::new(self))
990    }
991
992    pub fn clone_processor(&self) -> GlobalStateAccessorOutputProcessorRef {
993        self.clone().into_processor()
994    }
995
996    // TODO: 目前和request的body部分编码一部分冗余的信息
997    fn encode_common_headers(
998        &self,
999        com_req: &RootStateOutputRequestCommon,
1000        http_req: &mut Request,
1001    ) {
1002        if let Some(dec_id) = &com_req.dec_id {
1003            http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
1004        } else if let Some(dec_id) = &self.dec_id {
1005            if let Some(dec_id) = dec_id.get() {
1006                http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
1007            }
1008        }
1009
1010        if let Some(target_dec_id) = &com_req.target_dec_id {
1011            http_req.insert_header(cyfs_base::CYFS_TARGET_DEC_ID, target_dec_id.to_string());
1012        }
1013
1014        if let Some(target) = &com_req.target {
1015            http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
1016        }
1017
1018        http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
1019    }
1020
1021    ////// access methods
1022
1023    fn gen_url(&self, inner_path: &str) -> Url {
1024        self.service_url
1025            .join(&inner_path.trim_start_matches("/"))
1026            .unwrap()
1027    }
1028
1029    // get_object_by_path
1030    fn encode_get_object_by_path_request(
1031        &self,
1032        req: &RootStateAccessorGetObjectByPathOutputRequest,
1033    ) -> Request {
1034        let url = self.gen_url(&req.inner_path);
1035
1036        let mut http_req = Request::new(Method::Get, url);
1037        self.encode_common_headers(&req.common, &mut http_req);
1038
1039        http_req
1040    }
1041
1042    pub async fn decode_get_object_by_path_response(
1043        resp: &mut Response,
1044    ) -> BuckyResult<RootStateAccessorGetObjectByPathOutputResponse> {
1045        let object = NONRequestorHelper::decode_get_object_response(resp).await?;
1046        let root = RequestorHelper::decode_header(resp, cyfs_base::CYFS_ROOT)?;
1047        let revision = RequestorHelper::decode_header(resp, cyfs_base::CYFS_REVISION)?;
1048
1049        Ok(RootStateAccessorGetObjectByPathOutputResponse {
1050            object,
1051            root,
1052            revision,
1053        })
1054    }
1055
1056    async fn get_object_by_path(
1057        &self,
1058        req: RootStateAccessorGetObjectByPathOutputRequest,
1059    ) -> BuckyResult<RootStateAccessorGetObjectByPathOutputResponse> {
1060        debug!("access get_object_by_path: {}", req);
1061
1062        let http_req = self.encode_get_object_by_path_request(&req);
1063
1064        let mut resp = self.requestor.request(http_req).await?;
1065
1066        if resp.status().is_success() {
1067            let info = Self::decode_get_object_by_path_response(&mut resp).await?;
1068            info!(
1069                "get_object_by_path from global state success: category={}, inner_path={}, {}",
1070                self.category, req.inner_path, info
1071            );
1072            Ok(info)
1073        } else {
1074            let e = RequestorHelper::error_from_resp(&mut resp).await;
1075            error!(
1076                "get_object_by_path from global state error: category={}, inner_path={}, {}",
1077                self.category, req.inner_path, e
1078            );
1079            Err(e)
1080        }
1081    }
1082
1083    // list
1084    fn encode_list_request(&self, req: &RootStateAccessorListOutputRequest) -> Request {
1085        let mut url = self.gen_url(&req.inner_path);
1086        debug!("list url: {}, {}", url, req.inner_path);
1087
1088        {
1089            let mut querys = url.query_pairs_mut();
1090            querys.append_pair("action", &GlobalStateAccessorAction::List.to_string());
1091
1092            if let Some(page_index) = &req.page_index {
1093                querys.append_pair("page_index", &page_index.to_string());
1094            }
1095
1096            if let Some(page_size) = &req.page_size {
1097                querys.append_pair("page_size", &page_size.to_string());
1098            }
1099        }
1100
1101        let mut http_req = Request::new(Method::Get, url);
1102        self.encode_common_headers(&req.common, &mut http_req);
1103
1104        http_req
1105    }
1106
1107    pub async fn decode_list_response(
1108        resp: &mut Response,
1109    ) -> BuckyResult<RootStateAccessorListOutputResponse> {
1110        let list = RequestorHelper::decode_json_body(resp).await?;
1111        let root = RequestorHelper::decode_header(resp, cyfs_base::CYFS_ROOT)?;
1112        let revision = RequestorHelper::decode_header(resp, cyfs_base::CYFS_REVISION)?;
1113
1114        Ok(RootStateAccessorListOutputResponse {
1115            list,
1116            root,
1117            revision,
1118        })
1119    }
1120
1121    async fn list(
1122        &self,
1123        req: RootStateAccessorListOutputRequest,
1124    ) -> BuckyResult<RootStateAccessorListOutputResponse> {
1125        debug!("access list: {}", req);
1126
1127        let http_req = self.encode_list_request(&req);
1128
1129        let mut resp = self.requestor.request(http_req).await?;
1130
1131        if resp.status().is_success() {
1132            let resp = Self::decode_list_response(&mut resp).await?;
1133
1134            info!(
1135                "list from global state success: category={}, req={}, count={}, root={}, revision={}",
1136                self.category,
1137                req,
1138                resp.list.len(),
1139                resp.root,
1140                resp.revision,
1141            );
1142
1143            Ok(resp)
1144        } else {
1145            let e = RequestorHelper::error_from_resp(&mut resp).await;
1146            error!(
1147                "list from global state error: category={}, req={}, {}",
1148                self.category, req, e
1149            );
1150            Err(e)
1151        }
1152    }
1153}
1154
1155#[async_trait::async_trait]
1156impl GlobalStateAccessorOutputProcessor for GlobalStateAccessorRequestor {
1157    async fn get_object_by_path(
1158        &self,
1159        req: RootStateAccessorGetObjectByPathOutputRequest,
1160    ) -> BuckyResult<RootStateAccessorGetObjectByPathOutputResponse> {
1161        Self::get_object_by_path(self, req).await
1162    }
1163
1164    async fn list(
1165        &self,
1166        req: RootStateAccessorListOutputRequest,
1167    ) -> BuckyResult<RootStateAccessorListOutputResponse> {
1168        Self::list(self, req).await
1169    }
1170}
1171
1172#[test]
1173fn test_url() {
1174    let url = Url::parse("http://www.cyfs.com").unwrap();
1175    let mut http_req = Request::new(Method::Get, url);
1176
1177    RequestorHelper::encode_header(&mut http_req, &"Content-Type", &"text/html; charset=utf-8");
1178
1179    let value = "新建文件夹";
1180
1181    RequestorHelper::encode_header_with_encoding(
1182        &mut http_req,
1183        cyfs_base::CYFS_OP_ENV_PATH,
1184        &value,
1185    );
1186    let header: String =
1187        RequestorHelper::decode_header_with_utf8_decoding(&http_req, cyfs_base::CYFS_OP_ENV_PATH)
1188            .unwrap();
1189    assert_eq!(header, value);
1190
1191    let value = "/article/standby";
1192    RequestorHelper::encode_header_with_encoding(
1193        &mut http_req,
1194        cyfs_base::CYFS_OP_ENV_PATH,
1195        &value,
1196    );
1197    let header: String =
1198        RequestorHelper::decode_header_with_utf8_decoding(&http_req, cyfs_base::CYFS_OP_ENV_PATH)
1199            .unwrap();
1200    assert_eq!(header, value);
1201
1202    let value = RequestorHelper::decode_utf8("test", "%2Farticle%2Fstandby").unwrap();
1203    println!("{}", value);
1204
1205    let v1 = "%2Fa%2Fb%2F%E6%88%91%E7%9A%84%2F%20%2F**";
1206    let v2 = "/a/b/%E6%88%91%E7%9A%84/%20/**";
1207    let value = RequestorHelper::decode_utf8("test", v1).unwrap();
1208    println!("{}", value);
1209    let value2 = RequestorHelper::decode_utf8("test", v2).unwrap();
1210    println!("{}", value);
1211    assert_eq!(value, value2);
1212
1213    let url = format!("http://{}/{}/", "addr", "category/新建文件夹");
1214    let url = Url::parse(&url).unwrap();
1215    let inner_path = "/test/it";
1216    let url = url.join(&inner_path.trim_start_matches("/")).unwrap();
1217
1218    println!("{}", url);
1219}