1use serde::{Deserialize, Serialize};
6
7use super::column_def::ColumnDef;
8use crate::columnar::ColumnType;
9
10pub trait SchemaOps {
12 fn columns(&self) -> &[ColumnDef];
13
14 fn column_index(&self, name: &str) -> Option<usize> {
15 self.columns().iter().position(|c| c.name == name)
16 }
17
18 fn column(&self, name: &str) -> Option<&ColumnDef> {
19 self.columns().iter().find(|c| c.name == name)
20 }
21
22 fn primary_key_columns(&self) -> Vec<&ColumnDef> {
23 self.columns().iter().filter(|c| c.primary_key).collect()
24 }
25
26 fn len(&self) -> usize {
27 self.columns().len()
28 }
29
30 fn is_empty(&self) -> bool {
31 self.columns().is_empty()
32 }
33}
34
/// Schema of a strict collection: the live column list plus the metadata
/// needed to rebuild the layout of earlier schema versions.
///
/// The type derives both serde and zerompk (MessagePack) serialization.
/// NOTE(review): `#[msgpack(map)]` presumably selects a map-keyed MessagePack
/// encoding rather than a positional one — confirm against the zerompk docs
/// before reordering or renaming fields.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
#[msgpack(map)]
pub struct StrictSchema {
    /// Live columns, in the order reported by the `SchemaOps` lookups.
    pub columns: Vec<ColumnDef>,
    /// Schema version number; both constructors in this file start it at 1.
    pub version: u32,
    /// Columns that have been removed from `columns`, kept so that
    /// `schema_for_version` can re-insert them when rebuilding an older
    /// layout. For serde, the field is omitted when empty and defaults to an
    /// empty list when absent.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub dropped_columns: Vec<DroppedColumn>,
    /// Set to `true` by `new_bitemporal` and `false` by `new`.
    ///
    /// For serde, `std::ops::Not::not` is used as the skip predicate, so the
    /// field is omitted when it is `false` and defaults to `false` when absent.
    /// NOTE(review): `#[msgpack(default)]` presumably does the same for
    /// MessagePack payloads that lack the field — confirm; note that
    /// `dropped_columns` carries no such attribute.
    #[serde(default, skip_serializing_if = "std::ops::Not::not")]
    #[msgpack(default)]
    pub bitemporal: bool,
}
64
/// Record of a column that was removed from a [`StrictSchema`], holding what
/// `StrictSchema::schema_for_version` needs to put it back when an older
/// schema version is requested.
///
/// NOTE(review): unlike `StrictSchema`, this type has no `#[msgpack(map)]`
/// attribute, so its MessagePack encoding may depend on field order — confirm
/// before reordering fields.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct DroppedColumn {
    /// Definition of the column as it existed before being dropped.
    pub def: ColumnDef,
    /// Index at which `schema_for_version` re-inserts the column (clamped to
    /// the length of the list being rebuilt).
    pub position: usize,
    /// Schema version at which the column was dropped; `schema_for_version`
    /// treats the column as present only for versions strictly below this.
    pub dropped_at_version: u32,
}
84
/// Schema of a columnar collection: a column list and a version number.
///
/// Unlike [`StrictSchema`] it keeps no record of dropped columns and has no
/// stored bitemporal flag; `ColumnarSchema::is_bitemporal` inspects the column
/// names instead.
///
/// NOTE(review): there is no `#[msgpack(map)]` attribute here, so the
/// MessagePack encoding may depend on field order — confirm before reordering.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct ColumnarSchema {
    /// Columns in their stored order.
    pub columns: Vec<ColumnDef>,
    /// Schema version number; `ColumnarSchema::new` starts it at 1.
    pub version: u32,
}
100
/// Name of the first system column that `StrictSchema::new_bitemporal`
/// prepends (stored as a required `Int64`).
/// NOTE(review): the `_ms` suffix suggests a millisecond timestamp — confirm.
pub const BITEMPORAL_SYSTEM_FROM: &str = "__system_from_ms";
/// Name of the second system column that `StrictSchema::new_bitemporal`
/// prepends (stored as a required `Int64`).
pub const BITEMPORAL_VALID_FROM: &str = "__valid_from_ms";
/// Name of the third system column that `StrictSchema::new_bitemporal`
/// prepends (stored as a required `Int64`).
pub const BITEMPORAL_VALID_UNTIL: &str = "__valid_until_ms";

