Skip to main content

reifydb_core/encoded/schema/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Schema definitions for encoding row data with consistent field layouts.
5//!
6//! A `Schema` describes the structure of encoded row data, including:
7//! - Field names, types, and order
8//! - Memory layout (offsets, sizes, alignment)
9//! - A content-addressable fingerprint for deduplication
10
11pub mod consolidate;
12pub mod evolution;
13pub mod fingerprint;
14mod from;
15
16use std::{
17	alloc::{Layout, alloc_zeroed, handle_alloc_error},
18	fmt::Debug,
19	ops::Deref,
20	sync::{Arc, OnceLock},
21};
22
23use reifydb_type::{
24	util::cowvec::CowVec,
25	value::{constraint::TypeConstraint, r#type::Type},
26};
27use serde::{Deserialize, Serialize};
28
29use super::encoded::EncodedValues;
30use crate::encoded::schema::fingerprint::{SchemaFingerprint, compute_fingerprint};
31
/// Size of the schema header (fingerprint) in bytes.
///
/// Field offsets computed by `Schema::compute_layout` start after this header
/// plus the validity bitvec.
pub const SCHEMA_HEADER_SIZE: usize = 8;
34
/// A single named field within a [`Schema`].
///
/// `offset`, `size` and `align` describe where the field lives inside an
/// encoded row; they are (re)computed by `Schema::new` from the storage type
/// of `constraint`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SchemaField {
	/// Field name
	pub name: String,
	/// Field type constraint (includes base type and optional constraints like MaxBytes)
	pub constraint: TypeConstraint,
	/// Byte offset within the encoded row, measured from the start of the row
	/// (i.e. it already accounts for the header and the bitvec).
	pub offset: u32,
	/// Size in bytes of the field's storage type
	pub size: u32,
	/// Alignment requirement of the field's storage type.
	/// NOTE(review): layout code treats 0 as "no alignment" — confirm which types report 0.
	pub align: u8,
}
49
50impl SchemaField {
51	/// Create a new schema field with a type constraint.
52	/// Offset, size, and alignment are computed when added to a Schema.
53	pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
54		let storage_type = constraint.storage_type();
55		Self {
56			name: name.into(),
57			constraint,
58			offset: 0,
59			size: storage_type.size() as u32,
60			align: storage_type.alignment() as u8,
61		}
62	}
63
64	/// Create a new schema field with an unconstrained type.
65	/// Convenience method for the common case of no constraints.
66	pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
67		Self::new(name, TypeConstraint::unconstrained(field_type))
68	}
69}
70
/// A schema describing the structure of encoded row data.
///
/// This is a handle around a reference-counted [`Inner`]; cloning a `Schema`
/// clones the `Arc`, and the handle dereferences to `Inner`.
pub struct Schema(Arc<Inner>);
73
/// Inner data for a schema describing the structure of encoded row data.
///
/// Schemas are immutable and content-addressable via their fingerprint.
/// The same field configuration always produces the same fingerprint,
/// enabling schema deduplication in the registry.
///
/// Equality compares `fingerprint` and `fields` only; the layout cache is
/// ignored and is also skipped during (de)serialization.
#[derive(Debug, Serialize, Deserialize)]
pub struct Inner {
	/// Content-addressable fingerprint (hash of canonical field representation)
	pub fingerprint: SchemaFingerprint,
	/// Fields in definition order
	pub fields: Vec<SchemaField>,
	/// Cached layout computation (total_size, max_align) - computed once on first use
	#[serde(skip)]
	cached_layout: OnceLock<(usize, usize)>,
}
89
90impl PartialEq for Inner {
91	fn eq(&self, other: &Self) -> bool {
92		self.fingerprint == other.fingerprint && self.fields == other.fields
93	}
94}
95
96impl Eq for Inner {}
97
98impl Deref for Schema {
99	type Target = Inner;
100
101	fn deref(&self) -> &Self::Target {
102		&self.0
103	}
104}
105
106impl Clone for Schema {
107	fn clone(&self) -> Self {
108		Self(self.0.clone())
109	}
110}
111
112impl Debug for Schema {
113	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114		self.0.fmt(f)
115	}
116}
117
118impl PartialEq for Schema {
119	fn eq(&self, other: &Self) -> bool {
120		self.0.as_ref() == other.0.as_ref()
121	}
122}
123
124impl Eq for Schema {}
125
126impl Schema {
127	/// Create a new schema from a list of fields.
128	///
129	/// This computes the memory layout (offsets, alignment) and fingerprint.
130	pub fn new(fields: Vec<SchemaField>) -> Self {
131		let fields = Self::compute_layout(fields);
132		let fingerprint = compute_fingerprint(&fields);
133
134		Self(Arc::new(Inner {
135			fingerprint,
136			fields,
137			cached_layout: OnceLock::new(),
138		}))
139	}
140
141	/// Create a schema from pre-computed fields and fingerprint.
142	/// Used when loading from storage.
143	pub fn from_parts(fingerprint: SchemaFingerprint, fields: Vec<SchemaField>) -> Self {
144		Self(Arc::new(Inner {
145			fingerprint,
146			fields,
147			cached_layout: OnceLock::new(),
148		}))
149	}
150
151	/// Get the schema's fingerprint
152	pub fn fingerprint(&self) -> SchemaFingerprint {
153		self.fingerprint
154	}
155
156	/// Get the fields in this schema
157	pub fn fields(&self) -> &[SchemaField] {
158		&self.fields
159	}
160
161	/// Get the number of fields
162	pub fn field_count(&self) -> usize {
163		self.fields.len()
164	}
165
166	/// Find a field by name
167	pub fn find_field(&self, name: &str) -> Option<&SchemaField> {
168		self.fields.iter().find(|f| f.name == name)
169	}
170
171	/// Find field index by name
172	pub fn find_field_index(&self, name: &str) -> Option<usize> {
173		self.fields.iter().position(|f| f.name == name)
174	}
175
176	/// Find a field by index
177	pub fn get_field(&self, index: usize) -> Option<&SchemaField> {
178		self.fields.get(index)
179	}
180
181	/// Get field name by index
182	pub fn get_field_name(&self, index: usize) -> Option<&str> {
183		self.fields.get(index).map(|f| f.name.as_str())
184	}
185
186	/// Get all field names as an iterator
187	pub fn field_names(&self) -> impl Iterator<Item = &str> {
188		self.fields.iter().map(|f| f.name.as_str())
189	}
190
191	/// Compute memory layout for fields.
192	/// Returns the fields with computed offsets and the total row size.
193	fn compute_layout(mut fields: Vec<SchemaField>) -> Vec<SchemaField> {
194		// Start offset calculation from where data section begins (after header + bitvec)
195		let bitvec_size = (fields.len() + 7) / 8;
196		let mut offset: u32 = (SCHEMA_HEADER_SIZE + bitvec_size) as u32;
197
198		for field in fields.iter_mut() {
199			let storage_type = field.constraint.storage_type();
200			field.size = storage_type.size() as u32;
201			field.align = storage_type.alignment() as u8;
202
203			// Align offset
204			let align = field.align as u32;
205			if align > 0 {
206				offset = (offset + align - 1) & !(align - 1);
207			}
208
209			field.offset = offset;
210			offset += field.size;
211		}
212
213		fields
214	}
215
216	/// Size of the bitvec section in bytes
217	pub fn bitvec_size(&self) -> usize {
218		(self.fields.len() + 7) / 8
219	}
220
221	/// Offset where field data starts (after header and bitvec)
222	pub fn data_offset(&self) -> usize {
223		SCHEMA_HEADER_SIZE + self.bitvec_size()
224	}
225
226	/// Compute and cache the layout (total_size, max_align).
227	/// This is called once and the result is cached for subsequent calls.
228	fn get_cached_layout(&self) -> (usize, usize) {
229		*self.cached_layout.get_or_init(|| {
230			// Compute max_align
231			let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
232
233			// Compute total_size
234			let total_size = if self.fields.is_empty() {
235				SCHEMA_HEADER_SIZE + self.bitvec_size()
236			} else {
237				let last_field = &self.fields[self.fields.len() - 1];
238				let end = last_field.offset as usize + last_field.size as usize;
239				// Align to maximum field alignment
240				Self::align_up(end, max_align)
241			};
242
243			(total_size, max_align)
244		})
245	}
246
247	/// Total size of the static section
248	pub fn total_static_size(&self) -> usize {
249		self.get_cached_layout().0
250	}
251
252	/// Start of the dynamic section
253	pub fn dynamic_section_start(&self) -> usize {
254		self.total_static_size()
255	}
256
257	/// Size of the dynamic section
258	pub fn dynamic_section_size(&self, row: &EncodedValues) -> usize {
259		row.len().saturating_sub(self.total_static_size())
260	}
261
	/// Allocate a new encoded row for this schema.
	///
	/// The returned row is `total_static_size()` bytes long, zero-filled, and
	/// has this schema's fingerprint written via `set_fingerprint`.
	///
	/// # Panics
	///
	/// Panics if `Layout::from_size_align` rejects the cached
	/// `(total_size, max_align)` pair, e.g. when `max_align` is zero or not a
	/// power of two.
	pub fn allocate(&self) -> EncodedValues {
		let (total_size, max_align) = self.get_cached_layout();
		let layout = Layout::from_size_align(total_size, max_align).unwrap();
		// SAFETY (partial): `ptr` is checked for null before use and points to
		// `total_size` zero-initialised bytes, so length == capacity == total_size
		// describes initialised memory.
		// NOTE(review): `alloc_zeroed` requires a non-zero size. That holds for
		// schemas built by `Schema::new` (size >= header); confirm for `from_parts`.
		// NOTE(review): `Vec::from_raw_parts` documents that the allocation must
		// have the same alignment as `u8` (1). Here it is allocated with
		// `max_align`, so the Vec will be freed with a different layout whenever
		// `max_align > 1` — confirm whether this is acceptable for the allocator
		// in use or switch to an aligned buffer type.
		unsafe {
			let ptr = alloc_zeroed(layout);
			if ptr.is_null() {
				// Diverges: reports the allocation failure and aborts.
				handle_alloc_error(layout);
			}
			let vec = Vec::from_raw_parts(ptr, total_size, total_size);
			let mut row = EncodedValues(CowVec::new(vec));
			row.set_fingerprint(self.fingerprint);
			row
		}
	}
277
278	fn align_up(offset: usize, align: usize) -> usize {
279		(offset + align).saturating_sub(1) & !(align.saturating_sub(1))
280	}
281
282	/// Set a field as undefined (not set)
283	pub fn set_undefined(&self, row: &mut EncodedValues, index: usize) {
284		row.set_valid(index, false);
285	}
286
287	/// Create a schema from a list of types.
288	/// Fields are named f0, f1, f2, etc. and have unconstrained types.
289	/// Useful for tests and simple state schemas.
290	pub fn testing(types: &[Type]) -> Self {
291		Schema::new(
292			types.iter()
293				.enumerate()
294				.map(|(i, t)| SchemaField::unconstrained(format!("f{}", i), t.clone()))
295				.collect(),
296		)
297	}
298}
299
#[cfg(test)]
mod tests {
	use super::*;

