Skip to main content

pulsedb/model/
schema.rs

1use std::collections::BTreeMap;
2
3use anyhow::{bail, Result};
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6
7use crate::model::{DataPoint, FieldValue};
8
/// The data type of a measurement field.
///
/// Each variant is the type tag for the identically named `FieldValue`
/// variant; the tag carries no payload of its own.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum FieldType {
    /// Tag for `FieldValue::Float` values.
    Float,
    /// Tag for `FieldValue::Integer` values.
    Integer,
    /// Tag for `FieldValue::UInteger` values.
    UInteger,
    /// Tag for `FieldValue::Boolean` values.
    Boolean,
    /// Tag for `FieldValue::String` values.
    String,
}
18
19impl FieldType {
20    pub fn from_field_value(v: &FieldValue) -> Self {
21        match v {
22            FieldValue::Float(_) => FieldType::Float,
23            FieldValue::Integer(_) => FieldType::Integer,
24            FieldValue::UInteger(_) => FieldType::UInteger,
25            FieldValue::Boolean(_) => FieldType::Boolean,
26            FieldValue::String(_) => FieldType::String,
27        }
28    }
29}
30
/// Schema describing the expected field types for a measurement.
///
/// NOTE(review): nothing else in this file constructs or reads this type;
/// it is presumably consumed by other modules — confirm before changing it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeasurementSchema {
    /// Name of the measurement this schema belongs to.
    pub name: String,
    /// Expected type of each field, keyed by field name.
    pub field_types: BTreeMap<String, FieldType>,
}
37
38/// Registry that tracks field types per measurement and rejects type mismatches.
39pub struct SchemaRegistry {
40    schemas: RwLock<BTreeMap<String, BTreeMap<String, FieldType>>>,
41}
42
43impl SchemaRegistry {
44    pub fn new() -> Self {
45        Self {
46            schemas: RwLock::new(BTreeMap::new()),
47        }
48    }
49
50    /// Validate a data point's fields against the schema.
51    /// On first write, registers the field types.
52    /// On subsequent writes, rejects type mismatches.
53    pub fn validate(&self, point: &DataPoint) -> Result<()> {
54        let mut schemas = self.schemas.write();
55        let schema = schemas
56            .entry(point.measurement.clone())
57            .or_insert_with(BTreeMap::new);
58
59        for (field_name, field_value) in &point.fields {
60            let actual_type = FieldType::from_field_value(field_value);
61            if let Some(&expected_type) = schema.get(field_name) {
62                if expected_type != actual_type {
63                    bail!(
64                        "schema conflict: field '{}' in measurement '{}' has type {:?} but got {:?}",
65                        field_name,
66                        point.measurement,
67                        expected_type,
68                        actual_type,
69                    );
70                }
71            } else {
72                schema.insert(field_name.clone(), actual_type);
73            }
74        }
75
76        Ok(())
77    }
78}
79
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    use crate::model::{DataPoint, FieldValue};

    /// Builds a tag-less data point for `measurement` carrying `fields`,
    /// stamped with a fixed timestamp.
    fn make_point(measurement: &str, fields: Vec<(&str, FieldValue)>) -> DataPoint {
        let owned_fields = fields
            .into_iter()
            .map(|(name, value)| (name.to_owned(), value))
            .collect();

        DataPoint {
            measurement: measurement.to_owned(),
            tags: BTreeMap::new(),
            fields: owned_fields,
            timestamp: 1_000_000_000,
        }
    }

    /// The first point seen for a measurement defines its field types.
    #[test]
    fn first_write_registers_schema() {
        let registry = SchemaRegistry::new();
        let first = make_point("cpu", vec![("usage", FieldValue::Float(42.0))]);
        assert!(registry.validate(&first).is_ok());

        let schemas = registry.schemas.read();
        let registered = schemas
            .get("cpu")
            .and_then(|schema| schema.get("usage"))
            .copied();
        assert_eq!(registered, Some(FieldType::Float));
    }

    /// Repeated writes with the registered type are accepted.
    #[test]
    fn same_types_succeed() {
        let registry = SchemaRegistry::new();
        for usage in [42.0, 99.0] {
            let point = make_point("cpu", vec![("usage", FieldValue::Float(usage))]);
            assert!(registry.validate(&point).is_ok());
        }
    }

    /// A later write with a different type for a known field is refused.
    #[test]
    fn mismatched_type_is_rejected() {
        let registry = SchemaRegistry::new();
        let as_float = make_point("cpu", vec![("usage", FieldValue::Float(42.0))]);
        let as_integer = make_point("cpu", vec![("usage", FieldValue::Integer(42))]);
        assert!(registry.validate(&as_float).is_ok());

        let message = registry.validate(&as_integer).unwrap_err().to_string();
        assert!(message.contains("schema conflict"));
        assert!(message.contains("usage"));
    }

    /// The same field name may have different types in different measurements.
    #[test]
    fn different_measurements_have_independent_schemas() {
        let registry = SchemaRegistry::new();
        let cpu = make_point("cpu", vec![("value", FieldValue::Float(1.0))]);
        let mem = make_point("mem", vec![("value", FieldValue::Integer(1024))]);
        assert!(registry.validate(&cpu).is_ok());
        assert!(registry.validate(&mem).is_ok());
    }
}