reifydb_core/encoded/schema/
mod.rs1pub mod consolidate;
12pub mod evolution;
13pub mod fingerprint;
14mod from;
15
16use std::{
17 alloc::{Layout, alloc_zeroed, handle_alloc_error},
18 fmt,
19 fmt::Debug,
20 iter,
21 ops::Deref,
22 ptr,
23 sync::{Arc, OnceLock},
24};
25
26use reifydb_type::{
27 util::cowvec::CowVec,
28 value::{constraint::TypeConstraint, r#type::Type},
29};
30use serde::{Deserialize, Serialize};
31
32use super::row::EncodedRow;
33use crate::encoded::schema::fingerprint::{RowSchemaFingerprint, compute_fingerprint};
34
/// Size in bytes of the fixed header at the start of every encoded row;
/// the validity bitvec follows it (see `RowSchema::data_offset`).
pub const SCHEMA_HEADER_SIZE: usize = 8;

/// Packed 16-byte slot encoding for Int/Uint/Decimal fields: when the top
/// bit is set, the payload lives in the dynamic section and the slot holds
/// an (offset, length) reference instead of an inline value.
const PACKED_MODE_DYNAMIC: u128 = 0x80000000000000000000000000000000;
/// Mask selecting the mode bit (same bit as `PACKED_MODE_DYNAMIC`).
const PACKED_MODE_MASK: u128 = 0x80000000000000000000000000000000;
/// Low 64 bits of the packed slot: byte offset into the dynamic section.
const PACKED_OFFSET_MASK: u128 = 0x0000000000000000FFFFFFFFFFFFFFFF;
/// Bits 64..127 of the packed slot: payload length (bit 127 is the mode
/// bit and is excluded from the length).
const PACKED_LENGTH_MASK: u128 = 0x7FFFFFFFFFFFFFFF0000000000000000;
43
/// One field of a row schema: its name, type constraint, and the layout of
/// its slot in the row's static section.
///
/// `offset`/`size`/`align` are assigned by `RowSchema::compute_layout`;
/// a freshly constructed field has `offset == 0` until then.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RowSchemaField {
	// Field name; lookups (`find_field`) are by name.
	pub name: String,
	// Type constraint; its storage type determines size and alignment.
	pub constraint: TypeConstraint,
	// Byte offset of this field's slot within the static section.
	pub offset: u32,
	// Size in bytes of the field's static slot.
	pub size: u32,
	// Required alignment of the slot (storage alignments are powers of two).
	pub align: u8,
}
58
59impl RowSchemaField {
60 pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
63 let storage_type = constraint.storage_type();
64 Self {
65 name: name.into(),
66 constraint,
67 offset: 0,
68 size: storage_type.size() as u32,
69 align: storage_type.alignment() as u8,
70 }
71 }
72
73 pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
76 Self::new(name, TypeConstraint::unconstrained(field_type))
77 }
78}
79
/// A cheaply cloneable (Arc-shared) row schema describing the layout of an
/// `EncodedRow`: header, validity bitvec, fixed slots, and dynamic section.
pub struct RowSchema(Arc<Inner>);
82
/// Shared schema state behind `RowSchema`'s `Arc`.
#[derive(Debug, Serialize, Deserialize)]
pub struct Inner {
	// Content fingerprint identifying this schema.
	pub fingerprint: RowSchemaFingerprint,
	// Fields with layout (offset/size/align) already computed.
	pub fields: Vec<RowSchemaField>,
	// Lazily computed `(total_static_size, max_align)`; not serialized —
	// recomputed on demand after deserialization.
	#[serde(skip)]
	cached_layout: OnceLock<(usize, usize)>,
}
98
99impl PartialEq for Inner {
100 fn eq(&self, other: &Self) -> bool {
101 self.fingerprint == other.fingerprint && self.fields == other.fields
102 }
103}
104
105impl Eq for Inner {}
106
107impl Deref for RowSchema {
108 type Target = Inner;
109
110 fn deref(&self) -> &Self::Target {
111 &self.0
112 }
113}
114
115impl Clone for RowSchema {
116 fn clone(&self) -> Self {
117 Self(self.0.clone())
118 }
119}
120
121impl Debug for RowSchema {
122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123 self.0.fmt(f)
124 }
125}
126
127impl PartialEq for RowSchema {
128 fn eq(&self, other: &Self) -> bool {
129 self.0.as_ref() == other.0.as_ref()
130 }
131}
132
133impl Eq for RowSchema {}
134
135impl RowSchema {
136 pub fn new(fields: Vec<RowSchemaField>) -> Self {
140 let fields = Self::compute_layout(fields);
141 let fingerprint = compute_fingerprint(&fields);
142
143 Self(Arc::new(Inner {
144 fingerprint,
145 fields,
146 cached_layout: OnceLock::new(),
147 }))
148 }
149
	/// Reconstructs a schema from an already-known fingerprint and fields
	/// (e.g. when deserializing).
	///
	/// NOTE(review): the fingerprint is trusted as-is (not re-derived) and
	/// `compute_layout` is NOT run — callers must pass fields whose
	/// offsets are already valid and a fingerprint that matches them.
	pub fn from_parts(fingerprint: RowSchemaFingerprint, fields: Vec<RowSchemaField>) -> Self {
		Self(Arc::new(Inner {
			fingerprint,
			fields,
			cached_layout: OnceLock::new(),
		}))
	}
159
160 pub fn fingerprint(&self) -> RowSchemaFingerprint {
162 self.fingerprint
163 }
164
165 pub fn fields(&self) -> &[RowSchemaField] {
167 &self.fields
168 }
169
170 pub fn field_count(&self) -> usize {
172 self.fields.len()
173 }
174
175 pub fn find_field(&self, name: &str) -> Option<&RowSchemaField> {
177 self.fields.iter().find(|f| f.name == name)
178 }
179
180 pub fn find_field_index(&self, name: &str) -> Option<usize> {
182 self.fields.iter().position(|f| f.name == name)
183 }
184
185 pub fn get_field(&self, index: usize) -> Option<&RowSchemaField> {
187 self.fields.get(index)
188 }
189
190 pub fn get_field_name(&self, index: usize) -> Option<&str> {
192 self.fields.get(index).map(|f| f.name.as_str())
193 }
194
195 pub fn field_names(&self) -> impl Iterator<Item = &str> {
197 self.fields.iter().map(|f| f.name.as_str())
198 }
199
	/// Assigns byte offsets to `fields` within the static section of a row.
	///
	/// Static layout: [header (SCHEMA_HEADER_SIZE)][validity bitvec, one
	/// bit per field rounded up to whole bytes][aligned field slots].
	/// `size` and `align` are refreshed from each field's storage type, so
	/// stale values passed in are overwritten.
	fn compute_layout(mut fields: Vec<RowSchemaField>) -> Vec<RowSchemaField> {
		// One validity bit per field, rounded up to a whole byte.
		let bitvec_size = (fields.len() + 7) / 8;
		let mut offset: u32 = (SCHEMA_HEADER_SIZE + bitvec_size) as u32;

		for field in fields.iter_mut() {
			let storage_type = field.constraint.storage_type();
			field.size = storage_type.size() as u32;
			field.align = storage_type.alignment() as u8;

			let align = field.align as u32;
			// Round the cursor up to the field's alignment. The mask
			// trick assumes `align` is a power of two, which storage
			// alignments are; align == 0 skips alignment entirely.
			if align > 0 {
				offset = (offset + align - 1) & !(align - 1);
			}

			field.offset = offset;
			offset += field.size;
		}

		fields
	}
224
	/// Size in bytes of the validity bitvec: one bit per field, rounded up
	/// to a whole byte.
	pub fn bitvec_size(&self) -> usize {
		(self.fields.len() + 7) / 8
	}

	/// Byte offset where field slots begin: header plus validity bitvec.
	pub fn data_offset(&self) -> usize {
		SCHEMA_HEADER_SIZE + self.bitvec_size()
	}
234
	/// Returns `(total_static_size, max_align)`, computing it on first use
	/// and caching it in the `OnceLock` so later calls are a cheap read.
	fn get_cached_layout(&self) -> (usize, usize) {
		*self.cached_layout.get_or_init(|| {
			// Row alignment is the strictest field alignment; 1 when
			// there are no fields.
			let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);

			let total_size = if self.fields.is_empty() {
				// No fields: just the header plus (empty) bitvec.
				SCHEMA_HEADER_SIZE + self.bitvec_size()
			} else {
				// Fields are laid out in order, so the static
				// section ends after the last field's slot,
				// padded up to the row alignment.
				let last_field = &self.fields[self.fields.len() - 1];
				let end = last_field.offset as usize + last_field.size as usize;
				Self::align_up(end, max_align)
			};

			(total_size, max_align)
		})
	}
