coil-storage 0.1.1

Object storage primitives for the Coil framework.
Documentation
use coil_config::{PlatformConfig, StorageClass};
use thiserror::Error;

use crate::policy::{join_local_path, join_relative, normalize_relative_path};
use crate::{
    DurableStore, StorageBackendKind, StoragePolicy, StoragePolicyError, StoragePolicyOverride,
    StoragePolicySet, StorageTopology, SyncMode,
};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StorageUploadDisposition {
    FollowHierarchy,
    KeepLocal,
}

impl StorageUploadDisposition {
    pub const fn keep_local() -> Self {
        Self::KeepLocal
    }
}

impl Default for StorageUploadDisposition {
    fn default() -> Self {
        Self::FollowHierarchy
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoragePlanRequest {
    pub logical_path: String,
    pub storage_class: Option<StorageClass>,
    pub override_policy: Option<StoragePolicyOverride>,
    pub upload_disposition: StorageUploadDisposition,
}

impl StoragePlanRequest {
    pub fn new(logical_path: impl Into<String>) -> Self {
        Self {
            logical_path: logical_path.into(),
            storage_class: None,
            override_policy: None,
            upload_disposition: StorageUploadDisposition::default(),
        }
    }

    pub fn with_storage_class(mut self, storage_class: StorageClass) -> Self {
        self.storage_class = Some(storage_class);
        self
    }

    pub fn with_override(mut self, override_policy: StoragePolicyOverride) -> Self {
        self.override_policy = Some(override_policy);
        self
    }

    pub fn with_local_only(mut self) -> Self {
        self.upload_disposition = StorageUploadDisposition::KeepLocal;
        self
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteTargetKind {
    Primary,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriteTarget {
    pub backend: StorageBackendKind,
    pub locator: String,
    pub kind: WriteTargetKind,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageDeploymentScope {
    Scalable,
    SingleNodeOnly,
}

impl StorageDeploymentScope {
    pub const fn requires_single_node(self) -> bool {
        matches!(self, Self::SingleNodeOnly)
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoragePlan {
    pub logical_path: String,
    pub storage_class: StorageClass,
    pub policy: StoragePolicy,
    pub durable_store: DurableStore,
    pub object_key: Option<String>,
    pub local_path: Option<String>,
    pub matched_rule_prefix: Option<String>,
    pub write_targets: Vec<WriteTarget>,
    pub deployment_scope: StorageDeploymentScope,
}

impl StoragePlan {
    pub fn primary_write_target(&self) -> Option<&WriteTarget> {
        self.write_targets
            .iter()
            .find(|target| target.kind == WriteTargetKind::Primary)
    }

    pub fn ensure_public_delivery_allowed(&self) -> Result<(), StoragePlanningError> {
        if self.policy.is_public_delivery_eligible() {
            Ok(())
        } else {
            Err(StoragePlanningError::PublicDeliveryNotEligible {
                logical_path: self.logical_path.clone(),
                policy: self.policy,
            })
        }
    }

    pub const fn public_delivery_eligible(&self) -> bool {
        self.policy.is_public_delivery_eligible()
    }

    pub const fn requires_single_node(&self) -> bool {
        self.deployment_scope.requires_single_node()
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoragePlanner {
    topology: StorageTopology,
    policies: StoragePolicySet,
}

impl StoragePlanner {
    pub fn from_config(config: &PlatformConfig) -> Self {
        Self {
            topology: StorageTopology::from_config(config),
            policies: StoragePolicySet::default(),
        }
    }

    pub fn new(topology: StorageTopology, policies: StoragePolicySet) -> Self {
        Self { topology, policies }
    }

    pub fn topology(&self) -> &StorageTopology {
        &self.topology
    }

    pub fn policies(&self) -> &StoragePolicySet {
        &self.policies
    }

    pub fn single_node_escape_hatch(&self) -> SingleNodeEscapeHatchPlanner {
        SingleNodeEscapeHatchPlanner::new(self.topology.clone(), self.policies.clone())
    }

    pub fn plan_scalable_write(
        &self,
        request: StoragePlanRequest,
    ) -> Result<StoragePlan, StoragePlanningError> {
        let logical_path = normalize_relative_path(&request.logical_path)?;
        let storage_class = request.storage_class.unwrap_or(self.topology.default_class);
        let resolved = self.policies.resolve(
            storage_class,
            &logical_path,
            request.override_policy.as_ref(),
        )?;

        self.plan_resolved_scalable_write(logical_path, resolved, request.upload_disposition)
    }

    pub fn plan_write(
        &self,
        request: StoragePlanRequest,
    ) -> Result<StoragePlan, StoragePlanningError> {
        self.plan_scalable_write(request)
    }

    fn plan_resolved_scalable_write(
        &self,
        logical_path: String,
        resolved: crate::ResolvedStoragePolicy,
        upload_disposition: StorageUploadDisposition,
    ) -> Result<StoragePlan, StoragePlanningError> {
        let mut policy = resolved.policy;
        if matches!(upload_disposition, StorageUploadDisposition::KeepLocal) {
            policy = StoragePolicyOverride::force_single_node_escape_hatch().apply_to(policy);
        }
        if policy.sync_mode == SyncMode::LocalOnly {
            return Err(StoragePlanningError::SingleNodeEscapeHatchRequested {
                logical_path,
                policy,
            });
        }

        let durable_store = policy.durable_store();
        let object_key = Some(join_relative(
            resolved.object_prefix.as_deref(),
            &logical_path,
        ));

        if self.topology.object_store.is_none() {
            return Err(StoragePlanningError::ObjectStoreRequired {
                logical_path,
                policy,
            });
        }

        let write_targets = vec![WriteTarget {
            backend: self
                .topology
                .object_store
                .as_ref()
                .expect("object store availability checked")
                .backend_kind(),
            locator: object_key
                .clone()
                .expect("object key is present for object store policies"),
            kind: WriteTargetKind::Primary,
        }];

        Ok(StoragePlan {
            logical_path,
            storage_class: resolved.storage_class,
            policy,
            durable_store,
            object_key,
            local_path: None,
            matched_rule_prefix: resolved.matched_rule_prefix,
            write_targets,
            deployment_scope: StorageDeploymentScope::Scalable,
        })
    }
}

#[derive(Debug, Error, PartialEq, Eq)]
pub enum StoragePlanningError {
    #[error(transparent)]
    Policy(#[from] StoragePolicyError),
    #[error(
        "storage plan for `{logical_path}` is not eligible for public delivery with policy {policy:?}"
    )]
    PublicDeliveryNotEligible {
        logical_path: String,
        policy: StoragePolicy,
    },
    #[error(
        "storage plan for `{logical_path}` with policy {policy:?} requires the single-node escape hatch and must use the explicit escape-hatch planner"
    )]
    SingleNodeEscapeHatchRequested {
        logical_path: String,
        policy: StoragePolicy,
    },
    #[error(
        "storage plan for `{logical_path}` requires object storage but no object-store backend is configured for policy {policy:?}"
    )]
    ObjectStoreRequired {
        logical_path: String,
        policy: StoragePolicy,
    },
    #[error(
        "storage plan for `{logical_path}` with policy {policy:?} is not allowed for deployment {deployment:?} because the single-node escape hatch is {single_node_escape_hatch:?}"
    )]
    SingleNodeEscapeHatchNotAllowedForDeployment {
        logical_path: String,
        policy: StoragePolicy,
        deployment: coil_config::StorageDeployment,
        single_node_escape_hatch: coil_config::SingleNodeStorageMode,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SingleNodeEscapeHatchPlanner {
    topology: StorageTopology,
    policies: StoragePolicySet,
}

impl SingleNodeEscapeHatchPlanner {
    pub fn from_config(config: &PlatformConfig) -> Self {
        Self {
            topology: StorageTopology::from_config(config),
            policies: StoragePolicySet::default(),
        }
    }

    pub fn new(topology: StorageTopology, policies: StoragePolicySet) -> Self {
        Self { topology, policies }
    }

    pub fn plan_write(
        &self,
        request: StoragePlanRequest,
    ) -> Result<StoragePlan, StoragePlanningError> {
        let logical_path = normalize_relative_path(&request.logical_path)?;
        let storage_class = request.storage_class.unwrap_or(self.topology.default_class);
        let resolved = self.policies.resolve(
            storage_class,
            &logical_path,
            request.override_policy.as_ref(),
        )?;

        self.plan_resolved_single_node_escape_hatch_write(
            logical_path,
            resolved,
            request.upload_disposition,
        )
    }

    fn plan_resolved_single_node_escape_hatch_write(
        &self,
        logical_path: String,
        resolved: crate::ResolvedStoragePolicy,
        upload_disposition: StorageUploadDisposition,
    ) -> Result<StoragePlan, StoragePlanningError> {
        let mut policy = resolved.policy;
        if matches!(upload_disposition, StorageUploadDisposition::KeepLocal) {
            policy = StoragePolicyOverride::force_single_node_escape_hatch().apply_to(policy);
        }
        if policy.sync_mode != SyncMode::LocalOnly {
            return StoragePlanner::new(self.topology.clone(), self.policies.clone())
                .plan_resolved_scalable_write(logical_path, resolved, upload_disposition);
        }

        if !self.topology.allows_explicit_local_only() {
            return Err(
                StoragePlanningError::SingleNodeEscapeHatchNotAllowedForDeployment {
                    logical_path,
                    policy,
                    deployment: self.topology.deployment,
                    single_node_escape_hatch: self.topology.single_node_escape_hatch,
                },
            );
        }

        let local_path = Some(join_local_path(
            &self.topology.local_root,
            resolved.local_subdir.as_deref(),
            &logical_path,
        ));
        let write_targets = vec![WriteTarget {
            backend: StorageBackendKind::LocalDisk,
            locator: local_path
                .clone()
                .expect("local path is present for local policies"),
            kind: WriteTargetKind::Primary,
        }];

        Ok(StoragePlan {
            logical_path,
            storage_class: resolved.storage_class,
            policy,
            durable_store: policy.durable_store(),
            object_key: None,
            local_path,
            matched_rule_prefix: resolved.matched_rule_prefix,
            write_targets,
            deployment_scope: StorageDeploymentScope::SingleNodeOnly,
        })
    }
}