Skip to main content

reddb_server/runtime/
impl_physical.rs

1use super::*;
2
3impl RedDBRuntime {
4    pub fn create_export(&self, name: impl Into<String>) -> RedDBResult<ExportDescriptor> {
5        self.inner
6            .db
7            .create_named_export(name)
8            .map_err(|err| RedDBError::Internal(err.to_string()))
9    }
10
11    pub fn graph_projections(&self) -> RedDBResult<Vec<PhysicalGraphProjection>> {
12        Ok(self.inner.db.declared_graph_projections())
13    }
14
15    pub fn operational_graph_projections(&self) -> Vec<PhysicalGraphProjection> {
16        self.inner.db.operational_graph_projections()
17    }
18
19    pub fn graph_projection_named(&self, name: &str) -> RedDBResult<RuntimeGraphProjection> {
20        let status = self
21            .graph_projection_statuses()
22            .into_iter()
23            .find(|status| status.name == name)
24            .ok_or_else(|| RedDBError::NotFound(name.to_string()))?;
25        if !status.declared {
26            return Err(RedDBError::Catalog(format!(
27                "graph projection '{name}' is not declared"
28            )));
29        }
30        if !status.operational {
31            return Err(RedDBError::Catalog(format!(
32                "graph projection '{name}' is declared but not operationally materialized"
33            )));
34        }
35        if status.lifecycle_state == "stale" {
36            return Err(RedDBError::Catalog(format!(
37                "graph projection '{name}' is stale and must be rematerialized before use"
38            )));
39        }
40        let projection = self
41            .operational_graph_projections()
42            .into_iter()
43            .find(|projection| projection.name == name)
44            .ok_or_else(|| RedDBError::NotFound(name.to_string()))?;
45        Ok(RuntimeGraphProjection {
46            node_labels: (!projection.node_labels.is_empty()).then_some(projection.node_labels),
47            node_types: (!projection.node_types.is_empty()).then_some(projection.node_types),
48            edge_labels: (!projection.edge_labels.is_empty()).then_some(projection.edge_labels),
49        })
50    }
51
52    pub fn save_graph_projection(
53        &self,
54        name: impl Into<String>,
55        projection: RuntimeGraphProjection,
56        source: Option<String>,
57    ) -> RedDBResult<PhysicalGraphProjection> {
58        self.inner
59            .db
60            .save_graph_projection(
61                name,
62                projection.node_labels.unwrap_or_default(),
63                projection.node_types.unwrap_or_default(),
64                projection.edge_labels.unwrap_or_default(),
65                source.unwrap_or_else(|| "runtime".to_string()),
66            )
67            .map_err(|err| RedDBError::Internal(err.to_string()))
68    }
69
70    pub fn materialize_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
71        self.inner
72            .db
73            .materialize_graph_projection(name)
74            .map_err(|err| RedDBError::Internal(err.to_string()))?
75            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
76    }
77
78    pub fn mark_graph_projection_materializing(
79        &self,
80        name: &str,
81    ) -> RedDBResult<PhysicalGraphProjection> {
82        self.inner
83            .db
84            .mark_graph_projection_materializing(name)
85            .map_err(|err| RedDBError::Internal(err.to_string()))?
86            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
87    }
88
89    pub fn fail_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
90        self.inner
91            .db
92            .fail_graph_projection(name)
93            .map_err(|err| RedDBError::Internal(err.to_string()))?
94            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
95    }
96
97    pub fn mark_graph_projection_stale(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
98        self.inner
99            .db
100            .mark_graph_projection_stale(name)
101            .map_err(|err| RedDBError::Internal(err.to_string()))?
102            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
103    }
104
105    pub fn analytics_jobs(&self) -> RedDBResult<Vec<PhysicalAnalyticsJob>> {
106        Ok(self.inner.db.declared_analytics_jobs())
107    }
108
109    pub fn operational_analytics_jobs(&self) -> Vec<PhysicalAnalyticsJob> {
110        self.inner.db.operational_analytics_jobs()
111    }
112
113    pub fn save_analytics_job(
114        &self,
115        kind: impl Into<String>,
116        projection_name: Option<String>,
117        metadata: std::collections::BTreeMap<String, String>,
118    ) -> RedDBResult<PhysicalAnalyticsJob> {
119        self.inner
120            .db
121            .save_analytics_job(kind, projection_name, metadata)
122            .map_err(|err| RedDBError::Internal(err.to_string()))
123    }
124
125    pub fn start_analytics_job(
126        &self,
127        kind: impl Into<String>,
128        projection_name: Option<String>,
129        metadata: std::collections::BTreeMap<String, String>,
130    ) -> RedDBResult<PhysicalAnalyticsJob> {
131        if let Some(projection_name) = projection_name.as_deref() {
132            let status = self
133                .graph_projection_statuses()
134                .into_iter()
135                .find(|status| status.name == projection_name)
136                .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
137            if !status.declared {
138                return Err(RedDBError::Catalog(format!(
139                    "graph projection '{projection_name}' is not declared"
140                )));
141            }
142            if !status.operational {
143                return Err(RedDBError::Catalog(format!(
144                    "graph projection '{projection_name}' is declared but not operationally materialized"
145                )));
146            }
147            if status.lifecycle_state == "stale" {
148                return Err(RedDBError::Catalog(format!(
149                    "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can start against it"
150                )));
151            }
152        }
153        self.inner
154            .db
155            .start_analytics_job(kind, projection_name, metadata)
156            .map_err(|err| RedDBError::Internal(err.to_string()))
157    }
158
159    pub fn queue_analytics_job(
160        &self,
161        kind: impl Into<String>,
162        projection_name: Option<String>,
163        metadata: std::collections::BTreeMap<String, String>,
164    ) -> RedDBResult<PhysicalAnalyticsJob> {
165        if let Some(projection_name) = projection_name.as_deref() {
166            let status = self
167                .graph_projection_statuses()
168                .into_iter()
169                .find(|status| status.name == projection_name)
170                .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
171            if !status.declared {
172                return Err(RedDBError::Catalog(format!(
173                    "graph projection '{projection_name}' is not declared"
174                )));
175            }
176            if !status.operational {
177                return Err(RedDBError::Catalog(format!(
178                    "graph projection '{projection_name}' is declared but not operationally materialized"
179                )));
180            }
181            if status.lifecycle_state == "stale" {
182                return Err(RedDBError::Catalog(format!(
183                    "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can be queued against it"
184                )));
185            }
186        }
187        self.inner
188            .db
189            .queue_analytics_job(kind, projection_name, metadata)
190            .map_err(|err| RedDBError::Internal(err.to_string()))
191    }
192
193    pub fn fail_analytics_job(
194        &self,
195        kind: impl Into<String>,
196        projection_name: Option<String>,
197        metadata: std::collections::BTreeMap<String, String>,
198    ) -> RedDBResult<PhysicalAnalyticsJob> {
199        self.inner
200            .db
201            .fail_analytics_job(kind, projection_name, metadata)
202            .map_err(|err| RedDBError::Internal(err.to_string()))
203    }
204
205    pub fn mark_analytics_job_stale(
206        &self,
207        kind: impl Into<String>,
208        projection_name: Option<String>,
209        metadata: std::collections::BTreeMap<String, String>,
210    ) -> RedDBResult<PhysicalAnalyticsJob> {
211        self.inner
212            .db
213            .mark_analytics_job_stale(kind, projection_name, metadata)
214            .map_err(|err| RedDBError::Internal(err.to_string()))
215    }
216
217    pub fn complete_analytics_job(
218        &self,
219        kind: impl Into<String>,
220        projection_name: Option<String>,
221        metadata: std::collections::BTreeMap<String, String>,
222    ) -> RedDBResult<PhysicalAnalyticsJob> {
223        self.inner
224            .db
225            .record_analytics_job(kind, projection_name, metadata)
226            .map_err(|err| RedDBError::Internal(err.to_string()))
227    }
228
229    pub fn record_analytics_job(
230        &self,
231        kind: impl Into<String>,
232        projection_name: Option<String>,
233        metadata: std::collections::BTreeMap<String, String>,
234    ) -> RedDBResult<PhysicalAnalyticsJob> {
235        if let Some(projection_name) = projection_name.as_deref() {
236            let status = self
237                .graph_projection_statuses()
238                .into_iter()
239                .find(|status| status.name == projection_name)
240                .ok_or_else(|| RedDBError::NotFound(projection_name.to_string()))?;
241            if !status.declared {
242                return Err(RedDBError::Catalog(format!(
243                    "graph projection '{projection_name}' is not declared"
244                )));
245            }
246            if !status.operational {
247                return Err(RedDBError::Catalog(format!(
248                    "graph projection '{projection_name}' is declared but not operationally materialized"
249                )));
250            }
251            if status.lifecycle_state == "stale" {
252                return Err(RedDBError::Catalog(format!(
253                    "graph projection '{projection_name}' is stale and must be rematerialized before analytics jobs can complete against it"
254                )));
255            }
256        }
257        self.inner
258            .db
259            .record_analytics_job(kind, projection_name, metadata)
260            .map_err(|err| RedDBError::Internal(err.to_string()))
261    }
262
263    pub fn resolve_graph_projection(
264        &self,
265        projection_name: Option<&str>,
266        inline: Option<RuntimeGraphProjection>,
267    ) -> RedDBResult<Option<RuntimeGraphProjection>> {
268        let named = match projection_name {
269            Some(name) => Some(self.graph_projection_named(name)?),
270            None => None,
271        };
272        Ok(merge_runtime_projection(named, inline))
273    }
274
275    pub fn apply_retention_policy(&self) -> RedDBResult<()> {
276        self.inner
277            .db
278            .enforce_retention_policy()
279            .map_err(|err| RedDBError::Internal(err.to_string()))?;
280        self.invalidate_result_cache();
281        Ok(())
282    }
283
284    pub fn indexes(&self) -> Vec<crate::PhysicalIndexState> {
285        self.inner.db.operational_indexes()
286    }
287
288    pub fn declared_indexes(&self) -> Vec<crate::PhysicalIndexState> {
289        self.inner.db.declared_indexes()
290    }
291
292    pub fn declared_indexes_for_collection(
293        &self,
294        collection: &str,
295    ) -> Vec<crate::PhysicalIndexState> {
296        self.inner
297            .db
298            .declared_indexes()
299            .into_iter()
300            .filter(|index| index.collection.as_deref() == Some(collection))
301            .collect()
302    }
303
304    pub fn index_statuses(&self) -> Vec<crate::catalog::CatalogIndexStatus> {
305        self.inner.db.index_statuses()
306    }
307
308    pub fn graph_projection_statuses(&self) -> Vec<crate::catalog::CatalogGraphProjectionStatus> {
309        self.inner
310            .db
311            .catalog_model_snapshot()
312            .graph_projection_statuses
313    }
314
315    pub fn analytics_job_statuses(&self) -> Vec<crate::catalog::CatalogAnalyticsJobStatus> {
316        self.inner
317            .db
318            .catalog_model_snapshot()
319            .analytics_job_statuses
320    }
321
322    pub fn indexes_for_collection(&self, collection: &str) -> Vec<crate::PhysicalIndexState> {
323        self.inner
324            .db
325            .operational_indexes()
326            .into_iter()
327            .filter(|index| index.collection.as_deref() == Some(collection))
328            .collect()
329    }
330
331    pub fn set_index_enabled(
332        &self,
333        name: &str,
334        enabled: bool,
335    ) -> RedDBResult<crate::PhysicalIndexState> {
336        self.check_write(crate::runtime::write_gate::WriteKind::Maintenance)?;
337        self.inner
338            .db
339            .set_index_enabled(name, enabled)
340            .map_err(|err| RedDBError::Internal(err.to_string()))?
341            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
342    }
343
344    pub fn mark_index_building(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
345        self.inner
346            .db
347            .mark_index_building(name)
348            .map_err(|err| RedDBError::Internal(err.to_string()))?
349            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
350    }
351
352    pub fn fail_index(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
353        self.inner
354            .db
355            .fail_index(name)
356            .map_err(|err| RedDBError::Internal(err.to_string()))?
357            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
358    }
359
360    pub fn mark_index_stale(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
361        self.inner
362            .db
363            .mark_index_stale(name)
364            .map_err(|err| RedDBError::Internal(err.to_string()))?
365            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
366    }
367
368    pub fn mark_index_ready(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
369        self.inner
370            .db
371            .mark_index_ready(name)
372            .map_err(|err| RedDBError::Internal(err.to_string()))?
373            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
374    }
375
376    pub fn warmup_index_with_lifecycle(
377        &self,
378        name: &str,
379    ) -> RedDBResult<crate::PhysicalIndexState> {
380        self.mark_index_building(name)?;
381        match self.warmup_index(name) {
382            Ok(index) => Ok(index),
383            Err(err) => {
384                let _ = self.fail_index(name);
385                Err(err)
386            }
387        }
388    }
389
390    pub fn warmup_index(&self, name: &str) -> RedDBResult<crate::PhysicalIndexState> {
391        self.inner
392            .db
393            .warmup_index(name)
394            .map_err(|err| RedDBError::Internal(err.to_string()))?
395            .ok_or_else(|| RedDBError::NotFound(name.to_string()))
396    }
397
398    pub fn rebuild_indexes(
399        &self,
400        collection: Option<&str>,
401    ) -> RedDBResult<Vec<crate::PhysicalIndexState>> {
402        self.inner
403            .db
404            .rebuild_index_registry(collection)
405            .map_err(|err| RedDBError::Internal(err.to_string()))
406    }
407
408    pub fn rebuild_indexes_with_lifecycle(
409        &self,
410        collection: Option<&str>,
411    ) -> RedDBResult<Vec<crate::PhysicalIndexState>> {
412        let target_names: Vec<String> = match collection {
413            Some(collection) => self
414                .declared_indexes_for_collection(collection)
415                .into_iter()
416                .map(|index| index.name)
417                .collect(),
418            None => self
419                .declared_indexes()
420                .into_iter()
421                .map(|index| index.name)
422                .collect(),
423        };
424
425        let mut marked_building = Vec::new();
426        for name in target_names {
427            if self.mark_index_building(&name).is_ok() {
428                marked_building.push(name);
429            }
430        }
431
432        match self.rebuild_indexes(collection) {
433            Ok(indexes) => Ok(indexes),
434            Err(err) => {
435                for name in marked_building {
436                    let _ = self.fail_index(&name);
437                }
438                Err(err)
439            }
440        }
441    }
442}