// coil_storage/execution/config.rs

1use serde::Deserialize;
2use thiserror::Error;
3use url::Url;
4
/// Region used when a secret document or legacy URL does not name one
/// (or names only a blank one).
const DEFAULT_S3_REGION: &str = "us-east-1";
/// Signed-URL lifetime, in seconds, applied by `ObjectStoreClientConfig::new`
/// and used when a secret document omits `signed_url_ttl_secs`.
const DEFAULT_SIGNED_URL_TTL_SECS: u64 = 300;
7
/// Connection settings for an S3-style object-store client.
///
/// Values are normally produced by [`ObjectStoreClientConfig::new`] plus the
/// `with_*` builder methods, or parsed from a secret with
/// [`ObjectStoreClientConfig::from_secret_value`]. All fields are public, so
/// the invariants described below only hold for values built through those
/// entry points.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectStoreClientConfig {
    /// Custom endpoint, stored by this module as `scheme://host[:port]`
    /// (path, query and user-info are dropped). `None` when no endpoint was
    /// configured.
    pub endpoint_url: Option<String>,
    /// Bucket name; the constructors reject an empty or whitespace-only value.
    pub bucket: String,
    /// Region name; the constructors reject an empty or whitespace-only value.
    pub region: String,
    /// Credential source; defaults to [`ObjectStoreCredentials::Environment`].
    pub credentials: ObjectStoreCredentials,
    /// Signed-URL lifetime in seconds. Defaults to 300 and is clamped to at
    /// least 1 by `with_signed_url_ttl_secs`.
    pub signed_url_ttl_secs: u64,
    /// Set to `true` by this module when the configured endpoint uses the
    /// `http` scheme; a secret document's `allow_http` key overrides it.
    /// NOTE(review): how the client enforces this flag is not visible here.
    pub allow_http: bool,
    /// Defaults to `false`.
    /// NOTE(review): presumably selects virtual-hosted-style (bucket in the
    /// host name) over path-style addressing — confirm against the consumer.
    pub virtual_hosted_style_request: bool,
}
18
19impl ObjectStoreClientConfig {
20    pub fn new(
21        bucket: impl Into<String>,
22        region: impl Into<String>,
23    ) -> Result<Self, ObjectStoreClientConfigError> {
24        let bucket = bucket.into();
25        if bucket.trim().is_empty() {
26            return Err(ObjectStoreClientConfigError::MissingBucket);
27        }
28
29        let region = region.into();
30        if region.trim().is_empty() {
31            return Err(ObjectStoreClientConfigError::MissingRegion);
32        }
33
34        Ok(Self {
35            endpoint_url: None,
36            bucket,
37            region,
38            credentials: ObjectStoreCredentials::default(),
39            signed_url_ttl_secs: DEFAULT_SIGNED_URL_TTL_SECS,
40            allow_http: false,
41            virtual_hosted_style_request: false,
42        })
43    }
44
45    pub fn from_secret_value(secret_value: &str) -> Result<Self, ObjectStoreClientConfigError> {
46        let secret_value = secret_value.trim();
47        if secret_value.is_empty() {
48            return Err(ObjectStoreClientConfigError::EmptySecret);
49        }
50
51        if let Ok(url) = Url::parse(secret_value) {
52            return Self::from_legacy_url(url);
53        }
54
55        if let Ok(document) = serde_json::from_str::<ObjectStoreSecretDocument>(secret_value) {
56            return Self::from_document(document);
57        }
58
59        if let Ok(document) = toml::from_str::<ObjectStoreSecretDocument>(secret_value) {
60            return Self::from_document(document);
61        }
62
63        Err(ObjectStoreClientConfigError::UnsupportedSecretFormat)
64    }
65
66    pub fn from_structured_secret_value(
67        secret_value: &str,
68    ) -> Result<Self, ObjectStoreClientConfigError> {
69        let secret_value = secret_value.trim();
70        if secret_value.is_empty() {
71            return Err(ObjectStoreClientConfigError::EmptySecret);
72        }
73
74        if let Ok(document) = serde_json::from_str::<ObjectStoreSecretDocument>(secret_value) {
75            return Self::from_document(document);
76        }
77
78        if let Ok(document) = toml::from_str::<ObjectStoreSecretDocument>(secret_value) {
79            return Self::from_document(document);
80        }
81
82        Err(ObjectStoreClientConfigError::UnsupportedStructuredSecretFormat)
83    }
84
85    pub fn with_endpoint_url(
86        mut self,
87        endpoint_url: impl Into<String>,
88    ) -> Result<Self, ObjectStoreClientConfigError> {
89        let endpoint_url = endpoint_url.into();
90        let parsed = Url::parse(endpoint_url.trim()).map_err(|source| {
91            ObjectStoreClientConfigError::InvalidEndpointUrl {
92                value: endpoint_url.clone(),
93                source,
94            }
95        })?;
96        self.allow_http = parsed.scheme() == "http";
97        self.endpoint_url = Some(normalize_endpoint_url(&parsed));
98        Ok(self)
99    }
100
101    pub fn with_static_credentials(
102        mut self,
103        access_key_id: impl Into<String>,
104        secret_access_key: impl Into<String>,
105    ) -> Result<Self, ObjectStoreClientConfigError> {
106        let access_key_id = access_key_id.into();
107        let secret_access_key = secret_access_key.into();
108        if access_key_id.trim().is_empty() {
109            return Err(ObjectStoreClientConfigError::MissingAccessKeyId);
110        }
111        if secret_access_key.trim().is_empty() {
112            return Err(ObjectStoreClientConfigError::MissingSecretAccessKey);
113        }
114        self.credentials = ObjectStoreCredentials::Static {
115            access_key_id,
116            secret_access_key,
117            session_token: None,
118        };
119        Ok(self)
120    }
121
122    pub fn with_session_token(mut self, token: impl Into<String>) -> Self {
123        let token = token.into();
124        self.credentials = match self.credentials {
125            ObjectStoreCredentials::Static {
126                access_key_id,
127                secret_access_key,
128                ..
129            } => ObjectStoreCredentials::Static {
130                access_key_id,
131                secret_access_key,
132                session_token: Some(token),
133            },
134            ObjectStoreCredentials::Environment => ObjectStoreCredentials::Environment,
135        };
136        self
137    }
138
139    pub fn with_signed_url_ttl_secs(mut self, signed_url_ttl_secs: u64) -> Self {
140        self.signed_url_ttl_secs = signed_url_ttl_secs.max(1);
141        self
142    }
143
144    pub fn with_virtual_hosted_style_request(mut self, virtual_hosted_style_request: bool) -> Self {
145        self.virtual_hosted_style_request = virtual_hosted_style_request;
146        self
147    }
148
149    fn from_document(
150        document: ObjectStoreSecretDocument,
151    ) -> Result<Self, ObjectStoreClientConfigError> {
152        let bucket = document
153            .bucket
154            .or(document.bucket_name)
155            .filter(|value| !value.trim().is_empty())
156            .ok_or(ObjectStoreClientConfigError::MissingBucket)?;
157        let region = document
158            .region
159            .or(document.default_region)
160            .filter(|value| !value.trim().is_empty())
161            .unwrap_or_else(|| DEFAULT_S3_REGION.to_string());
162
163        let mut config = Self::new(bucket, region)?;
164        if let Some(endpoint_url) = document.endpoint_url.or(document.endpoint) {
165            config = config.with_endpoint_url(endpoint_url)?;
166        }
167        config = config.with_signed_url_ttl_secs(
168            document
169                .signed_url_ttl_secs
170                .unwrap_or(DEFAULT_SIGNED_URL_TTL_SECS),
171        );
172        config = config.with_virtual_hosted_style_request(
173            document.virtual_hosted_style_request.unwrap_or(false),
174        );
175        if let Some(allow_http) = document.allow_http {
176            config.allow_http = allow_http;
177        }
178
179        config.credentials = match (
180            document.access_key_id,
181            document.secret_access_key,
182            document.session_token.or(document.token),
183        ) {
184            (Some(access_key_id), Some(secret_access_key), session_token) => {
185                ObjectStoreCredentials::Static {
186                    access_key_id,
187                    secret_access_key,
188                    session_token,
189                }
190            }
191            (None, None, _) => ObjectStoreCredentials::Environment,
192            (Some(_), None, _) => return Err(ObjectStoreClientConfigError::MissingSecretAccessKey),
193            (None, Some(_), _) => return Err(ObjectStoreClientConfigError::MissingAccessKeyId),
194        };
195
196        Ok(config)
197    }
198
199    fn from_legacy_url(url: Url) -> Result<Self, ObjectStoreClientConfigError> {
200        match url.scheme() {
201            "s3" => {
202                let bucket = url
203                    .host_str()
204                    .filter(|value| !value.trim().is_empty())
205                    .ok_or(ObjectStoreClientConfigError::MissingBucket)?;
206                let region = url
207                    .query_pairs()
208                    .find_map(|(name, value)| {
209                        (name == "region" || name == "aws_region").then_some(value.into_owned())
210                    })
211                    .filter(|value| !value.trim().is_empty())
212                    .unwrap_or_else(|| DEFAULT_S3_REGION.to_string());
213                Self::new(bucket, region)
214            }
215            "http" | "https" => {
216                let mut segments = url
217                    .path_segments()
218                    .ok_or(ObjectStoreClientConfigError::MissingBucket)?
219                    .filter(|segment| !segment.is_empty());
220                let bucket = segments
221                    .next()
222                    .ok_or(ObjectStoreClientConfigError::MissingBucket)?;
223                let mut config = Self::new(bucket.to_string(), DEFAULT_S3_REGION.to_string())?;
224                config.endpoint_url = Some(normalize_endpoint_url(&url));
225                config.allow_http = url.scheme() == "http";
226                Ok(config)
227            }
228            scheme => Err(ObjectStoreClientConfigError::UnsupportedUrlScheme {
229                scheme: scheme.to_string(),
230            }),
231        }
232    }
233}
234
/// Where the object-store client gets its credentials from.
///
/// `Debug` is implemented by hand so that formatting a value — in a log line,
/// a panic message or a failed assertion — never prints the secret access key
/// or the session token.
#[derive(Clone, Default, PartialEq, Eq)]
pub enum ObjectStoreCredentials {
    /// No credentials are carried in the configuration (the default).
    /// NOTE(review): presumably the client resolves them from its ambient
    /// environment — confirm against the consumer of this type.
    #[default]
    Environment,
    /// An explicit key pair, optionally with a session token.
    Static {
        access_key_id: String,
        secret_access_key: String,
        session_token: Option<String>,
    },
}

