aliyun_oss_rs/object/
select_object.rs1use crate::{
2 Error,
3 common::body_to_bytes,
4 error::normal_error,
5 request::{Oss, OssRequest},
6};
7use bytes::Bytes;
8use futures_util::{Stream, StreamExt};
9use http::Method;
10use http_body_util::{BodyExt, Full};
11use hyper::{Response, body::Incoming};
12use std::pin::Pin;
13
14pub struct SelectObject {
18 req: OssRequest,
19 request_xml: Option<String>,
20}
21
22impl SelectObject {
23 pub(super) fn new(oss: Oss) -> Self {
24 let mut req = OssRequest::new(oss, Method::POST);
25 req.insert_query("select", "");
26 req.insert_query("select-type", "2");
27 SelectObject {
28 req,
29 request_xml: None,
30 }
31 }
32
33 pub fn set_request(mut self, xml: impl ToString) -> Self {
37 self.request_xml = Some(xml.to_string());
38 self
39 }
40
41 pub fn enable_output_raw(mut self, enable: bool) -> Self {
43 if enable {
44 self.req.insert_header("x-oss-select-output-raw", "true");
45 }
46 self
47 }
48
49 pub async fn send(self) -> Result<Bytes, Error> {
51 let response = self.send_internal().await?;
52 Ok(body_to_bytes(response.into_body()).await?)
53 }
54
55 pub async fn send_to_stream(
57 self,
58 ) -> Result<Pin<Box<dyn Stream<Item = Result<bytes::Bytes, Error>> + Send>>, Error> {
59 let response = self.send_internal().await?;
60 let status_code = response.status();
61 match status_code {
62 code if code.is_success() => {
63 let stream = response
64 .into_body()
65 .into_data_stream()
66 .map(|item| match item {
67 Ok(bytes) => Ok(bytes),
68 Err(e) => Err(e.into()),
69 });
70 Ok(Box::pin(stream))
71 }
72 _ => Err(normal_error(response).await),
73 }
74 }
75
76 async fn send_internal(mut self) -> Result<Response<Incoming>, Error> {
77 let body = self.request_xml.ok_or(Error::MissingRequestBody)?;
78 self.req.set_body(Full::new(Bytes::from(body)));
79 let response = self.req.send_to_oss()?.await?;
80 let status_code = response.status();
81 if status_code.is_success() {
82 Ok(response)
83 } else {
84 Err(normal_error(response).await)
85 }
86 }
87}