reifydb_core/encoded/schema/
mod.rs1pub mod consolidate;
12pub mod evolution;
13pub mod fingerprint;
14mod from;
15
16use std::{
17 alloc::{Layout, alloc_zeroed, handle_alloc_error},
18 fmt,
19 fmt::Debug,
20 ops::Deref,
21 sync::{Arc, OnceLock},
22};
23
24use reifydb_type::{
25 util::cowvec::CowVec,
26 value::{constraint::TypeConstraint, r#type::Type},
27};
28use serde::{Deserialize, Serialize};
29
30use super::encoded::EncodedValues;
31use crate::encoded::schema::fingerprint::{SchemaFingerprint, compute_fingerprint};
32
/// Size in bytes of the fixed header at the start of every encoded row.
///
/// The per-field bit vector follows the header, so field data begins at
/// `SCHEMA_HEADER_SIZE + bitvec_size`.
pub const SCHEMA_HEADER_SIZE: usize = 8;
35
/// A single named field of a [`Schema`], together with its placement inside the encoded row.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaField {
    /// Field name; name-based lookups such as `Schema::find_field` compare against it.
    pub name: String,
    /// Type constraint of the field; its storage type determines `size` and `align`.
    pub constraint: TypeConstraint,
    /// Byte offset of the field within the row. `SchemaField::new` leaves it at 0 and
    /// `Schema::new` assigns the real value.
    pub offset: u32,
    /// Size of the field's storage type, as reported by `storage_type().size()`.
    pub size: u32,
    /// Alignment of the field's storage type, as reported by `storage_type().alignment()`.
    pub align: u8,
}
50
51impl SchemaField {
52 pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
55 let storage_type = constraint.storage_type();
56 Self {
57 name: name.into(),
58 constraint,
59 offset: 0,
60 size: storage_type.size() as u32,
61 align: storage_type.alignment() as u8,
62 }
63 }
64
65 pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
68 Self::new(name, TypeConstraint::unconstrained(field_type))
69 }
70}
71
/// Shared handle to a schema: a newtype around `Arc<Inner>` that dereferences to [`Inner`].
///
/// Cloning a `Schema` clones the `Arc`, not the field list.
pub struct Schema(Arc<Inner>);
74
/// Data behind a [`Schema`]: its fingerprint, its fields and a lazily computed layout cache.
#[derive(Debug, Serialize, Deserialize)]
pub struct Inner {
    /// Fingerprint of the schema; `Schema::new` derives it from the laid-out fields.
    pub fingerprint: SchemaFingerprint,
    /// Fields in the order they were supplied.
    pub fields: Vec<SchemaField>,
    /// Cached `(total_static_size, max_align)`, filled on first use. It is skipped by serde
    /// and does not take part in equality.
    #[serde(skip)]
    cached_layout: OnceLock<(usize, usize)>,
}
90
91impl PartialEq for Inner {
92 fn eq(&self, other: &Self) -> bool {
93 self.fingerprint == other.fingerprint && self.fields == other.fields
94 }
95}
96
97impl Eq for Inner {}
98
99impl Deref for Schema {
100 type Target = Inner;
101
102 fn deref(&self) -> &Self::Target {
103 &self.0
104 }
105}
106
107impl Clone for Schema {
108 fn clone(&self) -> Self {
109 Self(self.0.clone())
110 }
111}
112
113impl Debug for Schema {
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 self.0.fmt(f)
116 }
117}
118
119impl PartialEq for Schema {
120 fn eq(&self, other: &Self) -> bool {
121 self.0.as_ref() == other.0.as_ref()
122 }
123}
124
125impl Eq for Schema {}
126
127impl Schema {
128 pub fn new(fields: Vec<SchemaField>) -> Self {
132 let fields = Self::compute_layout(fields);
133 let fingerprint = compute_fingerprint(&fields);
134
135 Self(Arc::new(Inner {
136 fingerprint,
137 fields,
138 cached_layout: OnceLock::new(),
139 }))
140 }
141
142 pub fn from_parts(fingerprint: SchemaFingerprint, fields: Vec<SchemaField>) -> Self {
145 Self(Arc::new(Inner {
146 fingerprint,
147 fields,
148 cached_layout: OnceLock::new(),
149 }))
150 }
151
152 pub fn fingerprint(&self) -> SchemaFingerprint {
154 self.fingerprint
155 }
156
157 pub fn fields(&self) -> &[SchemaField] {
159 &self.fields
160 }
161
162 pub fn field_count(&self) -> usize {
164 self.fields.len()
165 }
166
167 pub fn find_field(&self, name: &str) -> Option<&SchemaField> {
169 self.fields.iter().find(|f| f.name == name)
170 }
171
172 pub fn find_field_index(&self, name: &str) -> Option<usize> {
174 self.fields.iter().position(|f| f.name == name)
175 }
176
177 pub fn get_field(&self, index: usize) -> Option<&SchemaField> {
179 self.fields.get(index)
180 }
181
182 pub fn get_field_name(&self, index: usize) -> Option<&str> {
184 self.fields.get(index).map(|f| f.name.as_str())
185 }
186
187 pub fn field_names(&self) -> impl Iterator<Item = &str> {
189 self.fields.iter().map(|f| f.name.as_str())
190 }
191
192 fn compute_layout(mut fields: Vec<SchemaField>) -> Vec<SchemaField> {
195 let bitvec_size = (fields.len() + 7) / 8;
197 let mut offset: u32 = (SCHEMA_HEADER_SIZE + bitvec_size) as u32;
198
199 for field in fields.iter_mut() {
200 let storage_type = field.constraint.storage_type();
201 field.size = storage_type.size() as u32;
202 field.align = storage_type.alignment() as u8;
203
204 let align = field.align as u32;
206 if align > 0 {
207 offset = (offset + align - 1) & !(align - 1);
208 }
209
210 field.offset = offset;
211 offset += field.size;
212 }
213
214 fields
215 }
216
217 pub fn bitvec_size(&self) -> usize {
219 (self.fields.len() + 7) / 8
220 }
221
222 pub fn data_offset(&self) -> usize {
224 SCHEMA_HEADER_SIZE + self.bitvec_size()
225 }
226
227 fn get_cached_layout(&self) -> (usize, usize) {
230 *self.cached_layout.get_or_init(|| {
231 let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
233
234 let total_size = if self.fields.is_empty() {
236 SCHEMA_HEADER_SIZE + self.bitvec_size()
237 } else {
238 let last_field = &self.fields[self.fields.len() - 1];
239 let end = last_field.offset as usize + last_field.size as usize;
240 Self::align_up(end, max_align)
242 };
243
244 (total_size, max_align)
245 })
246 }
247
248 pub fn total_static_size(&self) -> usize {
250 self.get_cached_layout().0
251 }
252
253 pub fn dynamic_section_start(&self) -> usize {
255 self.total_static_size()
256 }
257
258 pub fn dynamic_section_size(&self, row: &EncodedValues) -> usize {
260 row.len().saturating_sub(self.total_static_size())
261 }
262
263 pub fn allocate(&self) -> EncodedValues {
265 let (total_size, max_align) = self.get_cached_layout();
266 let layout = Layout::from_size_align(total_size, max_align).unwrap();
267 unsafe {
268 let ptr = alloc_zeroed(layout);
269 if ptr.is_null() {
270 handle_alloc_error(layout);
271 }
272 let vec = Vec::from_raw_parts(ptr, total_size, total_size);
273 let mut row = EncodedValues(CowVec::new(vec));
274 row.set_fingerprint(self.fingerprint);
275 row
276 }
277 }
278
279 fn align_up(offset: usize, align: usize) -> usize {
280 (offset + align).saturating_sub(1) & !(align.saturating_sub(1))
281 }
282
283 pub fn set_none(&self, row: &mut EncodedValues, index: usize) {
285 row.set_valid(index, false);
286 }
287
288 pub fn testing(types: &[Type]) -> Self {
292 Schema::new(
293 types.iter()
294 .enumerate()
295 .map(|(i, t)| SchemaField::unconstrained(format!("f{}", i), t.clone()))
296 .collect(),
297 )
298 }
299}
300
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a schema of unconstrained fields from `(name, type)` pairs.
    fn schema_of(columns: Vec<(&str, Type)>) -> Schema {
        let fields = columns
            .into_iter()
            .map(|(name, field_type)| SchemaField::unconstrained(name, field_type))
            .collect();
        Schema::new(fields)
    }

    /// A schema keeps every field, in the order given.
    #[test]
    fn test_schema_creation() {
        let schema = schema_of(vec![("id", Type::Int8), ("name", Type::Utf8), ("active", Type::Boolean)]);

        assert_eq!(schema.field_count(), 3);
        for (index, expected) in ["id", "name", "active"].iter().copied().enumerate() {
            assert_eq!(schema.fields()[index].name, expected);
        }
    }

    /// Identical field lists produce identical fingerprints.
    #[test]
    fn test_schema_fingerprint_deterministic() {
        let first = schema_of(vec![("a", Type::Int4), ("b", Type::Utf8)]);
        let second = schema_of(vec![("a", Type::Int4), ("b", Type::Utf8)]);

        assert_eq!(first.fingerprint(), second.fingerprint());
    }

    /// Changing a field's type changes the fingerprint.
    #[test]
    fn test_schema_fingerprint_different_for_different_schemas() {
        let narrow = schema_of(vec![("a", Type::Int4)]);
        let wide = schema_of(vec![("a", Type::Int8)]);

        assert_ne!(narrow.fingerprint(), wide.fingerprint());
    }

    /// Lookup by name finds existing fields and reports missing ones as `None`.
    #[test]
    fn test_find_field() {
        let schema = schema_of(vec![("id", Type::Int8), ("name", Type::Utf8)]);

        for present in ["id", "name"].iter().copied() {
            assert!(schema.find_field(present).is_some());
        }
        assert!(schema.find_field("missing").is_none());
    }
}