impl std::fmt::Debug for ObjectStoreCredentials {
    /// Formats like the derived `Debug`, except that `secret_access_key` and
    /// the contents of `session_token` are replaced with `<redacted>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Environment => f.write_str("Environment"),
            Self::Static {
                access_key_id,
                session_token,
                ..
            } => f
                .debug_struct("Static")
                .field("access_key_id", access_key_id)
                .field("secret_access_key", &"<redacted>")
                // Keep Some/None visible so the presence of a token can still
                // be diagnosed without revealing it.
                .field(
                    "session_token",
                    &session_token.as_ref().map(|_| "<redacted>"),
                )
                .finish(),
        }
    }
}
245
/// Errors produced while building or parsing an [`ObjectStoreClientConfig`].
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum ObjectStoreClientConfigError {
    /// The secret was empty after trimming whitespace.
    #[error("object-store secret is empty")]
    EmptySecret,
    /// The secret was neither a URL nor a JSON/TOML document.
    #[error("object-store secret must be a supported URL, TOML, or JSON document")]
    UnsupportedSecretFormat,
    /// The secret was neither a JSON nor a TOML document (URL secrets are not
    /// accepted by the structured-only parser).
    #[error("object-store secret must be a supported TOML or JSON document")]
    UnsupportedStructuredSecretFormat,
    /// No bucket name was supplied, or it was blank.
    #[error("object-store config requires a bucket name")]
    MissingBucket,
    /// The region passed to the constructor was blank.
    #[error("object-store config requires a region")]
    MissingRegion,
    /// A secret access key was supplied without a usable access key id.
    #[error("object-store config requires an access key id when a secret access key is set")]
    MissingAccessKeyId,
    /// An access key id was supplied without a usable secret access key.
    #[error("object-store config requires a secret access key when an access key id is set")]
    MissingSecretAccessKey,
    /// The endpoint could not be parsed as a URL; `value` is the input as
    /// given and `source` is the parser's error.
    #[error("object-store endpoint `{value}` is not a valid URL: {source}")]
    InvalidEndpointUrl {
        value: String,
        source: url::ParseError,
    },
    /// A URL used a scheme this module does not handle.
    #[error("object-store URL scheme `{scheme}` is not supported")]
    UnsupportedUrlScheme { scheme: String },
}
270
/// Deserialized shape of a structured (JSON or TOML) object-store secret.
///
/// Every key is optional at this level; required-ness and defaults are
/// decided by `ObjectStoreClientConfig::from_document`. Several settings can
/// be spelled two ways, and the first-listed key of each pair takes
/// precedence.
#[derive(Debug, Clone, Default, Deserialize)]
struct ObjectStoreSecretDocument {
    /// Bucket name (preferred key).
    #[serde(default)]
    bucket: Option<String>,
    /// Bucket name (alternative key, used when `bucket` is not given).
    #[serde(default)]
    bucket_name: Option<String>,
    /// Region (preferred key).
    #[serde(default)]
    region: Option<String>,
    /// Region (alternative key, used when `region` is not given).
    #[serde(default)]
    default_region: Option<String>,
    /// Custom endpoint URL (preferred key).
    #[serde(default)]
    endpoint_url: Option<String>,
    /// Custom endpoint URL (alternative key, used when `endpoint_url` is not
    /// given).
    #[serde(default)]
    endpoint: Option<String>,
    /// Static access key id; must be paired with `secret_access_key`.
    #[serde(default)]
    access_key_id: Option<String>,
    /// Static secret access key; must be paired with `access_key_id`.
    #[serde(default)]
    secret_access_key: Option<String>,
    /// Session token for static credentials (preferred key).
    #[serde(default)]
    session_token: Option<String>,
    /// Session token (alternative key, used when `session_token` is not
    /// given).
    #[serde(default)]
    token: Option<String>,
    /// Signed-URL lifetime in seconds.
    #[serde(default)]
    signed_url_ttl_secs: Option<u64>,
    /// Explicit override for the config's `allow_http` flag.
    #[serde(default)]
    allow_http: Option<bool>,
    /// Value for the config's `virtual_hosted_style_request` flag.
    #[serde(default)]
    virtual_hosted_style_request: Option<bool>,
}
300
301fn normalize_endpoint_url(url: &Url) -> String {
302    let host = url.host_str().unwrap_or_default();
303    let mut normalized = format!("{}://{host}", url.scheme());
304    if let Some(port) = url.port() {
305        normalized.push(':');
306        normalized.push_str(&port.to_string());
307    }
308    normalized
309}
310
#[cfg(test)]
mod tests {
    use super::*;

