1pub mod cache;
16pub mod consolidate;
17pub mod evolution;
18pub mod fingerprint;
19mod from;
20
21use std::{
22 alloc::{Layout, alloc_zeroed, handle_alloc_error},
23 fmt,
24 fmt::Debug,
25 iter,
26 ops::Deref,
27 ptr,
28 sync::{Arc, LazyLock, OnceLock},
29};
30
31use reifydb_value::{
32 util::cowvec::CowVec,
33 value::{constraint::TypeConstraint, value_type::ValueType},
34};
35use serde::{Deserialize, Serialize};
36
37use super::row::EncodedRow;
38use crate::encoded::shape::fingerprint::{RowShapeFingerprint, compute_fingerprint};
39
/// Number of bytes reserved at the start of every encoded row. Field data
/// begins after this header plus the per-field bit vector (see `data_offset`).
pub const SHAPE_HEADER_SIZE: usize = 24;

/// Value of the mode bit (bit 127) written into a packed 128-bit slot when the
/// slot stores an (offset, length) reference instead of an inline value.
const PACKED_MODE_DYNAMIC: u128 = 1 << 127;
/// Selects the mode bit (bit 127) of a packed 128-bit slot.
const PACKED_MODE_MASK: u128 = 1 << 127;
/// Selects the low 64 bits of a packed slot, which carry the offset.
const PACKED_OFFSET_MASK: u128 = u64::MAX as u128;
/// Selects bits 64..=126 of a packed slot, which carry the length.
const PACKED_LENGTH_MASK: u128 = (i64::MAX as u128) << 64;
46
/// One named, typed column of a [`RowShape`] together with its position in
/// the static part of an encoded row.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RowShapeField {
    /// Field name; used by the name-based lookups on `RowShape`.
    pub name: String,

    /// Type constraint of the field; its storage type determines `size` and `align`.
    pub constraint: TypeConstraint,

    /// Byte offset of the field's slot from the start of the row.
    /// `RowShapeField::new` leaves this at 0; `RowShape::new` assigns the real value.
    pub offset: u32,

    /// Size in bytes of the field's slot, taken from the constraint's storage type.
    pub size: u32,

    /// Alignment of the field's slot, taken from the constraint's storage type.
    pub align: u8,
}
59
60impl RowShapeField {
61 pub fn new(name: impl Into<String>, constraint: TypeConstraint) -> Self {
62 let storage_type = constraint.storage_type();
63 Self {
64 name: name.into(),
65 constraint,
66 offset: 0,
67 size: storage_type.size() as u32,
68 align: storage_type.alignment() as u8,
69 }
70 }
71
72 pub fn unconstrained(name: impl Into<String>, field_type: ValueType) -> Self {
73 Self::new(name, TypeConstraint::unconstrained(field_type))
74 }
75}
76
/// Handle to a row layout. The data lives in an `Arc<Inner>`, so cloning a
/// `RowShape` clones the `Arc` rather than the field list.
pub struct RowShape(Arc<Inner>);
78
/// Payload behind a [`RowShape`]: the fingerprint, the fields, and a lazily
/// computed layout summary.
#[derive(Debug, Serialize, Deserialize)]
pub struct Inner {
    /// Fingerprint of the shape; `RowShape::new` derives it from the laid-out fields.
    pub fingerprint: RowShapeFingerprint,

    /// Fields of the shape, in declaration order.
    pub fields: Vec<RowShapeField>,

