Skip to main content

reifydb_core/encoded/schema/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! RowSchema definitions for encoding row data with consistent field layouts.
5//!
6//! A `RowSchema` describes the structure of encoded row data, including:
7//! - Field names, types, and order
8//! - Memory layout (offsets, sizes, alignment)
9//! - A content-addressable fingerprint for deduplication
10
11pub mod consolidate;
12pub mod evolution;
13pub mod fingerprint;
14mod from;
15
16use std::{
17	alloc::{Layout, alloc_zeroed, handle_alloc_error},
18	fmt,
19	fmt::Debug,
20	iter,
21	ops::Deref,
22	ptr,
23	sync::{Arc, OnceLock},
24};
25
26use reifydb_type::{
27	util::cowvec::CowVec,
28	value::{constraint::TypeConstraint, r#type::Type},
29};
30use serde::{Deserialize, Serialize};
31
32use super::row::EncodedRow;
33use crate::encoded::schema::fingerprint::{RowSchemaFingerprint, compute_fingerprint};
34
/// Number of bytes taken by the schema header (the fingerprint).
pub const SCHEMA_HEADER_SIZE: usize = 8;

// Packed little-endian u128 reference used by Int, Uint and Decimal fields:
//   bit 127       - mode flag; when set the value lives in the dynamic section
//   bits 64..=126 - length of the dynamic payload
//   bits 0..=63   - offset of the payload inside the dynamic section
const PACKED_MODE_DYNAMIC: u128 = 1u128 << 127;
const PACKED_MODE_MASK: u128 = 1u128 << 127;
const PACKED_OFFSET_MASK: u128 = u64::MAX as u128;
const PACKED_LENGTH_MASK: u128 = ((1u128 << 63) - 1) << 64;
43
44/// A field within a schema
45#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
46pub struct RowSchemaField {
47	/// Field name
48	pub name: String,
49	/// Field type constraint (includes base type and optional constraints like MaxBytes)
50	pub constraint: TypeConstraint,
51	/// Byte offset within the encoded row
52	pub offset: u32,
53	/// Size in bytes
54	pub size: u32,
55	/// Alignment requirement
56	pub align: u8,
57}
58
59impl RowSchemaField {
60	/// Create a new schema field with a type constraint.
61	/// Offset, size, and alignment are computed when added to a RowSchema.
62	pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
63		let storage_type = constraint.storage_type();
64		Self {
65			name: name.into(),
66			constraint,
67			offset: 0,
68			size: storage_type.size() as u32,
69			align: storage_type.alignment() as u8,
70		}
71	}
72
73	/// Create a new schema field with an unconstrained type.
74	/// Convenience method for the common case of no constraints.
75	pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
76		Self::new(name, TypeConstraint::unconstrained(field_type))
77	}
78}
79
80/// A schema describing the structure of encoded row data.
81pub struct RowSchema(Arc<Inner>);
82
83/// Inner data for a schema describing the structure of encoded row data.
84///
85/// Schemas are immutable and content-addressable via their fingerprint.
86/// The same field configuration always produces the same fingerprint,
87/// enabling schema deduplication in the registry.
88#[derive(Debug, Serialize, Deserialize)]
89pub struct Inner {
90	/// Content-addressable fingerprint (hash of canonical field representation)
91	pub fingerprint: RowSchemaFingerprint,
92	/// Fields in definition order
93	pub fields: Vec<RowSchemaField>,
94	/// Cached layout computation (total_size, max_align) - computed once on first use
95	#[serde(skip)]
96	cached_layout: OnceLock<(usize, usize)>,
97}
98
99impl PartialEq for Inner {
100	fn eq(&self, other: &Self) -> bool {
101		self.fingerprint == other.fingerprint && self.fields == other.fields
102	}
103}
104
105impl Eq for Inner {}
106
107impl Deref for RowSchema {
108	type Target = Inner;
109
110	fn deref(&self) -> &Self::Target {
111		&self.0
112	}
113}
114
115impl Clone for RowSchema {
116	fn clone(&self) -> Self {
117		Self(self.0.clone())
118	}
119}
120
121impl Debug for RowSchema {
122	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123		self.0.fmt(f)
124	}
125}
126
127impl PartialEq for RowSchema {
128	fn eq(&self, other: &Self) -> bool {
129		self.0.as_ref() == other.0.as_ref()
130	}
131}
132
133impl Eq for RowSchema {}
134
135impl RowSchema {
136	/// Create a new schema from a list of fields.
137	///
138	/// This computes the memory layout (offsets, alignment) and fingerprint.
139	pub fn new(fields: Vec<RowSchemaField>) -> Self {
140		let fields = Self::compute_layout(fields);
141		let fingerprint = compute_fingerprint(&fields);
142
143		Self(Arc::new(Inner {
144			fingerprint,
145			fields,
146			cached_layout: OnceLock::new(),
147		}))
148	}
149
150	/// Create a schema from pre-computed fields and fingerprint.
151	/// Used when loading from storage.
152	pub fn from_parts(fingerprint: RowSchemaFingerprint, fields: Vec<RowSchemaField>) -> Self {
153		Self(Arc::new(Inner {
154			fingerprint,
155			fields,
156			cached_layout: OnceLock::new(),
157		}))
158	}
159
160	/// Get the schema's fingerprint
161	pub fn fingerprint(&self) -> RowSchemaFingerprint {
162		self.fingerprint
163	}
164
165	/// Get the fields in this schema
166	pub fn fields(&self) -> &[RowSchemaField] {
167		&self.fields
168	}
169
170	/// Get the number of fields
171	pub fn field_count(&self) -> usize {
172		self.fields.len()
173	}
174
175	/// Find a field by name
176	pub fn find_field(&self, name: &str) -> Option<&RowSchemaField> {
177		self.fields.iter().find(|f| f.name == name)
178	}
179
180	/// Find field index by name
181	pub fn find_field_index(&self, name: &str) -> Option<usize> {
182		self.fields.iter().position(|f| f.name == name)
183	}
184
185	/// Find a field by index
186	pub fn get_field(&self, index: usize) -> Option<&RowSchemaField> {
187		self.fields.get(index)
188	}
189
190	/// Get field name by index
191	pub fn get_field_name(&self, index: usize) -> Option<&str> {
192		self.fields.get(index).map(|f| f.name.as_str())
193	}
194
195	/// Get all field names as an iterator
196	pub fn field_names(&self) -> impl Iterator<Item = &str> {
197		self.fields.iter().map(|f| f.name.as_str())
198	}
199
200	/// Compute memory layout for fields.
201	/// Returns the fields with computed offsets and the total row size.
202	fn compute_layout(mut fields: Vec<RowSchemaField>) -> Vec<RowSchemaField> {
203		// Start offset calculation from where data section begins (after header + bitvec)
204		let bitvec_size = (fields.len() + 7) / 8;
205		let mut offset: u32 = (SCHEMA_HEADER_SIZE + bitvec_size) as u32;
206
207		for field in fields.iter_mut() {
208			let storage_type = field.constraint.storage_type();
209			field.size = storage_type.size() as u32;
210			field.align = storage_type.alignment() as u8;
211
212			// Align offset
213			let align = field.align as u32;
214			if align > 0 {
215				offset = (offset + align - 1) & !(align - 1);
216			}
217
218			field.offset = offset;
219			offset += field.size;
220		}
221
222		fields
223	}
224
225	/// Size of the bitvec section in bytes
226	pub fn bitvec_size(&self) -> usize {
227		(self.fields.len() + 7) / 8
228	}
229
230	/// Offset where field data starts (after header and bitvec)
231	pub fn data_offset(&self) -> usize {
232		SCHEMA_HEADER_SIZE + self.bitvec_size()
233	}
234
235	/// Compute and cache the layout (total_size, max_align).
236	/// This is called once and the result is cached for subsequent calls.
237	fn get_cached_layout(&self) -> (usize, usize) {
238		*self.cached_layout.get_or_init(|| {
239			// Compute max_align
240			let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
241
242			// Compute total_size
243			let total_size = if self.fields.is_empty() {
244				SCHEMA_HEADER_SIZE + self.bitvec_size()
245			} else {
246				let last_field = &self.fields[self.fields.len() - 1];
247				let end = last_field.offset as usize + last_field.size as usize;
248				// Align to maximum field alignment
249				Self::align_up(end, max_align)
250			};
251
252			(total_size, max_align)
253		})
254	}
255
256	/// Total size of the static section
257	pub fn total_static_size(&self) -> usize {
258		self.get_cached_layout().0
259	}
260
261	/// Start of the dynamic section
262	pub fn dynamic_section_start(&self) -> usize {
263		self.total_static_size()
264	}
265
266	/// Size of the dynamic section
267	pub fn dynamic_section_size(&self, row: &EncodedRow) -> usize {
268		row.len().saturating_sub(self.total_static_size())
269	}
270
271	/// Returns (offset, length) in the dynamic section for a defined dynamic field.
272	/// Returns None if field is undefined, static-only, or uses inline storage.
273	pub(crate) fn read_dynamic_ref(&self, row: &EncodedRow, index: usize) -> Option<(usize, usize)> {
274		if !row.is_defined(index) {
275			return None;
276		}
277		let field = &self.fields()[index];
278		match field.constraint.get_type().inner_type() {
279			Type::Utf8 | Type::Blob | Type::Any => {
280				let ref_slice = &row.as_slice()[field.offset as usize..field.offset as usize + 8];
281				let offset =
282					u32::from_le_bytes([ref_slice[0], ref_slice[1], ref_slice[2], ref_slice[3]])
283						as usize;
284				let length =
285					u32::from_le_bytes([ref_slice[4], ref_slice[5], ref_slice[6], ref_slice[7]])
286						as usize;
287				Some((offset, length))
288			}
289			Type::Int
290			| Type::Uint
291			| Type::Decimal {
292				..
293			} => {
294				let packed = unsafe {
295					(row.as_ptr().add(field.offset as usize) as *const u128).read_unaligned()
296				};
297				let packed = u128::from_le(packed);
298				if packed & PACKED_MODE_MASK != 0 {
299					let offset = (packed & PACKED_OFFSET_MASK) as usize;
300					let length = ((packed & PACKED_LENGTH_MASK) >> 64) as usize;
301					Some((offset, length))
302				} else {
303					None // inline storage
304				}
305			}
306			_ => None,
307		}
308	}
309
310	/// Writes a dynamic section reference for the given field in its type-appropriate format.
311	pub(crate) fn write_dynamic_ref(&self, row: &mut EncodedRow, index: usize, offset: usize, length: usize) {
312		let field = &self.fields()[index];
313		match field.constraint.get_type().inner_type() {
314			Type::Utf8 | Type::Blob | Type::Any => {
315				let ref_slice = &mut row.0.make_mut()[field.offset as usize..field.offset as usize + 8];
316				ref_slice[0..4].copy_from_slice(&(offset as u32).to_le_bytes());
317				ref_slice[4..8].copy_from_slice(&(length as u32).to_le_bytes());
318			}
319			Type::Int
320			| Type::Uint
321			| Type::Decimal {
322				..
323			} => {
324				let offset_part = (offset as u128) & PACKED_OFFSET_MASK;
325				let length_part = ((length as u128) << 64) & PACKED_LENGTH_MASK;
326				let packed = PACKED_MODE_DYNAMIC | offset_part | length_part;
327				unsafe {
328					ptr::write_unaligned(
329						row.0.make_mut().as_mut_ptr().add(field.offset as usize) as *mut u128,
330						packed.to_le(),
331					);
332				}
333			}
334			_ => {}
335		}
336	}
337
338	/// Replace dynamic data for a field. Handles both first-set (append) and update (splice).
339	/// On update: splices old bytes out, inserts new bytes, adjusts all other dynamic refs.
340	pub(crate) fn replace_dynamic_data(&self, row: &mut EncodedRow, index: usize, new_data: &[u8]) {
341		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
342			let delta = new_data.len() as isize - old_length as isize;
343
344			// Collect refs that need adjusting BEFORE splice
345			let refs_to_update: Vec<(usize, usize, usize)> = if delta != 0 {
346				self.fields()
347					.iter()
348					.enumerate()
349					.filter(|(i, _)| *i != index && row.is_defined(*i))
350					.filter_map(|(i, _)| {
351						self.read_dynamic_ref(row, i)
352							.filter(|(off, _)| *off > old_offset)
353							.map(|(off, len)| (i, off, len))
354					})
355					.collect()
356			} else {
357				vec![]
358			};
359
360			// Splice bytes in the dynamic section
361			let dynamic_start = self.dynamic_section_start();
362			let abs_start = dynamic_start + old_offset;
363			let abs_end = abs_start + old_length;
364			row.0.make_mut().splice(abs_start..abs_end, new_data.iter().copied());
365
366			// Update this field's reference (same offset, new length)
367			self.write_dynamic_ref(row, index, old_offset, new_data.len());
368
369			// Adjust other dynamic references by the size delta
370			for (i, off, len) in refs_to_update {
371				let new_off = (off as isize + delta) as usize;
372				self.write_dynamic_ref(row, i, new_off, len);
373			}
374		} else {
375			// First set or transitioning from inline — append to dynamic section
376			let dynamic_offset = self.dynamic_section_size(row);
377			row.0.extend_from_slice(new_data);
378			self.write_dynamic_ref(row, index, dynamic_offset, new_data.len());
379		}
380		row.set_valid(index, true);
381	}
382
383	/// Remove dynamic data for a field without setting new data.
384	/// Used for dynamic→inline transitions in Int/Uint.
385	pub(crate) fn remove_dynamic_data(&self, row: &mut EncodedRow, index: usize) {
386		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
387			// Collect refs that need adjusting
388			let refs_to_update: Vec<(usize, usize, usize)> = self
389				.fields()
390				.iter()
391				.enumerate()
392				.filter(|(i, _)| *i != index && row.is_defined(*i))
393				.filter_map(|(i, _)| {
394					self.read_dynamic_ref(row, i)
395						.filter(|(off, _)| *off > old_offset)
396						.map(|(off, len)| (i, off, len))
397				})
398				.collect();
399
400			// Remove bytes
401			let dynamic_start = self.dynamic_section_start();
402			let abs_start = dynamic_start + old_offset;
403			let abs_end = abs_start + old_length;
404			row.0.make_mut().splice(abs_start..abs_end, iter::empty());
405
406			// Adjust other references
407			for (i, off, len) in refs_to_update {
408				let new_off = off - old_length;
409				self.write_dynamic_ref(row, i, new_off, len);
410			}
411		}
412	}
413
414	/// Allocate a new encoded row
415	pub fn allocate(&self) -> EncodedRow {
416		let (total_size, max_align) = self.get_cached_layout();
417		let layout = Layout::from_size_align(total_size, max_align).unwrap();
418		unsafe {
419			let ptr = alloc_zeroed(layout);
420			if ptr.is_null() {
421				handle_alloc_error(layout);
422			}
423			let vec = Vec::from_raw_parts(ptr, total_size, total_size);
424			let mut row = EncodedRow(CowVec::new(vec));
425			row.set_fingerprint(self.fingerprint);
426			row
427		}
428	}
429
430	fn align_up(offset: usize, align: usize) -> usize {
431		(offset + align).saturating_sub(1) & !(align.saturating_sub(1))
432	}
433
434	/// Set a field as undefined (not set)
435	pub fn set_none(&self, row: &mut EncodedRow, index: usize) {
436		self.remove_dynamic_data(row, index);
437		row.set_valid(index, false);
438	}
439
440	/// Create a schema from a list of types.
441	/// Fields are named f0, f1, f2, etc. and have unconstrained types.
442	/// Useful for tests and simple state schemas.
443	pub fn testing(types: &[Type]) -> Self {
444		RowSchema::new(
445			types.iter()
446				.enumerate()
447				.map(|(i, t)| RowSchemaField::unconstrained(format!("f{}", i), t.clone()))
448				.collect(),
449		)
450	}
451}
452
#[cfg(test)]
mod tests {
	use super::*;

