objectstore_client/
client.rs

1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::ExpirationPolicy;
8use url::Url;
9
10pub use objectstore_types::Compression;
11
12const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
13
14#[derive(Debug)]
15struct ClientBuilderInner {
16    service_url: Url,
17    propagate_traces: bool,
18    reqwest_builder: reqwest::ClientBuilder,
19}
20
21impl ClientBuilderInner {
22    /// Applies defaults that cannot be overridden by the caller.
23    fn apply_defaults(mut self) -> Self {
24        self.reqwest_builder = self
25            .reqwest_builder
26            // hickory-dns: Controlled by the `reqwest/hickory-dns` feature flag
27            // we are dealing with de/compression ourselves:
28            .no_brotli()
29            .no_deflate()
30            .no_gzip()
31            .no_zstd();
32        self
33    }
34}
35
36/// Builder to create a [`Client`].
37#[must_use = "call .build() on this ClientBuilder to create a Client"]
38#[derive(Debug)]
39pub struct ClientBuilder(crate::Result<ClientBuilderInner>);
40
41impl ClientBuilder {
42    /// Creates a new [`ClientBuilder`], configured with the given `service_url`.
43    ///
44    /// To perform CRUD operations, one has to create a [`Client`], and then scope it to a [`Usecase`]
45    /// and Scope in order to create a [`Session`].
46    pub fn new(service_url: impl reqwest::IntoUrl) -> Self {
47        let service_url = match service_url.into_url() {
48            Ok(url) => url,
49            Err(err) => return Self(Err(err.into())),
50        };
51
52        let reqwest_builder = reqwest::Client::builder()
53            // The read timeout "applies to each read operation", so should work fine for larger
54            // transfers that are split into multiple chunks.
55            // We define both as 500ms which is still very conservative, given that we are in the same network,
56            // and expect our backends to respond in <100ms.
57            // This can be overridden by the caller.
58            .connect_timeout(Duration::from_millis(500))
59            .read_timeout(Duration::from_millis(500))
60            .user_agent(USER_AGENT);
61
62        Self(Ok(ClientBuilderInner {
63            service_url,
64            propagate_traces: false,
65            reqwest_builder,
66        }))
67    }
68
69    /// Changes whether the `sentry-trace` header will be sent to Objectstore
70    /// to take advantage of Sentry's distributed tracing.
71    ///
72    /// By default, tracing headers will not be propagated.
73    pub fn propagate_traces(mut self, propagate_traces: bool) -> Self {
74        if let Ok(ref mut inner) = self.0 {
75            inner.propagate_traces = propagate_traces;
76        }
77        self
78    }
79
80    /// Sets both the connect and the read timeout for the [`reqwest::Client`].
81    /// For more fine-grained configuration, use [`Self::configure_reqwest`].
82    ///
83    /// By default, a connect and read timeout of 500ms is set.
84    pub fn timeout(self, timeout: Duration) -> Self {
85        let Ok(mut inner) = self.0 else { return self };
86        inner.reqwest_builder = inner
87            .reqwest_builder
88            .connect_timeout(timeout)
89            .read_timeout(timeout);
90        Self(Ok(inner))
91    }
92
93    /// Calls the closure with the underlying [`reqwest::ClientBuilder`].
94    ///
95    /// By default, the ClientBuilder is configured to create a reqwest Client with a connect and read timeout of 500ms and a user agent identifying this library.
96    pub fn configure_reqwest<F>(self, closure: F) -> Self
97    where
98        F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
99    {
100        let Ok(mut inner) = self.0 else { return self };
101        inner.reqwest_builder = closure(inner.reqwest_builder);
102        Self(Ok(inner))
103    }
104
105    /// Returns a [`Client`] that uses this [`ClientBuilder`] configuration.
106    ///
107    /// # Errors
108    ///
109    /// This method fails if:
110    /// - the given `service_url` is invalid
111    /// - the [`reqwest::Client`] fails to build. Refer to [`reqwest::ClientBuilder::build`] for
112    ///   more information on when this can happen.
113    pub fn build(self) -> crate::Result<Client> {
114        let inner = self.0?.apply_defaults();
115        Ok(Client {
116            inner: Arc::new(ClientInner {
117                reqwest: inner.reqwest_builder.build()?,
118                service_url: inner.service_url,
119                propagate_traces: inner.propagate_traces,
120            }),
121        })
122    }
123}
124
125/// An identifier for a workload in Objectstore, along with defaults to use for all
126/// operations within that Usecase.
127///
128/// Usecases need to be statically defined in Objectstore's configuration server-side.
129/// Objectstore can make decisions based on the Usecase. For example, choosing the most
130/// suitable storage backend.
131#[derive(Debug, Clone)]
132pub struct Usecase {
133    name: Arc<str>,
134    compression: Compression,
135    expiration_policy: ExpirationPolicy,
136}
137
138impl Usecase {
139    /// Creates a new Usecase.
140    pub fn new(name: &str) -> Self {
141        Self {
142            name: name.into(),
143            compression: Compression::Zstd,
144            expiration_policy: Default::default(),
145        }
146    }
147
148    /// Returns the name of this usecase.
149    #[inline]
150    pub fn name(&self) -> &str {
151        &self.name
152    }
153
154    /// Returns the compression algorithm to use for operations within this usecase.
155    #[inline]
156    pub fn compression(&self) -> Compression {
157        self.compression
158    }
159
160    /// Sets the compression algorithm to use for operations within this usecase.
161    ///
162    /// It's still possible to override this default on each operation's builder.
163    ///
164    /// By default, [`Compression::Zstd`] is used.
165    pub fn with_compression(self, compression: Compression) -> Self {
166        Self {
167            compression,
168            ..self
169        }
170    }
171
172    /// Returns the expiration policy to use by default for operations within this usecase.
173    #[inline]
174    pub fn expiration_policy(&self) -> ExpirationPolicy {
175        self.expiration_policy
176    }
177
178    /// Sets the expiration policy to use for operations within this usecase.
179    ///
180    /// It's still possible to override this default on each operation's builder.
181    ///
182    /// By default, [`ExpirationPolicy::Manual`] is used, meaning that objects won't automatically
183    /// expire.
184    pub fn with_expiration_policy(self, expiration_policy: ExpirationPolicy) -> Self {
185        Self {
186            expiration_policy,
187            ..self
188        }
189    }
190
191    /// Creates a new custom [`Scope`].
192    ///
193    /// Add parts to it using [`Scope::push`].
194    ///
195    /// Generally, [`Usecase::for_organization`] and [`Usecase::for_project`] should fit most usecases,
196    /// so prefer using those methods rather than creating your own custom [`Scope`].
197    pub fn scope(&self) -> Scope {
198        Scope::new(self.clone())
199    }
200
201    /// Creates a new [`Scope`] tied to the given organization.
202    pub fn for_organization(&self, organization: u64) -> Scope {
203        Scope::for_organization(self.clone(), organization)
204    }
205
206    /// Creates a new [`Scope`] tied to the given organization and project.
207    pub fn for_project(&self, organization: u64, project: u64) -> Scope {
208        Scope::for_project(self.clone(), organization, project)
209    }
210}
211
212#[derive(Debug)]
213pub(crate) struct ScopeInner {
214    usecase: Usecase,
215    scope: String,
216}
217
218impl ScopeInner {
219    #[inline]
220    pub(crate) fn usecase(&self) -> &Usecase {
221        &self.usecase
222    }
223}
224
225impl std::fmt::Display for ScopeInner {
226    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227        f.write_str(&self.scope)
228    }
229}
230
231/// A [`Scope`] is a sequence of key-value pairs that defines a (possibly nested) namespace within a
232/// [`Usecase`].
233///
234/// To construct a [`Scope`], use [`Usecase::for_organization`], [`Usecase::for_project`], or
235/// [`Usecase::scope`] for custom scopes.
236#[derive(Debug)]
237pub struct Scope(crate::Result<ScopeInner>);
238
239impl Scope {
240    /// Creates a new root-level Scope for the given usecase.
241    ///
242    /// Using a custom Scope is discouraged, prefer using [`Usecase::for_organization`] or [`Usecase::for_project`] instead.
243    pub fn new(usecase: Usecase) -> Self {
244        Self(Ok(ScopeInner {
245            usecase,
246            scope: String::new(),
247        }))
248    }
249
250    fn for_organization(usecase: Usecase, organization: u64) -> Self {
251        let scope = format!("org.{}", organization);
252        Self(Ok(ScopeInner { usecase, scope }))
253    }
254
255    fn for_project(usecase: Usecase, organization: u64, project: u64) -> Self {
256        let scope = format!("org.{}/project.{}", organization, project);
257        Self(Ok(ScopeInner { usecase, scope }))
258    }
259
260    /// Extends this Scope by creating a new sub-scope nested within it.
261    pub fn push<V>(self, key: &str, value: V) -> Self
262    where
263        V: std::fmt::Display,
264    {
265        let result = self.0.and_then(|mut inner| {
266            Self::validate_key(key)?;
267
268            let value = value.to_string();
269            Self::validate_value(&value)?;
270
271            if !inner.scope.is_empty() {
272                inner.scope.push('/');
273            }
274            inner.scope.push_str(key);
275            inner.scope.push('.');
276            inner.scope.push_str(&value);
277
278            Ok(inner)
279        });
280
281        Self(result)
282    }
283
284    /// Characters allowed in a Scope's key and value.
285    /// These are the URL safe characters, except for `.` which we use as separator between
286    /// key and value of Scope components.
287    const ALLOWED_CHARS: &[u8] =
288        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_-()$!+'";
289
290    /// Validates that a scope key contains only allowed characters and is not empty.
291    fn validate_key(key: &str) -> crate::Result<()> {
292        if key.is_empty() {
293            return Err(crate::Error::InvalidScope {
294                message: "Scope key cannot be empty".to_string(),
295            });
296        }
297        if key.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
298            Ok(())
299        } else {
300            Err(crate::Error::InvalidScope {
301                message: format!("Invalid scope key '{key}'."),
302            })
303        }
304    }
305
306    /// Validates that a scope value contains only allowed characters and is not empty.
307    fn validate_value(value: &str) -> crate::Result<()> {
308        if value.is_empty() {
309            return Err(crate::Error::InvalidScope {
310                message: "Scope value cannot be empty".to_string(),
311            });
312        }
313        if value.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
314            Ok(())
315        } else {
316            Err(crate::Error::InvalidScope {
317                message: format!("Invalid scope value '{value}'."),
318            })
319        }
320    }
321
322    /// Creates a session for this scope using the given client.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
327    pub fn session(self, client: &Client) -> crate::Result<Session> {
328        client.session(self)
329    }
330}
331
332#[derive(Debug)]
333pub(crate) struct ClientInner {
334    reqwest: reqwest::Client,
335    service_url: Url,
336    propagate_traces: bool,
337}
338
339/// A client for Objectstore. Use [`Client::builder`] to configure and construct a Client.
340///
341/// To perform CRUD operations, one has to create a Client, and then scope it to a [`Usecase`]
342/// and Scope in order to create a [`Session`].
343///
344/// # Example
345///
346/// ```no_run
347/// use std::time::Duration;
348/// use objectstore_client::{Client, Usecase};
349///
350/// # async fn example() -> objectstore_client::Result<()> {
351/// let client = Client::builder("http://localhost:8888/")
352///     .timeout(Duration::from_secs(1))
353///     .propagate_traces(true)
354///     .build()?;
355/// let usecase = Usecase::new("my_app");
356///
357/// let session = client.session(usecase.for_project(12345, 1337))?;
358///
359/// let response = session.put("hello world").send().await?;
360///
361/// # Ok(())
362/// # }
363/// ```
364#[derive(Debug, Clone)]
365pub struct Client {
366    inner: Arc<ClientInner>,
367}
368
369impl Client {
370    /// Creates a new [`Client`], configured with the given `service_url` and default
371    /// configuration.
372    ///
373    /// Use [`Client::builder`] for more fine-grained configuration.
374    ///
375    /// # Errors
376    ///
377    /// This method fails if [`ClientBuilder::build`] fails.
378    pub fn new(service_url: impl reqwest::IntoUrl) -> crate::Result<Client> {
379        ClientBuilder::new(service_url).build()
380    }
381
382    /// Convenience function to create a [`ClientBuilder`].
383    pub fn builder(service_url: impl reqwest::IntoUrl) -> ClientBuilder {
384        ClientBuilder::new(service_url)
385    }
386
387    /// Creates a session for the given scope using this client.
388    ///
389    /// # Errors
390    ///
391    /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
392    pub fn session(&self, scope: Scope) -> crate::Result<Session> {
393        scope.0.map(|inner| Session {
394            scope: inner.into(),
395            client: self.inner.clone(),
396        })
397    }
398}
399
400/// Represents a session with Objectstore, tied to a specific Usecase and Scope within it.
401///
402/// Create a Session using [`Client::session`] or [`Scope::session`].
403#[derive(Debug, Clone)]
404pub struct Session {
405    pub(crate) scope: Arc<ScopeInner>,
406    pub(crate) client: Arc<ClientInner>,
407}
408
409/// The type of [`Stream`](futures_util::Stream) to be used for a PUT request.
410pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
411
412impl Session {
413    /// Generates a GET url to the object with the given `key`.
414    ///
415    /// This can then be used by downstream services to fetch the given object.
416    /// NOTE however that the service does not strictly follow HTTP semantics,
417    /// in particular in relation to `Accept-Encoding`.
418    pub fn object_url(&self, object_key: &str) -> Url {
419        let mut url = self.client.service_url.clone();
420        let path = format!(
421            "v1/{}/{}/objects/{object_key}",
422            self.scope.usecase.name, self.scope.scope
423        );
424        url.set_path(&path);
425        url
426    }
427
428    pub(crate) fn request(
429        &self,
430        method: reqwest::Method,
431        object_key: &str,
432    ) -> reqwest::RequestBuilder {
433        let url = self.object_url(object_key);
434
435        let mut builder = self.client.reqwest.request(method, url);
436
437        if self.client.propagate_traces {
438            let trace_headers =
439                sentry_core::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
440            for (header_name, value) in trace_headers.into_iter().flatten() {
441                builder = builder.header(header_name, value);
442            }
443        }
444
445        builder
446    }
447}
448
449#[cfg(test)]
450mod tests {
451    use super::*;
452
453    #[test]
454    fn test_object_url() {
455        let client = Client::new("http://127.0.0.1:8888/").unwrap();
456        let usecase = Usecase::new("testing");
457        let scope = usecase
458            .for_project(12345, 1337)
459            .push("app_slug", "email_app");
460        let session = client.session(scope).unwrap();
461
462        assert_eq!(
463            session.object_url("foo/bar").to_string(),
464            "http://127.0.0.1:8888/v1/testing/org.12345/project.1337/app_slug.email_app/objects/foo/bar"
465        )
466    }
467}