coil-runtime 0.1.1

HTTP runtime and request handling for the Coil framework.
Documentation
use super::*;

/// Outcome of [`OpsHost::queue_report_export`]: the export plan together with
/// the id of the job that was enqueued for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueuedReportExport {
    /// Plan produced by the planner's `plan_report_export` call.
    pub plan: ReportExportPlan,
    /// Id returned by the jobs host when a clone of `plan.job` was enqueued.
    pub queued_job_id: JobId,
}

/// Outcome of [`OpsHost::queue_bulk_operation`]: the bulk-operation plan
/// together with the id of the job that was enqueued for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueuedBulkOperation {
    /// Plan produced by the planner's `plan_bulk_operation` call.
    pub plan: BulkOperationPlan,
    /// Id returned by the jobs host when a clone of `plan.job` was enqueued.
    pub queued_job_id: JobId,
}

/// Pairs an [`OpsPlanner`] with a [`JobsHost`] so that a planned operation can
/// be handed to the jobs host in a single call.
#[derive(Debug, Clone)]
pub struct OpsHost {
    // Turns incoming requests into plans.
    planner: OpsPlanner,
    // Receives the job spec carried by each plan.
    jobs: JobsHost,
}

impl OpsHost {
    /// Assembles a host from an existing planner and jobs host.
    pub(crate) fn new(planner: OpsPlanner, jobs: JobsHost) -> Self {
        Self { jobs, planner }
    }

    /// Shared access to the planner.
    pub fn planner(&self) -> &OpsPlanner {
        &self.planner
    }

    /// Shared access to the jobs host.
    pub fn jobs(&self) -> &JobsHost {
        &self.jobs
    }

    /// Exclusive access to the jobs host.
    pub fn jobs_mut(&mut self) -> &mut JobsHost {
        &mut self.jobs
    }

    /// Plans a report export and enqueues the job carried by the plan.
    ///
    /// The job is enqueued with the request's own `requested_at` value.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the planner or from the jobs host,
    /// converted into [`RuntimeOpsError`]. Planning happens first, so nothing
    /// is enqueued when planning fails.
    pub fn queue_report_export(
        &mut self,
        request: ReportExportRequest,
    ) -> Result<QueuedReportExport, RuntimeOpsError> {
        // Read the timestamp before the request is moved into the planner.
        let enqueue_at = request.requested_at;
        let plan = self.planner.plan_report_export(request)?;

        // The plan is handed back to the caller, so the jobs host gets a copy
        // of its job spec.
        let job_spec = plan.job.clone();
        let queued_job_id = self.jobs.enqueue_spec(job_spec, enqueue_at)?;

        Ok(QueuedReportExport {
            plan,
            queued_job_id,
        })
    }

    /// Plans a bulk operation and enqueues the job carried by the plan.
    ///
    /// The job is enqueued with the request's own `requested_at` value.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the planner or from the jobs host,
    /// converted into [`RuntimeOpsError`]. Planning happens first, so nothing
    /// is enqueued when planning fails.
    pub fn queue_bulk_operation(
        &mut self,
        request: BulkOperationRequest,
    ) -> Result<QueuedBulkOperation, RuntimeOpsError> {
        // Read the timestamp before the request is moved into the planner.
        let enqueue_at = request.requested_at;
        let plan = self.planner.plan_bulk_operation(request)?;

        // The plan is handed back to the caller, so the jobs host gets a copy
        // of its job spec.
        let job_spec = plan.job.clone();
        let queued_job_id = self.jobs.enqueue_spec(job_spec, enqueue_at)?;

        Ok(QueuedBulkOperation {
            plan,
            queued_job_id,
        })
    }
}

/// Owned description of which index contributions a trigger invalidates, as
/// built by [`SearchHost::invalidation_plan`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchInvalidationPlan {
    /// The trigger the plan was built for.
    pub trigger: SearchInvalidationTrigger,
    /// Clones of every catalog contribution that has an invalidation rule for
    /// `trigger`.
    pub indexes: Vec<OpsSearchIndexContribution>,
}

