objectstore_client/client.rs
1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::metadata::{Compression, ExpirationPolicy};
8use objectstore_types::scope;
9use url::Url;
10
11use crate::auth::TokenGenerator;
12
13const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
14
15#[derive(Debug)]
16struct ClientBuilderInner {
17 service_url: Url,
18 propagate_traces: bool,
19 reqwest_builder: reqwest::ClientBuilder,
20 token_generator: Option<TokenGenerator>,
21}
22
23impl ClientBuilderInner {
24 /// Applies defaults that cannot be overridden by the caller.
25 fn apply_defaults(mut self) -> Self {
26 self.reqwest_builder = self
27 .reqwest_builder
28 // hickory-dns: Controlled by the `reqwest/hickory-dns` feature flag
29 // we are dealing with de/compression ourselves:
30 .no_brotli()
31 .no_deflate()
32 .no_gzip()
33 .no_zstd();
34 self
35 }
36}
37
38/// Builder to create a [`Client`].
39#[must_use = "call .build() on this ClientBuilder to create a Client"]
40#[derive(Debug)]
41pub struct ClientBuilder(crate::Result<ClientBuilderInner>);
42
43impl ClientBuilder {
44 /// Creates a new [`ClientBuilder`], configured with the given `service_url`.
45 ///
46 /// To perform CRUD operations, one has to create a [`Client`], and then scope it to a [`Usecase`]
47 /// and Scope in order to create a [`Session`].
48 pub fn new(service_url: impl reqwest::IntoUrl) -> Self {
49 let service_url = match service_url.into_url() {
50 Ok(url) => url,
51 Err(err) => return Self(Err(err.into())),
52 };
53 if service_url.cannot_be_a_base() {
54 return ClientBuilder(Err(crate::Error::InvalidUrl {
55 message: "service_url cannot be a base".to_owned(),
56 }));
57 }
58
59 let reqwest_builder = reqwest::Client::builder()
60 // We define just a connection timeout by default but do not limit reads. A connect
61 // timeout of 100ms is still very conservative, but should provide a sensible upper
62 // bound to expected request latencies.
63 .connect_timeout(Duration::from_millis(100))
64 .user_agent(USER_AGENT);
65
66 Self(Ok(ClientBuilderInner {
67 service_url,
68 propagate_traces: false,
69 reqwest_builder,
70 token_generator: None,
71 }))
72 }
73
74 /// Changes whether the `sentry-trace` header will be sent to Objectstore
75 /// to take advantage of Sentry's distributed tracing.
76 ///
77 /// By default, tracing headers will not be propagated.
78 pub fn propagate_traces(mut self, propagate_traces: bool) -> Self {
79 if let Ok(ref mut inner) = self.0 {
80 inner.propagate_traces = propagate_traces;
81 }
82 self
83 }
84
85 /// Defines a read timeout for the [`reqwest::Client`].
86 ///
87 /// The read timeout is defined to be "between consecutive read operations", for example between
88 /// chunks of a streaming response. For more fine-grained configuration of this and other
89 /// timeouts, use [`Self::configure_reqwest`].
90 ///
91 /// By default, no read timeout and a connect timeout of 100ms is set.
92 pub fn timeout(self, timeout: Duration) -> Self {
93 let Ok(mut inner) = self.0 else { return self };
94 inner.reqwest_builder = inner.reqwest_builder.read_timeout(timeout);
95 Self(Ok(inner))
96 }
97
98 /// Calls the closure with the underlying [`reqwest::ClientBuilder`].
99 ///
100 /// By default, the ClientBuilder is configured to create a reqwest Client with a connect and read timeout of 500ms and a user agent identifying this library.
101 pub fn configure_reqwest<F>(self, closure: F) -> Self
102 where
103 F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
104 {
105 let Ok(mut inner) = self.0 else { return self };
106 inner.reqwest_builder = closure(inner.reqwest_builder);
107 Self(Ok(inner))
108 }
109
110 /// Sets a [`TokenGenerator`] that will be used to sign authorization tokens before
111 /// sending requests to Objectstore.
112 pub fn token_generator(self, token_generator: TokenGenerator) -> Self {
113 let Ok(mut inner) = self.0 else { return self };
114 inner.token_generator = Some(token_generator);
115 Self(Ok(inner))
116 }
117
118 /// Returns a [`Client`] that uses this [`ClientBuilder`] configuration.
119 ///
120 /// # Errors
121 ///
122 /// This method fails if:
123 /// - the given `service_url` is invalid or cannot be used as a base URL
124 /// - the [`reqwest::Client`] fails to build. Refer to [`reqwest::ClientBuilder::build`] for
125 /// more information on when this can happen.
126 pub fn build(self) -> crate::Result<Client> {
127 let inner = self.0?.apply_defaults();
128
129 Ok(Client {
130 inner: Arc::new(ClientInner {
131 reqwest: inner.reqwest_builder.build()?,
132 service_url: inner.service_url,
133 propagate_traces: inner.propagate_traces,
134 token_generator: inner.token_generator,
135 }),
136 })
137 }
138}
139
140/// An identifier for a workload in Objectstore, along with defaults to use for all
141/// operations within that Usecase.
142///
143/// Usecases need to be statically defined in Objectstore's configuration server-side.
144/// Objectstore can make decisions based on the Usecase. For example, choosing the most
145/// suitable storage backend.
146#[derive(Debug, Clone)]
147pub struct Usecase {
148 name: Arc<str>,
149 compression: Compression,
150 expiration_policy: ExpirationPolicy,
151}
152
153impl Usecase {
154 /// Creates a new Usecase.
155 pub fn new(name: &str) -> Self {
156 Self {
157 name: name.into(),
158 compression: Compression::Zstd,
159 expiration_policy: Default::default(),
160 }
161 }
162
163 /// Returns the name of this usecase.
164 #[inline]
165 pub fn name(&self) -> &str {
166 &self.name
167 }
168
169 /// Returns the compression algorithm to use for operations within this usecase.
170 #[inline]
171 pub fn compression(&self) -> Compression {
172 self.compression
173 }
174
175 /// Sets the compression algorithm to use for operations within this usecase.
176 ///
177 /// It's still possible to override this default on each operation's builder.
178 ///
179 /// By default, [`Compression::Zstd`] is used.
180 pub fn with_compression(self, compression: Compression) -> Self {
181 Self {
182 compression,
183 ..self
184 }
185 }
186
187 /// Returns the expiration policy to use by default for operations within this usecase.
188 #[inline]
189 pub fn expiration_policy(&self) -> ExpirationPolicy {
190 self.expiration_policy
191 }
192
193 /// Sets the expiration policy to use for operations within this usecase.
194 ///
195 /// It's still possible to override this default on each operation's builder.
196 ///
197 /// By default, [`ExpirationPolicy::Manual`] is used, meaning that objects won't automatically
198 /// expire.
199 pub fn with_expiration_policy(self, expiration_policy: ExpirationPolicy) -> Self {
200 Self {
201 expiration_policy,
202 ..self
203 }
204 }
205
206 /// Creates a new custom [`Scope`].
207 ///
208 /// Add parts to it using [`Scope::push`].
209 ///
210 /// Generally, [`Usecase::for_organization`] and [`Usecase::for_project`] should fit most usecases,
211 /// so prefer using those methods rather than creating your own custom [`Scope`].
212 pub fn scope(&self) -> Scope {
213 Scope::new(self.clone())
214 }
215
216 /// Creates a new [`Scope`] tied to the given organization.
217 pub fn for_organization(&self, organization: u64) -> Scope {
218 Scope::for_organization(self.clone(), organization)
219 }
220
221 /// Creates a new [`Scope`] tied to the given organization and project.
222 pub fn for_project(&self, organization: u64, project: u64) -> Scope {
223 Scope::for_project(self.clone(), organization, project)
224 }
225}
226
227#[derive(Debug)]
228pub(crate) struct ScopeInner {
229 usecase: Usecase,
230 scopes: scope::Scopes,
231}
232
233impl ScopeInner {
234 #[inline]
235 pub(crate) fn usecase(&self) -> &Usecase {
236 &self.usecase
237 }
238
239 #[inline]
240 pub(crate) fn scopes(&self) -> &scope::Scopes {
241 &self.scopes
242 }
243}
244
245/// A [`Scope`] is a sequence of key-value pairs that defines a (possibly nested) namespace within a
246/// [`Usecase`].
247///
248/// To construct a [`Scope`], use [`Usecase::for_organization`], [`Usecase::for_project`], or
249/// [`Usecase::scope`] for custom scopes.
250#[derive(Debug)]
251pub struct Scope(crate::Result<ScopeInner>);
252
253impl Scope {
254 /// Creates a new root-level Scope for the given usecase.
255 ///
256 /// Using a custom Scope is discouraged, prefer using [`Usecase::for_organization`] or [`Usecase::for_project`] instead.
257 pub fn new(usecase: Usecase) -> Self {
258 Self(Ok(ScopeInner {
259 usecase,
260 scopes: scope::Scopes::empty(),
261 }))
262 }
263
264 fn for_organization(usecase: Usecase, organization: u64) -> Self {
265 Self::new(usecase).push("org", organization)
266 }
267
268 fn for_project(usecase: Usecase, organization: u64, project: u64) -> Self {
269 Self::for_organization(usecase, organization).push("project", project)
270 }
271
272 /// Extends this Scope by creating a new sub-scope nested within it.
273 pub fn push<V>(self, key: &str, value: V) -> Self
274 where
275 V: std::fmt::Display,
276 {
277 let result = self.0.and_then(|mut inner| {
278 inner.scopes.push(key, value)?;
279 Ok(inner)
280 });
281
282 Self(result)
283 }
284
285 /// Creates a session for this scope using the given client.
286 ///
287 /// # Errors
288 ///
289 /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
290 pub fn session(self, client: &Client) -> crate::Result<Session> {
291 client.session(self)
292 }
293}
294
295#[derive(Debug)]
296pub(crate) struct ClientInner {
297 reqwest: reqwest::Client,
298 service_url: Url,
299 propagate_traces: bool,
300 token_generator: Option<TokenGenerator>,
301}
302
303/// A client for Objectstore. Use [`Client::builder`] to configure and construct a Client.
304///
305/// To perform CRUD operations, one has to create a Client, and then scope it to a [`Usecase`]
306/// and Scope in order to create a [`Session`].
307///
308/// If your Objectstore instance enforces authorization checks, you must provide a
309/// [`TokenGenerator`] on creation.
310///
311/// # Example
312///
313/// ```no_run
314/// use std::time::Duration;
315/// use objectstore_client::{Client, SecretKey, TokenGenerator, Usecase};
316/// use objectstore_types::auth::Permission;
317///
318/// # async fn example() -> objectstore_client::Result<()> {
319/// let token_generator = TokenGenerator::new(SecretKey {
320/// secret_key: "<safely inject secret key>".into(),
321/// kid: "my-service".into(),
322/// })?
323/// .expiry_seconds(30)
324/// .permissions(&[Permission::ObjectRead]);
325///
326/// let client = Client::builder("http://localhost:8888/")
327/// .timeout(Duration::from_secs(1))
328/// .propagate_traces(true)
329/// .token_generator(token_generator)
330/// .build()?;
331///
332/// let session = Usecase::new("my_app")
333/// .for_project(12345, 1337)
334/// .session(&client)?;
335///
336/// let response = session.put("hello world").send().await?;
337///
338/// # Ok(())
339/// # }
340/// ```
341#[derive(Debug, Clone)]
342pub struct Client {
343 inner: Arc<ClientInner>,
344}
345
346impl Client {
347 /// Creates a new [`Client`], configured with the given `service_url` and default
348 /// configuration.
349 ///
350 /// Use [`Client::builder`] for more fine-grained configuration.
351 ///
352 /// # Errors
353 ///
354 /// This method fails if [`ClientBuilder::build`] fails.
355 pub fn new(service_url: impl reqwest::IntoUrl) -> crate::Result<Client> {
356 ClientBuilder::new(service_url).build()
357 }
358
359 /// Convenience function to create a [`ClientBuilder`].
360 pub fn builder(service_url: impl reqwest::IntoUrl) -> ClientBuilder {
361 ClientBuilder::new(service_url)
362 }
363
364 /// Creates a session for the given scope using this client.
365 ///
366 /// # Errors
367 ///
368 /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
369 pub fn session(&self, scope: Scope) -> crate::Result<Session> {
370 scope.0.map(|inner| Session {
371 scope: inner.into(),
372 client: self.inner.clone(),
373 })
374 }
375}
376
377/// Represents a session with Objectstore, tied to a specific Usecase and Scope within it.
378///
379/// Create a Session using [`Client::session`] or [`Scope::session`].
380#[derive(Debug, Clone)]
381pub struct Session {
382 pub(crate) scope: Arc<ScopeInner>,
383 pub(crate) client: Arc<ClientInner>,
384}
385
386/// The type of [`Stream`](futures_util::Stream) to be used for a PUT request.
387pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
388
389impl Session {
390 /// Generates a GET url to the object with the given `key`.
391 ///
392 /// This can then be used by downstream services to fetch the given object.
393 /// NOTE however that the service does not strictly follow HTTP semantics,
394 /// in particular in relation to `Accept-Encoding`.
395 pub fn object_url(&self, object_key: &str) -> Url {
396 let mut url = self.client.service_url.clone();
397
398 // `path_segments_mut` can only error if the url is cannot-be-a-base,
399 // and we check that in `ClientBuilder::new`, therefore this will never panic.
400 let mut segments = url.path_segments_mut().unwrap();
401 segments
402 .push("v1")
403 .push("objects")
404 .push(&self.scope.usecase.name)
405 .push(&self.scope.scopes.as_api_path().to_string())
406 .extend(object_key.split("/"));
407 drop(segments);
408
409 url
410 }
411
412 pub(crate) fn request(
413 &self,
414 method: reqwest::Method,
415 object_key: &str,
416 ) -> crate::Result<reqwest::RequestBuilder> {
417 let url = self.object_url(object_key);
418
419 let mut builder = self.client.reqwest.request(method, url);
420
421 if let Some(token_generator) = &self.client.token_generator {
422 let token = token_generator.sign_for_scope(&self.scope)?;
423 builder = builder.bearer_auth(token);
424 }
425
426 if self.client.propagate_traces {
427 let trace_headers =
428 sentry_core::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
429 for (header_name, value) in trace_headers.into_iter().flatten() {
430 builder = builder.header(header_name, value);
431 }
432 }
433
434 Ok(builder)
435 }
436}
437
#[cfg(test)]
mod tests {
    use super::*;

    /// The object URL contains the usecase, all scope parts and the object key.
    #[test]
    fn test_object_url() {
        let scope = Usecase::new("testing")
            .for_project(12345, 1337)
            .push("app_slug", "email_app");
        let session = Client::new("http://127.0.0.1:8888/")
            .unwrap()
            .session(scope)
            .unwrap();

        let expected = "http://127.0.0.1:8888/v1/objects/testing/org=12345;project=1337;app_slug=email_app/foo/bar";
        assert_eq!(session.object_url("foo/bar").as_str(), expected)
    }

    /// A path prefix on the service URL is kept in front of the object path.
    #[test]
    fn test_object_url_with_base_path() {
        let scope = Usecase::new("testing").for_project(12345, 1337);
        let session = Client::new("http://127.0.0.1:8888/api/prefix")
            .unwrap()
            .session(scope)
            .unwrap();

        let expected =
            "http://127.0.0.1:8888/api/prefix/v1/objects/testing/org=12345;project=1337/foo/bar";
        assert_eq!(session.object_url("foo/bar").as_str(), expected)
    }
}
469}