objectstore_client/
client.rs1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::ExpirationPolicy;
8use url::Url;
9
10pub use objectstore_types::Compression;
11
12const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
13
14#[derive(Debug)]
15struct ClientBuilderInner {
16 service_url: Url,
17 propagate_traces: bool,
18 reqwest_builder: reqwest::ClientBuilder,
19}
20
21impl ClientBuilderInner {
22 fn apply_defaults(mut self) -> Self {
24 self.reqwest_builder = self
25 .reqwest_builder
26 .no_brotli()
29 .no_deflate()
30 .no_gzip()
31 .no_zstd();
32 self
33 }
34}
35
36#[must_use = "call .build() on this ClientBuilder to create a Client"]
38#[derive(Debug)]
39pub struct ClientBuilder(crate::Result<ClientBuilderInner>);
40
41impl ClientBuilder {
42 pub fn new(service_url: impl reqwest::IntoUrl) -> Self {
47 let service_url = match service_url.into_url() {
48 Ok(url) => url,
49 Err(err) => return Self(Err(err.into())),
50 };
51 if service_url.cannot_be_a_base() {
52 return ClientBuilder(Err(crate::Error::InvalidUrl {
53 message: "service_url cannot be a base".to_owned(),
54 }));
55 }
56
57 let reqwest_builder = reqwest::Client::builder()
58 .connect_timeout(Duration::from_millis(500))
64 .read_timeout(Duration::from_millis(500))
65 .user_agent(USER_AGENT);
66
67 Self(Ok(ClientBuilderInner {
68 service_url,
69 propagate_traces: false,
70 reqwest_builder,
71 }))
72 }
73
74 pub fn propagate_traces(mut self, propagate_traces: bool) -> Self {
79 if let Ok(ref mut inner) = self.0 {
80 inner.propagate_traces = propagate_traces;
81 }
82 self
83 }
84
85 pub fn timeout(self, timeout: Duration) -> Self {
90 let Ok(mut inner) = self.0 else { return self };
91 inner.reqwest_builder = inner
92 .reqwest_builder
93 .connect_timeout(timeout)
94 .read_timeout(timeout);
95 Self(Ok(inner))
96 }
97
98 pub fn configure_reqwest<F>(self, closure: F) -> Self
102 where
103 F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
104 {
105 let Ok(mut inner) = self.0 else { return self };
106 inner.reqwest_builder = closure(inner.reqwest_builder);
107 Self(Ok(inner))
108 }
109
110 pub fn build(self) -> crate::Result<Client> {
119 let inner = self.0?.apply_defaults();
120
121 Ok(Client {
122 inner: Arc::new(ClientInner {
123 reqwest: inner.reqwest_builder.build()?,
124 service_url: inner.service_url,
125 propagate_traces: inner.propagate_traces,
126 }),
127 })
128 }
129}
130
131#[derive(Debug, Clone)]
138pub struct Usecase {
139 name: Arc<str>,
140 compression: Compression,
141 expiration_policy: ExpirationPolicy,
142}
143
144impl Usecase {
145 pub fn new(name: &str) -> Self {
147 Self {
148 name: name.into(),
149 compression: Compression::Zstd,
150 expiration_policy: Default::default(),
151 }
152 }
153
154 #[inline]
156 pub fn name(&self) -> &str {
157 &self.name
158 }
159
160 #[inline]
162 pub fn compression(&self) -> Compression {
163 self.compression
164 }
165
166 pub fn with_compression(self, compression: Compression) -> Self {
172 Self {
173 compression,
174 ..self
175 }
176 }
177
178 #[inline]
180 pub fn expiration_policy(&self) -> ExpirationPolicy {
181 self.expiration_policy
182 }
183
184 pub fn with_expiration_policy(self, expiration_policy: ExpirationPolicy) -> Self {
191 Self {
192 expiration_policy,
193 ..self
194 }
195 }
196
197 pub fn scope(&self) -> Scope {
204 Scope::new(self.clone())
205 }
206
207 pub fn for_organization(&self, organization: u64) -> Scope {
209 Scope::for_organization(self.clone(), organization)
210 }
211
212 pub fn for_project(&self, organization: u64, project: u64) -> Scope {
214 Scope::for_project(self.clone(), organization, project)
215 }
216}
217
218#[derive(Debug)]
219pub(crate) struct ScopeInner {
220 usecase: Usecase,
221 scope: String,
222}
223
224impl ScopeInner {
225 #[inline]
226 pub(crate) fn usecase(&self) -> &Usecase {
227 &self.usecase
228 }
229
230 fn as_path_segment(&self) -> &str {
231 if self.scope.is_empty() {
232 "_"
233 } else {
234 &self.scope
235 }
236 }
237}
238
239impl std::fmt::Display for ScopeInner {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 f.write_str(&self.scope)
242 }
243}
244
245#[derive(Debug)]
251pub struct Scope(crate::Result<ScopeInner>);
252
253impl Scope {
254 pub fn new(usecase: Usecase) -> Self {
258 Self(Ok(ScopeInner {
259 usecase,
260 scope: String::new(),
261 }))
262 }
263
264 fn for_organization(usecase: Usecase, organization: u64) -> Self {
265 let scope = format!("org={}", organization);
266 Self(Ok(ScopeInner { usecase, scope }))
267 }
268
269 fn for_project(usecase: Usecase, organization: u64, project: u64) -> Self {
270 let scope = format!("org={};project={}", organization, project);
271 Self(Ok(ScopeInner { usecase, scope }))
272 }
273
274 pub fn push<V>(self, key: &str, value: V) -> Self
276 where
277 V: std::fmt::Display,
278 {
279 let result = self.0.and_then(|mut inner| {
280 Self::validate_key(key)?;
281
282 let value = value.to_string();
283 Self::validate_value(&value)?;
284
285 if !inner.scope.is_empty() {
286 inner.scope.push(';');
287 }
288 inner.scope.push_str(key);
289 inner.scope.push('=');
290 inner.scope.push_str(&value);
291
292 Ok(inner)
293 });
294
295 Self(result)
296 }
297
298 const ALLOWED_CHARS: &[u8] =
302 b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_-()$!+'";
303
304 fn validate_key(key: &str) -> crate::Result<()> {
306 if key.is_empty() {
307 return Err(crate::Error::InvalidScope {
308 message: "Scope key cannot be empty".to_string(),
309 });
310 }
311 if key.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
312 Ok(())
313 } else {
314 Err(crate::Error::InvalidScope {
315 message: format!("Invalid scope key '{key}'."),
316 })
317 }
318 }
319
320 fn validate_value(value: &str) -> crate::Result<()> {
322 if value.is_empty() {
323 return Err(crate::Error::InvalidScope {
324 message: "Scope value cannot be empty".to_string(),
325 });
326 }
327 if value.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
328 Ok(())
329 } else {
330 Err(crate::Error::InvalidScope {
331 message: format!("Invalid scope value '{value}'."),
332 })
333 }
334 }
335
336 pub fn session(self, client: &Client) -> crate::Result<Session> {
342 client.session(self)
343 }
344}
345
346#[derive(Debug)]
347pub(crate) struct ClientInner {
348 reqwest: reqwest::Client,
349 service_url: Url,
350 propagate_traces: bool,
351}
352
353#[derive(Debug, Clone)]
379pub struct Client {
380 inner: Arc<ClientInner>,
381}
382
383impl Client {
384 pub fn new(service_url: impl reqwest::IntoUrl) -> crate::Result<Client> {
393 ClientBuilder::new(service_url).build()
394 }
395
396 pub fn builder(service_url: impl reqwest::IntoUrl) -> ClientBuilder {
398 ClientBuilder::new(service_url)
399 }
400
401 pub fn session(&self, scope: Scope) -> crate::Result<Session> {
407 scope.0.map(|inner| Session {
408 scope: inner.into(),
409 client: self.inner.clone(),
410 })
411 }
412}
413
414#[derive(Debug, Clone)]
418pub struct Session {
419 pub(crate) scope: Arc<ScopeInner>,
420 pub(crate) client: Arc<ClientInner>,
421}
422
423pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
425
426impl Session {
427 pub fn object_url(&self, object_key: &str) -> Url {
433 let mut url = self.client.service_url.clone();
434
435 let mut segments = url.path_segments_mut().unwrap();
438 segments
439 .push("v1")
440 .push("objects")
441 .push(&self.scope.usecase.name)
442 .push(self.scope.as_path_segment())
443 .extend(object_key.split("/"));
444 drop(segments);
445
446 url
447 }
448
449 pub(crate) fn request(
450 &self,
451 method: reqwest::Method,
452 object_key: &str,
453 ) -> reqwest::RequestBuilder {
454 let url = self.object_url(object_key);
455
456 let mut builder = self.client.reqwest.request(method, url);
457
458 if self.client.propagate_traces {
459 let trace_headers =
460 sentry_core::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
461 for (header_name, value) in trace_headers.into_iter().flatten() {
462 builder = builder.header(header_name, value);
463 }
464 }
465
466 builder
467 }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// A service URL without a path prefix: usecase, scope and key segments
    /// are appended directly after the host.
    #[test]
    fn test_object_url() {
        let expected =
            "http://127.0.0.1:8888/v1/objects/testing/org=12345;project=1337;app_slug=email_app/foo/bar";

        let client = Client::new("http://127.0.0.1:8888/").unwrap();
        let scope = Usecase::new("testing")
            .for_project(12345, 1337)
            .push("app_slug", "email_app");
        let session = client.session(scope).unwrap();
        let actual = session.object_url("foo/bar").to_string();

        assert_eq!(actual, expected)
    }

    /// A service URL with a path prefix: the prefix is kept in front of the
    /// generated segments.
    #[test]
    fn test_object_url_with_base_path() {
        let expected =
            "http://127.0.0.1:8888/api/prefix/v1/objects/testing/org=12345;project=1337/foo/bar";

        let client = Client::new("http://127.0.0.1:8888/api/prefix").unwrap();
        let scope = Usecase::new("testing").for_project(12345, 1337);
        let session = client.session(scope).unwrap();
        let actual = session.object_url("foo/bar").to_string();

        assert_eq!(actual, expected)
    }
}