/// Errors returned by [`SearchHost`] operations.
///
/// The wrapping variants are `transparent`: their `Display` output and source
/// are those of the wrapped error.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum RuntimeSearchError {
    /// Failure reported by the underlying [`OpsHost`].
    #[error(transparent)]
    Ops(#[from] RuntimeOpsError),
    /// An `OpsModelError` converted via `?`.
    #[error(transparent)]
    Model(#[from] OpsModelError),
    /// A `JobsModelError` converted via `?`.
    #[error(transparent)]
    Jobs(#[from] JobsModelError),
    /// Returned by `SearchHost::queue_full_reindex` when the catalog has no
    /// contributions.
    #[error("search host requires at least one configured index contribution")]
    EmptyCatalog,
}

/// Combines a [`SearchCatalog`] with an [`OpsHost`]: the catalog answers
/// queries about index contributions and the ops host queues reindex work.
#[derive(Debug, Clone)]
pub struct SearchHost {
    // Source of the index contributions inspected by the query methods.
    catalog: SearchCatalog,
    // Used to queue the bulk reindex operation.
    ops: OpsHost,
}

impl SearchHost {
    /// Assembles a search host from a catalog and an ops host.
    pub(crate) fn new(catalog: SearchCatalog, ops: OpsHost) -> Self {
        Self { ops, catalog }
    }

    /// Shared access to the search catalog.
    pub fn catalog(&self) -> &SearchCatalog {
        &self.catalog
    }

    /// Delegates to the catalog's `visible_to` with the given capabilities.
    pub fn visible_to(
        &self,
        capabilities: &[coil_auth::Capability],
    ) -> Vec<&OpsSearchIndexContribution> {
        let catalog = &self.catalog;
        catalog.visible_to(capabilities)
    }

    /// Returns, in catalog order, every contribution that has at least one
    /// invalidation rule whose trigger equals `trigger`.
    pub fn indexes_for_trigger(
        &self,
        trigger: SearchInvalidationTrigger,
    ) -> Vec<&OpsSearchIndexContribution> {
        let mut matching = Vec::new();
        for contribution in self.catalog.contributions.iter() {
            let fires = contribution
                .invalidation_rules
                .iter()
                .any(|rule| rule.trigger == trigger);
            if fires {
                matching.push(contribution);
            }
        }
        matching
    }

    /// Builds an owned [`SearchInvalidationPlan`] for `trigger` by cloning the
    /// contributions reported by [`Self::indexes_for_trigger`].
    pub fn invalidation_plan(&self, trigger: SearchInvalidationTrigger) -> SearchInvalidationPlan {
        let indexes: Vec<OpsSearchIndexContribution> = self
            .indexes_for_trigger(trigger)
            .into_iter()
            .cloned()
            .collect();

        SearchInvalidationPlan { trigger, indexes }
    }

    /// Returns, in catalog order, every contribution whose rebuild strategy is
    /// `SearchRebuildStrategy::Scheduled`.
    pub fn scheduled_rebuilds(&self) -> Vec<&OpsSearchIndexContribution> {
        let mut scheduled = Vec::new();
        for contribution in self.catalog.contributions.iter() {
            if matches!(
                contribution.rebuild_strategy,
                SearchRebuildStrategy::Scheduled { .. }
            ) {
                scheduled.push(contribution);
            }
        }
        scheduled
    }

    /// Queues the `bulk.search.reindex` bulk operation through the ops host.
    ///
    /// The request carries an idempotency key of the form
    /// `search.reindex:<execution id>`, every capability in
    /// `operator_capabilities`, and the `dry_run` flag. The number of catalog
    /// contributions is passed as the last argument of
    /// `BulkOperationRequest::new` (presumably the item count; confirm
    /// against that constructor).
    ///
    /// # Errors
    ///
    /// * [`RuntimeSearchError::EmptyCatalog`] when the catalog has no
    ///   contributions; nothing else is attempted in that case.
    /// * Otherwise the first failure, in this order, from building the
    ///   idempotency key, the operation id, the request, or from
    ///   [`OpsHost::queue_bulk_operation`].
    pub fn queue_full_reindex(
        &mut self,
        execution_id: BulkExecutionId,
        requested_by: impl Into<String>,
        requested_at: JobInstant,
        operator_capabilities: Vec<coil_auth::Capability>,
        dry_run: bool,
    ) -> Result<QueuedBulkOperation, RuntimeSearchError> {
        let contributions = &self.catalog.contributions;
        if contributions.is_empty() {
            return Err(RuntimeSearchError::EmptyCatalog);
        }

        // The key must be derived before `execution_id` is moved into the request.
        let key_text = format!("search.reindex:{}", execution_id.as_str());
        let idempotency_key = IdempotencyKey::new(key_text)?;
        let operation_id = BulkOperationId::new("bulk.search.reindex")?;

        let base_request = BulkOperationRequest::new(
            execution_id,
            operation_id,
            requested_by,
            requested_at,
            contributions.len(),
        )?;

        // Attach each operator capability in the order supplied.
        let request = operator_capabilities
            .into_iter()
            .fold(base_request, |pending, capability| {
                pending.with_capability(capability)
            })
            .with_idempotency_key(idempotency_key)
            .dry_run(dry_run);

        self.ops
            .queue_bulk_operation(request)
            .map_err(RuntimeSearchError::from)
    }
}

/// Errors returned by the [`OpsHost`] queueing methods.
///
/// Both variants are `transparent`: their `Display` output and source are
/// those of the wrapped error.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum RuntimeOpsError {
    /// An `OpsModelError` converted via `?`.
    #[error(transparent)]
    Ops(#[from] OpsModelError),
    /// A `RuntimeJobsError` converted via `?`.
    #[error(transparent)]
    Jobs(#[from] RuntimeJobsError),
}