use crate::bulk::{BulkOperationPlan, BulkOperationRequest};
use crate::catalog::OpsCatalog;
use crate::error::OpsModelError;
use crate::recovery::{RecoveryPlan, RecoveryPlanRequest, RecoveryStage};
use crate::reports::{ReportExportPlan, ReportExportRequest};
use coil_jobs::{JobId, JobName, JobSpec, JobsPlanner, JobsRuntime, RetryPolicy};
use std::time::Duration;
/// Plans operational work (report exports, bulk operations and recovery
/// workflows) as jobs, using an [`OpsCatalog`] for definitions and a
/// [`JobsPlanner`] for queue topology and job planning.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OpsPlanner {
    /// Jobs planner obtained from the runtime handed to `OpsPlanner::new`.
    jobs: JobsPlanner,
    /// Catalog that the `plan_*` methods look definitions up in.
    catalog: OpsCatalog,
}
impl OpsPlanner {
    /// Builds a planner from a jobs runtime and an operations catalog.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `catalog.validate()` reports; the jobs
    /// planner is only requested from `runtime` once validation has passed.
    pub fn new(runtime: JobsRuntime, catalog: OpsCatalog) -> Result<Self, OpsModelError> {
        catalog.validate()?;
        let jobs = runtime.planner();
        Ok(Self { jobs, catalog })
    }

    /// Returns the underlying jobs planner.
    pub fn jobs_planner(&self) -> &JobsPlanner {
        &self.jobs
    }

    /// Returns the catalog this planner resolves definitions from.
    pub fn catalog(&self) -> &OpsCatalog {
        &self.catalog
    }

    /// Plans a report export job for the report named by `request.report_id`.
    ///
    /// The job goes to the scheduled queue when `request.scheduled_for` is
    /// set and to the work queue otherwise. The definition's retry policy is
    /// cloned and pointed at the topology's dead-letter queue. The output
    /// object key is `{export_prefix}/{export_id}.{extension}`.
    ///
    /// # Errors
    ///
    /// * `OpsModelError::DuplicateIdentifier` when the catalog has no report
    ///   definition for `request.report_id`.
    /// * `OpsModelError::MissingCapability` when the definition does not
    ///   allow `request.operator_capabilities`.
    /// * Any error propagated from `JobId::new`, `JobName::new`,
    ///   `JobSpec::new` or `JobsPlanner::plan_job`.
    pub fn plan_report_export(
        &self,
        request: ReportExportRequest,
    ) -> Result<ReportExportPlan, OpsModelError> {
        // NOTE(review): a failed lookup is reported as `DuplicateIdentifier`;
        // a "not found" style variant looks more fitting — confirm against
        // `OpsModelError` before changing.
        let definition = match self.catalog.reports.definition(&request.report_id) {
            Some(found) => found,
            None => {
                return Err(OpsModelError::DuplicateIdentifier {
                    kind: "report",
                    id: request.report_id.to_string(),
                });
            }
        };

        let permitted = definition.allows(&request.operator_capabilities);
        if !permitted {
            return Err(OpsModelError::MissingCapability {
                operation: "report export",
                required: definition.required_capability,
            });
        }

        // Scheduled requests are routed to the scheduled queue; everything
        // else goes straight to the work queue.
        let topology = self.jobs.describe_queue_topology();
        let target_queue = match request.scheduled_for {
            Some(_) => topology.scheduled_queue.clone(),
            None => topology.work_queue.clone(),
        };

        let job_id = JobId::new(request.export_id.as_str().to_string())?;
        let job_name = JobName::new(format!("report.export.{}", definition.id.as_str()))?;
        let retry_policy = definition
            .retry_policy
            .clone()
            .with_dead_letter_queue(topology.dead_letter_queue.clone());

        let description = format!(
            "report export for `{}` in format `{}`",
            definition.id, definition.format
        );
        let spec = JobSpec::new(job_id, job_name, target_queue, description)?;
        let spec = match request.scheduled_for {
            Some(when) => spec.scheduled_for(when),
            None => spec,
        };
        let spec = spec.with_retry_policy(retry_policy);
        let spec = match request.idempotency_key.clone() {
            Some(key) => spec.with_idempotency_key(key),
            None => spec,
        };

        // The spec is cloned so the plan can carry both the spec and the
        // planner's result.
        let planned_job = self.jobs.plan_job(spec.clone(), request.requested_at)?;
        let output_object_key = format!(
            "{}/{}.{}",
            definition.export_prefix,
            request.export_id,
            definition.format.extension()
        );

        Ok(ReportExportPlan {
            definition: definition.clone(),
            job: spec,
            planned_job,
            output_object_key,
            parameters: request.parameters,
        })
    }

