1use crate::bulk::{BulkOperationPlan, BulkOperationRequest};
2use crate::catalog::OpsCatalog;
3use crate::error::OpsModelError;
4use crate::recovery::{RecoveryPlan, RecoveryPlanRequest, RecoveryStage};
5use crate::reports::{ReportExportPlan, ReportExportRequest};
6use coil_jobs::{JobId, JobName, JobSpec, JobsPlanner, JobsRuntime, RetryPolicy};
7use std::time::Duration;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct OpsPlanner {
11 jobs: JobsPlanner,
12 catalog: OpsCatalog,
13}
14
15impl OpsPlanner {
16 pub fn new(runtime: JobsRuntime, catalog: OpsCatalog) -> Result<Self, OpsModelError> {
17 catalog.validate()?;
18 Ok(Self {
19 jobs: runtime.planner(),
20 catalog,
21 })
22 }
23
24 pub fn jobs_planner(&self) -> &JobsPlanner {
25 &self.jobs
26 }
27
28 pub fn catalog(&self) -> &OpsCatalog {
29 &self.catalog
30 }
31
32 pub fn plan_report_export(
33 &self,
34 request: ReportExportRequest,
35 ) -> Result<ReportExportPlan, OpsModelError> {
36 let definition = self
37 .catalog
38 .reports
39 .definition(&request.report_id)
40 .ok_or_else(|| OpsModelError::DuplicateIdentifier {
41 kind: "report",
42 id: request.report_id.to_string(),
43 })?;
44
45 if !definition.allows(&request.operator_capabilities) {
46 return Err(OpsModelError::MissingCapability {
47 operation: "report export",
48 required: definition.required_capability,
49 });
50 }
51
52 let queue_topology = self.jobs.describe_queue_topology();
53 let queue = if request.scheduled_for.is_some() {
54 queue_topology.scheduled_queue.clone()
55 } else {
56 queue_topology.work_queue.clone()
57 };
58
59 let job_id = JobId::new(request.export_id.as_str().to_string())?;
60 let job_name = JobName::new(format!("report.export.{}", definition.id.as_str()))?;
61 let mut retry_policy = definition.retry_policy.clone();
62 retry_policy =
63 retry_policy.with_dead_letter_queue(queue_topology.dead_letter_queue.clone());
64
65 let mut spec = JobSpec::new(
66 job_id,
67 job_name,
68 queue,
69 format!(
70 "report export for `{}` in format `{}`",
71 definition.id, definition.format
72 ),
73 )?;
74 if let Some(scheduled_for) = request.scheduled_for {
75 spec = spec.scheduled_for(scheduled_for);
76 }
77 spec = spec.with_retry_policy(retry_policy);
78 if let Some(key) = request.idempotency_key.clone() {
79 spec = spec.with_idempotency_key(key);
80 }
81
82 let planned_job = self.jobs.plan_job(spec.clone(), request.requested_at)?;
83
84 Ok(ReportExportPlan {
85 definition: definition.clone(),
86 job: spec,
87 planned_job,
88 output_object_key: format!(
89 "{}/{}.{}",
90 definition.export_prefix,
91 request.export_id,
92 definition.format.extension()
93 ),
94 parameters: request.parameters,
95 })
96 }
97
98 pub fn plan_bulk_operation(
99 &self,
100 request: BulkOperationRequest,
101 ) -> Result<BulkOperationPlan, OpsModelError> {
102 let definition = self
103 .catalog
104 .bulk
105 .definition(&request.definition_id)
106 .ok_or_else(|| OpsModelError::DuplicateIdentifier {
107 kind: "bulk operation",
108 id: request.definition_id.to_string(),
109 })?;
110
111 if !definition.allows(&request.operator_capabilities) {
112 return Err(OpsModelError::MissingCapability {
113 operation: "bulk operation",
114 required: definition.required_capability,
115 });
116 }
117
118 if request.target_count == 0 {
119 return Err(OpsModelError::InvalidItemCount {
120 operation: "bulk operation",
121 count: request.target_count,
122 });
123 }
124
125 if let Some(max_items) = definition.max_items {
126 if request.target_count > max_items {
127 return Err(OpsModelError::InvalidItemCount {
128 operation: "bulk operation",
129 count: request.target_count,
130 });
131 }
132 }
133
134 if definition.requires_idempotency_key && request.idempotency_key.is_none() {
135 return Err(OpsModelError::InvalidBulkOperation {
136 operation_id: definition.id.to_string(),
137 reason: "idempotency key is required for retry-safe execution".to_string(),
138 });
139 }
140
141 let queue_topology = self.jobs.describe_queue_topology();
142 let queue = if request.scheduled_for.is_some() {
143 queue_topology.scheduled_queue.clone()
144 } else {
145 queue_topology.work_queue.clone()
146 };
147
148 let job_id = JobId::new(request.execution_id.as_str().to_string())?;
149 let job_name = JobName::new(format!("bulk.{}", definition.id.as_str()))?;
150 let mut retry_policy = definition.retry_policy.clone();
151 retry_policy =
152 retry_policy.with_dead_letter_queue(queue_topology.dead_letter_queue.clone());
153
154 let mut spec = JobSpec::new(
155 job_id,
156 job_name,
157 queue,
158 format!(
159 "bulk `{}` on `{}` items",
160 definition.kind, request.target_count
161 ),
162 )?;
163 if let Some(scheduled_for) = request.scheduled_for {
164 spec = spec.scheduled_for(scheduled_for);
165 }
166 spec = spec.with_retry_policy(retry_policy);
167 if let Some(key) = request.idempotency_key.clone() {
168 spec = spec.with_idempotency_key(key);
169 }
170
171 let planned_job = self.jobs.plan_job(spec.clone(), request.requested_at)?;
172
173 Ok(BulkOperationPlan {
174 definition: definition.clone(),
175 job: spec,
176 planned_job,
177 dry_run: request.dry_run,
178 target_count: request.target_count,
179 audit_message: format!(
180 "bulk `{}` requested by `{}` for `{}` items",
181 definition.id, request.requested_by, request.target_count
182 ),
183 })
184 }
185
186 pub fn plan_recovery_workflow(
187 &self,
188 request: RecoveryPlanRequest,
189 ) -> Result<RecoveryPlan, OpsModelError> {
190 let definition = self
191 .catalog
192 .recovery
193 .definition(&request.definition_id)
194 .ok_or_else(|| OpsModelError::DuplicateIdentifier {
195 kind: "recovery workflow",
196 id: request.definition_id.to_string(),
197 })?;
198
199 if !definition.allows(&request.operator_capabilities) {
200 return Err(OpsModelError::MissingCapability {
201 operation: "recovery workflow",
202 required: definition.required_capability,
203 });
204 }
205
206 if definition.requires_idempotency_key && request.idempotency_key.is_none() {
207 return Err(OpsModelError::InvalidRecoveryWorkflow {
208 workflow_id: definition.id.to_string(),
209 reason: "idempotency key is required for retry-safe recovery execution".to_string(),
210 });
211 }
212
213 if definition.requires_local_only_sensitive_ack
214 && request.local_only_sensitive_present
215 && !request.local_only_sensitive_acknowledged
216 {
217 return Err(OpsModelError::MissingOperatorAcknowledgement {
218 workflow_id: definition.id.to_string(),
219 requirement:
220 "local_only_sensitive restore requires explicit operator acknowledgement"
221 .to_string(),
222 });
223 }
224
225 let queue_topology = self.jobs.describe_queue_topology();
226 let queue = if request.scheduled_for.is_some() {
227 queue_topology.scheduled_queue.clone()
228 } else {
229 queue_topology.work_queue.clone()
230 };
231
232 let job_id = JobId::new(request.execution_id.as_str().to_string())?;
233 let job_name = JobName::new(format!("recovery.{}", definition.id.as_str()))?;
234 let mut retry_policy = definition.retry_policy.clone();
235 retry_policy =
236 retry_policy.with_dead_letter_queue(queue_topology.dead_letter_queue.clone());
237
238 let mut spec = JobSpec::new(
239 job_id,
240 job_name,
241 queue,
242 format!(
243 "recovery `{}` for customer app `{}`",
244 definition.id, request.customer_app_id
245 ),
246 )?;
247 if let Some(scheduled_for) = request.scheduled_for {
248 spec = spec.scheduled_for(scheduled_for);
249 }
250 spec = spec.with_retry_policy(retry_policy);
251 if let Some(key) = request.idempotency_key.clone() {
252 spec = spec.with_idempotency_key(key);
253 }
254
255 let planned_job = self.jobs.plan_job(spec.clone(), request.requested_at)?;
256 let stages = planned_recovery_stages(
257 &definition.default_stages,
258 request.local_only_sensitive_present,
259 );
260
261 Ok(RecoveryPlan {
262 definition: definition.clone(),
263 job: spec,
264 planned_job,
265 customer_app_id: request.customer_app_id.clone(),
266 stages,
267 audit_message: format!(
268 "recovery `{}` requested by `{}` for customer app `{}`",
269 definition.id, request.requested_by, request.customer_app_id
270 ),
271 requires_host_local_restore: request.local_only_sensitive_present,
272 })
273 }
274}
275
276pub(crate) fn default_retry_policy() -> RetryPolicy {
277 RetryPolicy::new(3, Duration::from_secs(15), Duration::from_secs(300))
278 .expect("constant retry policy is valid")
279}
280
281fn planned_recovery_stages(
282 default_stages: &[RecoveryStage],
283 local_only_sensitive_present: bool,
284) -> Vec<RecoveryStage> {
285 default_stages
286 .iter()
287 .copied()
288 .filter(|stage| {
289 local_only_sensitive_present || *stage != RecoveryStage::RestoreLocalOnlySensitive
290 })
291 .collect()
292}