objectstore_client/
put.rs

1use std::collections::BTreeMap;
2use std::fmt;
3use std::io::Cursor;
4
5use async_compression::tokio::bufread::ZstdEncoder;
6use bytes::Bytes;
7use futures_util::StreamExt;
8use objectstore_types::Metadata;
9use reqwest::Body;
10use serde::Deserialize;
11use tokio::io::AsyncRead;
12use tokio_util::io::{ReaderStream, StreamReader};
13
14pub use objectstore_types::{Compression, ExpirationPolicy};
15
16use crate::{Client, ClientStream};
17
18impl Client {
19    fn put_body(&self, body: PutBody) -> PutBuilder<'_> {
20        let metadata = Metadata {
21            expiration_policy: self.default_expiration_policy,
22            compression: Some(self.default_compression),
23            ..Default::default()
24        };
25
26        PutBuilder {
27            client: self,
28            metadata,
29            body,
30        }
31    }
32
33    /// Creates a PUT request for a [`Bytes`]-like type.
34    pub fn put(&self, body: impl Into<Bytes>) -> PutBuilder<'_> {
35        self.put_body(PutBody::Buffer(body.into()))
36    }
37
38    /// Creates a PUT request with a stream.
39    pub fn put_stream(&self, body: ClientStream) -> PutBuilder<'_> {
40        self.put_body(PutBody::Stream(body))
41    }
42
43    /// Creates a PUT request with an [`AsyncRead`] type.
44    pub fn put_read<R>(&self, body: R) -> PutBuilder<'_>
45    where
46        R: AsyncRead + Send + Sync + 'static,
47    {
48        let stream = ReaderStream::new(body).boxed();
49        self.put_body(PutBody::Stream(stream))
50    }
51}
52
53/// A PUT request builder.
54#[derive(Debug)]
55pub struct PutBuilder<'a> {
56    pub(crate) client: &'a Client,
57    pub(crate) metadata: Metadata,
58    pub(crate) body: PutBody,
59}
60
61pub(crate) enum PutBody {
62    Buffer(Bytes),
63    Stream(ClientStream),
64}
65impl fmt::Debug for PutBody {
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67        f.debug_tuple("PutBody").finish_non_exhaustive()
68    }
69}
70
71impl PutBuilder<'_> {
72    /// Sets an explicit compression algorithm to be used for this payload.
73    ///
74    /// [`None`] should be used if no compression should be performed by the client,
75    /// either because the payload is uncompressible (such as a media format), or if the user
76    /// will handle any kind of compression, without the clients knowledge.
77    pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> Self {
78        self.metadata.compression = compression.into();
79        self
80    }
81
82    /// Sets the expiration policy of the object to be uploaded.
83    pub fn expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
84        self.metadata.expiration_policy = expiration_policy;
85        self
86    }
87
88    /// This sets the custom metadata to the provided map.
89    ///
90    /// It will clear any previously set metadata.
91    pub fn set_metadata(mut self, metadata: impl Into<BTreeMap<String, String>>) -> Self {
92        self.metadata.custom = metadata.into();
93        self
94    }
95
96    /// Appends they `key`/`value` to the custom metadata of this object.
97    pub fn append_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
98        self.metadata.custom.insert(key.into(), value.into());
99        self
100    }
101}
102
103/// The response returned from the service after uploading an object.
104#[derive(Debug, Deserialize)]
105pub struct PutResponse {
106    /// The key of the object, as stored.
107    pub key: String,
108}
109
110// TODO: instead of a separate `send` method, it would be nice to just implement `IntoFuture`.
111// However, `IntoFuture` needs to define the resulting future as an associated type,
112// and "impl trait in associated type position" is not yet stable :-(
113impl PutBuilder<'_> {
114    /// Sends the built PUT request to the upstream service.
115    pub async fn send(self) -> anyhow::Result<PutResponse> {
116        let put_url = format!("{}/v1/", self.client.service_url);
117        let mut builder = self.client.request(reqwest::Method::PUT, put_url)?;
118
119        let body = match (self.metadata.compression, self.body) {
120            (Some(Compression::Zstd), PutBody::Buffer(bytes)) => {
121                let cursor = Cursor::new(bytes);
122                let encoder = ZstdEncoder::new(cursor);
123                let stream = ReaderStream::new(encoder);
124                Body::wrap_stream(stream)
125            }
126            (Some(Compression::Zstd), PutBody::Stream(stream)) => {
127                let stream = StreamReader::new(stream);
128                let encoder = ZstdEncoder::new(stream);
129                let stream = ReaderStream::new(encoder);
130                Body::wrap_stream(stream)
131            }
132            (None, PutBody::Buffer(bytes)) => bytes.into(),
133            (None, PutBody::Stream(stream)) => Body::wrap_stream(stream),
134            // _ => todo!("compression algorithms other than `zstd` are currently not supported"),
135        };
136
137        builder = builder.headers(self.metadata.to_headers("", false)?);
138
139        let response = builder.body(body).send().await?;
140        Ok(response.error_for_status()?.json().await?)
141    }
142}