Skip to main content

objectstore_client/
put.rs

1use std::fmt;
2use std::io::{self, Cursor};
3use std::path::PathBuf;
4use std::{borrow::Cow, collections::BTreeMap};
5
6use async_compression::tokio::bufread::ZstdEncoder;
7use bytes::Bytes;
8use futures_util::StreamExt;
9use objectstore_types::metadata::Metadata;
10use reqwest::Body;
11use serde::Deserialize;
12use tokio::fs::File;
13use tokio::io::{AsyncRead, BufReader};
14use tokio_util::io::{ReaderStream, StreamReader};
15
16pub use objectstore_types::metadata::{Compression, ExpirationPolicy};
17
18use crate::{ClientStream, ObjectKey, Session};
19
/// The response returned from the service after uploading an object.
///
/// This is deserialized from the JSON body of a successful put request.
#[derive(Debug, Deserialize)]
pub struct PutResponse {
    /// The key of the object, as stored.
    ///
    /// When no explicit key was set on the request, this is the key assigned by the server.
    pub key: ObjectKey,
}
26
/// The payload source of a put request.
///
/// Each variant corresponds to one of the `Session::put*` constructors.
pub(crate) enum PutBody {
    /// A payload that is already fully held in memory.
    Buffer(Bytes),
    /// A payload produced incrementally by a stream.
    Stream(ClientStream),
    /// A payload read from a file that the caller has already opened.
    File(File),
    /// A payload read from the file at this path; the file is only opened when the request
    /// body is built.
    Path(PathBuf),
}
33
34impl fmt::Debug for PutBody {
35    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36        f.debug_tuple("PutBody").finish_non_exhaustive()
37    }
38}
39
40impl Session {
41    fn put_body(&self, body: PutBody) -> PutBuilder {
42        let metadata = Metadata {
43            expiration_policy: self.scope.usecase().expiration_policy(),
44            compression: Some(self.scope.usecase().compression()),
45            ..Default::default()
46        };
47
48        PutBuilder {
49            session: self.clone(),
50            metadata,
51            key: None,
52            body,
53        }
54    }
55
56    /// Creates or replaces an object using a [`Bytes`]-like payload.
57    pub fn put(&self, body: impl Into<Bytes>) -> PutBuilder {
58        self.put_body(PutBody::Buffer(body.into()))
59    }
60
61    /// Creates or replaces an object using a streaming payload.
62    pub fn put_stream(&self, body: ClientStream) -> PutBuilder {
63        self.put_body(PutBody::Stream(body))
64    }
65
66    /// Creates or replaces an object using an [`AsyncRead`] payload.
67    pub fn put_read<R>(&self, body: R) -> PutBuilder
68    where
69        R: AsyncRead + Send + Sync + 'static,
70    {
71        let stream = ReaderStream::new(body).boxed();
72        self.put_body(PutBody::Stream(stream))
73    }
74
75    /// Creates or replaces an object using the contents of an opened file.
76    ///
77    /// The file descriptor is held open from the moment this method is called until the
78    /// upload completes. When enqueueing many files via [`Session::many`], prefer
79    /// [`put_path`](Session::put_path) instead: it defers opening the file until just before
80    /// upload, keeping file descriptor usage within the active concurrency window and avoiding
81    /// OS file descriptor limit (e.g., macOS's default `ulimit -n`) exhaustion.
82    pub fn put_file(&self, file: File) -> PutBuilder {
83        self.put_body(PutBody::File(file))
84    }
85
86    /// Creates or replaces an object using the contents of the file at `path`.
87    ///
88    /// Unlike [`put_file`](Session::put_file), this method defers opening the file until the
89    /// request is actually sent. When enqueueing many file uploads via [`Session::many`], this
90    /// ensures that file descriptors are opened only within the active concurrency window,
91    /// preventing the process from exhausting the OS file descriptor limit (e.g., macOS's
92    /// default `ulimit -n`).
93    ///
94    /// Prefer `put_path` over [`put_file`](Session::put_file) whenever you are lining up a
95    /// large number of files for upload.
96    pub fn put_path(&self, path: impl Into<PathBuf>) -> PutBuilder {
97        self.put_body(PutBody::Path(path.into()))
98    }
99}
100
101/// A [`put`](Session::put) request builder.
102#[derive(Debug)]
103pub struct PutBuilder {
104    pub(crate) session: Session,
105    pub(crate) metadata: Metadata,
106    pub(crate) key: Option<ObjectKey>,
107    pub(crate) body: PutBody,
108}
109
110impl PutBuilder {
111    /// Sets an explicit object key.
112    ///
113    /// If a key is specified, the object will be stored under that key. Otherwise, the Objectstore
114    /// server will automatically assign a random key, which is then returned from this request.
115    pub fn key(mut self, key: impl Into<ObjectKey>) -> Self {
116        self.key = Some(key.into()).filter(|k| !k.is_empty());
117        self
118    }
119
120    /// Sets an explicit compression algorithm to be used for this payload.
121    ///
122    /// [`None`] should be used if no compression should be performed by the client,
123    /// either because the payload is uncompressible (such as a media format), or if the user
124    /// will handle any kind of compression, without the clients knowledge.
125    ///
126    /// By default, the compression algorithm set on this Session's Usecase is used.
127    pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> Self {
128        self.metadata.compression = compression.into();
129        self
130    }
131
132    /// Sets the expiration policy of the object to be uploaded.
133    ///
134    /// By default, the expiration policy set on this Session's Usecase is used.
135    pub fn expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
136        self.metadata.expiration_policy = expiration_policy;
137        self
138    }
139
140    /// Sets the content type of the object to be uploaded.
141    ///
142    /// You can use the utility function [`crate::utils::guess_mime_type`] to attempt to guess a
143    /// `content_type` based on magic bytes.
144    pub fn content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
145        self.metadata.content_type = content_type.into();
146        self
147    }
148
149    /// Sets the origin of the object, typically the IP address of the original source.
150    ///
151    /// This is an optional but encouraged field that tracks where the payload was
152    /// originally obtained from. For example, the IP address of the Sentry SDK or CLI
153    /// that uploaded the data.
154    ///
155    /// # Example
156    ///
157    /// ```no_run
158    /// # async fn example(session: objectstore_client::Session) {
159    /// session.put("data")
160    ///     .origin("203.0.113.42")
161    ///     .send()
162    ///     .await
163    ///     .unwrap();
164    /// # }
165    /// ```
166    pub fn origin(mut self, origin: impl Into<String>) -> Self {
167        self.metadata.origin = Some(origin.into());
168        self
169    }
170
171    /// This sets the custom metadata to the provided map.
172    ///
173    /// It will clear any previously set metadata.
174    pub fn set_metadata(mut self, metadata: impl Into<BTreeMap<String, String>>) -> Self {
175        self.metadata.custom = metadata.into();
176        self
177    }
178
179    /// Appends they `key`/`value` to the custom metadata of this object.
180    pub fn append_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
181        self.metadata.custom.insert(key.into(), value.into());
182        self
183    }
184}
185
186/// Compresses the body if compression is specified.
187pub(crate) async fn maybe_compress(
188    body: PutBody,
189    compression: Option<Compression>,
190) -> io::Result<Body> {
191    Ok(match (compression, body) {
192        (Some(Compression::Zstd), PutBody::Buffer(bytes)) => {
193            let cursor = Cursor::new(bytes);
194            let encoder = ZstdEncoder::new(cursor);
195            let stream = ReaderStream::new(encoder);
196            Body::wrap_stream(stream)
197        }
198        (Some(Compression::Zstd), PutBody::Stream(stream)) => {
199            let stream = StreamReader::new(stream);
200            let encoder = ZstdEncoder::new(stream);
201            let stream = ReaderStream::new(encoder);
202            Body::wrap_stream(stream)
203        }
204        (Some(Compression::Zstd), PutBody::File(file)) => {
205            let reader = BufReader::new(file);
206            let encoder = ZstdEncoder::new(reader);
207            let stream = ReaderStream::new(encoder);
208            Body::wrap_stream(stream)
209        }
210        (Some(Compression::Zstd), PutBody::Path(file)) => {
211            let file = File::open(file).await?;
212            let reader = BufReader::new(file);
213            let encoder = ZstdEncoder::new(reader);
214            let stream = ReaderStream::new(encoder);
215            Body::wrap_stream(stream)
216        }
217        (None, PutBody::Buffer(bytes)) => bytes.into(),
218        (None, PutBody::Stream(stream)) => Body::wrap_stream(stream),
219        (None, PutBody::File(file)) => {
220            let stream = ReaderStream::new(file);
221            Body::wrap_stream(stream)
222        }
223        (None, PutBody::Path(path)) => {
224            let stream = ReaderStream::new(File::open(path).await?);
225            Body::wrap_stream(stream)
226        }
227    })
228}
229
230// TODO: instead of a separate `send` method, it would be nice to just implement `IntoFuture`.
231// However, `IntoFuture` needs to define the resulting future as an associated type,
232// and "impl trait in associated type position" is not yet stable :-(
233impl PutBuilder {
234    /// Sends the built put request to the upstream service.
235    pub async fn send(self) -> crate::Result<PutResponse> {
236        let method = match self.key {
237            Some(_) => reqwest::Method::PUT,
238            None => reqwest::Method::POST,
239        };
240
241        let mut builder = self
242            .session
243            .request(method, self.key.as_deref().unwrap_or_default())?;
244
245        let body = maybe_compress(self.body, self.metadata.compression).await?;
246
247        builder = builder.headers(self.metadata.to_headers("")?);
248
249        let response = builder.body(body).send().await?;
250        Ok(response.error_for_status()?.json().await?)
251    }
252}