1use std::collections::BTreeMap;
2
3use anyhow::{bail, Result};
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6
7use crate::model::{DataPoint, FieldValue};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
11pub enum FieldType {
12 Float,
13 Integer,
14 UInteger,
15 Boolean,
16 String,
17}
18
19impl FieldType {
20 pub fn from_field_value(v: &FieldValue) -> Self {
21 match v {
22 FieldValue::Float(_) => FieldType::Float,
23 FieldValue::Integer(_) => FieldType::Integer,
24 FieldValue::UInteger(_) => FieldType::UInteger,
25 FieldValue::Boolean(_) => FieldType::Boolean,
26 FieldValue::String(_) => FieldType::String,
27 }
28 }
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct MeasurementSchema {
34 pub name: String,
35 pub field_types: BTreeMap<String, FieldType>,
36}
37
38pub struct SchemaRegistry {
40 schemas: RwLock<BTreeMap<String, BTreeMap<String, FieldType>>>,
41}
42
43impl SchemaRegistry {
44 pub fn new() -> Self {
45 Self {
46 schemas: RwLock::new(BTreeMap::new()),
47 }
48 }
49
50 pub fn validate(&self, point: &DataPoint) -> Result<()> {
54 let mut schemas = self.schemas.write();
55 let schema = schemas
56 .entry(point.measurement.clone())
57 .or_insert_with(BTreeMap::new);
58
59 for (field_name, field_value) in &point.fields {
60 let actual_type = FieldType::from_field_value(field_value);
61 if let Some(&expected_type) = schema.get(field_name) {
62 if expected_type != actual_type {
63 bail!(
64 "schema conflict: field '{}' in measurement '{}' has type {:?} but got {:?}",
65 field_name,
66 point.measurement,
67 expected_type,
68 actual_type,
69 );
70 }
71 } else {
72 schema.insert(field_name.clone(), actual_type);
73 }
74 }
75
76 Ok(())
77 }
78}
79
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::BTreeMap;

    use crate::model::{DataPoint, FieldValue};

    /// Builds an untagged `DataPoint` for `measurement` with a fixed timestamp
    /// and the given `(name, value)` fields.
    fn make_point(measurement: &str, fields: Vec<(&str, FieldValue)>) -> DataPoint {
        let fields = fields
            .into_iter()
            .map(|(name, value)| (name.to_owned(), value))
            .collect();
        DataPoint {
            measurement: measurement.into(),
            tags: BTreeMap::new(),
            fields,
            timestamp: 1_000_000_000,
        }
    }

    #[test]
    fn first_write_registers_schema() {
        let reg = SchemaRegistry::new();
        let point = make_point("cpu", vec![("usage", FieldValue::Float(42.0))]);
        assert!(reg.validate(&point).is_ok());

        let guard = reg.schemas.read();
        assert_eq!(guard["cpu"]["usage"], FieldType::Float);
    }

    #[test]
    fn same_types_succeed() {
        let reg = SchemaRegistry::new();
        for value in [42.0, 99.0] {
            let point = make_point("cpu", vec![("usage", FieldValue::Float(value))]);
            assert!(reg.validate(&point).is_ok());
        }
    }

    #[test]
    fn mismatched_type_is_rejected() {
        let reg = SchemaRegistry::new();
        let float_point = make_point("cpu", vec![("usage", FieldValue::Float(42.0))]);
        let int_point = make_point("cpu", vec![("usage", FieldValue::Integer(42))]);
        assert!(reg.validate(&float_point).is_ok());

        let message = reg.validate(&int_point).unwrap_err().to_string();
        assert!(message.contains("schema conflict"));
        assert!(message.contains("usage"));
    }

    #[test]
    fn different_measurements_have_independent_schemas() {
        let reg = SchemaRegistry::new();
        let points = [
            make_point("cpu", vec![("value", FieldValue::Float(1.0))]),
            make_point("mem", vec![("value", FieldValue::Integer(1024))]),
        ];
        for point in &points {
            assert!(reg.validate(point).is_ok());
        }
    }
}