reifydb_core/encoded/shape/
mod.rs1pub mod consolidate;
16pub mod evolution;
17pub mod fingerprint;
18mod from;
19
20use std::{
21 alloc::{Layout, alloc_zeroed, handle_alloc_error},
22 fmt,
23 fmt::Debug,
24 iter,
25 ops::Deref,
26 ptr,
27 sync::{Arc, LazyLock, OnceLock},
28};
29
30use reifydb_type::{
31 util::cowvec::CowVec,
32 value::{constraint::TypeConstraint, r#type::Type},
33};
34use serde::{Deserialize, Serialize};
35
36use super::row::EncodedRow;
37use crate::encoded::shape::fingerprint::{RowShapeFingerprint, compute_fingerprint};
38
/// Size in bytes of the fixed header at the start of every encoded row. The per-field bit
/// vector (`fields.len().div_ceil(8)` bytes) and then the fixed-size field slots follow it.
pub const SHAPE_HEADER_SIZE: usize = 24;

/// Value of bit 127 in a packed 16-byte Int/Uint/Decimal slot marking it as a reference into
/// the dynamic section.
const PACKED_MODE_DYNAMIC: u128 = 1u128 << 127;
/// Selects the mode bit (bit 127) of a packed slot.
const PACKED_MODE_MASK: u128 = 1u128 << 127;
/// Selects the low 64 bits of a packed slot: the offset into the dynamic section.
const PACKED_OFFSET_MASK: u128 = u64::MAX as u128;
/// Selects bits 64..=126 of a packed slot: the length of the referenced data.
const PACKED_LENGTH_MASK: u128 = ((u64::MAX >> 1) as u128) << 64;
45
/// One named, typed column of a [`RowShape`], together with the position and size of its
/// fixed-size slot inside an encoded row.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RowShapeField {
	/// Column name.
	pub name: String,

	/// Declared type and constraints; its storage type determines `size` and `align`.
	pub constraint: TypeConstraint,

	/// Byte offset of the field's slot from the start of the encoded row bytes.
	pub offset: u32,

	/// Size in bytes of the field's fixed-size slot.
	pub size: u32,

	/// Alignment in bytes of the slot; the layout code skips alignment when this is 0.
	pub align: u8,
}
58
59impl RowShapeField {
60 pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
61 let storage_type = constraint.storage_type();
62 Self {
63 name: name.into(),
64 constraint,
65 offset: 0,
66 size: storage_type.size() as u32,
67 align: storage_type.alignment() as u8,
68 }
69 }
70
71 pub fn unconstrained(name: impl Into<String>, field_type: Type) -> Self {
72 Self::new(name, TypeConstraint::unconstrained(field_type))
73 }
74}
75
/// Reference-counted handle to a row shape: its fields, fingerprint and lazily computed
/// layout, shared behind an [`Arc`] so that cloning a `RowShape` does not copy the fields.
pub struct RowShape(Arc<Inner>);
77
/// Shared state behind a [`RowShape`].
#[derive(Debug, Serialize, Deserialize)]
pub struct Inner {
	/// Fingerprint identifying this shape; `RowShape::allocate` stamps it into new rows.
	pub fingerprint: RowShapeFingerprint,

	/// Fields in declaration order. They are laid out by `RowShape::new`, or supplied already
	/// laid out via `RowShape::from_parts`.
	pub fields: Vec<RowShapeField>,

