Skip to main content

reifydb_core/encoded/schema/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Schema definitions for encoding row data with consistent field layouts.
5//!
6//! A `Schema` describes the structure of encoded row data, including:
7//! - Field names, types, and order
8//! - Memory layout (offsets, sizes, alignment)
9//! - A content-addressable fingerprint for deduplication
10
11pub mod consolidate;
12pub mod evolution;
13pub mod fingerprint;
14mod from;
15
16use std::{
17	alloc::{Layout, alloc_zeroed, handle_alloc_error},
18	fmt,
19	fmt::Debug,
20	ops::Deref,
21	sync::{Arc, OnceLock},
22};
23
24use reifydb_type::{
25	util::cowvec::CowVec,
26	value::{constraint::TypeConstraint, r#type::Type},
27};
28use serde::{Deserialize, Serialize};
29
30use super::encoded::EncodedValues;
31use crate::encoded::schema::fingerprint::{SchemaFingerprint, compute_fingerprint};
32
/// Size in bytes of the header at the start of an encoded row.
///
/// The header holds the schema fingerprint. The bitvec section and then the
/// field data are laid out after it (see `Schema::data_offset`).
pub const SCHEMA_HEADER_SIZE: usize = 8;
35
/// A single named field within a schema, together with its physical placement
/// inside an encoded row.
///
/// `offset`, `size` and `align` are layout data: `SchemaField::new` fills in
/// `size` and `align` from the constraint's storage type and leaves `offset`
/// at zero, and `Schema::new` recomputes all three when it lays out the row.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaField {
	/// Field name
	pub name: String,
	/// Field type constraint (includes base type and optional constraints like MaxBytes)
	pub constraint: TypeConstraint,
	/// Byte offset of the field, measured from the start of the encoded row
	/// (the row header and bitvec come before the first field)
	pub offset: u32,
	/// Size in bytes of the field's storage type
	pub size: u32,
	/// Alignment requirement in bytes of the field's storage type
	pub align: u8,
}
50
51impl SchemaField {
52	/// Create a new schema field with a type constraint.
53	/// Offset, size, and alignment are computed when added to a Schema.
54	pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
55		let storage_type = constraint.storage_type();
56		Self {
57			name: name.into(),
58			constraint,
59			offset: 0,
60			size: storage_type.size() as u32,
61			align: storage_type.alignment() as u8,
62		}
63	}
64
65	/// Create a new schema field with an unconstrained type.
66	/// Convenience method for the common case of no constraints.
67	pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
68		Self::new(name, TypeConstraint::unconstrained(field_type))
69	}
70}
71
/// A schema describing the structure of encoded row data.
///
/// This is a handle around a reference-counted [`Inner`]: cloning a `Schema`
/// clones the `Arc`, not the field list, and the handle dereferences to
/// `Inner` so its public fields can be read directly.
pub struct Schema(Arc<Inner>);
74
/// Inner data for a schema describing the structure of encoded row data.
///
/// Schemas are immutable and content-addressable via their fingerprint.
/// The same field configuration always produces the same fingerprint,
/// enabling schema deduplication in the registry.
///
/// Only `fingerprint` and `fields` are serialized; the layout cache is
/// skipped and starts out empty again after deserialization.
#[derive(Debug, Serialize, Deserialize)]
pub struct Inner {
	/// Content-addressable fingerprint (hash of canonical field representation)
	pub fingerprint: SchemaFingerprint,
	/// Fields in definition order
	pub fields: Vec<SchemaField>,
	/// Cached layout computation (total_size, max_align) - computed once on first use.
	/// Derived entirely from `fields`, so it is neither serialized nor compared.
	#[serde(skip)]
	cached_layout: OnceLock<(usize, usize)>,
}
90
91impl PartialEq for Inner {
92	fn eq(&self, other: &Self) -> bool {
93		self.fingerprint == other.fingerprint && self.fields == other.fields
94	}
95}
96
// Marker impl: equality on `Inner` (fingerprint plus field list) is treated as
// a full equivalence relation.
impl Eq for Inner {}
98
99impl Deref for Schema {
100	type Target = Inner;
101
102	fn deref(&self) -> &Self::Target {
103		&self.0
104	}
105}
106
107impl Clone for Schema {
108	fn clone(&self) -> Self {
109		Self(self.0.clone())
110	}
111}
112
113impl Debug for Schema {
114	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115		self.0.fmt(f)
116	}
117}
118
119impl PartialEq for Schema {
120	fn eq(&self, other: &Self) -> bool {
121		self.0.as_ref() == other.0.as_ref()
122	}
123}
124
// Marker impl: `Schema` equality delegates to `Inner`, which is itself `Eq`.
impl Eq for Schema {}
126
127impl Schema {
128	/// Create a new schema from a list of fields.
129	///
130	/// This computes the memory layout (offsets, alignment) and fingerprint.
131	pub fn new(fields: Vec<SchemaField>) -> Self {
132		let fields = Self::compute_layout(fields);
133		let fingerprint = compute_fingerprint(&fields);
134
135		Self(Arc::new(Inner {
136			fingerprint,
137			fields,
138			cached_layout: OnceLock::new(),
139		}))
140	}
141
142	/// Create a schema from pre-computed fields and fingerprint.
143	/// Used when loading from storage.
144	pub fn from_parts(fingerprint: SchemaFingerprint, fields: Vec<SchemaField>) -> Self {
145		Self(Arc::new(Inner {
146			fingerprint,
147			fields,
148			cached_layout: OnceLock::new(),
149		}))
150	}
151
152	/// Get the schema's fingerprint
153	pub fn fingerprint(&self) -> SchemaFingerprint {
154		self.fingerprint
155	}
156
157	/// Get the fields in this schema
158	pub fn fields(&self) -> &[SchemaField] {
159		&self.fields
160	}
161
162	/// Get the number of fields
163	pub fn field_count(&self) -> usize {
164		self.fields.len()
165	}
166
167	/// Find a field by name
168	pub fn find_field(&self, name: &str) -> Option<&SchemaField> {
169		self.fields.iter().find(|f| f.name == name)
170	}
171
172	/// Find field index by name
173	pub fn find_field_index(&self, name: &str) -> Option<usize> {
174		self.fields.iter().position(|f| f.name == name)
175	}
176
177	/// Find a field by index
178	pub fn get_field(&self, index: usize) -> Option<&SchemaField> {
179		self.fields.get(index)
180	}
181
182	/// Get field name by index
183	pub fn get_field_name(&self, index: usize) -> Option<&str> {
184		self.fields.get(index).map(|f| f.name.as_str())
185	}
186
187	/// Get all field names as an iterator
188	pub fn field_names(&self) -> impl Iterator<Item = &str> {
189		self.fields.iter().map(|f| f.name.as_str())
190	}
191
192	/// Compute memory layout for fields.
193	/// Returns the fields with computed offsets and the total row size.
194	fn compute_layout(mut fields: Vec<SchemaField>) -> Vec<SchemaField> {
195		// Start offset calculation from where data section begins (after header + bitvec)
196		let bitvec_size = (fields.len() + 7) / 8;
197		let mut offset: u32 = (SCHEMA_HEADER_SIZE + bitvec_size) as u32;
198
199		for field in fields.iter_mut() {
200			let storage_type = field.constraint.storage_type();
201			field.size = storage_type.size() as u32;
202			field.align = storage_type.alignment() as u8;
203
204			// Align offset
205			let align = field.align as u32;
206			if align > 0 {
207				offset = (offset + align - 1) & !(align - 1);
208			}
209
210			field.offset = offset;
211			offset += field.size;
212		}
213
214		fields
215	}
216
217	/// Size of the bitvec section in bytes
218	pub fn bitvec_size(&self) -> usize {
219		(self.fields.len() + 7) / 8
220	}
221
222	/// Offset where field data starts (after header and bitvec)
223	pub fn data_offset(&self) -> usize {
224		SCHEMA_HEADER_SIZE + self.bitvec_size()
225	}
226
227	/// Compute and cache the layout (total_size, max_align).
228	/// This is called once and the result is cached for subsequent calls.
229	fn get_cached_layout(&self) -> (usize, usize) {
230		*self.cached_layout.get_or_init(|| {
231			// Compute max_align
232			let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
233
234			// Compute total_size
235			let total_size = if self.fields.is_empty() {
236				SCHEMA_HEADER_SIZE + self.bitvec_size()
237			} else {
238				let last_field = &self.fields[self.fields.len() - 1];
239				let end = last_field.offset as usize + last_field.size as usize;
240				// Align to maximum field alignment
241				Self::align_up(end, max_align)
242			};
243
244			(total_size, max_align)
245		})
246	}
247
248	/// Total size of the static section
249	pub fn total_static_size(&self) -> usize {
250		self.get_cached_layout().0
251	}
252
253	/// Start of the dynamic section
254	pub fn dynamic_section_start(&self) -> usize {
255		self.total_static_size()
256	}
257
258	/// Size of the dynamic section
259	pub fn dynamic_section_size(&self, row: &EncodedValues) -> usize {
260		row.len().saturating_sub(self.total_static_size())
261	}
262
263	/// Allocate a new encoded row
264	pub fn allocate(&self) -> EncodedValues {
265		let (total_size, max_align) = self.get_cached_layout();
266		let layout = Layout::from_size_align(total_size, max_align).unwrap();
267		unsafe {
268			let ptr = alloc_zeroed(layout);
269			if ptr.is_null() {
270				handle_alloc_error(layout);
271			}
272			let vec = Vec::from_raw_parts(ptr, total_size, total_size);
273			let mut row = EncodedValues(CowVec::new(vec));
274			row.set_fingerprint(self.fingerprint);
275			row
276		}
277	}
278
279	fn align_up(offset: usize, align: usize) -> usize {
280		(offset + align).saturating_sub(1) & !(align.saturating_sub(1))
281	}
282
283	/// Set a field as undefined (not set)
284	pub fn set_none(&self, row: &mut EncodedValues, index: usize) {
285		row.set_valid(index, false);
286	}
287
288	/// Create a schema from a list of types.
289	/// Fields are named f0, f1, f2, etc. and have unconstrained types.
290	/// Useful for tests and simple state schemas.
291	pub fn testing(types: &[Type]) -> Self {
292		Schema::new(
293			types.iter()
294				.enumerate()
295				.map(|(i, t)| SchemaField::unconstrained(format!("f{}", i), t.clone()))
296				.collect(),
297		)
298	}
299}
300
#[cfg(test)]
mod tests {
	use super::*;

