Skip to main content

reifydb_core/encoded/shape/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4//! Row-shape descriptor: the schema-of-bytes that explains how to interpret an `EncodedRow`.
5//!
6//! A `RowShape` lists every field (name, type constraint, byte offset, byte size, alignment) so storage backends,
7//! replication, and CDC can address fields without consulting the catalog. Submodules cover shape consolidation across
8//! rows of the same logical schema, schema evolution rules for adding and removing fields, structural fingerprinting
9//! used by plan caches and migration tooling, and conversion routines from typed schemas.
10//!
11//! Invariant: the `SHAPE_HEADER_SIZE` constant and the packed-mode bit layout (mode bit, length mask, offset mask) are
12//! part of the wire format. Reordering or resizing any of these regions silently breaks every row written under the
13//! previous layout.
14
15pub mod cache;
16pub mod consolidate;
17pub mod evolution;
18pub mod fingerprint;
19mod from;
20
21use std::{
22	alloc::{Layout, alloc_zeroed, handle_alloc_error},
23	fmt,
24	fmt::Debug,
25	iter,
26	ops::Deref,
27	ptr,
28	sync::{Arc, LazyLock, OnceLock},
29};
30
31use reifydb_value::{
32	util::cowvec::CowVec,
33	value::{constraint::TypeConstraint, value_type::ValueType},
34};
35use serde::{Deserialize, Serialize};
36
37use super::row::EncodedRow;
38use crate::encoded::shape::fingerprint::{RowShapeFingerprint, compute_fingerprint};
39
40pub const SHAPE_HEADER_SIZE: usize = 24;
41
42const PACKED_MODE_DYNAMIC: u128 = 0x80000000000000000000000000000000;
43const PACKED_MODE_MASK: u128 = 0x80000000000000000000000000000000;
44const PACKED_OFFSET_MASK: u128 = 0x0000000000000000FFFFFFFFFFFFFFFF;
45const PACKED_LENGTH_MASK: u128 = 0x7FFFFFFFFFFFFFFF0000000000000000;
46
47#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48pub struct RowShapeField {
49	pub name: String,
50
51	pub constraint: TypeConstraint,
52
53	pub offset: u32,
54
55	pub size: u32,
56
57	pub align: u8,
58}
59
60impl RowShapeField {
61	pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
62		let storage_type = constraint.storage_type();
63		Self {
64			name: name.into(),
65			constraint,
66			offset: 0,
67			size: storage_type.size() as u32,
68			align: storage_type.alignment() as u8,
69		}
70	}
71
72	pub fn unconstrained(name: impl Into<String>, field_type: ValueType) -> Self {
73		Self::new(name, TypeConstraint::unconstrained(field_type))
74	}
75}
76
77pub struct RowShape(Arc<Inner>);
78
79#[derive(Debug, Serialize, Deserialize)]
80pub struct Inner {
81	pub fingerprint: RowShapeFingerprint,
82
83	pub fields: Vec<RowShapeField>,
84
85	#[serde(skip)]
86	cached_layout: OnceLock<(usize, usize)>,
87}
88
89impl PartialEq for Inner {
90	fn eq(&self, other: &Self) -> bool {
91		self.fingerprint == other.fingerprint && self.fields == other.fields
92	}
93}
94
95impl Eq for Inner {}
96
97impl Deref for RowShape {
98	type Target = Inner;
99
100	fn deref(&self) -> &Self::Target {
101		&self.0
102	}
103}
104
105impl Clone for RowShape {
106	fn clone(&self) -> Self {
107		Self(self.0.clone())
108	}
109}
110
111impl Debug for RowShape {
112	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113		self.0.fmt(f)
114	}
115}
116
117impl PartialEq for RowShape {
118	fn eq(&self, other: &Self) -> bool {
119		self.0.as_ref() == other.0.as_ref()
120	}
121}
122
123impl Eq for RowShape {}
124
125impl RowShape {
126	pub fn new(fields: Vec<RowShapeField>) -> Self {
127		let fields = Self::compute_layout(fields);
128		let fingerprint = compute_fingerprint(&fields);
129
130		Self(Arc::new(Inner {
131			fingerprint,
132			fields,
133			cached_layout: OnceLock::new(),
134		}))
135	}
136
137	pub fn from_parts(fingerprint: RowShapeFingerprint, fields: Vec<RowShapeField>) -> Self {
138		Self(Arc::new(Inner {
139			fingerprint,
140			fields,
141			cached_layout: OnceLock::new(),
142		}))
143	}
144
145	pub fn fingerprint(&self) -> RowShapeFingerprint {
146		self.fingerprint
147	}
148
149	pub fn fields(&self) -> &[RowShapeField] {
150		&self.fields
151	}
152
153	pub fn field_count(&self) -> usize {
154		self.fields.len()
155	}
156
157	pub fn find_field(&self, name: &str) -> Option<&RowShapeField> {
158		self.fields.iter().find(|f| f.name == name)
159	}
160
161	pub fn find_field_index(&self, name: &str) -> Option<usize> {
162		self.fields.iter().position(|f| f.name == name)
163	}
164
165	pub fn get_field(&self, index: usize) -> Option<&RowShapeField> {
166		self.fields.get(index)
167	}
168
169	pub fn get_field_name(&self, index: usize) -> Option<&str> {
170		self.fields.get(index).map(|f| f.name.as_str())
171	}
172
173	pub fn field_names(&self) -> impl Iterator<Item = &str> {
174		self.fields.iter().map(|f| f.name.as_str())
175	}
176
177	fn compute_layout(mut fields: Vec<RowShapeField>) -> Vec<RowShapeField> {
178		let bitvec_size = fields.len().div_ceil(8);
179		let mut offset: u32 = (SHAPE_HEADER_SIZE + bitvec_size) as u32;
180
181		for field in fields.iter_mut() {
182			let storage_type = field.constraint.storage_type();
183			field.size = storage_type.size() as u32;
184			field.align = storage_type.alignment() as u8;
185
186			let align = field.align as u32;
187			if align > 0 {
188				offset = (offset + align - 1) & !(align - 1);
189			}
190
191			field.offset = offset;
192			offset += field.size;
193		}
194
195		fields
196	}
197
198	pub fn bitvec_size(&self) -> usize {
199		self.fields.len().div_ceil(8)
200	}
201
202	pub fn data_offset(&self) -> usize {
203		SHAPE_HEADER_SIZE + self.bitvec_size()
204	}
205
206	fn get_cached_layout(&self) -> (usize, usize) {
207		*self.cached_layout.get_or_init(|| {
208			let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
209
210			let total_size = if self.fields.is_empty() {
211				SHAPE_HEADER_SIZE + self.bitvec_size()
212			} else {
213				let last_field = &self.fields[self.fields.len() - 1];
214				let end = last_field.offset as usize + last_field.size as usize;
215
216				Self::align_up(end, max_align)
217			};
218
219			(total_size, max_align)
220		})
221	}
222
223	pub fn total_static_size(&self) -> usize {
224		self.get_cached_layout().0
225	}
226
227	pub fn dynamic_section_start(&self) -> usize {
228		self.total_static_size()
229	}
230
231	pub fn dynamic_section_size(&self, row: &EncodedRow) -> usize {
232		row.len().saturating_sub(self.total_static_size())
233	}
234
235	pub(crate) fn read_dynamic_ref(&self, row: &EncodedRow, index: usize) -> Option<(usize, usize)> {
236		if !row.is_defined(index) {
237			return None;
238		}
239		let field = &self.fields()[index];
240		match field.constraint.get_type().inner_type() {
241			ValueType::Utf8 | ValueType::Blob | ValueType::Any => {
242				let ref_slice = &row.as_slice()[field.offset as usize..field.offset as usize + 8];
243				let offset =
244					u32::from_le_bytes([ref_slice[0], ref_slice[1], ref_slice[2], ref_slice[3]])
245						as usize;
246				let length =
247					u32::from_le_bytes([ref_slice[4], ref_slice[5], ref_slice[6], ref_slice[7]])
248						as usize;
249				Some((offset, length))
250			}
251			ValueType::Int | ValueType::Uint | ValueType::Decimal => {
252				let packed = unsafe {
253					(row.as_ptr().add(field.offset as usize) as *const u128).read_unaligned()
254				};
255				let packed = u128::from_le(packed);
256				if packed & PACKED_MODE_MASK != 0 {
257					let offset = (packed & PACKED_OFFSET_MASK) as usize;
258					let length = ((packed & PACKED_LENGTH_MASK) >> 64) as usize;
259					Some((offset, length))
260				} else {
261					None
262				}
263			}
264			_ => None,
265		}
266	}
267
268	pub(crate) fn write_dynamic_ref(&self, row: &mut EncodedRow, index: usize, offset: usize, length: usize) {
269		let field = &self.fields()[index];
270		match field.constraint.get_type().inner_type() {
271			ValueType::Utf8 | ValueType::Blob | ValueType::Any => {
272				let ref_slice = &mut row.0.make_mut()[field.offset as usize..field.offset as usize + 8];
273				ref_slice[0..4].copy_from_slice(&(offset as u32).to_le_bytes());
274				ref_slice[4..8].copy_from_slice(&(length as u32).to_le_bytes());
275			}
276			ValueType::Int | ValueType::Uint | ValueType::Decimal => {
277				let offset_part = (offset as u128) & PACKED_OFFSET_MASK;
278				let length_part = ((length as u128) << 64) & PACKED_LENGTH_MASK;
279				let packed = PACKED_MODE_DYNAMIC | offset_part | length_part;
280				unsafe {
281					ptr::write_unaligned(
282						row.0.make_mut().as_mut_ptr().add(field.offset as usize) as *mut u128,
283						packed.to_le(),
284					);
285				}
286			}
287			_ => {}
288		}
289	}
290
291	pub(crate) fn replace_dynamic_data(&self, row: &mut EncodedRow, index: usize, new_data: &[u8]) {
292		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
293			let delta = new_data.len() as isize - old_length as isize;
294
295			let refs_to_update: Vec<(usize, usize, usize)> = if delta != 0 {
296				self.fields()
297					.iter()
298					.enumerate()
299					.filter(|(i, _)| *i != index && row.is_defined(*i))
300					.filter_map(|(i, _)| {
301						self.read_dynamic_ref(row, i)
302							.filter(|(off, _)| *off > old_offset)
303							.map(|(off, len)| (i, off, len))
304					})
305					.collect()
306			} else {
307				vec![]
308			};
309
310			let dynamic_start = self.dynamic_section_start();
311			let abs_start = dynamic_start + old_offset;
312			let abs_end = abs_start + old_length;
313			row.0.make_mut().splice(abs_start..abs_end, new_data.iter().copied());
314
315			self.write_dynamic_ref(row, index, old_offset, new_data.len());
316
317			for (i, off, len) in refs_to_update {
318				let new_off = (off as isize + delta) as usize;
319				self.write_dynamic_ref(row, i, new_off, len);
320			}
321		} else {
322			let dynamic_offset = self.dynamic_section_size(row);
323			row.0.extend_from_slice(new_data);
324			self.write_dynamic_ref(row, index, dynamic_offset, new_data.len());
325		}
326		row.set_valid(index, true);
327	}
328
329	pub(crate) fn remove_dynamic_data(&self, row: &mut EncodedRow, index: usize) {
330		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
331			let refs_to_update: Vec<(usize, usize, usize)> = self
332				.fields()
333				.iter()
334				.enumerate()
335				.filter(|(i, _)| *i != index && row.is_defined(*i))
336				.filter_map(|(i, _)| {
337					self.read_dynamic_ref(row, i)
338						.filter(|(off, _)| *off > old_offset)
339						.map(|(off, len)| (i, off, len))
340				})
341				.collect();
342
343			let dynamic_start = self.dynamic_section_start();
344			let abs_start = dynamic_start + old_offset;
345			let abs_end = abs_start + old_length;
346			row.0.make_mut().splice(abs_start..abs_end, iter::empty());
347
348			for (i, off, len) in refs_to_update {
349				let new_off = off - old_length;
350				self.write_dynamic_ref(row, i, new_off, len);
351			}
352		}
353	}
354
355	pub fn allocate(&self) -> EncodedRow {
356		let (total_size, max_align) = self.get_cached_layout();
357		let layout = Layout::from_size_align(total_size, max_align).unwrap();
358		unsafe {
359			let ptr = alloc_zeroed(layout);
360			if ptr.is_null() {
361				handle_alloc_error(layout);
362			}
363			let vec = Vec::from_raw_parts(ptr, total_size, total_size);
364			let mut row = EncodedRow(CowVec::new(vec));
365			row.set_fingerprint(self.fingerprint);
366			#[cfg(reifydb_assertions)]
367			{
368				assert!(
369					row.len() == total_size,
370					"allocated row length does not match the shape total_static_size, so any field accessor using pre-computed offsets will read from garbage memory (row_len={} total_size={})",
371					row.len(),
372					total_size
373				);
374			}
375			row
376		}
377	}
378
379	fn align_up(offset: usize, align: usize) -> usize {
380		(offset + align).saturating_sub(1) & !(align.saturating_sub(1))
381	}
382
383	pub fn set_none(&self, row: &mut EncodedRow, index: usize) {
384		self.remove_dynamic_data(row, index);
385		row.set_valid(index, false);
386	}
387
388	pub fn testing(types: &[ValueType]) -> Self {
389		RowShape::new(
390			types.iter()
391				.enumerate()
392				.map(|(i, t)| RowShapeField::unconstrained(format!("f{}", i), t.clone()))
393				.collect(),
394		)
395	}
396
397	pub fn operator_state() -> Self {
398		OPERATOR_STATE_SHAPE.clone()
399	}
400}
401
402static OPERATOR_STATE_SHAPE: LazyLock<RowShape> =
403	LazyLock::new(|| RowShape::new(vec![RowShapeField::unconstrained("state", ValueType::Blob)]));
404
405#[cfg(test)]
406mod tests {
407	use super::*;
408
409	#[test]
410	fn test_shape_creation() {
411		let fields = vec![
412			RowShapeField::unconstrained("id", ValueType::Int8),
413			RowShapeField::unconstrained("name", ValueType::Utf8),
414			RowShapeField::unconstrained("active", ValueType::Boolean),
415		];
416
417		let shape = RowShape::new(fields);
418
419		assert_eq!(shape.field_count(), 3);
420		assert_eq!(shape.fields()[0].name, "id");
421		assert_eq!(shape.fields()[1].name, "name");
422		assert_eq!(shape.fields()[2].name, "active");
423	}
424
425	#[test]
426	fn test_shape_fingerprint_deterministic() {
427		let fields1 = vec![
428			RowShapeField::unconstrained("a", ValueType::Int4),
429			RowShapeField::unconstrained("b", ValueType::Utf8),
430		];
431
432		let fields2 = vec![
433			RowShapeField::unconstrained("a", ValueType::Int4),
434			RowShapeField::unconstrained("b", ValueType::Utf8),
435		];
436
437		let shape1 = RowShape::new(fields1);
438		let shape2 = RowShape::new(fields2);
439
440		assert_eq!(shape1.fingerprint(), shape2.fingerprint());
441	}
442
443	#[test]
444	fn test_shape_fingerprint_different_for_different_shapes() {
445		let fields1 = vec![RowShapeField::unconstrained("a", ValueType::Int4)];
446		let fields2 = vec![RowShapeField::unconstrained("a", ValueType::Int8)];
447
448		let shape1 = RowShape::new(fields1);
449		let shape2 = RowShape::new(fields2);
450
451		assert_ne!(shape1.fingerprint(), shape2.fingerprint());
452	}
453
454	#[test]
455	fn test_find_field() {
456		let fields = vec![
457			RowShapeField::unconstrained("id", ValueType::Int8),
458			RowShapeField::unconstrained("name", ValueType::Utf8),
459		];
460
461		let shape = RowShape::new(fields);
462
463		assert!(shape.find_field("id").is_some());
464		assert!(shape.find_field("name").is_some());
465		assert!(shape.find_field("missing").is_none());
466	}
467}