1use super::output_request::*;
2use crate::base::*;
3use crate::*;
4use cyfs_base::*;
5use cyfs_core::TransContextObject;
6
7use http_types::{Method, Request, Url};
8use std::sync::Arc;
9
10#[derive(Clone)]
11pub struct TransRequestor {
12 dec_id: Option<SharedObjectStackDecID>,
13 requestor: HttpRequestorRef,
14 service_url: Url,
15}
16
17impl TransRequestor {
18 pub fn new(dec_id: Option<SharedObjectStackDecID>, requestor: HttpRequestorRef) -> Self {
19 let addr = requestor.remote_addr();
20
21 let url = format!("http://{}/trans/", addr);
22 let url = Url::parse(&url).unwrap();
23
24 Self {
25 dec_id,
26 requestor,
27 service_url: url,
28 }
29 }
30
31 pub fn clone_processor(&self) -> TransOutputProcessorRef {
32 Arc::new(self.clone())
33 }
34
35 fn encode_common_headers(&self, com_req: &NDNOutputRequestCommon, http_req: &mut Request) {
36 if let Some(dec_id) = &com_req.dec_id {
37 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
38 } else if let Some(dec_id) = &self.dec_id {
39 if let Some(dec_id) = dec_id.get() {
40 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
41 }
42 }
43
44 RequestorHelper::encode_opt_header_with_encoding(
45 http_req,
46 cyfs_base::CYFS_REQ_PATH,
47 com_req.req_path.as_deref(),
48 );
49 http_req.insert_header(CYFS_API_LEVEL, com_req.level.to_string());
50
51 if let Some(target) = &com_req.target {
52 http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
53 }
54
55 if !com_req.referer_object.is_empty() {
56 RequestorHelper::insert_headers_with_encoding(
57 http_req,
58 cyfs_base::CYFS_REFERER_OBJECT,
59 &com_req.referer_object,
60 );
61 }
62
63 http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
64 }
65
66 pub async fn get_context(
67 &self,
68 req: TransGetContextOutputRequest,
69 ) -> BuckyResult<TransGetContextOutputResponse> {
70 info!(
71 "will get context id={:?}, path={:?}",
72 req.context_id, req.context_path
73 );
74
75 let url = self.service_url.join("get_context").unwrap();
76 let mut http_req = Request::new(Method::Post, url);
77
78 self.encode_common_headers(&req.common, &mut http_req);
79 let body = req.encode_string();
80 http_req.set_body(body);
81
82 let mut resp = self.requestor.request(http_req).await?;
83 match resp.status() {
84 code if code.is_success() => {
85 let context = RequestorHelper::decode_raw_object_body(&mut resp).await?;
86
87 Ok(TransGetContextOutputResponse { context })
88 }
89 code @ _ => {
90 let e = RequestorHelper::error_from_resp(&mut resp).await;
91 error!(
92 "get context failed: id={:?}, path={:?}, status={}, {}",
93 req.context_id, req.context_path, code, e
94 );
95
96 Err(e)
97 }
98 }
99 }
100
101 pub async fn put_context(&self, req: TransPutContextOutputRequest) -> BuckyResult<()> {
102 info!("will put context {}", req.context.context_path());
103
104 let url = self.service_url.join("put_context").unwrap();
105 let mut http_req = Request::new(Method::Post, url);
106
107 self.encode_common_headers(&req.common, &mut http_req);
108
109 if let Some(access) = &req.access {
110 http_req.insert_header(cyfs_base::CYFS_ACCESS, access.value().to_string());
111 }
112
113 let body = req.context.to_vec()?;
114 http_req.set_body(body);
115
116 let mut resp = self.requestor.request(http_req).await?;
117 match resp.status() {
118 code if code.is_success() => Ok(()),
119 code @ _ => {
120 let e = RequestorHelper::error_from_resp(&mut resp).await;
121 error!(
122 "update context failed: context={}, status={}, {}",
123 req.context.context_path(),
124 code,
125 e
126 );
127 Err(e)
128 }
129 }
130 }
131
132 pub async fn create_task(
133 &self,
134 req: TransCreateTaskOutputRequest,
135 ) -> BuckyResult<TransCreateTaskOutputResponse> {
136 info!("will create trans task: {:?}", req);
137
138 let url = self.service_url.join("task").unwrap();
139 let mut http_req = Request::new(Method::Post, url);
140
141 self.encode_common_headers(&req.common, &mut http_req);
142 let body = req.encode_string();
143 http_req.set_body(body);
144
145 let mut resp = self.requestor.request(http_req).await?;
146
147 match resp.status() {
148 code if code.is_success() => {
149 let body = resp.body_string().await.map_err(|e| {
150 let msg = format!(
151 "trans create task failed, read body string error! req={:?} {}",
152 req, e
153 );
154 error!("{}", msg);
155
156 BuckyError::from(msg)
157 })?;
158
159 let resp = TransCreateTaskOutputResponse::decode_string(&body).map_err(|e| {
160 error!(
161 "decode trans create task resp from body string error: body={} {}",
162 body, e,
163 );
164 e
165 })?;
166
167 debug!("trans create task success: resp={:?}", resp.task_id);
168
169 Ok(resp)
170 }
171 code @ _ => {
172 let e = RequestorHelper::error_from_resp(&mut resp).await;
173 error!(
174 "create task failed: obj={}, status={}, {}",
175 req.object_id, code, e
176 );
177 Err(e)
178 }
179 }
180 }
181
182 pub async fn control_task(&self, req: TransControlTaskOutputRequest) -> BuckyResult<()> {
183 info!("will control trans task: {:?}", req);
184
185 let url = self.service_url.join("task").unwrap();
186 let mut http_req = Request::new(Method::Put, url);
187
188 self.encode_common_headers(&req.common, &mut http_req);
189 let body = req.encode_string();
190 http_req.set_body(body);
191
192 let mut resp = self.requestor.request(http_req).await?;
193
194 match resp.status() {
195 code if code.is_success() => Ok(()),
196 code @ _ => {
197 let e = RequestorHelper::error_from_resp(&mut resp).await;
198 error!(
199 "stop trans task failed: task={}, status={}, {}",
200 req.task_id, code, e
201 );
202 Err(e)
203 }
204 }
205 }
206
207 pub async fn start_task(&self, req: TransTaskOutputRequest) -> BuckyResult<()> {
208 Self::control_task(
209 self,
210 TransControlTaskOutputRequest {
211 common: req.common.clone(),
212 task_id: req.task_id.clone(),
213 action: TransTaskControlAction::Start,
214 },
215 )
216 .await
217 }
218
219 pub async fn stop_task(&self, req: TransTaskOutputRequest) -> BuckyResult<()> {
220 Self::control_task(
221 self,
222 TransControlTaskOutputRequest {
223 common: req.common.clone(),
224 task_id: req.task_id.clone(),
225 action: TransTaskControlAction::Stop,
226 },
227 )
228 .await
229 }
230
231 pub async fn delete_task(&self, req: TransTaskOutputRequest) -> BuckyResult<()> {
232 Self::control_task(
233 self,
234 TransControlTaskOutputRequest {
235 common: req.common.clone(),
236 task_id: req.task_id.clone(),
237 action: TransTaskControlAction::Delete,
238 },
239 )
240 .await
241 }
242
243 pub async fn get_task_state(
244 &self,
245 req: TransGetTaskStateOutputRequest,
246 ) -> BuckyResult<TransGetTaskStateOutputResponse> {
247 info!("will get trans task state: {:?}", req);
248
249 let url = self.service_url.join("task/state").unwrap();
250 let mut http_req = Request::new(Method::Get, url);
251
252 self.encode_common_headers(&req.common, &mut http_req);
253 let body = req.encode_string();
254 http_req.set_body(body);
255
256 let mut resp = self.requestor.request(http_req).await?;
257
258 match resp.status() {
259 code if code.is_success() => {
260 let content = resp.body_json().await.map_err(|e| {
261 let msg = format!("parse TransTaskState resp body error! err={}", e);
262 error!("{}", msg);
263 BuckyError::new(BuckyErrorCode::InvalidData, msg)
264 })?;
265
266 info!(
267 "got trans task state: task={}, state={:?}",
268 req.task_id, content
269 );
270
271 Ok(content)
272 }
273 code @ _ => {
274 let e = RequestorHelper::error_from_resp(&mut resp).await;
275 error!(
276 "get trans task state failed: task={}, status={}, {}",
277 req.task_id, code, e,
278 );
279 Err(e)
280 }
281 }
282 }
283
284 pub async fn query_tasks(
285 &self,
286 req: TransQueryTasksOutputRequest,
287 ) -> BuckyResult<TransQueryTasksOutputResponse> {
288 info!("will query tasks: {:?}", req);
289
290 let url = self.service_url.join("tasks").unwrap();
291 let mut http_req = Request::new(Method::Post, url);
292
293 self.encode_common_headers(&req.common, &mut http_req);
294 let body = req.encode_string();
295 http_req.set_body(body);
296
297 let mut resp = self.requestor.request(http_req).await?;
298
299 match resp.status() {
300 code if code.is_success() => {
301 let content = resp.body_string().await.map_err(|e| {
302 let msg = format!("get query task resp body error! err={}", e);
303 error!("{}", msg);
304 BuckyError::new(BuckyErrorCode::InvalidData, msg)
305 })?;
306
307 let resp = TransQueryTasksOutputResponse::decode_string(content.as_str())?;
308 Ok(resp)
309 }
310 code @ _ => {
311 let e = RequestorHelper::error_from_resp(&mut resp).await;
312 error!("query tasks failed: status={}, msg={}", code, e);
313
314 Err(e)
315 }
316 }
317 }
318
319 pub async fn publish_file(
320 &self,
321 req: TransPublishFileOutputRequest,
322 ) -> BuckyResult<TransPublishFileOutputResponse> {
323 info!("will publish file: {:?}", req);
324
325 let url = self.service_url.join("file").unwrap();
326 let mut http_req = Request::new(Method::Post, url);
327
328 self.encode_common_headers(&req.common, &mut http_req);
329 let body = req.encode_string();
330 http_req.set_body(body);
331
332 let mut resp = self.requestor.request(http_req).await?;
333
334 match resp.status() {
335 code if code.is_success() => {
336 let body = resp.body_string().await.map_err(|e| {
337 let msg = format!(
338 "trans publish file failed, read body string error! req={:?} {}",
339 req, e
340 );
341 error!("{}", msg);
342
343 BuckyError::from(msg)
344 })?;
345
346 let resp = TransPublishFileOutputResponse::decode_string(&body).map_err(|e| {
347 error!(
348 "decode trans publish file resp from body string error: body={} {}",
349 body, e,
350 );
351 e
352 })?;
353
354 debug!("trans publish file success: resp={:?}", resp);
355
356 Ok(resp)
357 }
358 code @ _ => {
359 let e = RequestorHelper::error_from_resp(&mut resp).await;
360 error!(
361 "trans publish file failed: file={}, status={}, {}",
362 req.local_path.display(),
363 code,
364 e
365 );
366
367 Err(e)
368 }
369 }
370 }
371
372 pub async fn get_task_group_state(
373 &self,
374 req: TransGetTaskGroupStateOutputRequest,
375 ) -> BuckyResult<TransGetTaskGroupStateOutputResponse> {
376 info!("will get trans task group state: {:?}", req);
377
378 let url = self.service_url.join("task_group/state").unwrap();
379 let mut http_req = Request::new(Method::Post, url);
380
381 self.encode_common_headers(&req.common, &mut http_req);
382 http_req.set_body(serde_json::to_string(&req).unwrap());
383
384 let mut resp = self.requestor.request(http_req).await?;
385
386 if resp.status().is_success() {
387 let content = resp.body_json().await.map_err(|e| {
388 let msg = format!("parse get task group state resp body error! err={}", e);
389 error!("{}", msg);
390 BuckyError::new(BuckyErrorCode::InvalidData, msg)
391 })?;
392
393 info!(
394 "got trans task group state: task_group={}, state={:?}",
395 req.group, content
396 );
397
398 Ok(content)
399 } else {
400 let e = RequestorHelper::error_from_resp(&mut resp).await;
401 error!(
402 "get trans task state failed: task_group={}, status={}, {}",
403 req.group,
404 resp.status(),
405 e
406 );
407
408 Err(e)
409 }
410 }
411
412 pub async fn control_task_group(
413 &self,
414 req: TransControlTaskGroupOutputRequest,
415 ) -> BuckyResult<TransControlTaskGroupOutputResponse> {
416 info!("will control trans task group: {:?}", req);
417
418 let url = self.service_url.join("task_group").unwrap();
419 let mut http_req = Request::new(Method::Put, url);
420
421 self.encode_common_headers(&req.common, &mut http_req);
422 http_req.set_body(serde_json::to_string(&req).unwrap());
423
424 let mut resp = self.requestor.request(http_req).await?;
425
426 if resp.status().is_success() {
427 let resp = resp.body_json().await.map_err(|e| {
428 let msg = format!(
429 "trans control task group failed, read body string error! req={:?} {}",
430 req, e
431 );
432 error!("{}", msg);
433
434 BuckyError::from(msg)
435 })?;
436
437 debug!("trans control task group success: resp={:?}", resp);
438
439 Ok(resp)
440 } else {
441 let e = RequestorHelper::error_from_resp(&mut resp).await;
442 error!("trans control task failed! status={}, {}", resp.status(), e);
443
444 Err(e)
445 }
446 }
447}
448
449#[async_trait::async_trait]
450impl TransOutputProcessor for TransRequestor {
451 async fn get_context(
452 &self,
453 req: TransGetContextOutputRequest,
454 ) -> BuckyResult<TransGetContextOutputResponse> {
455 Self::get_context(self, req).await
456 }
457
458 async fn put_context(&self, req: TransPutContextOutputRequest) -> BuckyResult<()> {
459 Self::put_context(self, req).await
460 }
461
462 async fn create_task(
463 &self,
464 req: TransCreateTaskOutputRequest,
465 ) -> BuckyResult<TransCreateTaskOutputResponse> {
466 Self::create_task(self, req).await
467 }
468
469 async fn query_tasks(
470 &self,
471 req: TransQueryTasksOutputRequest,
472 ) -> BuckyResult<TransQueryTasksOutputResponse> {
473 Self::query_tasks(self, req).await
474 }
475
476 async fn get_task_state(
477 &self,
478 req: TransGetTaskStateOutputRequest,
479 ) -> BuckyResult<TransGetTaskStateOutputResponse> {
480 Self::get_task_state(self, req).await
481 }
482
483 async fn publish_file(
484 &self,
485 req: TransPublishFileOutputRequest,
486 ) -> BuckyResult<TransPublishFileOutputResponse> {
487 Self::publish_file(self, req).await
488 }
489
490 async fn control_task(&self, req: TransControlTaskOutputRequest) -> BuckyResult<()> {
491 Self::control_task(self, req).await
492 }
493
494 async fn get_task_group_state(
495 &self,
496 req: TransGetTaskGroupStateOutputRequest,
497 ) -> BuckyResult<TransGetTaskGroupStateOutputResponse> {
498 Self::get_task_group_state(self, req).await
499 }
500
501 async fn control_task_group(
502 &self,
503 req: TransControlTaskGroupOutputRequest,
504 ) -> BuckyResult<TransControlTaskGroupOutputResponse> {
505 Self::control_task_group(self, req).await
506 }
507}
508