1use super::*;
2
3impl RedDBRuntime {
4 pub fn create_export(&self, name: impl Into<String>) -> RedDBResult<ExportDescriptor> {
5 self.inner
6 .db
7 .create_named_export(name)
8 .map_err(|err| RedDBError::Internal(err.to_string()))
9 }
10
11 pub fn graph_projections(&self) -> RedDBResult<Vec<PhysicalGraphProjection>> {
12 Ok(self.inner.db.declared_graph_projections())
13 }
14
15 pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
16 self.inner.db.operational_graph_projections()
17 }
18
19 pub fn graph_projection_named(&self, name: &str) -> RedDBResult<RuntimeGraphProjection> {
20 let status = self
21 .graph_projection_statuses()
22 .into_iter()
23 .find(|status| status.name == name)
24 .ok_or_else(|| RedDBError::NotFound(name.to_string()))?;
25 if !status.declared {
26 return Err(RedDBError::Catalog(format!(
27 "graph projection '{name}' is not declared"
28 )));
29 }
30 if !status.operational {
31 return Err(RedDBError::Catalog(format!(
32 "graph projection '{name}' is declared but not operationally materialized"
33 )));
34 }
35 if status.lifecycle_state == "stale" {
36 return Err(RedDBError::Catalog(format!(
37 "graph projection '{name}' is stale and must be rematerialized before use"
38 )));
39 }
40 let projection = self
41 .operational_graph_projections()
42 .into_iter()
43 .find(|projection| projection.name == name)
44 .ok_or_else(|| RedDBError::NotFound(name.to_string()))?;
45 Ok(RuntimeGraphProjection {
46 node_labels: (!projection.node_labels.is_empty()).then_some(projection.node_labels),
47 node_types: (!projection.node_types.is_empty()).then_some(projection.node_types),
48 edge_labels: (!projection.edge_labels.is_empty()).then_some(projection.edge_labels),
49 })
50 }
51
52 pub fn save_graph_projection(
53 &self,
54 name: impl Into<String>,
55 projection: RuntimeGraphProjection,
56 source: Option<String>,
57 ) -> RedDBResult<PhysicalGraphProjection> {
58 self.inner
59 .db
60 .save_graph_projection(
61 name,
62 projection.node_labels.unwrap_or_default(),
63 projection.node_types.unwrap_or_default(),
64 projection.edge_labels.unwrap_or_default(),
65 source.unwrap_or_else(|| "runtime".to_string()),
66 )
67 .map_err(|err| RedDBError::Internal(err.to_string()))
68 }
69
70 pub fn materialize_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
71 self.inner
72 .db
73 .materialize_graph_projection(name)
74 .map_err(|err| RedDBError::Internal(err.to_string()))?
75 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
76 }
77
78 pub fn mark_graph_projection_materializing(
79 &self,
80 name: &str,
81 ) -> RedDBResult<PhysicalGraphProjection> {
82 self.inner
83 .db
84 .mark_graph_projection_materializing(name)
85 .map_err(|err| RedDBError::Internal(err.to_string()))?
86 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
87 }
88
89 pub fn fail_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
90 self.inner
91 .db
92 .fail_graph_projection(name)
93 .map_err(|err| RedDBError::Internal(err.to_string()))?
94 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
95 }
96
97 pub fn mark_graph_projection_stale(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
98 self.inner
99 .db
100 .mark_graph_projection_stale(name)
101 .map_err(|err| RedDBError::Internal(err.to_string()))?
102 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
103 }
104
105 pub fn analytics_jobs(&self) -> RedDBResult<Vec<PhysicalAnalyticsJob>> {
106 Ok(self.inner.db.declared_analytics_jobs())
107 }
108
109 pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
110 self.inner.db.operational_analytics_jobs()
111 }
112
113 pub fn save_analytics_job(
114 &self,
115 kind: impl Into<String>,
116 projection_name: Option<String>,
117 metadata: std::collections::BTreeMap<String, String>,
118 ) -> RedDBResult<PhysicalAnalyticsJob> {
119 self.inner
120 .db
121 .save_analytics_job(kind, projection_name, metadata)
122 .map_err(|err| RedDBError::Internal(err.to_string()))
123 }
124
125 pub fn start_analytics_job(
126 &self,
127 kind: impl Into<String>,
128 projection_name: Option<String>,
129 metadata: std::collections::BTreeMap<String, String>,
130 ) -> RedDBResult<PhysicalAnalyticsJob> {
131 if let Some(projection_name) = projection_name.as_deref() {
132 let status = self
133 .graph_projection_statuses()
134 .into_iter()
135 .find(|status| status.name == projection_name)
136 .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
137 if !status.declared {
138 return Err(RedDBError::Catalog(format!(
139 "graph projection '{projection_name}' is not declared"
140 )));
141 }
142 if !status.operational {
143 return Err(RedDBError::Catalog(format!(
144 "graph projection '{projection_name}' is declared but not operationally materialized"
145 )));
146 }
147 if status.lifecycle_state == "stale" {
148 return Err(RedDBError::Catalog(format!(
149 "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can start against it"
150 )));
151 }
152 }
153 self.inner
154 .db
155 .start_analytics_job(kind, projection_name, metadata)
156 .map_err(|err| RedDBError::Internal(err.to_string()))
157 }
158
159 pub fn queue_analytics_job(
160 &self,
161 kind: impl Into<String>,
162 projection_name: Option<String>,
163 metadata: std::collections::BTreeMap<String, String>,
164 ) -> RedDBResult<PhysicalAnalyticsJob> {
165 if let Some(projection_name) = projection_name.as_deref() {
166 let status = self
167 .graph_projection_statuses()
168 .into_iter()
169 .find(|status| status.name == projection_name)
170 .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
171 if !status.declared {
172 return Err(RedDBError::Catalog(format!(
173 "graph projection '{projection_name}' is not declared"
174 )));
175 }
176 if !status.operational {
177 return Err(RedDBError::Catalog(format!(
178 "graph projection '{projection_name}' is declared but not operationally materialized"
179 )));
180 }
181 if status.lifecycle_state == "stale" {
182 return Err(RedDBError::Catalog(format!(
183 "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can be queued against it"
184 )));
185 }
186 }
187 self.inner
188 .db
189 .queue_analytics_job(kind, projection_name, metadata)
190 .map_err(|err| RedDBError::Internal(err.to_string()))
191 }
192
193 pub fn fail_analytics_job(
194 &self,
195 kind: impl Into<String>,
196 projection_name: Option<String>,
197 metadata: std::collections::BTreeMap<String, String>,
198 ) -> RedDBResult<PhysicalAnalyticsJob> {
199 self.inner
200 .db
201 .fail_analytics_job(kind, projection_name, metadata)
202 .map_err(|err| RedDBError::Internal(err.to_string()))
203 }
204
205 pub fn mark_analytics_job_stale(
206 &self,
207 kind: impl Into<String>,
208 projection_name: Option<String>,
209 metadata: std::collections::BTreeMap<String, String>,
210 ) -> RedDBResult<PhysicalAnalyticsJob> {
211 self.inner
212 .db
213 .mark_analytics_job_stale(kind, projection_name, metadata)
214 .map_err(|err| RedDBError::Internal(err.to_string()))
215 }
216
217 pub fn complete_analytics_job(
218 &self,
219 kind: impl Into<String>,
220 projection_name: Option<String>,
221 metadata: std::collections::BTreeMap<String, String>,
222 ) -> RedDBResult<PhysicalAnalyticsJob> {
223 self.inner
224 .db
225 .record_analytics_job(kind, projection_name, metadata)
226 .map_err(|err| RedDBError::Internal(err.to_string()))
227 }
228
229 pub fn record_analytics_job(
230 &self,
231 kind: impl Into<String>,
232 projection_name: Option<String>,
233 metadata: std::collections::BTreeMap<String, String>,
234 ) -> RedDBResult<PhysicalAnalyticsJob> {
235 if let Some(projection_name) = projection_name.as_deref() {
236 let status = self
237 .graph_projection_statuses()
238 .into_iter()
239 .find(|status| status.name == projection_name)
240 .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
241 if !status.declared {
242 return Err(RedDBError::Catalog(format!(
243 "graph projection '{projection_name}' is not declared"
244 )));
245 }
246 if !status.operational {
247 return Err(RedDBError::Catalog(format!(
248 "graph projection '{projection_name}' is declared but not operationally materialized"
249 )));
250 }
251 if status.lifecycle_state == "stale" {
252 return Err(RedDBError::Catalog(format!(
253 "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can complete against it"
254 )));
255 }
256 }
257 self.inner
258 .db
259 .record_analytics_job(kind, projection_name, metadata)
260 .map_err(|err| RedDBError::Internal(err.to_string()))
261 }
262
263 pub fn resolve_graph_projection(
264 &self,
265 projection_name: Option<&str>,
266 inline: Option<RuntimeGraphProjection>,
267 ) -> RedDBResult<Option<RuntimeGraphProjection>> {
268 let named = match projection_name {
269 Some(name) => Some(self.graph_projection_named(name)?),
270 None => None,
271 };
272 Ok(merge_runtime_projection(named, inline))
273 }
274
275 pub fn apply_retention_policy(&self) -> RedDBResult<()> {
276 self.inner
277 .db
278 .enforce_retention_policy()
279 .map_err(|err| RedDBError::Internal(err.to_string()))?;
280 self.enforce_metrics_raw_retention()?;
281 self.invalidate_result_cache();
282 Ok(())
283 }
284
285 fn enforce_metrics_raw_retention(&self) -> RedDBResult<()> {
286 let now_ns = std::time::SystemTime::now()
287 .duration_since(std::time::UNIX_EPOCH)
288 .unwrap_or_default()
289 .as_nanos()
290 .min(u128::from(u64::MAX)) as u64;
291 let store = self.inner.db.store();
292
293 for contract in self
294 .inner
295 .db
296 .collection_contracts()
297 .into_iter()
298 .filter(|contract| contract.declared_model == crate::catalog::CollectionModel::Metrics)
299 {
300 let Some(raw_retention_ms) = contract.metrics_raw_retention_ms else {
301 continue;
302 };
303 let cutoff_ns = now_ns.saturating_sub(raw_retention_ms.saturating_mul(1_000_000));
304 let Some(manager) = store.get_collection(&contract.name) else {
305 continue;
306 };
307 let expired = manager.query_all(|entity| match &entity.data {
308 crate::storage::EntityData::TimeSeries(point) => point.timestamp_ns < cutoff_ns,
309 _ => false,
310 });
311 for entity in expired {
312 store
313 .delete(&contract.name, entity.id)
314 .map_err(|err| RedDBError::Internal(err.to_string()))?;
315 }
316 }
317
318 Ok(())
319 }
320
321 pub fn indexes(&self) -> Vec<crate::PhysicalIndexState> {
322 self.inner.db.operational_indexes()
323 }
324
325 pub fn declared_indexes(&self) -> Vec<crate::PhysicalIndexState> {
326 self.inner.db.declared_indexes()
327 }
328
329 pub fn declared_indexes_for_collection(
330 &self,
331 collection: &str,
332 ) -> Vec<crate::PhysicalIndexState> {
333 self.inner
334 .db
335 .declared_indexes()
336 .into_iter()
337 .filter(|index| index.collection.as_deref() == Some(collection))
338 .collect()
339 }
340
341 pub fn index_statuses(&self) -> Vec<crate::catalog::CatalogIndexStatus> {
342 self.inner.db.index_statuses()
343 }
344
345 pub fn graph_projection_statuses(&self) -> Vec<crate::catalog::CatalogGraphProjectionStatus> {
346 self.inner
347 .db
348 .catalog_model_snapshot()
349 .graph_projection_statuses
350 }
351
352 pub fn analytics_job_statuses(&self) -> Vec<crate::catalog::CatalogAnalyticsJobStatus> {
353 self.inner
354 .db
355 .catalog_model_snapshot()
356 .analytics_job_statuses
357 }
358
359 pub fn indexes_for_collection(&self, collection: &str) -> Vec<crate::PhysicalIndexState> {
360 self.inner
361 .db
362 .operational_indexes()
363 .into_iter()
364 .filter(|index| index.collection.as_deref() == Some(collection))
365 .collect()
366 }
367
368 pub fn set_index_enabled(
369 &self,
370 name: &str,
371 enabled: bool,
372 ) -> RedDBResult<crate::PhysicalIndexState> {
373 self.check_write(crate::runtime::write_gate::WriteKind::Maintenance)?;
374 self.inner
375 .db
376 .set_index_enabled(name, enabled)
377 .map_err(|err| RedDBError::Internal(err.to_string()))?
378 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
379 }
380
381 pub fn mark_index_building(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
382 self.inner
383 .db
384 .mark_index_building(name)
385 .map_err(|err| RedDBError::Internal(err.to_string()))?
386 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
387 }
388
389 pub fn fail_index(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
390 self.inner
391 .db
392 .fail_index(name)
393 .map_err(|err| RedDBError::Internal(err.to_string()))?
394 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
395 }
396
397 pub fn mark_index_stale(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
398 self.inner
399 .db
400 .mark_index_stale(name)
401 .map_err(|err| RedDBError::Internal(err.to_string()))?
402 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
403 }
404
405 pub fn mark_index_ready(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
406 self.inner
407 .db
408 .mark_index_ready(name)
409 .map_err(|err| RedDBError::Internal(err.to_string()))?
410 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
411 }
412
413 pub fn warmup_index_with_lifecycle(
414 &self,
415 name: &str,
416 ) -> RedDBResult<crate::PhysicalIndexState> {
417 self.mark_index_building(name)?;
418 match self.warmup_index(name) {
419 Ok(index) => Ok(index),
420 Err(err) => {
421 let _ = self.fail_index(name);
422 Err(err)
423 }
424 }
425 }
426
427 pub fn warmup_index(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
428 self.inner
429 .db
430 .warmup_index(name)
431 .map_err(|err| RedDBError::Internal(err.to_string()))?
432 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
433 }
434
435 pub fn rebuild_indexes(
436 &self,
437 collection: Option<&str>,
438 ) -> RedDBResult<Vec<crate::PhysicalIndexState>> {
439 self.inner
440 .db
441 .rebuild_index_registry(collection)
442 .map_err(|err| RedDBError::Internal(err.to_string()))
443 }
444
445 pub fn rebuild_indexes_with_lifecycle(
446 &self,
447 collection: Option<&str>,
448 ) -> RedDBResult<Vec<crate::PhysicalIndexState>> {
449 let target_names: Vec<String> = match collection {
450 Some(collection) => self
451 .declared_indexes_for_collection(collection)
452 .into_iter()
453 .map(|index| index.name)
454 .collect(),
455 None => self
456 .declared_indexes()
457 .into_iter()
458 .map(|index| index.name)
459 .collect(),
460 };
461
462 let mut marked_building = Vec::new();
463 for name in target_names {
464 if self.mark_index_building(&name).is_ok() {
465 marked_building.push(name);
466 }
467 }
468
469 match self.rebuild_indexes(collection) {
470 Ok(indexes) => Ok(indexes),
471 Err(err) => {
472 for name in marked_building {
473 let _ = self.fail_index(&name);
474 }
475 Err(err)
476 }
477 }
478 }
479}