objectstore_client/
get.rs1use std::{fmt, io};
2
3use async_compression::tokio::bufread::ZstdDecoder;
4use bytes::BytesMut;
5use futures_util::{StreamExt, TryStreamExt};
6use objectstore_types::metadata::{Compression, Metadata};
7use reqwest::StatusCode;
8use tokio_util::io::{ReaderStream, StreamReader};
9
10use crate::{ClientStream, ObjectKey, Session};
11
12pub struct GetResponse {
16 pub metadata: Metadata,
18 pub stream: ClientStream,
20}
21
22impl GetResponse {
23 pub async fn payload(self) -> crate::Result<bytes::Bytes> {
25 let bytes: BytesMut = self.stream.try_collect().await?;
26 Ok(bytes.freeze())
27 }
28
29 pub async fn text(self) -> crate::Result<String> {
31 let bytes = self.payload().await?;
32 Ok(String::from_utf8(bytes.to_vec())?)
33 }
34}
35
36impl fmt::Debug for GetResponse {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 f.debug_struct("GetResponse")
39 .field("metadata", &self.metadata)
40 .field("stream", &format_args!("[Stream]"))
41 .finish()
42 }
43}
44
45impl Session {
46 pub fn get(&self, key: &str) -> GetBuilder {
48 GetBuilder {
49 session: self.clone(),
50 key: key.to_owned(),
51 decompress: true,
52 }
53 }
54}
55
56#[derive(Debug)]
58pub struct GetBuilder {
59 pub(crate) session: Session,
60 pub(crate) key: ObjectKey,
61 pub(crate) decompress: bool,
62}
63
64impl GetBuilder {
65 pub fn decompress(mut self, decompress: bool) -> Self {
70 self.decompress = decompress;
71 self
72 }
73
74 pub async fn send(self) -> crate::Result<Option<GetResponse>> {
76 let response = self
77 .session
78 .request(reqwest::Method::GET, &self.key)?
79 .send()
80 .await?;
81 if response.status() == StatusCode::NOT_FOUND {
82 return Ok(None);
83 }
84 let response = response.error_for_status()?;
85
86 let mut metadata = Metadata::from_headers(response.headers(), "")?;
87
88 let stream = response.bytes_stream().map_err(io::Error::other).boxed();
89 let stream = maybe_decompress(stream, &mut metadata, self.decompress);
90
91 Ok(Some(GetResponse { metadata, stream }))
92 }
93}
94
95pub(crate) fn maybe_decompress(
100 stream: ClientStream,
101 metadata: &mut Metadata,
102 decompress: bool,
103) -> ClientStream {
104 match (metadata.compression, decompress) {
105 (Some(Compression::Zstd), true) => {
106 metadata.compression = None;
107 ReaderStream::new(ZstdDecoder::new(StreamReader::new(stream))).boxed()
108 }
109 _ => stream,
110 }
111}