objectstore_client/
client.rs

1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::ExpirationPolicy;
8use url::Url;
9
10pub use objectstore_types::{Compression, PARAM_SCOPE, PARAM_USECASE};
11
12const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
13
14#[derive(Debug)]
15struct ClientBuilderInner {
16    service_url: Url,
17    propagate_traces: bool,
18    reqwest_builder: reqwest::ClientBuilder,
19}
20
21impl ClientBuilderInner {
22    /// Applies defaults that cannot be overridden by the caller.
23    fn apply_defaults(mut self) -> Self {
24        self.reqwest_builder = self
25            .reqwest_builder
26            // hickory-dns: Controlled by the `reqwest/hickory-dns` feature flag
27            // we are dealing with de/compression ourselves:
28            .no_brotli()
29            .no_deflate()
30            .no_gzip()
31            .no_zstd();
32        self
33    }
34}
35
36/// Builder to create a [`Client`].
37#[must_use = "call .build() on this ClientBuilder to create a Client"]
38#[derive(Debug)]
39pub struct ClientBuilder(crate::Result<ClientBuilderInner>);
40
41impl ClientBuilder {
42    /// Creates a new [`ClientBuilder`], configured with the given `service_url`.
43    ///
44    /// To perform CRUD operations, one has to create a [`Client`], and then scope it to a [`Usecase`]
45    /// and Scope in order to create a [`Session`].
46    pub fn new(service_url: impl reqwest::IntoUrl) -> Self {
47        let service_url = match service_url.into_url() {
48            Ok(url) => url,
49            Err(err) => return Self(Err(err.into())),
50        };
51
52        let reqwest_builder = reqwest::Client::builder()
53            // The read timeout "applies to each read operation", so should work fine for larger
54            // transfers that are split into multiple chunks.
55            // We define both as 500ms which is still very conservative, given that we are in the same network,
56            // and expect our backends to respond in <100ms.
57            // This can be overridden by the caller.
58            .connect_timeout(Duration::from_millis(500))
59            .read_timeout(Duration::from_millis(500))
60            .user_agent(USER_AGENT);
61
62        Self(Ok(ClientBuilderInner {
63            service_url,
64            propagate_traces: false,
65            reqwest_builder,
66        }))
67    }
68
69    /// Changes whether the `sentry-trace` header will be sent to Objectstore
70    /// to take advantage of Sentry's distributed tracing.
71    pub fn propagate_traces(mut self, propagate_traces: bool) -> Self {
72        if let Ok(ref mut inner) = self.0 {
73            inner.propagate_traces = propagate_traces;
74        }
75        self
76    }
77
78    /// Sets both the connect and the read timeout for the [`reqwest::Client`].
79    /// For more fine-grained configuration, use [`Self::configure_reqwest`].
80    pub fn timeout(self, timeout: Duration) -> Self {
81        let Ok(mut inner) = self.0 else { return self };
82        inner.reqwest_builder = inner
83            .reqwest_builder
84            .connect_timeout(timeout)
85            .read_timeout(timeout);
86        Self(Ok(inner))
87    }
88
89    /// Calls the closure with the underlying [`reqwest::ClientBuilder`].
90    pub fn configure_reqwest<F>(self, closure: F) -> Self
91    where
92        F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
93    {
94        let Ok(mut inner) = self.0 else { return self };
95        inner.reqwest_builder = closure(inner.reqwest_builder);
96        Self(Ok(inner))
97    }
98
99    /// Returns a [`Client`] that uses this [`ClientBuilder`] configuration.
100    ///
101    /// # Errors
102    ///
103    /// This method fails if:
104    /// - the given `service_url` is invalid
105    /// - the [`reqwest::Client`] fails to build. Refer to [`reqwest::ClientBuilder::build`] for
106    ///   more information on when this can happen.
107    pub fn build(self) -> crate::Result<Client> {
108        let inner = self.0?.apply_defaults();
109        Ok(Client {
110            inner: Arc::new(ClientInner {
111                reqwest: inner.reqwest_builder.build()?,
112                service_url: inner.service_url,
113                propagate_traces: inner.propagate_traces,
114            }),
115        })
116    }
117}
118
119/// An identifier for a workload in Objectstore, along with defaults to use for all
120/// operations within that Usecase.
121///
122/// Usecases need to be statically defined in Objectstore's configuration server-side.
123/// Objectstore can make decisions based on the Usecase. For example, choosing the most
124/// suitable storage backend.
125#[derive(Debug, Clone)]
126pub struct Usecase {
127    name: Arc<str>,
128    compression: Compression,
129    expiration: ExpirationPolicy,
130}
131
132impl Usecase {
133    /// Creates a new Usecase.
134    pub fn new(name: &str) -> Self {
135        Self {
136            name: name.into(),
137            compression: Compression::Zstd,
138            expiration: Default::default(),
139        }
140    }
141
142    /// TODO: document
143    #[inline]
144    pub fn name(&self) -> &str {
145        &self.name
146    }
147
148    /// TODO: document
149    #[inline]
150    pub fn compression(&self) -> Compression {
151        self.compression
152    }
153
154    /// TODO: document
155    pub fn with_compression(self, compression: Compression) -> Self {
156        Self {
157            compression,
158            ..self
159        }
160    }
161
162    /// TODO: document
163    #[inline]
164    pub fn expiration(&self) -> ExpirationPolicy {
165        self.expiration
166    }
167
168    /// TODO: document
169    pub fn with_expiration(self, expiration: ExpirationPolicy) -> Self {
170        Self { expiration, ..self }
171    }
172
173    /// TODO: document
174    pub fn scope(&self) -> Scope {
175        Scope::new(self.clone())
176    }
177
178    /// TODO: document
179    pub fn for_organization(&self, organization: u64) -> Scope {
180        Scope::for_organization(self.clone(), organization)
181    }
182
183    /// TODO: document
184    pub fn for_project(&self, organization: u64, project: u64) -> Scope {
185        Scope::for_project(self.clone(), organization, project)
186    }
187}
188
189#[derive(Debug)]
190pub(crate) struct ScopeInner {
191    usecase: Usecase,
192    scope: String,
193}
194
195impl ScopeInner {
196    #[inline]
197    pub(crate) fn usecase(&self) -> &Usecase {
198        &self.usecase
199    }
200}
201
202impl std::fmt::Display for ScopeInner {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        f.write_str(&self.scope)
205    }
206}
207
208/// TODO: document
209#[derive(Debug)]
210pub struct Scope(crate::Result<ScopeInner>);
211
212impl Scope {
213    /// TODO: document
214    pub fn new(usecase: Usecase) -> Self {
215        Self(Ok(ScopeInner {
216            usecase,
217            scope: String::new(),
218        }))
219    }
220
221    fn for_organization(usecase: Usecase, organization: u64) -> Self {
222        let scope = format!("org.{}", organization);
223        Self(Ok(ScopeInner { usecase, scope }))
224    }
225
226    fn for_project(usecase: Usecase, organization: u64, project: u64) -> Self {
227        let scope = format!("org.{}/project.{}", organization, project);
228        Self(Ok(ScopeInner { usecase, scope }))
229    }
230
231    /// TODO: document
232    pub fn push<V>(self, key: &str, value: &V) -> Self
233    where
234        V: std::fmt::Display,
235    {
236        let result = self.0.and_then(|mut inner| {
237            Self::validate_key(key)?;
238
239            let value = value.to_string();
240            Self::validate_value(&value)?;
241
242            if !inner.scope.is_empty() {
243                inner.scope.push('/');
244            }
245            inner.scope.push_str(key);
246            inner.scope.push('.');
247            inner.scope.push_str(&value);
248
249            Ok(inner)
250        });
251
252        Self(result)
253    }
254
255    /// Characters allowed in a Scope's key and value.
256    /// These are the URL safe characters, except for `.` which we use as separator between
257    /// key and value of Scope components.
258    const ALLOWED_CHARS: &[u8] =
259        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_-()$!+'";
260
261    /// Validates that a scope key contains only allowed characters and is not empty.
262    fn validate_key(key: &str) -> crate::Result<()> {
263        if key.is_empty() {
264            return Err(crate::Error::InvalidScope {
265                message: "Scope key cannot be empty".to_string(),
266            });
267        }
268        if key.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
269            Ok(())
270        } else {
271            Err(crate::Error::InvalidScope {
272                message: format!("Invalid scope key '{key}'."),
273            })
274        }
275    }
276
277    /// Validates that a scope value contains only allowed characters and is not empty.
278    fn validate_value(value: &str) -> crate::Result<()> {
279        if value.is_empty() {
280            return Err(crate::Error::InvalidScope {
281                message: "Scope value cannot be empty".to_string(),
282            });
283        }
284        if value.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
285            Ok(())
286        } else {
287            Err(crate::Error::InvalidScope {
288                message: format!("Invalid scope value '{value}'."),
289            })
290        }
291    }
292
293    /// TODO: document
294    pub fn session(self, client: &Client) -> crate::Result<Session> {
295        client.session(self)
296    }
297}
298
299#[derive(Debug)]
300pub(crate) struct ClientInner {
301    reqwest: reqwest::Client,
302    service_url: Url,
303    propagate_traces: bool,
304}
305
306/// A client for Objectstore. Use [`Client::builder`] to get configure and construct this.
307///
308/// To perform CRUD operations, one has to create a [`Client`], and then scope it to a [`Usecase`]
309/// and Scope in order to create a [`Session`].
310///
311/// # Example
312///
313/// ```no_run
314/// use std::time::Duration;
315/// use objectstore_client::{Client, Usecase};
316///
317/// # async fn example() -> objectstore_client::Result<()> {
318/// let client = Client::builder("http://localhost:8888/")
319///     .timeout(Duration::from_secs(1))
320///     .propagate_traces(true)
321///     .build()?;
322/// let usecase = Usecase::new("my_app");
323///
324/// let session = client.session(usecase.for_project(12345, 1337))?;
325///
326/// let response = session.put("hello world").send().await?;
327///
328/// # Ok(())
329/// # }
330/// ```
331#[derive(Debug, Clone)]
332pub struct Client {
333    inner: Arc<ClientInner>,
334}
335
336impl Client {
337    /// Creates a new [`Client`], configured with the given `service_url` and default
338    /// configuration.
339    ///
340    /// Use [`Client::builder`] for more fine-grained configuration.
341    ///
342    /// # Errors
343    ///
344    /// This method fails if [`ClientBuilder::build`] fails.
345    pub fn new(service_url: impl reqwest::IntoUrl) -> crate::Result<Client> {
346        ClientBuilder::new(service_url).build()
347    }
348
349    /// Convenience function to create a [`ClientBuilder`].
350    pub fn builder(service_url: impl reqwest::IntoUrl) -> ClientBuilder {
351        ClientBuilder::new(service_url)
352    }
353
354    /// TODO: document
355    pub fn session(&self, scope: Scope) -> crate::Result<Session> {
356        scope.0.map(|inner| Session {
357            scope: inner.into(),
358            client: self.inner.clone(),
359        })
360    }
361}
362
363/// TODO: document
364#[derive(Debug, Clone)]
365pub struct Session {
366    pub(crate) scope: Arc<ScopeInner>,
367    pub(crate) client: Arc<ClientInner>,
368}
369
370/// The type of [`Stream`](futures_util::Stream) to be used for a PUT request.
371pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
372
373impl Session {
374    pub(crate) fn request(
375        &self,
376        method: reqwest::Method,
377        resource_id: &str,
378    ) -> crate::Result<reqwest::RequestBuilder> {
379        let mut url = self.client.service_url.clone();
380        url.path_segments_mut()
381            .map_err(|_| crate::Error::InvalidUrl {
382                message: format!("The URL {} cannot be a base", self.client.service_url),
383            })?
384            .extend(&["v1", resource_id]);
385
386        let mut builder = self.client.reqwest.request(method, url).query(&[
387            (PARAM_SCOPE, self.scope.scope.as_str()),
388            (PARAM_USECASE, self.scope.usecase.name.as_ref()),
389        ]);
390
391        if self.client.propagate_traces {
392            let trace_headers =
393                sentry_core::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
394            for (header_name, value) in trace_headers.into_iter().flatten() {
395                builder = builder.header(header_name, value);
396            }
397        }
398
399        Ok(builder)
400    }
401}