objectstore_client/
put.rs1use std::collections::BTreeMap;
2use std::fmt;
3use std::io::Cursor;
4
5use async_compression::tokio::bufread::ZstdEncoder;
6use bytes::Bytes;
7use futures_util::StreamExt;
8use objectstore_types::Metadata;
9use reqwest::Body;
10use serde::Deserialize;
11use tokio::io::AsyncRead;
12use tokio_util::io::{ReaderStream, StreamReader};
13
14pub use objectstore_types::{Compression, ExpirationPolicy};
15
16use crate::{Client, ClientStream};
17
18impl Client {
19 fn put_body(&self, body: PutBody) -> PutBuilder<'_> {
20 let metadata = Metadata {
21 expiration_policy: self.default_expiration_policy,
22 compression: Some(self.default_compression),
23 ..Default::default()
24 };
25
26 PutBuilder {
27 client: self,
28 metadata,
29 body,
30 }
31 }
32
33 pub fn put(&self, body: impl Into<Bytes>) -> PutBuilder<'_> {
35 self.put_body(PutBody::Buffer(body.into()))
36 }
37
38 pub fn put_stream(&self, body: ClientStream) -> PutBuilder<'_> {
40 self.put_body(PutBody::Stream(body))
41 }
42
43 pub fn put_read<R>(&self, body: R) -> PutBuilder<'_>
45 where
46 R: AsyncRead + Send + Sync + 'static,
47 {
48 let stream = ReaderStream::new(body).boxed();
49 self.put_body(PutBody::Stream(stream))
50 }
51}
52
53#[derive(Debug)]
55pub struct PutBuilder<'a> {
56 pub(crate) client: &'a Client,
57 pub(crate) metadata: Metadata,
58 pub(crate) body: PutBody,
59}
60
61pub(crate) enum PutBody {
62 Buffer(Bytes),
63 Stream(ClientStream),
64}
65impl fmt::Debug for PutBody {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 f.debug_tuple("PutBody").finish_non_exhaustive()
68 }
69}
70
71impl PutBuilder<'_> {
72 pub fn compression(mut self, compression: impl Into<Option<Compression>>) -> Self {
78 self.metadata.compression = compression.into();
79 self
80 }
81
82 pub fn expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
84 self.metadata.expiration_policy = expiration_policy;
85 self
86 }
87
88 pub fn set_metadata(mut self, metadata: impl Into<BTreeMap<String, String>>) -> Self {
92 self.metadata.custom = metadata.into();
93 self
94 }
95
96 pub fn append_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
98 self.metadata.custom.insert(key.into(), value.into());
99 self
100 }
101}
102
103#[derive(Debug, Deserialize)]
105pub struct PutResponse {
106 pub key: String,
108}
109
110impl PutBuilder<'_> {
114 pub async fn send(self) -> anyhow::Result<PutResponse> {
116 let put_url = format!("{}/v1/", self.client.service_url);
117 let mut builder = self.client.request(reqwest::Method::PUT, put_url)?;
118
119 let body = match (self.metadata.compression, self.body) {
120 (Some(Compression::Zstd), PutBody::Buffer(bytes)) => {
121 let cursor = Cursor::new(bytes);
122 let encoder = ZstdEncoder::new(cursor);
123 let stream = ReaderStream::new(encoder);
124 Body::wrap_stream(stream)
125 }
126 (Some(Compression::Zstd), PutBody::Stream(stream)) => {
127 let stream = StreamReader::new(stream);
128 let encoder = ZstdEncoder::new(stream);
129 let stream = ReaderStream::new(encoder);
130 Body::wrap_stream(stream)
131 }
132 (None, PutBody::Buffer(bytes)) => bytes.into(),
133 (None, PutBody::Stream(stream)) => Body::wrap_stream(stream),
134 };
136
137 builder = builder.headers(self.metadata.to_headers("", false)?);
138
139 let response = builder.body(body).send().await?;
140 Ok(response.error_for_status()?.json().await?)
141 }
142}