objectstore_client/
client.rs

1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::ExpirationPolicy;
8use url::Url;
9
10pub use objectstore_types::Compression;
11
12const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
13
14#[derive(Debug)]
15struct ClientBuilderInner {
16    service_url: Url,
17    propagate_traces: bool,
18    reqwest_builder: reqwest::ClientBuilder,
19}
20
21impl ClientBuilderInner {
22    /// Applies defaults that cannot be overridden by the caller.
23    fn apply_defaults(mut self) -> Self {
24        self.reqwest_builder = self
25            .reqwest_builder
26            // hickory-dns: Controlled by the `reqwest/hickory-dns` feature flag
27            // we are dealing with de/compression ourselves:
28            .no_brotli()
29            .no_deflate()
30            .no_gzip()
31            .no_zstd();
32        self
33    }
34}
35
36/// Builder to create a [`Client`].
37#[must_use = "call .build() on this ClientBuilder to create a Client"]
38#[derive(Debug)]
39pub struct ClientBuilder(crate::Result<ClientBuilderInner>);
40
41impl ClientBuilder {
42    /// Creates a new [`ClientBuilder`], configured with the given `service_url`.
43    ///
44    /// To perform CRUD operations, one has to create a [`Client`], and then scope it to a [`Usecase`]
45    /// and Scope in order to create a [`Session`].
46    pub fn new(service_url: impl reqwest::IntoUrl) -> Self {
47        let service_url = match service_url.into_url() {
48            Ok(url) => url,
49            Err(err) => return Self(Err(err.into())),
50        };
51        if service_url.cannot_be_a_base() {
52            return ClientBuilder(Err(crate::Error::InvalidUrl {
53                message: "service_url cannot be a base".to_owned(),
54            }));
55        }
56
57        let reqwest_builder = reqwest::Client::builder()
58            // The read timeout "applies to each read operation", so should work fine for larger
59            // transfers that are split into multiple chunks.
60            // We define both as 500ms which is still very conservative, given that we are in the same network,
61            // and expect our backends to respond in <100ms.
62            // This can be overridden by the caller.
63            .connect_timeout(Duration::from_millis(500))
64            .read_timeout(Duration::from_millis(500))
65            .user_agent(USER_AGENT);
66
67        Self(Ok(ClientBuilderInner {
68            service_url,
69            propagate_traces: false,
70            reqwest_builder,
71        }))
72    }
73
74    /// Changes whether the `sentry-trace` header will be sent to Objectstore
75    /// to take advantage of Sentry's distributed tracing.
76    ///
77    /// By default, tracing headers will not be propagated.
78    pub fn propagate_traces(mut self, propagate_traces: bool) -> Self {
79        if let Ok(ref mut inner) = self.0 {
80            inner.propagate_traces = propagate_traces;
81        }
82        self
83    }
84
85    /// Sets both the connect and the read timeout for the [`reqwest::Client`].
86    /// For more fine-grained configuration, use [`Self::configure_reqwest`].
87    ///
88    /// By default, a connect and read timeout of 500ms is set.
89    pub fn timeout(self, timeout: Duration) -> Self {
90        let Ok(mut inner) = self.0 else { return self };
91        inner.reqwest_builder = inner
92            .reqwest_builder
93            .connect_timeout(timeout)
94            .read_timeout(timeout);
95        Self(Ok(inner))
96    }
97
98    /// Calls the closure with the underlying [`reqwest::ClientBuilder`].
99    ///
100    /// By default, the ClientBuilder is configured to create a reqwest Client with a connect and read timeout of 500ms and a user agent identifying this library.
101    pub fn configure_reqwest<F>(self, closure: F) -> Self
102    where
103        F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
104    {
105        let Ok(mut inner) = self.0 else { return self };
106        inner.reqwest_builder = closure(inner.reqwest_builder);
107        Self(Ok(inner))
108    }
109
110    /// Returns a [`Client`] that uses this [`ClientBuilder`] configuration.
111    ///
112    /// # Errors
113    ///
114    /// This method fails if:
115    /// - the given `service_url` is invalid or cannot be used as a base URL
116    /// - the [`reqwest::Client`] fails to build. Refer to [`reqwest::ClientBuilder::build`] for
117    ///   more information on when this can happen.
118    pub fn build(self) -> crate::Result<Client> {
119        let inner = self.0?.apply_defaults();
120
121        Ok(Client {
122            inner: Arc::new(ClientInner {
123                reqwest: inner.reqwest_builder.build()?,
124                service_url: inner.service_url,
125                propagate_traces: inner.propagate_traces,
126            }),
127        })
128    }
129}
130
131/// An identifier for a workload in Objectstore, along with defaults to use for all
132/// operations within that Usecase.
133///
134/// Usecases need to be statically defined in Objectstore's configuration server-side.
135/// Objectstore can make decisions based on the Usecase. For example, choosing the most
136/// suitable storage backend.
137#[derive(Debug, Clone)]
138pub struct Usecase {
139    name: Arc<str>,
140    compression: Compression,
141    expiration_policy: ExpirationPolicy,
142}
143
144impl Usecase {
145    /// Creates a new Usecase.
146    pub fn new(name: &str) -> Self {
147        Self {
148            name: name.into(),
149            compression: Compression::Zstd,
150            expiration_policy: Default::default(),
151        }
152    }
153
154    /// Returns the name of this usecase.
155    #[inline]
156    pub fn name(&self) -> &str {
157        &self.name
158    }
159
160    /// Returns the compression algorithm to use for operations within this usecase.
161    #[inline]
162    pub fn compression(&self) -> Compression {
163        self.compression
164    }
165
166    /// Sets the compression algorithm to use for operations within this usecase.
167    ///
168    /// It's still possible to override this default on each operation's builder.
169    ///
170    /// By default, [`Compression::Zstd`] is used.
171    pub fn with_compression(self, compression: Compression) -> Self {
172        Self {
173            compression,
174            ..self
175        }
176    }
177
178    /// Returns the expiration policy to use by default for operations within this usecase.
179    #[inline]
180    pub fn expiration_policy(&self) -> ExpirationPolicy {
181        self.expiration_policy
182    }
183
184    /// Sets the expiration policy to use for operations within this usecase.
185    ///
186    /// It's still possible to override this default on each operation's builder.
187    ///
188    /// By default, [`ExpirationPolicy::Manual`] is used, meaning that objects won't automatically
189    /// expire.
190    pub fn with_expiration_policy(self, expiration_policy: ExpirationPolicy) -> Self {
191        Self {
192            expiration_policy,
193            ..self
194        }
195    }
196
197    /// Creates a new custom [`Scope`].
198    ///
199    /// Add parts to it using [`Scope::push`].
200    ///
201    /// Generally, [`Usecase::for_organization`] and [`Usecase::for_project`] should fit most usecases,
202    /// so prefer using those methods rather than creating your own custom [`Scope`].
203    pub fn scope(&self) -> Scope {
204        Scope::new(self.clone())
205    }
206
207    /// Creates a new [`Scope`] tied to the given organization.
208    pub fn for_organization(&self, organization: u64) -> Scope {
209        Scope::for_organization(self.clone(), organization)
210    }
211
212    /// Creates a new [`Scope`] tied to the given organization and project.
213    pub fn for_project(&self, organization: u64, project: u64) -> Scope {
214        Scope::for_project(self.clone(), organization, project)
215    }
216}
217
218#[derive(Debug)]
219pub(crate) struct ScopeInner {
220    usecase: Usecase,
221    scope: String,
222}
223
224impl ScopeInner {
225    #[inline]
226    pub(crate) fn usecase(&self) -> &Usecase {
227        &self.usecase
228    }
229
230    fn as_path_segment(&self) -> &str {
231        if self.scope.is_empty() {
232            "_"
233        } else {
234            &self.scope
235        }
236    }
237}
238
239impl std::fmt::Display for ScopeInner {
240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241        f.write_str(&self.scope)
242    }
243}
244
245/// A [`Scope`] is a sequence of key-value pairs that defines a (possibly nested) namespace within a
246/// [`Usecase`].
247///
248/// To construct a [`Scope`], use [`Usecase::for_organization`], [`Usecase::for_project`], or
249/// [`Usecase::scope`] for custom scopes.
250#[derive(Debug)]
251pub struct Scope(crate::Result<ScopeInner>);
252
253impl Scope {
254    /// Creates a new root-level Scope for the given usecase.
255    ///
256    /// Using a custom Scope is discouraged, prefer using [`Usecase::for_organization`] or [`Usecase::for_project`] instead.
257    pub fn new(usecase: Usecase) -> Self {
258        Self(Ok(ScopeInner {
259            usecase,
260            scope: String::new(),
261        }))
262    }
263
264    fn for_organization(usecase: Usecase, organization: u64) -> Self {
265        let scope = format!("org={}", organization);
266        Self(Ok(ScopeInner { usecase, scope }))
267    }
268
269    fn for_project(usecase: Usecase, organization: u64, project: u64) -> Self {
270        let scope = format!("org={};project={}", organization, project);
271        Self(Ok(ScopeInner { usecase, scope }))
272    }
273
274    /// Extends this Scope by creating a new sub-scope nested within it.
275    pub fn push<V>(self, key: &str, value: V) -> Self
276    where
277        V: std::fmt::Display,
278    {
279        let result = self.0.and_then(|mut inner| {
280            Self::validate_key(key)?;
281
282            let value = value.to_string();
283            Self::validate_value(&value)?;
284
285            if !inner.scope.is_empty() {
286                inner.scope.push(';');
287            }
288            inner.scope.push_str(key);
289            inner.scope.push('=');
290            inner.scope.push_str(&value);
291
292            Ok(inner)
293        });
294
295        Self(result)
296    }
297
298    /// Characters allowed in a Scope's key and value.
299    /// These are the URL safe characters, except for `.` which we use as separator between
300    /// key and value of Scope components.
301    const ALLOWED_CHARS: &[u8] =
302        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_-()$!+'";
303
304    /// Validates that a scope key contains only allowed characters and is not empty.
305    fn validate_key(key: &str) -> crate::Result<()> {
306        if key.is_empty() {
307            return Err(crate::Error::InvalidScope {
308                message: "Scope key cannot be empty".to_string(),
309            });
310        }
311        if key.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
312            Ok(())
313        } else {
314            Err(crate::Error::InvalidScope {
315                message: format!("Invalid scope key '{key}'."),
316            })
317        }
318    }
319
320    /// Validates that a scope value contains only allowed characters and is not empty.
321    fn validate_value(value: &str) -> crate::Result<()> {
322        if value.is_empty() {
323            return Err(crate::Error::InvalidScope {
324                message: "Scope value cannot be empty".to_string(),
325            });
326        }
327        if value.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
328            Ok(())
329        } else {
330            Err(crate::Error::InvalidScope {
331                message: format!("Invalid scope value '{value}'."),
332            })
333        }
334    }
335
336    /// Creates a session for this scope using the given client.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
341    pub fn session(self, client: &Client) -> crate::Result<Session> {
342        client.session(self)
343    }
344}
345
346#[derive(Debug)]
347pub(crate) struct ClientInner {
348    reqwest: reqwest::Client,
349    service_url: Url,
350    propagate_traces: bool,
351}
352
353/// A client for Objectstore. Use [`Client::builder`] to configure and construct a Client.
354///
355/// To perform CRUD operations, one has to create a Client, and then scope it to a [`Usecase`]
356/// and Scope in order to create a [`Session`].
357///
358/// # Example
359///
360/// ```no_run
361/// use std::time::Duration;
362/// use objectstore_client::{Client, Usecase};
363///
364/// # async fn example() -> objectstore_client::Result<()> {
365/// let client = Client::builder("http://localhost:8888/")
366///     .timeout(Duration::from_secs(1))
367///     .propagate_traces(true)
368///     .build()?;
369/// let usecase = Usecase::new("my_app");
370///
371/// let session = client.session(usecase.for_project(12345, 1337))?;
372///
373/// let response = session.put("hello world").send().await?;
374///
375/// # Ok(())
376/// # }
377/// ```
378#[derive(Debug, Clone)]
379pub struct Client {
380    inner: Arc<ClientInner>,
381}
382
383impl Client {
384    /// Creates a new [`Client`], configured with the given `service_url` and default
385    /// configuration.
386    ///
387    /// Use [`Client::builder`] for more fine-grained configuration.
388    ///
389    /// # Errors
390    ///
391    /// This method fails if [`ClientBuilder::build`] fails.
392    pub fn new(service_url: impl reqwest::IntoUrl) -> crate::Result<Client> {
393        ClientBuilder::new(service_url).build()
394    }
395
396    /// Convenience function to create a [`ClientBuilder`].
397    pub fn builder(service_url: impl reqwest::IntoUrl) -> ClientBuilder {
398        ClientBuilder::new(service_url)
399    }
400
401    /// Creates a session for the given scope using this client.
402    ///
403    /// # Errors
404    ///
405    /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
406    pub fn session(&self, scope: Scope) -> crate::Result<Session> {
407        scope.0.map(|inner| Session {
408            scope: inner.into(),
409            client: self.inner.clone(),
410        })
411    }
412}
413
414/// Represents a session with Objectstore, tied to a specific Usecase and Scope within it.
415///
416/// Create a Session using [`Client::session`] or [`Scope::session`].
417#[derive(Debug, Clone)]
418pub struct Session {
419    pub(crate) scope: Arc<ScopeInner>,
420    pub(crate) client: Arc<ClientInner>,
421}
422
423/// The type of [`Stream`](futures_util::Stream) to be used for a PUT request.
424pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
425
426impl Session {
427    /// Generates a GET url to the object with the given `key`.
428    ///
429    /// This can then be used by downstream services to fetch the given object.
430    /// NOTE however that the service does not strictly follow HTTP semantics,
431    /// in particular in relation to `Accept-Encoding`.
432    pub fn object_url(&self, object_key: &str) -> Url {
433        let mut url = self.client.service_url.clone();
434
435        // `path_segments_mut` can only error if the url is cannot-be-a-base,
436        // and we check that in `ClientBuilder::new`, therefore this will never panic.
437        let mut segments = url.path_segments_mut().unwrap();
438        segments
439            .push("v1")
440            .push("objects")
441            .push(&self.scope.usecase.name)
442            .push(self.scope.as_path_segment())
443            .extend(object_key.split("/"));
444        drop(segments);
445
446        url
447    }
448
449    pub(crate) fn request(
450        &self,
451        method: reqwest::Method,
452        object_key: &str,
453    ) -> reqwest::RequestBuilder {
454        let url = self.object_url(object_key);
455
456        let mut builder = self.client.reqwest.request(method, url);
457
458        if self.client.propagate_traces {
459            let trace_headers =
460                sentry_core::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
461            for (header_name, value) in trace_headers.into_iter().flatten() {
462                builder = builder.header(header_name, value);
463            }
464        }
465
466        builder
467    }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// A service URL without a path puts the object path right below the root.
    #[test]
    fn test_object_url() {
        let scope = Usecase::new("testing")
            .for_project(12345, 1337)
            .push("app_slug", "email_app");
        let session = Client::new("http://127.0.0.1:8888/")
            .unwrap()
            .session(scope)
            .unwrap();

        let url = session.object_url("foo/bar");

        assert_eq!(
            url.as_str(),
            "http://127.0.0.1:8888/v1/objects/testing/org=12345;project=1337;app_slug=email_app/foo/bar"
        );
    }

    /// A path on the service URL is kept as a prefix of the object path.
    #[test]
    fn test_object_url_with_base_path() {
        let scope = Usecase::new("testing").for_project(12345, 1337);
        let session = Client::new("http://127.0.0.1:8888/api/prefix")
            .unwrap()
            .session(scope)
            .unwrap();

        let url = session.object_url("foo/bar");

        assert_eq!(
            url.as_str(),
            "http://127.0.0.1:8888/api/prefix/v1/objects/testing/org=12345;project=1337/foo/bar"
        );
    }
}