	/// Builds a schema of unconstrained fields from `(name, type)` pairs.
	fn schema_of(columns: Vec<(&str, Type)>) -> Schema {
		let fields = columns.into_iter().map(|(name, ty)| SchemaField::unconstrained(name, ty)).collect();
		Schema::new(fields)
	}

	/// A schema keeps its fields in the order they were given.
	#[test]
	fn test_schema_creation() {
		let schema = schema_of(vec![("id", Type::Int8), ("name", Type::Utf8), ("active", Type::Boolean)]);

		assert_eq!(schema.field_count(), 3);

		let names: Vec<&str> = schema.fields().iter().map(|f| f.name.as_str()).collect();
		assert_eq!(names, vec!["id", "name", "active"]);
	}

	/// Identical field lists must hash to the same fingerprint.
	#[test]
	fn test_schema_fingerprint_deterministic() {
		let first = schema_of(vec![("a", Type::Int4), ("b", Type::Utf8)]);
		let second = schema_of(vec![("a", Type::Int4), ("b", Type::Utf8)]);

		assert_eq!(first.fingerprint(), second.fingerprint());
	}

	/// Changing a field's type must change the fingerprint.
	#[test]
	fn test_schema_fingerprint_different_for_different_schemas() {
		let narrow = schema_of(vec![("a", Type::Int4)]);
		let wide = schema_of(vec![("a", Type::Int8)]);

		assert_ne!(narrow.fingerprint(), wide.fingerprint());
	}

	/// Lookup by name finds existing fields and reports missing ones.
	#[test]
	fn test_find_field() {
		let schema = schema_of(vec![("id", Type::Int8), ("name", Type::Utf8)]);

		for present in ["id", "name"] {
			assert!(schema.find_field(present).is_some());
		}
		assert!(schema.find_field("missing").is_none());
	}
}
358}