objectstore_client/
put.rs1use std::fmt;
2use std::io::Cursor;
3use std::{borrow::Cow, collections::BTreeMap};
4
5use async_compression::tokio::bufread::ZstdEncoder;
6use bytes::Bytes;
7use futures_util::StreamExt;
8use objectstore_types::metadata::Metadata;
9use reqwest::Body;
10use serde::Deserialize;
11use tokio::io::AsyncRead;
12use tokio_util::io::{ReaderStream, StreamReader};
13
14pub use objectstore_types::metadata::{Compression, ExpirationPolicy};
15
16use crate::{ClientStream, ObjectKey, Session};
17
18#[derive(Debug, Deserialize)]
20pub struct PutResponse {
21 pub key: ObjectKey,
23}
24
25pub(crate) enum PutBody {
26 Buffer(Bytes),
27 Stream(ClientStream),
28}
29
30impl fmt::Debug for PutBody {
31 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32 f.debug_tuple("PutBody").finish_non_exhaustive()
33 }
34}
35
36impl Session {
37 fn put_body(&self, body: PutBody) -> PutBuilder {
38 let metadata = Metadata {
39 expiration_policy: self.scope.usecase().expiration_policy(),
40 compression: Some(self.scope.usecase().compression()),
41 ..Default::default()
42 };
43
44 PutBuilder {
45 session: self.clone(),
46 metadata,
47 key: None,
48 body,
49 }
50 }
51
52 pub fn put(&self, body: impl Into<Bytes>) -> PutBuilder {
54 self.put_body(PutBody::Buffer(body.into()))
55 }
56
57 pub fn put_stream(&self, body: ClientStream) -> PutBuilder {
59 self.put_body(PutBody::Stream(body))
60 }
61
62 pub fn put_read<R>(&self, body: R) -> PutBuilder
64 where
65 R: AsyncRead + Send + Sync + 'static,
66 {
67 let stream = ReaderStream::new(body).boxed();
68 self.put_body(PutBody::Stream(stream))
69 }
70}
71
72#[derive(Debug)]
74pub struct PutBuilder {
75 pub(crate) session: Session,
76 pub(crate) metadata: Metadata,
77 pub(crate) key: Option<ObjectKey>,
78 pub(crate) body: PutBody,
79}
80
81impl PutBuilder {
82 pub fn key(mut self, key: impl Into<ObjectKey>) -> Self {
87 self.key = Some(key.into()).filter(|k| !k.is_empty());
88 self
89 }
90
91 pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> Self {
99 self.metadata.compression = compression.into();
100 self
101 }
102
103 pub fn expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
107 self.metadata.expiration_policy = expiration_policy;
108 self
109 }
110
111 pub fn content_type(mut self, content_type: impl Into<Cow<'static, str>>) -> Self {
116 self.metadata.content_type = content_type.into();
117 self
118 }
119
120 pub fn origin(mut self, origin: impl Into<String>) -> Self {
138 self.metadata.origin = Some(origin.into());
139 self
140 }
141
142 pub fn set_metadata(mut self, metadata: impl Into<BTreeMap<String, String>>) -> Self {
146 self.metadata.custom = metadata.into();
147 self
148 }
149
150 pub fn append_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
152 self.metadata.custom.insert(key.into(), value.into());
153 self
154 }
155}
156
157pub(crate) fn maybe_compress(body: PutBody, compression: Option<Compression>) -> Body {
159 match (compression, body) {
160 (Some(Compression::Zstd), PutBody::Buffer(bytes)) => {
161 let cursor = Cursor::new(bytes);
162 let encoder = ZstdEncoder::new(cursor);
163 let stream = ReaderStream::new(encoder);
164 Body::wrap_stream(stream)
165 }
166 (Some(Compression::Zstd), PutBody::Stream(stream)) => {
167 let stream = StreamReader::new(stream);
168 let encoder = ZstdEncoder::new(stream);
169 let stream = ReaderStream::new(encoder);
170 Body::wrap_stream(stream)
171 }
172 (None, PutBody::Buffer(bytes)) => bytes.into(),
173 (None, PutBody::Stream(stream)) => Body::wrap_stream(stream),
174 }
176}
177
178impl PutBuilder {
182 pub async fn send(self) -> crate::Result<PutResponse> {
184 let method = match self.key {
185 Some(_) => reqwest::Method::PUT,
186 None => reqwest::Method::POST,
187 };
188
189 let mut builder = self
190 .session
191 .request(method, self.key.as_deref().unwrap_or_default())?;
192
193 let body = maybe_compress(self.body, self.metadata.compression);
194
195 builder = builder.headers(self.metadata.to_headers("")?);
196
197 let response = builder.body(body).send().await?;
198 Ok(response.error_for_status()?.json().await?)
199 }
200}