objectstore_client/
client.rs1use std::io;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use futures_util::stream::BoxStream;
6use objectstore_types::ExpirationPolicy;
7use reqwest::header::HeaderName;
8
9pub use objectstore_types::{Compression, PARAM_SCOPE, PARAM_USECASE};
10
11const USER_AGENT: &str = concat!("objectstore-client/", env!("CARGO_PKG_VERSION"));
12
13#[derive(Debug)]
20pub struct ClientBuilder {
21 service_url: Arc<str>,
22 client: reqwest::Client,
23 propagate_traces: bool,
24
25 usecase: Arc<str>,
26 default_compression: Compression,
27 default_expiration_policy: ExpirationPolicy,
28}
29
30impl ClientBuilder {
31 pub fn new(service_url: &str, usecase: &str) -> anyhow::Result<Self> {
39 let client = reqwest::Client::builder()
40 .user_agent(USER_AGENT)
41 .no_brotli()
44 .no_deflate()
45 .no_gzip()
46 .no_zstd()
47 .build()?;
48
49 Ok(Self {
50 service_url: service_url.trim_end_matches('/').into(),
51 client,
52 propagate_traces: false,
53
54 usecase: usecase.into(),
55 default_compression: Compression::Zstd,
56 default_expiration_policy: ExpirationPolicy::Manual,
57 })
58 }
59
60 pub fn default_compression(mut self, compression: Compression) -> Self {
62 self.default_compression = compression;
63 self
64 }
65
66 pub fn default_expiration_policy(mut self, expiration_policy: ExpirationPolicy) -> Self {
68 self.default_expiration_policy = expiration_policy;
69 self
70 }
71
72 pub fn with_distributed_tracing(mut self, propagate_traces: bool) -> Self {
75 self.propagate_traces = propagate_traces;
76 self
77 }
78
79 fn make_client(&self, scope: String) -> Client {
80 Client {
81 service_url: self.service_url.clone(),
82 http: self.client.clone(),
83 propagate_traces: self.propagate_traces,
84
85 usecase: self.usecase.clone(),
86 scope,
87 default_compression: self.default_compression,
88 default_expiration_policy: self.default_expiration_policy,
89 }
90 }
91
92 pub fn for_organization(&self, organization_id: u64) -> Client {
94 let scope = format!("org.{organization_id}");
95 self.make_client(scope)
96 }
97
98 pub fn for_project(&self, organization_id: u64, project_id: u64) -> Client {
101 let scope = format!("org.{organization_id}/proj.{project_id}");
102 self.make_client(scope)
103 }
104}
105
106#[derive(Debug)]
108pub struct Client {
109 pub(crate) http: reqwest::Client,
110 pub(crate) service_url: Arc<str>,
111 propagate_traces: bool,
112
113 pub(crate) usecase: Arc<str>,
114
115 pub(crate) scope: String,
126 pub(crate) default_compression: Compression,
127 pub(crate) default_expiration_policy: ExpirationPolicy,
128}
129
130pub type ClientStream = BoxStream<'static, io::Result<Bytes>>;
132
133impl Client {
134 pub(crate) fn request<U: reqwest::IntoUrl>(
135 &self,
136 method: reqwest::Method,
137 uri: U,
138 ) -> anyhow::Result<reqwest::RequestBuilder> {
139 let mut builder = self.http.request(method, uri).query(&[
140 (PARAM_SCOPE, self.scope.as_ref()),
141 (PARAM_USECASE, self.usecase.as_ref()),
142 ]);
143
144 if self.propagate_traces {
145 let trace_headers =
146 sentry::configure_scope(|scope| Some(scope.iter_trace_propagation_headers()));
147 for (header_name, value) in trace_headers.into_iter().flatten() {
148 builder = builder.header(HeaderName::try_from(header_name)?, value);
149 }
150 }
151
152 Ok(builder)
153 }
154
155 pub async fn delete(&self, id: &str) -> anyhow::Result<()> {
157 let delete_url = format!("{}/v1/{id}", self.service_url);
158
159 let _response = self
160 .request(reqwest::Method::DELETE, delete_url)?
161 .send()
162 .await?;
163
164 Ok(())
165 }
166}