objectstore_client/
client.rs1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::ExpirationPolicy;
8use url::Url;
9
10pub use objectstore_types::{Compression, PARAM_SCOPE, PARAM_USECASE};
11
/// `User-Agent` value sent by clients built in this module: the fixed prefix
/// `objectstore-client/` followed by this crate's version, assembled at compile time.
const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
13
14#[derive(Debug)]
15struct ClientBuilderInner {
16 service_url: Url,
17 propagate_traces: bool,
18 reqwest_builder: reqwest::ClientBuilder,
19}
20
21impl ClientBuilderInner {
22 fn apply_defaults(mut self) -> Self {
24 self.reqwest_builder = self
25 .reqwest_builder
26 .no_brotli()
29 .no_deflate()
30 .no_gzip()
31 .no_zstd();
32 self
33 }
34}
35
36#[must_use = "call .build() on this ClientBuilder to create a Client"]
38#[derive(Debug)]
39pub struct ClientBuilder(crate::Result<ClientBuilderInner>);
40
41impl ClientBuilder {
42 pub fn new(service_url: impl reqwest::IntoUrl) -> Self {
47 let service_url = match service_url.into_url() {
48 Ok(url) => url,
49 Err(err) => return Self(Err(err.into())),
50 };
51
52 let reqwest_builder = reqwest::Client::builder()
53 .connect_timeout(Duration::from_millis(500))
59 .read_timeout(Duration::from_millis(500))
60 .user_agent(USER_AGENT);
61
62 Self(Ok(ClientBuilderInner {
63 service_url,
64 propagate_traces: false,
65 reqwest_builder,
66 }))
67 }
68
69 pub fn propagate_traces(mut self, propagate_traces: bool) -> Self {
72 if let Ok(ref mut inner) = self.0 {
73 inner.propagate_traces = propagate_traces;
74 }
75 self
76 }
77
78 pub fn timeout(self, timeout: Duration) -> Self {
81 let Ok(mut inner) = self.0 else { return self };
82 inner.reqwest_builder = inner
83 .reqwest_builder
84 .connect_timeout(timeout)
85 .read_timeout(timeout);
86 Self(Ok(inner))
87 }
88
89 pub fn configure_reqwest<F>(self, closure: F) -> Self
91 where
92 F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
93 {
94 let Ok(mut inner) = self.0 else { return self };
95 inner.reqwest_builder = closure(inner.reqwest_builder);
96 Self(Ok(inner))
97 }
98
99 pub fn build(self) -> crate::Result<Client> {
108 let inner = self.0?.apply_defaults();
109 Ok(Client {
110 inner: Arc::new(ClientInner {
111 reqwest: inner.reqwest_builder.build()?,
112 service_url: inner.service_url,
113 propagate_traces: inner.propagate_traces,
114 }),
115 })
116 }
117}
118
119#[derive(Debug, Clone)]
126pub struct Usecase {
127 name: Arc<str>,
128 compression: Compression,
129 expiration: ExpirationPolicy,
130}
131
132impl Usecase {
133 pub fn new(name: &str) -> Self {
135 Self {
136 name: name.into(),
137 compression: Compression::Zstd,
138 expiration: Default::default(),
139 }
140 }
141
142 #[inline]
144 pub fn name(&self) -> &str {
145 &self.name
146 }
147
148 #[inline]
150 pub fn compression(&self) -> Compression {
151 self.compression
152 }
153
154 pub fn with_compression(self, compression: Compression) -> Self {
156 Self {
157 compression,
158 ..self
159 }
160 }
161
162 #[inline]
164 pub fn expiration(&self) -> ExpirationPolicy {
165 self.expiration
166 }
167
168 pub fn with_expiration(self, expiration: ExpirationPolicy) -> Self {
170 Self { expiration, ..self }
171 }
172
173 pub fn scope(&self) -> Scope {
175 Scope::new(self.clone())
176 }
177
178 pub fn for_organization(&self, organization: u64) -> Scope {
180 Scope::for_organization(self.clone(), organization)
181 }
182
183 pub fn for_project(&self, organization: u64, project: u64) -> Scope {
185 Scope::for_project(self.clone(), organization, project)
186 }
187}
188
189#[derive(Debug)]
190pub(crate) struct ScopeInner {
191 usecase: Usecase,
192 scope: String,
193}
194
195impl ScopeInner {
196 #[inline]
197 pub(crate) fn usecase(&self) -> &Usecase {
198 &self.usecase
199 }
200}
201
202impl std::fmt::Display for ScopeInner {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.write_str(&self.scope)
205 }
206}
207
208#[derive(Debug)]
210pub struct Scope(crate::Result<ScopeInner>);
211
212impl Scope {
213 pub fn new(usecase: Usecase) -> Self {
215 Self(Ok(ScopeInner {
216 usecase,
217 scope: String::new(),
218 }))
219 }
220
221 fn for_organization(usecase: Usecase, organization: u64) -> Self {
222 let scope = format!("org.{}", organization);
223 Self(Ok(ScopeInner { usecase, scope }))
224 }
225
226 fn for_project(usecase: Usecase, organization: u64, project: u64) -> Self {
227 let scope = format!("org.{}/project.{}", organization, project);
228 Self(Ok(ScopeInner { usecase, scope }))
229 }
230
231 pub fn push<V>(self, key: &str, value: &V) -> Self
233 where
234 V: std::fmt::Display,
235 {
236 let result = self.0.and_then(|mut inner| {
237 Self::validate_key(key)?;
238
239 let value = value.to_string();
240 Self::validate_value(&value)?;
241
242 if !inner.scope.is_empty() {
243 inner.scope.push('/');
244 }
245 inner.scope.push_str(key);
246 inner.scope.push('.');
247 inner.scope.push_str(&value);
248
249 Ok(inner)
250 });
251
252 Self(result)
253 }
254
255 const ALLOWED_CHARS: &[u8] =
259 b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_-()$!+'";
260
261 fn validate_key(key: &str) -> crate::Result<()> {
263 if key.is_empty() {
264 return Err(crate::Error::InvalidScope {
265 message: "Scope key cannot be empty".to_string(),
266 });
267 }
268 if key.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
269 Ok(())
270 } else {
271 Err(crate::Error::InvalidScope {
272 message: format!("Invalid scope key '{key}'."),
273 })
274 }
275 }
276
277 fn validate_value(value: &str) -> crate::Result<()> {
279 if value.is_empty() {
280 return Err(crate::Error::InvalidScope {
281 message: "Scope value cannot be empty".to_string(),
282 });
283 }
284 if value.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
285 Ok(())
286 } else {
287 Err(crate::Error::InvalidScope {
288 message: format!("Invalid scope value '{value}'."),
289 })
290 }
291 }
292
293 pub fn session(self, client: &Client) -> crate::Result<Session> {
295 client.session(self)
296 }
297}
298
/// State shared (through an `Arc`) by a [`Client`] and the sessions created from it.
#[derive(Debug)]
pub(crate) struct ClientInner {
    /// HTTP client used to create outgoing requests.
    reqwest: reqwest::Client,
    /// Base URL of the service; request paths are appended to it.
    service_url: Url,
    /// When `true`, sessions add trace propagation headers to each request.
    propagate_traces: bool,
}
305
306#[derive(Debug, Clone)]
332pub struct Client {
333 inner: Arc<ClientInner>,
334}
335
336impl Client {
337 pub fn new(service_url: impl reqwest::IntoUrl) -> crate::Result<Client> {
346 ClientBuilder::new(service_url).build()
347 }
348
349 pub fn builder(service_url: impl reqwest::IntoUrl) -> ClientBuilder {
351 ClientBuilder::new(service_url)
352 }
353
354 pub fn session(&self, scope: Scope) -> crate::Result<Session> {
356 scope.0.map(|inner| Session {
357 scope: inner.into(),
358 client: self.inner.clone(),
359 })
360 }
361}
362
/// A validated scope bound to a client; requests are created through it.
///
/// Both fields are reference-counted, so cloning a `Session` does not copy the
/// scope or the client state.
#[derive(Debug, Clone)]
pub struct Session {
    /// The validated scope (and its usecase) sent with every request.
    pub(crate) scope: Arc<ScopeInner>,
    /// Shared client state: HTTP client, service URL and trace settings.
    pub(crate) client: Arc<ClientInner>,
}
369
/// A boxed `'static` stream yielding `io::Result<Bytes>` items.
// NOTE(review): presumably the streaming payload type for uploads/downloads — its
// users are outside this view; confirm.
pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
372
373impl Session {
374 pub(crate) fn request(
375 &self,
376 method: reqwest::Method,
377 resource_id: &str,
378 ) -> crate::Result<reqwest::RequestBuilder> {
379 let mut url = self.client.service_url.clone();
380 url.path_segments_mut()
381 .map_err(|_| crate::Error::InvalidUrl {
382 message: format!("The URL {} cannot be a base", self.client.service_url),
383 })?
384 .extend(&["v1", resource_id]);
385
386 let mut builder = self.client.reqwest.request(method, url).query(&[
387 (PARAM_SCOPE, self.scope.scope.as_str()),
388 (PARAM_USECASE, self.scope.usecase.name.as_ref()),
389 ]);
390
391 if self.client.propagate_traces {
392 let trace_headers =
393 sentry_core::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
394 for (header_name, value) in trace_headers.into_iter().flatten() {
395 builder = builder.header(header_name, value);
396 }
397 }
398
399 Ok(builder)
400 }
401}