    /// Plans a bulk operation job for the definition named by
    /// `request.definition_id`.
    ///
    /// Queue selection and retry-policy handling mirror
    /// [`OpsPlanner::plan_report_export`]: scheduled requests use the
    /// scheduled queue, and the retry policy is pointed at the dead-letter
    /// queue.
    ///
    /// # Errors
    ///
    /// * `OpsModelError::DuplicateIdentifier` when the catalog has no bulk
    ///   definition for `request.definition_id`.
    /// * `OpsModelError::MissingCapability` when the definition does not
    ///   allow `request.operator_capabilities`.
    /// * `OpsModelError::InvalidItemCount` when `request.target_count` is
    ///   zero or exceeds the definition's `max_items` (if one is set).
    /// * `OpsModelError::InvalidBulkOperation` when the definition requires
    ///   an idempotency key and the request carries none.
    /// * Any error propagated from `JobId::new`, `JobName::new`,
    ///   `JobSpec::new` or `JobsPlanner::plan_job`.
    pub fn plan_bulk_operation(
        &self,
        request: BulkOperationRequest,
    ) -> Result<BulkOperationPlan, OpsModelError> {
        // NOTE(review): a failed lookup is reported as `DuplicateIdentifier`;
        // a "not found" style variant looks more fitting — confirm against
        // `OpsModelError` before changing.
        let definition = match self.catalog.bulk.definition(&request.definition_id) {
            Some(found) => found,
            None => {
                return Err(OpsModelError::DuplicateIdentifier {
                    kind: "bulk operation",
                    id: request.definition_id.to_string(),
                });
            }
        };

        let permitted = definition.allows(&request.operator_capabilities);
        if !permitted {
            return Err(OpsModelError::MissingCapability {
                operation: "bulk operation",
                required: definition.required_capability,
            });
        }

        // Both an empty target set and one above the optional cap are
        // rejected with the same error value.
        let over_limit = matches!(
            definition.max_items,
            Some(limit) if request.target_count > limit
        );
        if request.target_count == 0 || over_limit {
            return Err(OpsModelError::InvalidItemCount {
                operation: "bulk operation",
                count: request.target_count,
            });
        }

        let key_missing = request.idempotency_key.is_none();
        if definition.requires_idempotency_key && key_missing {
            return Err(OpsModelError::InvalidBulkOperation {
                operation_id: definition.id.to_string(),
                reason: "idempotency key is required for retry-safe execution".to_string(),
            });
        }

        let topology = self.jobs.describe_queue_topology();
        let target_queue = match request.scheduled_for {
            Some(_) => topology.scheduled_queue.clone(),
            None => topology.work_queue.clone(),
        };

        let job_id = JobId::new(request.execution_id.as_str().to_string())?;
        let job_name = JobName::new(format!("bulk.{}", definition.id.as_str()))?;
        let retry_policy = definition
            .retry_policy
            .clone()
            .with_dead_letter_queue(topology.dead_letter_queue.clone());

        let description = format!(
            "bulk `{}` on `{}` items",
            definition.kind, request.target_count
        );
        let spec = JobSpec::new(job_id, job_name, target_queue, description)?;
        let spec = match request.scheduled_for {
            Some(when) => spec.scheduled_for(when),
            None => spec,
        };
        let spec = spec.with_retry_policy(retry_policy);
        let spec = match request.idempotency_key.clone() {
            Some(key) => spec.with_idempotency_key(key),
            None => spec,
        };

        let planned_job = self.jobs.plan_job(spec.clone(), request.requested_at)?;
        let audit_message = format!(
            "bulk `{}` requested by `{}` for `{}` items",
            definition.id, request.requested_by, request.target_count
        );

        Ok(BulkOperationPlan {
            definition: definition.clone(),
            job: spec,
            planned_job,
            dry_run: request.dry_run,
            target_count: request.target_count,
            audit_message,
        })
    }

