1use bytes::Bytes;
2use object_store::ObjectStoreExt;
3use object_store::aws::{AmazonS3, AmazonS3Builder};
4use object_store::path::Path as ObjectPath;
5use object_store::signer::Signer;
6use object_store::{Attribute, Attributes, ObjectStore, PutOptions};
7use reqwest::Method;
8use std::future::Future;
9use std::path::PathBuf;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use super::{ObjectStoreClientConfig, ObjectStoreCredentials, StorageExecutionError};
13
14#[derive(Debug, Clone)]
15pub struct S3CompatibleObjectStoreClient {
16 state: ObjectStoreClientState,
17 signed_url_ttl: Duration,
18}
19
20impl S3CompatibleObjectStoreClient {
21 pub fn new(config: ObjectStoreClientConfig) -> Self {
22 let signed_url_ttl = Duration::from_secs(config.signed_url_ttl_secs.max(1));
23 let state = match build_store(&config) {
24 Ok(store) => ObjectStoreClientState::Ready { store },
25 Err(message) => ObjectStoreClientState::Invalid { message },
26 };
27
28 Self {
29 state,
30 signed_url_ttl,
31 }
32 }
33
34 pub fn put(
35 &self,
36 object_key: &str,
37 bytes: &[u8],
38 content_type: Option<&str>,
39 ) -> Result<PathBuf, StorageExecutionError> {
40 let path = object_path(object_key)?;
41 let store = self.ready_store()?.clone();
42 let payload = Bytes::copy_from_slice(bytes);
43 let mut attributes = Attributes::new();
44 if let Some(content_type) = content_type {
45 attributes.insert(Attribute::ContentType, content_type.to_string().into());
46 }
47 let options = PutOptions {
48 attributes,
49 ..Default::default()
50 };
51 run_object_store_future(
52 async move { store.put_opts(&path, payload.into(), options).await },
53 )
54 .map_err(|message| StorageExecutionError::WriteFailed {
55 path: object_key.to_string(),
56 message,
57 })?;
58 Ok(PathBuf::from(normalize_object_key(object_key)?))
59 }
60
61 pub fn get(&self, object_key: &str) -> Result<(PathBuf, Vec<u8>), StorageExecutionError> {
62 let path = object_path(object_key)?;
63 let store = self.ready_store()?.clone();
64 let bytes = run_object_store_future(async move {
65 let result = store.get(&path).await?;
66 result.bytes().await
67 })
68 .map_err(|message| StorageExecutionError::ReadFailed {
69 path: object_key.to_string(),
70 message,
71 })?;
72 Ok((
73 PathBuf::from(normalize_object_key(object_key)?),
74 bytes.to_vec(),
75 ))
76 }
77
78 pub fn signed_get_url(
79 &self,
80 object_key: &str,
81 ) -> Result<SignedObjectUrl, StorageExecutionError> {
82 let path = object_path(object_key)?;
83 let store = self.ready_store()?.clone();
84 let signed_url_ttl = self.signed_url_ttl;
85 let signed_url = run_object_store_future(async move {
86 store.signed_url(Method::GET, &path, signed_url_ttl).await
87 })
88 .map_err(|message| StorageExecutionError::SignedUrlGenerationFailed {
89 object_key: object_key.to_string(),
90 message,
91 })?;
92 let expires_at_unix_seconds = SystemTime::now()
93 .checked_add(signed_url_ttl)
94 .and_then(|instant| instant.duration_since(UNIX_EPOCH).ok())
95 .map(|duration| duration.as_secs())
96 .unwrap_or(0);
97 Ok(SignedObjectUrl {
98 object_key: normalize_object_key(object_key)?,
99 signed_url: signed_url.to_string(),
100 expires_at_unix_seconds,
101 })
102 }
103
104 fn ready_store(&self) -> Result<&AmazonS3, StorageExecutionError> {
105 match &self.state {
106 ObjectStoreClientState::Ready { store } => Ok(store),
107 ObjectStoreClientState::Invalid { message } => {
108 Err(StorageExecutionError::InvalidObjectStoreConfiguration {
109 detail: message.clone(),
110 })
111 }
112 }
113 }
114}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub struct SignedObjectUrl {
118 pub object_key: String,
119 pub signed_url: String,
120 pub expires_at_unix_seconds: u64,
121}
122
123#[derive(Debug, Clone)]
124enum ObjectStoreClientState {
125 Ready { store: AmazonS3 },
126 Invalid { message: String },
127}
128
129fn build_store(config: &ObjectStoreClientConfig) -> Result<AmazonS3, String> {
130 validate_runtime_config(config)?;
131
132 let mut builder = AmazonS3Builder::new()
133 .with_bucket_name(config.bucket.clone())
134 .with_region(config.region.clone())
135 .with_virtual_hosted_style_request(config.virtual_hosted_style_request);
136 if let Some(endpoint_url) = &config.endpoint_url {
137 builder = builder
138 .with_endpoint(endpoint_url.clone())
139 .with_allow_http(config.allow_http);
140 }
141 let ObjectStoreCredentials::Static {
142 access_key_id,
143 secret_access_key,
144 session_token,
145 } = &config.credentials
146 else {
147 return Err(
148 "runtime-backed object-store clients require explicit access_key_id and secret_access_key in the structured object-store secret"
149 .to_string(),
150 );
151 };
152 builder = builder
153 .with_access_key_id(access_key_id.clone())
154 .with_secret_access_key(secret_access_key.clone());
155 if let Some(session_token) = session_token {
156 builder = builder.with_token(session_token.clone());
157 }
158 builder.build().map_err(|error| error.to_string())
159}
160
161fn validate_runtime_config(config: &ObjectStoreClientConfig) -> Result<(), String> {
162 if matches!(&config.credentials, ObjectStoreCredentials::Environment) {
163 return Err(
164 "runtime-backed object-store clients require explicit access_key_id and secret_access_key in the structured object-store secret"
165 .to_string(),
166 );
167 }
168
169 match config.endpoint_url.as_deref() {
170 Some(endpoint_url) if endpoint_url.starts_with("http://") && !config.allow_http => {
171 Err("object-store endpoint uses http but allow_http is not enabled".to_string())
172 }
173 None if config.allow_http => Err(
174 "allow_http requires an explicit endpoint_url in the object-store config".to_string(),
175 ),
176 _ => Ok(()),
177 }
178}
179
180fn run_object_store_future<T, F>(future: F) -> Result<T, String>
181where
182 T: Send + 'static,
183 F: Future<Output = Result<T, object_store::Error>> + Send + 'static,
184{
185 match tokio::runtime::Handle::try_current() {
186 Ok(handle) => match handle.runtime_flavor() {
187 tokio::runtime::RuntimeFlavor::MultiThread => tokio::task::block_in_place(|| {
188 handle.block_on(future).map_err(|error| error.to_string())
189 }),
190 tokio::runtime::RuntimeFlavor::CurrentThread => run_future_on_dedicated_runtime(future),
191 _ => run_future_on_dedicated_runtime(future),
192 },
193 Err(_) => run_future_on_ephemeral_runtime(future),
194 }
195}
196
197fn run_future_on_dedicated_runtime<T, F>(future: F) -> Result<T, String>
198where
199 T: Send + 'static,
200 F: Future<Output = Result<T, object_store::Error>> + Send + 'static,
201{
202 std::thread::spawn(move || {
203 let runtime = tokio::runtime::Builder::new_current_thread()
204 .enable_all()
205 .build()
206 .map_err(|error| error.to_string())?;
207 runtime.block_on(future).map_err(|error| error.to_string())
208 })
209 .join()
210 .map_err(|_| "object-store worker thread panicked".to_string())?
211}
212
213fn run_future_on_ephemeral_runtime<T, F>(future: F) -> Result<T, String>
214where
215 T: Send + 'static,
216 F: Future<Output = Result<T, object_store::Error>> + Send + 'static,
217{
218 let runtime = tokio::runtime::Builder::new_current_thread()
219 .enable_all()
220 .build()
221 .map_err(|error| error.to_string())?;
222 runtime.block_on(future).map_err(|error| error.to_string())
223}
224
225fn object_path(object_key: &str) -> Result<ObjectPath, StorageExecutionError> {
226 ObjectPath::parse(normalize_object_key(object_key)?).map_err(|error| {
227 StorageExecutionError::InvalidTargetPath {
228 path: error.to_string(),
229 }
230 })
231}
232
233fn normalize_object_key(object_key: &str) -> Result<String, StorageExecutionError> {
234 let mut parts = Vec::new();
235 for component in std::path::Path::new(object_key).components() {
236 match component {
237 std::path::Component::Normal(part) => {
238 parts.push(part.to_string_lossy().into_owned());
239 }
240 _ => {
241 return Err(StorageExecutionError::InvalidTargetPath {
242 path: object_key.to_string(),
243 });
244 }
245 }
246 }
247
248 if parts.is_empty() {
249 return Err(StorageExecutionError::InvalidTargetPath {
250 path: object_key.to_string(),
251 });
252 }
253
254 Ok(parts.join("/"))
255}