1use super::def::*;
2use super::output_request::*;
3use super::processor::*;
4use crate::base::*;
5use crate::requestor::*;
6use crate::stack::SharedObjectStackDecID;
7use cyfs_base::*;
8
9use http_types::{Method, Request, Response, Url};
10use std::sync::Arc;
11
/// Namespace type for stateless helpers that decode NDN HTTP responses.
pub struct NDNRequestorHelper;
13
14impl NDNRequestorHelper {
15 pub async fn decode_get_data_response(
16 resp: &mut Response,
17 ) -> BuckyResult<NDNGetDataOutputResponse> {
18 let data = Box::new(resp.take_body());
19
20 let attr: Option<u32> =
21 RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_ATTRIBUTES)?;
22 let attr = attr.map(|v| Attributes::new(v));
23
24 let object_id = RequestorHelper::decode_header(resp, cyfs_base::CYFS_OBJECT_ID)?;
25 let owner_id = RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OWNER_ID)?;
26
27 let range = RequestorHelper::decode_optional_json_header(resp, cyfs_base::CYFS_DATA_RANGE)?;
28
29 let group = RequestorHelper::decode_optional_header_with_utf8_decoding(
30 resp,
31 cyfs_base::CYFS_TASK_GROUP,
32 )?;
33
34 let length: u64 =
35 RequestorHelper::decode_header(resp, http_types::headers::CONTENT_LENGTH)?;
36 let ret = NDNGetDataOutputResponse {
37 object_id,
38 owner_id,
39 attr,
40 range,
41 group,
42 length,
43 data,
44 };
45
46 Ok(ret)
47 }
48}
49
/// HTTP client for the NDN service.
///
/// Holds two requestor/URL pairs: a general one and one for data transfer. When no
/// dedicated data requestor is passed to `new`, the data pair mirrors the general one.
#[derive(Clone)]
pub struct NDNRequestor {
    // Fallback dec id: used for the CYFS_DEC_ID header when a request carries none.
    dec_id: Option<SharedObjectStackDecID>,
    // Requestor used by put_shared_data, get_shared_data, delete_data and query_file.
    requestor: HttpRequestorRef,
    // `http://<requestor.remote_addr()>/ndn/`
    service_url: Url,

