1use super::*;
2
3impl RedDBRuntime {
4 pub fn create_export(&self, name: impl Into<String>) -> RedDBResult<ExportDescriptor> {
5 self.inner
6 .db
7 .create_named_export(name)
8 .map_err(|err| RedDBError::Internal(err.to_string()))
9 }
10
11 pub fn graph_projections(&self) -> RedDBResult<Vec<PhysicalGraphProjection>> {
12 Ok(self.inner.db.declared_graph_projections())
13 }
14
15 pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
16 self.inner.db.operational_graph_projections()
17 }
18
19 pub fn graph_projection_named(&self, name: &str) -> RedDBResult<RuntimeGraphProjection> {
20 let status = self
21 .graph_projection_statuses()
22 .into_iter()
23 .find(|status| status.name == name)
24 .ok_or_else(|| RedDBError::NotFound(name.to_string()))?;
25 if !status.declared {
26 return Err(RedDBError::Catalog(format!(
27 "graph projection '{name}' is not declared"
28 )));
29 }
30 if !status.operational {
31 return Err(RedDBError::Catalog(format!(
32 "graph projection '{name}' is declared but not operationally materialized"
33 )));
34 }
35 if status.lifecycle_state == "stale" {
36 return Err(RedDBError::Catalog(format!(
37 "graph projection '{name}' is stale and must be rematerialized before use"
38 )));
39 }
40 let projection = self
41 .operational_graph_projections()
42 .into_iter()
43 .find(|projection| projection.name == name)
44 .ok_or_else(|| RedDBError::NotFound(name.to_string()))?;
45 Ok(RuntimeGraphProjection {
46 node_labels: (!projection.node_labels.is_empty()).then_some(projection.node_labels),
47 node_types: (!projection.node_types.is_empty()).then_some(projection.node_types),
48 edge_labels: (!projection.edge_labels.is_empty()).then_some(projection.edge_labels),
49 })
50 }
51
52 pub fn save_graph_projection(
53 &self,
54 name: impl Into<String>,
55 projection: RuntimeGraphProjection,
56 source: Option<String>,
57 ) -> RedDBResult<PhysicalGraphProjection> {
58 self.inner
59 .db
60 .save_graph_projection(
61 name,
62 projection.node_labels.unwrap_or_default(),
63 projection.node_types.unwrap_or_default(),
64 projection.edge_labels.unwrap_or_default(),
65 source.unwrap_or_else(|| "runtime".to_string()),
66 )
67 .map_err(|err| RedDBError::Internal(err.to_string()))
68 }
69
70 pub fn materialize_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
71 self.inner
72 .db
73 .materialize_graph_projection(name)
74 .map_err(|err| RedDBError::Internal(err.to_string()))?
75 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
76 }
77
78 pub fn mark_graph_projection_materializing(
79 &self,
80 name: &str,
81 ) -> RedDBResult<PhysicalGraphProjection> {
82 self.inner
83 .db
84 .mark_graph_projection_materializing(name)
85 .map_err(|err| RedDBError::Internal(err.to_string()))?
86 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
87 }
88
89 pub fn fail_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
90 self.inner
91 .db
92 .fail_graph_projection(name)
93 .map_err(|err| RedDBError::Internal(err.to_string()))?
94 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
95 }
96
97 pub fn mark_graph_projection_stale(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
98 self.inner
99 .db
100 .mark_graph_projection_stale(name)
101 .map_err(|err| RedDBError::Internal(err.to_string()))?
102 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
103 }
104
105 pub fn analytics_jobs(&self) -> RedDBResult<Vec<PhysicalAnalyticsJob>> {
106 Ok(self.inner.db.declared_analytics_jobs())
107 }
108
109 pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
110 self.inner.db.operational_analytics_jobs()
111 }
112
113 pub fn save_analytics_job(
114 &self,
115 kind: impl Into<String>,
116 projection_name: Option<String>,
117 metadata: std::collections::BTreeMap<String, String>,
118 ) -> RedDBResult<PhysicalAnalyticsJob> {
119 self.inner
120 .db
121 .save_analytics_job(kind, projection_name, metadata)
122 .map_err(|err| RedDBError::Internal(err.to_string()))
123 }
124
125 pub fn start_analytics_job(
126 &self,
127 kind: impl Into<String>,
128 projection_name: Option<String>,
129 metadata: std::collections::BTreeMap<String, String>,
130 ) -> RedDBResult<PhysicalAnalyticsJob> {
131 if let Some(projection_name) = projection_name.as_deref() {
132 let status = self
133 .graph_projection_statuses()
134 .into_iter()
135 .find(|status| status.name == projection_name)
136 .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
137 if !status.declared {
138 return Err(RedDBError::Catalog(format!(
139 "graph projection '{projection_name}' is not declared"
140 )));
141 }
142 if !status.operational {
143 return Err(RedDBError::Catalog(format!(
144 "graph projection '{projection_name}' is declared but not operationally materialized"
145 )));
146 }
147 if status.lifecycle_state == "stale" {
148 return Err(RedDBError::Catalog(format!(
149 "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can start against it"
150 )));
151 }
152 }
153 self.inner
154 .db
155 .start_analytics_job(kind, projection_name, metadata)
156 .map_err(|err| RedDBError::Internal(err.to_string()))
157 }
158
159 pub fn queue_analytics_job(
160 &self,
161 kind: impl Into<String>,
162 projection_name: Option<String>,
163 metadata: std::collections::BTreeMap<String, String>,
164 ) -> RedDBResult<PhysicalAnalyticsJob> {
165 if let Some(projection_name) = projection_name.as_deref() {
166 let status = self
167 .graph_projection_statuses()
168 .into_iter()
169 .find(|status| status.name == projection_name)
170 .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
171 if !status.declared {
172 return Err(RedDBError::Catalog(format!(
173 "graph projection '{projection_name}' is not declared"
174 )));
175 }
176 if !status.operational {
177 return Err(RedDBError::Catalog(format!(
178 "graph projection '{projection_name}' is declared but not operationally materialized"
179 )));
180 }
181 if status.lifecycle_state == "stale" {
182 return Err(RedDBError::Catalog(format!(
183 "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can be queued against it"
184 )));
185 }
186 }
187 self.inner
188 .db
189 .queue_analytics_job(kind, projection_name, metadata)
190 .map_err(|err| RedDBError::Internal(err.to_string()))
191 }
192
193 pub fn fail_analytics_job(
194 &self,
195 kind: impl Into<String>,
196 projection_name: Option<String>,
197 metadata: std::collections::BTreeMap<String, String>,
198 ) -> RedDBResult<PhysicalAnalyticsJob> {
199 self.inner
200 .db
201 .fail_analytics_job(kind, projection_name, metadata)
202 .map_err(|err| RedDBError::Internal(err.to_string()))
203 }
204
205 pub fn mark_analytics_job_stale(
206 &self,
207 kind: impl Into<String>,
208 projection_name: Option<String>,
209 metadata: std::collections::BTreeMap<String, String>,
210 ) -> RedDBResult<PhysicalAnalyticsJob> {
211 self.inner
212 .db
213 .mark_analytics_job_stale(kind, projection_name, metadata)
214 .map_err(|err| RedDBError::Internal(err.to_string()))
215 }
216
217 pub fn complete_analytics_job(
218 &self,
219 kind: impl Into<String>,
220 projection_name: Option<String>,
221 metadata: std::collections::BTreeMap<String, String>,
222 ) -> RedDBResult<PhysicalAnalyticsJob> {
223 self.inner
224 .db
225 .record_analytics_job(kind, projection_name, metadata)
226 .map_err(|err| RedDBError::Internal(err.to_string()))
227 }
228
229 pub fn record_analytics_job(
230 &self,
231 kind: impl Into<String>,
232 projection_name: Option<String>,
233 metadata: std::collections::BTreeMap<String, String>,
234 ) -> RedDBResult<PhysicalAnalyticsJob> {
235 if let Some(projection_name) = projection_name.as_deref() {
236 let status = self
237 .graph_projection_statuses()
238 .into_iter()
239 .find(|status| status.name == projection_name)
240 .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
241 if !status.declared {
242 return Err(RedDBError::Catalog(format!(
243 "graph projection '{projection_name}' is not declared"
244 )));
245 }
246 if !status.operational {
247 return Err(RedDBError::Catalog(format!(
248 "graph projection '{projection_name}' is declared but not operationally materialized"
249 )));
250 }
251 if status.lifecycle_state == "stale" {
252 return Err(RedDBError::Catalog(format!(
253 "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can complete against it"
254 )));
255 }
256 }
257 self.inner
258 .db
259 .record_analytics_job(kind, projection_name, metadata)
260 .map_err(|err| RedDBError::Internal(err.to_string()))
261 }
262
263 pub fn resolve_graph_projection(
264 &self,
265 projection_name: Option<&str>,
266 inline: Option<RuntimeGraphProjection>,
267 ) -> RedDBResult<Option<RuntimeGraphProjection>> {
268 let named = match projection_name {
269 Some(name) => Some(self.graph_projection_named(name)?),
270 None => None,
271 };
272 Ok(merge_runtime_projection(named, inline))
273 }
274
275 pub fn apply_retention_policy(&self) -> RedDBResult<()> {
276 self.inner
277 .db
278 .enforce_retention_policy()
279 .map_err(|err| RedDBError::Internal(err.to_string()))?;
280 self.invalidate_result_cache();
281 Ok(())
282 }
283
284 pub fn indexes(&self) -> Vec<crate::PhysicalIndexState> {
285 self.inner.db.operational_indexes()
286 }
287
288 pub fn declared_indexes(&self) -> Vec<crate::PhysicalIndexState> {
289 self.inner.db.declared_indexes()
290 }
291
292 pub fn declared_indexes_for_collection(
293 &self,
294 collection: &str,
295 ) -> Vec<crate::PhysicalIndexState> {
296 self.inner
297 .db
298 .declared_indexes()
299 .into_iter()
300 .filter(|index| index.collection.as_deref() == Some(collection))
301 .collect()
302 }
303
304 pub fn index_statuses(&self) -> Vec<crate::catalog::CatalogIndexStatus> {
305 self.inner.db.index_statuses()
306 }
307
308 pub fn graph_projection_statuses(&self) -> Vec<crate::catalog::CatalogGraphProjectionStatus> {
309 self.inner
310 .db
311 .catalog_model_snapshot()
312 .graph_projection_statuses
313 }
314
315 pub fn analytics_job_statuses(&self) -> Vec<crate::catalog::CatalogAnalyticsJobStatus> {
316 self.inner
317 .db
318 .catalog_model_snapshot()
319 .analytics_job_statuses
320 }
321
322 pub fn indexes_for_collection(&self, collection: &str) -> Vec<crate::PhysicalIndexState> {
323 self.inner
324 .db
325 .operational_indexes()
326 .into_iter()
327 .filter(|index| index.collection.as_deref() == Some(collection))
328 .collect()
329 }
330
331 pub fn set_index_enabled(
332 &self,
333 name: &str,
334 enabled: bool,
335 ) -> RedDBResult<crate::PhysicalIndexState> {
336 self.check_write(crate::runtime::write_gate::WriteKind::Maintenance)?;
337 self.inner
338 .db
339 .set_index_enabled(name, enabled)
340 .map_err(|err| RedDBError::Internal(err.to_string()))?
341 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
342 }
343
344 pub fn mark_index_building(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
345 self.inner
346 .db
347 .mark_index_building(name)
348 .map_err(|err| RedDBError::Internal(err.to_string()))?
349 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
350 }
351
352 pub fn fail_index(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
353 self.inner
354 .db
355 .fail_index(name)
356 .map_err(|err| RedDBError::Internal(err.to_string()))?
357 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
358 }
359
360 pub fn mark_index_stale(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
361 self.inner
362 .db
363 .mark_index_stale(name)
364 .map_err(|err| RedDBError::Internal(err.to_string()))?
365 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
366 }
367
368 pub fn mark_index_ready(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
369 self.inner
370 .db
371 .mark_index_ready(name)
372 .map_err(|err| RedDBError::Internal(err.to_string()))?
373 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
374 }
375
376 pub fn warmup_index_with_lifecycle(
377 &self,
378 name: &str,
379 ) -> RedDBResult<crate::PhysicalIndexState> {
380 self.mark_index_building(name)?;
381 match self.warmup_index(name) {
382 Ok(index) => Ok(index),
383 Err(err) => {
384 let _ = self.fail_index(name);
385 Err(err)
386 }
387 }
388 }
389
390 pub fn warmup_index(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
391 self.inner
392 .db
393 .warmup_index(name)
394 .map_err(|err| RedDBError::Internal(err.to_string()))?
395 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
396 }
397
398 pub fn rebuild_indexes(
399 &self,
400 collection: Option<&str>,
401 ) -> RedDBResult<Vec<crate::PhysicalIndexState>> {
402 self.inner
403 .db
404 .rebuild_index_registry(collection)
405 .map_err(|err| RedDBError::Internal(err.to_string()))
406 }
407
408 pub fn rebuild_indexes_with_lifecycle(
409 &self,
410 collection: Option<&str>,
411 ) -> RedDBResult<Vec<crate::PhysicalIndexState>> {
412 let target_names: Vec<String> = match collection {
413 Some(collection) => self
414 .declared_indexes_for_collection(collection)
415 .into_iter()
416 .map(|index| index.name)
417 .collect(),
418 None => self
419 .declared_indexes()
420 .into_iter()
421 .map(|index| index.name)
422 .collect(),
423 };
424
425 let mut marked_building = Vec::new();
426 for name in target_names {
427 if self.mark_index_building(&name).is_ok() {
428 marked_building.push(name);
429 }
430 }
431
432 match self.rebuild_indexes(collection) {
433 Ok(indexes) => Ok(indexes),
434 Err(err) => {
435 for name in marked_building {
436 let _ = self.fail_index(&name);
437 }
438 Err(err)
439 }
440 }
441 }
442}