Skip to main content

reifydb_core/encoded/shape/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Row-shape descriptor: the schema-of-bytes that explains how to interpret an `EncodedRow`.
5//!
6//! A `RowShape` lists every field (name, type constraint, byte offset, byte size, alignment) so storage backends,
7//! replication, and CDC can address fields without consulting the catalog. Submodules cover shape consolidation across
8//! rows of the same logical schema, schema evolution rules for adding and removing fields, structural fingerprinting
9//! used by plan caches and migration tooling, and conversion routines from typed schemas.
10//!
11//! Invariant: the `SHAPE_HEADER_SIZE` constant and the packed-mode bit layout (mode bit, length mask, offset mask) are
12//! part of the wire format. Reordering or resizing any of these regions silently breaks every row written under the
13//! previous layout.
14
15pub mod consolidate;
16pub mod evolution;
17pub mod fingerprint;
18mod from;
19
20use std::{
21	alloc::{Layout, alloc_zeroed, handle_alloc_error},
22	fmt,
23	fmt::Debug,
24	iter,
25	ops::Deref,
26	ptr,
27	sync::{Arc, LazyLock, OnceLock},
28};
29
30use reifydb_type::{
31	util::cowvec::CowVec,
32	value::{constraint::TypeConstraint, r#type::Type},
33};
34use serde::{Deserialize, Serialize};
35
36use super::row::EncodedRow;
37use crate::encoded::shape::fingerprint::{RowShapeFingerprint, compute_fingerprint};
38
/// Size in bytes of the fixed header at the start of every encoded row. Part of the wire
/// format (see module docs): resizing it breaks every previously written row.
pub const SHAPE_HEADER_SIZE: usize = 24;

// Packed dynamic-reference layout for `Int`/`Uint`/`Decimal` fields — one u128, little-endian
// on the wire: bit 127 is the mode flag (set = dynamic payload), bits 64..=126 hold the
// payload length, bits 0..=63 hold the payload offset into the dynamic section.
const PACKED_MODE_MASK: u128 = 1 << 127;
// The dynamic-mode marker is the mode bit itself (single-bit flag).
const PACKED_MODE_DYNAMIC: u128 = PACKED_MODE_MASK;
// Low 64 bits: byte offset relative to the dynamic section start.
const PACKED_OFFSET_MASK: u128 = u64::MAX as u128;
// Everything that is neither mode bit nor offset, i.e. bits 64..=126: the length.
const PACKED_LENGTH_MASK: u128 = !(PACKED_MODE_MASK | PACKED_OFFSET_MASK);
45
/// One field of a row shape: its name, type constraint, and resolved byte layout.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RowShapeField {
	/// Field name; lookups (`RowShape::find_field`) compare by exact name.
	pub name: String,

	/// Type constraint; its storage type determines `size` and `align`.
	pub constraint: TypeConstraint,

	/// Absolute byte offset of the field's static slot within the encoded row.
	/// Stays 0 until assigned by `RowShape::compute_layout`.
	pub offset: u32,

	/// Size in bytes of the static slot.
	pub size: u32,

	/// Alignment requirement in bytes of the static slot.
	pub align: u8,
}
58
59impl RowShapeField {
60	pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
61		let storage_type = constraint.storage_type();
62		Self {
63			name: name.into(),
64			constraint,
65			offset: 0,
66			size: storage_type.size() as u32,
67			align: storage_type.alignment() as u8,
68		}
69	}
70
71	pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
72		Self::new(name, TypeConstraint::unconstrained(field_type))
73	}
74}
75
/// Shared, immutable row-shape handle: a cheaply clonable `Arc` wrapper around [`Inner`].
pub struct RowShape(Arc<Inner>);
77
/// Shared payload of [`RowShape`]; kept behind an `Arc` so shapes clone cheaply.
#[derive(Debug, Serialize, Deserialize)]
pub struct Inner {
	/// Structural fingerprint of `fields`, produced by `compute_fingerprint`.
	pub fingerprint: RowShapeFingerprint,

	/// Fields in row order, with offsets/sizes filled in by `RowShape::compute_layout`.
	pub fields: Vec<RowShapeField>,

