objectstore_client/
client.rs1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::ExpirationPolicy;
8use reqwest::header::HeaderName;
9
10pub use objectstore_types::{Compression, PARAM_SCOPE, PARAM_USECASE};
11
12const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
13
14#[derive(Debug)]
21pub struct ClientBuilder {
22 service_url: Arc<str>,
23 client: reqwest::Client,
24 propagate_traces: bool,
25
26 usecase: Arc<str>,
27 default_compression: Compression,
28 default_expiration_policy: ExpirationPolicy,
29}
30
31impl ClientBuilder {
32 pub fn new(service_url: &str, usecase: &str) -> anyhow::Result<Self> {
40 let client = reqwest::Client::builder()
41 .user_agent(USER_AGENT)
42 .no_brotli()
45 .no_deflate()
46 .no_gzip()
47 .no_zstd()
48 .connect_timeout(Duration::from_millis(500))
53 .read_timeout(Duration::from_millis(500))
54 .build()?;
55
56 Ok(Self {
57 service_url: service_url.trim_end_matches('/').into(),
58 client,
59 propagate_traces: false,
60
61 usecase: usecase.into(),
62 default_compression: Compression::Zstd,
63 default_expiration_policy: ExpirationPolicy::Manual,
64 })
65 }
66
67 pub fn default_compression(mut self, compression: Compression) -> Self {
69 self.default_compression = compression;
70 self
71 }
72
73 pub fn default_expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
75 self.default_expiration_policy = expiration_policy;
76 self
77 }
78
79 pub fn with_distributed_tracing(mut self, propagate_traces: bool) -> Self {
82 self.propagate_traces = propagate_traces;
83 self
84 }
85
86 fn make_client(&self, scope: String) -> Client {
87 Client {
88 service_url: self.service_url.clone(),
89 http: self.client.clone(),
90 propagate_traces: self.propagate_traces,
91
92 usecase: self.usecase.clone(),
93 scope,
94 default_compression: self.default_compression,
95 default_expiration_policy: self.default_expiration_policy,
96 }
97 }
98
99 pub fn for_organization(&self, organization_id: u64) -> Client {
101 let scope = format!("org.{organization_id}");
102 self.make_client(scope)
103 }
104
105 pub fn for_project(&self, organization_id: u64, project_id: u64) -> Client {
108 let scope = format!("org.{organization_id}/proj.{project_id}");
109 self.make_client(scope)
110 }
111}
112
113#[derive(Debug)]
115pub struct Client {
116 pub(crate) http: reqwest::Client,
117 pub(crate) service_url: Arc<str>,
118 propagate_traces: bool,
119
120 pub(crate) usecase: Arc<str>,
121
122 pub(crate) scope: String,
133 pub(crate) default_compression: Compression,
134 pub(crate) default_expiration_policy: ExpirationPolicy,
135}
136
137pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
139
140impl Client {
141 pub(crate) fn request<U: reqwest::IntoUrl>(
142 &self,
143 method: reqwest::Method,
144 uri: U,
145 ) -> anyhow::Result<reqwest::RequestBuilder> {
146 let mut builder = self.http.request(method, uri).query(&[
147 (PARAM_SCOPE, self.scope.as_ref()),
148 (PARAM_USECASE, self.usecase.as_ref()),
149 ]);
150
151 if self.propagate_traces {
152 let trace_headers =
153 sentry::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
154 for (header_name, value) in trace_headers.into_iter().flatten() {
155 builder = builder.header(HeaderName::try_from(header_name)?, value);
156 }
157 }
158
159 Ok(builder)
160 }
161
162 pub async fn delete(&self, id: &str) -> anyhow::Result<()> {
164 let delete_url = format!("{}/v1/{id}", self.service_url);
165
166 let _response = self
167 .request(reqwest::Method::DELETE, delete_url)?
168 .send()
169 .await?;
170
171 Ok(())
172 }
173}