reifydb_core/encoded/shape/
mod.rs1pub mod consolidate;
5pub mod evolution;
6pub mod fingerprint;
7mod from;
8
9use std::{
10 alloc::{Layout, alloc_zeroed, handle_alloc_error},
11 fmt,
12 fmt::Debug,
13 iter,
14 ops::Deref,
15 ptr,
16 sync::{Arc, OnceLock},
17};
18
19use reifydb_type::{
20 util::cowvec::CowVec,
21 value::{constraint::TypeConstraint, r#type::Type},
22};
23use serde::{Deserialize, Serialize};
24
25use super::row::EncodedRow;
26use crate::encoded::shape::fingerprint::{RowShapeFingerprint, compute_fingerprint};
27
/// Number of bytes at the start of every row that precede the per-field bit
/// vector; the first field slot is placed after `SHAPE_HEADER_SIZE` plus the
/// bit-vector size.
pub const SHAPE_HEADER_SIZE: usize = 24;

// Bit layout of the 128-bit slot written for `Int`, `Uint` and `Decimal`
// fields when the slot refers to the dynamic section:
//   bit 127        mode flag
//   bits 64..=126  length
//   bits 0..=63    offset

/// Value OR-ed into a packed slot to mark it as a dynamic-section reference.
const PACKED_MODE_DYNAMIC: u128 = 1u128 << 127;
/// Mask selecting the mode bit of a packed slot.
const PACKED_MODE_MASK: u128 = 1u128 << 127;
/// Mask selecting the offset stored in the low 64 bits of a packed slot.
const PACKED_OFFSET_MASK: u128 = u64::MAX as u128;
/// Mask selecting the length stored in bits 64..=126 of a packed slot.
const PACKED_LENGTH_MASK: u128 = ((1u128 << 63) - 1) << 64;
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub struct RowShapeField {
40 pub name: String,
42 pub constraint: TypeConstraint,
44 pub offset: u32,
46 pub size: u32,
48 pub align: u8,
50}
51
52impl RowShapeField {
53 pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
56 let storage_type = constraint.storage_type();
57 Self {
58 name: name.into(),
59 constraint,
60 offset: 0,
61 size: storage_type.size() as u32,
62 align: storage_type.alignment() as u8,
63 }
64 }
65
66 pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
69 Self::new(name, TypeConstraint::unconstrained(field_type))
70 }
71}
72
73pub struct RowShape(Arc<Inner>);
75
76#[derive(Debug, Serialize, Deserialize)]
82pub struct Inner {
83 pub fingerprint: RowShapeFingerprint,
85 pub fields: Vec<RowShapeField>,
87 #[serde(skip)]
89 cached_layout: OnceLock<(usize, usize)>,
90}
91
92impl PartialEq for Inner {
93 fn eq(&self, other: &Self) -> bool {
94 self.fingerprint == other.fingerprint && self.fields == other.fields
95 }
96}
97
98impl Eq for Inner {}
99
100impl Deref for RowShape {
101 type Target = Inner;
102
103 fn deref(&self) -> &Self::Target {
104 &self.0
105 }
106}
107
108impl Clone for RowShape {
109 fn clone(&self) -> Self {
110 Self(self.0.clone())
111 }
112}
113
114impl Debug for RowShape {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 self.0.fmt(f)
117 }
118}
119
120impl PartialEq for RowShape {
121 fn eq(&self, other: &Self) -> bool {
122 self.0.as_ref() == other.0.as_ref()
123 }
124}
125
126impl Eq for RowShape {}
127
128impl RowShape {
129 pub fn new(fields: Vec<RowShapeField>) -> Self {
133 let fields = Self::compute_layout(fields);
134 let fingerprint = compute_fingerprint(&fields);
135
136 Self(Arc::new(Inner {
137 fingerprint,
138 fields,
139 cached_layout: OnceLock::new(),
140 }))
141 }
142
143 pub fn from_parts(fingerprint: RowShapeFingerprint, fields: Vec<RowShapeField>) -> Self {
146 Self(Arc::new(Inner {
147 fingerprint,
148 fields,
149 cached_layout: OnceLock::new(),
150 }))
151 }
152
153 pub fn fingerprint(&self) -> RowShapeFingerprint {
155 self.fingerprint
156 }
157
158 pub fn fields(&self) -> &[RowShapeField] {
160 &self.fields
161 }
162
163 pub fn field_count(&self) -> usize {
165 self.fields.len()
166 }
167
168 pub fn find_field(&self, name: &str) -> Option<&RowShapeField> {
170 self.fields.iter().find(|f| f.name == name)
171 }
172
173 pub fn find_field_index(&self, name: &str) -> Option<usize> {
175 self.fields.iter().position(|f| f.name == name)
176 }
177
178 pub fn get_field(&self, index: usize) -> Option<&RowShapeField> {
180 self.fields.get(index)
181 }
182
183 pub fn get_field_name(&self, index: usize) -> Option<&str> {
185 self.fields.get(index).map(|f| f.name.as_str())
186 }
187
188 pub fn field_names(&self) -> impl Iterator<Item = &str> {
190 self.fields.iter().map(|f| f.name.as_str())
191 }
192
193 fn compute_layout(mut fields: Vec<RowShapeField>) -> Vec<RowShapeField> {
196 let bitvec_size = fields.len().div_ceil(8);
198 let mut offset: u32 = (SHAPE_HEADER_SIZE + bitvec_size) as u32;
199
200 for field in fields.iter_mut() {
201 let storage_type = field.constraint.storage_type();
202 field.size = storage_type.size() as u32;
203 field.align = storage_type.alignment() as u8;
204
205 let align = field.align as u32;
207 if align > 0 {
208 offset = (offset + align - 1) & !(align - 1);
209 }
210
211 field.offset = offset;
212 offset += field.size;
213 }
214
215 fields
216 }
217
218 pub fn bitvec_size(&self) -> usize {
220 self.fields.len().div_ceil(8)
221 }
222
223 pub fn data_offset(&self) -> usize {
225 SHAPE_HEADER_SIZE + self.bitvec_size()
226 }
227
228 fn get_cached_layout(&self) -> (usize, usize) {
231 *self.cached_layout.get_or_init(|| {
232 let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
234
235 let total_size = if self.fields.is_empty() {
237 SHAPE_HEADER_SIZE + self.bitvec_size()
238 } else {
239 let last_field = &self.fields[self.fields.len() - 1];
240 let end = last_field.offset as usize + last_field.size as usize;
241 Self::align_up(end, max_align)
243 };
244
245 (total_size, max_align)
246 })
247 }
248
249 pub fn total_static_size(&self) -> usize {
251 self.get_cached_layout().0
252 }
253
254 pub fn dynamic_section_start(&self) -> usize {
256 self.total_static_size()
257 }
258
259 pub fn dynamic_section_size(&self, row: &EncodedRow) -> usize {
261 row.len().saturating_sub(self.total_static_size())
262 }
263
264 pub(crate) fn read_dynamic_ref(&self, row: &EncodedRow, index: usize) -> Option<(usize, usize)> {
267 if !row.is_defined(index) {
268 return None;
269 }
270 let field = &self.fields()[index];
271 match field.constraint.get_type().inner_type() {
272 Type::Utf8 | Type::Blob | Type::Any => {
273 let ref_slice = &row.as_slice()[field.offset as usize..field.offset as usize + 8];
274 let offset =
275 u32::from_le_bytes([ref_slice[0], ref_slice[1], ref_slice[2], ref_slice[3]])
276 as usize;
277 let length =
278 u32::from_le_bytes([ref_slice[4], ref_slice[5], ref_slice[6], ref_slice[7]])
279 as usize;
280 Some((offset, length))
281 }
282 Type::Int | Type::Uint | Type::Decimal => {
283 let packed = unsafe {
284 (row.as_ptr().add(field.offset as usize) as *const u128).read_unaligned()
285 };
286 let packed = u128::from_le(packed);
287 if packed & PACKED_MODE_MASK != 0 {
288 let offset = (packed & PACKED_OFFSET_MASK) as usize;
289 let length = ((packed & PACKED_LENGTH_MASK) >> 64) as usize;
290 Some((offset, length))
291 } else {
292 None }
294 }
295 _ => None,
296 }
297 }
298
299 pub(crate) fn write_dynamic_ref(&self, row: &mut EncodedRow, index: usize, offset: usize, length: usize) {
301 let field = &self.fields()[index];
302 match field.constraint.get_type().inner_type() {
303 Type::Utf8 | Type::Blob | Type::Any => {
304 let ref_slice = &mut row.0.make_mut()[field.offset as usize..field.offset as usize + 8];
305 ref_slice[0..4].copy_from_slice(&(offset as u32).to_le_bytes());
306 ref_slice[4..8].copy_from_slice(&(length as u32).to_le_bytes());
307 }
308 Type::Int | Type::Uint | Type::Decimal => {
309 let offset_part = (offset as u128) & PACKED_OFFSET_MASK;
310 let length_part = ((length as u128) << 64) & PACKED_LENGTH_MASK;
311 let packed = PACKED_MODE_DYNAMIC | offset_part | length_part;
312 unsafe {
313 ptr::write_unaligned(
314 row.0.make_mut().as_mut_ptr().add(field.offset as usize) as *mut u128,
315 packed.to_le(),
316 );
317 }
318 }
319 _ => {}
320 }
321 }
322
323 pub(crate) fn replace_dynamic_data(&self, row: &mut EncodedRow, index: usize, new_data: &[u8]) {
326 if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
327 let delta = new_data.len() as isize - old_length as isize;
328
329 let refs_to_update: Vec<(usize, usize, usize)> = if delta != 0 {
331 self.fields()
332 .iter()
333 .enumerate()
334 .filter(|(i, _)| *i != index && row.is_defined(*i))
335 .filter_map(|(i, _)| {
336 self.read_dynamic_ref(row, i)
337 .filter(|(off, _)| *off > old_offset)
338 .map(|(off, len)| (i, off, len))
339 })
340 .collect()
341 } else {
342 vec![]
343 };
344
345 let dynamic_start = self.dynamic_section_start();
347 let abs_start = dynamic_start + old_offset;
348 let abs_end = abs_start + old_length;
349 row.0.make_mut().splice(abs_start..abs_end, new_data.iter().copied());
350
351 self.write_dynamic_ref(row, index, old_offset, new_data.len());
353
354 for (i, off, len) in refs_to_update {
356 let new_off = (off as isize + delta) as usize;
357 self.write_dynamic_ref(row, i, new_off, len);
358 }
359 } else {
360 let dynamic_offset = self.dynamic_section_size(row);
362 row.0.extend_from_slice(new_data);
363 self.write_dynamic_ref(row, index, dynamic_offset, new_data.len());
364 }
365 row.set_valid(index, true);
366 }
367
368 pub(crate) fn remove_dynamic_data(&self, row: &mut EncodedRow, index: usize) {
371 if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
372 let refs_to_update: Vec<(usize, usize, usize)> = self
374 .fields()
375 .iter()
376 .enumerate()
377 .filter(|(i, _)| *i != index && row.is_defined(*i))
378 .filter_map(|(i, _)| {
379 self.read_dynamic_ref(row, i)
380 .filter(|(off, _)| *off > old_offset)
381 .map(|(off, len)| (i, off, len))
382 })
383 .collect();
384
385 let dynamic_start = self.dynamic_section_start();
387 let abs_start = dynamic_start + old_offset;
388 let abs_end = abs_start + old_length;
389 row.0.make_mut().splice(abs_start..abs_end, iter::empty());
390
391 for (i, off, len) in refs_to_update {
393 let new_off = off - old_length;
394 self.write_dynamic_ref(row, i, new_off, len);
395 }
396 }
397 }
398
399 pub fn allocate(&self) -> EncodedRow {
401 let (total_size, max_align) = self.get_cached_layout();
402 let layout = Layout::from_size_align(total_size, max_align).unwrap();
403 unsafe {
404 let ptr = alloc_zeroed(layout);
405 if ptr.is_null() {
406 handle_alloc_error(layout);
407 }
408 let vec = Vec::from_raw_parts(ptr, total_size, total_size);
409 let mut row = EncodedRow(CowVec::new(vec));
410 row.set_fingerprint(self.fingerprint);
411 row
412 }
413 }
414
/// Rounds `offset` up to the next multiple of `align`.
///
/// `align` is expected to be a power of two (the rounding uses a bit mask).
/// An `align` of `0` is treated like `1`, i.e. `offset` is returned
/// unchanged.
fn align_up(offset: usize, align: usize) -> usize {
    // `max(1)` keeps the mask at 0 for `align == 0`; without it the rounding
    // would return `offset - 1`.
    let mask = align.max(1) - 1;
    (offset + mask) & !mask
}
418
419 pub fn set_none(&self, row: &mut EncodedRow, index: usize) {
421 self.remove_dynamic_data(row, index);
422 row.set_valid(index, false);
423 }
424
425 pub fn testing(types: &[Type]) -> Self {
429 RowShape::new(
430 types.iter()
431 .enumerate()
432 .map(|(i, t)| RowShapeField::unconstrained(format!("f{}", i), t.clone()))
433 .collect(),
434 )
435 }
436}
437
#[cfg(test)]
mod tests {
    use super::*;