/// All column names reserved for bitemporal bookkeeping.
///
/// Both `StrictSchema` constructors reject user-supplied columns that use any
/// of these names.
pub const BITEMPORAL_RESERVED_COLUMNS: [&str; 3] = [
    BITEMPORAL_SYSTEM_FROM,
    BITEMPORAL_VALID_FROM,
    BITEMPORAL_VALID_UNTIL,
];
114
/// Reasons a column list can be rejected when constructing a schema.
///
/// The enum is `#[non_exhaustive]`, so code matching on it from outside this
/// crate must include a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
pub enum SchemaError {
    /// The column list was empty.
    #[error("schema must have at least one column")]
    Empty,
    /// Two columns share the contained name.
    #[error("duplicate column name: '{0}'")]
    DuplicateColumn(String),
    /// The named column has type `ColumnType::Vector(0)`.
    #[error("VECTOR dimension must be positive, got 0 for column '{0}'")]
    ZeroVectorDim(String),
    /// The named column is flagged both `primary_key` and `nullable`.
    #[error("primary key column '{0}' must be NOT NULL")]
    NullablePrimaryKey(String),
    /// The named user column uses one of `BITEMPORAL_RESERVED_COLUMNS`.
    #[error("column name '{0}' is reserved for bitemporal collections")]
    ReservedColumnName(String),
}
130
131fn validate_columns(columns: &[ColumnDef]) -> Result<(), SchemaError> {
132 if columns.is_empty() {
133 return Err(SchemaError::Empty);
134 }
135 let mut seen = std::collections::HashSet::with_capacity(columns.len());
136 for col in columns {
137 if !seen.insert(&col.name) {
138 return Err(SchemaError::DuplicateColumn(col.name.clone()));
139 }
140 if col.primary_key && col.nullable {
141 return Err(SchemaError::NullablePrimaryKey(col.name.clone()));
142 }
143 if let ColumnType::Vector(0) = col.column_type {
144 return Err(SchemaError::ZeroVectorDim(col.name.clone()));
145 }
146 }
147 Ok(())
148}
149
150impl SchemaOps for StrictSchema {
151 fn columns(&self) -> &[ColumnDef] {
152 &self.columns
153 }
154}
155
156impl SchemaOps for ColumnarSchema {
157 fn columns(&self) -> &[ColumnDef] {
158 &self.columns
159 }
160}
161
162impl StrictSchema {
163 pub fn new(columns: Vec<ColumnDef>) -> Result<Self, SchemaError> {
164 for col in &columns {
165 if BITEMPORAL_RESERVED_COLUMNS.contains(&col.name.as_str()) {
166 return Err(SchemaError::ReservedColumnName(col.name.clone()));
167 }
168 }
169 validate_columns(&columns)?;
170 Ok(Self {
171 columns,
172 version: 1,
173 dropped_columns: Vec::new(),
174 bitemporal: false,
175 })
176 }
177
178 pub fn new_bitemporal(user_columns: Vec<ColumnDef>) -> Result<Self, SchemaError> {
184 for col in &user_columns {
185 if BITEMPORAL_RESERVED_COLUMNS.contains(&col.name.as_str()) {
186 return Err(SchemaError::ReservedColumnName(col.name.clone()));
187 }
188 }
189 let mut columns = Vec::with_capacity(3 + user_columns.len());
190 columns.push(ColumnDef::required(
191 BITEMPORAL_SYSTEM_FROM,
192 ColumnType::Int64,
193 ));
194 columns.push(ColumnDef::required(
195 BITEMPORAL_VALID_FROM,
196 ColumnType::Int64,
197 ));
198 columns.push(ColumnDef::required(
199 BITEMPORAL_VALID_UNTIL,
200 ColumnType::Int64,
201 ));
202 columns.extend(user_columns);
203 validate_columns(&columns)?;
204 Ok(Self {
205 columns,
206 version: 1,
207 dropped_columns: Vec::new(),
208 bitemporal: true,
209 })
210 }
211
212 pub fn variable_column_count(&self) -> usize {
214 self.columns
215 .iter()
216 .filter(|c| c.column_type.is_variable_length())
217 .count()
218 }
219
220 pub fn fixed_fields_size(&self) -> usize {
222 self.columns
223 .iter()
224 .filter_map(|c| c.column_type.fixed_size())
225 .sum()
226 }
227
228 pub fn null_bitmap_size(&self) -> usize {
230 self.columns.len().div_ceil(8)
231 }
232
233 pub fn schema_for_version(&self, version: u32) -> StrictSchema {
238 let mut cols: Vec<ColumnDef> = self
240 .columns
241 .iter()
242 .filter(|c| c.added_at_version <= version)
243 .cloned()
244 .collect();
245
246 let mut to_reinsert: Vec<&DroppedColumn> = self
249 .dropped_columns
250 .iter()
251 .filter(|dc| dc.def.added_at_version <= version && dc.dropped_at_version > version)
252 .collect();
253 to_reinsert.sort_by_key(|dc| dc.position);
254 for dc in to_reinsert {
255 let pos = dc.position.min(cols.len());
256 cols.insert(pos, dc.def.clone());
257 }
258
259 StrictSchema {
260 version,
261 columns: cols,
262 dropped_columns: Vec::new(),
263 bitemporal: self.bitemporal,
264 }
265 }
266
267 pub fn parse_default_literal(expr: &str) -> crate::value::Value {
273 use crate::value::Value;
274
275 let trimmed = expr.trim();
276
277 if trimmed.starts_with('\'') && trimmed.ends_with('\'') && trimmed.len() >= 2 {
279 return Value::String(trimmed[1..trimmed.len() - 1].replace("''", "'"));
280 }
281
282 match trimmed.to_uppercase().as_str() {
284 "TRUE" => return Value::Bool(true),
285 "FALSE" => return Value::Bool(false),
286 "NULL" => return Value::Null,
287 _ => {}
288 }
289
290 if let Ok(i) = trimmed.parse::<i64>() {
292 return Value::Integer(i);
293 }
294
295 if let Ok(f) = trimmed.parse::<f64>() {
297 return Value::Float(f);
298 }
299
300 Value::Null
301 }
302}
303
304impl ColumnarSchema {
305 pub fn new(columns: Vec<ColumnDef>) -> Result<Self, SchemaError> {
306 validate_columns(&columns)?;
307 Ok(Self {
308 columns,
309 version: 1,
310 })
311 }
312
313 pub fn is_bitemporal(&self) -> bool {
320 self.columns.iter().any(|c| c.name == "_ts_system")
321 }
322
323 pub fn ts_system_idx(&self) -> Option<usize> {
325 self.columns.iter().position(|c| c.name == "_ts_system")
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use crate::columnar::ColumnType;

    /// A well-formed column list is accepted; an empty one is rejected with
    /// the `Empty` variant specifically.
    #[test]
    fn strict_schema_validation() {
        let schema = StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::nullable("name", ColumnType::String),
        ]);
        assert!(schema.is_ok());
        assert!(StrictSchema::new(vec![]).is_err());
        assert!(matches!(
            StrictSchema::new(vec![]),
            Err(SchemaError::Empty)
        ));
    }

    /// The default `SchemaOps` methods answer from the column list.
    #[test]
    fn schema_ops_trait() {
        let schema = StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::nullable("name", ColumnType::String),
            ColumnDef::nullable(
                "balance",
                ColumnType::Decimal {
                    precision: 18,
                    scale: 4,
                },
            ),
        ])
        .unwrap();
        assert_eq!(schema.len(), 3);
        assert_eq!(schema.column_index("balance"), Some(2));
        assert!(schema.column("nonexistent").is_none());
        assert_eq!(schema.primary_key_columns().len(), 1);
    }

    /// Layout helpers: bitmap size, fixed-size total and variable-length count.
    #[test]
    fn strict_layout_helpers() {
        let schema = StrictSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::nullable("name", ColumnType::String),
            ColumnDef::nullable(
                "balance",
                ColumnType::Decimal {
                    precision: 18,
                    scale: 4,
                },
            ),
            ColumnDef::nullable("bio", ColumnType::String),
        ])
        .unwrap();
        assert_eq!(schema.null_bitmap_size(), 1);
        assert_eq!(schema.fixed_fields_size(), 8 + 16);
        assert_eq!(schema.variable_column_count(), 2);
    }

    /// A well-formed columnar column list is accepted.
    #[test]
    fn columnar_schema_validation() {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("time", ColumnType::Timestamp),
            ColumnDef::nullable("cpu", ColumnType::Float64),
        ]);
        assert!(schema.is_ok());
        assert_eq!(schema.unwrap().len(), 2);
    }

    /// A column that is both nullable and a primary key is rejected.
    #[test]
    fn nullable_pk_rejected() {
        let cols = vec![ColumnDef {
            name: "id".into(),
            column_type: ColumnType::Int64,
            nullable: true,
            default: None,
            primary_key: true,
            modifiers: Vec::new(),
            generated_expr: None,
            generated_deps: Vec::new(),
            added_at_version: 1,
        }];
        assert!(matches!(
            StrictSchema::new(cols),
            Err(SchemaError::NullablePrimaryKey(_))
        ));
    }

    /// A zero-dimension vector column is rejected.
    #[test]
    fn zero_vector_dim_rejected() {
        let cols = vec![ColumnDef::required("emb", ColumnType::Vector(0))];
        assert!(matches!(
            StrictSchema::new(cols),
            Err(SchemaError::ZeroVectorDim(_))
        ));
    }

    /// Two columns with the same name are rejected, naming the duplicate.
    #[test]
    fn duplicate_column_rejected() {
        let cols = vec![
            ColumnDef::required("id", ColumnType::Int64),
            ColumnDef::required("id", ColumnType::Int64),
        ];
        assert!(matches!(
            StrictSchema::new(cols),
            Err(SchemaError::DuplicateColumn(name)) if name == "id"
        ));
    }

    /// Both strict constructors refuse user columns with reserved names.
    /// The names are written as literals to pin the persisted spelling.
    #[test]
    fn reserved_column_name_rejected() {
        for reserved in ["__system_from_ms", "__valid_from_ms", "__valid_until_ms"] {
            assert!(matches!(
                StrictSchema::new(vec![ColumnDef::required(reserved, ColumnType::Int64)]),
                Err(SchemaError::ReservedColumnName(name)) if name == reserved
            ));
            assert!(matches!(
                StrictSchema::new_bitemporal(vec![ColumnDef::required(
                    reserved,
                    ColumnType::Int64
                )]),
                Err(SchemaError::ReservedColumnName(name)) if name == reserved
            ));
        }
    }

    /// `new_bitemporal` puts the three system columns ahead of user columns.
    #[test]
    fn bitemporal_schema_prepends_system_columns() {
        let schema = StrictSchema::new_bitemporal(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
        ])
        .unwrap();
        assert!(schema.bitemporal);
        assert_eq!(schema.version, 1);
        assert!(schema.dropped_columns.is_empty());
        let names: Vec<&str> = schema.columns.iter().map(|c| c.name.as_str()).collect();
        assert_eq!(
            names,
            ["__system_from_ms", "__valid_from_ms", "__valid_until_ms", "id"]
        );

        // The plain constructor never sets the flag.
        let plain = StrictSchema::new(vec![ColumnDef::required("id", ColumnType::Int64)]).unwrap();
        assert!(!plain.bitemporal);
    }

    /// Columnar schemas are bitemporal exactly when `_ts_system` is present.
    #[test]
    fn columnar_bitemporal_marker() {
        let plain =
            ColumnarSchema::new(vec![ColumnDef::required("time", ColumnType::Timestamp)]).unwrap();
        assert!(!plain.is_bitemporal());
        assert_eq!(plain.ts_system_idx(), None);

        let marked = ColumnarSchema::new(vec![
            ColumnDef::required("time", ColumnType::Timestamp),
            ColumnDef::required("_ts_system", ColumnType::Int64),
        ])
        .unwrap();
        assert!(marked.is_bitemporal());
        assert_eq!(marked.ts_system_idx(), Some(1));
    }

    /// Each literal form recognised by `parse_default_literal`.
    #[test]
    fn parse_default_literal_forms() {
        use crate::value::Value;

        assert!(matches!(
            StrictSchema::parse_default_literal("  'it''s'  "),
            Value::String(s) if s == "it's"
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("''"),
            Value::String(s) if s.is_empty()
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("true"),
            Value::Bool(true)
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("False"),
            Value::Bool(false)
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("null"),
            Value::Null
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("-42"),
            Value::Integer(-42)
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("1.5"),
            Value::Float(f) if f == 1.5
        ));
        // A lone quote and unparseable text both fall back to Null.
        assert!(matches!(
            StrictSchema::parse_default_literal("'"),
            Value::Null
        ));
        assert!(matches!(
            StrictSchema::parse_default_literal("now()"),
            Value::Null
        ));
    }
}