Skip to main content

reddb_server/runtime/
continuous_materialized_view.rs

1//! Continuous (materialized) view descriptor + catalog persistence
2//! (issue #593, slice 9a of #575).
3//!
4//! Today the view registry on `RuntimeInner` is purely in-memory:
5//! every `CREATE [MATERIALIZED] VIEW` lands in
6//! `inner.views: RwLock<HashMap<String, Arc<CreateViewQuery>>>` and a
7//! restart loses every definition. This module introduces the
8//! descriptor type and the persistence layer that backs the registry
9//! onto the system collection [`CATALOG_COLLECTION`]. Read / write /
10//! refresh code paths are unchanged — the rehydrate hook at boot
11//! repopulates the in-memory registry from the persisted rows before
12//! the API opens.
13
14use crate::api::{RedDBError, RedDBResult};
15use crate::storage::schema::Value;
16use crate::storage::unified::entity::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
17
18/// Name of the system collection that stores one row per
19/// materialized-view definition. Bootstrapped alongside the other
20/// keyed system collections (`red.config`, `red.vault`) at boot.
21pub const CATALOG_COLLECTION: &str = "red_materialized_view_defs";
22
23/// Persisted shape of a single `CREATE MATERIALIZED VIEW`.
24///
25/// The descriptor stores the original SQL source so the body AST can
26/// be recovered by re-parsing at boot — this avoids embedding a
27/// version-dependent AST serialization in the on-disk catalog, and
28/// keeps the rehydrate path symmetric with the user-facing CREATE.
29#[derive(Debug, Clone, PartialEq, Eq)]
30pub struct MaterializedViewDescriptor {
31    /// View name as declared in `CREATE MATERIALIZED VIEW <name>`.
32    pub name: String,
33    /// Verbatim SQL source of the `CREATE MATERIALIZED VIEW`
34    /// statement. The rehydrate path re-parses this string to recover
35    /// the body AST.
36    pub source_sql: String,
37    /// Source collections referenced by the view body — populated
38    /// from `collect_table_refs(&q.query)` at creation time.
39    pub source_collections: Vec<String>,
40    /// `REFRESH EVERY <duration>` clause in milliseconds, or `None`
41    /// for refresh-on-demand views.
42    pub refresh_every_ms: Option<u64>,
43    /// `WITH RETENTION <duration>` clause in milliseconds, or `None`
44    /// when no retention policy was declared on the view.
45    pub retention_duration_ms: Option<u64>,
46}
47
48impl MaterializedViewDescriptor {
49    /// Build a row entity suitable for `insert_auto(CATALOG_COLLECTION, …)`.
50    /// Each descriptor field maps to one named column on the row, in
51    /// the same shape `red_config` uses for its key/value entries —
52    /// keeps the storage layout introspectable from SQL without a
53    /// JSON parser.
54    fn to_row_entity(&self) -> UnifiedEntity {
55        let mut named = std::collections::HashMap::new();
56        named.insert("name".to_string(), Value::text(self.name.clone()));
57        named.insert(
58            "source_sql".to_string(),
59            Value::text(self.source_sql.clone()),
60        );
61        named.insert(
62            "source_collections".to_string(),
63            Value::Array(
64                self.source_collections
65                    .iter()
66                    .map(|s| Value::text(s.clone()))
67                    .collect(),
68            ),
69        );
70        named.insert(
71            "refresh_every_ms".to_string(),
72            match self.refresh_every_ms {
73                Some(ms) => Value::UnsignedInteger(ms),
74                None => Value::Null,
75            },
76        );
77        named.insert(
78            "retention_duration_ms".to_string(),
79            match self.retention_duration_ms {
80                Some(ms) => Value::UnsignedInteger(ms),
81                None => Value::Null,
82            },
83        );
84        UnifiedEntity::new(
85            EntityId::new(0),
86            EntityKind::TableRow {
87                table: std::sync::Arc::from(CATALOG_COLLECTION),
88                row_id: 0,
89            },
90            EntityData::Row(RowData {
91                columns: Vec::new(),
92                named: Some(named),
93                schema: None,
94            }),
95        )
96    }
97}
98
99/// Decode a stored row back into a descriptor. Returns `None` for
100/// rows that are missing the required `name` / `source_sql` columns
101/// — boot-time rehydrate logs and continues so a single malformed
102/// entry does not block startup.
103pub(crate) fn decode_row(entity: &UnifiedEntity) -> Option<MaterializedViewDescriptor> {
104    let EntityData::Row(row) = &entity.data else {
105        return None;
106    };
107    let named = row.named.as_ref()?;
108    let name = match named.get("name")? {
109        Value::Text(s) => s.to_string(),
110        _ => return None,
111    };
112    let source_sql = match named.get("source_sql")? {
113        Value::Text(s) => s.to_string(),
114        _ => return None,
115    };
116    let source_collections = match named.get("source_collections") {
117        Some(Value::Array(values)) => values
118            .iter()
119            .filter_map(|v| match v {
120                Value::Text(s) => Some(s.to_string()),
121                _ => None,
122            })
123            .collect(),
124        _ => Vec::new(),
125    };
126    let refresh_every_ms = match named.get("refresh_every_ms") {
127        Some(Value::UnsignedInteger(ms)) => Some(*ms),
128        Some(Value::Integer(ms)) if *ms >= 0 => Some(*ms as u64),
129        _ => None,
130    };
131    let retention_duration_ms = match named.get("retention_duration_ms") {
132        Some(Value::UnsignedInteger(ms)) => Some(*ms),
133        Some(Value::Integer(ms)) if *ms >= 0 => Some(*ms as u64),
134        _ => None,
135    };
136    Some(MaterializedViewDescriptor {
137        name,
138        source_sql,
139        source_collections,
140        refresh_every_ms,
141        retention_duration_ms,
142    })
143}
144
145/// Persist a descriptor to the catalog collection, replacing any
146/// prior rows with the same `name`. Idempotent: re-persisting an
147/// existing view (e.g. `CREATE OR REPLACE`) leaves exactly one row
148/// behind, so the catalog never accumulates duplicates across
149/// repeated definition churn.
150pub(crate) fn persist_descriptor(
151    store: &crate::storage::unified::UnifiedStore,
152    descriptor: &MaterializedViewDescriptor,
153) -> RedDBResult<()> {
154    let _ = store.get_or_create_collection(CATALOG_COLLECTION);
155    remove_by_name(store, &descriptor.name)?;
156    let entity = descriptor.to_row_entity();
157    store
158        .insert_auto(CATALOG_COLLECTION, entity)
159        .map_err(|err| {
160            RedDBError::Internal(format!(
161                "persist materialized-view descriptor {}: {err}",
162                descriptor.name
163            ))
164        })?;
165    Ok(())
166}
167
168/// Remove every row whose `name` column matches `name`. Used on
169/// `DROP MATERIALIZED VIEW` and as the first half of
170/// [`persist_descriptor`]'s upsert semantics.
171pub(crate) fn remove_by_name(
172    store: &crate::storage::unified::UnifiedStore,
173    name: &str,
174) -> RedDBResult<()> {
175    let Some(manager) = store.get_collection(CATALOG_COLLECTION) else {
176        return Ok(());
177    };
178    let mut to_delete: Vec<EntityId> = Vec::new();
179    for entity in manager.query_all(|_| true) {
180        let EntityData::Row(row) = &entity.data else {
181            continue;
182        };
183        let Some(named) = &row.named else { continue };
184        if let Some(Value::Text(stored)) = named.get("name") {
185            if stored.as_ref() == name {
186                to_delete.push(entity.id);
187            }
188        }
189    }
190    for id in to_delete {
191        store.delete(CATALOG_COLLECTION, id).map_err(|err| {
192            RedDBError::Internal(format!(
193                "delete materialized-view descriptor row for {name}: {err}"
194            ))
195        })?;
196    }
197    Ok(())
198}
199
200/// Read every persisted descriptor. Returns an empty vector when the
201/// catalog collection doesn't exist (fresh datadir / first boot).
202pub(crate) fn load_all(
203    store: &crate::storage::unified::UnifiedStore,
204) -> Vec<MaterializedViewDescriptor> {
205    let Some(manager) = store.get_collection(CATALOG_COLLECTION) else {
206        return Vec::new();
207    };
208    manager
209        .query_all(|_| true)
210        .iter()
211        .filter_map(decode_row)
212        .collect()
213}
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218
219    #[test]
220    fn row_entity_roundtrips_through_decode() {
221        let d = MaterializedViewDescriptor {
222            name: "v".into(),
223            source_sql: "CREATE MATERIALIZED VIEW v AS SELECT * FROM t".into(),
224            source_collections: vec!["t".into(), "u".into()],
225            refresh_every_ms: Some(60_000),
226            retention_duration_ms: Some(7 * 24 * 3_600_000),
227        };
228        let entity = d.to_row_entity();
229        let back = decode_row(&entity).expect("decode");
230        assert_eq!(d, back);
231    }
232
233    #[test]
234    fn null_options_decode_to_none() {
235        let d = MaterializedViewDescriptor {
236            name: "v".into(),
237            source_sql: "CREATE MATERIALIZED VIEW v AS SELECT 1".into(),
238            source_collections: vec![],
239            refresh_every_ms: None,
240            retention_duration_ms: None,
241        };
242        let entity = d.to_row_entity();
243        let back = decode_row(&entity).expect("decode");
244        assert!(back.refresh_every_ms.is_none());
245        assert!(back.retention_duration_ms.is_none());
246    }
247}