Skip to main content

nodedb_types/columnar/
schema.rs

// SPDX-License-Identifier: Apache-2.0

//! Strict document and columnar schemas with shared operations trait.
4
5use serde::{Deserialize, Serialize};
6
7use super::column_def::ColumnDef;
8use crate::columnar::ColumnType;
9
10/// Shared schema operations (eliminates duplication between Strict and Columnar).
11pub trait SchemaOps {
12    fn columns(&self) -> &[ColumnDef];
13
14    fn column_index(&self, name: &str) -> Option<usize> {
15        self.columns().iter().position(|c| c.name == name)
16    }
17
18    fn column(&self, name: &str) -> Option<&ColumnDef> {
19        self.columns().iter().find(|c| c.name == name)
20    }
21
22    fn primary_key_columns(&self) -> Vec<&ColumnDef> {
23        self.columns().iter().filter(|c| c.primary_key).collect()
24    }
25
26    fn len(&self) -> usize {
27        self.columns().len()
28    }
29
30    fn is_empty(&self) -> bool {
31        self.columns().is_empty()
32    }
33}
34
35/// Schema for a strict document collection (Binary Tuple serialization).
36#[derive(
37    Debug,
38    Clone,
39    PartialEq,
40    Eq,
41    Serialize,
42    Deserialize,
43    zerompk::ToMessagePack,
44    zerompk::FromMessagePack,
45)]
46#[msgpack(map)]
47pub struct StrictSchema {
48    pub columns: Vec<ColumnDef>,
49    pub version: u32,
50    /// Columns that were removed via `ALTER DROP COLUMN`. Retained so the
51    /// reader can reconstruct the physical layout of tuples written before
52    /// the drop.
53    #[serde(default, skip_serializing_if = "Vec::is_empty")]
54    pub dropped_columns: Vec<DroppedColumn>,
55    /// When true, the tuple reserves fixed-Int64 slots 0/1/2 for
56    /// `__system_from_ms`, `__valid_from_ms`, `__valid_until_ms`. These
57    /// columns are prepended by `StrictSchema::new_bitemporal` and appear
58    /// in `columns` like any other field; the flag preserves the intent
59    /// across catalog round-trips.
60    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
61    #[msgpack(default)]
62    pub bitemporal: bool,
63}
64
65/// Tombstone for a column removed by `ALTER DROP COLUMN`.
66#[derive(
67    Debug,
68    Clone,
69    PartialEq,
70    Eq,
71    Serialize,
72    Deserialize,
73    zerompk::ToMessagePack,
74    zerompk::FromMessagePack,
75)]
76pub struct DroppedColumn {
77    /// The full column definition at time of drop.
78    pub def: ColumnDef,
79    /// The column's position in the column list before it was removed.
80    pub position: usize,
81    /// The schema version at which the column was dropped.
82    pub dropped_at_version: u32,
83}
84
85/// Schema for a columnar collection (compressed segment files).
86#[derive(
87    Debug,
88    Clone,
89    PartialEq,
90    Eq,
91    Serialize,
92    Deserialize,
93    zerompk::ToMessagePack,
94    zerompk::FromMessagePack,
95)]
96pub struct ColumnarSchema {
97    pub columns: Vec<ColumnDef>,
98    pub version: u32,
99}
100
/// Reserved strict-tuple column name for bitemporal collections, kept in
/// fixed Int64 slot 0 so the decoder can reach it with a constant-offset
/// jump.
pub const BITEMPORAL_SYSTEM_FROM: &str = "__system_from_ms";
/// Reserved strict-tuple column name for bitemporal collections, kept in
/// fixed Int64 slot 1.
pub const BITEMPORAL_VALID_FROM: &str = "__valid_from_ms";
/// Reserved strict-tuple column name for bitemporal collections, kept in
/// fixed Int64 slot 2.
pub const BITEMPORAL_VALID_UNTIL: &str = "__valid_until_ms";

