objectstore_client/client.rs
1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::ExpirationPolicy;
8use url::Url;
9
10pub use objectstore_types::Compression;
11
12const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
13
14#[derive(Debug)]
15struct ClientBuilderInner {
16 service_url: Url,
17 propagate_traces: bool,
18 reqwest_builder: reqwest::ClientBuilder,
19}
20
21impl ClientBuilderInner {
22 /// Applies defaults that cannot be overridden by the caller.
23 fn apply_defaults(mut self) -> Self {
24 self.reqwest_builder = self
25 .reqwest_builder
26 // hickory-dns: Controlled by the `reqwest/hickory-dns` feature flag
27 // we are dealing with de/compression ourselves:
28 .no_brotli()
29 .no_deflate()
30 .no_gzip()
31 .no_zstd();
32 self
33 }
34}
35
36/// Builder to create a [`Client`].
37#[must_use = "call .build() on this ClientBuilder to create a Client"]
38#[derive(Debug)]
39pub struct ClientBuilder(crate::Result<ClientBuilderInner>);
40
41impl ClientBuilder {
42 /// Creates a new [`ClientBuilder`], configured with the given `service_url`.
43 ///
44 /// To perform CRUD operations, one has to create a [`Client`], and then scope it to a [`Usecase`]
45 /// and Scope in order to create a [`Session`].
46 pub fn new(service_url: impl reqwest::IntoUrl) -> Self {
47 let service_url = match service_url.into_url() {
48 Ok(url) => url,
49 Err(err) => return Self(Err(err.into())),
50 };
51
52 let reqwest_builder = reqwest::Client::builder()
53 // The read timeout "applies to each read operation", so should work fine for larger
54 // transfers that are split into multiple chunks.
55 // We define both as 500ms which is still very conservative, given that we are in the same network,
56 // and expect our backends to respond in <100ms.
57 // This can be overridden by the caller.
58 .connect_timeout(Duration::from_millis(500))
59 .read_timeout(Duration::from_millis(500))
60 .user_agent(USER_AGENT);
61
62 Self(Ok(ClientBuilderInner {
63 service_url,
64 propagate_traces: false,
65 reqwest_builder,
66 }))
67 }
68
69 /// Changes whether the `sentry-trace` header will be sent to Objectstore
70 /// to take advantage of Sentry's distributed tracing.
71 ///
72 /// By default, tracing headers will not be propagated.
73 pub fn propagate_traces(mut self, propagate_traces: bool) -> Self {
74 if let Ok(ref mut inner) = self.0 {
75 inner.propagate_traces = propagate_traces;
76 }
77 self
78 }
79
80 /// Sets both the connect and the read timeout for the [`reqwest::Client`].
81 /// For more fine-grained configuration, use [`Self::configure_reqwest`].
82 ///
83 /// By default, a connect and read timeout of 500ms is set.
84 pub fn timeout(self, timeout: Duration) -> Self {
85 let Ok(mut inner) = self.0 else { return self };
86 inner.reqwest_builder = inner
87 .reqwest_builder
88 .connect_timeout(timeout)
89 .read_timeout(timeout);
90 Self(Ok(inner))
91 }
92
93 /// Calls the closure with the underlying [`reqwest::ClientBuilder`].
94 ///
95 /// By default, the ClientBuilder is configured to create a reqwest Client with a connect and read timeout of 500ms and a user agent identifying this library.
96 pub fn configure_reqwest<F>(self, closure: F) -> Self
97 where
98 F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
99 {
100 let Ok(mut inner) = self.0 else { return self };
101 inner.reqwest_builder = closure(inner.reqwest_builder);
102 Self(Ok(inner))
103 }
104
105 /// Returns a [`Client`] that uses this [`ClientBuilder`] configuration.
106 ///
107 /// # Errors
108 ///
109 /// This method fails if:
110 /// - the given `service_url` is invalid
111 /// - the [`reqwest::Client`] fails to build. Refer to [`reqwest::ClientBuilder::build`] for
112 /// more information on when this can happen.
113 pub fn build(self) -> crate::Result<Client> {
114 let inner = self.0?.apply_defaults();
115 Ok(Client {
116 inner: Arc::new(ClientInner {
117 reqwest: inner.reqwest_builder.build()?,
118 service_url: inner.service_url,
119 propagate_traces: inner.propagate_traces,
120 }),
121 })
122 }
123}
124
125/// An identifier for a workload in Objectstore, along with defaults to use for all
126/// operations within that Usecase.
127///
128/// Usecases need to be statically defined in Objectstore's configuration server-side.
129/// Objectstore can make decisions based on the Usecase. For example, choosing the most
130/// suitable storage backend.
131#[derive(Debug, Clone)]
132pub struct Usecase {
133 name: Arc<str>,
134 compression: Compression,
135 expiration_policy: ExpirationPolicy,
136}
137
138impl Usecase {
139 /// Creates a new Usecase.
140 pub fn new(name: &str) -> Self {
141 Self {
142 name: name.into(),
143 compression: Compression::Zstd,
144 expiration_policy: Default::default(),
145 }
146 }
147
148 /// Returns the name of this usecase.
149 #[inline]
150 pub fn name(&self) -> &str {
151 &self.name
152 }
153
154 /// Returns the compression algorithm to use for operations within this usecase.
155 #[inline]
156 pub fn compression(&self) -> Compression {
157 self.compression
158 }
159
160 /// Sets the compression algorithm to use for operations within this usecase.
161 ///
162 /// It's still possible to override this default on each operation's builder.
163 ///
164 /// By default, [`Compression::Zstd`] is used.
165 pub fn with_compression(self, compression: Compression) -> Self {
166 Self {
167 compression,
168 ..self
169 }
170 }
171
172 /// Returns the expiration policy to use by default for operations within this usecase.
173 #[inline]
174 pub fn expiration_policy(&self) -> ExpirationPolicy {
175 self.expiration_policy
176 }
177
178 /// Sets the expiration policy to use for operations within this usecase.
179 ///
180 /// It's still possible to override this default on each operation's builder.
181 ///
182 /// By default, [`ExpirationPolicy::Manual`] is used, meaning that objects won't automatically
183 /// expire.
184 pub fn with_expiration_policy(self, expiration_policy: ExpirationPolicy) -> Self {
185 Self {
186 expiration_policy,
187 ..self
188 }
189 }
190
191 /// Creates a new custom [`Scope`].
192 ///
193 /// Add parts to it using [`Scope::push`].
194 ///
195 /// Generally, [`Usecase::for_organization`] and [`Usecase::for_project`] should fit most usecases,
196 /// so prefer using those methods rather than creating your own custom [`Scope`].
197 pub fn scope(&self) -> Scope {
198 Scope::new(self.clone())
199 }
200
201 /// Creates a new [`Scope`] tied to the given organization.
202 pub fn for_organization(&self, organization: u64) -> Scope {
203 Scope::for_organization(self.clone(), organization)
204 }
205
206 /// Creates a new [`Scope`] tied to the given organization and project.
207 pub fn for_project(&self, organization: u64, project: u64) -> Scope {
208 Scope::for_project(self.clone(), organization, project)
209 }
210}
211
212#[derive(Debug)]
213pub(crate) struct ScopeInner {
214 usecase: Usecase,
215 scope: String,
216}
217
218impl ScopeInner {
219 #[inline]
220 pub(crate) fn usecase(&self) -> &Usecase {
221 &self.usecase
222 }
223}
224
225impl std::fmt::Display for ScopeInner {
226 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227 f.write_str(&self.scope)
228 }
229}
230
231/// A [`Scope`] is a sequence of key-value pairs that defines a (possibly nested) namespace within a
232/// [`Usecase`].
233///
234/// To construct a [`Scope`], use [`Usecase::for_organization`], [`Usecase::for_project`], or
235/// [`Usecase::scope`] for custom scopes.
236#[derive(Debug)]
237pub struct Scope(crate::Result<ScopeInner>);
238
239impl Scope {
240 /// Creates a new root-level Scope for the given usecase.
241 ///
242 /// Using a custom Scope is discouraged, prefer using [`Usecase::for_organization`] or [`Usecase::for_project`] instead.
243 pub fn new(usecase: Usecase) -> Self {
244 Self(Ok(ScopeInner {
245 usecase,
246 scope: String::new(),
247 }))
248 }
249
250 fn for_organization(usecase: Usecase, organization: u64) -> Self {
251 let scope = format!("org.{}", organization);
252 Self(Ok(ScopeInner { usecase, scope }))
253 }
254
255 fn for_project(usecase: Usecase, organization: u64, project: u64) -> Self {
256 let scope = format!("org.{}/project.{}", organization, project);
257 Self(Ok(ScopeInner { usecase, scope }))
258 }
259
260 /// Extends this Scope by creating a new sub-scope nested within it.
261 pub fn push<V>(self, key: &str, value: V) -> Self
262 where
263 V: std::fmt::Display,
264 {
265 let result = self.0.and_then(|mut inner| {
266 Self::validate_key(key)?;
267
268 let value = value.to_string();
269 Self::validate_value(&value)?;
270
271 if !inner.scope.is_empty() {
272 inner.scope.push('/');
273 }
274 inner.scope.push_str(key);
275 inner.scope.push('.');
276 inner.scope.push_str(&value);
277
278 Ok(inner)
279 });
280
281 Self(result)
282 }
283
284 /// Characters allowed in a Scope's key and value.
285 /// These are the URL safe characters, except for `.` which we use as separator between
286 /// key and value of Scope components.
287 const ALLOWED_CHARS: &[u8] =
288 b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_-()$!+'";
289
290 /// Validates that a scope key contains only allowed characters and is not empty.
291 fn validate_key(key: &str) -> crate::Result<()> {
292 if key.is_empty() {
293 return Err(crate::Error::InvalidScope {
294 message: "Scope key cannot be empty".to_string(),
295 });
296 }
297 if key.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
298 Ok(())
299 } else {
300 Err(crate::Error::InvalidScope {
301 message: format!("Invalid scope key '{key}'."),
302 })
303 }
304 }
305
306 /// Validates that a scope value contains only allowed characters and is not empty.
307 fn validate_value(value: &str) -> crate::Result<()> {
308 if value.is_empty() {
309 return Err(crate::Error::InvalidScope {
310 message: "Scope value cannot be empty".to_string(),
311 });
312 }
313 if value.bytes().all(|b| Self::ALLOWED_CHARS.contains(&b)) {
314 Ok(())
315 } else {
316 Err(crate::Error::InvalidScope {
317 message: format!("Invalid scope value '{value}'."),
318 })
319 }
320 }
321
322 /// Creates a session for this scope using the given client.
323 ///
324 /// # Errors
325 ///
326 /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
327 pub fn session(self, client: &Client) -> crate::Result<Session> {
328 client.session(self)
329 }
330}
331
332#[derive(Debug)]
333pub(crate) struct ClientInner {
334 reqwest: reqwest::Client,
335 service_url: Url,
336 propagate_traces: bool,
337}
338
339/// A client for Objectstore. Use [`Client::builder`] to configure and construct a Client.
340///
341/// To perform CRUD operations, one has to create a Client, and then scope it to a [`Usecase`]
342/// and Scope in order to create a [`Session`].
343///
344/// # Example
345///
346/// ```no_run
347/// use std::time::Duration;
348/// use objectstore_client::{Client, Usecase};
349///
350/// # async fn example() -> objectstore_client::Result<()> {
351/// let client = Client::builder("http://localhost:8888/")
352/// .timeout(Duration::from_secs(1))
353/// .propagate_traces(true)
354/// .build()?;
355/// let usecase = Usecase::new("my_app");
356///
357/// let session = client.session(usecase.for_project(12345, 1337))?;
358///
359/// let response = session.put("hello world").send().await?;
360///
361/// # Ok(())
362/// # }
363/// ```
364#[derive(Debug, Clone)]
365pub struct Client {
366 inner: Arc<ClientInner>,
367}
368
369impl Client {
370 /// Creates a new [`Client`], configured with the given `service_url` and default
371 /// configuration.
372 ///
373 /// Use [`Client::builder`] for more fine-grained configuration.
374 ///
375 /// # Errors
376 ///
377 /// This method fails if [`ClientBuilder::build`] fails.
378 pub fn new(service_url: impl reqwest::IntoUrl) -> crate::Result<Client> {
379 ClientBuilder::new(service_url).build()
380 }
381
382 /// Convenience function to create a [`ClientBuilder`].
383 pub fn builder(service_url: impl reqwest::IntoUrl) -> ClientBuilder {
384 ClientBuilder::new(service_url)
385 }
386
387 /// Creates a session for the given scope using this client.
388 ///
389 /// # Errors
390 ///
391 /// Returns an error if the scope is invalid (e.g. it contains invalid characters).
392 pub fn session(&self, scope: Scope) -> crate::Result<Session> {
393 scope.0.map(|inner| Session {
394 scope: inner.into(),
395 client: self.inner.clone(),
396 })
397 }
398}
399
400/// Represents a session with Objectstore, tied to a specific Usecase and Scope within it.
401///
402/// Create a Session using [`Client::session`] or [`Scope::session`].
403#[derive(Debug, Clone)]
404pub struct Session {
405 pub(crate) scope: Arc<ScopeInner>,
406 pub(crate) client: Arc<ClientInner>,
407}
408
409/// The type of [`Stream`](futures_util::Stream) to be used for a PUT request.
410pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
411
412impl Session {
413 /// Generates a GET url to the object with the given `key`.
414 ///
415 /// This can then be used by downstream services to fetch the given object.
416 /// NOTE however that the service does not strictly follow HTTP semantics,
417 /// in particular in relation to `Accept-Encoding`.
418 pub fn object_url(&self, object_key: &str) -> Url {
419 let mut url = self.client.service_url.clone();
420 let path = format!(
421 "v1/{}/{}/objects/{object_key}",
422 self.scope.usecase.name, self.scope.scope
423 );
424 url.set_path(&path);
425 url
426 }
427
428 pub(crate) fn request(
429 &self,
430 method: reqwest::Method,
431 object_key: &str,
432 ) -> reqwest::RequestBuilder {
433 let url = self.object_url(object_key);
434
435 let mut builder = self.client.reqwest.request(method, url);
436
437 if self.client.propagate_traces {
438 let trace_headers =
439 sentry_core::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
440 for (header_name, value) in trace_headers.into_iter().flatten() {
441 builder = builder.header(header_name, value);
442 }
443 }
444
445 builder
446 }
447}
448
449#[cfg(test)]
450mod tests {
451 use super::*;
452
453 #[test]
454 fn test_object_url() {
455 let client = Client::new("http://127.0.0.1:8888/").unwrap();
456 let usecase = Usecase::new("testing");
457 let scope = usecase
458 .for_project(12345, 1337)
459 .push("app_slug", "email_app");
460 let session = client.session(scope).unwrap();
461
462 assert_eq!(
463 session.object_url("foo/bar").to_string(),
464 "http://127.0.0.1:8888/v1/testing/org.12345/project.1337/app_slug.email_app/objects/foo/bar"
465 )
466 }
467}