    /// Lazily computed `(total_static_size, max_align)` pair. Skipped by serde
    /// and ignored by the `PartialEq` implementation.
    #[serde(skip)]
    cached_layout: OnceLock<(usize, usize)>,
}
88
89impl PartialEq for Inner {
90 fn eq(&self, other: &Self) -> bool {
91 self.fingerprint == other.fingerprint && self.fields == other.fields
92 }
93}
94
95impl Eq for Inner {}
96
97impl Deref for RowShape {
98 type Target = Inner;
99
100 fn deref(&self) -> &Self::Target {
101 &self.0
102 }
103}
104
105impl Clone for RowShape {
106 fn clone(&self) -> Self {
107 Self(self.0.clone())
108 }
109}
110
111impl Debug for RowShape {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 self.0.fmt(f)
114 }
115}
116
117impl PartialEq for RowShape {
118 fn eq(&self, other: &Self) -> bool {
119 self.0.as_ref() == other.0.as_ref()
120 }
121}
122
123impl Eq for RowShape {}
124
125impl RowShape {
126 pub fn new(fields: Vec<RowShapeField>) -> Self {
127 let fields = Self::compute_layout(fields);
128 let fingerprint = compute_fingerprint(&fields);
129
130 Self(Arc::new(Inner {
131 fingerprint,
132 fields,
133 cached_layout: OnceLock::new(),
134 }))
135 }
136
137 pub fn from_parts(fingerprint: RowShapeFingerprint, fields: Vec<RowShapeField>) -> Self {
138 Self(Arc::new(Inner {
139 fingerprint,
140 fields,
141 cached_layout: OnceLock::new(),
142 }))
143 }
144
145 pub fn fingerprint(&self) -> RowShapeFingerprint {
146 self.fingerprint
147 }
148
149 pub fn fields(&self) -> &[RowShapeField] {
150 &self.fields
151 }
152
153 pub fn field_count(&self) -> usize {
154 self.fields.len()
155 }
156
157 pub fn find_field(&self, name: &str) -> Option<&RowShapeField> {
158 self.fields.iter().find(|f| f.name == name)
159 }
160
161 pub fn find_field_index(&self, name: &str) -> Option<usize> {
162 self.fields.iter().position(|f| f.name == name)
163 }
164
165 pub fn get_field(&self, index: usize) -> Option<&RowShapeField> {
166 self.fields.get(index)
167 }
168
169 pub fn get_field_name(&self, index: usize) -> Option<&str> {
170 self.fields.get(index).map(|f| f.name.as_str())
171 }
172
173 pub fn field_names(&self) -> impl Iterator<Item = &str> {
174 self.fields.iter().map(|f| f.name.as_str())
175 }
176
177 fn compute_layout(mut fields: Vec<RowShapeField>) -> Vec<RowShapeField> {
178 let bitvec_size = fields.len().div_ceil(8);
179 let mut offset: u32 = (SHAPE_HEADER_SIZE + bitvec_size) as u32;
180
181 for field in fields.iter_mut() {
182 let storage_type = field.constraint.storage_type();
183 field.size = storage_type.size() as u32;
184 field.align = storage_type.alignment() as u8;
185
186 let align = field.align as u32;
187 if align > 0 {
188 offset = (offset + align - 1) & !(align - 1);
189 }
190
191 field.offset = offset;
192 offset += field.size;
193 }
194
195 fields
196 }
197
198 pub fn bitvec_size(&self) -> usize {
199 self.fields.len().div_ceil(8)
200 }
201
202 pub fn data_offset(&self) -> usize {
203 SHAPE_HEADER_SIZE + self.bitvec_size()
204 }
205
206 fn get_cached_layout(&self) -> (usize, usize) {
207 *self.cached_layout.get_or_init(|| {
208 let max_align = self.fields.iter().map(|f| f.align as usize).max().unwrap_or(1);
209
210 let total_size = if self.fields.is_empty() {
211 SHAPE_HEADER_SIZE + self.bitvec_size()
212 } else {
213 let last_field = &self.fields[self.fields.len() - 1];
214 let end = last_field.offset as usize + last_field.size as usize;
215
216 Self::align_up(end, max_align)
217 };
218
219 (total_size, max_align)
220 })
221 }
222
223 pub fn total_static_size(&self) -> usize {
224 self.get_cached_layout().0
225 }
226
227 pub fn dynamic_section_start(&self) -> usize {
228 self.total_static_size()
229 }
230
231 pub fn dynamic_section_size(&self, row: &EncodedRow) -> usize {
232 row.len().saturating_sub(self.total_static_size())
233 }
234
235 pub(crate) fn read_dynamic_ref(&self, row: &EncodedRow, index: usize) -> Option<(usize, usize)> {
236 if !row.is_defined(index) {
237 return None;
238 }
239 let field = &self.fields()[index];
240 match field.constraint.get_type().inner_type() {
241 ValueType::Utf8 | ValueType::Blob | ValueType::Any => {
242 let ref_slice = &row.as_slice()[field.offset as usize..field.offset as usize + 8];
243 let offset =
244 u32::from_le_bytes([ref_slice[0], ref_slice[1], ref_slice[2], ref_slice[3]])
245 as usize;
246 let length =
247 u32::from_le_bytes([ref_slice[4], ref_slice[5], ref_slice[6], ref_slice[7]])
248 as usize;
249 Some((offset, length))
250 }
251 ValueType::Int | ValueType::Uint | ValueType::Decimal => {
252 let packed = unsafe {
253 (row.as_ptr().add(field.offset as usize) as *const u128).read_unaligned()
254 };
255 let packed = u128::from_le(packed);
256 if packed & PACKED_MODE_MASK != 0 {
257 let offset = (packed & PACKED_OFFSET_MASK) as usize;
258 let length = ((packed & PACKED_LENGTH_MASK) >> 64) as usize;
259 Some((offset, length))
260 } else {
261 None
262 }
263 }
264 _ => None,
265 }
266 }
267
268 pub(crate) fn write_dynamic_ref(&self, row: &mut EncodedRow, index: usize, offset: usize, length: usize) {
269 let field = &self.fields()[index];
270 match field.constraint.get_type().inner_type() {
271 ValueType::Utf8 | ValueType::Blob | ValueType::Any => {
272 let ref_slice = &mut row.0.make_mut()[field.offset as usize..field.offset as usize + 8];
273 ref_slice[0..4].copy_from_slice(&(offset as u32).to_le_bytes());
274 ref_slice[4..8].copy_from_slice(&(length as u32).to_le_bytes());
275 }
276 ValueType::Int | ValueType::Uint | ValueType::Decimal => {
277 let offset_part = (offset as u128) & PACKED_OFFSET_MASK;
278 let length_part = ((length as u128) << 64) & PACKED_LENGTH_MASK;
279 let packed = PACKED_MODE_DYNAMIC | offset_part | length_part;
280 unsafe {
281 ptr::write_unaligned(
282 row.0.make_mut().as_mut_ptr().add(field.offset as usize) as *mut u128,
283 packed.to_le(),
284 );
285 }
286 }
287 _ => {}
288 }
289 }
290
291 pub(crate) fn replace_dynamic_data(&self, row: &mut EncodedRow, index: usize, new_data: &[u8]) {
292 if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
293 let delta = new_data.len() as isize - old_length as isize;
294
295 let refs_to_update: Vec<(usize, usize, usize)> = if delta != 0 {
296 self.fields()
297 .iter()
298 .enumerate()
299 .filter(|(i, _)| *i != index && row.is_defined(*i))
300 .filter_map(|(i, _)| {
301 self.read_dynamic_ref(row, i)
302 .filter(|(off, _)| *off > old_offset)
303 .map(|(off, len)| (i, off, len))
304 })
305 .collect()
306 } else {
307 vec![]
308 };
309
310 let dynamic_start = self.dynamic_section_start();
311 let abs_start = dynamic_start + old_offset;
312 let abs_end = abs_start + old_length;
313 row.0.make_mut().splice(abs_start..abs_end, new_data.iter().copied());
314
315 self.write_dynamic_ref(row, index, old_offset, new_data.len());
316
317 for (i, off, len) in refs_to_update {
318 let new_off = (off as isize + delta) as usize;
319 self.write_dynamic_ref(row, i, new_off, len);
320 }
321 } else {
322 let dynamic_offset = self.dynamic_section_size(row);
323 row.0.extend_from_slice(new_data);
324 self.write_dynamic_ref(row, index, dynamic_offset, new_data.len());
325 }
326 row.set_valid(index, true);
327 }
328
329 pub(crate) fn remove_dynamic_data(&self, row: &mut EncodedRow, index: usize) {
330 if let Some((old_offset, old_length)) = self.read_dynamic_ref(row, index) {
331 let refs_to_update: Vec<(usize, usize, usize)> = self
332 .fields()
333 .iter()
334 .enumerate()
335 .filter(|(i, _)| *i != index && row.is_defined(*i))
336 .filter_map(|(i, _)| {
337 self.read_dynamic_ref(row, i)
338 .filter(|(off, _)| *off > old_offset)
339 .map(|(off, len)| (i, off, len))
340 })
341 .collect();
342
343 let dynamic_start = self.dynamic_section_start();
344 let abs_start = dynamic_start + old_offset;
345 let abs_end = abs_start + old_length;
346 row.0.make_mut().splice(abs_start..abs_end, iter::empty());
347
348 for (i, off, len) in refs_to_update {
349 let new_off = off - old_length;
350 self.write_dynamic_ref(row, i, new_off, len);
351 }
352 }
353 }
354
355 pub fn allocate(&self) -> EncodedRow {
356 let (total_size, max_align) = self.get_cached_layout();
357 let layout = Layout::from_size_align(total_size, max_align).unwrap();
358 unsafe {
359 let ptr = alloc_zeroed(layout);
360 if ptr.is_null() {
361 handle_alloc_error(layout);
362 }
363 let vec = Vec::from_raw_parts(ptr, total_size, total_size);
364 let mut row = EncodedRow(CowVec::new(vec));
365 row.set_fingerprint(self.fingerprint);
366 #[cfg(reifydb_assertions)]
367 {
368 assert!(
369 row.len() == total_size,
370 "allocated row length does not match the shape total_static_size, so any field accessor using pre-computed offsets will read from garbage memory (row_len={} total_size={})",
371 row.len(),
372 total_size
373 );
374 }
375 row
376 }
377 }
378
379 fn align_up(offset: usize, align: usize) -> usize {
380 (offset + align).saturating_sub(1) & !(align.saturating_sub(1))
381 }
382
    /// Marks field `index` of `row` as not set.
    ///
    /// Any dynamic payload of the field is removed first: `remove_dynamic_data`
    /// looks the payload up through the field's reference, which is only
    /// readable while the field is still flagged as defined. The validity bit
    /// is cleared afterwards.
    pub fn set_none(&self, row: &mut EncodedRow, index: usize) {
        self.remove_dynamic_data(row, index);
        row.set_valid(index, false);
    }
