1use super::*;
2
3impl RedDBRuntime {
4 pub fn create_export(&self, name: impl Into<String>) -> RedDBResult<ExportDescriptor> {
5 self.inner
6 .db
7 .create_named_export(name)
8 .map_err(|err| RedDBError::Internal(err.to_string()))
9 }
10
11 pub fn graph_projections(&self) -> RedDBResult<Vec<PhysicalGraphProjection>> {
12 Ok(self.inner.db.declared_graph_projections())
13 }
14
15 pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
16 self.inner.db.operational_graph_projections()
17 }
18
19 pub fn graph_projection_named(&self, name: &str) -> RedDBResult<RuntimeGraphProjection> {
20 let status = self
21 .graph_projection_statuses()
22 .into_iter()
23 .find(|status| status.name == name)
24 .ok_or_else(|| RedDBError::NotFound(name.to_string()))?;
25 if !status.declared {
26 return Err(RedDBError::Catalog(format!(
27 "graph projection '{name}' is not declared"
28 )));
29 }
30 if !status.operational {
31 return Err(RedDBError::Catalog(format!(
32 "graph projection '{name}' is declared but not operationally materialized"
33 )));
34 }
35 if status.lifecycle_state == "stale" {
36 return Err(RedDBError::Catalog(format!(
37 "graph projection '{name}' is stale and must be rematerialized before use"
38 )));
39 }
40 let projection = self
41 .operational_graph_projections()
42 .into_iter()
43 .find(|projection| projection.name == name)
44 .ok_or_else(|| RedDBError::NotFound(name.to_string()))?;
45 Ok(RuntimeGraphProjection {
46 node_labels: (!projection.node_labels.is_empty()).then_some(projection.node_labels),
47 node_types: (!projection.node_types.is_empty()).then_some(projection.node_types),
48 edge_labels: (!projection.edge_labels.is_empty()).then_some(projection.edge_labels),
49 })
50 }
51
52 pub fn save_graph_projection(
53 &self,
54 name: impl Into<String>,
55 projection: RuntimeGraphProjection,
56 source: Option<String>,
57 ) -> RedDBResult<PhysicalGraphProjection> {
58 self.inner
59 .db
60 .save_graph_projection(
61 name,
62 projection.node_labels.unwrap_or_default(),
63 projection.node_types.unwrap_or_default(),
64 projection.edge_labels.unwrap_or_default(),
65 source.unwrap_or_else(|| "runtime".to_string()),
66 )
67 .map_err(|err| RedDBError::Internal(err.to_string()))
68 }
69
70 pub fn materialize_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
71 self.inner
72 .db
73 .materialize_graph_projection(name)
74 .map_err(|err| RedDBError::Internal(err.to_string()))?
75 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
76 }
77
78 pub fn mark_graph_projection_materializing(
79 &self,
80 name: &str,
81 ) -> RedDBResult<PhysicalGraphProjection> {
82 self.inner
83 .db
84 .mark_graph_projection_materializing(name)
85 .map_err(|err| RedDBError::Internal(err.to_string()))?
86 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
87 }
88
89 pub fn fail_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
90 self.inner
91 .db
92 .fail_graph_projection(name)
93 .map_err(|err| RedDBError::Internal(err.to_string()))?
94 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
95 }
96
97 pub fn mark_graph_projection_stale(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
98 self.inner
99 .db
100 .mark_graph_projection_stale(name)
101 .map_err(|err| RedDBError::Internal(err.to_string()))?
102 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
103 }
104
105 pub fn analytics_jobs(&self) -> RedDBResult<Vec<PhysicalAnalyticsJob>> {
106 Ok(self.inner.db.declared_analytics_jobs())
107 }
108
109 pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
110 self.inner.db.operational_analytics_jobs()
111 }
112
113 pub fn save_analytics_job(
114 &self,
115 kind: impl Into<String>,
116 projection_name: Option<String>,
117 metadata: std::collections::BTreeMap<String, String>,
118 ) -> RedDBResult<PhysicalAnalyticsJob> {
119 self.inner
120 .db
121 .save_analytics_job(kind, projection_name, metadata)
122 .map_err(|err| RedDBError::Internal(err.to_string()))
123 }
124
125 pub fn start_analytics_job(
126 &self,
127 kind: impl Into<String>,
128 projection_name: Option<String>,
129 metadata: std::collections::BTreeMap<String, String>,
130 ) -> RedDBResult<PhysicalAnalyticsJob> {
131 if let Some(projection_name) = projection_name.as_deref() {
132 let status = self
133 .graph_projection_statuses()
134 .into_iter()
135 .find(|status| status.name == projection_name)
136 .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
137 if !status.declared {
138 return Err(RedDBError::Catalog(format!(
139 "graph projection '{projection_name}' is not declared"
140 )));
141 }
142 if !status.operational {
143 return Err(RedDBError::Catalog(format!(
144 "graph projection '{projection_name}' is declared but not operationally materialized"
145 )));
146 }
147 if status.lifecycle_state == "stale" {
148 return Err(RedDBError::Catalog(format!(
149 "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can start against it"
150 )));
151 }
152 }
153 self.inner
154 .db
155 .start_analytics_job(kind, projection_name, metadata)
156 .map_err(|err| RedDBError::Internal(err.to_string()))
157 }
158
159 pub fn queue_analytics_job(
160 &self,
161 kind: impl Into<String>,
162 projection_name: Option<String>,
163 metadata: std::collections::BTreeMap<String, String>,
164 ) -> RedDBResult<PhysicalAnalyticsJob> {
165 if let Some(projection_name) = projection_name.as_deref() {
166 let status = self
167 .graph_projection_statuses()
168 .into_iter()
169 .find(|status| status.name == projection_name)
170 .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
171 if !status.declared {
172 return Err(RedDBError::Catalog(format!(
173 "graph projection '{projection_name}' is not declared"
174 )));
175 }
176 if !status.operational {
177 return Err(RedDBError::Catalog(format!(
178 "graph projection '{projection_name}' is declared but not operationally materialized"
179 )));
180 }
181 if status.lifecycle_state == "stale" {
182 return Err(RedDBError::Catalog(format!(
183 "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can be queued against it"
184 )));
185 }
186 }
187 self.inner
188 .db
189 .queue_analytics_job(kind, projection_name, metadata)
190 .map_err(|err| RedDBError::Internal(err.to_string()))
191 }
192
193 pub fn fail_analytics_job(
194 &self,
195 kind: impl Into<String>,
196 projection_name: Option<String>,
197 metadata: std::collections::BTreeMap<String, String>,
198 ) -> RedDBResult<PhysicalAnalyticsJob> {
199 self.inner
200 .db
201 .fail_analytics_job(kind, projection_name, metadata)
202 .map_err(|err| RedDBError::Internal(err.to_string()))
203 }
204
205 pub fn mark_analytics_job_stale(
206 &self,
207 kind: impl Into<String>,
208 projection_name: Option<String>,
209 metadata: std::collections::BTreeMap<String, String>,
210 ) -> RedDBResult<PhysicalAnalyticsJob> {
211 self.inner
212 .db
213 .mark_analytics_job_stale(kind, projection_name, metadata)
214 .map_err(|err| RedDBError::Internal(err.to_string()))
215 }
216
217 pub fn complete_analytics_job(
218 &self,
219 kind: impl Into<String>,
220 projection_name: Option<String>,
221 metadata: std::collections::BTreeMap<String, String>,
222 ) -> RedDBResult<PhysicalAnalyticsJob> {
223 self.inner
224 .db
225 .record_analytics_job(kind, projection_name, metadata)
226 .map_err(|err| RedDBError::Internal(err.to_string()))
227 }
228
229 pub fn record_analytics_job(
230 &self,
231 kind: impl Into<String>,
232 projection_name: Option<String>,
233 metadata: std::collections::BTreeMap<String, String>,
234 ) -> RedDBResult<PhysicalAnalyticsJob> {
235 if let Some(projection_name) = projection_name.as_deref() {
236 let status = self
237 .graph_projection_statuses()
238 .into_iter()
239 .find(|status| status.name == projection_name)
240 .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
241 if !status.declared {
242 return Err(RedDBError::Catalog(format!(
243 "graph projection '{projection_name}' is not declared"
244 )));
245 }
246 if !status.operational {
247 return Err(RedDBError::Catalog(format!(
248 "graph projection '{projection_name}' is declared but not operationally materialized"
249 )));
250 }
251 if status.lifecycle_state == "stale" {
252 return Err(RedDBError::Catalog(format!(
253 "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can complete against it"
254 )));
255 }
256 }
257 self.inner
258 .db
259 .record_analytics_job(kind, projection_name, metadata)
260 .map_err(|err| RedDBError::Internal(err.to_string()))
261 }
262
263 pub fn resolve_graph_projection(
264 &self,
265 projection_name: Option<&str>,
266 inline: Option<RuntimeGraphProjection>,
267 ) -> RedDBResult<Option<RuntimeGraphProjection>> {
268 let named = match projection_name {
269 Some(name) => Some(self.graph_projection_named(name)?),
270 None => None,
271 };
272 Ok(merge_runtime_projection(named, inline))
273 }
274
275 pub fn apply_retention_policy(&self) -> RedDBResult<()> {
276 self.check_write(crate::runtime::write_gate::WriteKind::Maintenance)?;
277 let expired = self
278 .inner
279 .db
280 .ttl_expired_entities_now()
281 .map_err(|err| RedDBError::Internal(err.to_string()))?;
282 let store = self.inner.db.store();
283 for (collection, id) in expired {
284 let deleted = store
285 .delete(&collection, id)
286 .map_err(|err| RedDBError::Internal(err.to_string()))?;
287 if deleted {
288 store.context_index().remove_entity(id);
289 self.cdc_emit(
290 crate::replication::cdc::ChangeOperation::Delete,
291 &collection,
292 id.raw(),
293 "entity",
294 );
295 }
296 }
297 self.inner
298 .db
299 .enforce_retention_policy()
300 .map_err(|err| RedDBError::Internal(err.to_string()))?;
301 self.enforce_metrics_raw_retention()?;
302 self.invalidate_result_cache();
303 Ok(())
304 }
305
306 fn enforce_metrics_raw_retention(&self) -> RedDBResult<()> {
307 let now_ns = std::time::SystemTime::now()
308 .duration_since(std::time::UNIX_EPOCH)
309 .unwrap_or_default()
310 .as_nanos()
311 .min(u128::from(u64::MAX)) as u64;
312 let store = self.inner.db.store();
313
314 for contract in self
315 .inner
316 .db
317 .collection_contracts()
318 .into_iter()
319 .filter(|contract| contract.declared_model == crate::catalog::CollectionModel::Metrics)
320 {
321 let Some(raw_retention_ms) = contract.metrics_raw_retention_ms else {
322 continue;
323 };
324 let cutoff_ns = now_ns.saturating_sub(raw_retention_ms.saturating_mul(1_000_000));
325 let Some(manager) = store.get_collection(&contract.name) else {
326 continue;
327 };
328 let expired = manager.query_all(|entity| match &entity.data {
329 crate::storage::EntityData::TimeSeries(point) => point.timestamp_ns < cutoff_ns,
330 _ => false,
331 });
332 for entity in expired {
333 let deleted = store
334 .delete(&contract.name, entity.id)
335 .map_err(|err| RedDBError::Internal(err.to_string()))?;
336 if deleted {
337 store.context_index().remove_entity(entity.id);
338 self.cdc_emit(
339 crate::replication::cdc::ChangeOperation::Delete,
340 &contract.name,
341 entity.id.raw(),
342 "entity",
343 );
344 }
345 }
346 }
347
348 Ok(())
349 }
350
351 pub fn indexes(&self) -> Vec<crate::PhysicalIndexState> {
352 self.inner.db.operational_indexes()
353 }
354
355 pub fn declared_indexes(&self) -> Vec<crate::PhysicalIndexState> {
356 self.inner.db.declared_indexes()
357 }
358
359 pub fn declared_indexes_for_collection(
360 &self,
361 collection: &str,
362 ) -> Vec<crate::PhysicalIndexState> {
363 self.inner
364 .db
365 .declared_indexes()
366 .into_iter()
367 .filter(|index| index.collection.as_deref() == Some(collection))
368 .collect()
369 }
370
371 pub fn index_statuses(&self) -> Vec<crate::catalog::CatalogIndexStatus> {
372 self.inner.db.index_statuses()
373 }
374
375 pub fn graph_projection_statuses(&self) -> Vec<crate::catalog::CatalogGraphProjectionStatus> {
376 self.inner
377 .db
378 .catalog_model_snapshot()
379 .graph_projection_statuses
380 }
381
382 pub fn analytics_job_statuses(&self) -> Vec<crate::catalog::CatalogAnalyticsJobStatus> {
383 self.inner
384 .db
385 .catalog_model_snapshot()
386 .analytics_job_statuses
387 }
388
389 pub fn indexes_for_collection(&self, collection: &str) -> Vec<crate::PhysicalIndexState> {
390 self.inner
391 .db
392 .operational_indexes()
393 .into_iter()
394 .filter(|index| index.collection.as_deref() == Some(collection))
395 .collect()
396 }
397
398 pub fn set_index_enabled(
399 &self,
400 name: &str,
401 enabled: bool,
402 ) -> RedDBResult<crate::PhysicalIndexState> {
403 self.check_write(crate::runtime::write_gate::WriteKind::Maintenance)?;
404 self.inner
405 .db
406 .set_index_enabled(name, enabled)
407 .map_err(|err| RedDBError::Internal(err.to_string()))?
408 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
409 }
410
411 pub fn mark_index_building(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
412 self.inner
413 .db
414 .mark_index_building(name)
415 .map_err(|err| RedDBError::Internal(err.to_string()))?
416 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
417 }
418
419 pub fn fail_index(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
420 self.inner
421 .db
422 .fail_index(name)
423 .map_err(|err| RedDBError::Internal(err.to_string()))?
424 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
425 }
426
427 pub fn mark_index_stale(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
428 self.inner
429 .db
430 .mark_index_stale(name)
431 .map_err(|err| RedDBError::Internal(err.to_string()))?
432 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
433 }
434
435 pub fn mark_index_ready(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
436 self.inner
437 .db
438 .mark_index_ready(name)
439 .map_err(|err| RedDBError::Internal(err.to_string()))?
440 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
441 }
442
443 pub fn warmup_index_with_lifecycle(
444 &self,
445 name: &str,
446 ) -> RedDBResult<crate::PhysicalIndexState> {
447 self.mark_index_building(name)?;
448 match self.warmup_index(name) {
449 Ok(index) => Ok(index),
450 Err(err) => {
451 let _ = self.fail_index(name);
452 Err(err)
453 }
454 }
455 }
456
457 pub fn warmup_index(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
458 self.inner
459 .db
460 .warmup_index(name)
461 .map_err(|err| RedDBError::Internal(err.to_string()))?
462 .ok_or_else(|| RedDBError::NotFound(name.to_string()))
463 }
464
465 pub fn rebuild_indexes(
466 &self,
467 collection: Option<&str>,
468 ) -> RedDBResult<Vec<crate::PhysicalIndexState>> {
469 self.inner
470 .db
471 .rebuild_index_registry(collection)
472 .map_err(|err| RedDBError::Internal(err.to_string()))
473 }
474
475 pub fn rebuild_indexes_with_lifecycle(
476 &self,
477 collection: Option<&str>,
478 ) -> RedDBResult<Vec<crate::PhysicalIndexState>> {
479 let target_names: Vec<String> = match collection {
480 Some(collection) => self
481 .declared_indexes_for_collection(collection)
482 .into_iter()
483 .map(|index| index.name)
484 .collect(),
485 None => self
486 .declared_indexes()
487 .into_iter()
488 .map(|index| index.name)
489 .collect(),
490 };
491
492 let mut marked_building = Vec::new();
493 for name in target_names {
494 if self.mark_index_building(&name).is_ok() {
495 marked_building.push(name);
496 }
497 }
498
499 match self.rebuild_indexes(collection) {
500 Ok(indexes) => Ok(indexes),
501 Err(err) => {
502 for name in marked_building {
503 let _ = self.fail_index(&name);
504 }
505 Err(err)
506 }
507 }
508 }
509}