Skip to main content

reddb_server/application/
admin.rs

1use std::collections::BTreeMap;
2
3use crate::application::ports::RuntimeAdminPort;
4use crate::catalog::{CatalogAnalyticsJobStatus, CatalogGraphProjectionStatus, CatalogIndexStatus};
5use crate::runtime::RuntimeGraphProjection;
6use crate::{PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState, RedDBResult};
7
/// One analytics job selected for a serverless warmup pass.
///
/// Instances are produced by `AdminUseCases::build_serverless_warmup_plan`,
/// which copies both fields verbatim from a `CatalogAnalyticsJobStatus`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ServerlessAnalyticsWarmupTarget {
    /// Job kind, copied from the catalog status entry.
    // NOTE(review): presumably the analytics algorithm/job type name — confirm against the catalog.
    pub kind: String,
    /// Projection recorded on the catalog status entry, if any.
    pub projection: Option<String>,
}

/// The set of artifacts a serverless warmup should (re)build.
///
/// `Default` yields an empty plan: no indexes, no projections, no jobs, and
/// `includes_native_artifacts == false`. `PartialEq`/`Eq` are derived so plans
/// can be compared directly (e.g. with `assert_eq!` in tests).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ServerlessWarmupPlan {
    /// Names of the indexes selected for warmup.
    pub indexes: Vec<String>,
    /// Names of the graph projections selected for warmup.
    pub graph_projections: Vec<String>,
    /// Analytics jobs selected for warmup.
    pub analytics_jobs: Vec<ServerlessAnalyticsWarmupTarget>,
    /// Mirrors the caller's `include_native_artifacts` request flag.
    pub includes_native_artifacts: bool,
}
21
/// Administrative use cases layered over a borrowed runtime port.
///
/// The struct only holds a shared reference, so it carries no state of its
/// own. `P: ?Sized` lets `P` be an unsized type such as a trait object; the
/// `RuntimeAdminPort` bound is applied on the `impl` block rather than here.
pub struct AdminUseCases<'a, P: ?Sized> {
    // Borrowed runtime port that the delegating methods in the `impl` call into.
    runtime: &'a P,
}
25
26impl<'a, P: RuntimeAdminPort + ?Sized> AdminUseCases<'a, P> {
27    pub fn new(runtime: &'a P) -> Self {
28        Self { runtime }
29    }
30
31    pub fn set_index_enabled(&self, name: &str, enabled: bool) -> RedDBResult<PhysicalIndexState> {
32        self.runtime.set_index_enabled(name, enabled)
33    }
34
35    pub fn mark_index_building(&self, name: &str) -> RedDBResult<PhysicalIndexState> {
36        self.runtime.mark_index_building(name)
37    }
38
39    pub fn fail_index(&self, name: &str) -> RedDBResult<PhysicalIndexState> {
40        self.runtime.fail_index(name)
41    }
42
43    pub fn mark_index_stale(&self, name: &str) -> RedDBResult<PhysicalIndexState> {
44        self.runtime.mark_index_stale(name)
45    }
46
47    pub fn mark_index_ready(&self, name: &str) -> RedDBResult<PhysicalIndexState> {
48        self.runtime.mark_index_ready(name)
49    }
50
51    pub fn warmup_index(&self, name: &str) -> RedDBResult<PhysicalIndexState> {
52        self.runtime.warmup_index_with_lifecycle(name)
53    }
54
55    pub fn rebuild_indexes(
56        &self,
57        collection: Option<&str>,
58    ) -> RedDBResult<Vec<PhysicalIndexState>> {
59        self.runtime.rebuild_indexes_with_lifecycle(collection)
60    }
61
62    pub fn save_graph_projection(
63        &self,
64        name: impl Into<String>,
65        projection: RuntimeGraphProjection,
66        source: Option<String>,
67    ) -> RedDBResult<PhysicalGraphProjection> {
68        self.runtime.save_graph_projection(name, projection, source)
69    }
70
71    pub fn mark_graph_projection_materializing(
72        &self,
73        name: &str,
74    ) -> RedDBResult<PhysicalGraphProjection> {
75        self.runtime.mark_graph_projection_materializing(name)
76    }
77
78    pub fn materialize_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
79        self.runtime.materialize_graph_projection(name)
80    }
81
82    pub fn fail_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
83        self.runtime.fail_graph_projection(name)
84    }
85
86    pub fn mark_graph_projection_stale(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
87        self.runtime.mark_graph_projection_stale(name)
88    }
89
90    pub fn save_analytics_job(
91        &self,
92        kind: impl Into<String>,
93        projection_name: Option<String>,
94        metadata: BTreeMap<String, String>,
95    ) -> RedDBResult<PhysicalAnalyticsJob> {
96        self.runtime
97            .save_analytics_job(kind, projection_name, metadata)
98    }
99
100    pub fn start_analytics_job(
101        &self,
102        kind: impl Into<String>,
103        projection_name: Option<String>,
104        metadata: BTreeMap<String, String>,
105    ) -> RedDBResult<PhysicalAnalyticsJob> {
106        self.runtime
107            .start_analytics_job(kind, projection_name, metadata)
108    }
109
110    pub fn queue_analytics_job(
111        &self,
112        kind: impl Into<String>,
113        projection_name: Option<String>,
114        metadata: BTreeMap<String, String>,
115    ) -> RedDBResult<PhysicalAnalyticsJob> {
116        self.runtime
117            .queue_analytics_job(kind, projection_name, metadata)
118    }
119
120    pub fn fail_analytics_job(
121        &self,
122        kind: impl Into<String>,
123        projection_name: Option<String>,
124        metadata: BTreeMap<String, String>,
125    ) -> RedDBResult<PhysicalAnalyticsJob> {
126        self.runtime
127            .fail_analytics_job(kind, projection_name, metadata)
128    }
129
130    pub fn mark_analytics_job_stale(
131        &self,
132        kind: impl Into<String>,
133        projection_name: Option<String>,
134        metadata: BTreeMap<String, String>,
135    ) -> RedDBResult<PhysicalAnalyticsJob> {
136        self.runtime
137            .mark_analytics_job_stale(kind, projection_name, metadata)
138    }
139
140    pub fn complete_analytics_job(
141        &self,
142        kind: impl Into<String>,
143        projection_name: Option<String>,
144        metadata: BTreeMap<String, String>,
145    ) -> RedDBResult<PhysicalAnalyticsJob> {
146        self.runtime
147            .complete_analytics_job(kind, projection_name, metadata)
148    }
149
150    pub fn build_serverless_warmup_plan(
151        &self,
152        index_statuses: &[CatalogIndexStatus],
153        graph_projection_statuses: &[CatalogGraphProjectionStatus],
154        analytics_job_statuses: &[CatalogAnalyticsJobStatus],
155        force: bool,
156        include_indexes: bool,
157        include_graph_projections: bool,
158        include_analytics_jobs: bool,
159        include_native_artifacts: bool,
160    ) -> ServerlessWarmupPlan {
161        let mut plan = ServerlessWarmupPlan::default();
162
163        if include_indexes {
164            for status in index_statuses {
165                if !status.declared {
166                    continue;
167                }
168                if force || status.requires_rebuild {
169                    plan.indexes.push(status.name.clone());
170                }
171            }
172        }
173
174        if include_graph_projections {
175            for status in graph_projection_statuses {
176                if !status.declared {
177                    continue;
178                }
179                let should_rematerialize = force
180                    || status.requires_rematerialization
181                    || status.rerun_required
182                    || !status.dependent_jobs_in_sync;
183                if should_rematerialize {
184                    plan.graph_projections.push(status.name.clone());
185                }
186            }
187        }
188
189        if include_analytics_jobs {
190            for status in analytics_job_statuses {
191                if !status.declared {
192                    continue;
193                }
194                if force || status.requires_rerun || status.executable {
195                    plan.analytics_jobs.push(ServerlessAnalyticsWarmupTarget {
196                        kind: status.kind.clone(),
197                        projection: status.projection.clone(),
198                    });
199                }
200            }
201        }
202
203        plan.includes_native_artifacts = include_native_artifacts;
204        plan
205    }
206}