	/// Lazily computed `(total static size, max field alignment)`. It is skipped by serde, so
	/// it starts empty after deserialization and is recomputed on first use.
	#[serde(skip)]
	cached_layout: OnceLock<(usize, usize)>,
}
87
88impl PartialEq for Inner {
89 fn eq(&self, other: &Self) -> bool {
90 self.fingerprint == other.fingerprint && self.fields == other.fields
91 }
92}
93
94impl Eq for Inner {}
95
96impl Deref for RowShape {
97 type Target = Inner;
98
99 fn deref(&self) -> &Self::Target {
100 &self.0
101 }
102}
103
104impl Clone for RowShape {
105 fn clone(&self) -> Self {
106 Self(self.0.clone())
107 }
108}
109
110impl Debug for RowShape {
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 self.0.fmt(f)
113 }
114}
115
116impl PartialEq for RowShape {
117 fn eq(&self, other: &Self) -> bool {
118 self.0.as_ref() == other.0.as_ref()
119 }
120}
121
122impl Eq for RowShape {}
123
124impl RowShape {
125 pub fn new(fields: Vec<RowShapeField>) -> Self {
126 let fields = Self::compute_layout(fields);
127 let fingerprint = compute_fingerprint(&fields);
128
129 Self(Arc::new(Inner {
130 fingerprint,
131 fields,
132 cached_layout: OnceLock::new(),
133 }))
134 }
135
136 pub fn from_parts(fingerprint: RowShapeFingerprint, fields: Vec<RowShapeField>) -> Self {
137 Self(Arc::new(Inner {
138 fingerprint,
139 fields,
140 cached_layout: OnceLock::new(),
141 }))
142 }
143
144 pub fn fingerprint(&self) -> RowShapeFingerprint {
145 self.fingerprint
146 }
147
148 pub fn fields(&self) -> &[RowShapeField] {
149 &self.fields
150 }
151
152 pub fn field_count(&self) -> usize {
153 self.fields.len()
154 }
155
156 pub fn find_field(&self, name: &str) -> Option<&RowShapeField> {
157 self.fields.iter().find(|f| f.name == name)
158 }
159
160 pub fn find_field_index(&self, name: &str) -> Option<usize> {
161 self.fields.iter().position(|f| f.name == name)
162 }
163
164 pub fn get_field(&self, index: usize) -> Option<&RowShapeField> {
165 self.fields.get(index)
166 }
167
168 pub fn get_field_name(&self, index: usize) -> Option<&str> {
169 self.fields.get(index).map(|f| f.name.as_str())
170 }
171
172 pub fn field_names(&self) -> impl Iterator<Item = &str> {
173 self.fields.iter().map(|f| f.name.as_str())
174 }
175
176 fn compute_layout(mut fields: Vec<RowShapeField>) -> Vec<RowShapeField> {
177 let bitvec_size = fields.len().div_ceil(8);
178 let mut offset: u32 = (SHAPE_HEADER_SIZE + bitvec_size) as u32;
179
180 for field in fields.iter_mut() {
181 let storage_type = field.constraint.storage_type();
182 field.size = storage_type.size() as u32;
183 field.align = storage_type.alignment() as u8;
184
185 let align = field.align as u32;
186 if align > 0 {
187 offset = (offset + align - 1) & !(align - 1);
188 }
189
190 field.offset = offset;
191 offset += field.size;
192 }
193
194 fields
195 }
196
197 pub fn bitvec_size(&self) -> usize {
198 self.fields.len().div_ceil(8)
199 }
200
201 pub fn data_offset(&self) -> usize {
202 SHAPE_HEADER_SIZE + self.bitvec_size()
203 }
204
205 fn get_cached_layout(&self) -> (usize, usize) {
206 *self.cached_layout.get_or_init(|| {
207 let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
208
209 let total_size = if self.fields.is_empty() {
210 SHAPE_HEADER_SIZE + self.bitvec_size()
211 } else {
212 let last_field = &self.fields[self.fields.len() - 1];
213 let end = last_field.offset as usize + last_field.size as usize;
214
215 Self::align_up(end, max_align)
216 };
217
218 (total_size, max_align)
219 })
220 }
221
222 pub fn total_static_size(&self) -> usize {
223 self.get_cached_layout().0
224 }
225
226 pub fn dynamic_section_start(&self) -> usize {
227 self.total_static_size()
228 }
229
230 pub fn dynamic_section_size(&self, row: &EncodedRow) -> usize {
231 row.len().saturating_sub(self.total_static_size())
232 }
233
234 pub(crate) fn read_dynamic_ref(&self, row: &EncodedRow, index: usize) -> Option<(usize, usize)> {
235 if !row.is_defined(index) {
236 return None;
237 }
238 let field = &self.fields()[index];
239 match field.constraint.get_type().inner_type() {
240 Type::Utf8 | Type::Blob | Type::Any => {
241 let ref_slice = &row.as_slice()[field.offset as usize..field.offset as usize + 8];
242 let offset =
243 u32::from_le_bytes([ref_slice[0], ref_slice[1], ref_slice[2], ref_slice[3]])
244 as usize;
245 let length =
246 u32::from_le_bytes([ref_slice[4], ref_slice[5], ref_slice[6], ref_slice[7]])
247 as usize;
248 Some((offset, length))
249 }
250 Type::Int | Type::Uint | Type::Decimal => {
251 let packed = unsafe {
252 (row.as_ptr().add(field.offset as usize) as *const u128).read_unaligned()
253 };
254 let packed = u128::from_le(packed);
255 if packed & PACKED_MODE_MASK != 0 {
256 let offset = (packed & PACKED_OFFSET_MASK) as usize;
257 let length = ((packed & PACKED_LENGTH_MASK) >> 64) as usize;
258 Some((offset, length))
259 } else {
260 None
261 }
262 }
263 _ => None,
264 }
265 }
266
267 pub(crate) fn write_dynamic_ref(&self, row: &mut EncodedRow, index: usize, offset: usize, length: usize) {
268 let field = &self.fields()[index];
269 match field.constraint.get_type().inner_type() {
270 Type::Utf8 | Type::Blob | Type::Any => {
271 let ref_slice = &mut row.0.make_mut()[field.offset as usize..field.offset as usize + 8];
272 ref_slice[0..4].copy_from_slice(&(offset as u32).to_le_bytes());
273 ref_slice[4..8].copy_from_slice(&(length as u32).to_le_bytes());
274 }
275 Type::Int | Type::Uint | Type::Decimal => {
276 let offset_part = (offset as u128) & PACKED_OFFSET_MASK;
277 let length_part = ((length as u128) << 64) & PACKED_LENGTH_MASK;
278 let packed = PACKED_MODE_DYNAMIC | offset_part | length_part;
279 unsafe {
280 ptr::write_unaligned(
281 row.0.make_mut().as_mut_ptr().add(field.offset as usize) as *mut u128,
282 packed.to_le(),
283 );
284 }
285 }
286 _ => {}
287 }
288 }
289
290 pub(crate) fn replace_dynamic_data(&self, row: &mut EncodedRow, index: usize, new_data: &[u8]) {
291 if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
292 let delta = new_data.len() as isize - old_length as isize;
293
294 let refs_to_update: Vec<(usize, usize, usize)> = if delta != 0 {
295 self.fields()
296 .iter()
297 .enumerate()
298 .filter(|(i, _)| *i != index && row.is_defined(*i))
299 .filter_map(|(i, _)| {
300 self.read_dynamic_ref(row, i)
301 .filter(|(off, _)| *off > old_offset)
302 .map(|(off, len)| (i, off, len))
303 })
304 .collect()
305 } else {
306 vec![]
307 };
308
309 let dynamic_start = self.dynamic_section_start();
310 let abs_start = dynamic_start + old_offset;
311 let abs_end = abs_start + old_length;
312 row.0.make_mut().splice(abs_start..abs_end, new_data.iter().copied());
313
314 self.write_dynamic_ref(row, index, old_offset, new_data.len());
315
316 for (i, off, len) in refs_to_update {
317 let new_off = (off as isize + delta) as usize;
318 self.write_dynamic_ref(row, i, new_off, len);
319 }
320 } else {
321 let dynamic_offset = self.dynamic_section_size(row);
322 row.0.extend_from_slice(new_data);
323 self.write_dynamic_ref(row, index, dynamic_offset, new_data.len());
324 }
325 row.set_valid(index, true);
326 }
327
328 pub(crate) fn remove_dynamic_data(&self, row: &mut EncodedRow, index: usize) {
329 if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
330 let refs_to_update: Vec<(usize, usize, usize)> = self
331 .fields()
332 .iter()
333 .enumerate()
334 .filter(|(i, _)| *i != index && row.is_defined(*i))
335 .filter_map(|(i, _)| {
336 self.read_dynamic_ref(row, i)
337 .filter(|(off, _)| *off > old_offset)
338 .map(|(off, len)| (i, off, len))
339 })
340 .collect();
341
342 let dynamic_start = self.dynamic_section_start();
343 let abs_start = dynamic_start + old_offset;
344 let abs_end = abs_start + old_length;
345 row.0.make_mut().splice(abs_start..abs_end, iter::empty());
346
347 for (i, off, len) in refs_to_update {
348 let new_off = off - old_length;
349 self.write_dynamic_ref(row, i, new_off, len);
350 }
351 }
352 }
353
354 pub fn allocate(&self) -> EncodedRow {
355 let (total_size, max_align) = self.get_cached_layout();
356 let layout = Layout::from_size_align(total_size, max_align).unwrap();
357 unsafe {
358 let ptr = alloc_zeroed(layout);
359 if ptr.is_null() {
360 handle_alloc_error(layout);
361 }
362 let vec = Vec::from_raw_parts(ptr, total_size, total_size);
363 let mut row = EncodedRow(CowVec::new(vec));
364 row.set_fingerprint(self.fingerprint);
365 row
366 }
367 }
368
369 fn align_up(offset: usize, align: usize) -> usize {
370 (offset + align).saturating_sub(1) & !(align.saturating_sub(1))
371 }
372
373 pub fn set_none(&self, row: &mut EncodedRow, index: usize) {
374 self.remove_dynamic_data(row, index);
375 row.set_valid(index, false);
376 }
377
378 pub fn testing(types: &[Type]) -> Self {
379 RowShape::new(
380 types.iter()
381 .enumerate()
382 .map(|(i, t)| RowShapeField::unconstrained(format!("f{}", i), t.clone()))
383 .collect(),
384 )
385 }
386
387 pub fn operator_state() -> Self {
388 OPERATOR_STATE_SHAPE.clone()
389 }
390}
391
392static OPERATOR_STATE_SHAPE: LazyLock<RowShape> =
393 LazyLock::new(|| RowShape::new(vec![RowShapeField::unconstrained("state", Type::Blob)]));
394
#[cfg(test)]
mod tests {
	use super::*;

