Skip to main content

coil_storage/
planner.rs

1use coil_config::{PlatformConfig, StorageClass};
2use thiserror::Error;
3
4use crate::policy::{join_local_path, join_relative, normalize_relative_path};
5use crate::{
6    DurableStore, StorageBackendKind, StoragePolicy, StoragePolicyError, StoragePolicyOverride,
7    StoragePolicySet, StorageTopology, SyncMode,
8};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub enum StorageUploadDisposition {
12    FollowHierarchy,
13    KeepLocal,
14}
15
16impl StorageUploadDisposition {
17    pub const fn keep_local() -> Self {
18        Self::KeepLocal
19    }
20}
21
22impl Default for StorageUploadDisposition {
23    fn default() -> Self {
24        Self::FollowHierarchy
25    }
26}
27
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct StoragePlanRequest {
30    pub logical_path: String,
31    pub storage_class: Option<StorageClass>,
32    pub override_policy: Option<StoragePolicyOverride>,
33    pub upload_disposition: StorageUploadDisposition,
34}
35
36impl StoragePlanRequest {
37    pub fn new(logical_path: impl Into<String>) -> Self {
38        Self {
39            logical_path: logical_path.into(),
40            storage_class: None,
41            override_policy: None,
42            upload_disposition: StorageUploadDisposition::default(),
43        }
44    }
45
46    pub fn with_storage_class(mut self, storage_class: StorageClass) -> Self {
47        self.storage_class = Some(storage_class);
48        self
49    }
50
51    pub fn with_override(mut self, override_policy: StoragePolicyOverride) -> Self {
52        self.override_policy = Some(override_policy);
53        self
54    }
55
56    pub fn with_local_only(mut self) -> Self {
57        self.upload_disposition = StorageUploadDisposition::KeepLocal;
58        self
59    }
60}
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
63pub enum WriteTargetKind {
64    Primary,
65}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct WriteTarget {
69    pub backend: StorageBackendKind,
70    pub locator: String,
71    pub kind: WriteTargetKind,
72}
73
74#[derive(Debug, Clone, Copy, PartialEq, Eq)]
75pub enum StorageDeploymentScope {
76    Scalable,
77    SingleNodeOnly,
78}
79
80impl StorageDeploymentScope {
81    pub const fn requires_single_node(self) -> bool {
82        matches!(self, Self::SingleNodeOnly)
83    }
84}
85
86#[derive(Debug, Clone, PartialEq, Eq)]
87pub struct StoragePlan {
88    pub logical_path: String,
89    pub storage_class: StorageClass,
90    pub policy: StoragePolicy,
91    pub durable_store: DurableStore,
92    pub object_key: Option<String>,
93    pub local_path: Option<String>,
94    pub matched_rule_prefix: Option<String>,
95    pub write_targets: Vec<WriteTarget>,
96    pub deployment_scope: StorageDeploymentScope,
97}
98
99impl StoragePlan {
100    pub fn primary_write_target(&self) -> Option<&WriteTarget> {
101        self.write_targets
102            .iter()
103            .find(|target| target.kind == WriteTargetKind::Primary)
104    }
105
106    pub fn ensure_public_delivery_allowed(&self) -> Result<(), StoragePlanningError> {
107        if self.policy.is_public_delivery_eligible() {
108            Ok(())
109        } else {
110            Err(StoragePlanningError::PublicDeliveryNotEligible {
111                logical_path: self.logical_path.clone(),
112                policy: self.policy,
113            })
114        }
115    }
116
117    pub const fn public_delivery_eligible(&self) -> bool {
118        self.policy.is_public_delivery_eligible()
119    }
120
121    pub const fn requires_single_node(&self) -> bool {
122        self.deployment_scope.requires_single_node()
123    }
124}
125
126#[derive(Debug, Clone, PartialEq, Eq)]
127pub struct StoragePlanner {
128    topology: StorageTopology,
129    policies: StoragePolicySet,
130}
131
132impl StoragePlanner {
133    pub fn from_config(config: &PlatformConfig) -> Self {
134        Self {
135            topology: StorageTopology::from_config(config),
136            policies: StoragePolicySet::default(),
137        }
138    }
139
140    pub fn new(topology: StorageTopology, policies: StoragePolicySet) -> Self {
141        Self { topology, policies }
142    }
143
144    pub fn topology(&self) -> &StorageTopology {
145        &self.topology
146    }
147
148    pub fn policies(&self) -> &StoragePolicySet {
149        &self.policies
150    }
151
152    pub fn single_node_escape_hatch(&self) -> SingleNodeEscapeHatchPlanner {
153        SingleNodeEscapeHatchPlanner::new(self.topology.clone(), self.policies.clone())
154    }
155
156    pub fn plan_scalable_write(
157        &self,
158        request: StoragePlanRequest,
159    ) -> Result<StoragePlan, StoragePlanningError> {
160        let logical_path = normalize_relative_path(&request.logical_path)?;
161        let storage_class = request.storage_class.unwrap_or(self.topology.default_class);
162        let resolved = self.policies.resolve(
163            storage_class,
164            &logical_path,
165            request.override_policy.as_ref(),
166        )?;
167
168        self.plan_resolved_scalable_write(logical_path, resolved, request.upload_disposition)
169    }
170
171    pub fn plan_write(
172        &self,
173        request: StoragePlanRequest,
174    ) -> Result<StoragePlan, StoragePlanningError> {
175        self.plan_scalable_write(request)
176    }
177
178    fn plan_resolved_scalable_write(
179        &self,
180        logical_path: String,
181        resolved: crate::ResolvedStoragePolicy,
182        upload_disposition: StorageUploadDisposition,
183    ) -> Result<StoragePlan, StoragePlanningError> {
184        let mut policy = resolved.policy;
185        if matches!(upload_disposition, StorageUploadDisposition::KeepLocal) {
186            policy = StoragePolicyOverride::force_single_node_escape_hatch().apply_to(policy);
187        }
188        if policy.sync_mode == SyncMode::LocalOnly {
189            return Err(StoragePlanningError::SingleNodeEscapeHatchRequested {
190                logical_path,
191                policy,
192            });
193        }
194
195        let durable_store = policy.durable_store();
196        let object_key = Some(join_relative(
197            resolved.object_prefix.as_deref(),
198            &logical_path,
199        ));
200
201        if self.topology.object_store.is_none() {
202            return Err(StoragePlanningError::ObjectStoreRequired {
203                logical_path,
204                policy,
205            });
206        }
207
208        let write_targets = vec![WriteTarget {
209            backend: self
210                .topology
211                .object_store
212                .as_ref()
213                .expect("object store availability checked")
214                .backend_kind(),
215            locator: object_key
216                .clone()
217                .expect("object key is present for object store policies"),
218            kind: WriteTargetKind::Primary,
219        }];
220
221        Ok(StoragePlan {
222            logical_path,
223            storage_class: resolved.storage_class,
224            policy,
225            durable_store,
226            object_key,
227            local_path: None,
228            matched_rule_prefix: resolved.matched_rule_prefix,
229            write_targets,
230            deployment_scope: StorageDeploymentScope::Scalable,
231        })
232    }
233}
234
235#[derive(Debug, Error, PartialEq, Eq)]
236pub enum StoragePlanningError {
237    #[error(transparent)]
238    Policy(#[from] StoragePolicyError),
239    #[error(
240        "storage plan for `{logical_path}` is not eligible for public delivery with policy {policy:?}"
241    )]
242    PublicDeliveryNotEligible {
243        logical_path: String,
244        policy: StoragePolicy,
245    },
246    #[error(
247        "storage plan for `{logical_path}` with policy {policy:?} requires the single-node escape hatch and must use the explicit escape-hatch planner"
248    )]
249    SingleNodeEscapeHatchRequested {
250        logical_path: String,
251        policy: StoragePolicy,
252    },
253    #[error(
254        "storage plan for `{logical_path}` requires object storage but no object-store backend is configured for policy {policy:?}"
255    )]
256    ObjectStoreRequired {
257        logical_path: String,
258        policy: StoragePolicy,
259    },
260    #[error(
261        "storage plan for `{logical_path}` with policy {policy:?} is not allowed for deployment {deployment:?} because the single-node escape hatch is {single_node_escape_hatch:?}"
262    )]
263    SingleNodeEscapeHatchNotAllowedForDeployment {
264        logical_path: String,
265        policy: StoragePolicy,
266        deployment: coil_config::StorageDeployment,
267        single_node_escape_hatch: coil_config::SingleNodeStorageMode,
268    },
269}
270
271#[derive(Debug, Clone, PartialEq, Eq)]
272pub struct SingleNodeEscapeHatchPlanner {
273    topology: StorageTopology,
274    policies: StoragePolicySet,
275}
276
277impl SingleNodeEscapeHatchPlanner {
278    pub fn from_config(config: &PlatformConfig) -> Self {
279        Self {
280            topology: StorageTopology::from_config(config),
281            policies: StoragePolicySet::default(),
282        }
283    }
284
285    pub fn new(topology: StorageTopology, policies: StoragePolicySet) -> Self {
286        Self { topology, policies }
287    }
288
289    pub fn plan_write(
290        &self,
291        request: StoragePlanRequest,
292    ) -> Result<StoragePlan, StoragePlanningError> {
293        let logical_path = normalize_relative_path(&request.logical_path)?;
294        let storage_class = request.storage_class.unwrap_or(self.topology.default_class);
295        let resolved = self.policies.resolve(
296            storage_class,
297            &logical_path,
298            request.override_policy.as_ref(),
299        )?;
300
301        self.plan_resolved_single_node_escape_hatch_write(
302            logical_path,
303            resolved,
304            request.upload_disposition,
305        )
306    }
307
308    fn plan_resolved_single_node_escape_hatch_write(
309        &self,
310        logical_path: String,
311        resolved: crate::ResolvedStoragePolicy,
312        upload_disposition: StorageUploadDisposition,
313    ) -> Result<StoragePlan, StoragePlanningError> {
314        let mut policy = resolved.policy;
315        if matches!(upload_disposition, StorageUploadDisposition::KeepLocal) {
316            policy = StoragePolicyOverride::force_single_node_escape_hatch().apply_to(policy);
317        }
318        if policy.sync_mode != SyncMode::LocalOnly {
319            return StoragePlanner::new(self.topology.clone(), self.policies.clone())
320                .plan_resolved_scalable_write(logical_path, resolved, upload_disposition);
321        }
322
323        if !self.topology.allows_explicit_local_only() {
324            return Err(
325                StoragePlanningError::SingleNodeEscapeHatchNotAllowedForDeployment {
326                    logical_path,
327                    policy,
328                    deployment: self.topology.deployment,
329                    single_node_escape_hatch: self.topology.single_node_escape_hatch,
330                },
331            );
332        }
333
334        let local_path = Some(join_local_path(
335            &self.topology.local_root,
336            resolved.local_subdir.as_deref(),
337            &logical_path,
338        ));
339        let write_targets = vec![WriteTarget {
340            backend: StorageBackendKind::LocalDisk,
341            locator: local_path
342                .clone()
343                .expect("local path is present for local policies"),
344            kind: WriteTargetKind::Primary,
345        }];
346
347        Ok(StoragePlan {
348            logical_path,
349            storage_class: resolved.storage_class,
350            policy,
351            durable_store: policy.durable_store(),
352            object_key: None,
353            local_path,
354            matched_rule_prefix: resolved.matched_rule_prefix,
355            write_targets,
356            deployment_scope: StorageDeploymentScope::SingleNodeOnly,
357        })
358    }
359}