objectstore_client/
put.rs1use std::fmt;
2use std::io::{self, Cursor};
3use std::path::PathBuf;
4use std::{borrow::Cow, collections::BTreeMap};
5
6use async_compression::tokio::bufread::ZstdEncoder;
7use bytes::Bytes;
8use futures_util::StreamExt;
9use objectstore_types::metadata::Metadata;
10use reqwest::Body;
11use serde::Deserialize;
12use tokio::fs::File;
13use tokio::io::{AsyncRead, BufReader};
14use tokio_util::io::{ReaderStream, StreamReader};
15
16pub use objectstore_types::metadata::{Compression, ExpirationPolicy};
17
18use crate::{ClientStream, ObjectKey, Session};
19
20#[derive(Debug, Deserialize)]
22pub struct PutResponse {
23 pub key: ObjectKey,
25}
26
27pub(crate) enum PutBody {
28 Buffer(Bytes),
29 Stream(ClientStream),
30 File(File),
31 Path(PathBuf),
32}
33
34impl fmt::Debug for PutBody {
35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
36 f.debug_tuple("PutBody").finish_non_exhaustive()
37 }
38}
39
40impl Session {
41 fn put_body(&self, body: PutBody) -> PutBuilder {
42 let metadata = Metadata {
43 expiration_policy: self.scope.usecase().expiration_policy(),
44 compression: Some(self.scope.usecase().compression()),
45 ..Default::default()
46 };
47
48 PutBuilder {
49 session: self.clone(),
50 metadata,
51 key: None,
52 body,
53 }
54 }
55
56 pub fn put(&self, body: impl Into<Bytes>) -> PutBuilder {
58 self.put_body(PutBody::Buffer(body.into()))
59 }
60
61 pub fn put_stream(&self, body: ClientStream) -> PutBuilder {
63 self.put_body(PutBody::Stream(body))
64 }
65
66 pub fn put_read<R>(&self, body: R) -> PutBuilder
68 where
69 R: AsyncRead + Send + Sync + 'static,
70 {
71 let stream = ReaderStream::new(body).boxed();
72 self.put_body(PutBody::Stream(stream))
73 }
74
75 pub fn put_file(&self, file: File) -> PutBuilder {
83 self.put_body(PutBody::File(file))
84 }
85
86 pub fn put_path(&self, path: impl Into<PathBuf>) -> PutBuilder {
97 self.put_body(PutBody::Path(path.into()))
98 }
99}
100
101#[derive(Debug)]
103pub struct PutBuilder {
104 pub(crate) session: Session,
105 pub(crate) metadata: Metadata,
106 pub(crate) key: Option<ObjectKey>,
107 pub(crate) body: PutBody,
108}
109
110impl PutBuilder {
111 pub fn key(mut self, key: impl Into<ObjectKey>) -> Self {
116 self.key = Some(key.into()).filter(|k| !k.is_empty());
117 self
118 }
119
120 pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> Self {
128 self.metadata.compression = compression.into();
129 self
130 }
131
132 pub fn expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
136 self.metadata.expiration_policy = expiration_policy;
137 self
138 }
139
140 pub fn content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
145 self.metadata.content_type = content_type.into();
146 self
147 }
148
149 pub fn origin(mut self, origin: impl Into<String>) -> Self {
167 self.metadata.origin = Some(origin.into());
168 self
169 }
170
171 pub fn set_metadata(mut self, metadata: impl Into<BTreeMap<String, String>>) -> Self {
175 self.metadata.custom = metadata.into();
176 self
177 }
178
179 pub fn append_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
181 self.metadata.custom.insert(key.into(), value.into());
182 self
183 }
184}
185
186pub(crate) async fn maybe_compress(
188 body: PutBody,
189 compression: Option<Compression>,
190) -> io::Result<Body> {
191 Ok(match (compression, body) {
192 (Some(Compression::Zstd), PutBody::Buffer(bytes)) => {
193 let cursor = Cursor::new(bytes);
194 let encoder = ZstdEncoder::new(cursor);
195 let stream = ReaderStream::new(encoder);
196 Body::wrap_stream(stream)
197 }
198 (Some(Compression::Zstd), PutBody::Stream(stream)) => {
199 let stream = StreamReader::new(stream);
200 let encoder = ZstdEncoder::new(stream);
201 let stream = ReaderStream::new(encoder);
202 Body::wrap_stream(stream)
203 }
204 (Some(Compression::Zstd), PutBody::File(file)) => {
205 let reader = BufReader::new(file);
206 let encoder = ZstdEncoder::new(reader);
207 let stream = ReaderStream::new(encoder);
208 Body::wrap_stream(stream)
209 }
210 (Some(Compression::Zstd), PutBody::Path(file)) => {
211 let file = File::open(file).await?;
212 let reader = BufReader::new(file);
213 let encoder = ZstdEncoder::new(reader);
214 let stream = ReaderStream::new(encoder);
215 Body::wrap_stream(stream)
216 }
217 (None, PutBody::Buffer(bytes)) => bytes.into(),
218 (None, PutBody::Stream(stream)) => Body::wrap_stream(stream),
219 (None, PutBody::File(file)) => {
220 let stream = ReaderStream::new(file);
221 Body::wrap_stream(stream)
222 }
223 (None, PutBody::Path(path)) => {
224 let stream = ReaderStream::new(File::open(path).await?);
225 Body::wrap_stream(stream)
226 }
227 })
228}
229
230impl PutBuilder {
234 pub async fn send(self) -> crate::Result<PutResponse> {
236 let method = match self.key {
237 Some(_) => reqwest::Method::PUT,
238 None => reqwest::Method::POST,
239 };
240
241 let mut builder = self
242 .session
243 .request(method, self.key.as_deref().unwrap_or_default())?;
244
245 let body = maybe_compress(self.body, self.metadata.compression).await?;
246
247 builder = builder.headers(self.metadata.to_headers("")?);
248
249 let response = builder.body(body).send().await?;
250 Ok(response.error_for_status()?.json().await?)
251 }
252}