1use super::*;
2
3#[derive(Debug, Clone, PartialEq, Eq)]
4pub struct QueuedReportExport {
5 pub plan: ReportExportPlan,
6 pub queued_job_id: JobId,
7}
8
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct QueuedBulkOperation {
11 pub plan: BulkOperationPlan,
12 pub queued_job_id: JobId,
13}
14
15#[derive(Debug, Clone)]
16pub struct OpsHost {
17 planner: OpsPlanner,
18 jobs: JobsHost,
19}
20
21impl OpsHost {
22 pub(crate) fn new(planner: OpsPlanner, jobs: JobsHost) -> Self {
23 Self { planner, jobs }
24 }
25
26 pub fn planner(&self) -> &OpsPlanner {
27 &self.planner
28 }
29
30 pub fn jobs(&self) -> &JobsHost {
31 &self.jobs
32 }
33
34 pub fn jobs_mut(&mut self) -> &mut JobsHost {
35 &mut self.jobs
36 }
37
38 pub fn queue_report_export(
39 &mut self,
40 request: ReportExportRequest,
41 ) -> Result<QueuedReportExport, RuntimeOpsError> {
42 let requested_at = request.requested_at;
43 let plan = self.planner.plan_report_export(request)?;
44 let queued_job_id = self.jobs.enqueue_spec(plan.job.clone(), requested_at)?;
45
46 Ok(QueuedReportExport {
47 plan,
48 queued_job_id,
49 })
50 }
51
52 pub fn queue_bulk_operation(
53 &mut self,
54 request: BulkOperationRequest,
55 ) -> Result<QueuedBulkOperation, RuntimeOpsError> {
56 let requested_at = request.requested_at;
57 let plan = self.planner.plan_bulk_operation(request)?;
58 let queued_job_id = self.jobs.enqueue_spec(plan.job.clone(), requested_at)?;
59
60 Ok(QueuedBulkOperation {
61 plan,
62 queued_job_id,
63 })
64 }
65}
66
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct SearchInvalidationPlan {
69 pub trigger: SearchInvalidationTrigger,
70 pub indexes: Vec<OpsSearchIndexContribution>,
71}
72
73#[derive(Debug, Error, PartialEq, Eq)]
74pub enum RuntimeSearchError {
75 #[error(transparent)]
76 Ops(#[from] RuntimeOpsError),
77 #[error(transparent)]
78 Model(#[from] OpsModelError),
79 #[error(transparent)]
80 Jobs(#[from] JobsModelError),
81 #[error("search host requires at least one configured index contribution")]
82 EmptyCatalog,
83}
84
85#[derive(Debug, Clone)]
86pub struct SearchHost {
87 catalog: SearchCatalog,
88 ops: OpsHost,
89}
90
91impl SearchHost {
92 pub(crate) fn new(catalog: SearchCatalog, ops: OpsHost) -> Self {
93 Self { catalog, ops }
94 }
95
96 pub fn catalog(&self) -> &SearchCatalog {
97 &self.catalog
98 }
99
100 pub fn visible_to(
101 &self,
102 capabilities: &[coil_auth::Capability],
103 ) -> Vec<&OpsSearchIndexContribution> {
104 self.catalog.visible_to(capabilities)
105 }
106
107 pub fn indexes_for_trigger(
108 &self,
109 trigger: SearchInvalidationTrigger,
110 ) -> Vec<&OpsSearchIndexContribution> {
111 self.catalog
112 .contributions
113 .iter()
114 .filter(|index| {
115 index
116 .invalidation_rules
117 .iter()
118 .any(|rule| rule.trigger == trigger)
119 })
120 .collect()
121 }
122
123 pub fn invalidation_plan(&self, trigger: SearchInvalidationTrigger) -> SearchInvalidationPlan {
124 SearchInvalidationPlan {
125 trigger,
126 indexes: self
127 .indexes_for_trigger(trigger)
128 .into_iter()
129 .cloned()
130 .collect(),
131 }
132 }
133
134 pub fn scheduled_rebuilds(&self) -> Vec<&OpsSearchIndexContribution> {
135 self.catalog
136 .contributions
137 .iter()
138 .filter(|index| {
139 matches!(
140 index.rebuild_strategy,
141 SearchRebuildStrategy::Scheduled { .. }
142 )
143 })
144 .collect()
145 }
146
147 pub fn queue_full_reindex(
148 &mut self,
149 execution_id: BulkExecutionId,
150 requested_by: impl Into<String>,
151 requested_at: JobInstant,
152 operator_capabilities: Vec<coil_auth::Capability>,
153 dry_run: bool,
154 ) -> Result<QueuedBulkOperation, RuntimeSearchError> {
155 if self.catalog.contributions.is_empty() {
156 return Err(RuntimeSearchError::EmptyCatalog);
157 }
158
159 let idempotency_key =
160 IdempotencyKey::new(format!("search.reindex:{}", execution_id.as_str()))?;
161 let mut request = BulkOperationRequest::new(
162 execution_id,
163 BulkOperationId::new("bulk.search.reindex")?,
164 requested_by,
165 requested_at,
166 self.catalog.contributions.len(),
167 )?;
168
169 for capability in operator_capabilities {
170 request = request.with_capability(capability);
171 }
172
173 request = request
174 .with_idempotency_key(idempotency_key)
175 .dry_run(dry_run);
176 Ok(self.ops.queue_bulk_operation(request)?)
177 }
178}
179
180#[derive(Debug, Error, PartialEq, Eq)]
181pub enum RuntimeOpsError {
182 #[error(transparent)]
183 Ops(#[from] OpsModelError),
184 #[error(transparent)]
185 Jobs(#[from] RuntimeJobsError),
186}