    /// A shape keeps its fields in the order they were supplied.
    #[test]
    fn test_shape_creation() {
        let shape = RowShape::new(vec![
            RowShapeField::unconstrained("id", Type::Int8),
            RowShapeField::unconstrained("name", Type::Utf8),
            RowShapeField::unconstrained("active", Type::Boolean),
        ]);

        assert_eq!(shape.field_count(), 3);

        let names: Vec<&str> = shape.fields().iter().map(|f| f.name.as_str()).collect();
        assert_eq!(names, ["id", "name", "active"]);
    }

    /// Building the same field list twice yields the same fingerprint.
    #[test]
    fn test_shape_fingerprint_deterministic() {
        let build = || {
            RowShape::new(vec![
                RowShapeField::unconstrained("a", Type::Int4),
                RowShapeField::unconstrained("b", Type::Utf8),
            ])
        };

        let first = build();
        let second = build();

        assert_eq!(first.fingerprint(), second.fingerprint());
    }

    /// Changing a field's type changes the fingerprint.
    #[test]
    fn test_shape_fingerprint_different_for_different_shapes() {
        let narrow = RowShape::new(vec![RowShapeField::unconstrained("a", Type::Int4)]);
        let wide = RowShape::new(vec![RowShapeField::unconstrained("a", Type::Int8)]);

        assert_ne!(narrow.fingerprint(), wide.fingerprint());
    }

    /// Name lookup finds existing fields and reports missing ones as `None`.
    #[test]
    fn test_find_field() {
        let shape = RowShape::new(vec![
            RowShapeField::unconstrained("id", Type::Int8),
            RowShapeField::unconstrained("name", Type::Utf8),
        ]);

        for present in ["id", "name"] {
            assert!(shape.find_field(present).is_some());
        }
        assert!(shape.find_field("missing").is_none());
    }
}