    /// Plans a recovery workflow job for the definition named by
    /// `request.definition_id`.
    ///
    /// The resulting stages are the definition's default stages, with
    /// `RecoveryStage::RestoreLocalOnlySensitive` removed unless
    /// `request.local_only_sensitive_present` is set. That same flag is
    /// copied into `requires_host_local_restore` on the plan.
    ///
    /// # Errors
    ///
    /// * `OpsModelError::DuplicateIdentifier` when the catalog has no
    ///   recovery definition for `request.definition_id`.
    /// * `OpsModelError::MissingCapability` when the definition does not
    ///   allow `request.operator_capabilities`.
    /// * `OpsModelError::InvalidRecoveryWorkflow` when the definition
    ///   requires an idempotency key and the request carries none.
    /// * `OpsModelError::MissingOperatorAcknowledgement` when the definition
    ///   requires an acknowledgement, local-only sensitive data is present,
    ///   and the request has not acknowledged it.
    /// * Any error propagated from `JobId::new`, `JobName::new`,
    ///   `JobSpec::new` or `JobsPlanner::plan_job`.
    pub fn plan_recovery_workflow(
        &self,
        request: RecoveryPlanRequest,
    ) -> Result<RecoveryPlan, OpsModelError> {
        // NOTE(review): a failed lookup is reported as `DuplicateIdentifier`;
        // a "not found" style variant looks more fitting — confirm against
        // `OpsModelError` before changing.
        let definition = match self.catalog.recovery.definition(&request.definition_id) {
            Some(found) => found,
            None => {
                return Err(OpsModelError::DuplicateIdentifier {
                    kind: "recovery workflow",
                    id: request.definition_id.to_string(),
                });
            }
        };

        let permitted = definition.allows(&request.operator_capabilities);
        if !permitted {
            return Err(OpsModelError::MissingCapability {
                operation: "recovery workflow",
                required: definition.required_capability,
            });
        }

        let key_missing = request.idempotency_key.is_none();
        if definition.requires_idempotency_key && key_missing {
            return Err(OpsModelError::InvalidRecoveryWorkflow {
                workflow_id: definition.id.to_string(),
                reason: "idempotency key is required for retry-safe recovery execution".to_string(),
            });
        }

        // The acknowledgement only matters when the definition demands it
        // and local-only sensitive data is actually part of this request.
        let acknowledgement_missing = definition.requires_local_only_sensitive_ack
            && request.local_only_sensitive_present
            && !request.local_only_sensitive_acknowledged;
        if acknowledgement_missing {
            return Err(OpsModelError::MissingOperatorAcknowledgement {
                workflow_id: definition.id.to_string(),
                requirement:
                    "local_only_sensitive restore requires explicit operator acknowledgement"
                        .to_string(),
            });
        }

        let topology = self.jobs.describe_queue_topology();
        let target_queue = match request.scheduled_for {
            Some(_) => topology.scheduled_queue.clone(),
            None => topology.work_queue.clone(),
        };

        let job_id = JobId::new(request.execution_id.as_str().to_string())?;
        let job_name = JobName::new(format!("recovery.{}", definition.id.as_str()))?;
        let retry_policy = definition
            .retry_policy
            .clone()
            .with_dead_letter_queue(topology.dead_letter_queue.clone());

        let description = format!(
            "recovery `{}` for customer app `{}`",
            definition.id, request.customer_app_id
        );
        let spec = JobSpec::new(job_id, job_name, target_queue, description)?;
        let spec = match request.scheduled_for {
            Some(when) => spec.scheduled_for(when),
            None => spec,
        };
        let spec = spec.with_retry_policy(retry_policy);
        let spec = match request.idempotency_key.clone() {
            Some(key) => spec.with_idempotency_key(key),
            None => spec,
        };

        let planned_job = self.jobs.plan_job(spec.clone(), request.requested_at)?;
        let stages = planned_recovery_stages(
            &definition.default_stages,
            request.local_only_sensitive_present,
        );
        let audit_message = format!(
            "recovery `{}` requested by `{}` for customer app `{}`",
            definition.id, request.requested_by, request.customer_app_id
        );

        Ok(RecoveryPlan {
            definition: definition.clone(),
            job: spec,
            planned_job,
            customer_app_id: request.customer_app_id.clone(),
            stages,
            audit_message,
            requires_host_local_restore: request.local_only_sensitive_present,
        })
    }
}
/// Returns the crate's default retry policy, built from the constants
/// `3`, 15 seconds and 300 seconds.
///
/// # Panics
///
/// Panics with "constant retry policy is valid" if `RetryPolicy::new`
/// rejects these constants.
pub(crate) fn default_retry_policy() -> RetryPolicy {
    // NOTE(review): the meaning of each argument is defined by
    // `RetryPolicy::new` in `coil_jobs` — confirm there before tuning.
    let fifteen_seconds = Duration::from_secs(15);
    let five_minutes = Duration::from_secs(300);
    let policy = RetryPolicy::new(3, fifteen_seconds, five_minutes);
    policy.expect("constant retry policy is valid")
}
/// Returns the stages to run for a recovery, in their original order.
///
/// All of `default_stages` are kept when `local_only_sensitive_present` is
/// true; otherwise every `RecoveryStage::RestoreLocalOnlySensitive` entry is
/// dropped.
fn planned_recovery_stages(
    default_stages: &[RecoveryStage],
    local_only_sensitive_present: bool,
) -> Vec<RecoveryStage> {
    let mut stages = default_stages.to_vec();
    if !local_only_sensitive_present {
        // Without local-only sensitive data there is nothing for that
        // restore stage to act on, so it is removed.
        stages.retain(|stage| *stage != RecoveryStage::RestoreLocalOnlySensitive);
    }
    stages
}