Skip to main content

reifydb_core/encoded/shape/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! RowShape definitions for encoding row data with consistent field layouts.
5//!
6//! A `RowShape` describes the structure of encoded row data, including:
7//! - Field names, types, and order
8//! - Memory layout (offsets, sizes, alignment)
9//! - A content-addressable fingerprint for deduplication
10
11pub mod consolidate;
12pub mod evolution;
13pub mod fingerprint;
14mod from;
15
16use std::{
17	alloc::{Layout, alloc_zeroed, handle_alloc_error},
18	fmt,
19	fmt::Debug,
20	iter,
21	ops::Deref,
22	ptr,
23	sync::{Arc, OnceLock},
24};
25
26use reifydb_type::{
27	util::cowvec::CowVec,
28	value::{constraint::TypeConstraint, r#type::Type},
29};
30use serde::{Deserialize, Serialize};
31
32use super::row::EncodedRow;
33use crate::encoded::shape::fingerprint::{RowShapeFingerprint, compute_fingerprint};
34
35/// Size of shape header (fingerprint) in bytes
36pub const SHAPE_HEADER_SIZE: usize = 24;
37
38/// Constants for packed u128 dynamic references (used by Int, Uint, Decimal)
39const PACKED_MODE_DYNAMIC: u128 = 0x80000000000000000000000000000000;
40const PACKED_MODE_MASK: u128 = 0x80000000000000000000000000000000;
41const PACKED_OFFSET_MASK: u128 = 0x0000000000000000FFFFFFFFFFFFFFFF;
42const PACKED_LENGTH_MASK: u128 = 0x7FFFFFFFFFFFFFFF0000000000000000;
43
44/// A field within a shape
45#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
46pub struct RowShapeField {
47	/// Field name
48	pub name: String,
49	/// Field type constraint (includes base type and optional constraints like MaxBytes)
50	pub constraint: TypeConstraint,
51	/// Byte offset within the encoded row
52	pub offset: u32,
53	/// Size in bytes
54	pub size: u32,
55	/// Alignment requirement
56	pub align: u8,
57}
58
59impl RowShapeField {
60	/// Create a new shape field with a type constraint.
61	/// Offset, size, and alignment are computed when added to a RowShape.
62	pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
63		let storage_type = constraint.storage_type();
64		Self {
65			name: name.into(),
66			constraint,
67			offset: 0,
68			size: storage_type.size() as u32,
69			align: storage_type.alignment() as u8,
70		}
71	}
72
73	/// Create a new shape field with an unconstrained type.
74	/// Convenience method for the common case of no constraints.
75	pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
76		Self::new(name, TypeConstraint::unconstrained(field_type))
77	}
78}
79
80/// A shape describing the structure of encoded row data.
81pub struct RowShape(Arc<Inner>);
82
83/// Inner data for a shape describing the structure of encoded row data.
84///
85/// Shapes are immutable and content-addressable via their fingerprint.
86/// The same field configuration always produces the same fingerprint,
87/// enabling shape deduplication in the registry.
88#[derive(Debug, Serialize, Deserialize)]
89pub struct Inner {
90	/// Content-addressable fingerprint (hash of canonical field representation)
91	pub fingerprint: RowShapeFingerprint,
92	/// Fields in definition order
93	pub fields: Vec<RowShapeField>,
94	/// Cached layout computation (total_size, max_align) - computed once on first use
95	#[serde(skip)]
96	cached_layout: OnceLock<(usize, usize)>,
97}
98
99impl PartialEq for Inner {
100	fn eq(&self, other: &Self) -> bool {
101		self.fingerprint == other.fingerprint && self.fields == other.fields
102	}
103}
104
105impl Eq for Inner {}
106
107impl Deref for RowShape {
108	type Target = Inner;
109
110	fn deref(&self) -> &Self::Target {
111		&self.0
112	}
113}
114
115impl Clone for RowShape {
116	fn clone(&self) -> Self {
117		Self(self.0.clone())
118	}
119}
120
121impl Debug for RowShape {
122	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123		self.0.fmt(f)
124	}
125}
126
127impl PartialEq for RowShape {
128	fn eq(&self, other: &Self) -> bool {
129		self.0.as_ref() == other.0.as_ref()
130	}
131}
132
133impl Eq for RowShape {}
134
135impl RowShape {
136	/// Create a new shape from a list of fields.
137	///
138	/// This computes the memory layout (offsets, alignment) and fingerprint.
139	pub fn new(fields: Vec<RowShapeField>) -> Self {
140		let fields = Self::compute_layout(fields);
141		let fingerprint = compute_fingerprint(&fields);
142
143		Self(Arc::new(Inner {
144			fingerprint,
145			fields,
146			cached_layout: OnceLock::new(),
147		}))
148	}
149
150	/// Create a shape from pre-computed fields and fingerprint.
151	/// Used when loading from storage.
152	pub fn from_parts(fingerprint: RowShapeFingerprint, fields: Vec<RowShapeField>) -> Self {
153		Self(Arc::new(Inner {
154			fingerprint,
155			fields,
156			cached_layout: OnceLock::new(),
157		}))
158	}
159
160	/// Get the shape's fingerprint
161	pub fn fingerprint(&self) -> RowShapeFingerprint {
162		self.fingerprint
163	}
164
165	/// Get the fields in this shape
166	pub fn fields(&self) -> &[RowShapeField] {
167		&self.fields
168	}
169
170	/// Get the number of fields
171	pub fn field_count(&self) -> usize {
172		self.fields.len()
173	}
174
175	/// Find a field by name
176	pub fn find_field(&self, name: &str) -> Option<&RowShapeField> {
177		self.fields.iter().find(|f| f.name == name)
178	}
179
180	/// Find field index by name
181	pub fn find_field_index(&self, name: &str) -> Option<usize> {
182		self.fields.iter().position(|f| f.name == name)
183	}
184
185	/// Find a field by index
186	pub fn get_field(&self, index: usize) -> Option<&RowShapeField> {
187		self.fields.get(index)
188	}
189
190	/// Get field name by index
191	pub fn get_field_name(&self, index: usize) -> Option<&str> {
192		self.fields.get(index).map(|f| f.name.as_str())
193	}
194
195	/// Get all field names as an iterator
196	pub fn field_names(&self) -> impl Iterator<Item = &str> {
197		self.fields.iter().map(|f| f.name.as_str())
198	}
199
200	/// Compute memory layout for fields.
201	/// Returns the fields with computed offsets and the total row size.
202	fn compute_layout(mut fields: Vec<RowShapeField>) -> Vec<RowShapeField> {
203		// Start offset calculation from where data section begins (after header + bitvec)
204		let bitvec_size = fields.len().div_ceil(8);
205		let mut offset: u32 = (SHAPE_HEADER_SIZE + bitvec_size) as u32;
206
207		for field in fields.iter_mut() {
208			let storage_type = field.constraint.storage_type();
209			field.size = storage_type.size() as u32;
210			field.align = storage_type.alignment() as u8;
211
212			// Align offset
213			let align = field.align as u32;
214			if align > 0 {
215				offset = (offset + align - 1) & !(align - 1);
216			}
217
218			field.offset = offset;
219			offset += field.size;
220		}
221
222		fields
223	}
224
225	/// Size of the bitvec section in bytes
226	pub fn bitvec_size(&self) -> usize {
227		self.fields.len().div_ceil(8)
228	}
229
230	/// Offset where field data starts (after header and bitvec)
231	pub fn data_offset(&self) -> usize {
232		SHAPE_HEADER_SIZE + self.bitvec_size()
233	}
234
235	/// Compute and cache the layout (total_size, max_align).
236	/// This is called once and the result is cached for subsequent calls.
237	fn get_cached_layout(&self) -> (usize, usize) {
238		*self.cached_layout.get_or_init(|| {
239			// Compute max_align
240			let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
241
242			// Compute total_size
243			let total_size = if self.fields.is_empty() {
244				SHAPE_HEADER_SIZE + self.bitvec_size()
245			} else {
246				let last_field = &self.fields[self.fields.len() - 1];
247				let end = last_field.offset as usize + last_field.size as usize;
248				// Align to maximum field alignment
249				Self::align_up(end, max_align)
250			};
251
252			(total_size, max_align)
253		})
254	}
255
256	/// Total size of the static section
257	pub fn total_static_size(&self) -> usize {
258		self.get_cached_layout().0
259	}
260
261	/// Start of the dynamic section
262	pub fn dynamic_section_start(&self) -> usize {
263		self.total_static_size()
264	}
265
266	/// Size of the dynamic section
267	pub fn dynamic_section_size(&self, row: &EncodedRow) -> usize {
268		row.len().saturating_sub(self.total_static_size())
269	}
270
271	/// Returns (offset, length) in the dynamic section for a defined dynamic field.
272	/// Returns None if field is undefined, static-only, or uses inline storage.
273	pub(crate) fn read_dynamic_ref(&self, row: &EncodedRow, index: usize) -> Option<(usize, usize)> {
274		if !row.is_defined(index) {
275			return None;
276		}
277		let field = &self.fields()[index];
278		match field.constraint.get_type().inner_type() {
279			Type::Utf8 | Type::Blob | Type::Any => {
280				let ref_slice = &row.as_slice()[field.offset as usize..field.offset as usize + 8];
281				let offset =
282					u32::from_le_bytes([ref_slice[0], ref_slice[1], ref_slice[2], ref_slice[3]])
283						as usize;
284				let length =
285					u32::from_le_bytes([ref_slice[4], ref_slice[5], ref_slice[6], ref_slice[7]])
286						as usize;
287				Some((offset, length))
288			}
289			Type::Int | Type::Uint | Type::Decimal => {
290				let packed = unsafe {
291					(row.as_ptr().add(field.offset as usize) as *const u128).read_unaligned()
292				};
293				let packed = u128::from_le(packed);
294				if packed & PACKED_MODE_MASK != 0 {
295					let offset = (packed & PACKED_OFFSET_MASK) as usize;
296					let length = ((packed & PACKED_LENGTH_MASK) >> 64) as usize;
297					Some((offset, length))
298				} else {
299					None // inline storage
300				}
301			}
302			_ => None,
303		}
304	}
305
306	/// Writes a dynamic section reference for the given field in its type-appropriate format.
307	pub(crate) fn write_dynamic_ref(&self, row: &mut EncodedRow, index: usize, offset: usize, length: usize) {
308		let field = &self.fields()[index];
309		match field.constraint.get_type().inner_type() {
310			Type::Utf8 | Type::Blob | Type::Any => {
311				let ref_slice = &mut row.0.make_mut()[field.offset as usize..field.offset as usize + 8];
312				ref_slice[0..4].copy_from_slice(&(offset as u32).to_le_bytes());
313				ref_slice[4..8].copy_from_slice(&(length as u32).to_le_bytes());
314			}
315			Type::Int | Type::Uint | Type::Decimal => {
316				let offset_part = (offset as u128) & PACKED_OFFSET_MASK;
317				let length_part = ((length as u128) << 64) & PACKED_LENGTH_MASK;
318				let packed = PACKED_MODE_DYNAMIC | offset_part | length_part;
319				unsafe {
320					ptr::write_unaligned(
321						row.0.make_mut().as_mut_ptr().add(field.offset as usize) as *mut u128,
322						packed.to_le(),
323					);
324				}
325			}
326			_ => {}
327		}
328	}
329
330	/// Replace dynamic data for a field. Handles both first-set (append) and update (splice).
331	/// On update: splices old bytes out, inserts new bytes, adjusts all other dynamic refs.
332	pub(crate) fn replace_dynamic_data(&self, row: &mut EncodedRow, index: usize, new_data: &[u8]) {
333		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
334			let delta = new_data.len() as isize - old_length as isize;
335
336			// Collect refs that need adjusting BEFORE splice
337			let refs_to_update: Vec<(usize, usize, usize)> = if delta != 0 {
338				self.fields()
339					.iter()
340					.enumerate()
341					.filter(|(i, _)| *i != index && row.is_defined(*i))
342					.filter_map(|(i, _)| {
343						self.read_dynamic_ref(row, i)
344							.filter(|(off, _)| *off > old_offset)
345							.map(|(off, len)| (i, off, len))
346					})
347					.collect()
348			} else {
349				vec![]
350			};
351
352			// Splice bytes in the dynamic section
353			let dynamic_start = self.dynamic_section_start();
354			let abs_start = dynamic_start + old_offset;
355			let abs_end = abs_start + old_length;
356			row.0.make_mut().splice(abs_start..abs_end, new_data.iter().copied());
357
358			// Update this field's reference (same offset, new length)
359			self.write_dynamic_ref(row, index, old_offset, new_data.len());
360
361			// Adjust other dynamic references by the size delta
362			for (i, off, len) in refs_to_update {
363				let new_off = (off as isize + delta) as usize;
364				self.write_dynamic_ref(row, i, new_off, len);
365			}
366		} else {
367			// First set or transitioning from inline — append to dynamic section
368			let dynamic_offset = self.dynamic_section_size(row);
369			row.0.extend_from_slice(new_data);
370			self.write_dynamic_ref(row, index, dynamic_offset, new_data.len());
371		}
372		row.set_valid(index, true);
373	}
374
375	/// Remove dynamic data for a field without setting new data.
376	/// Used for dynamic→inline transitions in Int/Uint.
377	pub(crate) fn remove_dynamic_data(&self, row: &mut EncodedRow, index: usize) {
378		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
379			// Collect refs that need adjusting
380			let refs_to_update: Vec<(usize, usize, usize)> = self
381				.fields()
382				.iter()
383				.enumerate()
384				.filter(|(i, _)| *i != index && row.is_defined(*i))
385				.filter_map(|(i, _)| {
386					self.read_dynamic_ref(row, i)
387						.filter(|(off, _)| *off > old_offset)
388						.map(|(off, len)| (i, off, len))
389				})
390				.collect();
391
392			// Remove bytes
393			let dynamic_start = self.dynamic_section_start();
394			let abs_start = dynamic_start + old_offset;
395			let abs_end = abs_start + old_length;
396			row.0.make_mut().splice(abs_start..abs_end, iter::empty());
397
398			// Adjust other references
399			for (i, off, len) in refs_to_update {
400				let new_off = off - old_length;
401				self.write_dynamic_ref(row, i, new_off, len);
402			}
403		}
404	}
405
406	/// Allocate a new encoded row
407	pub fn allocate(&self) -> EncodedRow {
408		let (total_size, max_align) = self.get_cached_layout();
409		let layout = Layout::from_size_align(total_size, max_align).unwrap();
410		unsafe {
411			let ptr = alloc_zeroed(layout);
412			if ptr.is_null() {
413				handle_alloc_error(layout);
414			}
415			let vec = Vec::from_raw_parts(ptr, total_size, total_size);
416			let mut row = EncodedRow(CowVec::new(vec));
417			row.set_fingerprint(self.fingerprint);
418			row
419		}
420	}
421
422	fn align_up(offset: usize, align: usize) -> usize {
423		(offset + align).saturating_sub(1) & !(align.saturating_sub(1))
424	}
425
426	/// Set a field as undefined (not set)
427	pub fn set_none(&self, row: &mut EncodedRow, index: usize) {
428		self.remove_dynamic_data(row, index);
429		row.set_valid(index, false);
430	}
431
432	/// Create a shape from a list of types.
433	/// Fields are named f0, f1, f2, etc. and have unconstrained types.
434	/// Useful for tests and simple state shapes.
435	pub fn testing(types: &[Type]) -> Self {
436		RowShape::new(
437			types.iter()
438				.enumerate()
439				.map(|(i, t)| RowShapeField::unconstrained(format!("f{}", i), t.clone()))
440				.collect(),
441		)
442	}
443}
444
445#[cfg(test)]
446mod tests {
447	use super::*;
448
449	#[test]
450	fn test_shape_creation() {
451		let fields = vec![
452			RowShapeField::unconstrained("id", Type::Int8),
453			RowShapeField::unconstrained("name", Type::Utf8),
454			RowShapeField::unconstrained("active", Type::Boolean),
455		];
456
457		let shape = RowShape::new(fields);
458
459		assert_eq!(shape.field_count(), 3);
460		assert_eq!(shape.fields()[0].name, "id");
461		assert_eq!(shape.fields()[1].name, "name");
462		assert_eq!(shape.fields()[2].name, "active");
463	}
464
465	#[test]
466	fn test_shape_fingerprint_deterministic() {
467		let fields1 = vec![
468			RowShapeField::unconstrained("a", Type::Int4),
469			RowShapeField::unconstrained("b", Type::Utf8),
470		];
471
472		let fields2 = vec![
473			RowShapeField::unconstrained("a", Type::Int4),
474			RowShapeField::unconstrained("b", Type::Utf8),
475		];
476
477		let shape1 = RowShape::new(fields1);
478		let shape2 = RowShape::new(fields2);
479
480		assert_eq!(shape1.fingerprint(), shape2.fingerprint());
481	}
482
483	#[test]
484	fn test_shape_fingerprint_different_for_different_shapes() {
485		let fields1 = vec![RowShapeField::unconstrained("a", Type::Int4)];
486		let fields2 = vec![RowShapeField::unconstrained("a", Type::Int8)];
487
488		let shape1 = RowShape::new(fields1);
489		let shape2 = RowShape::new(fields2);
490
491		assert_ne!(shape1.fingerprint(), shape2.fingerprint());
492	}
493
494	#[test]
495	fn test_find_field() {
496		let fields = vec![
497			RowShapeField::unconstrained("id", Type::Int8),
498			RowShapeField::unconstrained("name", Type::Utf8),
499		];
500
501		let shape = RowShape::new(fields);
502
503		assert!(shape.find_field("id").is_some());
504		assert!(shape.find_field("name").is_some());
505		assert!(shape.find_field("missing").is_none());
506	}
507}