	// Lazily computed `(total_static_size, max_align)`. Derived entirely from `fields`,
	// so it is skipped during (de)serialization and recomputed on demand.
	#[serde(skip)]
	cached_layout: OnceLock<(usize, usize)>,
}
87
// Equality is structural: fingerprint plus fields. `cached_layout` is a derived cache and is
// deliberately excluded (it may be initialized on one side and not the other).
impl PartialEq for Inner {
	fn eq(&self, other: &Self) -> bool {
		self.fingerprint == other.fingerprint && self.fields == other.fields
	}
}

impl Eq for Inner {}
95
96impl Deref for RowShape {
97	type Target = Inner;
98
99	fn deref(&self) -> &Self::Target {
100		&self.0
101	}
102}
103
104impl Clone for RowShape {
105	fn clone(&self) -> Self {
106		Self(self.0.clone())
107	}
108}
109
110impl Debug for RowShape {
111	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112		self.0.fmt(f)
113	}
114}
115
116impl PartialEq for RowShape {
117	fn eq(&self, other: &Self) -> bool {
118		self.0.as_ref() == other.0.as_ref()
119	}
120}
121
122impl Eq for RowShape {}
123
124impl RowShape {
125	pub fn new(fields: Vec<RowShapeField>) -> Self {
126		let fields = Self::compute_layout(fields);
127		let fingerprint = compute_fingerprint(&fields);
128
129		Self(Arc::new(Inner {
130			fingerprint,
131			fields,
132			cached_layout: OnceLock::new(),
133		}))
134	}
135
136	pub fn from_parts(fingerprint: RowShapeFingerprint, fields: Vec<RowShapeField>) -> Self {
137		Self(Arc::new(Inner {
138			fingerprint,
139			fields,
140			cached_layout: OnceLock::new(),
141		}))
142	}
143
144	pub fn fingerprint(&self) -> RowShapeFingerprint {
145		self.fingerprint
146	}
147
148	pub fn fields(&self) -> &[RowShapeField] {
149		&self.fields
150	}
151
152	pub fn field_count(&self) -> usize {
153		self.fields.len()
154	}
155
156	pub fn find_field(&self, name: &str) -> Option<&RowShapeField> {
157		self.fields.iter().find(|f| f.name == name)
158	}
159
160	pub fn find_field_index(&self, name: &str) -> Option<usize> {
161		self.fields.iter().position(|f| f.name == name)
162	}
163
164	pub fn get_field(&self, index: usize) -> Option<&RowShapeField> {
165		self.fields.get(index)
166	}
167
168	pub fn get_field_name(&self, index: usize) -> Option<&str> {
169		self.fields.get(index).map(|f| f.name.as_str())
170	}
171
172	pub fn field_names(&self) -> impl Iterator<Item = &str> {
173		self.fields.iter().map(|f| f.name.as_str())
174	}
175
176	fn compute_layout(mut fields: Vec<RowShapeField>) -> Vec<RowShapeField> {
177		let bitvec_size = fields.len().div_ceil(8);
178		let mut offset: u32 = (SHAPE_HEADER_SIZE + bitvec_size) as u32;
179
180		for field in fields.iter_mut() {
181			let storage_type = field.constraint.storage_type();
182			field.size = storage_type.size() as u32;
183			field.align = storage_type.alignment() as u8;
184
185			let align = field.align as u32;
186			if align > 0 {
187				offset = (offset + align - 1) & !(align - 1);
188			}
189
190			field.offset = offset;
191			offset += field.size;
192		}
193
194		fields
195	}
196
197	pub fn bitvec_size(&self) -> usize {
198		self.fields.len().div_ceil(8)
199	}
200
201	pub fn data_offset(&self) -> usize {
202		SHAPE_HEADER_SIZE + self.bitvec_size()
203	}
204
205	fn get_cached_layout(&self) -> (usize, usize) {
206		*self.cached_layout.get_or_init(|| {
207			let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
208
209			let total_size = if self.fields.is_empty() {
210				SHAPE_HEADER_SIZE + self.bitvec_size()
211			} else {
212				let last_field = &self.fields[self.fields.len() - 1];
213				let end = last_field.offset as usize + last_field.size as usize;
214
215				Self::align_up(end, max_align)
216			};
217
218			(total_size, max_align)
219		})
220	}
221
222	pub fn total_static_size(&self) -> usize {
223		self.get_cached_layout().0
224	}
225
226	pub fn dynamic_section_start(&self) -> usize {
227		self.total_static_size()
228	}
229
230	pub fn dynamic_section_size(&self, row: &EncodedRow) -> usize {
231		row.len().saturating_sub(self.total_static_size())
232	}
233
234	pub(crate) fn read_dynamic_ref(&self, row: &EncodedRow, index: usize) -> Option<(usize, usize)> {
235		if !row.is_defined(index) {
236			return None;
237		}
238		let field = &self.fields()[index];
239		match field.constraint.get_type().inner_type() {
240			Type::Utf8 | Type::Blob | Type::Any => {
241				let ref_slice = &row.as_slice()[field.offset as usize..field.offset as usize + 8];
242				let offset =
243					u32::from_le_bytes([ref_slice[0], ref_slice[1], ref_slice[2], ref_slice[3]])
244						as usize;
245				let length =
246					u32::from_le_bytes([ref_slice[4], ref_slice[5], ref_slice[6], ref_slice[7]])
247						as usize;
248				Some((offset, length))
249			}
250			Type::Int | Type::Uint | Type::Decimal => {
251				let packed = unsafe {
252					(row.as_ptr().add(field.offset as usize) as *const u128).read_unaligned()
253				};
254				let packed = u128::from_le(packed);
255				if packed & PACKED_MODE_MASK != 0 {
256					let offset = (packed & PACKED_OFFSET_MASK) as usize;
257					let length = ((packed & PACKED_LENGTH_MASK) >> 64) as usize;
258					Some((offset, length))
259				} else {
260					None
261				}
262			}
263			_ => None,
264		}
265	}
266
267	pub(crate) fn write_dynamic_ref(&self, row: &mut EncodedRow, index: usize, offset: usize, length: usize) {
268		let field = &self.fields()[index];
269		match field.constraint.get_type().inner_type() {
270			Type::Utf8 | Type::Blob | Type::Any => {
271				let ref_slice = &mut row.0.make_mut()[field.offset as usize..field.offset as usize + 8];
272				ref_slice[0..4].copy_from_slice(&(offset as u32).to_le_bytes());
273				ref_slice[4..8].copy_from_slice(&(length as u32).to_le_bytes());
274			}
275			Type::Int | Type::Uint | Type::Decimal => {
276				let offset_part = (offset as u128) & PACKED_OFFSET_MASK;
277				let length_part = ((length as u128) << 64) & PACKED_LENGTH_MASK;
278				let packed = PACKED_MODE_DYNAMIC | offset_part | length_part;
279				unsafe {
280					ptr::write_unaligned(
281						row.0.make_mut().as_mut_ptr().add(field.offset as usize) as *mut u128,
282						packed.to_le(),
283					);
284				}
285			}
286			_ => {}
287		}
288	}
289
290	pub(crate) fn replace_dynamic_data(&self, row: &mut EncodedRow, index: usize, new_data: &[u8]) {
291		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
292			let delta = new_data.len() as isize - old_length as isize;
293
294			let refs_to_update: Vec<(usize, usize, usize)> = if delta != 0 {
295				self.fields()
296					.iter()
297					.enumerate()
298					.filter(|(i, _)| *i != index && row.is_defined(*i))
299					.filter_map(|(i, _)| {
300						self.read_dynamic_ref(row, i)
301							.filter(|(off, _)| *off > old_offset)
302							.map(|(off, len)| (i, off, len))
303					})
304					.collect()
305			} else {
306				vec![]
307			};
308
309			let dynamic_start = self.dynamic_section_start();
310			let abs_start = dynamic_start + old_offset;
311			let abs_end = abs_start + old_length;
312			row.0.make_mut().splice(abs_start..abs_end, new_data.iter().copied());
313
314			self.write_dynamic_ref(row, index, old_offset, new_data.len());
315
316			for (i, off, len) in refs_to_update {
317				let new_off = (off as isize + delta) as usize;
318				self.write_dynamic_ref(row, i, new_off, len);
319			}
320		} else {
321			let dynamic_offset = self.dynamic_section_size(row);
322			row.0.extend_from_slice(new_data);
323			self.write_dynamic_ref(row, index, dynamic_offset, new_data.len());
324		}
325		row.set_valid(index, true);
326	}
327
328	pub(crate) fn remove_dynamic_data(&self, row: &mut EncodedRow, index: usize) {
329		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
330			let refs_to_update: Vec<(usize, usize, usize)> = self
331				.fields()
332				.iter()
333				.enumerate()
334				.filter(|(i, _)| *i != index && row.is_defined(*i))
335				.filter_map(|(i, _)| {
336					self.read_dynamic_ref(row, i)
337						.filter(|(off, _)| *off > old_offset)
338						.map(|(off, len)| (i, off, len))
339				})
340				.collect();
341
342			let dynamic_start = self.dynamic_section_start();
343			let abs_start = dynamic_start + old_offset;
344			let abs_end = abs_start + old_length;
345			row.0.make_mut().splice(abs_start..abs_end, iter::empty());
346
347			for (i, off, len) in refs_to_update {
348				let new_off = off - old_length;
349				self.write_dynamic_ref(row, i, new_off, len);
350			}
351		}
352	}
353
354	pub fn allocate(&self) -> EncodedRow {
355		let (total_size, max_align) = self.get_cached_layout();
356		let layout = Layout::from_size_align(total_size, max_align).unwrap();
357		unsafe {
358			let ptr = alloc_zeroed(layout);
359			if ptr.is_null() {
360				handle_alloc_error(layout);
361			}
362			let vec = Vec::from_raw_parts(ptr, total_size, total_size);
363			let mut row = EncodedRow(CowVec::new(vec));
364			row.set_fingerprint(self.fingerprint);
365			row
366		}
367	}
368
369	fn align_up(offset: usize, align: usize) -> usize {
370		(offset + align).saturating_sub(1) & !(align.saturating_sub(1))
371	}
372
373	pub fn set_none(&self, row: &mut EncodedRow, index: usize) {
374		self.remove_dynamic_data(row, index);
375		row.set_valid(index, false);
376	}
377
378	pub fn testing(types: &[Type]) -> Self {
379		RowShape::new(
380			types.iter()
381				.enumerate()
382				.map(|(i, t)| RowShapeField::unconstrained(format!("f{}", i), t.clone()))
383				.collect(),
384		)
385	}
386
387	pub fn operator_state() -> Self {
388		OPERATOR_STATE_SHAPE.clone()
389	}
390}
391
// Canonical single-field (`state`: Blob) shape handed out by `RowShape::operator_state`;
// built once on first use.
static OPERATOR_STATE_SHAPE: LazyLock<RowShape> =
	LazyLock::new(|| RowShape::new(vec![RowShapeField::unconstrained("state", Type::Blob)]));
