Skip to main content

coil_ops/
planner.rs

1use crate::bulk::{BulkOperationPlan, BulkOperationRequest};
2use crate::catalog::OpsCatalog;
3use crate::error::OpsModelError;
4use crate::recovery::{RecoveryPlan, RecoveryPlanRequest, RecoveryStage};
5use crate::reports::{ReportExportPlan, ReportExportRequest};
6use coil_jobs::{JobId, JobName, JobSpec, JobsPlanner, JobsRuntime, RetryPolicy};
7use std::time::Duration;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct OpsPlanner {
11    jobs: JobsPlanner,
12    catalog: OpsCatalog,
13}
14
15impl OpsPlanner {
16    pub fn new(runtime: JobsRuntime, catalog: OpsCatalog) -> Result<Self, OpsModelError> {
17        catalog.validate()?;
18        Ok(Self {
19            jobs: runtime.planner(),
20            catalog,
21        })
22    }
23
24    pub fn jobs_planner(&self) -> &JobsPlanner {
25        &self.jobs
26    }
27
28    pub fn catalog(&self) -> &OpsCatalog {
29        &self.catalog
30    }
31
32    pub fn plan_report_export(
33        &self,
34        request: ReportExportRequest,
35    ) -> Result<ReportExportPlan, OpsModelError> {
36        let definition = self
37            .catalog
38            .reports
39            .definition(&request.report_id)
40            .ok_or_else(|| OpsModelError::DuplicateIdentifier {
41                kind: "report",
42                id: request.report_id.to_string(),
43            })?;
44
45        if !definition.allows(&request.operator_capabilities) {
46            return Err(OpsModelError::MissingCapability {
47                operation: "report export",
48                required: definition.required_capability,
49            });
50        }
51
52        let queue_topology = self.jobs.describe_queue_topology();
53        let queue = if request.scheduled_for.is_some() {
54            queue_topology.scheduled_queue.clone()
55        } else {
56            queue_topology.work_queue.clone()
57        };
58
59        let job_id = JobId::new(request.export_id.as_str().to_string())?;
60        let job_name = JobName::new(format!("report.export.{}", definition.id.as_str()))?;
61        let mut retry_policy = definition.retry_policy.clone();
62        retry_policy =
63            retry_policy.with_dead_letter_queue(queue_topology.dead_letter_queue.clone());
64
65        let mut spec = JobSpec::new(
66            job_id,
67            job_name,
68            queue,
69            format!(
70                "report export for `{}` in format `{}`",
71                definition.id, definition.format
72            ),
73        )?;
74        if let Some(scheduled_for) = request.scheduled_for {
75            spec = spec.scheduled_for(scheduled_for);
76        }
77        spec = spec.with_retry_policy(retry_policy);
78        if let Some(key) = request.idempotency_key.clone() {
79            spec = spec.with_idempotency_key(key);
80        }
81
82        let planned_job = self.jobs.plan_job(spec.clone(), request.requested_at)?;
83
84        Ok(ReportExportPlan {
85            definition: definition.clone(),
86            job: spec,
87            planned_job,
88            output_object_key: format!(
89                "{}/{}.{}",
90                definition.export_prefix,
91                request.export_id,
92                definition.format.extension()
93            ),
94            parameters: request.parameters,
95        })
96    }
97
98    pub fn plan_bulk_operation(
99        &self,
100        request: BulkOperationRequest,
101    ) -> Result<BulkOperationPlan, OpsModelError> {
102        let definition = self
103            .catalog
104            .bulk
105            .definition(&request.definition_id)
106            .ok_or_else(|| OpsModelError::DuplicateIdentifier {
107                kind: "bulk operation",
108                id: request.definition_id.to_string(),
109            })?;
110
111        if !definition.allows(&request.operator_capabilities) {
112            return Err(OpsModelError::MissingCapability {
113                operation: "bulk operation",
114                required: definition.required_capability,
115            });
116        }
117
118        if request.target_count == 0 {
119            return Err(OpsModelError::InvalidItemCount {
120                operation: "bulk operation",
121                count: request.target_count,
122            });
123        }
124
125        if let Some(max_items) = definition.max_items {
126            if request.target_count > max_items {
127                return Err(OpsModelError::InvalidItemCount {
128                    operation: "bulk operation",
129                    count: request.target_count,
130                });
131            }
132        }
133
134        if definition.requires_idempotency_key && request.idempotency_key.is_none() {
135            return Err(OpsModelError::InvalidBulkOperation {
136                operation_id: definition.id.to_string(),
137                reason: "idempotency key is required for retry-safe execution".to_string(),
138            });
139        }
140
141        let queue_topology = self.jobs.describe_queue_topology();
142        let queue = if request.scheduled_for.is_some() {
143            queue_topology.scheduled_queue.clone()
144        } else {
145            queue_topology.work_queue.clone()
146        };
147
148        let job_id = JobId::new(request.execution_id.as_str().to_string())?;
149        let job_name = JobName::new(format!("bulk.{}", definition.id.as_str()))?;
150        let mut retry_policy = definition.retry_policy.clone();
151        retry_policy =
152            retry_policy.with_dead_letter_queue(queue_topology.dead_letter_queue.clone());
153
154        let mut spec = JobSpec::new(
155            job_id,
156            job_name,
157            queue,
158            format!(
159                "bulk `{}` on `{}` items",
160                definition.kind, request.target_count
161            ),
162        )?;
163        if let Some(scheduled_for) = request.scheduled_for {
164            spec = spec.scheduled_for(scheduled_for);
165        }
166        spec = spec.with_retry_policy(retry_policy);
167        if let Some(key) = request.idempotency_key.clone() {
168            spec = spec.with_idempotency_key(key);
169        }
170
171        let planned_job = self.jobs.plan_job(spec.clone(), request.requested_at)?;
172
173        Ok(BulkOperationPlan {
174            definition: definition.clone(),
175            job: spec,
176            planned_job,
177            dry_run: request.dry_run,
178            target_count: request.target_count,
179            audit_message: format!(
180                "bulk `{}` requested by `{}` for `{}` items",
181                definition.id, request.requested_by, request.target_count
182            ),
183        })
184    }
185
186    pub fn plan_recovery_workflow(
187        &self,
188        request: RecoveryPlanRequest,
189    ) -> Result<RecoveryPlan, OpsModelError> {
190        let definition = self
191            .catalog
192            .recovery
193            .definition(&request.definition_id)
194            .ok_or_else(|| OpsModelError::DuplicateIdentifier {
195                kind: "recovery workflow",
196                id: request.definition_id.to_string(),
197            })?;
198
199        if !definition.allows(&request.operator_capabilities) {
200            return Err(OpsModelError::MissingCapability {
201                operation: "recovery workflow",
202                required: definition.required_capability,
203            });
204        }
205
206        if definition.requires_idempotency_key && request.idempotency_key.is_none() {
207            return Err(OpsModelError::InvalidRecoveryWorkflow {
208                workflow_id: definition.id.to_string(),
209                reason: "idempotency key is required for retry-safe recovery execution".to_string(),
210            });
211        }
212
213        if definition.requires_local_only_sensitive_ack
214            && request.local_only_sensitive_present
215            && !request.local_only_sensitive_acknowledged
216        {
217            return Err(OpsModelError::MissingOperatorAcknowledgement {
218                workflow_id: definition.id.to_string(),
219                requirement:
220                    "local_only_sensitive restore requires explicit operator acknowledgement"
221                        .to_string(),
222            });
223        }
224
225        let queue_topology = self.jobs.describe_queue_topology();
226        let queue = if request.scheduled_for.is_some() {
227            queue_topology.scheduled_queue.clone()
228        } else {
229            queue_topology.work_queue.clone()
230        };
231
232        let job_id = JobId::new(request.execution_id.as_str().to_string())?;
233        let job_name = JobName::new(format!("recovery.{}", definition.id.as_str()))?;
234        let mut retry_policy = definition.retry_policy.clone();
235        retry_policy =
236            retry_policy.with_dead_letter_queue(queue_topology.dead_letter_queue.clone());
237
238        let mut spec = JobSpec::new(
239            job_id,
240            job_name,
241            queue,
242            format!(
243                "recovery `{}` for customer app `{}`",
244                definition.id, request.customer_app_id
245            ),
246        )?;
247        if let Some(scheduled_for) = request.scheduled_for {
248            spec = spec.scheduled_for(scheduled_for);
249        }
250        spec = spec.with_retry_policy(retry_policy);
251        if let Some(key) = request.idempotency_key.clone() {
252            spec = spec.with_idempotency_key(key);
253        }
254
255        let planned_job = self.jobs.plan_job(spec.clone(), request.requested_at)?;
256        let stages = planned_recovery_stages(
257            &definition.default_stages,
258            request.local_only_sensitive_present,
259        );
260
261        Ok(RecoveryPlan {
262            definition: definition.clone(),
263            job: spec,
264            planned_job,
265            customer_app_id: request.customer_app_id.clone(),
266            stages,
267            audit_message: format!(
268                "recovery `{}` requested by `{}` for customer app `{}`",
269                definition.id, request.requested_by, request.customer_app_id
270            ),
271            requires_host_local_restore: request.local_only_sensitive_present,
272        })
273    }
274}
275
276pub(crate) fn default_retry_policy() -> RetryPolicy {
277    RetryPolicy::new(3, Duration::from_secs(15), Duration::from_secs(300))
278        .expect("constant retry policy is valid")
279}
280
281fn planned_recovery_stages(
282    default_stages: &[RecoveryStage],
283    local_only_sensitive_present: bool,
284) -> Vec<RecoveryStage> {
285    default_stages
286        .iter()
287        .copied()
288        .filter(|stage| {
289            local_only_sensitive_present || *stage != RecoveryStage::RestoreLocalOnlySensitive
290        })
291        .collect()
292}