387
388 pub fn testing(types: &[ValueType]) -> Self {
389 RowShape::new(
390 types.iter()
391 .enumerate()
392 .map(|(i, t)| RowShapeField::unconstrained(format!("f{}", i), t.clone()))
393 .collect(),
394 )
395 }
396
397 pub fn operator_state() -> Self {
398 OPERATOR_STATE_SHAPE.clone()
399 }
400}
401
402static OPERATOR_STATE_SHAPE: LazyLock<RowShape> =
403 LazyLock::new(|| RowShape::new(vec![RowShapeField::unconstrained("state", ValueType::Blob)]));
404
#[cfg(test)]
mod tests {
    use super::*;

    /// A shape keeps its fields in declaration order.
    #[test]
    fn test_shape_creation() {
        let shape = RowShape::new(vec![
            RowShapeField::unconstrained("id", ValueType::Int8),
            RowShapeField::unconstrained("name", ValueType::Utf8),
            RowShapeField::unconstrained("active", ValueType::Boolean),
        ]);

        assert_eq!(shape.field_count(), 3);
        for (position, expected) in ["id", "name", "active"].into_iter().enumerate() {
            assert_eq!(shape.fields()[position].name, expected);
        }
    }

    /// Building the same shape twice yields the same fingerprint.
    #[test]
    fn test_shape_fingerprint_deterministic() {
        let build = || {
            RowShape::new(vec![
                RowShapeField::unconstrained("a", ValueType::Int4),
                RowShapeField::unconstrained("b", ValueType::Utf8),
            ])
        };

        let first = build();
        let second = build();

        assert_eq!(first.fingerprint(), second.fingerprint());
    }

    /// Changing a field's type changes the fingerprint.
    #[test]
    fn test_shape_fingerprint_different_for_different_shapes() {
        let narrow = RowShape::new(vec![RowShapeField::unconstrained("a", ValueType::Int4)]);
        let wide = RowShape::new(vec![RowShapeField::unconstrained("a", ValueType::Int8)]);

        assert_ne!(narrow.fingerprint(), wide.fingerprint());
    }

    /// Name lookup finds existing fields and rejects unknown names.
    #[test]
    fn test_find_field() {
        let shape = RowShape::new(vec![
            RowShapeField::unconstrained("id", ValueType::Int8),
            RowShapeField::unconstrained("name", ValueType::Utf8),
        ]);

        for present in ["id", "name"] {
            assert!(shape.find_field(present).is_some());
        }
        assert!(shape.find_field("missing").is_none());
    }
}