    /// TOML secret that sets every key the structured-secret test asserts on.
    const FULL_TOML_SECRET: &str = r#"
bucket = "runtime"
region = "eu-west-2"
endpoint_url = "https://storage.internal"
access_key_id = "runtime-access"
secret_access_key = "runtime-secret"
session_token = "runtime-session"
signed_url_ttl_secs = 900
virtual_hosted_style_request = true
"#;

    /// TOML secret carrying an access key id but no secret access key.
    const PARTIAL_CREDENTIALS_TOML_SECRET: &str = r#"
bucket = "runtime"
access_key_id = "runtime-access"
"#;

    /// Legacy URL secret: endpoint host followed by the bucket as the path.
    const LEGACY_HTTPS_SECRET: &str = "https://s3.internal/runtime";

    #[test]
    fn parses_structured_toml_secret() {
        let parsed = ObjectStoreClientConfig::from_secret_value(FULL_TOML_SECRET).unwrap();
        let expected_credentials = ObjectStoreCredentials::Static {
            access_key_id: "runtime-access".to_string(),
            secret_access_key: "runtime-secret".to_string(),
            session_token: Some("runtime-session".to_string()),
        };

        assert_eq!(parsed.endpoint_url.as_deref(), Some("https://storage.internal"));
        assert_eq!(parsed.bucket, "runtime");
        assert_eq!(parsed.region, "eu-west-2");
        assert_eq!(parsed.signed_url_ttl_secs, 900);
        assert!(parsed.virtual_hosted_style_request);
        assert_eq!(parsed.credentials, expected_credentials);
    }

    #[test]
    fn parses_legacy_http_url_secret() {
        let parsed = ObjectStoreClientConfig::from_secret_value(LEGACY_HTTPS_SECRET).unwrap();

        assert_eq!(parsed.endpoint_url.as_deref(), Some("https://s3.internal"));
        assert_eq!(parsed.bucket, "runtime");
        assert_eq!(parsed.region, DEFAULT_S3_REGION);
        assert_eq!(parsed.credentials, ObjectStoreCredentials::Environment);
    }

    #[test]
    fn rejects_legacy_url_secret_for_structured_only_parser() {
        // The structured-only entry point must not fall back to URL parsing.
        let outcome = ObjectStoreClientConfig::from_structured_secret_value(LEGACY_HTTPS_SECRET);

        assert_eq!(
            outcome,
            Err(ObjectStoreClientConfigError::UnsupportedStructuredSecretFormat)
        );
    }

    #[test]
    fn rejects_partial_static_credentials() {
        let outcome = ObjectStoreClientConfig::from_secret_value(PARTIAL_CREDENTIALS_TOML_SECRET);

        assert_eq!(
            outcome,
            Err(ObjectStoreClientConfigError::MissingSecretAccessKey)
        );
    }
}