	/// Builds a shape from `(name, type)` pairs using unconstrained fields.
	fn shape_of(columns: &[(&str, Type)]) -> RowShape {
		RowShape::new(
			columns.iter()
				.map(|(name, ty)| RowShapeField::unconstrained(*name, ty.clone()))
				.collect(),
		)
	}

	#[test]
	fn test_shape_creation() {
		let shape = shape_of(&[("id", Type::Int8), ("name", Type::Utf8), ("active", Type::Boolean)]);

		assert_eq!(shape.field_count(), 3);
		let names: Vec<&str> = shape.fields().iter().map(|field| field.name.as_str()).collect();
		assert_eq!(names, ["id", "name", "active"]);
	}

	#[test]
	fn test_shape_fingerprint_deterministic() {
		let build = || shape_of(&[("a", Type::Int4), ("b", Type::Utf8)]);

		assert_eq!(build().fingerprint(), build().fingerprint());
	}

	#[test]
	fn test_shape_fingerprint_different_for_different_shapes() {
		let narrow = shape_of(&[("a", Type::Int4)]);
		let wide = shape_of(&[("a", Type::Int8)]);

		assert_ne!(narrow.fingerprint(), wide.fingerprint());
	}

	#[test]
	fn test_find_field() {
		let shape = shape_of(&[("id", Type::Int8), ("name", Type::Utf8)]);

		for present in ["id", "name"] {
			assert!(shape.find_field(present).is_some());
		}
		assert!(shape.find_field("missing").is_none());
	}
}