	/// A schema keeps every field it was given, in definition order.
	#[test]
	fn test_schema_creation() {
		let schema = Schema::new(vec![
			SchemaField::unconstrained("id", Type::Int8),
			SchemaField::unconstrained("name", Type::Utf8),
			SchemaField::unconstrained("active", Type::Boolean),
		]);

		assert_eq!(schema.field_count(), 3);

		let expected_names = ["id", "name", "active"];
		for (position, expected) in expected_names.iter().enumerate() {
			assert_eq!(schema.fields()[position].name, *expected);
		}
	}

	/// Building the same field list twice yields the same fingerprint.
	#[test]
	fn test_schema_fingerprint_deterministic() {
		let build = || {
			Schema::new(vec![
				SchemaField::unconstrained("a", Type::Int4),
				SchemaField::unconstrained("b", Type::Utf8),
			])
		};

		let first = build();
		let second = build();

		assert_eq!(first.fingerprint(), second.fingerprint());
	}

	/// Changing a field's type changes the fingerprint.
	#[test]
	fn test_schema_fingerprint_different_for_different_schemas() {
		let narrow = Schema::new(vec![SchemaField::unconstrained("a", Type::Int4)]);
		let wide = Schema::new(vec![SchemaField::unconstrained("a", Type::Int8)]);

		assert_ne!(narrow.fingerprint(), wide.fingerprint());
	}

	/// Lookup by name finds declared fields and reports unknown names as absent.
	#[test]
	fn test_find_field() {
		let schema = Schema::new(vec![
			SchemaField::unconstrained("id", Type::Int8),
			SchemaField::unconstrained("name", Type::Utf8),
		]);

		for present in ["id", "name"] {
			assert!(schema.find_field(present).is_some());
		}
		assert!(schema.find_field("missing").is_none());
	}
}