Skip to main content

objectstore_client/
get.rs

1use std::{fmt, io};
2
3use async_compression::tokio::bufread::ZstdDecoder;
4use bytes::BytesMut;
5use futures_util::{StreamExt, TryStreamExt};
6use objectstore_types::metadata::{Compression, Metadata};
7use reqwest::StatusCode;
8use tokio_util::io::{ReaderStream, StreamReader};
9
10use crate::{ClientStream, ObjectKey, Session};
11
12/// The result from a successful [`get()`](Session::get) call.
13///
14/// This carries the response as a stream, plus the compression algorithm of the data.
15pub struct GetResponse {
16    /// The metadata attached to this object, including the compression algorithm used for the payload.
17    pub metadata: Metadata,
18    /// The response stream.
19    pub stream: ClientStream,
20}
21
22impl GetResponse {
23    /// Loads the object payload fully into memory.
24    pub async fn payload(self) -> crate::Result<bytes::Bytes> {
25        let bytes: BytesMut = self.stream.try_collect().await?;
26        Ok(bytes.freeze())
27    }
28
29    /// Loads the object payload fully into memory and interprets it as UTF-8 text.
30    pub async fn text(self) -> crate::Result<String> {
31        let bytes = self.payload().await?;
32        Ok(String::from_utf8(bytes.to_vec())?)
33    }
34}
35
36impl fmt::Debug for GetResponse {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        f.debug_struct("GetResponse")
39            .field("metadata", &self.metadata)
40            .field("stream", &format_args!("[Stream]"))
41            .finish()
42    }
43}
44
45impl Session {
46    /// Retrieves the object with the given `key`.
47    pub fn get(&self, key: &str) -> GetBuilder {
48        GetBuilder {
49            session: self.clone(),
50            key: key.to_owned(),
51            decompress: true,
52        }
53    }
54}
55
56/// A [`get`](Session::get) request builder.
57#[derive(Debug)]
58pub struct GetBuilder {
59    pub(crate) session: Session,
60    pub(crate) key: ObjectKey,
61    pub(crate) decompress: bool,
62}
63
64impl GetBuilder {
65    /// Indicates whether the request should automatically handle decompression of known algorithms,
66    /// or rather return the payload as it is stored, along with the compression algorithm it is stored in.
67    ///
68    /// By default, automatic decompression is enabled.
69    pub fn decompress(mut self, decompress: bool) -> Self {
70        self.decompress = decompress;
71        self
72    }
73
74    /// Sends the get request.
75    pub async fn send(self) -> crate::Result<Option<GetResponse>> {
76        let response = self
77            .session
78            .request(reqwest::Method::GET, &self.key)?
79            .send()
80            .await?;
81        if response.status() == StatusCode::NOT_FOUND {
82            return Ok(None);
83        }
84        let response = response.error_for_status()?;
85
86        let mut metadata = Metadata::from_headers(response.headers(), "")?;
87
88        let stream = response.bytes_stream().map_err(io::Error::other).boxed();
89        let stream = maybe_decompress(stream, &mut metadata, self.decompress);
90
91        Ok(Some(GetResponse { metadata, stream }))
92    }
93}
94
95/// Wraps a stream in a zstd decompression layer.
96///
97/// Decompresses if the metadata indicates zstd compression and `decompress` is `true`.
98/// Clears `metadata.compression` when decompression is applied.
99pub(crate) fn maybe_decompress(
100    stream: ClientStream,
101    metadata: &mut Metadata,
102    decompress: bool,
103) -> ClientStream {
104    match (metadata.compression, decompress) {
105        (Some(Compression::Zstd), true) => {
106            metadata.compression = None;
107            ReaderStream::new(ZstdDecoder::new(StreamReader::new(stream))).boxed()
108        }
109        _ => stream,
110    }
111}