cyfs_lib/non/
requestor.rs

1use super::def::*;
2use super::output_request::*;
3use super::processor::*;
4use crate::base::*;
5use crate::stack::SharedObjectStackDecID;
6use crate::requestor::*;
7use cyfs_base::*;
8
9use http_types::{Method, Request, Response, Url};
10use std::sync::Arc;
11
/// Namespace type for the associated functions that encode and decode NON
/// object payloads on HTTP requests and responses. It carries no state.
pub struct NONRequestorHelper;
13
14impl NONRequestorHelper {
15    async fn decode_object_info_from_body<T>(
16        object_id: ObjectId,
17        req: &mut T,
18    ) -> BuckyResult<NONObjectInfo>
19    where
20        T: BodyOp + HeaderOp,
21    {
22        let object_raw = req.body_bytes().await.map_err(|e| {
23            let msg = format!(
24                "read object bytes request/response error! obj={} {}",
25                object_id, e
26            );
27            error!("{}", msg);
28
29            BuckyError::new(BuckyErrorCode::IoError, msg)
30        })?;
31
32        let info = NONObjectInfo::new(object_id, object_raw, None);
33
34        Ok(info)
35    }
36
37    pub async fn decode_object_info<T>(req: &mut T) -> BuckyResult<NONObjectInfo>
38    where
39        T: BodyOp + HeaderOp,
40    {
41        // 头部必须有object-id字段
42        let object_id: ObjectId = RequestorHelper::decode_header(req, cyfs_base::CYFS_OBJECT_ID)?;
43
44        let mut info = Self::decode_object_info_from_body(object_id, req).await?;
45        if !info.is_empty() {
46            info.decode_and_verify()?;
47        } else {
48            // for chunks and data object_id 
49        }
50        
51        Ok(info)
52    }
53
54    pub async fn decode_allow_empty_object_info<T>(req: &mut T) -> BuckyResult<NONObjectInfo>
55    where
56        T: BodyOp + HeaderOp,
57    {
58        // 头部必须有object-id字段
59        let object_id: ObjectId = RequestorHelper::decode_header(req, cyfs_base::CYFS_OBJECT_ID)?;
60
61        let mut info = Self::decode_object_info_from_body(object_id, req).await?;
62        if !info.is_empty() {
63            info.decode_and_verify()?;
64        }
65        Ok(info)
66    }
67
68    pub async fn decode_option_object_info<T>(req: &mut T) -> BuckyResult<Option<NONObjectInfo>>
69    where
70        T: BodyOp + HeaderOp,
71    {
72        // 头部必须有object-id字段
73        let ret: Option<ObjectId> =
74            RequestorHelper::decode_optional_header(req, cyfs_base::CYFS_OBJECT_ID)?;
75        if ret.is_none() {
76            return Ok(None);
77        }
78
79        let mut info = Self::decode_object_info_from_body(ret.unwrap(), req).await?;
80        info.decode_and_verify()?;
81
82        Ok(Some(info))
83    }
84
85    pub fn encode_object_info<T>(req: &mut T, info: NONObjectInfo)
86    where
87        T: BodyOp + HeaderOp,
88    {
89        req.insert_header(cyfs_base::CYFS_OBJECT_ID, info.object_id.to_string());
90
91        if info.object_raw.len() > 0 {
92            req.set_body(info.object_raw);
93            req.set_content_type(CYFS_OBJECT_MIME.clone());
94        }
95    }
96
97    pub async fn decode_get_object_response<T>(
98        resp: &mut T,
99    ) -> BuckyResult<NONGetObjectOutputResponse>
100    where
101        T: BodyOp + HeaderOp,
102    {
103        let object = Self::decode_object_info(resp).await?;
104        let attr: Option<u32> =
105            RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_ATTRIBUTES)?;
106        let attr = attr.map(|v| Attributes::new(v));
107
108        let object_update_time =
109            RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OBJECT_UPDATE_TIME)?;
110        let object_expires_time =
111            RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OBJECT_EXPIRES_TIME)?;
112
113        let ret = NONGetObjectOutputResponse {
114            object,
115            object_expires_time,
116            object_update_time,
117            attr,
118        };
119
120        Ok(ret)
121    }
122}
123
/// Client for the NON service that encodes each operation as an HTTP request
/// and sends it through an `HttpRequestorRef`.
#[derive(Clone)]
pub struct NONRequestor {
    /// Optional fallback dec id; used for the `CYFS_DEC_ID` header when a
    /// request's common part does not carry its own dec id.
    dec_id: Option<SharedObjectStackDecID>,
    /// Transport used to send the encoded HTTP requests.
    requestor: HttpRequestorRef,
    /// Base service URL, built in `new` as `http://<remote_addr>/non/`.
    service_url: Url,
}
130
131impl NONRequestor {
132    pub fn new(dec_id: Option<SharedObjectStackDecID>, requestor: HttpRequestorRef) -> Self {
133        let addr = requestor.remote_addr();
134
135        let url = format!("http://{}/non/", addr);
136        let url = Url::parse(&url).unwrap();
137
138        let ret = Self {
139            dec_id,
140            requestor,
141            service_url: url,
142        };
143
144        ret
145    }
146
    /// Consumes this requestor and wraps it as a shared
    /// `NONOutputProcessorRef`.
    pub fn into_processor(self) -> NONOutputProcessorRef {
        // The boxed value is coerced to the processor type expected by the
        // return type; keep this as a single nested expression.
        Arc::new(Box::new(self))
    }
150
151    pub fn clone_processor(&self) -> NONOutputProcessorRef {
152        self.clone().into_processor()
153    }
154
155    fn encode_common_headers(
156        &self,
157        action: NONAction,
158        com_req: &NONOutputRequestCommon,
159        http_req: &mut Request,
160    ) {
161        if let Some(dec_id) = &com_req.dec_id {
162            http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
163        } else if let Some(dec_id) = &self.dec_id {
164            if let Some(dec_id) = dec_id.get() {
165                http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
166            }
167        }
168
169        RequestorHelper::encode_opt_header_with_encoding(
170            http_req,
171            cyfs_base::CYFS_REQ_PATH,
172            com_req.req_path.as_deref(),
173        );
174
175        http_req.insert_header(cyfs_base::CYFS_NON_ACTION, action.to_string());
176
177        http_req.insert_header(cyfs_base::CYFS_API_LEVEL, com_req.level.to_string());
178
179        if let Some(target) = &com_req.target {
180            http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
181        }
182
183        if let Some(source) = &com_req.source {
184            http_req.insert_header(cyfs_base::CYFS_SOURCE, source.to_string());
185        }
186
187        http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
188    }
189
190    fn encode_put_object_request(&self, req: &NONPutObjectOutputRequest) -> Request {
191        #[cfg(debug_assertions)]
192        {
193            if !req.object.is_empty() {
194                req.object.verify().expect(&format!(
195                    "pub object id unmatch: id={}, object={:?}",
196                    req.object.object_id,
197                    req.object.object_raw.to_hex()
198                ));
199            }
200        }
201
202        let mut http_req = Request::new(Method::Put, self.service_url.clone());
203        self.encode_common_headers(NONAction::PutObject, &req.common, &mut http_req);
204
205        if let Some(access) = &req.access {
206            http_req.insert_header(cyfs_base::CYFS_ACCESS, access.value().to_string());
207        }
208
209        http_req
210    }
211
212    async fn decode_put_object_response(
213        &self,
214        resp: &Response,
215    ) -> BuckyResult<NONPutObjectOutputResponse> {
216        let result: NONPutObjectResult =
217            RequestorHelper::decode_header(resp, cyfs_base::CYFS_RESULT)?;
218        let object_update_time =
219            RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OBJECT_UPDATE_TIME)?;
220        let object_expires_time =
221            RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OBJECT_EXPIRES_TIME)?;
222
223        let ret = NONPutObjectOutputResponse {
224            result,
225            object_expires_time,
226            object_update_time,
227        };
228
229        Ok(ret)
230    }
231
232    pub async fn put_object(
233        &self,
234        req: NONPutObjectOutputRequest,
235    ) -> BuckyResult<NONPutObjectOutputResponse> {
236        let object_id = req.object.object_id.clone();
237
238        let mut http_req = self.encode_put_object_request(&req);
239        NONRequestorHelper::encode_object_info(&mut http_req, req.object);
240
241        let mut resp = self.requestor.request(http_req).await?;
242
243        if resp.status().is_success() {
244            info!("put object to non service success: {}", object_id);
245            self.decode_put_object_response(&resp).await
246        } else {
247            let e = RequestorHelper::error_from_resp(&mut resp).await;
248            error!(
249                "put object to non service error! object={}, {}",
250                object_id, e
251            );
252            Err(e)
253        }
254    }
255
256    pub async fn update_object_meta(
257        &self,
258        req: NONUpdateObjectMetaOutputRequest,
259    ) -> BuckyResult<NONPutObjectOutputResponse> {
260        let req = NONPutObjectOutputRequest {
261            common: req.common,
262            object: NONObjectInfo::new(req.object_id, vec![], None),
263            access: req.access,
264        };
265
266        self.put_object(req).await
267    }
268
269    fn encode_get_object_request(&self, req: &NONGetObjectOutputRequest) -> Request {
270        let mut http_req = Request::new(Method::Get, self.service_url.clone());
271        self.encode_common_headers(NONAction::GetObject, &req.common, &mut http_req);
272
273        http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
274
275        RequestorHelper::encode_opt_header_with_encoding(
276            &mut http_req,
277            cyfs_base::CYFS_INNER_PATH,
278            req.inner_path.as_deref(),
279        );
280
281        http_req
282    }
283
284    pub async fn get_object(
285        &self,
286        req: NONGetObjectOutputRequest,
287    ) -> BuckyResult<NONGetObjectOutputResponse> {
288        let http_req = self.encode_get_object_request(&req);
289
290        let mut resp = self.requestor.request(http_req).await?;
291
292        if resp.status().is_success() {
293            let resp = NONRequestorHelper::decode_get_object_response(&mut resp).await?;
294            info!(
295                "get object from non service success: {}, object={}",
296                req.object_debug_info(), resp.object.object_id,
297            );
298            Ok(resp)
299        } else {
300            let e = RequestorHelper::error_from_resp(&mut resp).await;
301            error!(
302                "get object from non service error! object={}, {}",
303                req.object_debug_info(),
304                e
305            );
306            Err(e)
307        }
308    }
309
310    fn encode_post_object_request(&self, req: &NONPostObjectOutputRequest) -> Request {
311        let mut http_req = Request::new(Method::Post, self.service_url.clone());
312        self.encode_common_headers(NONAction::PostObject, &req.common, &mut http_req);
313
314        http_req
315    }
316
317    async fn decode_post_object_response(
318        &self,
319        resp: &mut Response,
320    ) -> BuckyResult<NONPostObjectOutputResponse> {
321        let object = NONRequestorHelper::decode_option_object_info(resp).await?;
322
323        let ret = NONPostObjectOutputResponse { object };
324
325        Ok(ret)
326    }
327
328    pub async fn post_object(
329        &self,
330        req: NONPostObjectOutputRequest,
331    ) -> BuckyResult<NONPostObjectOutputResponse> {
332        let object_id = req.object.object_id.clone();
333
334        let mut http_req = self.encode_post_object_request(&req);
335        NONRequestorHelper::encode_object_info(&mut http_req, req.object);
336
337        let mut resp = self.requestor.request(http_req).await?;
338
339        let status = resp.status();
340        if status.is_success() {
341            match status {
342                http_types::StatusCode::NoContent => {
343                    let e = RequestorHelper::error_from_resp(&mut resp).await;
344                    info!(
345                        "post object to non service but empty response! obj={}, {}",
346                        object_id, e
347                    );
348                    Err(e)
349                }
350                _ => {
351                    info!("post object to non service success: {}", object_id);
352                    self.decode_post_object_response(&mut resp).await
353                }
354            }
355        } else {
356            let e = RequestorHelper::error_from_resp(&mut resp).await;
357            if e.code() ==  BuckyErrorCode::NotHandled {
358                warn!(
359                    "post object to non service but not handled! object={}, {}",
360                    object_id, e
361                );
362            } else {
363                error!(
364                    "post object to non service error! object={}, {}",
365                    object_id, e
366                );
367            }
368            
369            Err(e)
370        }
371    }
372
373    fn format_select_url(&self, req_path: Option<&String>, filter: &SelectFilter) -> Url {
374        let mut url = if let Some(req_path) = req_path {
375            self.service_url
376                .join(req_path.trim_start_matches('/').trim_end_matches('/'))
377                .unwrap()
378        } else {
379            self.service_url.clone()
380        };
381
382        // filter以url params形式编码
383        SelectFilterUrlCodec::encode(&mut url, filter);
384
385        url
386    }
387
388    fn encode_select_request(&self, req: &NONSelectObjectOutputRequest) -> Request {
389        let url = self.format_select_url(req.common.req_path.as_ref(), &req.filter);
390        let mut http_req = Request::new(Method::Get, url);
391        self.encode_common_headers(NONAction::SelectObject, &req.common, &mut http_req);
392
393        SelectOptionCodec::encode(&mut http_req, &req.opt);
394
395        http_req
396    }
397
398    pub async fn select_object(
399        &self,
400        req: NONSelectObjectOutputRequest,
401    ) -> BuckyResult<NONSelectObjectOutputResponse> {
402        let http_req = self.encode_select_request(&req);
403
404        let mut resp = self.requestor.request(http_req).await?;
405
406        if resp.status().is_success() {
407            let resp = SelectResponse::from_respone(resp).await?;
408            Ok(NONSelectObjectOutputResponse {
409                objects: resp.objects,
410            })
411        } else {
412            let e = RequestorHelper::error_from_resp(&mut resp).await;
413            error!("select object from non failed: {}", e);
414            Err(e)
415        }
416    }
417
418    fn encode_delete_object_request(&self, req: &NONDeleteObjectOutputRequest) -> Request {
419        let mut http_req = Request::new(Method::Delete, self.service_url.clone());
420        self.encode_common_headers(NONAction::DeleteObject, &req.common, &mut http_req);
421
422        http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
423
424        RequestorHelper::encode_opt_header_with_encoding(
425            &mut http_req,
426            cyfs_base::CYFS_INNER_PATH,
427            req.inner_path.as_deref(),
428        );
429
430        http_req
431    }
432
433    async fn decode_delete_object_response(
434        &self,
435        req: &NONDeleteObjectOutputRequest,
436        resp: &mut Response,
437    ) -> BuckyResult<NONDeleteObjectOutputResponse> {
438        let object = if req.common.flags & CYFS_REQUEST_FLAG_DELETE_WITH_QUERY != 0 {
439            let object = NONRequestorHelper::decode_object_info(resp).await?;
440            Some(object)
441        } else {
442            None
443        };
444
445        Ok(NONDeleteObjectOutputResponse { object })
446    }
447
448    pub async fn delete_object(
449        &self,
450        req: NONDeleteObjectOutputRequest,
451    ) -> BuckyResult<NONDeleteObjectOutputResponse> {
452        let http_req = self.encode_delete_object_request(&req);
453
454        let mut resp = self.requestor.request(http_req).await?;
455
456        if resp.status().is_success() {
457            let ret = self.decode_delete_object_response(&req, &mut resp).await?;
458            info!(
459                "delete object from non service success: {}, obj={:?}",
460                req.object_id, ret.object
461            );
462            Ok(ret)
463        } else {
464            let e = RequestorHelper::error_from_resp(&mut resp).await;
465            error!(
466                "delete object from non failed: object={}, {}",
467                req.object_id, e
468            );
469            Err(e)
470        }
471    }
472}
473
474#[async_trait::async_trait]
475impl NONOutputProcessor for NONRequestor {
476    async fn put_object(
477        &self,
478        req: NONPutObjectOutputRequest,
479    ) -> BuckyResult<NONPutObjectOutputResponse> {
480        self.put_object(req).await
481    }
482
483    async fn get_object(
484        &self,
485        req: NONGetObjectOutputRequest,
486    ) -> BuckyResult<NONGetObjectOutputResponse> {
487        self.get_object(req).await
488    }
489
490    async fn post_object(
491        &self,
492        req: NONPostObjectOutputRequest,
493    ) -> BuckyResult<NONPostObjectOutputResponse> {
494        self.post_object(req).await
495    }
496
497    async fn select_object(
498        &self,
499        req: NONSelectObjectOutputRequest,
500    ) -> BuckyResult<NONSelectObjectOutputResponse> {
501        self.select_object(req).await
502    }
503
504    async fn delete_object(
505        &self,
506        req: NONDeleteObjectOutputRequest,
507    ) -> BuckyResult<NONDeleteObjectOutputResponse> {
508        self.delete_object(req).await
509    }
510}