Skip to main content

reifydb_core/encoded/shape/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4pub mod consolidate;
5pub mod evolution;
6pub mod fingerprint;
7mod from;
8
9use std::{
10	alloc::{Layout, alloc_zeroed, handle_alloc_error},
11	fmt,
12	fmt::Debug,
13	iter,
14	ops::Deref,
15	ptr,
16	sync::{Arc, OnceLock},
17};
18
19use reifydb_type::{
20	util::cowvec::CowVec,
21	value::{constraint::TypeConstraint, r#type::Type},
22};
23use serde::{Deserialize, Serialize};
24
25use super::row::EncodedRow;
26use crate::encoded::shape::fingerprint::{RowShapeFingerprint, compute_fingerprint};
27
/// Number of bytes occupied by the shape header (the fingerprint) at the start of a row.
pub const SHAPE_HEADER_SIZE: usize = 24;

// Bit layout of the packed u128 dynamic reference used by Int, Uint and Decimal:
// bit 127 = dynamic-mode flag, bits 64..=126 = length, bits 0..=63 = offset.
/// Flag value OR-ed into a packed reference to mark it as dynamic.
const PACKED_MODE_DYNAMIC: u128 = 1 << 127;
/// Mask selecting the mode flag (top bit) of a packed reference.
const PACKED_MODE_MASK: u128 = 1 << 127;
/// Mask selecting the offset (low 64 bits) of a packed reference.
const PACKED_OFFSET_MASK: u128 = u64::MAX as u128;
/// Mask selecting the length (63 bits directly below the mode flag) of a packed reference.
const PACKED_LENGTH_MASK: u128 = (i64::MAX as u128) << 64;
36
37/// A field within a shape
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub struct RowShapeField {
40	/// Field name
41	pub name: String,
42	/// Field type constraint (includes base type and optional constraints like MaxBytes)
43	pub constraint: TypeConstraint,
44	/// Byte offset within the encoded row
45	pub offset: u32,
46	/// Size in bytes
47	pub size: u32,
48	/// Alignment requirement
49	pub align: u8,
50}
51
52impl RowShapeField {
53	/// Create a new shape field with a type constraint.
54	/// Offset, size, and alignment are computed when added to a RowShape.
55	pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
56		let storage_type = constraint.storage_type();
57		Self {
58			name: name.into(),
59			constraint,
60			offset: 0,
61			size: storage_type.size() as u32,
62			align: storage_type.alignment() as u8,
63		}
64	}
65
66	/// Create a new shape field with an unconstrained type.
67	/// Convenience method for the common case of no constraints.
68	pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
69		Self::new(name, TypeConstraint::unconstrained(field_type))
70	}
71}
72
73/// A shape describing the structure of encoded row data.
74pub struct RowShape(Arc<Inner>);
75
76/// Inner data for a shape describing the structure of encoded row data.
77///
78/// Shapes are immutable and content-addressable via their fingerprint.
79/// The same field configuration always produces the same fingerprint,
80/// enabling shape deduplication in the registry.
81#[derive(Debug, Serialize, Deserialize)]
82pub struct Inner {
83	/// Content-addressable fingerprint (hash of canonical field representation)
84	pub fingerprint: RowShapeFingerprint,
85	/// Fields in definition order
86	pub fields: Vec<RowShapeField>,
87	/// Cached layout computation (total_size, max_align) - computed once on first use
88	#[serde(skip)]
89	cached_layout: OnceLock<(usize, usize)>,
90}
91
92impl PartialEq for Inner {
93	fn eq(&self, other: &Self) -> bool {
94		self.fingerprint == other.fingerprint && self.fields == other.fields
95	}
96}
97
98impl Eq for Inner {}
99
100impl Deref for RowShape {
101	type Target = Inner;
102
103	fn deref(&self) -> &Self::Target {
104		&self.0
105	}
106}
107
108impl Clone for RowShape {
109	fn clone(&self) -> Self {
110		Self(self.0.clone())
111	}
112}
113
114impl Debug for RowShape {
115	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116		self.0.fmt(f)
117	}
118}
119
120impl PartialEq for RowShape {
121	fn eq(&self, other: &Self) -> bool {
122		self.0.as_ref() == other.0.as_ref()
123	}
124}
125
126impl Eq for RowShape {}
127
128impl RowShape {
129	/// Create a new shape from a list of fields.
130	///
131	/// This computes the memory layout (offsets, alignment) and fingerprint.
132	pub fn new(fields: Vec<RowShapeField>) -> Self {
133		let fields = Self::compute_layout(fields);
134		let fingerprint = compute_fingerprint(&fields);
135
136		Self(Arc::new(Inner {
137			fingerprint,
138			fields,
139			cached_layout: OnceLock::new(),
140		}))
141	}
142
143	/// Create a shape from pre-computed fields and fingerprint.
144	/// Used when loading from storage.
145	pub fn from_parts(fingerprint: RowShapeFingerprint, fields: Vec<RowShapeField>) -> Self {
146		Self(Arc::new(Inner {
147			fingerprint,
148			fields,
149			cached_layout: OnceLock::new(),
150		}))
151	}
152
153	/// Get the shape's fingerprint
154	pub fn fingerprint(&self) -> RowShapeFingerprint {
155		self.fingerprint
156	}
157
158	/// Get the fields in this shape
159	pub fn fields(&self) -> &[RowShapeField] {
160		&self.fields
161	}
162
163	/// Get the number of fields
164	pub fn field_count(&self) -> usize {
165		self.fields.len()
166	}
167
168	/// Find a field by name
169	pub fn find_field(&self, name: &str) -> Option<&RowShapeField> {
170		self.fields.iter().find(|f| f.name == name)
171	}
172
173	/// Find field index by name
174	pub fn find_field_index(&self, name: &str) -> Option<usize> {
175		self.fields.iter().position(|f| f.name == name)
176	}
177
178	/// Find a field by index
179	pub fn get_field(&self, index: usize) -> Option<&RowShapeField> {
180		self.fields.get(index)
181	}
182
183	/// Get field name by index
184	pub fn get_field_name(&self, index: usize) -> Option<&str> {
185		self.fields.get(index).map(|f| f.name.as_str())
186	}
187
188	/// Get all field names as an iterator
189	pub fn field_names(&self) -> impl Iterator<Item = &str> {
190		self.fields.iter().map(|f| f.name.as_str())
191	}
192
193	/// Compute memory layout for fields.
194	/// Returns the fields with computed offsets and the total row size.
195	fn compute_layout(mut fields: Vec<RowShapeField>) -> Vec<RowShapeField> {
196		// Start offset calculation from where data section begins (after header + bitvec)
197		let bitvec_size = fields.len().div_ceil(8);
198		let mut offset: u32 = (SHAPE_HEADER_SIZE + bitvec_size) as u32;
199
200		for field in fields.iter_mut() {
201			let storage_type = field.constraint.storage_type();
202			field.size = storage_type.size() as u32;
203			field.align = storage_type.alignment() as u8;
204
205			// Align offset
206			let align = field.align as u32;
207			if align > 0 {
208				offset = (offset + align - 1) & !(align - 1);
209			}
210
211			field.offset = offset;
212			offset += field.size;
213		}
214
215		fields
216	}
217
218	/// Size of the bitvec section in bytes
219	pub fn bitvec_size(&self) -> usize {
220		self.fields.len().div_ceil(8)
221	}
222
223	/// Offset where field data starts (after header and bitvec)
224	pub fn data_offset(&self) -> usize {
225		SHAPE_HEADER_SIZE + self.bitvec_size()
226	}
227
228	/// Compute and cache the layout (total_size, max_align).
229	/// This is called once and the result is cached for subsequent calls.
230	fn get_cached_layout(&self) -> (usize, usize) {
231		*self.cached_layout.get_or_init(|| {
232			// Compute max_align
233			let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
234
235			// Compute total_size
236			let total_size = if self.fields.is_empty() {
237				SHAPE_HEADER_SIZE + self.bitvec_size()
238			} else {
239				let last_field = &self.fields[self.fields.len() - 1];
240				let end = last_field.offset as usize + last_field.size as usize;
241				// Align to maximum field alignment
242				Self::align_up(end, max_align)
243			};
244
245			(total_size, max_align)
246		})
247	}
248
249	/// Total size of the static section
250	pub fn total_static_size(&self) -> usize {
251		self.get_cached_layout().0
252	}
253
254	/// Start of the dynamic section
255	pub fn dynamic_section_start(&self) -> usize {
256		self.total_static_size()
257	}
258
259	/// Size of the dynamic section
260	pub fn dynamic_section_size(&self, row: &EncodedRow) -> usize {
261		row.len().saturating_sub(self.total_static_size())
262	}
263
264	/// Returns (offset, length) in the dynamic section for a defined dynamic field.
265	/// Returns None if field is undefined, static-only, or uses inline storage.
266	pub(crate) fn read_dynamic_ref(&self, row: &EncodedRow, index: usize) -> Option<(usize, usize)> {
267		if !row.is_defined(index) {
268			return None;
269		}
270		let field = &self.fields()[index];
271		match field.constraint.get_type().inner_type() {
272			Type::Utf8 | Type::Blob | Type::Any => {
273				let ref_slice = &row.as_slice()[field.offset as usize..field.offset as usize + 8];
274				let offset =
275					u32::from_le_bytes([ref_slice[0], ref_slice[1], ref_slice[2], ref_slice[3]])
276						as usize;
277				let length =
278					u32::from_le_bytes([ref_slice[4], ref_slice[5], ref_slice[6], ref_slice[7]])
279						as usize;
280				Some((offset, length))
281			}
282			Type::Int | Type::Uint | Type::Decimal => {
283				let packed = unsafe {
284					(row.as_ptr().add(field.offset as usize) as *const u128).read_unaligned()
285				};
286				let packed = u128::from_le(packed);
287				if packed & PACKED_MODE_MASK != 0 {
288					let offset = (packed & PACKED_OFFSET_MASK) as usize;
289					let length = ((packed & PACKED_LENGTH_MASK) >> 64) as usize;
290					Some((offset, length))
291				} else {
292					None // inline storage
293				}
294			}
295			_ => None,
296		}
297	}
298
299	/// Writes a dynamic section reference for the given field in its type-appropriate format.
300	pub(crate) fn write_dynamic_ref(&self, row: &mut EncodedRow, index: usize, offset: usize, length: usize) {
301		let field = &self.fields()[index];
302		match field.constraint.get_type().inner_type() {
303			Type::Utf8 | Type::Blob | Type::Any => {
304				let ref_slice = &mut row.0.make_mut()[field.offset as usize..field.offset as usize + 8];
305				ref_slice[0..4].copy_from_slice(&(offset as u32).to_le_bytes());
306				ref_slice[4..8].copy_from_slice(&(length as u32).to_le_bytes());
307			}
308			Type::Int | Type::Uint | Type::Decimal => {
309				let offset_part = (offset as u128) & PACKED_OFFSET_MASK;
310				let length_part = ((length as u128) << 64) & PACKED_LENGTH_MASK;
311				let packed = PACKED_MODE_DYNAMIC | offset_part | length_part;
312				unsafe {
313					ptr::write_unaligned(
314						row.0.make_mut().as_mut_ptr().add(field.offset as usize) as *mut u128,
315						packed.to_le(),
316					);
317				}
318			}
319			_ => {}
320		}
321	}
322
323	/// Replace dynamic data for a field. Handles both first-set (append) and update (splice).
324	/// On update: splices old bytes out, inserts new bytes, adjusts all other dynamic refs.
325	pub(crate) fn replace_dynamic_data(&self, row: &mut EncodedRow, index: usize, new_data: &[u8]) {
326		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
327			let delta = new_data.len() as isize - old_length as isize;
328
329			// Collect refs that need adjusting BEFORE splice
330			let refs_to_update: Vec<(usize, usize, usize)> = if delta != 0 {
331				self.fields()
332					.iter()
333					.enumerate()
334					.filter(|(i, _)| *i != index && row.is_defined(*i))
335					.filter_map(|(i, _)| {
336						self.read_dynamic_ref(row, i)
337							.filter(|(off, _)| *off > old_offset)
338							.map(|(off, len)| (i, off, len))
339					})
340					.collect()
341			} else {
342				vec![]
343			};
344
345			// Splice bytes in the dynamic section
346			let dynamic_start = self.dynamic_section_start();
347			let abs_start = dynamic_start + old_offset;
348			let abs_end = abs_start + old_length;
349			row.0.make_mut().splice(abs_start..abs_end, new_data.iter().copied());
350
351			// Update this field's reference (same offset, new length)
352			self.write_dynamic_ref(row, index, old_offset, new_data.len());
353
354			// Adjust other dynamic references by the size delta
355			for (i, off, len) in refs_to_update {
356				let new_off = (off as isize + delta) as usize;
357				self.write_dynamic_ref(row, i, new_off, len);
358			}
359		} else {
360			// First set or transitioning from inline - append to dynamic section
361			let dynamic_offset = self.dynamic_section_size(row);
362			row.0.extend_from_slice(new_data);
363			self.write_dynamic_ref(row, index, dynamic_offset, new_data.len());
364		}
365		row.set_valid(index, true);
366	}
367
368	/// Remove dynamic data for a field without setting new data.
369	/// Used for dynamic→inline transitions in Int/Uint.
370	pub(crate) fn remove_dynamic_data(&self, row: &mut EncodedRow, index: usize) {
371		if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
372			// Collect refs that need adjusting
373			let refs_to_update: Vec<(usize, usize, usize)> = self
374				.fields()
375				.iter()
376				.enumerate()
377				.filter(|(i, _)| *i != index && row.is_defined(*i))
378				.filter_map(|(i, _)| {
379					self.read_dynamic_ref(row, i)
380						.filter(|(off, _)| *off > old_offset)
381						.map(|(off, len)| (i, off, len))
382				})
383				.collect();
384
385			// Remove bytes
386			let dynamic_start = self.dynamic_section_start();
387			let abs_start = dynamic_start + old_offset;
388			let abs_end = abs_start + old_length;
389			row.0.make_mut().splice(abs_start..abs_end, iter::empty());
390
391			// Adjust other references
392			for (i, off, len) in refs_to_update {
393				let new_off = off - old_length;
394				self.write_dynamic_ref(row, i, new_off, len);
395			}
396		}
397	}
398
399	/// Allocate a new encoded row
400	pub fn allocate(&self) -> EncodedRow {
401		let (total_size, max_align) = self.get_cached_layout();
402		let layout = Layout::from_size_align(total_size, max_align).unwrap();
403		unsafe {
404			let ptr = alloc_zeroed(layout);
405			if ptr.is_null() {
406				handle_alloc_error(layout);
407			}
408			let vec = Vec::from_raw_parts(ptr, total_size, total_size);
409			let mut row = EncodedRow(CowVec::new(vec));
410			row.set_fingerprint(self.fingerprint);
411			row
412		}
413	}
414
415	fn align_up(offset: usize, align: usize) -> usize {
416		(offset + align).saturating_sub(1) & !(align.saturating_sub(1))
417	}
418
419	/// Set a field as undefined (not set)
420	pub fn set_none(&self, row: &mut EncodedRow, index: usize) {
421		self.remove_dynamic_data(row, index);
422		row.set_valid(index, false);
423	}
424
425	/// Create a shape from a list of types.
426	/// Fields are named f0, f1, f2, etc. and have unconstrained types.
427	/// Useful for tests and simple state shapes.
428	pub fn testing(types: &[Type]) -> Self {
429		RowShape::new(
430			types.iter()
431				.enumerate()
432				.map(|(i, t)| RowShapeField::unconstrained(format!("f{}", i), t.clone()))
433				.collect(),
434		)
435	}
436}
437
#[cfg(test)]
mod tests {
	use super::*;

