cyfs_lib/ndn/
requestor.rs

1use super::def::*;
2use super::output_request::*;
3use super::processor::*;
4use crate::base::*;
5use crate::requestor::*;
6use crate::stack::SharedObjectStackDecID;
7use cyfs_base::*;
8
9use http_types::{Method, Request, Response, Url};
10use std::sync::Arc;
11
12pub struct NDNRequestorHelper;
13
14impl NDNRequestorHelper {
15    pub async fn decode_get_data_response(
16        resp: &mut Response,
17    ) -> BuckyResult<NDNGetDataOutputResponse> {
18        let data = Box::new(resp.take_body());
19
20        let attr: Option<u32> =
21            RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_ATTRIBUTES)?;
22        let attr = attr.map(|v| Attributes::new(v));
23
24        let object_id = RequestorHelper::decode_header(resp, cyfs_base::CYFS_OBJECT_ID)?;
25        let owner_id = RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OWNER_ID)?;
26
27        let range = RequestorHelper::decode_optional_json_header(resp, cyfs_base::CYFS_DATA_RANGE)?;
28
29        let group = RequestorHelper::decode_optional_header_with_utf8_decoding(
30            resp,
31            cyfs_base::CYFS_TASK_GROUP,
32        )?;
33
34        let length: u64 =
35            RequestorHelper::decode_header(resp, http_types::headers::CONTENT_LENGTH)?;
36        let ret = NDNGetDataOutputResponse {
37            object_id,
38            owner_id,
39            attr,
40            range,
41            group,
42            length,
43            data,
44        };
45
46        Ok(ret)
47    }
48}
49
50#[derive(Clone)]
51pub struct NDNRequestor {
52    dec_id: Option<SharedObjectStackDecID>,
53    requestor: HttpRequestorRef,
54    service_url: Url,
55
56    data_requestor: HttpRequestorRef,
57    data_service_url: Url,
58}
59
60impl NDNRequestor {
61    pub fn new(
62        dec_id: Option<SharedObjectStackDecID>,
63        requestor: HttpRequestorRef,
64        data_requestor: Option<HttpRequestorRef>,
65    ) -> Self {
66        let url = format!("http://{}/ndn/", requestor.remote_addr());
67        let url = Url::parse(&url).unwrap();
68
69        let data_service_url = match &data_requestor {
70            Some(requestor) => {
71                let url = format!("http://{}/ndn/", requestor.remote_addr());
72                Url::parse(&url).unwrap()
73            }
74            None => url.clone(),
75        };
76
77        let data_requestor = data_requestor.unwrap_or(requestor.clone());
78
79        Self {
80            dec_id,
81            requestor,
82            service_url: url,
83
84            data_requestor,
85            data_service_url,
86        }
87    }
88
89    pub fn into_processor(self) -> NDNOutputProcessorRef {
90        Arc::new(Box::new(self))
91    }
92
93    pub fn clone_processor(&self) -> NDNOutputProcessorRef {
94        self.clone().into_processor()
95    }
96
97    fn encode_common_headers(
98        &self,
99        action: NDNAction,
100        com_req: &NDNOutputRequestCommon,
101        http_req: &mut Request,
102    ) {
103        if let Some(dec_id) = &com_req.dec_id {
104            http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
105        } else if let Some(dec_id) = &self.dec_id {
106            if let Some(dec_id) = dec_id.get() {
107                http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
108            }
109        }
110
111        http_req.insert_header(cyfs_base::CYFS_NDN_ACTION, action.to_string());
112
113        http_req.insert_header(cyfs_base::CYFS_API_LEVEL, com_req.level.to_string());
114
115        if let Some(target) = &com_req.target {
116            http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
117        }
118
119        RequestorHelper::encode_opt_header_with_encoding(
120            http_req,
121            cyfs_base::CYFS_REQ_PATH,
122            com_req.req_path.as_deref(),
123        );
124
125        if !com_req.referer_object.is_empty() {
126            RequestorHelper::insert_headers_with_encoding(
127                http_req,
128                cyfs_base::CYFS_REFERER_OBJECT,
129                &com_req.referer_object,
130            );
131        }
132
133        http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
134    }
135
136    fn encode_put_data_request(&self, req: &NDNPutDataOutputRequest) -> Request {
137        let mut http_req = Request::new(Method::Put, self.data_service_url.clone());
138
139        self.encode_common_headers(NDNAction::PutData, &req.common, &mut http_req);
140
141        http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
142
143        http_req
144    }
145
146    async fn decode_put_data_response(
147        &self,
148        resp: &Response,
149    ) -> BuckyResult<NDNPutDataOutputResponse> {
150        let result: NDNPutDataResult =
151            RequestorHelper::decode_header(resp, cyfs_base::CYFS_RESULT)?;
152
153        let ret = NDNPutDataOutputResponse { result };
154
155        Ok(ret)
156    }
157
158    #[allow(unused_mut)]
159    pub async fn put_data(
160        &self,
161        mut req: NDNPutDataOutputRequest,
162    ) -> BuckyResult<NDNPutDataOutputResponse> {
163        let mut http_req = self.encode_put_data_request(&req);
164
165        #[cfg(debug_assertions)]
166        {
167            use async_std::io::ReadExt;
168
169            let mut data = Vec::new();
170            req.data.read_to_end(&mut data).await.map_err(|e| {
171                let msg = format!("read data failed! chunk={} {}", req.object_id, e);
172                error!("{}", msg);
173                BuckyError::new(BuckyErrorCode::IoError, msg)
174            })?;
175
176            if data.len() != req.length as usize {
177                error!(
178                    "chunk length unmatch: calc={}, expect={}",
179                    data.len(),
180                    req.length,
181                );
182                unreachable!();
183            }
184
185            let calc_id = ChunkId::calculate_sync(&data).unwrap();
186
187            if calc_id.object_id() != req.object_id {
188                error!(
189                    "chunk id unmatch: calc_id={}, expect={}",
190                    calc_id, req.object_id,
191                );
192                unreachable!();
193            }
194
195            http_req.set_body(data);
196        }
197        #[cfg(not(debug_assertions))]
198        {
199            let reader = async_std::io::BufReader::new(req.data);
200            let body = tide::Body::from_reader(reader, Some(req.length as usize));
201            http_req.set_body(body);
202        }
203        let mut resp = self.data_requestor.request(http_req).await?;
204
205        if resp.status().is_success() {
206            info!("put data to ndn service success: {}", req.object_id);
207            self.decode_put_data_response(&resp).await
208        } else {
209            let e = RequestorHelper::error_from_resp(&mut resp).await;
210            error!(
211                "put data to ndn service error! object={}, {}",
212                req.object_id, e
213            );
214            Err(e)
215        }
216    }
217
218    fn encode_put_shared_data_request(&self, req: &NDNPutDataOutputRequest) -> Request {
219        let mut http_req = Request::new(Method::Put, self.service_url.clone());
220
221        self.encode_common_headers(NDNAction::PutSharedData, &req.common, &mut http_req);
222
223        http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
224
225        http_req
226    }
227
228    async fn decode_put_shared_data_response(
229        &self,
230        resp: &Response,
231    ) -> BuckyResult<NDNPutDataOutputResponse> {
232        let result: NDNPutDataResult =
233            RequestorHelper::decode_header(resp, cyfs_base::CYFS_RESULT)?;
234
235        let ret = NDNPutDataOutputResponse { result };
236
237        Ok(ret)
238    }
239
240    pub async fn put_shared_data(
241        &self,
242        req: NDNPutDataOutputRequest,
243    ) -> BuckyResult<NDNPutDataOutputResponse> {
244        info!("will put_shared_data: {}", req);
245
246        let mut http_req = self.encode_put_shared_data_request(&req);
247
248        let reader = async_std::io::BufReader::new(req.data);
249        let body = tide::Body::from_reader(reader, Some(req.length as usize));
250        http_req.set_body(body);
251
252        let mut resp = self.requestor.request(http_req).await?;
253
254        if resp.status().is_success() {
255            info!("put shared data to ndn service success: {}", req.object_id);
256            self.decode_put_shared_data_response(&resp).await
257        } else {
258            let e = RequestorHelper::error_from_resp(&mut resp).await;
259            error!(
260                "put shared data to ndn service error! object={}, {}",
261                req.object_id, e
262            );
263            Err(e)
264        }
265    }
266
267    fn encode_get_data_request(&self, action: NDNAction, req: &NDNGetDataOutputRequest) -> Request {
268        let mut http_req = Request::new(Method::Get, self.data_service_url.clone());
269        self.encode_common_headers(action, &req.common, &mut http_req);
270
271        http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
272        RequestorHelper::encode_opt_header_with_encoding(
273            &mut http_req,
274            cyfs_base::CYFS_INNER_PATH,
275            req.inner_path.as_deref(),
276        );
277
278        RequestorHelper::encode_opt_header_with_encoding(
279            &mut http_req,
280            cyfs_base::CYFS_CONTEXT,
281            req.context.as_deref(),
282        );
283
284        RequestorHelper::encode_opt_header_with_encoding(
285            &mut http_req,
286            cyfs_base::CYFS_TASK_GROUP,
287            req.group.as_deref(),
288        );
289
290        if let Some(ref range) = req.range {
291            http_req.insert_header("Range", range.encode_string());
292        }
293
294        http_req
295    }
296
297    pub async fn get_data(
298        &self,
299        req: NDNGetDataOutputRequest,
300    ) -> BuckyResult<NDNGetDataOutputResponse> {
301        let http_req = self.encode_get_data_request(NDNAction::GetData, &req);
302
303        let mut resp = self.data_requestor.request(http_req).await?;
304
305        if resp.status().is_success() {
306            match NDNRequestorHelper::decode_get_data_response(&mut resp).await {
307                Ok(resp) => {
308                    info!("get data from ndn service success: {}", resp);
309                    Ok(resp)
310                }
311                Err(e) => {
312                    error!("decode get data response error: {}, {}", req.object_id, e);
313                    Err(e)
314                }
315            }
316        } else {
317            let e = RequestorHelper::error_from_resp(&mut resp).await;
318            error!(
319                "get data from ndn service error: object={}, {}",
320                req.object_id, e
321            );
322            Err(e)
323        }
324    }
325
326    async fn decode_get_shared_data_response(
327        &self,
328        _req: &NDNGetDataOutputRequest,
329        resp: &mut Response,
330    ) -> BuckyResult<NDNGetDataOutputResponse> {
331        let data = Box::new(resp.take_body());
332
333        let attr: Option<u32> =
334            RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_ATTRIBUTES)?;
335        let attr = attr.map(|v| Attributes::new(v));
336
337        let object_id = RequestorHelper::decode_header(resp, cyfs_base::CYFS_OBJECT_ID)?;
338        let owner_id = RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OWNER_ID)?;
339
340        let range = RequestorHelper::decode_optional_json_header(resp, cyfs_base::CYFS_DATA_RANGE)?;
341        let group = RequestorHelper::decode_optional_header_with_utf8_decoding(
342            resp,
343            cyfs_base::CYFS_TASK_GROUP,
344        )?;
345
346        let length: u64 =
347            RequestorHelper::decode_header(resp, http_types::headers::CONTENT_LENGTH)?;
348
349        let ret = NDNGetDataOutputResponse {
350            object_id,
351            owner_id,
352            attr,
353            range,
354            group,
355            length,
356            data,
357        };
358
359        Ok(ret)
360    }
361
362    pub async fn get_shared_data(
363        &self,
364        req: NDNGetDataOutputRequest,
365    ) -> BuckyResult<NDNGetDataOutputResponse> {
366        let http_req = self.encode_get_data_request(NDNAction::GetSharedData, &req);
367
368        let mut resp = self.requestor.request(http_req).await?;
369
370        if resp.status().is_success() {
371            info!("get data from ndn service success: {}", req.object_id);
372            self.decode_get_shared_data_response(&req, &mut resp).await
373        } else {
374            let e = RequestorHelper::error_from_resp(&mut resp).await;
375            error!(
376                "get data from ndn service error: object={}, {}",
377                req.object_id, e
378            );
379            Err(e)
380        }
381    }
382
383    fn encode_delete_data_request(&self, req: &NDNDeleteDataOutputRequest) -> Request {
384        let mut http_req = Request::new(Method::Delete, self.service_url.clone());
385        self.encode_common_headers(NDNAction::DeleteData, &req.common, &mut http_req);
386
387        http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
388        if let Some(inner_path) = &req.inner_path {
389            http_req.insert_header(cyfs_base::CYFS_INNER_PATH, inner_path);
390        }
391
392        http_req
393    }
394
395    async fn decode_delete_data_response(
396        &self,
397        resp: &Response,
398    ) -> BuckyResult<NDNDeleteDataOutputResponse> {
399        let object_id = RequestorHelper::decode_header(resp, cyfs_base::CYFS_OBJECT_ID)?;
400
401        let ret = NDNDeleteDataOutputResponse { object_id };
402
403        Ok(ret)
404    }
405
406    pub async fn delete_data(
407        &self,
408        req: NDNDeleteDataOutputRequest,
409    ) -> BuckyResult<NDNDeleteDataOutputResponse> {
410        let http_req = self.encode_delete_data_request(&req);
411        let mut resp = self.requestor.request(http_req).await?;
412
413        if resp.status().is_success() {
414            info!("delete data from ndn service success: {}", req.object_id);
415            self.decode_delete_data_response(&resp).await
416        } else {
417            let e = RequestorHelper::error_from_resp(&mut resp).await;
418            error!(
419                "delete data from ndn service error! object={}, {}",
420                req.object_id, e
421            );
422            Err(e)
423        }
424    }
425
426    fn encode_query_file_request(&self, req: &NDNQueryFileOutputRequest) -> Request {
427        let mut url = self.service_url.clone();
428
429        let (t, v) = req.param.to_key_pair();
430        url.query_pairs_mut()
431            .append_pair("type", t)
432            .append_pair("value", &v);
433
434        let mut http_req = Request::new(Method::Get, url);
435        self.encode_common_headers(NDNAction::QueryFile, &req.common, &mut http_req);
436
437        http_req
438    }
439
440    async fn decode_query_file_response(
441        &self,
442        resp: &mut Response,
443    ) -> BuckyResult<NDNQueryFileOutputResponse> {
444        let ret: NDNQueryFileOutputResponse = RequestorHelper::decode_json_body(resp).await?;
445
446        Ok(ret)
447    }
448
449    async fn query_file(
450        &self,
451        req: NDNQueryFileOutputRequest,
452    ) -> BuckyResult<NDNQueryFileOutputResponse> {
453        let http_req = self.encode_query_file_request(&req);
454        let mut resp = self.requestor.request(http_req).await?;
455
456        if resp.status().is_success() {
457            // info!("query file from ndn service success: {}", resp);
458            self.decode_query_file_response(&mut resp).await
459        } else {
460            let e = RequestorHelper::error_from_resp(&mut resp).await;
461            error!(
462                "query file from ndn service error! param={}, {}",
463                req.param, e
464            );
465            Err(e)
466        }
467    }
468}
469
470#[async_trait::async_trait]
471impl NDNOutputProcessor for NDNRequestor {
472    async fn put_data(
473        &self,
474        req: NDNPutDataOutputRequest,
475    ) -> BuckyResult<NDNPutDataOutputResponse> {
476        self.put_data(req).await
477    }
478
479    async fn get_data(
480        &self,
481        req: NDNGetDataOutputRequest,
482    ) -> BuckyResult<NDNGetDataOutputResponse> {
483        self.get_data(req).await
484    }
485
486    async fn put_shared_data(
487        &self,
488        req: NDNPutDataOutputRequest,
489    ) -> BuckyResult<NDNPutDataOutputResponse> {
490        self.put_shared_data(req).await
491    }
492
493    async fn get_shared_data(
494        &self,
495        req: NDNGetDataOutputRequest,
496    ) -> BuckyResult<NDNGetDataOutputResponse> {
497        self.get_shared_data(req).await
498    }
499
500    async fn delete_data(
501        &self,
502        req: NDNDeleteDataOutputRequest,
503    ) -> BuckyResult<NDNDeleteDataOutputResponse> {
504        self.delete_data(req).await
505    }
506
507    async fn query_file(
508        &self,
509        req: NDNQueryFileOutputRequest,
510    ) -> BuckyResult<NDNQueryFileOutputResponse> {
511        self.query_file(req).await
512    }
513}