Skip to main content

objectstore_client/
client.rs

1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::metadata::{Compression, ExpirationPolicy};
8use objectstore_types::scope;
9use url::Url;
10
11use crate::auth::TokenProvider;
12
13const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
14
15#[derive(Debug)]
16struct ClientBuilderInner {
17    service_url: Url,
18    propagate_traces: bool,
19    reqwest_builder: reqwest::ClientBuilder,
20    token: Option<TokenProvider>,
21}
22
23impl ClientBuilderInner {
24    /// Applies defaults that cannot be overridden by the caller.
25    fn apply_defaults(mut self) -> Self {
26        self.reqwest_builder = self
27            .reqwest_builder
28            // hickory-dns: Controlled by the `reqwest/hickory-dns` feature flag
29            // we are dealing with de/compression ourselves:
30            .no_brotli()
31            .no_deflate()
32            .no_gzip()
33            .no_zstd();
34        self
35    }
36}
37
38/// Builder to create a [`Client`].
39#[must_use = "call .build() on this ClientBuilder to create a Client"]
40#[derive(Debug)]
41pub struct ClientBuilder(crate::Result<ClientBuilderInner>);
42
43impl ClientBuilder {
44    /// Creates a new [`ClientBuilder`], configured with the given `service_url`.
45    ///
46    /// To perform CRUD operations, one has to create a [`Client`], and then scope it to a [`Usecase`]
47    /// and Scope in order to create a [`Session`].
48    pub fn new(service_url: impl reqwest::IntoUrl) -> Self {
49        let service_url = match service_url.into_url() {
50            Ok(url) => url,
51            Err(err) => return Self(Err(err.into())),
52        };
53        if service_url.cannot_be_a_base() {
54            return ClientBuilder(Err(crate::Error::InvalidUrl {
55                message: "service_url cannot be a base".to_owned(),
56            }));
57        }
58
59        let reqwest_builder = reqwest::Client::builder()
60            // We define just a connection timeout by default but do not limit reads. A connect
61            // timeout of 100ms is still very conservative, but should provide a sensible upper
62            // bound to expected request latencies.
63            .connect_timeout(Duration::from_millis(100))
64            .user_agent(USER_AGENT);
65
66        Self(Ok(ClientBuilderInner {
67            service_url,
68            propagate_traces: false,
69            reqwest_builder,
70            token: None,
71        }))
72    }
73
74    /// Changes whether the `sentry-trace` header will be sent to Objectstore
75    /// to take advantage of Sentry's distributed tracing.
76    ///
77    /// By default, tracing headers will not be propagated.
78    pub fn propagate_traces(mut self, propagate_traces: bool) -> Self {
79        if let Ok(ref mut inner) = self.0 {
80            inner.propagate_traces = propagate_traces;
81        }
82        self
83    }
84
85    /// Defines a read timeout for the [`reqwest::Client`].
86    ///
87    /// The read timeout is defined to be "between consecutive read operations", for example between
88    /// chunks of a streaming response. For more fine-grained configuration of this and other
89    /// timeouts, use [`Self::configure_reqwest`].
90    ///
91    /// By default, no read timeout and a connect timeout of 100ms is set.
92    pub fn timeout(self, timeout: Duration) -> Self {
93        let Ok(mut inner) = self.0 else { return self };
94        inner.reqwest_builder = inner.reqwest_builder.read_timeout(timeout);
95        Self(Ok(inner))
96    }
97
98    /// Calls the closure with the underlying [`reqwest::ClientBuilder`].
99    ///
100    /// By default, the ClientBuilder is configured to create a reqwest Client with a connect and read timeout of 500ms and a user agent identifying this library.
101    pub fn configure_reqwest<F>(self, closure: F) -> Self
102    where
103        F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
104    {
105        let Ok(mut inner) = self.0 else { return self };
106        inner.reqwest_builder = closure(inner.reqwest_builder);
107        Self(Ok(inner))
108    }
109
110    /// Sets the authentication token to use for requests to Objectstore.
111    ///
112    /// Accepts anything that implements `Into<TokenProvider>`:
113    /// - A [`TokenGenerator`](crate::TokenGenerator) — for internal services that have access to
114    ///   an EdDSA keypair. The generator signs a fresh JWT for each request.
115    /// - A `String` or `&str` — a pre-signed JWT, used as-is for every request.
116    pub fn token(self, token: impl Into<TokenProvider>) -> Self {
117        let Ok(mut inner) = self.0 else { return self };
118        inner.token = Some(token.into());
119        Self(Ok(inner))
120    }
121
122    /// Returns a [`Client`] that uses this [`ClientBuilder`] configuration.
123    ///
124    /// # Errors
125    ///
126    /// This method fails if:
127    /// - the given `service_url` is invalid or cannot be used as a base URL
128    /// - the [`reqwest::Client`] fails to build. Refer to [`reqwest::ClientBuilder::build`] for
129    ///   more information on when this can happen.
130    pub fn build(self) -> crate::Result<Client> {
131        let inner = self.0?.apply_defaults();
132
133        Ok(Client {
134            inner: Arc::new(ClientInner {
135                reqwest: inner.reqwest_builder.build()?,
136                service_url: inner.service_url,
137                propagate_traces: inner.propagate_traces,
138                token: inner.token,
139            }),
140        })
141    }
142}
143
144/// An identifier for a workload in Objectstore, along with defaults to use for all
145/// operations within that Usecase.
146///
147/// Usecases need to be statically defined in Objectstore's configuration server-side.
148/// Objectstore can make decisions based on the Usecase. For example, choosing the most
149/// suitable storage backend.
150#[derive(Debug, Clone)]
151pub struct Usecase {
152    name: Arc<str>,
153    compression: Compression,
154    expiration_policy: ExpirationPolicy,
155}
156
157impl Usecase {
158    /// Creates a new Usecase.
159    pub fn new(name: &str) -> Self {
160        Self {
161            name: name.into(),
162            compression: Compression::Zstd,
163            expiration_policy: Default::default(),
164        }
165    }
166
167    /// Returns the name of this usecase.
168    #[inline]
169    pub fn name(&self) -> &str {
170        &self.name
171    }
172
173    /// Returns the compression algorithm to use for operations within this usecase.
174    #[inline]
175    pub fn compression(&self) -> Compression {
176        self.compression
177    }
178
179    /// Sets the compression algorithm to use for operations within this usecase.
180    ///
181    /// It's still possible to override this default on each operation's builder.
182    ///
183    /// By default, [`Compression::Zstd`] is used.
184    pub fn with_compression(self, compression: Compression) -> Self {
185        Self {
186            compression,
187            ..self
188        }
189    }
190
191    /// Returns the expiration policy to use by default for operations within this usecase.
192    #[inline]
193    pub fn expiration_policy(&self) -> ExpirationPolicy {
194        self.expiration_policy
195    }
196
197    /// Sets the expiration policy to use for operations within this usecase.
198    ///
199    /// It's still possible to override this default on each operation's builder.
200    ///
201    /// By default, [`ExpirationPolicy::Manual`] is used, meaning that objects won't automatically
202    /// expire.
203    pub fn with_expiration_policy(self, expiration_policy: ExpirationPolicy) -> Self {
204        Self {
205            expiration_policy,
206            ..self
207        }
208    }
209
210    /// Creates a new custom [`Scope`].
211    ///
212    /// Add parts to it using [`Scope::push`].
213    ///
214    /// Generally, [`Usecase::for_organization`] and [`Usecase::for_project`] should fit most usecases,
215    /// so prefer using those methods rather than creating your own custom [`Scope`].
216    pub fn scope(&self) -> Scope {
217        Scope::new(self.clone())
218    }
219
220    /// Creates a new [`Scope`] tied to the given organization.
221    pub fn for_organization(&self, organization: u64) -> Scope {
222        Scope::for_organization(self.clone(), organization)
223    }
224
225    /// Creates a new [`Scope`] tied to the given organization and project.
226    pub fn for_project(&self, organization: u64, project: u64) -> Scope {
227        Scope::for_project(self.clone(), organization, project)
228    }
229}
230
231#[derive(Debug)]
232pub(crate) struct ScopeInner {
233    usecase: Usecase,
234    scopes: scope::Scopes,
235}
236
237impl ScopeInner {
238    #[inline]
239    pub(crate) fn usecase(&self) -> &Usecase {
240        &self.usecase
241    }
242
243    #[inline]
244    pub(crate) fn scopes(&self) -> &scope::Scopes {
245        &self.scopes
246    }
247}
248
249/// A [`Scope`] is a sequence of key-value pairs that defines a (possibly nested) namespace within a
250/// [`Usecase`].
251///
252/// To construct a [`Scope`], use [`Usecase::for_organization`], [`Usecase::for_project`], or
253/// [`Usecase::scope`] for custom scopes.
254#[derive(Debug)]
255pub struct Scope(pub(crate) crate::Result<ScopeInner>);
256
257impl Scope {
258    /// Creates a new root-level Scope for the given usecase.
259    ///
260    /// Using a custom Scope is discouraged, prefer using [`Usecase::for_organization`] or [`Usecase::for_project`] instead.
261    pub fn new(usecase: Usecase) -> Self {
262        Self(Ok(ScopeInner {
263            usecase,
264            scopes: scope::Scopes::empty(),
265        }))
266    }
267
268    fn for_organization(usecase: Usecase, organization: u64) -> Self {
269        Self::new(usecase).push("org", organization)
270    }
271
272    fn for_project(usecase: Usecase, organization: u64, project: u64) -> Self {
273        Self::for_organization(usecase, organization).push("project", project)
274    }
275
276    /// Extends this Scope by creating a new sub-scope nested within it.
277    pub fn push<V>(self, key: &str, value: V) -> Self
278    where
279        V: std::fmt::Display,
280    {
281        let result = self.0.and_then(|mut inner| {
282            inner.scopes.push(key, value)?;
283            Ok(inner)
284        });
285
286        Self(result)
287    }
288
289    /// Creates a session for this scope using the given client.
290    ///
291    /// # Errors
292    ///
293    /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
294    pub fn session(self, client: &Client) -> crate::Result<Session> {
295        client.session(self)
296    }
297}
298
299#[derive(Debug)]
300pub(crate) struct ClientInner {
301    reqwest: reqwest::Client,
302    service_url: Url,
303    propagate_traces: bool,
304    token: Option<TokenProvider>,
305}
306
307/// A client for Objectstore. Use [`Client::builder`] to configure and construct a Client.
308///
309/// To perform CRUD operations, one has to create a Client, and then scope it to a [`Usecase`]
310/// and Scope in order to create a [`Session`].
311///
312/// If your Objectstore instance enforces authorization checks, you must provide
313/// authentication via [`ClientBuilder::token`]. It accepts anything that converts
314/// into a [`TokenProvider`]:
315///
316/// - **[`TokenGenerator`](crate::TokenGenerator)** — for internal services that have access to
317///   an EdDSA keypair. The generator signs a fresh JWT for each request, scoped to the
318///   specific usecase and scope being accessed.
319/// - **`String` / `&str`** — a pre-signed JWT, used as-is for every request.
320///   Use this for external services that receive a token from another source.
321///
322/// # Examples
323///
324/// Internal service with a keypair:
325///
326/// ```no_run
327/// use std::time::Duration;
328/// use objectstore_client::{Client, SecretKey, TokenGenerator, Usecase};
329/// use objectstore_types::auth::Permission;
330///
331/// # async fn example() -> objectstore_client::Result<()> {
332/// let token_generator = TokenGenerator::new(SecretKey {
333///         secret_key: "<safely inject secret key>".into(),
334///         kid: "my-service".into(),
335///     })?
336///     .expiry_seconds(30)
337///     .permissions(&[Permission::ObjectRead]);
338///
339/// let client = Client::builder("http://localhost:8888/")
340///     .timeout(Duration::from_secs(1))
341///     .propagate_traces(true)
342///     .token(token_generator)
343///     .build()?;
344/// # Ok(())
345/// # }
346/// ```
347///
348/// External service with a pre-signed JWT (obtained via
349/// [`TokenGenerator::sign`](crate::TokenGenerator::sign)):
350///
351/// ```no_run
352/// use objectstore_client::{Client, SecretKey, TokenGenerator, Usecase};
353///
354/// # fn example() -> objectstore_client::Result<()> {
355/// let scope = Usecase::new("my_app").for_project(42, 1337);
356/// let token = TokenGenerator::new(SecretKey {
357///     secret_key: "<private key>".into(),
358///     kid: "my-service".into(),
359/// })?.sign(&scope)?;
360///
361/// let client = Client::builder("http://localhost:8888/")
362///     .token(token)
363///     .build()?;
364/// # Ok(())
365/// # }
366/// ```
367#[derive(Debug, Clone)]
368pub struct Client {
369    inner: Arc<ClientInner>,
370}
371
372impl Client {
373    /// Creates a new [`Client`], configured with the given `service_url` and default
374    /// configuration.
375    ///
376    /// Use [`Client::builder`] for more fine-grained configuration.
377    ///
378    /// # Errors
379    ///
380    /// This method fails if [`ClientBuilder::build`] fails.
381    pub fn new(service_url: impl reqwest::IntoUrl) -> crate::Result<Client> {
382        ClientBuilder::new(service_url).build()
383    }
384
385    /// Convenience function to create a [`ClientBuilder`].
386    pub fn builder(service_url: impl reqwest::IntoUrl) -> ClientBuilder {
387        ClientBuilder::new(service_url)
388    }
389
390    /// Creates a session for the given scope using this client.
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
395    pub fn session(&self, scope: Scope) -> crate::Result<Session> {
396        scope.0.map(|inner| Session {
397            scope: inner.into(),
398            client: self.inner.clone(),
399        })
400    }
401}
402
403/// Represents a session with Objectstore, tied to a specific Usecase and Scope within it.
404///
405/// Create a Session using [`Client::session`] or [`Scope::session`].
406#[derive(Debug, Clone)]
407pub struct Session {
408    pub(crate) scope: Arc<ScopeInner>,
409    pub(crate) client: Arc<ClientInner>,
410}
411
412/// The type of [`Stream`](futures_util::Stream) to be used for a PUT request.
413pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
414
415impl Session {
416    /// Generates a GET url to the object with the given `key`.
417    ///
418    /// This can then be used by downstream services to fetch the given object.
419    /// NOTE however that the service does not strictly follow HTTP semantics,
420    /// in particular in relation to `Accept-Encoding`.
421    pub fn object_url(&self, object_key: &str) -> Url {
422        let mut url = self.client.service_url.clone();
423
424        // `path_segments_mut` can only error if the url is cannot-be-a-base,
425        // and we check that in `ClientBuilder::new`, therefore this will never panic.
426        let mut segments = url.path_segments_mut().unwrap();
427        segments
428            .push("v1")
429            .push("objects")
430            .push(&self.scope.usecase.name)
431            .push(&self.scope.scopes.as_api_path().to_string())
432            .extend(object_key.split("/"));
433        drop(segments);
434
435        url
436    }
437
438    pub(crate) fn request(
439        &self,
440        method: reqwest::Method,
441        object_key: &str,
442    ) -> crate::Result<reqwest::RequestBuilder> {
443        let url = self.object_url(object_key);
444
445        let mut builder = self.client.reqwest.request(method, url);
446
447        match &self.client.token {
448            Some(TokenProvider::Generator(generator)) => {
449                let token = generator.sign_for_scope(&self.scope)?;
450                builder = builder.bearer_auth(token);
451            }
452            Some(TokenProvider::Static(token)) => {
453                builder = builder.bearer_auth(token);
454            }
455            None => {}
456        }
457
458        if self.client.propagate_traces {
459            let trace_headers =
460                sentry_core::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
461            for (header_name, value) in trace_headers.into_iter().flatten() {
462                builder = builder.header(header_name, value);
463            }
464        }
465
466        Ok(builder)
467    }
468}
469
470#[cfg(test)]
471mod tests {
472    use super::*;
473
474    #[test]
475    fn test_object_url() {
476        let client = Client::new("http://127.0.0.1:8888/").unwrap();
477        let usecase = Usecase::new("testing");
478        let scope = usecase
479            .for_project(12345, 1337)
480            .push("app_slug", "email_app");
481        let session = client.session(scope).unwrap();
482
483        assert_eq!(
484            session.object_url("foo/bar").to_string(),
485            "http://127.0.0.1:8888/v1/objects/testing/org=12345;project=1337;app_slug=email_app/foo/bar"
486        )
487    }
488
489    #[test]
490    fn test_object_url_with_base_path() {
491        let client = Client::new("http://127.0.0.1:8888/api/prefix").unwrap();
492        let usecase = Usecase::new("testing");
493        let scope = usecase.for_project(12345, 1337);
494        let session = client.session(scope).unwrap();
495
496        assert_eq!(
497            session.object_url("foo/bar").to_string(),
498            "http://127.0.0.1:8888/api/prefix/v1/objects/testing/org=12345;project=1337/foo/bar"
499        )
500    }
501}