1use std::fmt;
2use std::path::{Path, PathBuf};
3
4use thiserror::Error;
5
6use crate::{StorageBackendKind, StoragePlan, StorageTopology, WriteTarget};
7
8mod config;
9mod local;
10mod object_store;
11
12#[cfg(test)]
13mod tests;
14
15pub use config::{ObjectStoreClientConfig, ObjectStoreClientConfigError, ObjectStoreCredentials};
16use local::LocalDiskStorageClient;
17pub use object_store::{S3CompatibleObjectStoreClient, SignedObjectUrl};
18
/// Location from which a stored object can be served, as resolved by
/// `StorageExecutor::delivery_location` from a plan's delivery mode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageDeliveryLocation {
    /// The object is served through a CDN.
    PublicCdn {
        /// CDN base URL and object key joined with exactly one `/` between them.
        public_url: String,
        /// Object key taken from the storage plan.
        object_key: String,
    },
    /// The object is served through a signed URL produced by the object-store client.
    SignedObject {
        /// Object key as reported back by the object-store client.
        object_key: String,
        /// Signed URL as reported by the object-store client.
        signed_url: String,
        /// Expiry value copied from the client's signed-URL result.
        // NOTE(review): the name implies seconds since the Unix epoch — confirm in `object_store`.
        expires_at_unix_seconds: u64,
    },
    /// The object is served by the application itself.
    AppProxy {
        /// The plan's logical path, copied verbatim.
        path: String,
    },
    /// The object is only available on the local filesystem.
    LocalPath {
        /// The plan's local path converted to a `PathBuf`.
        path: PathBuf,
    },
}
37
/// Result of a successful write performed by `StorageExecutor`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageWriteReceipt {
    /// Clone of the plan's primary write target that selected the backend.
    pub target: WriteTarget,
    /// Path returned by the backend that handled the write (local client `write`
    /// or object-store `put`).
    pub path: PathBuf,
    /// Length of the payload handed to the backend, in bytes.
    pub bytes_written: u64,
}
44
/// Result of a successful read performed by `StorageExecutor`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageReadReceipt {
    /// Clone of the plan's primary write target that selected the backend.
    pub target: WriteTarget,
    /// Path returned by the backend that handled the read (local client `read`
    /// or object-store `get`).
    pub path: PathBuf,
    /// The bytes returned by the backend.
    pub bytes: Vec<u8>,
    /// Length of `bytes`; `StorageExecutor::execute_read` always sets it to `bytes.len()`.
    pub bytes_read: u64,
}
52
/// Operations `StorageExecutor` needs from an object store.
///
/// The executor reaches its object store through `&dyn ObjectStoreClient`.
/// Implementors must also be `Debug + Send + Sync`.
pub trait ObjectStoreClient: fmt::Debug + Send + Sync {
    /// Stores `bytes` under `object_key`, passing along an optional `content_type`.
    ///
    /// Returns a path describing the stored object.
    // NOTE(review): what the returned `PathBuf` denotes is defined by the implementor — confirm.
    fn put(
        &self,
        object_key: &str,
        bytes: &[u8],
        content_type: Option<&str>,
    ) -> Result<PathBuf, StorageExecutionError>;
    /// Fetches the object stored under `object_key`, returning a path for it
    /// together with its contents.
    fn get(&self, object_key: &str) -> Result<(PathBuf, Vec<u8>), StorageExecutionError>;
    /// Produces a signed URL that grants read access to `object_key`.
    // NOTE(review): URL lifetime/expiry policy is chosen by the implementor — confirm.
    fn signed_get_url(&self, object_key: &str) -> Result<SignedObjectUrl, StorageExecutionError>;
}
63
64impl ObjectStoreClient for S3CompatibleObjectStoreClient {
65 fn put(
66 &self,
67 object_key: &str,
68 bytes: &[u8],
69 content_type: Option<&str>,
70 ) -> Result<PathBuf, StorageExecutionError> {
71 self.put(object_key, bytes, content_type)
72 }
73
74 fn get(&self, object_key: &str) -> Result<(PathBuf, Vec<u8>), StorageExecutionError> {
75 self.get(object_key)
76 }
77
78 fn signed_get_url(&self, object_key: &str) -> Result<SignedObjectUrl, StorageExecutionError> {
79 self.signed_get_url(object_key)
80 }
81}
82
/// Executes storage plans against the local disk and, when configured, an
/// S3-compatible object store.
#[derive(Debug, Clone)]
pub struct StorageExecutor {
    /// Local-disk client constructed from the topology's `local_root`.
    local_client: LocalDiskStorageClient,
    /// `true` when the topology declares an object store; only used to choose
    /// which error is reported when no object-store client is available.
    object_store_required: bool,
    /// Object-store client; present only when the topology declares an object
    /// store and a client configuration was supplied.
    object_store: Option<S3CompatibleObjectStoreClient>,
}
89
90impl StorageExecutor {
91 pub fn from_topology(topology: &StorageTopology) -> Self {
92 Self::from_topology_and_object_store(topology, None)
93 }
94
95 pub fn from_topology_and_object_store(
96 topology: &StorageTopology,
97 object_store: Option<ObjectStoreClientConfig>,
98 ) -> Self {
99 let local_root = PathBuf::from(&topology.local_root);
100 let object_store_required = topology.object_store.is_some();
101 let object_store = topology
102 .object_store
103 .as_ref()
104 .and_then(|_| object_store.map(S3CompatibleObjectStoreClient::new));
105
106 Self {
107 local_client: LocalDiskStorageClient::new(local_root),
108 object_store_required,
109 object_store,
110 }
111 }
112
113 pub fn execute_write(
114 &self,
115 plan: &StoragePlan,
116 bytes: impl AsRef<[u8]>,
117 ) -> Result<StorageWriteReceipt, StorageExecutionError> {
118 self.execute_write_with_content_type(plan, bytes, None)
119 }
120
121 pub fn execute_write_with_content_type(
122 &self,
123 plan: &StoragePlan,
124 bytes: impl AsRef<[u8]>,
125 content_type: Option<&str>,
126 ) -> Result<StorageWriteReceipt, StorageExecutionError> {
127 let bytes = bytes.as_ref();
128 let target = plan.primary_write_target().cloned().ok_or_else(|| {
129 StorageExecutionError::MissingPrimaryWriteTarget {
130 logical_path: plan.logical_path.clone(),
131 }
132 })?;
133
134 let path = match target.backend {
135 StorageBackendKind::LocalDisk => {
136 let path = plan.local_path.as_ref().ok_or_else(|| {
137 StorageExecutionError::MissingLocalPath {
138 logical_path: plan.logical_path.clone(),
139 }
140 })?;
141 self.local_client.write(Path::new(path), bytes)?
142 }
143 StorageBackendKind::S3Compatible => {
144 let object_key = plan.object_key.as_deref().ok_or_else(|| {
145 StorageExecutionError::MissingObjectKey {
146 logical_path: plan.logical_path.clone(),
147 }
148 })?;
149 self.object_store_client(&plan.logical_path)?.put(
150 object_key,
151 bytes,
152 content_type,
153 )?
154 }
155 };
156
157 Ok(StorageWriteReceipt {
158 target,
159 path,
160 bytes_written: bytes.len() as u64,
161 })
162 }
163
164 pub fn execute_read(
165 &self,
166 plan: &StoragePlan,
167 ) -> Result<StorageReadReceipt, StorageExecutionError> {
168 let target = plan.primary_write_target().cloned().ok_or_else(|| {
169 StorageExecutionError::MissingPrimaryWriteTarget {
170 logical_path: plan.logical_path.clone(),
171 }
172 })?;
173
174 let (path, bytes) = match target.backend {
175 StorageBackendKind::LocalDisk => {
176 let path = plan.local_path.as_ref().ok_or_else(|| {
177 StorageExecutionError::MissingLocalPath {
178 logical_path: plan.logical_path.clone(),
179 }
180 })?;
181 self.local_client.read(Path::new(path))?
182 }
183 StorageBackendKind::S3Compatible => {
184 let object_key = plan.object_key.as_deref().ok_or_else(|| {
185 StorageExecutionError::MissingObjectKey {
186 logical_path: plan.logical_path.clone(),
187 }
188 })?;
189 self.object_store_client(&plan.logical_path)?
190 .get(object_key)?
191 }
192 };
193
194 Ok(StorageReadReceipt {
195 target,
196 path,
197 bytes_read: bytes.len() as u64,
198 bytes,
199 })
200 }
201
202 pub fn delivery_location(
203 &self,
204 plan: &StoragePlan,
205 cdn_base_url: Option<&str>,
206 ) -> Result<StorageDeliveryLocation, StorageExecutionError> {
207 match plan.policy.delivery_mode {
208 crate::DeliveryMode::PublicCdn => {
209 let object_key = plan.object_key.as_deref().ok_or_else(|| {
210 StorageExecutionError::MissingObjectKey {
211 logical_path: plan.logical_path.clone(),
212 }
213 })?;
214 let cdn_base_url =
215 cdn_base_url.ok_or_else(|| StorageExecutionError::MissingCdnBaseUrl {
216 logical_path: plan.logical_path.clone(),
217 })?;
218 Ok(StorageDeliveryLocation::PublicCdn {
219 public_url: join_base_url(cdn_base_url, object_key),
220 object_key: object_key.to_string(),
221 })
222 }
223 crate::DeliveryMode::SignedUrl => {
224 let object_key = plan.object_key.as_deref().ok_or_else(|| {
225 StorageExecutionError::MissingObjectKey {
226 logical_path: plan.logical_path.clone(),
227 }
228 })?;
229 let signed = self
230 .object_store_client(&plan.logical_path)?
231 .signed_get_url(object_key)?;
232 Ok(StorageDeliveryLocation::SignedObject {
233 object_key: signed.object_key,
234 signed_url: signed.signed_url,
235 expires_at_unix_seconds: signed.expires_at_unix_seconds,
236 })
237 }
238 crate::DeliveryMode::AppProxy => Ok(StorageDeliveryLocation::AppProxy {
239 path: plan.logical_path.clone(),
240 }),
241 crate::DeliveryMode::LocalOnly => {
242 let path = plan
243 .local_path
244 .as_ref()
245 .ok_or_else(|| StorageExecutionError::MissingLocalPath {
246 logical_path: plan.logical_path.clone(),
247 })?
248 .clone();
249 Ok(StorageDeliveryLocation::LocalPath {
250 path: PathBuf::from(path),
251 })
252 }
253 }
254 }
255
256 fn object_store_client(
257 &self,
258 logical_path: &str,
259 ) -> Result<&dyn ObjectStoreClient, StorageExecutionError> {
260 self.object_store
261 .as_ref()
262 .map(|client| client as &dyn ObjectStoreClient)
263 .ok_or_else(|| {
264 if self.object_store_required {
265 StorageExecutionError::MissingObjectStoreConfiguration {
266 logical_path: logical_path.to_string(),
267 }
268 } else {
269 StorageExecutionError::MissingObjectStoreBackend {
270 logical_path: logical_path.to_string(),
271 }
272 }
273 })
274 }
275}
276
/// Joins `base_url` and `object_key` with exactly one `/` between them.
///
/// All trailing slashes on `base_url` and all leading slashes on `object_key`
/// are dropped before joining. Nothing else is normalised or escaped.
fn join_base_url(base_url: &str, object_key: &str) -> String {
    let base = base_url.trim_end_matches('/');
    let key = object_key.trim_start_matches('/');
    [base, key].join("/")
}
284
/// Errors produced while executing a storage plan or resolving its delivery location.
///
/// Variants carrying `logical_path` identify the plan that could not be executed.
// NOTE(review): several variants are not constructed in this module; they are
// presumably raised by the `local` / `object_store` / `config` submodules — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum StorageExecutionError {
    /// The plan has no primary write target.
    #[error("storage plan for `{logical_path}` does not provide a primary write target")]
    MissingPrimaryWriteTarget { logical_path: String },
    /// The operation needs the plan's local path, but the plan has none.
    #[error("storage plan for `{logical_path}` requires a local path")]
    MissingLocalPath { logical_path: String },
    /// The operation needs the plan's object key, but the plan has none.
    #[error("storage plan for `{logical_path}` requires an object key")]
    MissingObjectKey { logical_path: String },
    /// An object-store client was needed, but the topology declares no object store.
    #[error("storage plan for `{logical_path}` requires an object-store backend")]
    MissingObjectStoreBackend { logical_path: String },
    /// The topology declares an object store, but no client is available for it.
    #[error("storage plan for `{logical_path}` requires object-store client configuration")]
    MissingObjectStoreConfiguration { logical_path: String },
    /// No object-store endpoint is configured (not raised in this module).
    #[error("storage plan for `{logical_path}` requires a configured object-store endpoint")]
    MissingObjectStoreEndpoint { logical_path: String },
    /// Public-CDN delivery was requested without a `cdn_base_url`.
    #[error("storage plan for `{logical_path}` requires `cdn_base_url` to resolve public delivery")]
    MissingCdnBaseUrl { logical_path: String },
    /// The object-store configuration was rejected (not raised in this module).
    #[error("object-store configuration is invalid: {detail}")]
    InvalidObjectStoreConfiguration { detail: String },
    /// A path fell outside the configured storage root (not raised in this module).
    #[error("storage path `{path}` is outside the configured storage root")]
    InvalidTargetPath { path: String },
    /// A backend read failed (not raised in this module).
    #[error("failed to read storage path `{path}`: {message}")]
    ReadFailed { path: String, message: String },
    /// Signed-URL generation failed (not raised in this module).
    #[error("failed to generate a signed URL for `{object_key}`: {message}")]
    SignedUrlGenerationFailed { object_key: String, message: String },
    /// A backend write failed (not raised in this module).
    #[error("failed to write storage path `{path}`: {message}")]
    WriteFailed { path: String, message: String },
}