cyfs_lib/trans/
requestor.rs

1use super::output_request::*;
2use crate::base::*;
3use crate::*;
4use cyfs_base::*;
5use cyfs_core::TransContextObject;
6
7use http_types::{Method, Request, Url};
8use std::sync::Arc;
9
10#[derive(Clone)]
11pub struct TransRequestor {
12    dec_id: Option<SharedObjectStackDecID>,
13    requestor: HttpRequestorRef,
14    service_url: Url,
15}
16
17impl TransRequestor {
18    pub fn new(dec_id: Option<SharedObjectStackDecID>, requestor: HttpRequestorRef) -> Self {
19        let addr = requestor.remote_addr();
20
21        let url = format!("http://{}/trans/", addr);
22        let url = Url::parse(&url).unwrap();
23
24        Self {
25            dec_id,
26            requestor,
27            service_url: url,
28        }
29    }
30
31    pub fn clone_processor(&self) -> TransOutputProcessorRef {
32        Arc::new(self.clone())
33    }
34
35    fn encode_common_headers(&self, com_req: &NDNOutputRequestCommon, http_req: &mut Request) {
36        if let Some(dec_id) = &com_req.dec_id {
37            http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
38        } else if let Some(dec_id) = &self.dec_id {
39            if let Some(dec_id) = dec_id.get() {
40                http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
41            }
42        }
43
44        RequestorHelper::encode_opt_header_with_encoding(
45            http_req,
46            cyfs_base::CYFS_REQ_PATH,
47            com_req.req_path.as_deref(),
48        );
49        http_req.insert_header(CYFS_API_LEVEL, com_req.level.to_string());
50
51        if let Some(target) = &com_req.target {
52            http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
53        }
54
55        if !com_req.referer_object.is_empty() {
56            RequestorHelper::insert_headers_with_encoding(
57                http_req,
58                cyfs_base::CYFS_REFERER_OBJECT,
59                &com_req.referer_object,
60            );
61        }
62
63        http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
64    }
65
66    pub async fn get_context(
67        &self,
68        req: TransGetContextOutputRequest,
69    ) -> BuckyResult<TransGetContextOutputResponse> {
70        info!(
71            "will get context id={:?}, path={:?}",
72            req.context_id, req.context_path
73        );
74
75        let url = self.service_url.join("get_context").unwrap();
76        let mut http_req = Request::new(Method::Post, url);
77
78        self.encode_common_headers(&req.common, &mut http_req);
79        let body = req.encode_string();
80        http_req.set_body(body);
81
82        let mut resp = self.requestor.request(http_req).await?;
83        match resp.status() {
84            code if code.is_success() => {
85                let context = RequestorHelper::decode_raw_object_body(&mut resp).await?;
86
87                Ok(TransGetContextOutputResponse { context })
88            }
89            code @ _ => {
90                let e = RequestorHelper::error_from_resp(&mut resp).await;
91                error!(
92                    "get context failed: id={:?}, path={:?}, status={}, {}",
93                    req.context_id, req.context_path, code, e
94                );
95
96                Err(e)
97            }
98        }
99    }
100
101    pub async fn put_context(&self, req: TransPutContextOutputRequest) -> BuckyResult<()> {
102        info!("will put context {}", req.context.context_path());
103
104        let url = self.service_url.join("put_context").unwrap();
105        let mut http_req = Request::new(Method::Post, url);
106
107        self.encode_common_headers(&req.common, &mut http_req);
108
109        if let Some(access) = &req.access {
110            http_req.insert_header(cyfs_base::CYFS_ACCESS, access.value().to_string());
111        }
112        
113        let body = req.context.to_vec()?;
114        http_req.set_body(body);
115
116        let mut resp = self.requestor.request(http_req).await?;
117        match resp.status() {
118            code if code.is_success() => Ok(()),
119            code @ _ => {
120                let e = RequestorHelper::error_from_resp(&mut resp).await;
121                error!(
122                    "update context failed: context={}, status={}, {}",
123                    req.context.context_path(),
124                    code,
125                    e
126                );
127                Err(e)
128            }
129        }
130    }
131
132    pub async fn create_task(
133        &self,
134        req: TransCreateTaskOutputRequest,
135    ) -> BuckyResult<TransCreateTaskOutputResponse> {
136        info!("will create trans task: {:?}", req);
137
138        let url = self.service_url.join("task").unwrap();
139        let mut http_req = Request::new(Method::Post, url);
140
141        self.encode_common_headers(&req.common, &mut http_req);
142        let body = req.encode_string();
143        http_req.set_body(body);
144
145        let mut resp = self.requestor.request(http_req).await?;
146
147        match resp.status() {
148            code if code.is_success() => {
149                let body = resp.body_string().await.map_err(|e| {
150                    let msg = format!(
151                        "trans create task failed, read body string error! req={:?} {}",
152                        req, e
153                    );
154                    error!("{}", msg);
155
156                    BuckyError::from(msg)
157                })?;
158
159                let resp = TransCreateTaskOutputResponse::decode_string(&body).map_err(|e| {
160                    error!(
161                        "decode trans create task resp from body string error: body={} {}",
162                        body, e,
163                    );
164                    e
165                })?;
166
167                debug!("trans create task success: resp={:?}", resp.task_id);
168
169                Ok(resp)
170            }
171            code @ _ => {
172                let e = RequestorHelper::error_from_resp(&mut resp).await;
173                error!(
174                    "create task failed: obj={}, status={}, {}",
175                    req.object_id, code, e
176                );
177                Err(e)
178            }
179        }
180    }
181
182    pub async fn control_task(&self, req: TransControlTaskOutputRequest) -> BuckyResult<()> {
183        info!("will control trans task: {:?}", req);
184
185        let url = self.service_url.join("task").unwrap();
186        let mut http_req = Request::new(Method::Put, url);
187
188        self.encode_common_headers(&req.common, &mut http_req);
189        let body = req.encode_string();
190        http_req.set_body(body);
191
192        let mut resp = self.requestor.request(http_req).await?;
193
194        match resp.status() {
195            code if code.is_success() => Ok(()),
196            code @ _ => {
197                let e = RequestorHelper::error_from_resp(&mut resp).await;
198                error!(
199                    "stop trans task failed: task={}, status={}, {}",
200                    req.task_id, code, e
201                );
202                Err(e)
203            }
204        }
205    }
206
207    pub async fn start_task(&self, req: TransTaskOutputRequest) -> BuckyResult<()> {
208        Self::control_task(
209            self,
210            TransControlTaskOutputRequest {
211                common: req.common.clone(),
212                task_id: req.task_id.clone(),
213                action: TransTaskControlAction::Start,
214            },
215        )
216        .await
217    }
218
219    pub async fn stop_task(&self, req: TransTaskOutputRequest) -> BuckyResult<()> {
220        Self::control_task(
221            self,
222            TransControlTaskOutputRequest {
223                common: req.common.clone(),
224                task_id: req.task_id.clone(),
225                action: TransTaskControlAction::Stop,
226            },
227        )
228        .await
229    }
230
231    pub async fn delete_task(&self, req: TransTaskOutputRequest) -> BuckyResult<()> {
232        Self::control_task(
233            self,
234            TransControlTaskOutputRequest {
235                common: req.common.clone(),
236                task_id: req.task_id.clone(),
237                action: TransTaskControlAction::Delete,
238            },
239        )
240        .await
241    }
242
243    pub async fn get_task_state(
244        &self,
245        req: TransGetTaskStateOutputRequest,
246    ) -> BuckyResult<TransGetTaskStateOutputResponse> {
247        info!("will get trans task state: {:?}", req);
248
249        let url = self.service_url.join("task/state").unwrap();
250        let mut http_req = Request::new(Method::Get, url);
251
252        self.encode_common_headers(&req.common, &mut http_req);
253        let body = req.encode_string();
254        http_req.set_body(body);
255
256        let mut resp = self.requestor.request(http_req).await?;
257
258        match resp.status() {
259            code if code.is_success() => {
260                let content = resp.body_json().await.map_err(|e| {
261                    let msg = format!("parse TransTaskState resp body error! err={}", e);
262                    error!("{}", msg);
263                    BuckyError::new(BuckyErrorCode::InvalidData, msg)
264                })?;
265
266                info!(
267                    "got trans task state: task={}, state={:?}",
268                    req.task_id, content
269                );
270
271                Ok(content)
272            }
273            code @ _ => {
274                let e = RequestorHelper::error_from_resp(&mut resp).await;
275                error!(
276                    "get trans task state failed: task={}, status={}, {}",
277                    req.task_id, code, e,
278                );
279                Err(e)
280            }
281        }
282    }
283
284    pub async fn query_tasks(
285        &self,
286        req: TransQueryTasksOutputRequest,
287    ) -> BuckyResult<TransQueryTasksOutputResponse> {
288        info!("will query tasks: {:?}", req);
289
290        let url = self.service_url.join("tasks").unwrap();
291        let mut http_req = Request::new(Method::Post, url);
292
293        self.encode_common_headers(&req.common, &mut http_req);
294        let body = req.encode_string();
295        http_req.set_body(body);
296
297        let mut resp = self.requestor.request(http_req).await?;
298
299        match resp.status() {
300            code if code.is_success() => {
301                let content = resp.body_string().await.map_err(|e| {
302                    let msg = format!("get query task resp body error! err={}", e);
303                    error!("{}", msg);
304                    BuckyError::new(BuckyErrorCode::InvalidData, msg)
305                })?;
306
307                let resp = TransQueryTasksOutputResponse::decode_string(content.as_str())?;
308                Ok(resp)
309            }
310            code @ _ => {
311                let e = RequestorHelper::error_from_resp(&mut resp).await;
312                error!("query tasks failed: status={}, msg={}", code, e);
313
314                Err(e)
315            }
316        }
317    }
318
319    pub async fn publish_file(
320        &self,
321        req: TransPublishFileOutputRequest,
322    ) -> BuckyResult<TransPublishFileOutputResponse> {
323        info!("will publish file: {:?}", req);
324
325        let url = self.service_url.join("file").unwrap();
326        let mut http_req = Request::new(Method::Post, url);
327
328        self.encode_common_headers(&req.common, &mut http_req);
329        let body = req.encode_string();
330        http_req.set_body(body);
331
332        let mut resp = self.requestor.request(http_req).await?;
333
334        match resp.status() {
335            code if code.is_success() => {
336                let body = resp.body_string().await.map_err(|e| {
337                    let msg = format!(
338                        "trans publish file failed, read body string error! req={:?} {}",
339                        req, e
340                    );
341                    error!("{}", msg);
342
343                    BuckyError::from(msg)
344                })?;
345
346                let resp = TransPublishFileOutputResponse::decode_string(&body).map_err(|e| {
347                    error!(
348                        "decode trans publish file resp from body string error: body={} {}",
349                        body, e,
350                    );
351                    e
352                })?;
353
354                debug!("trans publish file success: resp={:?}", resp);
355
356                Ok(resp)
357            }
358            code @ _ => {
359                let e = RequestorHelper::error_from_resp(&mut resp).await;
360                error!(
361                    "trans publish file failed: file={}, status={}, {}",
362                    req.local_path.display(),
363                    code,
364                    e
365                );
366
367                Err(e)
368            }
369        }
370    }
371
372    pub async fn get_task_group_state(
373        &self,
374        req: TransGetTaskGroupStateOutputRequest,
375    ) -> BuckyResult<TransGetTaskGroupStateOutputResponse> {
376        info!("will get trans task group state: {:?}", req);
377
378        let url = self.service_url.join("task_group/state").unwrap();
379        let mut http_req = Request::new(Method::Post, url);
380
381        self.encode_common_headers(&req.common, &mut http_req);
382        http_req.set_body(serde_json::to_string(&req).unwrap());
383
384        let mut resp = self.requestor.request(http_req).await?;
385
386        if resp.status().is_success() {
387            let content = resp.body_json().await.map_err(|e| {
388                let msg = format!("parse get task group state resp body error! err={}", e);
389                error!("{}", msg);
390                BuckyError::new(BuckyErrorCode::InvalidData, msg)
391            })?;
392
393            info!(
394                "got trans task group state: task_group={}, state={:?}",
395                req.group, content
396            );
397
398            Ok(content)
399        } else {
400            let e = RequestorHelper::error_from_resp(&mut resp).await;
401            error!(
402                "get trans task state failed: task_group={}, status={}, {}",
403                req.group,
404                resp.status(),
405                e
406            );
407
408            Err(e)
409        }
410    }
411
412    pub async fn control_task_group(
413        &self,
414        req: TransControlTaskGroupOutputRequest,
415    ) -> BuckyResult<TransControlTaskGroupOutputResponse> {
416        info!("will control trans task group: {:?}", req);
417
418        let url = self.service_url.join("task_group").unwrap();
419        let mut http_req = Request::new(Method::Put, url);
420
421        self.encode_common_headers(&req.common, &mut http_req);
422        http_req.set_body(serde_json::to_string(&req).unwrap());
423
424        let mut resp = self.requestor.request(http_req).await?;
425
426        if resp.status().is_success() {
427            let resp = resp.body_json().await.map_err(|e| {
428                let msg = format!(
429                    "trans control task group failed, read body string error! req={:?} {}",
430                    req, e
431                );
432                error!("{}", msg);
433
434                BuckyError::from(msg)
435            })?;
436
437            debug!("trans control task group success: resp={:?}", resp);
438
439            Ok(resp)
440        } else {
441            let e = RequestorHelper::error_from_resp(&mut resp).await;
442            error!("trans control task failed! status={}, {}", resp.status(), e);
443
444            Err(e)
445        }
446    }
447}
448
449#[async_trait::async_trait]
450impl TransOutputProcessor for TransRequestor {
451    async fn get_context(
452        &self,
453        req: TransGetContextOutputRequest,
454    ) -> BuckyResult<TransGetContextOutputResponse> {
455        Self::get_context(self, req).await
456    }
457
458    async fn put_context(&self, req: TransPutContextOutputRequest) -> BuckyResult<()> {
459        Self::put_context(self, req).await
460    }
461
462    async fn create_task(
463        &self,
464        req: TransCreateTaskOutputRequest,
465    ) -> BuckyResult<TransCreateTaskOutputResponse> {
466        Self::create_task(self, req).await
467    }
468
469    async fn query_tasks(
470        &self,
471        req: TransQueryTasksOutputRequest,
472    ) -> BuckyResult<TransQueryTasksOutputResponse> {
473        Self::query_tasks(self, req).await
474    }
475
476    async fn get_task_state(
477        &self,
478        req: TransGetTaskStateOutputRequest,
479    ) -> BuckyResult<TransGetTaskStateOutputResponse> {
480        Self::get_task_state(self, req).await
481    }
482
483    async fn publish_file(
484        &self,
485        req: TransPublishFileOutputRequest,
486    ) -> BuckyResult<TransPublishFileOutputResponse> {
487        Self::publish_file(self, req).await
488    }
489
490    async fn control_task(&self, req: TransControlTaskOutputRequest) -> BuckyResult<()> {
491        Self::control_task(self, req).await
492    }
493
494    async fn get_task_group_state(
495        &self,
496        req: TransGetTaskGroupStateOutputRequest,
497    ) -> BuckyResult<TransGetTaskGroupStateOutputResponse> {
498        Self::get_task_group_state(self, req).await
499    }
500
501    async fn control_task_group(
502        &self,
503        req: TransControlTaskGroupOutputRequest,
504    ) -> BuckyResult<TransControlTaskGroupOutputResponse> {
505        Self::control_task_group(self, req).await
506    }
507}
508/*
509struct TransHelper {
510
511}
512
513impl TransHelper {
514    pub async fn download_chunk_sync(requestor: &TransRequestor, chunk_id: ChunkId, device_id: DeviceId) -> BuckyResult<Vec<u8>> {
515
516        let local_path= cyfs_util::get_temp_path().join("trans_chunk").join(chunk_id.to_string());
517
518        // 创建下载任务
519        let req = TransStartTaskRequest {
520            target: None,
521            object_id: chunk_id.object_id().to_owned(),
522            local_path: local_path.clone(),
523            device_list: vec![device_id.clone()],
524        };
525
526        info!("will download chunk to tmp, chunk_id={}, tmp_file={}", chunk_id, local_path.display());
527
528        requestor.start_task(&req).await.map_err(|e|{
529            error!("trans start task error! chunk_id={}, {}", chunk_id, e);
530            e
531        })?;
532
533        loop {
534            let req = TransGetTaskStateRequest {
535                target: None,
536                object_id: chunk_id.object_id().to_owned(),
537                local_path: local_path.clone(),
538            };
539
540            let state = requestor.get_task_state(&req).await.map_err(|e| {
541                error!("get trans task state error! chunk={}, {}", chunk_id, e);
542                e
543            })?;
544
545            match state {
546                TransTaskState::Downloading(v) => {
547                    // info!("trans task downloading! file_id={}, {:?}", chunk_id, v);
548                }
549                TransTaskState::Finished(_v) => {
550                    info!("chunk trans task finished! chunk_id={}", chunk_id);
551                    break;
552                }
553                TransTaskState::Canceled | TransTaskState::Paused | TransTaskState::Pending => {
554                    unreachable!()
555                }
556            }
557
558            async_std::task::sleep(std::time::Duration::from_secs(1)).await;
559        }
560
561        let mut f = async_std::fs::OpenOptions::new().read(true).open(&local_path).await.unwrap();
562        let mut buf = vec![];
563        let bytes = f.read_to_end(&mut buf).await.unwrap();
564        if let Err(e) = async_std::fs::remove_file(&local_path).await {
565            error!("remove tmp chunk file error!")
566        }
567
568        if bytes != chunk_id.len() {
569
570        }
571    }
572}
573*/