Skip to main content

coil_storage/execution/
object_store.rs

1use bytes::Bytes;
2use object_store::ObjectStoreExt;
3use object_store::aws::{AmazonS3, AmazonS3Builder};
4use object_store::path::Path as ObjectPath;
5use object_store::signer::Signer;
6use object_store::{Attribute, Attributes, ObjectStore, PutOptions};
7use reqwest::Method;
8use std::future::Future;
9use std::path::PathBuf;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use super::{ObjectStoreClientConfig, ObjectStoreCredentials, StorageExecutionError};
13
14#[derive(Debug, Clone)]
15pub struct S3CompatibleObjectStoreClient {
16    state: ObjectStoreClientState,
17    signed_url_ttl: Duration,
18}
19
20impl S3CompatibleObjectStoreClient {
21    pub fn new(config: ObjectStoreClientConfig) -> Self {
22        let signed_url_ttl = Duration::from_secs(config.signed_url_ttl_secs.max(1));
23        let state = match build_store(&config) {
24            Ok(store) => ObjectStoreClientState::Ready { store },
25            Err(message) => ObjectStoreClientState::Invalid { message },
26        };
27
28        Self {
29            state,
30            signed_url_ttl,
31        }
32    }
33
34    pub fn put(
35        &self,
36        object_key: &str,
37        bytes: &[u8],
38        content_type: Option<&str>,
39    ) -> Result<PathBuf, StorageExecutionError> {
40        let path = object_path(object_key)?;
41        let store = self.ready_store()?.clone();
42        let payload = Bytes::copy_from_slice(bytes);
43        let mut attributes = Attributes::new();
44        if let Some(content_type) = content_type {
45            attributes.insert(Attribute::ContentType, content_type.to_string().into());
46        }
47        let options = PutOptions {
48            attributes,
49            ..Default::default()
50        };
51        run_object_store_future(
52            async move { store.put_opts(&path, payload.into(), options).await },
53        )
54        .map_err(|message| StorageExecutionError::WriteFailed {
55            path: object_key.to_string(),
56            message,
57        })?;
58        Ok(PathBuf::from(normalize_object_key(object_key)?))
59    }
60
61    pub fn get(&self, object_key: &str) -> Result<(PathBuf, Vec<u8>), StorageExecutionError> {
62        let path = object_path(object_key)?;
63        let store = self.ready_store()?.clone();
64        let bytes = run_object_store_future(async move {
65            let result = store.get(&path).await?;
66            result.bytes().await
67        })
68        .map_err(|message| StorageExecutionError::ReadFailed {
69            path: object_key.to_string(),
70            message,
71        })?;
72        Ok((
73            PathBuf::from(normalize_object_key(object_key)?),
74            bytes.to_vec(),
75        ))
76    }
77
78    pub fn signed_get_url(
79        &self,
80        object_key: &str,
81    ) -> Result<SignedObjectUrl, StorageExecutionError> {
82        let path = object_path(object_key)?;
83        let store = self.ready_store()?.clone();
84        let signed_url_ttl = self.signed_url_ttl;
85        let signed_url = run_object_store_future(async move {
86            store.signed_url(Method::GET, &path, signed_url_ttl).await
87        })
88        .map_err(|message| StorageExecutionError::SignedUrlGenerationFailed {
89            object_key: object_key.to_string(),
90            message,
91        })?;
92        let expires_at_unix_seconds = SystemTime::now()
93            .checked_add(signed_url_ttl)
94            .and_then(|instant| instant.duration_since(UNIX_EPOCH).ok())
95            .map(|duration| duration.as_secs())
96            .unwrap_or(0);
97        Ok(SignedObjectUrl {
98            object_key: normalize_object_key(object_key)?,
99            signed_url: signed_url.to_string(),
100            expires_at_unix_seconds,
101        })
102    }
103
104    fn ready_store(&self) -> Result<&AmazonS3, StorageExecutionError> {
105        match &self.state {
106            ObjectStoreClientState::Ready { store } => Ok(store),
107            ObjectStoreClientState::Invalid { message } => {
108                Err(StorageExecutionError::InvalidObjectStoreConfiguration {
109                    detail: message.clone(),
110                })
111            }
112        }
113    }
114}
115
/// A pre-signed URL for one object, together with its expiry.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SignedObjectUrl {
    /// Normalized key of the object the URL points at.
    pub object_key: String,
    /// The signed URL rendered as a string.
    pub signed_url: String,
    /// Expiry expressed as whole seconds since the Unix epoch.
    pub expires_at_unix_seconds: u64,
}
122
123#[derive(Debug, Clone)]
124enum ObjectStoreClientState {
125    Ready { store: AmazonS3 },
126    Invalid { message: String },
127}
128
129fn build_store(config: &ObjectStoreClientConfig) -> Result<AmazonS3, String> {
130    validate_runtime_config(config)?;
131
132    let mut builder = AmazonS3Builder::new()
133        .with_bucket_name(config.bucket.clone())
134        .with_region(config.region.clone())
135        .with_virtual_hosted_style_request(config.virtual_hosted_style_request);
136    if let Some(endpoint_url) = &config.endpoint_url {
137        builder = builder
138            .with_endpoint(endpoint_url.clone())
139            .with_allow_http(config.allow_http);
140    }
141    let ObjectStoreCredentials::Static {
142        access_key_id,
143        secret_access_key,
144        session_token,
145    } = &config.credentials
146    else {
147        return Err(
148            "runtime-backed object-store clients require explicit access_key_id and secret_access_key in the structured object-store secret"
149                .to_string(),
150        );
151    };
152    builder = builder
153        .with_access_key_id(access_key_id.clone())
154        .with_secret_access_key(secret_access_key.clone());
155    if let Some(session_token) = session_token {
156        builder = builder.with_token(session_token.clone());
157    }
158    builder.build().map_err(|error| error.to_string())
159}
160
161fn validate_runtime_config(config: &ObjectStoreClientConfig) -> Result<(), String> {
162    if matches!(&config.credentials, ObjectStoreCredentials::Environment) {
163        return Err(
164            "runtime-backed object-store clients require explicit access_key_id and secret_access_key in the structured object-store secret"
165                .to_string(),
166        );
167    }
168
169    match config.endpoint_url.as_deref() {
170        Some(endpoint_url) if endpoint_url.starts_with("http://") && !config.allow_http => {
171            Err("object-store endpoint uses http but allow_http is not enabled".to_string())
172        }
173        None if config.allow_http => Err(
174            "allow_http requires an explicit endpoint_url in the object-store config".to_string(),
175        ),
176        _ => Ok(()),
177    }
178}
179
180fn run_object_store_future<T, F>(future: F) -> Result<T, String>
181where
182    T: Send + 'static,
183    F: Future<Output = Result<T, object_store::Error>> + Send + 'static,
184{
185    match tokio::runtime::Handle::try_current() {
186        Ok(handle) => match handle.runtime_flavor() {
187            tokio::runtime::RuntimeFlavor::MultiThread => tokio::task::block_in_place(|| {
188                handle.block_on(future).map_err(|error| error.to_string())
189            }),
190            tokio::runtime::RuntimeFlavor::CurrentThread => run_future_on_dedicated_runtime(future),
191            _ => run_future_on_dedicated_runtime(future),
192        },
193        Err(_) => run_future_on_ephemeral_runtime(future),
194    }
195}
196
197fn run_future_on_dedicated_runtime<T, F>(future: F) -> Result<T, String>
198where
199    T: Send + 'static,
200    F: Future<Output = Result<T, object_store::Error>> + Send + 'static,
201{
202    std::thread::spawn(move || {
203        let runtime = tokio::runtime::Builder::new_current_thread()
204            .enable_all()
205            .build()
206            .map_err(|error| error.to_string())?;
207        runtime.block_on(future).map_err(|error| error.to_string())
208    })
209    .join()
210    .map_err(|_| "object-store worker thread panicked".to_string())?
211}
212
213fn run_future_on_ephemeral_runtime<T, F>(future: F) -> Result<T, String>
214where
215    T: Send + 'static,
216    F: Future<Output = Result<T, object_store::Error>> + Send + 'static,
217{
218    let runtime = tokio::runtime::Builder::new_current_thread()
219        .enable_all()
220        .build()
221        .map_err(|error| error.to_string())?;
222    runtime.block_on(future).map_err(|error| error.to_string())
223}
224
225fn object_path(object_key: &str) -> Result<ObjectPath, StorageExecutionError> {
226    ObjectPath::parse(normalize_object_key(object_key)?).map_err(|error| {
227        StorageExecutionError::InvalidTargetPath {
228            path: error.to_string(),
229        }
230    })
231}
232
233fn normalize_object_key(object_key: &str) -> Result<String, StorageExecutionError> {
234    let mut parts = Vec::new();
235    for component in std::path::Path::new(object_key).components() {
236        match component {
237            std::path::Component::Normal(part) => {
238                parts.push(part.to_string_lossy().into_owned());
239            }
240            _ => {
241                return Err(StorageExecutionError::InvalidTargetPath {
242                    path: object_key.to_string(),
243                });
244            }
245        }
246    }
247
248    if parts.is_empty() {
249        return Err(StorageExecutionError::InvalidTargetPath {
250            path: object_key.to_string(),
251        });
252    }
253
254    Ok(parts.join("/"))
255}