reddb_server/runtime/
continuous_materialized_view.rs1use crate::api::{RedDBError, RedDBResult};
15use crate::storage::schema::Value;
16use crate::storage::unified::entity::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
17
18pub const CATALOG_COLLECTION: &str = "red_materialized_view_defs";
22
23#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct MaterializedViewDescriptor {
31 pub name: String,
33 pub source_sql: String,
37 pub source_collections: Vec<String>,
40 pub refresh_every_ms: Option<u64>,
43 pub retention_duration_ms: Option<u64>,
46}
47
48impl MaterializedViewDescriptor {
49 fn to_row_entity(&self) -> UnifiedEntity {
55 let mut named = std::collections::HashMap::new();
56 named.insert("name".to_string(), Value::text(self.name.clone()));
57 named.insert(
58 "source_sql".to_string(),
59 Value::text(self.source_sql.clone()),
60 );
61 named.insert(
62 "source_collections".to_string(),
63 Value::Array(
64 self.source_collections
65 .iter()
66 .map(|s| Value::text(s.clone()))
67 .collect(),
68 ),
69 );
70 named.insert(
71 "refresh_every_ms".to_string(),
72 match self.refresh_every_ms {
73 Some(ms) => Value::UnsignedInteger(ms),
74 None => Value::Null,
75 },
76 );
77 named.insert(
78 "retention_duration_ms".to_string(),
79 match self.retention_duration_ms {
80 Some(ms) => Value::UnsignedInteger(ms),
81 None => Value::Null,
82 },
83 );
84 UnifiedEntity::new(
85 EntityId::new(0),
86 EntityKind::TableRow {
87 table: std::sync::Arc::from(CATALOG_COLLECTION),
88 row_id: 0,
89 },
90 EntityData::Row(RowData {
91 columns: Vec::new(),
92 named: Some(named),
93 schema: None,
94 }),
95 )
96 }
97}
98
99pub(crate) fn decode_row(entity: &UnifiedEntity) -> Option<MaterializedViewDescriptor> {
104 let EntityData::Row(row) = &entity.data else {
105 return None;
106 };
107 let named = row.named.as_ref()?;
108 let name = match named.get("name")? {
109 Value::Text(s) => s.to_string(),
110 _ => return None,
111 };
112 let source_sql = match named.get("source_sql")? {
113 Value::Text(s) => s.to_string(),
114 _ => return None,
115 };
116 let source_collections = match named.get("source_collections") {
117 Some(Value::Array(values)) => values
118 .iter()
119 .filter_map(|v| match v {
120 Value::Text(s) => Some(s.to_string()),
121 _ => None,
122 })
123 .collect(),
124 _ => Vec::new(),
125 };
126 let refresh_every_ms = match named.get("refresh_every_ms") {
127 Some(Value::UnsignedInteger(ms)) => Some(*ms),
128 Some(Value::Integer(ms)) if *ms >= 0 => Some(*ms as u64),
129 _ => None,
130 };
131 let retention_duration_ms = match named.get("retention_duration_ms") {
132 Some(Value::UnsignedInteger(ms)) => Some(*ms),
133 Some(Value::Integer(ms)) if *ms >= 0 => Some(*ms as u64),
134 _ => None,
135 };
136 Some(MaterializedViewDescriptor {
137 name,
138 source_sql,
139 source_collections,
140 refresh_every_ms,
141 retention_duration_ms,
142 })
143}
144
145pub(crate) fn persist_descriptor(
151 store: &crate::storage::unified::UnifiedStore,
152 descriptor: &MaterializedViewDescriptor,
153) -> RedDBResult<()> {
154 let _ = store.get_or_create_collection(CATALOG_COLLECTION);
155 remove_by_name(store, &descriptor.name)?;
156 let entity = descriptor.to_row_entity();
157 store
158 .insert_auto(CATALOG_COLLECTION, entity)
159 .map_err(|err| {
160 RedDBError::Internal(format!(
161 "persist materialized-view descriptor {}: {err}",
162 descriptor.name
163 ))
164 })?;
165 Ok(())
166}
167
168pub(crate) fn remove_by_name(
172 store: &crate::storage::unified::UnifiedStore,
173 name: &str,
174) -> RedDBResult<()> {
175 let Some(manager) = store.get_collection(CATALOG_COLLECTION) else {
176 return Ok(());
177 };
178 let mut to_delete: Vec<EntityId> = Vec::new();
179 for entity in manager.query_all(|_| true) {
180 let EntityData::Row(row) = &entity.data else {
181 continue;
182 };
183 let Some(named) = &row.named else { continue };
184 if let Some(Value::Text(stored)) = named.get("name") {
185 if stored.as_ref() == name {
186 to_delete.push(entity.id);
187 }
188 }
189 }
190 for id in to_delete {
191 store.delete(CATALOG_COLLECTION, id).map_err(|err| {
192 RedDBError::Internal(format!(
193 "delete materialized-view descriptor row for {name}: {err}"
194 ))
195 })?;
196 }
197 Ok(())
198}
199
200pub(crate) fn load_all(
203 store: &crate::storage::unified::UnifiedStore,
204) -> Vec<MaterializedViewDescriptor> {
205 let Some(manager) = store.get_collection(CATALOG_COLLECTION) else {
206 return Vec::new();
207 };
208 manager
209 .query_all(|_| true)
210 .iter()
211 .filter_map(decode_row)
212 .collect()
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218
219 #[test]
220 fn row_entity_roundtrips_through_decode() {
221 let d = MaterializedViewDescriptor {
222 name: "v".into(),
223 source_sql: "CREATE MATERIALIZED VIEW v AS SELECT * FROM t".into(),
224 source_collections: vec!["t".into(), "u".into()],
225 refresh_every_ms: Some(60_000),
226 retention_duration_ms: Some(7 * 24 * 3_600_000),
227 };
228 let entity = d.to_row_entity();
229 let back = decode_row(&entity).expect("decode");
230 assert_eq!(d, back);
231 }
232
233 #[test]
234 fn null_options_decode_to_none() {
235 let d = MaterializedViewDescriptor {
236 name: "v".into(),
237 source_sql: "CREATE MATERIALIZED VIEW v AS SELECT 1".into(),
238 source_collections: vec![],
239 refresh_every_ms: None,
240 retention_duration_ms: None,
241 };
242 let entity = d.to_row_entity();
243 let back = decode_row(&entity).expect("decode");
244 assert!(back.refresh_every_ms.is_none());
245 assert!(back.retention_duration_ms.is_none());
246 }
247}