255
	/// Total size in bytes of a row's static section (header + bitvec +
	/// fixed-size field slots, padded to the row alignment).
	pub fn total_static_size(&self) -> usize {
		self.get_cached_layout().0
	}

	/// Byte offset where the variable-length (dynamic) section starts —
	/// immediately after the static section.
	pub fn dynamic_section_start(&self) -> usize {
		self.total_static_size()
	}

	/// Number of bytes currently used by `row`'s dynamic section
	/// (0 when the row is no longer than the static section).
	pub fn dynamic_section_size(&self, row: &EncodedRow) -> usize {
		row.len().saturating_sub(self.total_static_size())
	}
270
	/// Reads the `(offset, length)` reference that field `index` stores
	/// for its dynamic payload; `offset` is relative to the start of the
	/// dynamic section.
	///
	/// Returns `None` when the field is undefined, when its type has no
	/// dynamic representation, or when a packed Int/Uint/Decimal slot
	/// holds an inline value (mode bit clear) rather than a dynamic ref.
	pub(crate) fn read_dynamic_ref(&self, row: &EncodedRow, index: usize) -> Option<(usize, usize)> {
		if !row.is_defined(index) {
			return None;
		}
		let field = &self.fields()[index];
		match field.constraint.get_type().inner_type() {
			// Utf8/Blob/Any store an 8-byte little-endian
			// (u32 offset, u32 length) pair in their static slot.
			Type::Utf8 | Type::Blob | Type::Any => {
				let ref_slice = &row.as_slice()[field.offset as usize..field.offset as usize + 8];
				let offset =
					u32::from_le_bytes([ref_slice[0], ref_slice[1], ref_slice[2], ref_slice[3]])
						as usize;
				let length =
					u32::from_le_bytes([ref_slice[4], ref_slice[5], ref_slice[6], ref_slice[7]])
						as usize;
				Some((offset, length))
			}
			// Int/Uint/Decimal use a packed slot: top bit selects
			// dynamic mode, low 64 bits hold the offset, bits
			// 64..127 the length (see the PACKED_* masks).
			Type::Int
			| Type::Uint
			| Type::Decimal {
				..
			} => {
				// SAFETY: `field.offset` was assigned by
				// compute_layout with room for the field's storage
				// size, and read_unaligned tolerates any alignment.
				// NOTE(review): assumes the slot is at least 16
				// bytes for these types — confirm against their
				// storage types.
				let packed = unsafe {
					(row.as_ptr().add(field.offset as usize) as *const u128).read_unaligned()
				};
				let packed = u128::from_le(packed);
				if packed & PACKED_MODE_MASK != 0 {
					let offset = (packed & PACKED_OFFSET_MASK) as usize;
					let length = ((packed & PACKED_LENGTH_MASK) >> 64) as usize;
					Some((offset, length))
				} else {
					// Mode bit clear: value is stored inline.
					None }
			}
			_ => None,
		}
	}
