reifydb_core/encoded/shape/
mod.rs1pub mod consolidate;
12pub mod evolution;
13pub mod fingerprint;
14mod from;
15
16use std::{
17 alloc::{Layout, alloc_zeroed, handle_alloc_error},
18 fmt,
19 fmt::Debug,
20 iter,
21 ops::Deref,
22 ptr,
23 sync::{Arc, OnceLock},
24};
25
26use reifydb_type::{
27 util::cowvec::CowVec,
28 value::{constraint::TypeConstraint, r#type::Type},
29};
30use serde::{Deserialize, Serialize};
31
32use super::row::EncodedRow;
33use crate::encoded::shape::fingerprint::{RowShapeFingerprint, compute_fingerprint};
34
/// Number of bytes reserved for the row header. Field slots begin after this
/// header plus the per-field bit vector (see `RowShape::data_offset`).
pub const SHAPE_HEADER_SIZE: usize = 24;

/// Bit 127 of a packed 128-bit reference: set when the reference points into
/// the dynamic section of the row.
const PACKED_MODE_DYNAMIC: u128 = 1 << 127;
/// Mask that isolates the mode bit (bit 127) of a packed 128-bit reference.
const PACKED_MODE_MASK: u128 = 1 << 127;
/// Mask that isolates the low 64 bits of a packed reference (the offset).
const PACKED_OFFSET_MASK: u128 = u64::MAX as u128;
/// Mask that isolates bits 64..=126 of a packed reference (the length).
const PACKED_LENGTH_MASK: u128 = (i64::MAX as u128) << 64;
43
44#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
46pub struct RowShapeField {
47 pub name: String,
49 pub constraint: TypeConstraint,
51 pub offset: u32,
53 pub size: u32,
55 pub align: u8,
57}
58
59impl RowShapeField {
60 pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
63 let storage_type = constraint.storage_type();
64 Self {
65 name: name.into(),
66 constraint,
67 offset: 0,
68 size: storage_type.size() as u32,
69 align: storage_type.alignment() as u8,
70 }
71 }
72
73 pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
76 Self::new(name, TypeConstraint::unconstrained(field_type))
77 }
78}
79
80pub struct RowShape(Arc<Inner>);
82
83#[derive(Debug, Serialize, Deserialize)]
89pub struct Inner {
90 pub fingerprint: RowShapeFingerprint,
92 pub fields: Vec<RowShapeField>,
94 #[serde(skip)]
96 cached_layout: OnceLock<(usize, usize)>,
97}
98
99impl PartialEq for Inner {
100 fn eq(&self, other: &Self) -> bool {
101 self.fingerprint == other.fingerprint && self.fields == other.fields
102 }
103}
104
105impl Eq for Inner {}
106
107impl Deref for RowShape {
108 type Target = Inner;
109
110 fn deref(&self) -> &Self::Target {
111 &self.0
112 }
113}
114
115impl Clone for RowShape {
116 fn clone(&self) -> Self {
117 Self(self.0.clone())
118 }
119}
120
121impl Debug for RowShape {
122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123 self.0.fmt(f)
124 }
125}
126
127impl PartialEq for RowShape {
128 fn eq(&self, other: &Self) -> bool {
129 self.0.as_ref() == other.0.as_ref()
130 }
131}
132
133impl Eq for RowShape {}
134
135impl RowShape {
136 pub fn new(fields: Vec<RowShapeField>) -> Self {
140 let fields = Self::compute_layout(fields);
141 let fingerprint = compute_fingerprint(&fields);
142
143 Self(Arc::new(Inner {
144 fingerprint,
145 fields,
146 cached_layout: OnceLock::new(),
147 }))
148 }
149
150 pub fn from_parts(fingerprint: RowShapeFingerprint, fields: Vec<RowShapeField>) -> Self {
153 Self(Arc::new(Inner {
154 fingerprint,
155 fields,
156 cached_layout: OnceLock::new(),
157 }))
158 }
159
160 pub fn fingerprint(&self) -> RowShapeFingerprint {
162 self.fingerprint
163 }
164
165 pub fn fields(&self) -> &[RowShapeField] {
167 &self.fields
168 }
169
170 pub fn field_count(&self) -> usize {
172 self.fields.len()
173 }
174
175 pub fn find_field(&self, name: &str) -> Option<&RowShapeField> {
177 self.fields.iter().find(|f| f.name == name)
178 }
179
180 pub fn find_field_index(&self, name: &str) -> Option<usize> {
182 self.fields.iter().position(|f| f.name == name)
183 }
184
185 pub fn get_field(&self, index: usize) -> Option<&RowShapeField> {
187 self.fields.get(index)
188 }
189
190 pub fn get_field_name(&self, index: usize) -> Option<&str> {
192 self.fields.get(index).map(|f| f.name.as_str())
193 }
194
195 pub fn field_names(&self) -> impl Iterator<Item = &str> {
197 self.fields.iter().map(|f| f.name.as_str())
198 }
199
200 fn compute_layout(mut fields: Vec<RowShapeField>) -> Vec<RowShapeField> {
203 let bitvec_size = fields.len().div_ceil(8);
205 let mut offset: u32 = (SHAPE_HEADER_SIZE + bitvec_size) as u32;
206
207 for field in fields.iter_mut() {
208 let storage_type = field.constraint.storage_type();
209 field.size = storage_type.size() as u32;
210 field.align = storage_type.alignment() as u8;
211
212 let align = field.align as u32;
214 if align > 0 {
215 offset = (offset + align - 1) & !(align - 1);
216 }
217
218 field.offset = offset;
219 offset += field.size;
220 }
221
222 fields
223 }
224
225 pub fn bitvec_size(&self) -> usize {
227 self.fields.len().div_ceil(8)
228 }
229
230 pub fn data_offset(&self) -> usize {
232 SHAPE_HEADER_SIZE + self.bitvec_size()
233 }
234
235 fn get_cached_layout(&self) -> (usize, usize) {
238 *self.cached_layout.get_or_init(|| {
239 let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
241
242 let total_size = if self.fields.is_empty() {
244 SHAPE_HEADER_SIZE + self.bitvec_size()
245 } else {
246 let last_field = &self.fields[self.fields.len() - 1];
247 let end = last_field.offset as usize + last_field.size as usize;
248 Self::align_up(end, max_align)
250 };
251
252 (total_size, max_align)
253 })
254 }
255
256 pub fn total_static_size(&self) -> usize {
258 self.get_cached_layout().0
259 }
260
261 pub fn dynamic_section_start(&self) -> usize {
263 self.total_static_size()
264 }
265
266 pub fn dynamic_section_size(&self, row: &EncodedRow) -> usize {
268 row.len().saturating_sub(self.total_static_size())
269 }
270
271 pub(crate) fn read_dynamic_ref(&self, row: &EncodedRow, index: usize) -> Option<(usize, usize)> {
274 if !row.is_defined(index) {
275 return None;
276 }
277 let field = &self.fields()[index];
278 match field.constraint.get_type().inner_type() {
279 Type::Utf8 | Type::Blob | Type::Any => {
280 let ref_slice = &row.as_slice()[field.offset as usize..field.offset as usize + 8];
281 let offset =
282 u32::from_le_bytes([ref_slice[0], ref_slice[1], ref_slice[2], ref_slice[3]])
283 as usize;
284 let length =
285 u32::from_le_bytes([ref_slice[4], ref_slice[5], ref_slice[6], ref_slice[7]])
286 as usize;
287 Some((offset, length))
288 }
289 Type::Int | Type::Uint | Type::Decimal => {
290 let packed = unsafe {
291 (row.as_ptr().add(field.offset as usize) as *const u128).read_unaligned()
292 };
293 let packed = u128::from_le(packed);
294 if packed & PACKED_MODE_MASK != 0 {
295 let offset = (packed & PACKED_OFFSET_MASK) as usize;
296 let length = ((packed & PACKED_LENGTH_MASK) >> 64) as usize;
297 Some((offset, length))
298 } else {
299 None }
301 }
302 _ => None,
303 }
304 }
305
306 pub(crate) fn write_dynamic_ref(&self, row: &mut EncodedRow, index: usize, offset: usize, length: usize) {
308 let field = &self.fields()[index];
309 match field.constraint.get_type().inner_type() {
310 Type::Utf8 | Type::Blob | Type::Any => {
311 let ref_slice = &mut row.0.make_mut()[field.offset as usize..field.offset as usize + 8];
312 ref_slice[0..4].copy_from_slice(&(offset as u32).to_le_bytes());
313 ref_slice[4..8].copy_from_slice(&(length as u32).to_le_bytes());
314 }
315 Type::Int | Type::Uint | Type::Decimal => {
316 let offset_part = (offset as u128) & PACKED_OFFSET_MASK;
317 let length_part = ((length as u128) << 64) & PACKED_LENGTH_MASK;
318 let packed = PACKED_MODE_DYNAMIC | offset_part | length_part;
319 unsafe {
320 ptr::write_unaligned(
321 row.0.make_mut().as_mut_ptr().add(field.offset as usize) as *mut u128,
322 packed.to_le(),
323 );
324 }
325 }
326 _ => {}
327 }
328 }
329
330 pub(crate) fn replace_dynamic_data(&self, row: &mut EncodedRow, index: usize, new_data: &[u8]) {
333 if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
334 let delta = new_data.len() as isize - old_length as isize;
335
336 let refs_to_update: Vec<(usize, usize, usize)> = if delta != 0 {
338 self.fields()
339 .iter()
340 .enumerate()
341 .filter(|(i, _)| *i != index && row.is_defined(*i))
342 .filter_map(|(i, _)| {
343 self.read_dynamic_ref(row, i)
344 .filter(|(off, _)| *off > old_offset)
345 .map(|(off, len)| (i, off, len))
346 })
347 .collect()
348 } else {
349 vec![]
350 };
351
352 let dynamic_start = self.dynamic_section_start();
354 let abs_start = dynamic_start + old_offset;
355 let abs_end = abs_start + old_length;
356 row.0.make_mut().splice(abs_start..abs_end, new_data.iter().copied());
357
358 self.write_dynamic_ref(row, index, old_offset, new_data.len());
360
361 for (i, off, len) in refs_to_update {
363 let new_off = (off as isize + delta) as usize;
364 self.write_dynamic_ref(row, i, new_off, len);
365 }
366 } else {
367 let dynamic_offset = self.dynamic_section_size(row);
369 row.0.extend_from_slice(new_data);
370 self.write_dynamic_ref(row, index, dynamic_offset, new_data.len());
371 }
372 row.set_valid(index, true);
373 }
374
375 pub(crate) fn remove_dynamic_data(&self, row: &mut EncodedRow, index: usize) {
378 if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
379 let refs_to_update: Vec<(usize, usize, usize)> = self
381 .fields()
382 .iter()
383 .enumerate()
384 .filter(|(i, _)| *i != index && row.is_defined(*i))
385 .filter_map(|(i, _)| {
386 self.read_dynamic_ref(row, i)
387 .filter(|(off, _)| *off > old_offset)
388 .map(|(off, len)| (i, off, len))
389 })
390 .collect();
391
392 let dynamic_start = self.dynamic_section_start();
394 let abs_start = dynamic_start + old_offset;
395 let abs_end = abs_start + old_length;
396 row.0.make_mut().splice(abs_start..abs_end, iter::empty());
397
398 for (i, off, len) in refs_to_update {
400 let new_off = off - old_length;
401 self.write_dynamic_ref(row, i, new_off, len);
402 }
403 }
404 }
405
406 pub fn allocate(&self) -> EncodedRow {
408 let (total_size, max_align) = self.get_cached_layout();
409 let layout = Layout::from_size_align(total_size, max_align).unwrap();
410 unsafe {
411 let ptr = alloc_zeroed(layout);
412 if ptr.is_null() {
413 handle_alloc_error(layout);
414 }
415 let vec = Vec::from_raw_parts(ptr, total_size, total_size);
416 let mut row = EncodedRow(CowVec::new(vec));
417 row.set_fingerprint(self.fingerprint);
418 row
419 }
420 }
421
/// Rounds `offset` up to the next multiple of `align`.
///
/// `align` must be a power of two. An alignment of `0` is treated like `1`
/// (no rounding) rather than producing `offset - 1`.
fn align_up(offset: usize, align: usize) -> usize {
    let mask = align.max(1) - 1;
    (offset + mask) & !mask
}
425
426 pub fn set_none(&self, row: &mut EncodedRow, index: usize) {
428 self.remove_dynamic_data(row, index);
429 row.set_valid(index, false);
430 }
431
432 pub fn testing(types: &[Type]) -> Self {
436 RowShape::new(
437 types.iter()
438 .enumerate()
439 .map(|(i, t)| RowShapeField::unconstrained(format!("f{}", i), t.clone()))
440 .collect(),
441 )
442 }
443}
444
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a shape from `(name, type)` pairs using unconstrained fields.
    fn shape_of(columns: &[(&str, Type)]) -> RowShape {
        let fields = columns
            .iter()
            .map(|(name, ty)| RowShapeField::unconstrained(*name, ty.clone()))
            .collect();
        RowShape::new(fields)
    }

    #[test]
    fn test_shape_creation() {
        let shape = shape_of(&[("id", Type::Int8), ("name", Type::Utf8), ("active", Type::Boolean)]);

        assert_eq!(shape.field_count(), 3);
        for (position, expected) in ["id", "name", "active"].into_iter().enumerate() {
            assert_eq!(shape.fields()[position].name, expected);
        }
    }

    #[test]
    fn test_shape_fingerprint_deterministic() {
        // Two shapes built independently from identical definitions.
        let columns = [("a", Type::Int4), ("b", Type::Utf8)];
        let first = shape_of(&columns);
        let second = shape_of(&columns);

        assert_eq!(first.fingerprint(), second.fingerprint());
    }

    #[test]
    fn test_shape_fingerprint_different_for_different_shapes() {
        // Same field name, different type.
        let narrow = shape_of(&[("a", Type::Int4)]);
        let wide = shape_of(&[("a", Type::Int8)]);

        assert_ne!(narrow.fingerprint(), wide.fingerprint());
    }

    #[test]
    fn test_find_field() {
        let shape = shape_of(&[("id", Type::Int8), ("name", Type::Utf8)]);

        for present in ["id", "name"] {
            assert!(shape.find_field(present).is_some());
        }
        assert!(shape.find_field("missing").is_none());
    }
}