Skip to main content

objectstore_client/
put.rs

1use std::fmt;
2use std::io::Cursor;
3use std::{borrow::Cow, collections::BTreeMap};
4
5use async_compression::tokio::bufread::ZstdEncoder;
6use bytes::Bytes;
7use futures_util::StreamExt;
8use objectstore_types::metadata::Metadata;
9use reqwest::Body;
10use serde::Deserialize;
11use tokio::io::AsyncRead;
12use tokio_util::io::{ReaderStream, StreamReader};
13
14pub use objectstore_types::metadata::{Compression, ExpirationPolicy};
15
16use crate::{ClientStream, ObjectKey, Session};
17
/// The response returned from the service after uploading an object.
///
/// [`PutBuilder::send`] deserializes this from the JSON body of a successful response.
#[derive(Debug, Deserialize)]
pub struct PutResponse {
    /// The key of the object, as stored.
    pub key: ObjectKey,
}
24
/// The payload of a put request, before any client-side compression is applied.
pub(crate) enum PutBody {
    /// A payload that is fully available in memory, as created by [`Session::put`].
    Buffer(Bytes),
    /// A payload that is produced incrementally, as created by [`Session::put_stream`] and
    /// [`Session::put_read`].
    Stream(ClientStream),
}
29
30impl fmt::Debug for PutBody {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        f.debug_tuple("PutBody").finish_non_exhaustive()
33    }
34}
35
36impl Session {
37    fn put_body(&self, body: PutBody) -> PutBuilder {
38        let metadata = Metadata {
39            expiration_policy: self.scope.usecase().expiration_policy(),
40            compression: Some(self.scope.usecase().compression()),
41            ..Default::default()
42        };
43
44        PutBuilder {
45            session: self.clone(),
46            metadata,
47            key: None,
48            body,
49        }
50    }
51
52    /// Creates or replaces an object using a [`Bytes`]-like payload.
53    pub fn put(&self, body: impl Into<Bytes>) -> PutBuilder {
54        self.put_body(PutBody::Buffer(body.into()))
55    }
56
57    /// Creates or replaces an object using a streaming payload.
58    pub fn put_stream(&self, body: ClientStream) -> PutBuilder {
59        self.put_body(PutBody::Stream(body))
60    }
61
62    /// Creates or replaces an object using an [`AsyncRead`] payload.
63    pub fn put_read<R>(&self, body: R) -> PutBuilder
64    where
65        R: AsyncRead + Send + Sync + 'static,
66    {
67        let stream = ReaderStream::new(body).boxed();
68        self.put_body(PutBody::Stream(stream))
69    }
70}
71
/// A [`put`](Session::put) request builder.
///
/// The builder is configured through its chained setter methods and consumed by
/// [`send`](PutBuilder::send), which performs the upload.
#[derive(Debug)]
pub struct PutBuilder {
    /// A clone of the session that created this builder; used to create the request.
    pub(crate) session: Session,
    /// Metadata sent along with the object as request headers.
    pub(crate) metadata: Metadata,
    /// The explicit object key, if one was set via [`key`](PutBuilder::key).
    pub(crate) key: Option<ObjectKey>,
    /// The payload to upload.
    pub(crate) body: PutBody,
}
80
81impl PutBuilder {
82    /// Sets an explicit object key.
83    ///
84    /// If a key is specified, the object will be stored under that key. Otherwise, the Objectstore
85    /// server will automatically assign a random key, which is then returned from this request.
86    pub fn key(mut self, key: impl Into<ObjectKey>) -> Self {
87        self.key = Some(key.into()).filter(|k| !k.is_empty());
88        self
89    }
90
91    /// Sets an explicit compression algorithm to be used for this payload.
92    ///
93    /// [`None`] should be used if no compression should be performed by the client,
94    /// either because the payload is uncompressible (such as a media format), or if the user
95    /// will handle any kind of compression, without the clients knowledge.
96    ///
97    /// By default, the compression algorithm set on this Session's Usecase is used.
98    pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> Self {
99        self.metadata.compression = compression.into();
100        self
101    }
102
103    /// Sets the expiration policy of the object to be uploaded.
104    ///
105    /// By default, the expiration policy set on this Session's Usecase is used.
106    pub fn expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
107        self.metadata.expiration_policy = expiration_policy;
108        self
109    }
110
111    /// Sets the content type of the object to be uploaded.
112    ///
113    /// You can use the utility function [`crate::utils::guess_mime_type`] to attempt to guess a
114    /// `content_type` based on magic bytes.
115    pub fn content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
116        self.metadata.content_type = content_type.into();
117        self
118    }
119
120    /// Sets the origin of the object, typically the IP address of the original source.
121    ///
122    /// This is an optional but encouraged field that tracks where the payload was
123    /// originally obtained from. For example, the IP address of the Sentry SDK or CLI
124    /// that uploaded the data.
125    ///
126    /// # Example
127    ///
128    /// ```no_run
129    /// # async fn example(session: objectstore_client::Session) {
130    /// session.put("data")
131    ///     .origin("203.0.113.42")
132    ///     .send()
133    ///     .await
134    ///     .unwrap();
135    /// # }
136    /// ```
137    pub fn origin(mut self, origin: impl Into<String>) -> Self {
138        self.metadata.origin = Some(origin.into());
139        self
140    }
141
142    /// This sets the custom metadata to the provided map.
143    ///
144    /// It will clear any previously set metadata.
145    pub fn set_metadata(mut self, metadata: impl Into<BTreeMap<String, String>>) -> Self {
146        self.metadata.custom = metadata.into();
147        self
148    }
149
150    /// Appends they `key`/`value` to the custom metadata of this object.
151    pub fn append_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
152        self.metadata.custom.insert(key.into(), value.into());
153        self
154    }
155}
156
157/// Compresses the body if compression is specified.
158pub(crate) fn maybe_compress(body: PutBody, compression: Option<Compression>) -> Body {
159    match (compression, body) {
160        (Some(Compression::Zstd), PutBody::Buffer(bytes)) => {
161            let cursor = Cursor::new(bytes);
162            let encoder = ZstdEncoder::new(cursor);
163            let stream = ReaderStream::new(encoder);
164            Body::wrap_stream(stream)
165        }
166        (Some(Compression::Zstd), PutBody::Stream(stream)) => {
167            let stream = StreamReader::new(stream);
168            let encoder = ZstdEncoder::new(stream);
169            let stream = ReaderStream::new(encoder);
170            Body::wrap_stream(stream)
171        }
172        (None, PutBody::Buffer(bytes)) => bytes.into(),
173        (None, PutBody::Stream(stream)) => Body::wrap_stream(stream),
174        // _ => todo!("compression algorithms other than `zstd` are currently not supported"),
175    }
176}
177
178// TODO: instead of a separate `send` method, it would be nice to just implement `IntoFuture`.
179// However, `IntoFuture` needs to define the resulting future as an associated type,
180// and "impl trait in associated type position" is not yet stable :-(
181impl PutBuilder {
182    /// Sends the built put request to the upstream service.
183    pub async fn send(self) -> crate::Result<PutResponse> {
184        let method = match self.key {
185            Some(_) => reqwest::Method::PUT,
186            None => reqwest::Method::POST,
187        };
188
189        let mut builder = self
190            .session
191            .request(method, self.key.as_deref().unwrap_or_default())?;
192
193        let body = maybe_compress(self.body, self.metadata.compression);
194
195        builder = builder.headers(self.metadata.to_headers("")?);
196
197        let response = builder.body(body).send().await?;
198        Ok(response.error_for_status()?.json().await?)
199    }
200}