Skip to main content

coil_storage/execution/
mod.rs

1use std::fmt;
2use std::path::{Path, PathBuf};
3
4use thiserror::Error;
5
6use crate::{StorageBackendKind, StoragePlan, StorageTopology, WriteTarget};
7
8mod config;
9mod local;
10mod object_store;
11
12#[cfg(test)]
13mod tests;
14
15pub use config::{ObjectStoreClientConfig, ObjectStoreClientConfigError, ObjectStoreCredentials};
16use local::LocalDiskStorageClient;
17pub use object_store::{S3CompatibleObjectStoreClient, SignedObjectUrl};
18
/// Resolved location from which a stored object can be served.
///
/// Produced by [`StorageExecutor::delivery_location`]; which variant is
/// returned follows the delivery mode of the plan's policy.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageDeliveryLocation {
    /// Served from a CDN: `public_url` is the caller-supplied CDN base URL
    /// joined with `object_key` by a single `/`.
    PublicCdn { public_url: String, object_key: String },
    /// Served through a signed URL; all three fields are copied from the
    /// `SignedObjectUrl` returned by the object-store client.
    SignedObject {
        object_key: String,
        signed_url: String,
        expires_at_unix_seconds: u64,
    },
    /// Served by the application itself; `path` is the plan's logical path.
    AppProxy { path: String },
    /// Served from local disk; `path` is the plan's local path.
    LocalPath { path: PathBuf },
}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct StorageWriteReceipt {
40    pub target: WriteTarget,
41    pub path: PathBuf,
42    pub bytes_written: u64,
43}
44
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct StorageReadReceipt {
47    pub target: WriteTarget,
48    pub path: PathBuf,
49    pub bytes: Vec<u8>,
50    pub bytes_read: u64,
51}
52
53pub trait ObjectStoreClient: fmt::Debug + Send + Sync {
54    fn put(
55        &self,
56        object_key: &str,
57        bytes: &[u8],
58        content_type: Option<&str>,
59    ) -> Result<PathBuf, StorageExecutionError>;
60    fn get(&self, object_key: &str) -> Result<(PathBuf, Vec<u8>), StorageExecutionError>;
61    fn signed_get_url(&self, object_key: &str) -> Result<SignedObjectUrl, StorageExecutionError>;
62}
63
64impl ObjectStoreClient for S3CompatibleObjectStoreClient {
65    fn put(
66        &self,
67        object_key: &str,
68        bytes: &[u8],
69        content_type: Option<&str>,
70    ) -> Result<PathBuf, StorageExecutionError> {
71        self.put(object_key, bytes, content_type)
72    }
73
74    fn get(&self, object_key: &str) -> Result<(PathBuf, Vec<u8>), StorageExecutionError> {
75        self.get(object_key)
76    }
77
78    fn signed_get_url(&self, object_key: &str) -> Result<SignedObjectUrl, StorageExecutionError> {
79        self.signed_get_url(object_key)
80    }
81}
82
83#[derive(Debug, Clone)]
84pub struct StorageExecutor {
85    local_client: LocalDiskStorageClient,
86    object_store_required: bool,
87    object_store: Option<S3CompatibleObjectStoreClient>,
88}
89
90impl StorageExecutor {
91    pub fn from_topology(topology: &StorageTopology) -> Self {
92        Self::from_topology_and_object_store(topology, None)
93    }
94
95    pub fn from_topology_and_object_store(
96        topology: &StorageTopology,
97        object_store: Option<ObjectStoreClientConfig>,
98    ) -> Self {
99        let local_root = PathBuf::from(&topology.local_root);
100        let object_store_required = topology.object_store.is_some();
101        let object_store = topology
102            .object_store
103            .as_ref()
104            .and_then(|_| object_store.map(S3CompatibleObjectStoreClient::new));
105
106        Self {
107            local_client: LocalDiskStorageClient::new(local_root),
108            object_store_required,
109            object_store,
110        }
111    }
112
113    pub fn execute_write(
114        &self,
115        plan: &StoragePlan,
116        bytes: impl AsRef<[u8]>,
117    ) -> Result<StorageWriteReceipt, StorageExecutionError> {
118        self.execute_write_with_content_type(plan, bytes, None)
119    }
120
121    pub fn execute_write_with_content_type(
122        &self,
123        plan: &StoragePlan,
124        bytes: impl AsRef<[u8]>,
125        content_type: Option<&str>,
126    ) -> Result<StorageWriteReceipt, StorageExecutionError> {
127        let bytes = bytes.as_ref();
128        let target = plan.primary_write_target().cloned().ok_or_else(|| {
129            StorageExecutionError::MissingPrimaryWriteTarget {
130                logical_path: plan.logical_path.clone(),
131            }
132        })?;
133
134        let path = match target.backend {
135            StorageBackendKind::LocalDisk => {
136                let path = plan.local_path.as_ref().ok_or_else(|| {
137                    StorageExecutionError::MissingLocalPath {
138                        logical_path: plan.logical_path.clone(),
139                    }
140                })?;
141                self.local_client.write(Path::new(path), bytes)?
142            }
143            StorageBackendKind::S3Compatible => {
144                let object_key = plan.object_key.as_deref().ok_or_else(|| {
145                    StorageExecutionError::MissingObjectKey {
146                        logical_path: plan.logical_path.clone(),
147                    }
148                })?;
149                self.object_store_client(&plan.logical_path)?.put(
150                    object_key,
151                    bytes,
152                    content_type,
153                )?
154            }
155        };
156
157        Ok(StorageWriteReceipt {
158            target,
159            path,
160            bytes_written: bytes.len() as u64,
161        })
162    }
163
164    pub fn execute_read(
165        &self,
166        plan: &StoragePlan,
167    ) -> Result<StorageReadReceipt, StorageExecutionError> {
168        let target = plan.primary_write_target().cloned().ok_or_else(|| {
169            StorageExecutionError::MissingPrimaryWriteTarget {
170                logical_path: plan.logical_path.clone(),
171            }
172        })?;
173
174        let (path, bytes) = match target.backend {
175            StorageBackendKind::LocalDisk => {
176                let path = plan.local_path.as_ref().ok_or_else(|| {
177                    StorageExecutionError::MissingLocalPath {
178                        logical_path: plan.logical_path.clone(),
179                    }
180                })?;
181                self.local_client.read(Path::new(path))?
182            }
183            StorageBackendKind::S3Compatible => {
184                let object_key = plan.object_key.as_deref().ok_or_else(|| {
185                    StorageExecutionError::MissingObjectKey {
186                        logical_path: plan.logical_path.clone(),
187                    }
188                })?;
189                self.object_store_client(&plan.logical_path)?
190                    .get(object_key)?
191            }
192        };
193
194        Ok(StorageReadReceipt {
195            target,
196            path,
197            bytes_read: bytes.len() as u64,
198            bytes,
199        })
200    }
201
202    pub fn delivery_location(
203        &self,
204        plan: &StoragePlan,
205        cdn_base_url: Option<&str>,
206    ) -> Result<StorageDeliveryLocation, StorageExecutionError> {
207        match plan.policy.delivery_mode {
208            crate::DeliveryMode::PublicCdn => {
209                let object_key = plan.object_key.as_deref().ok_or_else(|| {
210                    StorageExecutionError::MissingObjectKey {
211                        logical_path: plan.logical_path.clone(),
212                    }
213                })?;
214                let cdn_base_url =
215                    cdn_base_url.ok_or_else(|| StorageExecutionError::MissingCdnBaseUrl {
216                        logical_path: plan.logical_path.clone(),
217                    })?;
218                Ok(StorageDeliveryLocation::PublicCdn {
219                    public_url: join_base_url(cdn_base_url, object_key),
220                    object_key: object_key.to_string(),
221                })
222            }
223            crate::DeliveryMode::SignedUrl => {
224                let object_key = plan.object_key.as_deref().ok_or_else(|| {
225                    StorageExecutionError::MissingObjectKey {
226                        logical_path: plan.logical_path.clone(),
227                    }
228                })?;
229                let signed = self
230                    .object_store_client(&plan.logical_path)?
231                    .signed_get_url(object_key)?;
232                Ok(StorageDeliveryLocation::SignedObject {
233                    object_key: signed.object_key,
234                    signed_url: signed.signed_url,
235                    expires_at_unix_seconds: signed.expires_at_unix_seconds,
236                })
237            }
238            crate::DeliveryMode::AppProxy => Ok(StorageDeliveryLocation::AppProxy {
239                path: plan.logical_path.clone(),
240            }),
241            crate::DeliveryMode::LocalOnly => {
242                let path = plan
243                    .local_path
244                    .as_ref()
245                    .ok_or_else(|| StorageExecutionError::MissingLocalPath {
246                        logical_path: plan.logical_path.clone(),
247                    })?
248                    .clone();
249                Ok(StorageDeliveryLocation::LocalPath {
250                    path: PathBuf::from(path),
251                })
252            }
253        }
254    }
255
256    fn object_store_client(
257        &self,
258        logical_path: &str,
259    ) -> Result<&dyn ObjectStoreClient, StorageExecutionError> {
260        self.object_store
261            .as_ref()
262            .map(|client| client as &dyn ObjectStoreClient)
263            .ok_or_else(|| {
264                if self.object_store_required {
265                    StorageExecutionError::MissingObjectStoreConfiguration {
266                        logical_path: logical_path.to_string(),
267                    }
268                } else {
269                    StorageExecutionError::MissingObjectStoreBackend {
270                        logical_path: logical_path.to_string(),
271                    }
272                }
273            })
274    }
275}
276
/// Joins `base_url` and `object_key` with exactly one `/` between them.
///
/// Every trailing `/` on `base_url` and every leading `/` on `object_key` is
/// stripped before joining; no other normalisation or escaping is applied.
fn join_base_url(base_url: &str, object_key: &str) -> String {
    let base = base_url.trim_end_matches('/');
    let key = object_key.trim_start_matches('/');
    [base, key].join("/")
}
284
285#[derive(Debug, Clone, PartialEq, Eq, Error)]
286pub enum StorageExecutionError {
287    #[error("storage plan for `{logical_path}` does not provide a primary write target")]
288    MissingPrimaryWriteTarget { logical_path: String },
289    #[error("storage plan for `{logical_path}` requires a local path")]
290    MissingLocalPath { logical_path: String },
291    #[error("storage plan for `{logical_path}` requires an object key")]
292    MissingObjectKey { logical_path: String },
293    #[error("storage plan for `{logical_path}` requires an object-store backend")]
294    MissingObjectStoreBackend { logical_path: String },
295    #[error("storage plan for `{logical_path}` requires object-store client configuration")]
296    MissingObjectStoreConfiguration { logical_path: String },
297    #[error("storage plan for `{logical_path}` requires a configured object-store endpoint")]
298    MissingObjectStoreEndpoint { logical_path: String },
299    #[error("storage plan for `{logical_path}` requires `cdn_base_url` to resolve public delivery")]
300    MissingCdnBaseUrl { logical_path: String },
301    #[error("object-store configuration is invalid: {detail}")]
302    InvalidObjectStoreConfiguration { detail: String },
303    #[error("storage path `{path}` is outside the configured storage root")]
304    InvalidTargetPath { path: String },
305    #[error("failed to read storage path `{path}`: {message}")]
306    ReadFailed { path: String, message: String },
307    #[error("failed to generate a signed URL for `{object_key}`: {message}")]
308    SignedUrlGenerationFailed { object_key: String, message: String },
309    #[error("failed to write storage path `{path}`: {message}")]
310    WriteFailed { path: String, message: String },
311}