	/// Build a schema of unconstrained fields from `(name, type)` pairs.
	fn schema_of(columns: &[(&str, Type)]) -> RowSchema {
		let fields = columns.iter().map(|(name, ty)| RowSchemaField::unconstrained(*name, ty.clone())).collect();
		RowSchema::new(fields)
	}

	#[test]
	fn test_schema_creation() {
		let schema = schema_of(&[("id", Type::Int8), ("name", Type::Utf8), ("active", Type::Boolean)]);

		assert_eq!(schema.field_count(), 3);
		for (position, expected) in ["id", "name", "active"].iter().enumerate() {
			assert_eq!(schema.fields()[position].name, *expected);
		}
	}

	#[test]
	fn test_schema_fingerprint_deterministic() {
		// Two schemas built independently from the same columns must agree.
		let first = schema_of(&[("a", Type::Int4), ("b", Type::Utf8)]);
		let second = schema_of(&[("a", Type::Int4), ("b", Type::Utf8)]);

		assert_eq!(first.fingerprint(), second.fingerprint());
	}

	#[test]
	fn test_schema_fingerprint_different_for_different_schemas() {
		// Same name, different type.
		let narrow = schema_of(&[("a", Type::Int4)]);
		let wide = schema_of(&[("a", Type::Int8)]);

		assert_ne!(narrow.fingerprint(), wide.fingerprint());
	}

	#[test]
	fn test_find_field() {
		let schema = schema_of(&[("id", Type::Int8), ("name", Type::Utf8)]);

		for present in ["id", "name"].iter() {
			assert!(schema.find_field(present).is_some());
		}
		assert!(schema.find_field("missing").is_none());
	}
}
515}