aliyun_oss_rs/object/
select_object.rs

1use crate::{
2    Error,
3    common::body_to_bytes,
4    error::normal_error,
5    request::{Oss, OssRequest},
6};
7use bytes::Bytes;
8use futures_util::{Stream, StreamExt};
9use http::Method;
10use http_body_util::{BodyExt, Full};
11use hyper::{Response, body::Incoming};
12use std::pin::Pin;
13
14/// Execute an SQL-like query against an object stored in OSS.
15///
16/// See the [Alibaba Cloud documentation](https://help.aliyun.com/zh/oss/developer-reference/selectobject) for details.
17pub struct SelectObject {
18    req: OssRequest,
19    request_xml: Option<String>,
20}
21
22impl SelectObject {
23    pub(super) fn new(oss: Oss) -> Self {
24        let mut req = OssRequest::new(oss, Method::POST);
25        req.insert_query("select", "");
26        req.insert_query("select-type", "2");
27        SelectObject {
28            req,
29            request_xml: None,
30        }
31    }
32
33    /// Provide the select request XML document.
34    ///
35    /// Refer to the official documentation for the schema of the `<SelectRequest>` payload.
36    pub fn set_request(mut self, xml: impl ToString) -> Self {
37        self.request_xml = Some(xml.to_string());
38        self
39    }
40
41    /// Enable raw output mode (sets the `x-oss-select-output-raw` header).
42    pub fn enable_output_raw(mut self, enable: bool) -> Self {
43        if enable {
44            self.req.insert_header("x-oss-select-output-raw", "true");
45        }
46        self
47    }
48
49    /// Send the request and collect the response into memory.
50    pub async fn send(self) -> Result<Bytes, Error> {
51        let response = self.send_internal().await?;
52        Ok(body_to_bytes(response.into_body()).await?)
53    }
54
55    /// Send the request and return the response stream for manual consumption.
56    pub async fn send_to_stream(
57        self,
58    ) -> Result<Pin<Box<dyn Stream<Item = Result<bytes::Bytes, Error>> + Send>>, Error> {
59        let response = self.send_internal().await?;
60        let status_code = response.status();
61        match status_code {
62            code if code.is_success() => {
63                let stream = response
64                    .into_body()
65                    .into_data_stream()
66                    .map(|item| match item {
67                        Ok(bytes) => Ok(bytes),
68                        Err(e) => Err(e.into()),
69                    });
70                Ok(Box::pin(stream))
71            }
72            _ => Err(normal_error(response).await),
73        }
74    }
75
76    async fn send_internal(mut self) -> Result<Response<Incoming>, Error> {
77        let body = self.request_xml.ok_or(Error::MissingRequestBody)?;
78        self.req.set_body(Full::new(Bytes::from(body)));
79        let response = self.req.send_to_oss()?.await?;
80        let status_code = response.status();
81        if status_code.is_success() {
82            Ok(response)
83        } else {
84            Err(normal_error(response).await)
85        }
86    }
87}