394
#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn test_shape_creation() {
		let shape = RowShape::new(vec![
			RowShapeField::unconstrained("id", Type::Int8),
			RowShapeField::unconstrained("name", Type::Utf8),
			RowShapeField::unconstrained("active", Type::Boolean),
		]);

		assert_eq!(shape.field_count(), 3);
		// Field order must be preserved exactly as given.
		let names: Vec<&str> = shape.field_names().collect();
		assert_eq!(names, ["id", "name", "active"]);
	}

	#[test]
	fn test_shape_fingerprint_deterministic() {
		// Two shapes built from identical field lists must fingerprint identically.
		let build = || {
			RowShape::new(vec![
				RowShapeField::unconstrained("a", Type::Int4),
				RowShapeField::unconstrained("b", Type::Utf8),
			])
		};

		assert_eq!(build().fingerprint(), build().fingerprint());
	}

	#[test]
	fn test_shape_fingerprint_different_for_different_shapes() {
		// Same field name, different type: the fingerprints must diverge.
		let int4 = RowShape::new(vec![RowShapeField::unconstrained("a", Type::Int4)]);
		let int8 = RowShape::new(vec![RowShapeField::unconstrained("a", Type::Int8)]);

		assert_ne!(int4.fingerprint(), int8.fingerprint());
	}

	#[test]
	fn test_find_field() {
		let shape = RowShape::new(vec![
			RowShapeField::unconstrained("id", Type::Int8),
			RowShapeField::unconstrained("name", Type::Utf8),
		]);

		for present in ["id", "name"] {
			assert!(shape.find_field(present).is_some());
		}
		assert!(shape.find_field("missing").is_none());
	}
}