309
	/// Writes the `(offset, length)` dynamic reference into field
	/// `index`'s static slot; `offset` is relative to the dynamic section
	/// start. No-op for types without a dynamic representation.
	pub(crate) fn write_dynamic_ref(&self, row: &mut EncodedRow, index: usize, offset: usize, length: usize) {
		let field = &self.fields()[index];
		match field.constraint.get_type().inner_type() {
			Type::Utf8 | Type::Blob | Type::Any => {
				// 8-byte slot: u32 offset then u32 length, LE.
				// NOTE(review): `as u32` silently truncates values
				// >= 4 GiB — confirm rows can never grow that large.
				let ref_slice = &mut row.0.make_mut()[field.offset as usize..field.offset as usize + 8];
				ref_slice[0..4].copy_from_slice(&(offset as u32).to_le_bytes());
				ref_slice[4..8].copy_from_slice(&(length as u32).to_le_bytes());
			}
			Type::Int
			| Type::Uint
			| Type::Decimal {
				..
			} => {
				// Packed slot: set the dynamic-mode bit, offset in
				// the low 64 bits, length in bits 64..127 —
				// mirrors read_dynamic_ref exactly.
				let offset_part = (offset as u128) & PACKED_OFFSET_MASK;
				let length_part = ((length as u128) << 64) & PACKED_LENGTH_MASK;
				let packed = PACKED_MODE_DYNAMIC | offset_part | length_part;
				// SAFETY: the slot at `field.offset` was sized by
				// compute_layout for this field, and
				// write_unaligned tolerates any alignment.
				unsafe {
					ptr::write_unaligned(
						row.0.make_mut().as_mut_ptr().add(field.offset as usize) as *mut u128,
						packed.to_le(),
					);
				}
			}
			_ => {}
		}
	}
