reddb_server/application/
admin.rs1use std::collections::BTreeMap;
2
3use crate::application::ports::RuntimeAdminPort;
4use crate::catalog::{CatalogAnalyticsJobStatus, CatalogGraphProjectionStatus, CatalogIndexStatus};
5use crate::runtime::RuntimeGraphProjection;
6use crate::{PhysicalAnalyticsJob, PhysicalGraphProjection, PhysicalIndexState, RedDBResult};
7
/// Identifies one analytics job selected for a serverless warmup.
///
/// Instances are produced by `AdminUseCases::build_serverless_warmup_plan`,
/// which copies both fields from the corresponding catalog job status.
///
/// The type is a plain value: it can be compared, hashed and used as a set or
/// map key, which makes plans easy to assert on and to de-duplicate.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub struct ServerlessAnalyticsWarmupTarget {
    /// Job kind, copied verbatim from the catalog status entry.
    pub kind: String,
    /// Projection named by the catalog status entry, if it names one.
    pub projection: Option<String>,
}
13
14#[derive(Debug, Clone, Default)]
15pub struct ServerlessWarmupPlan {
16 pub indexes: Vec<String>,
17 pub graph_projections: Vec<String>,
18 pub analytics_jobs: Vec<ServerlessAnalyticsWarmupTarget>,
19 pub includes_native_artifacts: bool,
20}
21
/// Administrative use cases layered over a borrowed runtime.
///
/// The wrapper owns nothing: it keeps a shared reference to the runtime for
/// the lifetime `'a`. `P` may be unsized.
pub struct AdminUseCases<'a, P>
where
    P: ?Sized,
{
    /// Borrowed runtime that the use-case methods call into.
    runtime: &'a P,
}
25
26impl<'a, P: RuntimeAdminPort + ?Sized> AdminUseCases<'a, P> {
27 pub fn new(runtime: &'a P) -> Self {
28 Self { runtime }
29 }
30
31 pub fn set_index_enabled(&self, name: &str, enabled: bool) -> RedDBResult<PhysicalIndexState> {
32 self.runtime.set_index_enabled(name, enabled)
33 }
34
35 pub fn mark_index_building(&self, name: &str) -> RedDBResult<PhysicalIndexState> {
36 self.runtime.mark_index_building(name)
37 }
38
39 pub fn fail_index(&self, name: &str) -> RedDBResult<PhysicalIndexState> {
40 self.runtime.fail_index(name)
41 }
42
43 pub fn mark_index_stale(&self, name: &str) -> RedDBResult<PhysicalIndexState> {
44 self.runtime.mark_index_stale(name)
45 }
46
47 pub fn mark_index_ready(&self, name: &str) -> RedDBResult<PhysicalIndexState> {
48 self.runtime.mark_index_ready(name)
49 }
50
51 pub fn warmup_index(&self, name: &str) -> RedDBResult<PhysicalIndexState> {
52 self.runtime.warmup_index_with_lifecycle(name)
53 }
54
55 pub fn rebuild_indexes(
56 &self,
57 collection: Option<&str>,
58 ) -> RedDBResult<Vec<PhysicalIndexState>> {
59 self.runtime.rebuild_indexes_with_lifecycle(collection)
60 }
61
62 pub fn save_graph_projection(
63 &self,
64 name: impl Into<String>,
65 projection: RuntimeGraphProjection,
66 source: Option<String>,
67 ) -> RedDBResult<PhysicalGraphProjection> {
68 self.runtime.save_graph_projection(name, projection, source)
69 }
70
71 pub fn mark_graph_projection_materializing(
72 &self,
73 name: &str,
74 ) -> RedDBResult<PhysicalGraphProjection> {
75 self.runtime.mark_graph_projection_materializing(name)
76 }
77
78 pub fn materialize_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
79 self.runtime.materialize_graph_projection(name)
80 }
81
82 pub fn fail_graph_projection(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
83 self.runtime.fail_graph_projection(name)
84 }
85
86 pub fn mark_graph_projection_stale(&self, name: &str) -> RedDBResult<PhysicalGraphProjection> {
87 self.runtime.mark_graph_projection_stale(name)
88 }
89
90 pub fn save_analytics_job(
91 &self,
92 kind: impl Into<String>,
93 projection_name: Option<String>,
94 metadata: BTreeMap<String, String>,
95 ) -> RedDBResult<PhysicalAnalyticsJob> {
96 self.runtime
97 .save_analytics_job(kind, projection_name, metadata)
98 }
99
100 pub fn start_analytics_job(
101 &self,
102 kind: impl Into<String>,
103 projection_name: Option<String>,
104 metadata: BTreeMap<String, String>,
105 ) -> RedDBResult<PhysicalAnalyticsJob> {
106 self.runtime
107 .start_analytics_job(kind, projection_name, metadata)
108 }
109
110 pub fn queue_analytics_job(
111 &self,
112 kind: impl Into<String>,
113 projection_name: Option<String>,
114 metadata: BTreeMap<String, String>,
115 ) -> RedDBResult<PhysicalAnalyticsJob> {
116 self.runtime
117 .queue_analytics_job(kind, projection_name, metadata)
118 }
119
120 pub fn fail_analytics_job(
121 &self,
122 kind: impl Into<String>,
123 projection_name: Option<String>,
124 metadata: BTreeMap<String, String>,
125 ) -> RedDBResult<PhysicalAnalyticsJob> {
126 self.runtime
127 .fail_analytics_job(kind, projection_name, metadata)
128 }
129
130 pub fn mark_analytics_job_stale(
131 &self,
132 kind: impl Into<String>,
133 projection_name: Option<String>,
134 metadata: BTreeMap<String, String>,
135 ) -> RedDBResult<PhysicalAnalyticsJob> {
136 self.runtime
137 .mark_analytics_job_stale(kind, projection_name, metadata)
138 }
139
140 pub fn complete_analytics_job(
141 &self,
142 kind: impl Into<String>,
143 projection_name: Option<String>,
144 metadata: BTreeMap<String, String>,
145 ) -> RedDBResult<PhysicalAnalyticsJob> {
146 self.runtime
147 .complete_analytics_job(kind, projection_name, metadata)
148 }
149
150 pub fn build_serverless_warmup_plan(
151 &self,
152 index_statuses: &[CatalogIndexStatus],
153 graph_projection_statuses: &[CatalogGraphProjectionStatus],
154 analytics_job_statuses: &[CatalogAnalyticsJobStatus],
155 force: bool,
156 include_indexes: bool,
157 include_graph_projections: bool,
158 include_analytics_jobs: bool,
159 include_native_artifacts: bool,
160 ) -> ServerlessWarmupPlan {
161 let mut plan = ServerlessWarmupPlan::default();
162
163 if include_indexes {
164 for status in index_statuses {
165 if !status.declared {
166 continue;
167 }
168 if force || status.requires_rebuild {
169 plan.indexes.push(status.name.clone());
170 }
171 }
172 }
173
174 if include_graph_projections {
175 for status in graph_projection_statuses {
176 if !status.declared {
177 continue;
178 }
179 let should_rematerialize = force
180 || status.requires_rematerialization
181 || status.rerun_required
182 || !status.dependent_jobs_in_sync;
183 if should_rematerialize {
184 plan.graph_projections.push(status.name.clone());
185 }
186 }
187 }
188
189 if include_analytics_jobs {
190 for status in analytics_job_statuses {
191 if !status.declared {
192 continue;
193 }
194 if force || status.requires_rerun || status.executable {
195 plan.analytics_jobs.push(ServerlessAnalyticsWarmupTarget {
196 kind: status.kind.clone(),
197 projection: status.projection.clone(),
198 });
199 }
200 }
201 }
202
203 plan.includes_native_artifacts = include_native_artifacts;
204 plan
205 }
206}