Skip to main content

coil_runtime/
ops.rs

1use super::*;
2
3#[derive(Debug, Clone, PartialEq, Eq)]
4pub struct QueuedReportExport {
5    pub plan: ReportExportPlan,
6    pub queued_job_id: JobId,
7}
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct QueuedBulkOperation {
11    pub plan: BulkOperationPlan,
12    pub queued_job_id: JobId,
13}
14
15#[derive(Debug, Clone)]
16pub struct OpsHost {
17    planner: OpsPlanner,
18    jobs: JobsHost,
19}
20
21impl OpsHost {
22    pub(crate) fn new(planner: OpsPlanner, jobs: JobsHost) -> Self {
23        Self { planner, jobs }
24    }
25
26    pub fn planner(&self) -> &OpsPlanner {
27        &self.planner
28    }
29
30    pub fn jobs(&self) -> &JobsHost {
31        &self.jobs
32    }
33
34    pub fn jobs_mut(&mut self) -> &mut JobsHost {
35        &mut self.jobs
36    }
37
38    pub fn queue_report_export(
39        &mut self,
40        request: ReportExportRequest,
41    ) -> Result<QueuedReportExport, RuntimeOpsError> {
42        let requested_at = request.requested_at;
43        let plan = self.planner.plan_report_export(request)?;
44        let queued_job_id = self.jobs.enqueue_spec(plan.job.clone(), requested_at)?;
45
46        Ok(QueuedReportExport {
47            plan,
48            queued_job_id,
49        })
50    }
51
52    pub fn queue_bulk_operation(
53        &mut self,
54        request: BulkOperationRequest,
55    ) -> Result<QueuedBulkOperation, RuntimeOpsError> {
56        let requested_at = request.requested_at;
57        let plan = self.planner.plan_bulk_operation(request)?;
58        let queued_job_id = self.jobs.enqueue_spec(plan.job.clone(), requested_at)?;
59
60        Ok(QueuedBulkOperation {
61            plan,
62            queued_job_id,
63        })
64    }
65}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct SearchInvalidationPlan {
69    pub trigger: SearchInvalidationTrigger,
70    pub indexes: Vec<OpsSearchIndexContribution>,
71}
72
73#[derive(Debug, Error, PartialEq, Eq)]
74pub enum RuntimeSearchError {
75    #[error(transparent)]
76    Ops(#[from] RuntimeOpsError),
77    #[error(transparent)]
78    Model(#[from] OpsModelError),
79    #[error(transparent)]
80    Jobs(#[from] JobsModelError),
81    #[error("search host requires at least one configured index contribution")]
82    EmptyCatalog,
83}
84
85#[derive(Debug, Clone)]
86pub struct SearchHost {
87    catalog: SearchCatalog,
88    ops: OpsHost,
89}
90
91impl SearchHost {
92    pub(crate) fn new(catalog: SearchCatalog, ops: OpsHost) -> Self {
93        Self { catalog, ops }
94    }
95
96    pub fn catalog(&self) -> &SearchCatalog {
97        &self.catalog
98    }
99
100    pub fn visible_to(
101        &self,
102        capabilities: &[coil_auth::Capability],
103    ) -> Vec<&OpsSearchIndexContribution> {
104        self.catalog.visible_to(capabilities)
105    }
106
107    pub fn indexes_for_trigger(
108        &self,
109        trigger: SearchInvalidationTrigger,
110    ) -> Vec<&OpsSearchIndexContribution> {
111        self.catalog
112            .contributions
113            .iter()
114            .filter(|index| {
115                index
116                    .invalidation_rules
117                    .iter()
118                    .any(|rule| rule.trigger == trigger)
119            })
120            .collect()
121    }
122
123    pub fn invalidation_plan(&self, trigger: SearchInvalidationTrigger) -> SearchInvalidationPlan {
124        SearchInvalidationPlan {
125            trigger,
126            indexes: self
127                .indexes_for_trigger(trigger)
128                .into_iter()
129                .cloned()
130                .collect(),
131        }
132    }
133
134    pub fn scheduled_rebuilds(&self) -> Vec<&OpsSearchIndexContribution> {
135        self.catalog
136            .contributions
137            .iter()
138            .filter(|index| {
139                matches!(
140                    index.rebuild_strategy,
141                    SearchRebuildStrategy::Scheduled { .. }
142                )
143            })
144            .collect()
145    }
146
147    pub fn queue_full_reindex(
148        &mut self,
149        execution_id: BulkExecutionId,
150        requested_by: impl Into<String>,
151        requested_at: JobInstant,
152        operator_capabilities: Vec<coil_auth::Capability>,
153        dry_run: bool,
154    ) -> Result<QueuedBulkOperation, RuntimeSearchError> {
155        if self.catalog.contributions.is_empty() {
156            return Err(RuntimeSearchError::EmptyCatalog);
157        }
158
159        let idempotency_key =
160            IdempotencyKey::new(format!("search.reindex:{}", execution_id.as_str()))?;
161        let mut request = BulkOperationRequest::new(
162            execution_id,
163            BulkOperationId::new("bulk.search.reindex")?,
164            requested_by,
165            requested_at,
166            self.catalog.contributions.len(),
167        )?;
168
169        for capability in operator_capabilities {
170            request = request.with_capability(capability);
171        }
172
173        request = request
174            .with_idempotency_key(idempotency_key)
175            .dry_run(dry_run);
176        Ok(self.ops.queue_bulk_operation(request)?)
177    }
178}
179
180#[derive(Debug, Error, PartialEq, Eq)]
181pub enum RuntimeOpsError {
182    #[error(transparent)]
183    Ops(#[from] OpsModelError),
184    #[error(transparent)]
185    Jobs(#[from] RuntimeJobsError),
186}