Skip to main content

reddb_server/runtime/
impl_physical.rs

1use super::*;
2
3impl RedDBRuntime {
4    pub fn create_export(&self, name: impl Into<String>) -> RedDBResult<ExportDescriptor> {
5        self.inner
6            .db
7            .create_named_export(name)
8            .map_err(|err| RedDBError::Internal(err.to_string()))
9    }
10
11    pub fn graph_projections(&self) -> RedDBResult<Vec<PhysicalGraphProjection>> {
12        Ok(self.inner.db.declared_graph_projections())
13    }
14
15    pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
16        self.inner.db.operational_graph_projections()
17    }
18
19    pub fn graph_projection_named(&self, name: &str) -> RedDBResult<RuntimeGraphProjection> {
20        let status = self
21            .graph_projection_statuses()
22            .into_iter()
23            .find(|status| status.name == name)
24            .ok_or_else(|| RedDBError::NotFound(name.to_string()))?;
25        if !status.declared {
26            return Err(RedDBError::Catalog(format!(
27                "graph projection '{name}' is not declared"
28            )));
29        }
30        if !status.operational {
31            return Err(RedDBError::Catalog(format!(
32                "graph projection '{name}' is declared but not operationally materialized"
33            )));
34        }
35        if status.lifecycle_state == "stale" {
36            return Err(RedDBError::Catalog(format!(
37                "graph projection '{name}' is stale and must be rematerialized before use"
38            )));
39        }
40        let projection = self
41            .operational_graph_projections()
42            .into_iter()
43            .find(|projection| projection.name == name)
44            .ok_or_else(|| RedDBError::NotFound(name.to_string()))?;
45        Ok(RuntimeGraphProjection {
46            node_labels: (!projection.node_labels.is_empty()).then_some(projection.node_labels),
47            node_types: (!projection.node_types.is_empty()).then_some(projection.node_types),
48            edge_labels: (!projection.edge_labels.is_empty()).then_some(projection.edge_labels),
49        })
50    }
51
52    pub fn save_graph_projection(
53        &self,
54        name: impl Into<String>,
55        projection: RuntimeGraphProjection,
56        source: Option<String>,
57    ) -> RedDBResult<PhysicalGraphProjection> {
58        self.inner
59            .db
60            .save_graph_projection(
61                name,
62                projection.node_labels.unwrap_or_default(),
63                projection.node_types.unwrap_or_default(),
64                projection.edge_labels.unwrap_or_default(),
65                source.unwrap_or_else(|| "runtime".to_string()),
66            )
67            .map_err(|err| RedDBError::Internal(err.to_string()))
68    }
69
70    pub fn materialize_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
71        self.inner
72            .db
73            .materialize_graph_projection(name)
74            .map_err(|err| RedDBError::Internal(err.to_string()))?
75            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
76    }
77
78    pub fn mark_graph_projection_materializing(
79        &self,
80        name: &str,
81    ) -> RedDBResult<PhysicalGraphProjection> {
82        self.inner
83            .db
84            .mark_graph_projection_materializing(name)
85            .map_err(|err| RedDBError::Internal(err.to_string()))?
86            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
87    }
88
89    pub fn fail_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
90        self.inner
91            .db
92            .fail_graph_projection(name)
93            .map_err(|err| RedDBError::Internal(err.to_string()))?
94            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
95    }
96
97    pub fn mark_graph_projection_stale(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
98        self.inner
99            .db
100            .mark_graph_projection_stale(name)
101            .map_err(|err| RedDBError::Internal(err.to_string()))?
102            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
103    }
104
105    pub fn analytics_jobs(&self) -> RedDBResult<Vec<PhysicalAnalyticsJob>> {
106        Ok(self.inner.db.declared_analytics_jobs())
107    }
108
109    pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
110        self.inner.db.operational_analytics_jobs()
111    }
112
113    pub fn save_analytics_job(
114        &self,
115        kind: impl Into<String>,
116        projection_name: Option<String>,
117        metadata: std::collections::BTreeMap<String, String>,
118    ) -> RedDBResult<PhysicalAnalyticsJob> {
119        self.inner
120            .db
121            .save_analytics_job(kind, projection_name, metadata)
122            .map_err(|err| RedDBError::Internal(err.to_string()))
123    }
124
125    pub fn start_analytics_job(
126        &self,
127        kind: impl Into<String>,
128        projection_name: Option<String>,
129        metadata: std::collections::BTreeMap<String, String>,
130    ) -> RedDBResult<PhysicalAnalyticsJob> {
131        if let Some(projection_name) = projection_name.as_deref() {
132            let status = self
133                .graph_projection_statuses()
134                .into_iter()
135                .find(|status| status.name == projection_name)
136                .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
137            if !status.declared {
138                return Err(RedDBError::Catalog(format!(
139                    "graph projection '{projection_name}' is not declared"
140                )));
141            }
142            if !status.operational {
143                return Err(RedDBError::Catalog(format!(
144                    "graph projection '{projection_name}' is declared but not operationally materialized"
145                )));
146            }
147            if status.lifecycle_state == "stale" {
148                return Err(RedDBError::Catalog(format!(
149                    "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can start against it"
150                )));
151            }
152        }
153        self.inner
154            .db
155            .start_analytics_job(kind, projection_name, metadata)
156            .map_err(|err| RedDBError::Internal(err.to_string()))
157    }
158
159    pub fn queue_analytics_job(
160        &self,
161        kind: impl Into<String>,
162        projection_name: Option<String>,
163        metadata: std::collections::BTreeMap<String, String>,
164    ) -> RedDBResult<PhysicalAnalyticsJob> {
165        if let Some(projection_name) = projection_name.as_deref() {
166            let status = self
167                .graph_projection_statuses()
168                .into_iter()
169                .find(|status| status.name == projection_name)
170                .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
171            if !status.declared {
172                return Err(RedDBError::Catalog(format!(
173                    "graph projection '{projection_name}' is not declared"
174                )));
175            }
176            if !status.operational {
177                return Err(RedDBError::Catalog(format!(
178                    "graph projection '{projection_name}' is declared but not operationally materialized"
179                )));
180            }
181            if status.lifecycle_state == "stale" {
182                return Err(RedDBError::Catalog(format!(
183                    "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can be queued against it"
184                )));
185            }
186        }
187        self.inner
188            .db
189            .queue_analytics_job(kind, projection_name, metadata)
190            .map_err(|err| RedDBError::Internal(err.to_string()))
191    }
192
193    pub fn fail_analytics_job(
194        &self,
195        kind: impl Into<String>,
196        projection_name: Option<String>,
197        metadata: std::collections::BTreeMap<String, String>,
198    ) -> RedDBResult<PhysicalAnalyticsJob> {
199        self.inner
200            .db
201            .fail_analytics_job(kind, projection_name, metadata)
202            .map_err(|err| RedDBError::Internal(err.to_string()))
203    }
204
205    pub fn mark_analytics_job_stale(
206        &self,
207        kind: impl Into<String>,
208        projection_name: Option<String>,
209        metadata: std::collections::BTreeMap<String, String>,
210    ) -> RedDBResult<PhysicalAnalyticsJob> {
211        self.inner
212            .db
213            .mark_analytics_job_stale(kind, projection_name, metadata)
214            .map_err(|err| RedDBError::Internal(err.to_string()))
215    }
216
217    pub fn complete_analytics_job(
218        &self,
219        kind: impl Into<String>,
220        projection_name: Option<String>,
221        metadata: std::collections::BTreeMap<String, String>,
222    ) -> RedDBResult<PhysicalAnalyticsJob> {
223        self.inner
224            .db
225            .record_analytics_job(kind, projection_name, metadata)
226            .map_err(|err| RedDBError::Internal(err.to_string()))
227    }
228
229    pub fn record_analytics_job(
230        &self,
231        kind: impl Into<String>,
232        projection_name: Option<String>,
233        metadata: std::collections::BTreeMap<String, String>,
234    ) -> RedDBResult<PhysicalAnalyticsJob> {
235        if let Some(projection_name) = projection_name.as_deref() {
236            let status = self
237                .graph_projection_statuses()
238                .into_iter()
239                .find(|status| status.name == projection_name)
240                .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
241            if !status.declared {
242                return Err(RedDBError::Catalog(format!(
243                    "graph projection '{projection_name}' is not declared"
244                )));
245            }
246            if !status.operational {
247                return Err(RedDBError::Catalog(format!(
248                    "graph projection '{projection_name}' is declared but not operationally materialized"
249                )));
250            }
251            if status.lifecycle_state == "stale" {
252                return Err(RedDBError::Catalog(format!(
253                    "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can complete against it"
254                )));
255            }
256        }
257        self.inner
258            .db
259            .record_analytics_job(kind, projection_name, metadata)
260            .map_err(|err| RedDBError::Internal(err.to_string()))
261    }
262
263    pub fn resolve_graph_projection(
264        &self,
265        projection_name: Option<&str>,
266        inline: Option<RuntimeGraphProjection>,
267    ) -> RedDBResult<Option<RuntimeGraphProjection>> {
268        let named = match projection_name {
269            Some(name) => Some(self.graph_projection_named(name)?),
270            None => None,
271        };
272        Ok(merge_runtime_projection(named, inline))
273    }
274
275    pub fn apply_retention_policy(&self) -> RedDBResult<()> {
276        self.inner
277            .db
278            .enforce_retention_policy()
279            .map_err(|err| RedDBError::Internal(err.to_string()))?;
280        self.enforce_metrics_raw_retention()?;
281        self.invalidate_result_cache();
282        Ok(())
283    }
284
285    fn enforce_metrics_raw_retention(&self) -> RedDBResult<()> {
286        let now_ns = std::time::SystemTime::now()
287            .duration_since(std::time::UNIX_EPOCH)
288            .unwrap_or_default()
289            .as_nanos()
290            .min(u128::from(u64::MAX)) as u64;
291        let store = self.inner.db.store();
292
293        for contract in self
294            .inner
295            .db
296            .collection_contracts()
297            .into_iter()
298            .filter(|contract| contract.declared_model == crate::catalog::CollectionModel::Metrics)
299        {
300            let Some(raw_retention_ms) = contract.metrics_raw_retention_ms else {
301                continue;
302            };
303            let cutoff_ns = now_ns.saturating_sub(raw_retention_ms.saturating_mul(1_000_000));
304            let Some(manager) = store.get_collection(&contract.name) else {
305                continue;
306            };
307            let expired = manager.query_all(|entity| match &entity.data {
308                crate::storage::EntityData::TimeSeries(point) => point.timestamp_ns < cutoff_ns,
309                _ => false,
310            });
311            for entity in expired {
312                store
313                    .delete(&contract.name, entity.id)
314                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
315            }
316        }
317
318        Ok(())
319    }
320
321    pub fn indexes(&self) -> Vec<crate::PhysicalIndexState> {
322        self.inner.db.operational_indexes()
323    }
324
325    pub fn declared_indexes(&self) -> Vec<crate::PhysicalIndexState> {
326        self.inner.db.declared_indexes()
327    }
328
329    pub fn declared_indexes_for_collection(
330        &self,
331        collection: &str,
332    ) -> Vec<crate::PhysicalIndexState> {
333        self.inner
334            .db
335            .declared_indexes()
336            .into_iter()
337            .filter(|index| index.collection.as_deref() == Some(collection))
338            .collect()
339    }
340
341    pub fn index_statuses(&self) -> Vec<crate::catalog::CatalogIndexStatus> {
342        self.inner.db.index_statuses()
343    }
344
345    pub fn graph_projection_statuses(&self) -> Vec<crate::catalog::CatalogGraphProjectionStatus> {
346        self.inner
347            .db
348            .catalog_model_snapshot()
349            .graph_projection_statuses
350    }
351
352    pub fn analytics_job_statuses(&self) -> Vec<crate::catalog::CatalogAnalyticsJobStatus> {
353        self.inner
354            .db
355            .catalog_model_snapshot()
356            .analytics_job_statuses
357    }
358
359    pub fn indexes_for_collection(&self, collection: &str) -> Vec<crate::PhysicalIndexState> {
360        self.inner
361            .db
362            .operational_indexes()
363            .into_iter()
364            .filter(|index| index.collection.as_deref() == Some(collection))
365            .collect()
366    }
367
368    pub fn set_index_enabled(
369        &self,
370        name: &str,
371        enabled: bool,
372    ) -> RedDBResult<crate::PhysicalIndexState> {
373        self.check_write(crate::runtime::write_gate::WriteKind::Maintenance)?;
374        self.inner
375            .db
376            .set_index_enabled(name, enabled)
377            .map_err(|err| RedDBError::Internal(err.to_string()))?
378            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
379    }
380
381    pub fn mark_index_building(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
382        self.inner
383            .db
384            .mark_index_building(name)
385            .map_err(|err| RedDBError::Internal(err.to_string()))?
386            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
387    }
388
389    pub fn fail_index(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
390        self.inner
391            .db
392            .fail_index(name)
393            .map_err(|err| RedDBError::Internal(err.to_string()))?
394            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
395    }
396
397    pub fn mark_index_stale(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
398        self.inner
399            .db
400            .mark_index_stale(name)
401            .map_err(|err| RedDBError::Internal(err.to_string()))?
402            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
403    }
404
405    pub fn mark_index_ready(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
406        self.inner
407            .db
408            .mark_index_ready(name)
409            .map_err(|err| RedDBError::Internal(err.to_string()))?
410            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
411    }
412
413    pub fn warmup_index_with_lifecycle(
414        &self,
415        name: &str,
416    ) -> RedDBResult<crate::PhysicalIndexState> {
417        self.mark_index_building(name)?;
418        match self.warmup_index(name) {
419            Ok(index) => Ok(index),
420            Err(err) => {
421                let _ = self.fail_index(name);
422                Err(err)
423            }
424        }
425    }
426
427    pub fn warmup_index(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
428        self.inner
429            .db
430            .warmup_index(name)
431            .map_err(|err| RedDBError::Internal(err.to_string()))?
432            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
433    }
434
435    pub fn rebuild_indexes(
436        &self,
437        collection: Option<&str>,
438    ) -> RedDBResult<Vec<crate::PhysicalIndexState>> {
439        self.inner
440            .db
441            .rebuild_index_registry(collection)
442            .map_err(|err| RedDBError::Internal(err.to_string()))
443    }
444
445    pub fn rebuild_indexes_with_lifecycle(
446        &self,
447        collection: Option<&str>,
448    ) -> RedDBResult<Vec<crate::PhysicalIndexState>> {
449        let target_names: Vec<String> = match collection {
450            Some(collection) => self
451                .declared_indexes_for_collection(collection)
452                .into_iter()
453                .map(|index| index.name)
454                .collect(),
455            None => self
456                .declared_indexes()
457                .into_iter()
458                .map(|index| index.name)
459                .collect(),
460        };
461
462        let mut marked_building = Vec::new();
463        for name in target_names {
464            if self.mark_index_building(&name).is_ok() {
465                marked_building.push(name);
466            }
467        }
468
469        match self.rebuild_indexes(collection) {
470            Ok(indexes) => Ok(indexes),
471            Err(err) => {
472                for name in marked_building {
473                    let _ = self.fail_index(&name);
474                }
475                Err(err)
476            }
477        }
478    }
479}