/// The three reserved bitemporal column names, listed in slot order
/// (index 0, 1, 2).
pub const BITEMPORAL_RESERVED_COLUMNS: [&str; 3] = [
    BITEMPORAL_SYSTEM_FROM,
    BITEMPORAL_VALID_FROM,
    BITEMPORAL_VALID_UNTIL,
];
114
115/// Schema validation errors.
116#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
117#[non_exhaustive]
118pub enum SchemaError {
119    #[error("schema must have at least one column")]
120    Empty,
121    #[error("duplicate column name: '{0}'")]
122    DuplicateColumn(String),
123    #[error("VECTOR dimension must be positive, got 0 for column '{0}'")]
124    ZeroVectorDim(String),
125    #[error("primary key column '{0}' must be NOT NULL")]
126    NullablePrimaryKey(String),
127    #[error("column name '{0}' is reserved for bitemporal collections")]
128    ReservedColumnName(String),
129}
130
131fn validate_columns(columns: &[ColumnDef]) -> Result<(), SchemaError> {
132    if columns.is_empty() {
133        return Err(SchemaError::Empty);
134    }
135    let mut seen = std::collections::HashSet::with_capacity(columns.len());
136    for col in columns {
137        if !seen.insert(&col.name) {
138            return Err(SchemaError::DuplicateColumn(col.name.clone()));
139        }
140        if col.primary_key && col.nullable {
141            return Err(SchemaError::NullablePrimaryKey(col.name.clone()));
142        }
143        if let ColumnType::Vector(0) = col.column_type {
144            return Err(SchemaError::ZeroVectorDim(col.name.clone()));
145        }
146    }
147    Ok(())
148}
149
150impl SchemaOps for StrictSchema {
151    fn columns(&self) -> &[ColumnDef] {
152        &self.columns
153    }
154}
155
156impl SchemaOps for ColumnarSchema {
157    fn columns(&self) -> &[ColumnDef] {
158        &self.columns
159    }
160}
161
162impl StrictSchema {
163    pub fn new(columns: Vec<ColumnDef>) -> Result<Self, SchemaError> {
164        for col in &columns {
165            if BITEMPORAL_RESERVED_COLUMNS.contains(&col.name.as_str()) {
166                return Err(SchemaError::ReservedColumnName(col.name.clone()));
167            }
168        }
169        validate_columns(&columns)?;
170        Ok(Self {
171            columns,
172            version: 1,
173            dropped_columns: Vec::new(),
174            bitemporal: false,
175        })
176    }
177
178    /// Build a schema for a bitemporal strict collection. Prepends three
179    /// reserved Int64 columns (`__system_from_ms`, `__valid_from_ms`,
180    /// `__valid_until_ms`) at positions 0/1/2 so the tuple decoder can
181    /// extract them via fixed-offset jump. User columns are rejected if
182    /// any collides with a reserved name.
183    pub fn new_bitemporal(user_columns: Vec<ColumnDef>) -> Result<Self, SchemaError> {
184        for col in &user_columns {
185            if BITEMPORAL_RESERVED_COLUMNS.contains(&col.name.as_str()) {
186                return Err(SchemaError::ReservedColumnName(col.name.clone()));
187            }
188        }
189        let mut columns = Vec::with_capacity(3 + user_columns.len());
190        columns.push(ColumnDef::required(
191            BITEMPORAL_SYSTEM_FROM,
192            ColumnType::Int64,
193        ));
194        columns.push(ColumnDef::required(
195            BITEMPORAL_VALID_FROM,
196            ColumnType::Int64,
197        ));
198        columns.push(ColumnDef::required(
199            BITEMPORAL_VALID_UNTIL,
200            ColumnType::Int64,
201        ));
202        columns.extend(user_columns);
203        validate_columns(&columns)?;
204        Ok(Self {
205            columns,
206            version: 1,
207            dropped_columns: Vec::new(),
208            bitemporal: true,
209        })
210    }
211
212    /// Count of variable-length columns (determines offset table size).
213    pub fn variable_column_count(&self) -> usize {
214        self.columns
215            .iter()
216            .filter(|c| c.column_type.is_variable_length())
217            .count()
218    }
219
220    /// Total fixed-field byte size (for Binary Tuple layout computation).
221    pub fn fixed_fields_size(&self) -> usize {
222        self.columns
223            .iter()
224            .filter_map(|c| c.column_type.fixed_size())
225            .sum()
226    }
227
228    /// Null bitmap size in bytes.
229    pub fn null_bitmap_size(&self) -> usize {
230        self.columns.len().div_ceil(8)
231    }
232
233    /// Build a sub-schema matching the physical layout of tuples written at
234    /// the given version. Columns added after `version` are excluded;
235    /// columns dropped after `version` are re-inserted at their original
236    /// positions.
237    pub fn schema_for_version(&self, version: u32) -> StrictSchema {
238        // Start with live columns that existed at this version.
239        let mut cols: Vec<ColumnDef> = self
240            .columns
241            .iter()
242            .filter(|c| c.added_at_version <= version)
243            .cloned()
244            .collect();
245
246        // Re-insert dropped columns that were still alive at this version,
247        // sorted by position (ascending) so inserts don't shift later indices.
248        let mut to_reinsert: Vec<&DroppedColumn> = self
249            .dropped_columns
250            .iter()
251            .filter(|dc| dc.def.added_at_version <= version && dc.dropped_at_version > version)
252            .collect();
253        to_reinsert.sort_by_key(|dc| dc.position);
254        for dc in to_reinsert {
255            let pos = dc.position.min(cols.len());
256            cols.insert(pos, dc.def.clone());
257        }
258
259        StrictSchema {
260            version,
261            columns: cols,
262            dropped_columns: Vec::new(),
263            bitemporal: self.bitemporal,
264        }
265    }
266
267    /// Parse a SQL default literal (e.g. `'n/a'`, `0`, `true`) into a `Value`.
268    ///
269    /// Covers the common cases produced by `ALTER ADD COLUMN ... DEFAULT ...`.
270    /// Returns `Value::Null` for expressions that cannot be trivially evaluated
271    /// at read time (functions, sub-queries, etc.).
272    pub fn parse_default_literal(expr: &str) -> crate::value::Value {
273        use crate::value::Value;
274
275        let trimmed = expr.trim();
276
277        // String literals: 'foo'
278        if trimmed.starts_with('\'') && trimmed.ends_with('\'') && trimmed.len() >= 2 {
279            return Value::String(trimmed[1..trimmed.len() - 1].replace("''", "'"));
280        }
281
282        // Boolean
283        match trimmed.to_uppercase().as_str() {
284            "TRUE" => return Value::Bool(true),
285            "FALSE" => return Value::Bool(false),
286            "NULL" => return Value::Null,
287            _ => {}
288        }
289
290        // Integer
291        if let Ok(i) = trimmed.parse::<i64>() {
292            return Value::Integer(i);
293        }
294
295        // Float
296        if let Ok(f) = trimmed.parse::<f64>() {
297            return Value::Float(f);
298        }
299
300        Value::Null
301    }
302}
303
304impl ColumnarSchema {
305    pub fn new(columns: Vec<ColumnDef>) -> Result<Self, SchemaError> {
306        validate_columns(&columns)?;
307        Ok(Self {
308            columns,
309            version: 1,
310        })
311    }
312
313    /// Whether this schema has the reserved `_ts_system` bitemporal column.
314    ///
315    /// Detected by column name rather than a separate flag to keep the
316    /// on-disk manifest format unchanged; `_ts_system` is only inserted
317    /// by `prepend_bitemporal_columns` on the write path, so its
318    /// presence is a reliable bitemporal signal.
319    pub fn is_bitemporal(&self) -> bool {
320        self.columns.iter().any(|c| c.name == "_ts_system")
321    }
322
323    /// Position of the `_ts_system` column, or `None` for non-bitemporal.
324    pub fn ts_system_idx(&self) -> Option<usize> {
325        self.columns.iter().position(|c| c.name == "_ts_system")
326    }
327}
328
/// Unit tests for schema construction, validation and layout helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::columnar::ColumnType;
    use crate::value::Value;

    /// Column names of a strict schema, in order.
    fn names(schema: &StrictSchema) -> Vec<&str> {
        schema.columns.iter().map(|c| c.name.as_str()).collect()
    }

    #[test]
    fn strict_schema_validation() {
        let schema = StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::nullable("name", ColumnType::String),
        ]);
        assert!(schema.is_ok());
        assert!(StrictSchema::new(vec![]).is_err());
    }

    #[test]
    fn schema_ops_trait() {
        let schema = StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::nullable("name", ColumnType::String),
            ColumnDef::nullable(
                "balance",
                ColumnType::Decimal {
                    precision: 18,
                    scale: 4,
                },
            ),
        ])
        .unwrap();
        assert_eq!(schema.len(), 3);
        assert_eq!(schema.column_index("balance"), Some(2));
        assert!(schema.column("nonexistent").is_none());
        assert_eq!(schema.primary_key_columns().len(), 1);
    }

    #[test]
    fn strict_layout_helpers() {
        let schema = StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::nullable("name", ColumnType::String),
            ColumnDef::nullable(
                "balance",
                ColumnType::Decimal {
                    precision: 18,
                    scale: 4,
                },
            ),
            ColumnDef::nullable("bio", ColumnType::String),
        ])
        .unwrap();
        assert_eq!(schema.null_bitmap_size(), 1);
        assert_eq!(schema.fixed_fields_size(), 8 + 16);
        assert_eq!(schema.variable_column_count(), 2);
    }

    #[test]
    fn columnar_schema_validation() {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("time", ColumnType::Timestamp),
            ColumnDef::nullable("cpu", ColumnType::Float64),
        ]);
        assert!(schema.is_ok());
        assert_eq!(schema.unwrap().len(), 2);
    }

    #[test]
    fn nullable_pk_rejected() {
        let cols = vec![ColumnDef {
            name: "id".into(),
            column_type: ColumnType::Int64,
            nullable: true,
            default: None,
            primary_key: true,
            modifiers: Vec::new(),
            generated_expr: None,
            generated_deps: Vec::new(),
            added_at_version: 1,
        }];
        assert!(matches!(
            StrictSchema::new(cols),
            Err(SchemaError::NullablePrimaryKey(_))
        ));
    }

    #[test]
    fn zero_vector_dim_rejected() {
        let cols = vec![ColumnDef::required("emb", ColumnType::Vector(0))];
        assert!(matches!(
            StrictSchema::new(cols),
            Err(SchemaError::ZeroVectorDim(_))
        ));
    }

    #[test]
    fn duplicate_column_rejected() {
        let cols = vec![
            ColumnDef::required("id", ColumnType::Int64),
            ColumnDef::nullable("id", ColumnType::String),
        ];
        assert!(matches!(
            StrictSchema::new(cols),
            Err(SchemaError::DuplicateColumn(name)) if name == "id"
        ));
    }

    #[test]
    fn reserved_name_rejected_by_both_strict_constructors() {
        let cols = || vec![ColumnDef::required(BITEMPORAL_VALID_FROM, ColumnType::Int64)];
        assert!(matches!(
            StrictSchema::new(cols()),
            Err(SchemaError::ReservedColumnName(name)) if name == BITEMPORAL_VALID_FROM
        ));
        assert!(matches!(
            StrictSchema::new_bitemporal(cols()),
            Err(SchemaError::ReservedColumnName(name)) if name == BITEMPORAL_VALID_FROM
        ));
    }

    #[test]
    fn bitemporal_prepends_reserved_columns() {
        let schema = StrictSchema::new_bitemporal(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
        ])
        .unwrap();
        assert!(schema.bitemporal);
        assert_eq!(schema.len(), 4);
        for (slot, name) in BITEMPORAL_RESERVED_COLUMNS.iter().enumerate() {
            assert_eq!(schema.column_index(name), Some(slot));
        }
        assert_eq!(schema.column_index("id"), Some(3));
    }

    #[test]
    fn schema_for_version_reinserts_dropped_column() {
        let mut schema = StrictSchema::new(vec![
            ColumnDef::required("a", ColumnType::Int64),
            ColumnDef::required("c", ColumnType::Int64),
        ])
        .unwrap();
        // Pin the versions explicitly so the test does not depend on what
        // `ColumnDef::required` picks.
        for col in &mut schema.columns {
            col.added_at_version = 1;
        }
        let mut dropped = ColumnDef::required("b", ColumnType::Int64);
        dropped.added_at_version = 1;
        schema.dropped_columns.push(DroppedColumn {
            def: dropped,
            position: 1,
            dropped_at_version: 2,
        });
        schema.version = 2;

        let before_drop = schema.schema_for_version(1);
        assert_eq!(names(&before_drop), ["a", "b", "c"]);
        assert_eq!(before_drop.version, 1);
        assert!(before_drop.dropped_columns.is_empty());

        let after_drop = schema.schema_for_version(2);
        assert_eq!(names(&after_drop), ["a", "c"]);
    }

    #[test]
    fn parse_default_literal_common_cases() {
        assert!(matches!(
            StrictSchema::parse_default_literal("'n/a'"),
            Value::String(s) if s == "n/a"
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal(" 'it''s' "),
            Value::String(s) if s == "it's"
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("TRUE"),
            Value::Bool(true)
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("false"),
            Value::Bool(false)
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("null"),
            Value::Null
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("42"),
            Value::Integer(42)
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("1.5"),
            Value::Float(f) if f == 1.5
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("now()"),
            Value::Null
        ));
    }

    #[test]
    fn columnar_bitemporal_detected_by_column_name() {
        let plain = ColumnarSchema::new(vec![ColumnDef::required(
            "time",
            ColumnType::Timestamp,
        )])
        .unwrap();
        assert!(!plain.is_bitemporal());
        assert_eq!(plain.ts_system_idx(), None);

        let bitemporal = ColumnarSchema::new(vec![
            ColumnDef::required("_ts_system", ColumnType::Int64),
            ColumnDef::required("time", ColumnType::Timestamp),
        ])
        .unwrap();
        assert!(bitemporal.is_bitemporal());
        assert_eq!(bitemporal.ts_system_idx(), Some(0));
    }
}