1use super::def::*;
2use super::output_request::*;
3use super::processor::*;
4use crate::base::*;
5use crate::stack::SharedObjectStackDecID;
6use crate::requestor::*;
7use cyfs_base::*;
8
9use http_types::{Method, Request, Response, Url};
10use std::sync::Arc;
11
12pub struct NONRequestorHelper;
13
14impl NONRequestorHelper {
15 async fn decode_object_info_from_body<T>(
16 object_id: ObjectId,
17 req: &mut T,
18 ) -> BuckyResult<NONObjectInfo>
19 where
20 T: BodyOp + HeaderOp,
21 {
22 let object_raw = req.body_bytes().await.map_err(|e| {
23 let msg = format!(
24 "read object bytes request/response error! obj={} {}",
25 object_id, e
26 );
27 error!("{}", msg);
28
29 BuckyError::new(BuckyErrorCode::IoError, msg)
30 })?;
31
32 let info = NONObjectInfo::new(object_id, object_raw, None);
33
34 Ok(info)
35 }
36
37 pub async fn decode_object_info<T>(req: &mut T) -> BuckyResult<NONObjectInfo>
38 where
39 T: BodyOp + HeaderOp,
40 {
41 let object_id: ObjectId = RequestorHelper::decode_header(req, cyfs_base::CYFS_OBJECT_ID)?;
43
44 let mut info = Self::decode_object_info_from_body(object_id, req).await?;
45 if !info.is_empty() {
46 info.decode_and_verify()?;
47 } else {
48 }
50
51 Ok(info)
52 }
53
54 pub async fn decode_allow_empty_object_info<T>(req: &mut T) -> BuckyResult<NONObjectInfo>
55 where
56 T: BodyOp + HeaderOp,
57 {
58 let object_id: ObjectId = RequestorHelper::decode_header(req, cyfs_base::CYFS_OBJECT_ID)?;
60
61 let mut info = Self::decode_object_info_from_body(object_id, req).await?;
62 if !info.is_empty() {
63 info.decode_and_verify()?;
64 }
65 Ok(info)
66 }
67
68 pub async fn decode_option_object_info<T>(req: &mut T) -> BuckyResult<Option<NONObjectInfo>>
69 where
70 T: BodyOp + HeaderOp,
71 {
72 let ret: Option<ObjectId> =
74 RequestorHelper::decode_optional_header(req, cyfs_base::CYFS_OBJECT_ID)?;
75 if ret.is_none() {
76 return Ok(None);
77 }
78
79 let mut info = Self::decode_object_info_from_body(ret.unwrap(), req).await?;
80 info.decode_and_verify()?;
81
82 Ok(Some(info))
83 }
84
85 pub fn encode_object_info<T>(req: &mut T, info: NONObjectInfo)
86 where
87 T: BodyOp + HeaderOp,
88 {
89 req.insert_header(cyfs_base::CYFS_OBJECT_ID, info.object_id.to_string());
90
91 if info.object_raw.len() > 0 {
92 req.set_body(info.object_raw);
93 req.set_content_type(CYFS_OBJECT_MIME.clone());
94 }
95 }
96
97 pub async fn decode_get_object_response<T>(
98 resp: &mut T,
99 ) -> BuckyResult<NONGetObjectOutputResponse>
100 where
101 T: BodyOp + HeaderOp,
102 {
103 let object = Self::decode_object_info(resp).await?;
104 let attr: Option<u32> =
105 RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_ATTRIBUTES)?;
106 let attr = attr.map(|v| Attributes::new(v));
107
108 let object_update_time =
109 RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OBJECT_UPDATE_TIME)?;
110 let object_expires_time =
111 RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OBJECT_EXPIRES_TIME)?;
112
113 let ret = NONGetObjectOutputResponse {
114 object,
115 object_expires_time,
116 object_update_time,
117 attr,
118 };
119
120 Ok(ret)
121 }
122}
123
124#[derive(Clone)]
125pub struct NONRequestor {
126 dec_id: Option<SharedObjectStackDecID>,
127 requestor: HttpRequestorRef,
128 service_url: Url,
129}
130
131impl NONRequestor {
132 pub fn new(dec_id: Option<SharedObjectStackDecID>, requestor: HttpRequestorRef) -> Self {
133 let addr = requestor.remote_addr();
134
135 let url = format!("http://{}/non/", addr);
136 let url = Url::parse(&url).unwrap();
137
138 let ret = Self {
139 dec_id,
140 requestor,
141 service_url: url,
142 };
143
144 ret
145 }
146
147 pub fn into_processor(self) -> NONOutputProcessorRef {
148 Arc::new(Box::new(self))
149 }
150
151 pub fn clone_processor(&self) -> NONOutputProcessorRef {
152 self.clone().into_processor()
153 }
154
155 fn encode_common_headers(
156 &self,
157 action: NONAction,
158 com_req: &NONOutputRequestCommon,
159 http_req: &mut Request,
160 ) {
161 if let Some(dec_id) = &com_req.dec_id {
162 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
163 } else if let Some(dec_id) = &self.dec_id {
164 if let Some(dec_id) = dec_id.get() {
165 http_req.insert_header(cyfs_base::CYFS_DEC_ID, dec_id.to_string());
166 }
167 }
168
169 RequestorHelper::encode_opt_header_with_encoding(
170 http_req,
171 cyfs_base::CYFS_REQ_PATH,
172 com_req.req_path.as_deref(),
173 );
174
175 http_req.insert_header(cyfs_base::CYFS_NON_ACTION, action.to_string());
176
177 http_req.insert_header(cyfs_base::CYFS_API_LEVEL, com_req.level.to_string());
178
179 if let Some(target) = &com_req.target {
180 http_req.insert_header(cyfs_base::CYFS_TARGET, target.to_string());
181 }
182
183 if let Some(source) = &com_req.source {
184 http_req.insert_header(cyfs_base::CYFS_SOURCE, source.to_string());
185 }
186
187 http_req.insert_header(cyfs_base::CYFS_FLAGS, com_req.flags.to_string());
188 }
189
190 fn encode_put_object_request(&self, req: &NONPutObjectOutputRequest) -> Request {
191 #[cfg(debug_assertions)]
192 {
193 if !req.object.is_empty() {
194 req.object.verify().expect(&format!(
195 "pub object id unmatch: id={}, object={:?}",
196 req.object.object_id,
197 req.object.object_raw.to_hex()
198 ));
199 }
200 }
201
202 let mut http_req = Request::new(Method::Put, self.service_url.clone());
203 self.encode_common_headers(NONAction::PutObject, &req.common, &mut http_req);
204
205 if let Some(access) = &req.access {
206 http_req.insert_header(cyfs_base::CYFS_ACCESS, access.value().to_string());
207 }
208
209 http_req
210 }
211
212 async fn decode_put_object_response(
213 &self,
214 resp: &Response,
215 ) -> BuckyResult<NONPutObjectOutputResponse> {
216 let result: NONPutObjectResult =
217 RequestorHelper::decode_header(resp, cyfs_base::CYFS_RESULT)?;
218 let object_update_time =
219 RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OBJECT_UPDATE_TIME)?;
220 let object_expires_time =
221 RequestorHelper::decode_optional_header(resp, cyfs_base::CYFS_OBJECT_EXPIRES_TIME)?;
222
223 let ret = NONPutObjectOutputResponse {
224 result,
225 object_expires_time,
226 object_update_time,
227 };
228
229 Ok(ret)
230 }
231
232 pub async fn put_object(
233 &self,
234 req: NONPutObjectOutputRequest,
235 ) -> BuckyResult<NONPutObjectOutputResponse> {
236 let object_id = req.object.object_id.clone();
237
238 let mut http_req = self.encode_put_object_request(&req);
239 NONRequestorHelper::encode_object_info(&mut http_req, req.object);
240
241 let mut resp = self.requestor.request(http_req).await?;
242
243 if resp.status().is_success() {
244 info!("put object to non service success: {}", object_id);
245 self.decode_put_object_response(&resp).await
246 } else {
247 let e = RequestorHelper::error_from_resp(&mut resp).await;
248 error!(
249 "put object to non service error! object={}, {}",
250 object_id, e
251 );
252 Err(e)
253 }
254 }
255
256 pub async fn update_object_meta(
257 &self,
258 req: NONUpdateObjectMetaOutputRequest,
259 ) -> BuckyResult<NONPutObjectOutputResponse> {
260 let req = NONPutObjectOutputRequest {
261 common: req.common,
262 object: NONObjectInfo::new(req.object_id, vec![], None),
263 access: req.access,
264 };
265
266 self.put_object(req).await
267 }
268
269 fn encode_get_object_request(&self, req: &NONGetObjectOutputRequest) -> Request {
270 let mut http_req = Request::new(Method::Get, self.service_url.clone());
271 self.encode_common_headers(NONAction::GetObject, &req.common, &mut http_req);
272
273 http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
274
275 RequestorHelper::encode_opt_header_with_encoding(
276 &mut http_req,
277 cyfs_base::CYFS_INNER_PATH,
278 req.inner_path.as_deref(),
279 );
280
281 http_req
282 }
283
284 pub async fn get_object(
285 &self,
286 req: NONGetObjectOutputRequest,
287 ) -> BuckyResult<NONGetObjectOutputResponse> {
288 let http_req = self.encode_get_object_request(&req);
289
290 let mut resp = self.requestor.request(http_req).await?;
291
292 if resp.status().is_success() {
293 let resp = NONRequestorHelper::decode_get_object_response(&mut resp).await?;
294 info!(
295 "get object from non service success: {}, object={}",
296 req.object_debug_info(), resp.object.object_id,
297 );
298 Ok(resp)
299 } else {
300 let e = RequestorHelper::error_from_resp(&mut resp).await;
301 error!(
302 "get object from non service error! object={}, {}",
303 req.object_debug_info(),
304 e
305 );
306 Err(e)
307 }
308 }
309
310 fn encode_post_object_request(&self, req: &NONPostObjectOutputRequest) -> Request {
311 let mut http_req = Request::new(Method::Post, self.service_url.clone());
312 self.encode_common_headers(NONAction::PostObject, &req.common, &mut http_req);
313
314 http_req
315 }
316
317 async fn decode_post_object_response(
318 &self,
319 resp: &mut Response,
320 ) -> BuckyResult<NONPostObjectOutputResponse> {
321 let object = NONRequestorHelper::decode_option_object_info(resp).await?;
322
323 let ret = NONPostObjectOutputResponse { object };
324
325 Ok(ret)
326 }
327
328 pub async fn post_object(
329 &self,
330 req: NONPostObjectOutputRequest,
331 ) -> BuckyResult<NONPostObjectOutputResponse> {
332 let object_id = req.object.object_id.clone();
333
334 let mut http_req = self.encode_post_object_request(&req);
335 NONRequestorHelper::encode_object_info(&mut http_req, req.object);
336
337 let mut resp = self.requestor.request(http_req).await?;
338
339 let status = resp.status();
340 if status.is_success() {
341 match status {
342 http_types::StatusCode::NoContent => {
343 let e = RequestorHelper::error_from_resp(&mut resp).await;
344 info!(
345 "post object to non service but empty response! obj={}, {}",
346 object_id, e
347 );
348 Err(e)
349 }
350 _ => {
351 info!("post object to non service success: {}", object_id);
352 self.decode_post_object_response(&mut resp).await
353 }
354 }
355 } else {
356 let e = RequestorHelper::error_from_resp(&mut resp).await;
357 if e.code() == BuckyErrorCode::NotHandled {
358 warn!(
359 "post object to non service but not handled! object={}, {}",
360 object_id, e
361 );
362 } else {
363 error!(
364 "post object to non service error! object={}, {}",
365 object_id, e
366 );
367 }
368
369 Err(e)
370 }
371 }
372
373 fn format_select_url(&self, req_path: Option<&String>, filter: &SelectFilter) -> Url {
374 let mut url = if let Some(req_path) = req_path {
375 self.service_url
376 .join(req_path.trim_start_matches('/').trim_end_matches('/'))
377 .unwrap()
378 } else {
379 self.service_url.clone()
380 };
381
382 SelectFilterUrlCodec::encode(&mut url, filter);
384
385 url
386 }
387
388 fn encode_select_request(&self, req: &NONSelectObjectOutputRequest) -> Request {
389 let url = self.format_select_url(req.common.req_path.as_ref(), &req.filter);
390 let mut http_req = Request::new(Method::Get, url);
391 self.encode_common_headers(NONAction::SelectObject, &req.common, &mut http_req);
392
393 SelectOptionCodec::encode(&mut http_req, &req.opt);
394
395 http_req
396 }
397
398 pub async fn select_object(
399 &self,
400 req: NONSelectObjectOutputRequest,
401 ) -> BuckyResult<NONSelectObjectOutputResponse> {
402 let http_req = self.encode_select_request(&req);
403
404 let mut resp = self.requestor.request(http_req).await?;
405
406 if resp.status().is_success() {
407 let resp = SelectResponse::from_respone(resp).await?;
408 Ok(NONSelectObjectOutputResponse {
409 objects: resp.objects,
410 })
411 } else {
412 let e = RequestorHelper::error_from_resp(&mut resp).await;
413 error!("select object from non failed: {}", e);
414 Err(e)
415 }
416 }
417
418 fn encode_delete_object_request(&self, req: &NONDeleteObjectOutputRequest) -> Request {
419 let mut http_req = Request::new(Method::Delete, self.service_url.clone());
420 self.encode_common_headers(NONAction::DeleteObject, &req.common, &mut http_req);
421
422 http_req.insert_header(cyfs_base::CYFS_OBJECT_ID, req.object_id.to_string());
423
424 RequestorHelper::encode_opt_header_with_encoding(
425 &mut http_req,
426 cyfs_base::CYFS_INNER_PATH,
427 req.inner_path.as_deref(),
428 );
429
430 http_req
431 }
432
433 async fn decode_delete_object_response(
434 &self,
435 req: &NONDeleteObjectOutputRequest,
436 resp: &mut Response,
437 ) -> BuckyResult<NONDeleteObjectOutputResponse> {
438 let object = if req.common.flags & CYFS_REQUEST_FLAG_DELETE_WITH_QUERY != 0 {
439 let object = NONRequestorHelper::decode_object_info(resp).await?;
440 Some(object)
441 } else {
442 None
443 };
444
445 Ok(NONDeleteObjectOutputResponse { object })
446 }
447
448 pub async fn delete_object(
449 &self,
450 req: NONDeleteObjectOutputRequest,
451 ) -> BuckyResult<NONDeleteObjectOutputResponse> {
452 let http_req = self.encode_delete_object_request(&req);
453
454 let mut resp = self.requestor.request(http_req).await?;
455
456 if resp.status().is_success() {
457 let ret = self.decode_delete_object_response(&req, &mut resp).await?;
458 info!(
459 "delete object from non service success: {}, obj={:?}",
460 req.object_id, ret.object
461 );
462 Ok(ret)
463 } else {
464 let e = RequestorHelper::error_from_resp(&mut resp).await;
465 error!(
466 "delete object from non failed: object={}, {}",
467 req.object_id, e
468 );
469 Err(e)
470 }
471 }
472}
473
474#[async_trait::async_trait]
475impl NONOutputProcessor for NONRequestor {
476 async fn put_object(
477 &self,
478 req: NONPutObjectOutputRequest,
479 ) -> BuckyResult<NONPutObjectOutputResponse> {
480 self.put_object(req).await
481 }
482
483 async fn get_object(
484 &self,
485 req: NONGetObjectOutputRequest,
486 ) -> BuckyResult<NONGetObjectOutputResponse> {
487 self.get_object(req).await
488 }
489
490 async fn post_object(
491 &self,
492 req: NONPostObjectOutputRequest,
493 ) -> BuckyResult<NONPostObjectOutputResponse> {
494 self.post_object(req).await
495 }
496
497 async fn select_object(
498 &self,
499 req: NONSelectObjectOutputRequest,
500 ) -> BuckyResult<NONSelectObjectOutputResponse> {
501 self.select_object(req).await
502 }
503
504 async fn delete_object(
505 &self,
506 req: NONDeleteObjectOutputRequest,
507 ) -> BuckyResult<NONDeleteObjectOutputResponse> {
508 self.delete_object(req).await
509 }
510}