337
	/// Replaces field `index`'s dynamic payload with `new_data`, shifting
	/// every later payload by the size delta and fixing up their refs.
	/// If the field has no payload yet, `new_data` is appended to the end
	/// of the dynamic section. Always marks the field valid.
	pub(crate) fn replace_dynamic_data(&self, row: &mut EncodedRow, index: usize, new_data: &[u8]) {
		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
			// How far payloads after this one will move.
			let delta = new_data.len() as isize - old_length as isize;

			// Snapshot the refs to fix up BEFORE mutating the row —
			// read_dynamic_ref reads from the buffer being spliced.
			let refs_to_update: Vec<(usize, usize, usize)> = if delta != 0 {
				self.fields()
					.iter()
					.enumerate()
					.filter(|(i, _)| *i != index && row.is_defined(*i))
					.filter_map(|(i, _)| {
						self.read_dynamic_ref(row, i)
							.filter(|(off, _)| *off > old_offset)
							.map(|(off, len)| (i, off, len))
					})
					.collect()
			} else {
				vec![]
			};

			// Splice the new bytes over the old payload in place.
			let dynamic_start = self.dynamic_section_start();
			let abs_start = dynamic_start + old_offset;
			let abs_end = abs_start + old_length;
			row.0.make_mut().splice(abs_start..abs_end, new_data.iter().copied());

			// Re-point this field at the (same-offset) new payload.
			self.write_dynamic_ref(row, index, old_offset, new_data.len());

			// Shift the refs of every payload that followed it.
			for (i, off, len) in refs_to_update {
				let new_off = (off as isize + delta) as usize;
				self.write_dynamic_ref(row, i, new_off, len);
			}
		} else {
			// No existing payload: append at the current end of the
			// dynamic section.
			let dynamic_offset = self.dynamic_section_size(row);
			row.0.extend_from_slice(new_data);
			self.write_dynamic_ref(row, index, dynamic_offset, new_data.len());
		}
		row.set_valid(index, true);
	}
382
	/// Removes field `index`'s dynamic payload, shifting later payloads
	/// down by its length and fixing up their refs. No-op when the field
	/// has no dynamic payload.
	///
	/// NOTE(review): the removed field's own (now stale) ref bytes stay in
	/// its static slot — callers must clear the validity bit afterwards
	/// (see `set_none`) so the stale ref is never read back.
	pub(crate) fn remove_dynamic_data(&self, row: &mut EncodedRow, index: usize) {
		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
			// Snapshot refs pointing past the removed payload before
			// mutating the buffer.
			let refs_to_update: Vec<(usize, usize, usize)> = self
				.fields()
				.iter()
				.enumerate()
				.filter(|(i, _)| *i != index && row.is_defined(*i))
				.filter_map(|(i, _)| {
					self.read_dynamic_ref(row, i)
						.filter(|(off, _)| *off > old_offset)
						.map(|(off, len)| (i, off, len))
				})
				.collect();

			// Cut the payload bytes out of the dynamic section.
			let dynamic_start = self.dynamic_section_start();
			let abs_start = dynamic_start + old_offset;
			let abs_end = abs_start + old_length;
			row.0.make_mut().splice(abs_start..abs_end, iter::empty());

			// Everything after the cut moved down by old_length.
			for (i, off, len) in refs_to_update {
				let new_off = off - old_length;
				self.write_dynamic_ref(row, i, new_off, len);
			}
		}
	}