    // Requestor used by put_data and get_data.
    data_requestor: HttpRequestorRef,
    // `http://<data_requestor.remote_addr()>/ndn/`
    data_service_url: Url,
}
59
60impl NDNRequestor {
61 pub fn new(
62 dec_id: Option<SharedObjectStackDecID>,
63 requestor: HttpRequestorRef,
64 data_requestor: Option<HttpRequestorRef>,
65 ) -> Self {
66 let url = format!("http://{}/ndn/", requestor.remote_addr());
67 let url = Url::parse(&url).unwrap();
68
69 let data_service_url = match &data_requestor {
70 Some(requestor) => {
71 let url = format!("http://{}/ndn/", requestor.remote_addr());
72 Url::parse(&url).unwrap()
73 }
74 None => url.clone(),
75 };
76
77 let data_requestor = data_requestor.unwrap_or(requestor.clone());
78
79 Self {
80 dec_id,
81 requestor,
82 service_url: url,
83
84 data_requestor,
85 data_service_url,
86 }
87 }
88
89 pub fn into_processor(self) -> NDNOutputProcessorRef {
90 Arc::new(Box::new(self))
91 }
92
93 pub fn clone_processor(&self) -> NDNOutputProcessorRef {
94 self.clone().into_processor()
95 }
96
97 fn encode_common_headers(
98 &self,
99 action: NDNAction,
100 com_req: &NDNOutputRequestCommon,
101 http_req: &mut Request,
102 ) {
103 if let Some(dec_id) = &com_req.dec_id {
104 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
105 } else if let Some(dec_id) = &self.dec_id {
106 if let Some(dec_id) = dec_id.get() {
107 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
108 }
109 }
110
111 http_req.insert_header(cyfs_base::CYFS_NDN_ACTION, action.to_string());
112
113 http_req.insert_header(cyfs_base::CYFS_API_LEVEL, com_req.level.to_string());
114
115 if let Some(target) = &com_req.target {
116 http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
117 }
118
119 RequestorHelper::encode_opt_header_with_encoding(
120 http_req,
121 cyfs_base::CYFS_REQ_PATH,
122 com_req.req_path.as_deref(),
123 );
124
125 if !com_req.referer_object.is_empty() {
126 RequestorHelper::insert_headers_with_encoding(
127 http_req,
128 cyfs_base::CYFS_REFERER_OBJECT,
129 &com_req.referer_object,
130 );
131 }
132
133 http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
134 }
135
136 fn encode_put_data_request(&self, req: &NDNPutDataOutputRequest) -> Request {
137 let mut http_req = Request::new(Method::Put, self.data_service_url.clone());
138
139 self.encode_common_headers(NDNAction::PutData, &req.common, &mut http_req);
140
141 http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
142
143 http_req
144 }
145
146 async fn decode_put_data_response(
147 &self,
148 resp: &Response,
149 ) -> BuckyResult<NDNPutDataOutputResponse> {
150 let result: NDNPutDataResult =
151 RequestorHelper::decode_header(resp, cyfs_base::CYFS_RESULT)?;
152
153 let ret = NDNPutDataOutputResponse { result };
154
155 Ok(ret)
156 }
157
158 #[allow(unused_mut)]
159 pub async fn put_data(
160 &self,
161 mut req: NDNPutDataOutputRequest,
162 ) -> BuckyResult<NDNPutDataOutputResponse> {
163 let mut http_req = self.encode_put_data_request(&req);
164
165 #[cfg(debug_assertions)]
166 {
167 use async_std::io::ReadExt;
168
169 let mut data = Vec::new();
170 req.data.read_to_end(&mut data).await.map_err(|e| {
171 let msg = format!("read data failed! chunk={} {}", req.object_id, e);
172 error!("{}", msg);
173 BuckyError::new(BuckyErrorCode::IoError, msg)
174 })?;
175
176 if data.len() != req.length as usize {
177 error!(
178 "chunk length unmatch: calc={}, expect={}",
179 data.len(),
180 req.length,
181 );
182 unreachable!();
183 }
184
185 let calc_id = ChunkId::calculate_sync(&data).unwrap();
186
187 if calc_id.object_id() != req.object_id {
188 error!(
189 "chunk id unmatch: calc_id={}, expect={}",
190 calc_id, req.object_id,
191 );
192 unreachable!();
193 }
194
195 http_req.set_body(data);
196 }
197 #[cfg(not(debug_assertions))]
198 {
199 let reader = async_std::io::BufReader::new(req.data);
200 let body = tide::Body::from_reader(reader, Some(req.length as usize));
201 http_req.set_body(body);
202 }
203 let mut resp = self.data_requestor.request(http_req).await?;
204
205 if resp.status().is_success() {
206 info!("put data to ndn service success: {}", req.object_id);
207 self.decode_put_data_response(&resp).await
208 } else {
209 let e = RequestorHelper::error_from_resp(&mut resp).await;
210 error!(
211 "put data to ndn service error! object={}, {}",
212 req.object_id, e
213 );
214 Err(e)
215 }
216 }
217
218 fn encode_put_shared_data_request(&self, req: &NDNPutDataOutputRequest) -> Request {
219 let mut http_req = Request::new(Method::Put, self.service_url.clone());
220
221 self.encode_common_headers(NDNAction::PutSharedData, &req.common, &mut http_req);
222
223 http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
224
225 http_req
226 }
227
228 async fn decode_put_shared_data_response(
229 &self,
230 resp: &Response,
231 ) -> BuckyResult<NDNPutDataOutputResponse> {
232 let result: NDNPutDataResult =
233 RequestorHelper::decode_header(resp, cyfs_base::CYFS_RESULT)?;
234
235 let ret = NDNPutDataOutputResponse { result };
236
237 Ok(ret)
238 }
239
240 pub async fn put_shared_data(
241 &self,
242 req: NDNPutDataOutputRequest,
243 ) -> BuckyResult<NDNPutDataOutputResponse> {
244 info!("will put_shared_data: {}", req);
245
246 let mut http_req = self.encode_put_shared_data_request(&req);
247
248 let reader = async_std::io::BufReader::new(req.data);
249 let body = tide::Body::from_reader(reader, Some(req.length as usize));
250 http_req.set_body(body);
251
252 let mut resp = self.requestor.request(http_req).await?;
253
254 if resp.status().is_success() {
255 info!("put shared data to ndn service success: {}", req.object_id);
256 self.decode_put_shared_data_response(&resp).await
257 } else {
258 let e = RequestorHelper::error_from_resp(&mut resp).await;
259 error!(
260 "put shared data to ndn service error! object={}, {}",
261 req.object_id, e
262 );
263 Err(e)
264 }
265 }
266
267 fn encode_get_data_request(&self, action: NDNAction, req: &NDNGetDataOutputRequest) -> Request {
268 let mut http_req = Request::new(Method::Get, self.data_service_url.clone());
269 self.encode_common_headers(action, &req.common, &mut http_req);
270
271 http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
272 RequestorHelper::encode_opt_header_with_encoding(
273 &mut http_req,
274 cyfs_base::CYFS_INNER_PATH,
275 req.inner_path.as_deref(),
276 );
277
278 RequestorHelper::encode_opt_header_with_encoding(
279 &mut http_req,
280 cyfs_base::CYFS_CONTEXT,
281 req.context.as_deref(),
282 );
283
284 RequestorHelper::encode_opt_header_with_encoding(
285 &mut http_req,
286 cyfs_base::CYFS_TASK_GROUP,
287 req.group.as_deref(),
288 );
289
290 if let Some(ref range) = req.range {
291 http_req.insert_header("Range", range.encode_string());
292 }
293
294 http_req
295 }
296
297 pub async fn get_data(
298 &self,
299 req: NDNGetDataOutputRequest,
300 ) -> BuckyResult<NDNGetDataOutputResponse> {
301 let http_req = self.encode_get_data_request(NDNAction::GetData, &req);
302
303 let mut resp = self.data_requestor.request(http_req).await?;
304
305 if resp.status().is_success() {
306 match NDNRequestorHelper::decode_get_data_response(&mut resp).await {
307 Ok(resp) => {
308 info!("get data from ndn service success: {}", resp);
309 Ok(resp)
310 }
311 Err(e) => {
312 error!("decode get data response error: {}, {}", req.object_id, e);
313 Err(e)
314 }
315 }
316 } else {
317 let e = RequestorHelper::error_from_resp(&mut resp).await;
318 error!(
319 "get data from ndn service error: object={}, {}",
320 req.object_id, e
321 );
322 Err(e)
323 }
324 }
325
326 async fn decode_get_shared_data_response(
327 &self,
328 _req: &NDNGetDataOutputRequest,
329 resp: &mut Response,
330 ) -> BuckyResult<NDNGetDataOutputResponse> {
331 let data = Box::new(resp.take_body());
332
333 let attr: Option<u32> =
334 RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_ATTRIBUTES)?;
335 let attr = attr.map(|v| Attributes::new(v));
336
337 let object_id = RequestorHelper::decode_header(resp, cyfs_base::CYFS_OBJECT_ID)?;
338 let owner_id = RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OWNER_ID)?;
339
340 let range = RequestorHelper::decode_optional_json_header(resp, cyfs_base::CYFS_DATA_RANGE)?;
341 let group = RequestorHelper::decode_optional_header_with_utf8_decoding(
342 resp,
343 cyfs_base::CYFS_TASK_GROUP,
344 )?;
345
346 let length: u64 =
347 RequestorHelper::decode_header(resp, http_types::headers::CONTENT_LENGTH)?;
348
349 let ret = NDNGetDataOutputResponse {
350 object_id,
351 owner_id,
352 attr,
353 range,
354 group,
355 length,
356 data,
357 };
358
359 Ok(ret)
360 }
361
362 pub async fn get_shared_data(
363 &self,
364 req: NDNGetDataOutputRequest,
365 ) -> BuckyResult<NDNGetDataOutputResponse> {
366 let http_req = self.encode_get_data_request(NDNAction::GetSharedData, &req);
367
368 let mut resp = self.requestor.request(http_req).await?;
369
370 if resp.status().is_success() {
371 info!("get data from ndn service success: {}", req.object_id);
372 self.decode_get_shared_data_response(&req, &mut resp).await
373 } else {
374 let e = RequestorHelper::error_from_resp(&mut resp).await;
375 error!(
376 "get data from ndn service error: object={}, {}",
377 req.object_id, e
378 );
379 Err(e)
380 }
381 }
382
383 fn encode_delete_data_request(&self, req: &NDNDeleteDataOutputRequest) -> Request {
384 let mut http_req = Request::new(Method::Delete, self.service_url.clone());
385 self.encode_common_headers(NDNAction::DeleteData, &req.common, &mut http_req);
386
387 http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
388 if let Some(inner_path) = &req.inner_path {
389 http_req.insert_header(cyfs_base::CYFS_INNER_PATH, inner_path);
390 }
391
392 http_req
393 }
394
395 async fn decode_delete_data_response(
396 &self,
397 resp: &Response,
398 ) -> BuckyResult<NDNDeleteDataOutputResponse> {
399 let object_id = RequestorHelper::decode_header(resp, cyfs_base::CYFS_OBJECT_ID)?;
400
401 let ret = NDNDeleteDataOutputResponse { object_id };
402
403 Ok(ret)
404 }
405
406 pub async fn delete_data(
407 &self,
408 req: NDNDeleteDataOutputRequest,
409 ) -> BuckyResult<NDNDeleteDataOutputResponse> {
410 let http_req = self.encode_delete_data_request(&req);
411 let mut resp = self.requestor.request(http_req).await?;
412
413 if resp.status().is_success() {
414 info!("delete data from ndn service success: {}", req.object_id);
415 self.decode_delete_data_response(&resp).await
416 } else {
417 let e = RequestorHelper::error_from_resp(&mut resp).await;
418 error!(
419 "delete data from ndn service error! object={}, {}",
420 req.object_id, e
421 );
422 Err(e)
423 }
424 }
425
426 fn encode_query_file_request(&self, req: &NDNQueryFileOutputRequest) -> Request {
427 let mut url = self.service_url.clone();
428
429 let (t, v) = req.param.to_key_pair();
430 url.query_pairs_mut()
431 .append_pair("type", t)
432 .append_pair("value", &v);
433
434 let mut http_req = Request::new(Method::Get, url);
435 self.encode_common_headers(NDNAction::QueryFile, &req.common, &mut http_req);
436
437 http_req
438 }
439
440 async fn decode_query_file_response(
441 &self,
442 resp: &mut Response,
443 ) -> BuckyResult<NDNQueryFileOutputResponse> {
444 let ret: NDNQueryFileOutputResponse = RequestorHelper::decode_json_body(resp).await?;
445
446 Ok(ret)
447 }
448
449 async fn query_file(
450 &self,
451 req: NDNQueryFileOutputRequest,
452 ) -> BuckyResult<NDNQueryFileOutputResponse> {
453 let http_req = self.encode_query_file_request(&req);
454 let mut resp = self.requestor.request(http_req).await?;
455
456 if resp.status().is_success() {
457 self.decode_query_file_response(&mut resp).await
459 } else {
460 let e = RequestorHelper::error_from_resp(&mut resp).await;
461 error!(
462 "query file from ndn service error! param={}, {}",
463 req.param, e
464 );
465 Err(e)
466 }
467 }
468}
469
470#[async_trait::async_trait]
471impl NDNOutputProcessor for NDNRequestor {
472 async fn put_data(
473 &self,
474 req: NDNPutDataOutputRequest,
475 ) -> BuckyResult<NDNPutDataOutputResponse> {
476 self.put_data(req).await
477 }
478
479 async fn get_data(
480 &self,
481 req: NDNGetDataOutputRequest,
482 ) -> BuckyResult<NDNGetDataOutputResponse> {
483 self.get_data(req).await
484 }
485
486 async fn put_shared_data(
487 &self,
488 req: NDNPutDataOutputRequest,
489 ) -> BuckyResult<NDNPutDataOutputResponse> {
490 self.put_shared_data(req).await
491 }
492
493 async fn get_shared_data(
494 &self,
495 req: NDNGetDataOutputRequest,
496 ) -> BuckyResult<NDNGetDataOutputResponse> {
497 self.get_shared_data(req).await
498 }
499
500 async fn delete_data(
501 &self,
502 req: NDNDeleteDataOutputRequest,
503 ) -> BuckyResult<NDNDeleteDataOutputResponse> {
504 self.delete_data(req).await
505 }
506
507 async fn query_file(
508 &self,
509 req: NDNQueryFileOutputRequest,
510 ) -> BuckyResult<NDNQueryFileOutputResponse> {
511 self.query_file(req).await
512 }
513}