reifydb_core/encoded/schema/
mod.rs1pub mod consolidate;
12pub mod evolution;
13pub mod fingerprint;
14mod from;
15
16use std::{
17 alloc::{Layout, alloc_zeroed, handle_alloc_error},
18 fmt::Debug,
19 ops::Deref,
20 sync::{Arc, OnceLock},
21};
22
23use reifydb_type::{
24 util::cowvec::CowVec,
25 value::{constraint::TypeConstraint, r#type::Type},
26};
27use serde::{Deserialize, Serialize};
28
29use super::encoded::EncodedValues;
30use crate::encoded::schema::fingerprint::{SchemaFingerprint, compute_fingerprint};
31
/// Number of bytes at the start of every encoded row that precede the
/// per-field bit vector; field data is laid out after both.
///
/// NOTE(review): presumably this header holds the schema fingerprint written
/// by `EncodedValues::set_fingerprint` — confirm against that implementation.
pub const SCHEMA_HEADER_SIZE: usize = 8;
34
35#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37pub struct SchemaField {
38 pub name: String,
40 pub constraint: TypeConstraint,
42 pub offset: u32,
44 pub size: u32,
46 pub align: u8,
48}
49
50impl SchemaField {
51 pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
54 let storage_type = constraint.storage_type();
55 Self {
56 name: name.into(),
57 constraint,
58 offset: 0,
59 size: storage_type.size() as u32,
60 align: storage_type.alignment() as u8,
61 }
62 }
63
64 pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
67 Self::new(name, TypeConstraint::unconstrained(field_type))
68 }
69}
70
71pub struct Schema(Arc<Inner>);
73
74#[derive(Debug, Serialize, Deserialize)]
80pub struct Inner {
81 pub fingerprint: SchemaFingerprint,
83 pub fields: Vec<SchemaField>,
85 #[serde(skip)]
87 cached_layout: OnceLock<(usize, usize)>,
88}
89
90impl PartialEq for Inner {
91 fn eq(&self, other: &Self) -> bool {
92 self.fingerprint == other.fingerprint && self.fields == other.fields
93 }
94}
95
96impl Eq for Inner {}
97
98impl Deref for Schema {
99 type Target = Inner;
100
101 fn deref(&self) -> &Self::Target {
102 &self.0
103 }
104}
105
106impl Clone for Schema {
107 fn clone(&self) -> Self {
108 Self(self.0.clone())
109 }
110}
111
112impl Debug for Schema {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 self.0.fmt(f)
115 }
116}
117
118impl PartialEq for Schema {
119 fn eq(&self, other: &Self) -> bool {
120 self.0.as_ref() == other.0.as_ref()
121 }
122}
123
124impl Eq for Schema {}
125
126impl Schema {
127 pub fn new(fields: Vec<SchemaField>) -> Self {
131 let fields = Self::compute_layout(fields);
132 let fingerprint = compute_fingerprint(&fields);
133
134 Self(Arc::new(Inner {
135 fingerprint,
136 fields,
137 cached_layout: OnceLock::new(),
138 }))
139 }
140
141 pub fn from_parts(fingerprint: SchemaFingerprint, fields: Vec<SchemaField>) -> Self {
144 Self(Arc::new(Inner {
145 fingerprint,
146 fields,
147 cached_layout: OnceLock::new(),
148 }))
149 }
150
151 pub fn fingerprint(&self) -> SchemaFingerprint {
153 self.fingerprint
154 }
155
156 pub fn fields(&self) -> &[SchemaField] {
158 &self.fields
159 }
160
161 pub fn field_count(&self) -> usize {
163 self.fields.len()
164 }
165
166 pub fn find_field(&self, name: &str) -> Option<&SchemaField> {
168 self.fields.iter().find(|f| f.name == name)
169 }
170
171 pub fn find_field_index(&self, name: &str) -> Option<usize> {
173 self.fields.iter().position(|f| f.name == name)
174 }
175
176 pub fn get_field(&self, index: usize) -> Option<&SchemaField> {
178 self.fields.get(index)
179 }
180
181 pub fn get_field_name(&self, index: usize) -> Option<&str> {
183 self.fields.get(index).map(|f| f.name.as_str())
184 }
185
186 pub fn field_names(&self) -> impl Iterator<Item = &str> {
188 self.fields.iter().map(|f| f.name.as_str())
189 }
190
191 fn compute_layout(mut fields: Vec<SchemaField>) -> Vec<SchemaField> {
194 let bitvec_size = (fields.len() + 7) / 8;
196 let mut offset: u32 = (SCHEMA_HEADER_SIZE + bitvec_size) as u32;
197
198 for field in fields.iter_mut() {
199 let storage_type = field.constraint.storage_type();
200 field.size = storage_type.size() as u32;
201 field.align = storage_type.alignment() as u8;
202
203 let align = field.align as u32;
205 if align > 0 {
206 offset = (offset + align - 1) & !(align - 1);
207 }
208
209 field.offset = offset;
210 offset += field.size;
211 }
212
213 fields
214 }
215
216 pub fn bitvec_size(&self) -> usize {
218 (self.fields.len() + 7) / 8
219 }
220
221 pub fn data_offset(&self) -> usize {
223 SCHEMA_HEADER_SIZE + self.bitvec_size()
224 }
225
226 fn get_cached_layout(&self) -> (usize, usize) {
229 *self.cached_layout.get_or_init(|| {
230 let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
232
233 let total_size = if self.fields.is_empty() {
235 SCHEMA_HEADER_SIZE + self.bitvec_size()
236 } else {
237 let last_field = &self.fields[self.fields.len() - 1];
238 let end = last_field.offset as usize + last_field.size as usize;
239 Self::align_up(end, max_align)
241 };
242
243 (total_size, max_align)
244 })
245 }
246
247 pub fn total_static_size(&self) -> usize {
249 self.get_cached_layout().0
250 }
251
252 pub fn dynamic_section_start(&self) -> usize {
254 self.total_static_size()
255 }
256
257 pub fn dynamic_section_size(&self, row: &EncodedValues) -> usize {
259 row.len().saturating_sub(self.total_static_size())
260 }
261
262 pub fn allocate(&self) -> EncodedValues {
264 let (total_size, max_align) = self.get_cached_layout();
265 let layout = Layout::from_size_align(total_size, max_align).unwrap();
266 unsafe {
267 let ptr = alloc_zeroed(layout);
268 if ptr.is_null() {
269 handle_alloc_error(layout);
270 }
271 let vec = Vec::from_raw_parts(ptr, total_size, total_size);
272 let mut row = EncodedValues(CowVec::new(vec));
273 row.set_fingerprint(self.fingerprint);
274 row
275 }
276 }
277
278 fn align_up(offset: usize, align: usize) -> usize {
279 (offset + align).saturating_sub(1) & !(align.saturating_sub(1))
280 }
281
282 pub fn set_undefined(&self, row: &mut EncodedValues, index: usize) {
284 row.set_valid(index, false);
285 }
286
287 pub fn testing(types: &[Type]) -> Self {
291 Schema::new(
292 types.iter()
293 .enumerate()
294 .map(|(i, t)| SchemaField::unconstrained(format!("f{}", i), t.clone()))
295 .collect(),
296 )
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    /// A schema keeps its fields in the order they were supplied.
    #[test]
    fn test_schema_creation() {
        let schema = Schema::new(vec![
            SchemaField::unconstrained("id", Type::Int8),
            SchemaField::unconstrained("name", Type::Utf8),
            SchemaField::unconstrained("active", Type::Boolean),
        ]);

        assert_eq!(schema.field_count(), 3);

        let fields = schema.fields();
        assert_eq!(fields[0].name, "id");
        assert_eq!(fields[1].name, "name");
        assert_eq!(fields[2].name, "active");
    }

    /// Identical field lists produce identical fingerprints.
    #[test]
    fn test_schema_fingerprint_deterministic() {
        let build = || {
            Schema::new(vec![
                SchemaField::unconstrained("a", Type::Int4),
                SchemaField::unconstrained("b", Type::Utf8),
            ])
        };

        let first = build();
        let second = build();

        assert_eq!(first.fingerprint(), second.fingerprint());
    }

    /// Changing a field's type changes the fingerprint.
    #[test]
    fn test_schema_fingerprint_different_for_different_schemas() {
        let narrow = Schema::new(vec![SchemaField::unconstrained("a", Type::Int4)]);
        let wide = Schema::new(vec![SchemaField::unconstrained("a", Type::Int8)]);

        assert_ne!(narrow.fingerprint(), wide.fingerprint());
    }

    /// Lookup by name finds existing fields and reports missing ones as `None`.
    #[test]
    fn test_find_field() {
        let schema = Schema::new(vec![
            SchemaField::unconstrained("id", Type::Int8),
            SchemaField::unconstrained("name", Type::Utf8),
        ]);

        assert!(schema.find_field("id").is_some());
        assert!(schema.find_field("name").is_some());
        assert!(schema.find_field("missing").is_none());
    }
}