413
414 pub fn allocate(&self) -> EncodedRow {
416 let (total_size, max_align) = self.get_cached_layout();
417 let layout = Layout::from_size_align(total_size, max_align).unwrap();
418 unsafe {
419 let ptr = alloc_zeroed(layout);
420 if ptr.is_null() {
421 handle_alloc_error(layout);
422 }
423 let vec = Vec::from_raw_parts(ptr, total_size, total_size);
424 let mut row = EncodedRow(CowVec::new(vec));
425 row.set_fingerprint(self.fingerprint);
426 row
427 }
428 }
429
430 fn align_up(offset: usize, align: usize) -> usize {
431 (offset + align).saturating_sub(1) & !(align.saturating_sub(1))
432 }
433
	/// Clears field `index` to "undefined": removes any dynamic payload
	/// first, then clears the validity bit.
	///
	/// Order matters — `remove_dynamic_data` consults the validity bit
	/// (via `read_dynamic_ref`), so clearing the bit first would leak the
	/// payload bytes in the dynamic section.
	pub fn set_none(&self, row: &mut EncodedRow, index: usize) {
		self.remove_dynamic_data(row, index);
		row.set_valid(index, false);
	}
439
440 pub fn testing(types: &[Type]) -> Self {
444 RowSchema::new(
445 types.iter()
446 .enumerate()
447 .map(|(i, t)| RowSchemaField::unconstrained(format!("f{}", i), t.clone()))
448 .collect(),
449 )
450 }
451}
452
#[cfg(test)]
mod tests {
	use super::*;

	// Construction preserves field order and count.
	#[test]
	fn test_schema_creation() {
		let fields = vec![
			RowSchemaField::unconstrained("id", Type::Int8),
			RowSchemaField::unconstrained("name", Type::Utf8),
			RowSchemaField::unconstrained("active", Type::Boolean),
		];

		let schema = RowSchema::new(fields);

		assert_eq!(schema.field_count(), 3);
		assert_eq!(schema.fields()[0].name, "id");
		assert_eq!(schema.fields()[1].name, "name");
		assert_eq!(schema.fields()[2].name, "active");
	}

	// Identical field lists must produce identical fingerprints.
	#[test]
	fn test_schema_fingerprint_deterministic() {
		let fields1 = vec![
			RowSchemaField::unconstrained("a", Type::Int4),
			RowSchemaField::unconstrained("b", Type::Utf8),
		];

		let fields2 = vec![
			RowSchemaField::unconstrained("a", Type::Int4),
			RowSchemaField::unconstrained("b", Type::Utf8),
		];

		let schema1 = RowSchema::new(fields1);
		let schema2 = RowSchema::new(fields2);

		assert_eq!(schema1.fingerprint(), schema2.fingerprint());
	}

	// A single type change must change the fingerprint.
	#[test]
	fn test_schema_fingerprint_different_for_different_schemas() {
		let fields1 = vec![RowSchemaField::unconstrained("a", Type::Int4)];
		let fields2 = vec![RowSchemaField::unconstrained("a", Type::Int8)];

		let schema1 = RowSchema::new(fields1);
		let schema2 = RowSchema::new(fields2);

		assert_ne!(schema1.fingerprint(), schema2.fingerprint());
	}

	// Name lookup finds present fields and rejects missing ones.
	#[test]
	fn test_find_field() {
		let fields = vec![
			RowSchemaField::unconstrained("id", Type::Int8),
			RowSchemaField::unconstrained("name", Type::Utf8),
		];

		let schema = RowSchema::new(fields);

		assert!(schema.find_field("id").is_some());
		assert!(schema.find_field("name").is_some());
		assert!(schema.find_field("missing").is_none());
	}
}