objectstore_client/
client.rs

1use std::io;
2use std::sync::Arc;
3use std::time::Duration;
4
5use bytes::Bytes;
6use futures_util::stream::BoxStream;
7use objectstore_types::ExpirationPolicy;
8use reqwest::header::HeaderName;
9
10pub use objectstore_types::{Compression, PARAM_SCOPE, PARAM_USECASE};
11
12const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
13
14/// Service for storing and retrieving objects.
15///
16/// The Service contains the base configuration to connect to a service.
17/// It has to be further initialized with credentials using the
18/// [`for_organization`](Self::for_organization) and
19/// [`for_project`](Self::for_project) functions.
20#[derive(Debug)]
21pub struct ClientBuilder {
22    service_url: Arc<str>,
23    client: reqwest::Client,
24    propagate_traces: bool,
25
26    usecase: Arc<str>,
27    default_compression: Compression,
28    default_expiration_policy: ExpirationPolicy,
29}
30
31impl ClientBuilder {
32    /// Creates a new [`ClientBuilder`].
33    ///
34    /// This service instance is configured to target the given `service_url`.
35    /// It is also scoped for the given `usecase`.
36    ///
37    /// In order to get or put objects, one has to create a [`Client`] using the
38    /// [`for_organization`](Self::for_organization) function.
39    pub fn new(service_url: &str, usecase: &str) -> anyhow::Result<Self> {
40        let client = reqwest::Client::builder()
41            .user_agent(USER_AGENT)
42            // hickory-dns: Controlled by the `reqwest/hickory-dns` feature flag
43            // we are dealing with de/compression ourselves:
44            .no_brotli()
45            .no_deflate()
46            .no_gzip()
47            .no_zstd()
48            // The read timeout "applies to each read operation", so should work fine for larger
49            // transfers that are split into multiple chunks.
50            // We define both as 500ms which is still very conservative, given that we are in the same network,
51            // and expect our backends to respond in <100ms.
52            .connect_timeout(Duration::from_millis(500))
53            .read_timeout(Duration::from_millis(500))
54            .build()?;
55
56        Ok(Self {
57            service_url: service_url.trim_end_matches('/').into(),
58            client,
59            propagate_traces: false,
60
61            usecase: usecase.into(),
62            default_compression: Compression::Zstd,
63            default_expiration_policy: ExpirationPolicy::Manual,
64        })
65    }
66
67    /// This changes the default compression used for uploads.
68    pub fn default_compression(mut self, compression: Compression) -> Self {
69        self.default_compression = compression;
70        self
71    }
72
73    /// This sets a default expiration policy used for uploads.
74    pub fn default_expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
75        self.default_expiration_policy = expiration_policy;
76        self
77    }
78
79    /// This changes whether the `sentry-trace` header will be sent to Objectstore
80    /// to take advantage of Sentry's distributed tracing.
81    pub fn with_distributed_tracing(mut self, propagate_traces: bool) -> Self {
82        self.propagate_traces = propagate_traces;
83        self
84    }
85
86    fn make_client(&self, scope: String) -> Client {
87        Client {
88            service_url: self.service_url.clone(),
89            http: self.client.clone(),
90            propagate_traces: self.propagate_traces,
91
92            usecase: self.usecase.clone(),
93            scope,
94            default_compression: self.default_compression,
95            default_expiration_policy: self.default_expiration_policy,
96        }
97    }
98
99    /// Create a new [`Client`] and sets its `scope` based on the provided organization.
100    pub fn for_organization(&self, organization_id: u64) -> Client {
101        let scope = format!("org.{organization_id}");
102        self.make_client(scope)
103    }
104
105    /// Create a new [`Client`] and sets its `scope` based on the provided organization
106    /// and project.
107    pub fn for_project(&self, organization_id: u64, project_id: u64) -> Client {
108        let scope = format!("org.{organization_id}/proj.{project_id}");
109        self.make_client(scope)
110    }
111}
112
113/// A scoped objectstore client that can access objects in a specific use case and scope.
114#[derive(Debug)]
115pub struct Client {
116    pub(crate) http: reqwest::Client,
117    pub(crate) service_url: Arc<str>,
118    propagate_traces: bool,
119
120    pub(crate) usecase: Arc<str>,
121
122    /// The scope that this client operates within.
123    ///
124    /// Scopes are expected to be serialized ordered lists of key/value pairs. Each
125    /// pair is serialized with a `.` character between the key and value, and with
126    /// a `/` character between each pair. For example:
127    /// - `org.123/proj.456`
128    /// - `state.washington/city.seattle`
129    ///
130    /// It is recommended that both keys and values be restricted to alphanumeric
131    /// characters.
132    pub(crate) scope: String,
133    pub(crate) default_compression: Compression,
134    pub(crate) default_expiration_policy: ExpirationPolicy,
135}
136
137/// The type of [`Stream`](futures_util::Stream) to be used for a PUT request.
138pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
139
140impl Client {
141    pub(crate) fn request<U: reqwest::IntoUrl>(
142        &self,
143        method: reqwest::Method,
144        uri: U,
145    ) -> anyhow::Result<reqwest::RequestBuilder> {
146        let mut builder = self.http.request(method, uri).query(&[
147            (PARAM_SCOPE, self.scope.as_ref()),
148            (PARAM_USECASE, self.usecase.as_ref()),
149        ]);
150
151        if self.propagate_traces {
152            let trace_headers =
153                sentry::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
154            for (header_name, value) in trace_headers.into_iter().flatten() {
155                builder = builder.header(HeaderName::try_from(header_name)?, value);
156            }
157        }
158
159        Ok(builder)
160    }
161
162    /// Deletes the object with the given `id`.
163    pub async fn delete(&self, id: &str) -> anyhow::Result<()> {
164        let delete_url = format!("{}/v1/{id}", self.service_url);
165
166        let _response = self
167            .request(reqwest::Method::DELETE, delete_url)?
168            .send()
169            .await?;
170
171        Ok(())
172    }
173}