	/// Builds a shape of unconstrained fields from `(name, type)` pairs.
	fn shape_of(columns: &[(&str, Type)]) -> RowShape {
		let fields = columns.iter().map(|(name, ty)| RowShapeField::unconstrained(*name, ty.clone())).collect();
		RowShape::new(fields)
	}

	/// A shape keeps all fields, in the order they were given.
	#[test]
	fn test_shape_creation() {
		let shape = shape_of(&[("id", Type::Int8), ("name", Type::Utf8), ("active", Type::Boolean)]);

		assert_eq!(shape.field_count(), 3);

		let expected = ["id", "name", "active"];
		for (field, want) in shape.fields().iter().zip(expected.iter()) {
			assert_eq!(field.name, *want);
		}
	}

	/// Identical field lists produce identical fingerprints.
	#[test]
	fn test_shape_fingerprint_deterministic() {
		let columns = [("a", Type::Int4), ("b", Type::Utf8)];

		let first = shape_of(&columns);
		let second = shape_of(&columns);

		assert_eq!(first.fingerprint(), second.fingerprint());
	}

	/// Changing a field's type changes the fingerprint.
	#[test]
	fn test_shape_fingerprint_different_for_different_shapes() {
		let narrow = shape_of(&[("a", Type::Int4)]);
		let wide = shape_of(&[("a", Type::Int8)]);

		assert_ne!(narrow.fingerprint(), wide.fingerprint());
	}

	/// Lookup by name finds existing fields and nothing else.
	#[test]
	fn test_find_field() {
		let shape = shape_of(&[("id", Type::Int8), ("name", Type::Utf8)]);

		for present in ["id", "name"].iter() {
			assert!(shape.find_field(present).is_some());
		}
		assert!(shape.find_field("missing").is_none());
	}
}