1use crate::aligned_vec::AlignedVec;
14use crate::value_word::ValueWord;
15use chrono::{DateTime, FixedOffset};
16use shape_ast::data::Timeframe;
17use std::collections::HashMap;
18use std::sync::Arc;
19
20#[derive(Debug, Clone)]
24pub struct MatrixData {
25 pub data: AlignedVec<f64>,
26 pub rows: u32,
27 pub cols: u32,
28}
29
30impl MatrixData {
31 pub fn new(rows: u32, cols: u32) -> Self {
33 let len = (rows as usize) * (cols as usize);
34 let mut data = AlignedVec::with_capacity(len);
35 for _ in 0..len {
36 data.push(0.0);
37 }
38 Self { data, rows, cols }
39 }
40
41 pub fn from_flat(data: AlignedVec<f64>, rows: u32, cols: u32) -> Self {
43 debug_assert_eq!(data.len(), (rows as usize) * (cols as usize));
44 Self { data, rows, cols }
45 }
46
47 #[inline]
49 pub fn get(&self, row: u32, col: u32) -> f64 {
50 self.data[(row as usize) * (self.cols as usize) + (col as usize)]
51 }
52
53 #[inline]
55 pub fn set(&mut self, row: u32, col: u32, val: f64) {
56 self.data[(row as usize) * (self.cols as usize) + (col as usize)] = val;
57 }
58
59 #[inline]
61 pub fn row_slice(&self, row: u32) -> &[f64] {
62 let start = (row as usize) * (self.cols as usize);
63 &self.data[start..start + self.cols as usize]
64 }
65
66 #[inline]
68 pub fn shape(&self) -> (u32, u32) {
69 (self.rows, self.cols)
70 }
71
72 #[inline]
74 pub fn row_data(&self, row: u32) -> &[f64] {
75 self.row_slice(row)
76 }
77}
78
79#[derive(Debug, Clone)]
81pub struct IteratorState {
82 pub source: ValueWord,
83 pub position: usize,
84 pub transforms: Vec<IteratorTransform>,
85 pub done: bool,
86}
87
88#[derive(Debug, Clone)]
90pub enum IteratorTransform {
91 Map(ValueWord),
92 Filter(ValueWord),
93 Take(usize),
94 Skip(usize),
95 FlatMap(ValueWord),
96}
97
98#[derive(Debug, Clone)]
100pub struct GeneratorState {
101 pub function_id: u16,
102 pub state: u16,
103 pub locals: Box<[ValueWord]>,
104 pub result: Option<Box<ValueWord>>,
105}
106
107#[derive(Debug, Clone)]
109pub struct SimulationCallData {
110 pub name: String,
111 pub params: HashMap<String, ValueWord>,
112}
113
114#[derive(Debug, Clone)]
116pub struct DataReferenceData {
117 pub datetime: DateTime<FixedOffset>,
118 pub id: String,
119 pub timeframe: Timeframe,
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
124pub enum RefProjection {
125 TypedField {
126 type_id: u16,
127 field_idx: u16,
128 field_type_tag: u16,
129 },
130 Index {
133 index: ValueWord,
134 },
135 MatrixRow {
141 row_index: u32,
142 },
143}
144
145#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct ProjectedRefData {
148 pub base: ValueWord,
149 pub projection: RefProjection,
150}
151
152#[derive(Debug, Clone)]
158pub struct HashMapData {
159 pub keys: Vec<ValueWord>,
160 pub values: Vec<ValueWord>,
161 pub index: HashMap<u64, Vec<usize>>,
162 pub shape_id: Option<crate::shape_graph::ShapeId>,
165}
166
167impl HashMapData {
168 #[inline]
170 pub fn find_key(&self, key: &ValueWord) -> Option<usize> {
171 let hash = key.vw_hash();
172 let bucket = self.index.get(&hash)?;
173 bucket
174 .iter()
175 .copied()
176 .find(|&idx| self.keys[idx].vw_equals(key))
177 }
178
179 pub fn rebuild_index(keys: &[ValueWord]) -> HashMap<u64, Vec<usize>> {
181 let mut index: HashMap<u64, Vec<usize>> = HashMap::new();
182 for (i, k) in keys.iter().enumerate() {
183 index.entry(k.vw_hash()).or_default().push(i);
184 }
185 index
186 }
187
188 pub fn compute_shape(keys: &[ValueWord]) -> Option<crate::shape_graph::ShapeId> {
193 if keys.is_empty() || keys.len() > 64 {
194 return None;
195 }
196 let mut key_hashes = Vec::with_capacity(keys.len());
197 for k in keys {
198 if let Some(s) = k.as_str() {
199 key_hashes.push(crate::shape_graph::hash_property_name(s));
200 } else {
201 return None; }
203 }
204 crate::shape_graph::shape_for_hashmap_keys(&key_hashes)
205 }
206
207 #[inline]
212 pub fn shape_get(&self, property: &str) -> Option<&ValueWord> {
213 let shape_id = self.shape_id?;
214 let prop_hash = crate::shape_graph::hash_property_name(property);
215 let idx = crate::shape_graph::shape_property_index(shape_id, prop_hash)?;
216 self.values.get(idx)
217 }
218}
219
220#[derive(Debug, Clone)]
224pub struct SetData {
225 pub items: Vec<ValueWord>,
226 pub index: HashMap<u64, Vec<usize>>,
227}
228
229impl SetData {
230 #[inline]
232 pub fn contains(&self, item: &ValueWord) -> bool {
233 let hash = item.vw_hash();
234 if let Some(bucket) = self.index.get(&hash) {
235 bucket.iter().any(|&idx| self.items[idx].vw_equals(item))
236 } else {
237 false
238 }
239 }
240
241 pub fn insert(&mut self, item: ValueWord) -> bool {
243 if self.contains(&item) {
244 return false;
245 }
246 let hash = item.vw_hash();
247 let idx = self.items.len();
248 self.items.push(item);
249 self.index.entry(hash).or_default().push(idx);
250 true
251 }
252
253 pub fn remove(&mut self, item: &ValueWord) -> bool {
255 let hash = item.vw_hash();
256 if let Some(bucket) = self.index.get(&hash) {
257 if let Some(&idx) = bucket.iter().find(|&&idx| self.items[idx].vw_equals(item)) {
258 self.items.swap_remove(idx);
259 self.rebuild_index_from_items();
260 return true;
261 }
262 }
263 false
264 }
265
266 pub fn rebuild_index(items: &[ValueWord]) -> HashMap<u64, Vec<usize>> {
268 let mut index: HashMap<u64, Vec<usize>> = HashMap::new();
269 for (i, k) in items.iter().enumerate() {
270 index.entry(k.vw_hash()).or_default().push(i);
271 }
272 index
273 }
274
275 fn rebuild_index_from_items(&mut self) {
276 self.index = Self::rebuild_index(&self.items);
277 }
278
279 pub fn from_items(items: Vec<ValueWord>) -> Self {
281 let mut set = SetData {
282 items: Vec::with_capacity(items.len()),
283 index: HashMap::new(),
284 };
285 for item in items {
286 set.insert(item);
287 }
288 set
289 }
290}
291
292#[derive(Debug, Clone)]
297pub struct PriorityQueueData {
298 pub items: Vec<ValueWord>,
299}
300
301impl PriorityQueueData {
302 pub fn new() -> Self {
303 PriorityQueueData { items: Vec::new() }
304 }
305
306 pub fn from_items(items: Vec<ValueWord>) -> Self {
307 let mut pq = PriorityQueueData { items };
308 pq.heapify();
309 pq
310 }
311
312 #[inline]
315 fn cmp_items(a: &ValueWord, b: &ValueWord) -> std::cmp::Ordering {
316 match (a.as_number_coerce(), b.as_number_coerce()) {
317 (Some(fa), Some(fb)) => fa.partial_cmp(&fb).unwrap_or(std::cmp::Ordering::Equal),
318 (Some(_), None) => std::cmp::Ordering::Less,
319 (None, Some(_)) => std::cmp::Ordering::Greater,
320 (None, None) => {
321 let sa = format!("{}", a);
323 let sb = format!("{}", b);
324 sa.cmp(&sb)
325 }
326 }
327 }
328
329 pub fn push(&mut self, item: ValueWord) {
331 self.items.push(item);
332 self.sift_up(self.items.len() - 1);
333 }
334
335 pub fn pop(&mut self) -> Option<ValueWord> {
337 if self.items.is_empty() {
338 return None;
339 }
340 let last = self.items.len() - 1;
341 self.items.swap(0, last);
342 let result = self.items.pop();
343 if !self.items.is_empty() {
344 self.sift_down(0);
345 }
346 result
347 }
348
349 pub fn peek(&self) -> Option<&ValueWord> {
351 self.items.first()
352 }
353
354 fn sift_up(&mut self, mut idx: usize) {
355 while idx > 0 {
356 let parent = (idx - 1) / 2;
357 if Self::cmp_items(&self.items[idx], &self.items[parent]) == std::cmp::Ordering::Less {
358 self.items.swap(idx, parent);
359 idx = parent;
360 } else {
361 break;
362 }
363 }
364 }
365
366 fn sift_down(&mut self, mut idx: usize) {
367 let len = self.items.len();
368 loop {
369 let left = 2 * idx + 1;
370 let right = 2 * idx + 2;
371 let mut smallest = idx;
372
373 if left < len
374 && Self::cmp_items(&self.items[left], &self.items[smallest])
375 == std::cmp::Ordering::Less
376 {
377 smallest = left;
378 }
379 if right < len
380 && Self::cmp_items(&self.items[right], &self.items[smallest])
381 == std::cmp::Ordering::Less
382 {
383 smallest = right;
384 }
385
386 if smallest != idx {
387 self.items.swap(idx, smallest);
388 idx = smallest;
389 } else {
390 break;
391 }
392 }
393 }
394
395 fn heapify(&mut self) {
396 if self.items.len() <= 1 {
397 return;
398 }
399 for i in (0..self.items.len() / 2).rev() {
400 self.sift_down(i);
401 }
402 }
403}
404
405#[derive(Debug, Clone)]
407pub struct DequeData {
408 pub items: std::collections::VecDeque<ValueWord>,
409}
410
411impl DequeData {
412 pub fn new() -> Self {
413 DequeData {
414 items: std::collections::VecDeque::new(),
415 }
416 }
417
418 pub fn from_items(items: Vec<ValueWord>) -> Self {
419 DequeData {
420 items: items.into(),
421 }
422 }
423}
424
425#[derive(Debug, Clone, Copy, PartialEq)]
430pub enum NativeScalar {
431 I8(i8),
432 U8(u8),
433 I16(i16),
434 U16(u16),
435 I32(i32),
436 I64(i64),
437 U32(u32),
438 U64(u64),
439 Isize(isize),
440 Usize(usize),
441 Ptr(usize),
442 F32(f32),
443}
444
445impl NativeScalar {
446 #[inline]
447 pub fn type_name(&self) -> &'static str {
448 match self {
449 NativeScalar::I8(_) => "i8",
450 NativeScalar::U8(_) => "u8",
451 NativeScalar::I16(_) => "i16",
452 NativeScalar::U16(_) => "u16",
453 NativeScalar::I32(_) => "i32",
454 NativeScalar::I64(_) => "i64",
455 NativeScalar::U32(_) => "u32",
456 NativeScalar::U64(_) => "u64",
457 NativeScalar::Isize(_) => "isize",
458 NativeScalar::Usize(_) => "usize",
459 NativeScalar::Ptr(_) => "ptr",
460 NativeScalar::F32(_) => "f32",
461 }
462 }
463
464 #[inline]
465 pub fn is_truthy(&self) -> bool {
466 match self {
467 NativeScalar::I8(v) => *v != 0,
468 NativeScalar::U8(v) => *v != 0,
469 NativeScalar::I16(v) => *v != 0,
470 NativeScalar::U16(v) => *v != 0,
471 NativeScalar::I32(v) => *v != 0,
472 NativeScalar::I64(v) => *v != 0,
473 NativeScalar::U32(v) => *v != 0,
474 NativeScalar::U64(v) => *v != 0,
475 NativeScalar::Isize(v) => *v != 0,
476 NativeScalar::Usize(v) => *v != 0,
477 NativeScalar::Ptr(v) => *v != 0,
478 NativeScalar::F32(v) => *v != 0.0 && !v.is_nan(),
479 }
480 }
481
482 #[inline]
483 pub fn as_i64(&self) -> Option<i64> {
484 match self {
485 NativeScalar::I8(v) => Some(*v as i64),
486 NativeScalar::U8(v) => Some(*v as i64),
487 NativeScalar::I16(v) => Some(*v as i64),
488 NativeScalar::U16(v) => Some(*v as i64),
489 NativeScalar::I32(v) => Some(*v as i64),
490 NativeScalar::I64(v) => Some(*v),
491 NativeScalar::U32(v) => Some(*v as i64),
492 NativeScalar::U64(v) => i64::try_from(*v).ok(),
493 NativeScalar::Isize(v) => i64::try_from(*v).ok(),
494 NativeScalar::Usize(v) => i64::try_from(*v).ok(),
495 NativeScalar::Ptr(v) => i64::try_from(*v).ok(),
496 NativeScalar::F32(_) => None,
497 }
498 }
499
500 #[inline]
501 pub fn as_u64(&self) -> Option<u64> {
502 match self {
503 NativeScalar::U8(v) => Some(*v as u64),
504 NativeScalar::U16(v) => Some(*v as u64),
505 NativeScalar::U32(v) => Some(*v as u64),
506 NativeScalar::U64(v) => Some(*v),
507 NativeScalar::Usize(v) => Some(*v as u64),
508 NativeScalar::Ptr(v) => Some(*v as u64),
509 NativeScalar::I8(v) if *v >= 0 => Some(*v as u64),
510 NativeScalar::I16(v) if *v >= 0 => Some(*v as u64),
511 NativeScalar::I32(v) if *v >= 0 => Some(*v as u64),
512 NativeScalar::I64(v) if *v >= 0 => Some(*v as u64),
513 NativeScalar::Isize(v) if *v >= 0 => Some(*v as u64),
514 _ => None,
515 }
516 }
517
518 #[inline]
519 pub fn as_i128(&self) -> Option<i128> {
520 match self {
521 NativeScalar::I8(v) => Some(*v as i128),
522 NativeScalar::U8(v) => Some(*v as i128),
523 NativeScalar::I16(v) => Some(*v as i128),
524 NativeScalar::U16(v) => Some(*v as i128),
525 NativeScalar::I32(v) => Some(*v as i128),
526 NativeScalar::U32(v) => Some(*v as i128),
527 NativeScalar::I64(v) => Some(*v as i128),
528 NativeScalar::U64(v) => Some(*v as i128),
529 NativeScalar::Isize(v) => Some(*v as i128),
530 NativeScalar::Usize(v) => Some(*v as i128),
531 NativeScalar::Ptr(v) => Some(*v as i128),
532 NativeScalar::F32(_) => None,
533 }
534 }
535
536 #[inline]
537 pub fn as_f64(&self) -> f64 {
538 match self {
539 NativeScalar::I8(v) => *v as f64,
540 NativeScalar::U8(v) => *v as f64,
541 NativeScalar::I16(v) => *v as f64,
542 NativeScalar::U16(v) => *v as f64,
543 NativeScalar::I32(v) => *v as f64,
544 NativeScalar::I64(v) => *v as f64,
545 NativeScalar::U32(v) => *v as f64,
546 NativeScalar::U64(v) => *v as f64,
547 NativeScalar::Isize(v) => *v as f64,
548 NativeScalar::Usize(v) => *v as f64,
549 NativeScalar::Ptr(v) => *v as f64,
550 NativeScalar::F32(v) => *v as f64,
551 }
552 }
553}
554
555impl std::fmt::Display for NativeScalar {
556 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
557 match self {
558 NativeScalar::I8(v) => write!(f, "{v}"),
559 NativeScalar::U8(v) => write!(f, "{v}"),
560 NativeScalar::I16(v) => write!(f, "{v}"),
561 NativeScalar::U16(v) => write!(f, "{v}"),
562 NativeScalar::I32(v) => write!(f, "{v}"),
563 NativeScalar::I64(v) => write!(f, "{v}"),
564 NativeScalar::U32(v) => write!(f, "{v}"),
565 NativeScalar::U64(v) => write!(f, "{v}"),
566 NativeScalar::Isize(v) => write!(f, "{v}"),
567 NativeScalar::Usize(v) => write!(f, "{v}"),
568 NativeScalar::Ptr(v) => write!(f, "0x{v:x}"),
569 NativeScalar::F32(v) => write!(f, "{v}"),
570 }
571 }
572}
573
574#[derive(Debug, Clone)]
576pub struct NativeLayoutField {
577 pub name: String,
578 pub c_type: String,
579 pub offset: u32,
580 pub size: u32,
581 pub align: u32,
582}
583
584#[derive(Debug, Clone)]
586pub struct NativeTypeLayout {
587 pub name: String,
588 pub abi: String,
589 pub size: u32,
590 pub align: u32,
591 pub fields: Vec<NativeLayoutField>,
592}
593
594impl NativeTypeLayout {
595 #[inline]
596 pub fn field(&self, name: &str) -> Option<&NativeLayoutField> {
597 self.fields.iter().find(|field| field.name == name)
598 }
599}
600
601#[derive(Debug, Clone)]
603pub struct NativeViewData {
604 pub ptr: usize,
605 pub layout: Arc<NativeTypeLayout>,
606 pub mutable: bool,
607}
608
609#[derive(Debug, Clone, Copy, PartialEq, Eq)]
611#[repr(u8)]
612pub enum IoHandleKind {
613 File = 0,
614 TcpStream = 1,
615 TcpListener = 2,
616 UdpSocket = 3,
617 ChildProcess = 4,
618 PipeReader = 5,
619 PipeWriter = 6,
620 Custom = 7,
621}
622
623pub enum IoResource {
625 File(std::fs::File),
626 TcpStream(std::net::TcpStream),
627 TcpListener(std::net::TcpListener),
628 UdpSocket(std::net::UdpSocket),
629 ChildProcess(std::process::Child),
630 PipeReader(std::process::ChildStdout),
631 PipeWriter(std::process::ChildStdin),
632 PipeReaderErr(std::process::ChildStderr),
633 Custom(Box<dyn std::any::Any + Send>),
635}
636
637impl std::fmt::Debug for IoResource {
638 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
639 match self {
640 IoResource::File(_) => write!(f, "File(...)"),
641 IoResource::TcpStream(_) => write!(f, "TcpStream(...)"),
642 IoResource::TcpListener(_) => write!(f, "TcpListener(...)"),
643 IoResource::UdpSocket(_) => write!(f, "UdpSocket(...)"),
644 IoResource::ChildProcess(_) => write!(f, "ChildProcess(...)"),
645 IoResource::PipeReader(_) => write!(f, "PipeReader(...)"),
646 IoResource::PipeWriter(_) => write!(f, "PipeWriter(...)"),
647 IoResource::PipeReaderErr(_) => write!(f, "PipeReaderErr(...)"),
648 IoResource::Custom(_) => write!(f, "Custom(...)"),
649 }
650 }
651}
652
653#[derive(Clone)]
659pub struct IoHandleData {
660 pub kind: IoHandleKind,
661 pub resource: Arc<std::sync::Mutex<Option<IoResource>>>,
662 pub path: String,
663 pub mode: String,
664}
665
666impl std::fmt::Debug for IoHandleData {
667 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
668 f.debug_struct("IoHandleData")
669 .field("kind", &self.kind)
670 .field("path", &self.path)
671 .field("mode", &self.mode)
672 .field(
673 "open",
674 &self.resource.lock().map(|g| g.is_some()).unwrap_or(false),
675 )
676 .finish()
677 }
678}
679
680impl IoHandleData {
681 pub fn new_file(file: std::fs::File, path: String, mode: String) -> Self {
683 Self {
684 kind: IoHandleKind::File,
685 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::File(file)))),
686 path,
687 mode,
688 }
689 }
690
691 pub fn new_tcp_stream(stream: std::net::TcpStream, addr: String) -> Self {
693 Self {
694 kind: IoHandleKind::TcpStream,
695 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::TcpStream(stream)))),
696 path: addr,
697 mode: "rw".to_string(),
698 }
699 }
700
701 pub fn new_tcp_listener(listener: std::net::TcpListener, addr: String) -> Self {
703 Self {
704 kind: IoHandleKind::TcpListener,
705 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::TcpListener(
706 listener,
707 )))),
708 path: addr,
709 mode: "listen".to_string(),
710 }
711 }
712
713 pub fn new_udp_socket(socket: std::net::UdpSocket, addr: String) -> Self {
715 Self {
716 kind: IoHandleKind::UdpSocket,
717 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::UdpSocket(socket)))),
718 path: addr,
719 mode: "rw".to_string(),
720 }
721 }
722
723 pub fn new_child_process(child: std::process::Child, cmd: String) -> Self {
725 Self {
726 kind: IoHandleKind::ChildProcess,
727 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::ChildProcess(child)))),
728 path: cmd,
729 mode: "process".to_string(),
730 }
731 }
732
733 pub fn new_pipe_reader(stdout: std::process::ChildStdout, label: String) -> Self {
735 Self {
736 kind: IoHandleKind::PipeReader,
737 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::PipeReader(stdout)))),
738 path: label,
739 mode: "r".to_string(),
740 }
741 }
742
743 pub fn new_pipe_writer(stdin: std::process::ChildStdin, label: String) -> Self {
745 Self {
746 kind: IoHandleKind::PipeWriter,
747 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::PipeWriter(stdin)))),
748 path: label,
749 mode: "w".to_string(),
750 }
751 }
752
753 pub fn new_pipe_reader_err(stderr: std::process::ChildStderr, label: String) -> Self {
755 Self {
756 kind: IoHandleKind::PipeReader,
757 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::PipeReaderErr(
758 stderr,
759 )))),
760 path: label,
761 mode: "r".to_string(),
762 }
763 }
764
765 pub fn new_custom(resource: Box<dyn std::any::Any + Send>, label: String) -> Self {
767 Self {
768 kind: IoHandleKind::Custom,
769 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::Custom(resource)))),
770 path: label,
771 mode: "custom".to_string(),
772 }
773 }
774
775 pub fn is_open(&self) -> bool {
777 self.resource.lock().map(|g| g.is_some()).unwrap_or(false)
778 }
779
780 pub fn close(&self) -> bool {
782 if let Ok(mut guard) = self.resource.lock() {
783 guard.take().is_some()
784 } else {
785 false
786 }
787 }
788}
789
790#[derive(Debug, Clone)]
795pub struct MutexData {
796 pub inner: Arc<std::sync::Mutex<ValueWord>>,
797}
798
799impl MutexData {
800 pub fn new(value: ValueWord) -> Self {
801 Self {
802 inner: Arc::new(std::sync::Mutex::new(value)),
803 }
804 }
805}
806
807#[derive(Debug, Clone)]
810pub struct AtomicData {
811 pub inner: Arc<std::sync::atomic::AtomicI64>,
812}
813
814impl AtomicData {
815 pub fn new(value: i64) -> Self {
816 Self {
817 inner: Arc::new(std::sync::atomic::AtomicI64::new(value)),
818 }
819 }
820}
821
822#[derive(Debug, Clone)]
825pub struct LazyData {
826 pub initializer: Arc<std::sync::Mutex<Option<ValueWord>>>,
828 pub value: Arc<std::sync::Mutex<Option<ValueWord>>>,
830}
831
832impl LazyData {
833 pub fn new(initializer: ValueWord) -> Self {
834 Self {
835 initializer: Arc::new(std::sync::Mutex::new(Some(initializer))),
836 value: Arc::new(std::sync::Mutex::new(None)),
837 }
838 }
839
840 pub fn is_initialized(&self) -> bool {
842 self.value.lock().map(|g| g.is_some()).unwrap_or(false)
843 }
844}
845
846#[derive(Debug, Clone)]
852pub enum ChannelData {
853 Sender {
854 tx: Arc<std::sync::mpsc::Sender<ValueWord>>,
855 closed: Arc<std::sync::atomic::AtomicBool>,
856 },
857 Receiver {
858 rx: Arc<std::sync::Mutex<std::sync::mpsc::Receiver<ValueWord>>>,
859 closed: Arc<std::sync::atomic::AtomicBool>,
860 },
861}
862
863impl ChannelData {
864 pub fn new_pair() -> (Self, Self) {
866 let (tx, rx) = std::sync::mpsc::channel();
867 let closed = Arc::new(std::sync::atomic::AtomicBool::new(false));
868 (
869 ChannelData::Sender {
870 tx: Arc::new(tx),
871 closed: closed.clone(),
872 },
873 ChannelData::Receiver {
874 rx: Arc::new(std::sync::Mutex::new(rx)),
875 closed,
876 },
877 )
878 }
879
880 pub fn is_closed(&self) -> bool {
882 match self {
883 ChannelData::Sender { closed, .. } | ChannelData::Receiver { closed, .. } => {
884 closed.load(std::sync::atomic::Ordering::Relaxed)
885 }
886 }
887 }
888
889 pub fn close(&self) {
891 match self {
892 ChannelData::Sender { closed, .. } | ChannelData::Receiver { closed, .. } => {
893 closed.store(true, std::sync::atomic::Ordering::Relaxed);
894 }
895 }
896 }
897
898 pub fn is_sender(&self) -> bool {
900 matches!(self, ChannelData::Sender { .. })
901 }
902}
903
904crate::define_heap_types!();
908
909#[inline]
913fn bigint_decimal_eq(a: &i64, b: &rust_decimal::Decimal) -> bool {
914 rust_decimal::Decimal::from(*a) == *b
915}
916
917#[inline]
919fn native_scalar_bigint_eq(a: &NativeScalar, b: &i64) -> bool {
920 a.as_i64().is_some_and(|v| v == *b)
921}
922
923#[inline]
925fn native_scalar_decimal_eq(a: &NativeScalar, b: &rust_decimal::Decimal) -> bool {
926 match a {
927 NativeScalar::F32(v) => {
928 rust_decimal::Decimal::from_f64_retain(*v as f64).is_some_and(|v| v == *b)
929 }
930 _ => a
931 .as_i128()
932 .map(|n| rust_decimal::Decimal::from_i128_with_scale(n, 0))
933 .is_some_and(|to_dec| to_dec == *b),
934 }
935}
936
937#[inline]
939fn int_float_array_eq(
940 ints: &crate::typed_buffer::TypedBuffer<i64>,
941 floats: &crate::typed_buffer::AlignedTypedBuffer,
942) -> bool {
943 ints.len() == floats.len()
944 && ints
945 .iter()
946 .zip(floats.iter())
947 .all(|(x, y)| (*x as f64) == *y)
948}
949
950#[inline]
952fn matrix_eq(a: &MatrixData, b: &MatrixData) -> bool {
953 a.rows == b.rows
954 && a.cols == b.cols
955 && a.data.len() == b.data.len()
956 && a.data.iter().zip(b.data.iter()).all(|(x, y)| x == y)
957}
958
959#[inline]
961fn native_view_eq(a: &NativeViewData, b: &NativeViewData) -> bool {
962 a.ptr == b.ptr && a.mutable == b.mutable && a.layout.name == b.layout.name
963}
964
965impl HeapValue {
968 pub fn structural_eq(&self, other: &HeapValue) -> bool {
973 match (self, other) {
974 (HeapValue::Char(a), HeapValue::Char(b)) => a == b,
975 (HeapValue::String(a), HeapValue::String(b)) => a == b,
976 (HeapValue::Char(c), HeapValue::String(s))
978 | (HeapValue::String(s), HeapValue::Char(c)) => {
979 let mut buf = [0u8; 4];
980 let cs = c.encode_utf8(&mut buf);
981 cs == s.as_str()
982 }
983 (HeapValue::Array(a), HeapValue::Array(b)) => {
984 a.len() == b.len() && a.iter().zip(b.iter()).all(|(x, y)| x == y)
985 }
986 (HeapValue::Decimal(a), HeapValue::Decimal(b)) => a == b,
987 (HeapValue::BigInt(a), HeapValue::BigInt(b)) => a == b,
988 (HeapValue::Some(a), HeapValue::Some(b)) => a == b,
989 (HeapValue::Ok(a), HeapValue::Ok(b)) => a == b,
990 (HeapValue::Err(a), HeapValue::Err(b)) => a == b,
991 (HeapValue::NativeScalar(a), HeapValue::NativeScalar(b)) => a == b,
992 (HeapValue::NativeView(a), HeapValue::NativeView(b)) => native_view_eq(a, b),
993 (HeapValue::Mutex(a), HeapValue::Mutex(b)) => Arc::ptr_eq(&a.inner, &b.inner),
994 (HeapValue::Atomic(a), HeapValue::Atomic(b)) => Arc::ptr_eq(&a.inner, &b.inner),
995 (HeapValue::Lazy(a), HeapValue::Lazy(b)) => Arc::ptr_eq(&a.value, &b.value),
996 (HeapValue::Future(a), HeapValue::Future(b)) => a == b,
997 (HeapValue::ExprProxy(a), HeapValue::ExprProxy(b)) => a == b,
998 (HeapValue::Time(a), HeapValue::Time(b)) => a == b,
999 (HeapValue::HashMap(d1), HeapValue::HashMap(d2)) => {
1000 d1.keys.len() == d2.keys.len()
1001 && d1.keys.iter().zip(d2.keys.iter()).all(|(a, b)| a == b)
1002 && d1.values.iter().zip(d2.values.iter()).all(|(a, b)| a == b)
1003 }
1004 (HeapValue::Set(s1), HeapValue::Set(s2)) => {
1005 s1.items.len() == s2.items.len() && s1.items.iter().all(|item| s2.contains(item))
1006 }
1007 (HeapValue::Content(a), HeapValue::Content(b)) => a == b,
1008 (HeapValue::Instant(a), HeapValue::Instant(b)) => a == b,
1009 (HeapValue::IoHandle(a), HeapValue::IoHandle(b)) => {
1010 std::sync::Arc::ptr_eq(&a.resource, &b.resource)
1011 }
1012 (HeapValue::IntArray(a), HeapValue::IntArray(b)) => a == b,
1013 (HeapValue::FloatArray(a), HeapValue::FloatArray(b)) => a == b,
1014 (HeapValue::IntArray(a), HeapValue::FloatArray(b)) => int_float_array_eq(a, b),
1015 (HeapValue::FloatArray(a), HeapValue::IntArray(b)) => int_float_array_eq(b, a),
1016 (HeapValue::BoolArray(a), HeapValue::BoolArray(b)) => a == b,
1017 (HeapValue::I8Array(a), HeapValue::I8Array(b)) => a == b,
1018 (HeapValue::I16Array(a), HeapValue::I16Array(b)) => a == b,
1019 (HeapValue::I32Array(a), HeapValue::I32Array(b)) => a == b,
1020 (HeapValue::U8Array(a), HeapValue::U8Array(b)) => a == b,
1021 (HeapValue::U16Array(a), HeapValue::U16Array(b)) => a == b,
1022 (HeapValue::U32Array(a), HeapValue::U32Array(b)) => a == b,
1023 (HeapValue::U64Array(a), HeapValue::U64Array(b)) => a == b,
1024 (HeapValue::F32Array(a), HeapValue::F32Array(b)) => a == b,
1025 (HeapValue::Matrix(a), HeapValue::Matrix(b)) => matrix_eq(a, b),
1026 (
1027 HeapValue::FloatArraySlice {
1028 parent: p1,
1029 offset: o1,
1030 len: l1,
1031 },
1032 HeapValue::FloatArraySlice {
1033 parent: p2,
1034 offset: o2,
1035 len: l2,
1036 },
1037 ) => {
1038 let s1 = &p1.data[*o1 as usize..(*o1 + *l1) as usize];
1039 let s2 = &p2.data[*o2 as usize..(*o2 + *l2) as usize];
1040 s1 == s2
1041 }
1042 _ => false,
1043 }
1044 }
1045
1046 #[inline]
1048 pub fn equals(&self, other: &HeapValue) -> bool {
1049 match (self, other) {
1050 (HeapValue::Char(a), HeapValue::Char(b)) => a == b,
1051 (HeapValue::String(a), HeapValue::String(b)) => a == b,
1052 (HeapValue::Char(c), HeapValue::String(s))
1054 | (HeapValue::String(s), HeapValue::Char(c)) => {
1055 let mut buf = [0u8; 4];
1056 let cs = c.encode_utf8(&mut buf);
1057 cs == s.as_str()
1058 }
1059 (HeapValue::Array(a), HeapValue::Array(b)) => {
1060 a.len() == b.len() && a.iter().zip(b.iter()).all(|(x, y)| x.vw_equals(y))
1061 }
1062 (
1063 HeapValue::TypedObject {
1064 schema_id: s1,
1065 slots: sl1,
1066 heap_mask: hm1,
1067 },
1068 HeapValue::TypedObject {
1069 schema_id: s2,
1070 slots: sl2,
1071 heap_mask: hm2,
1072 },
1073 ) => {
1074 if s1 != s2 || sl1.len() != sl2.len() || hm1 != hm2 {
1075 return false;
1076 }
1077 for i in 0..sl1.len() {
1078 let is_heap = (hm1 & (1u64 << i)) != 0;
1079 if is_heap {
1080 let a_nb = sl1[i].as_heap_nb();
1082 let b_nb = sl2[i].as_heap_nb();
1083 if !a_nb.vw_equals(&b_nb) {
1084 return false;
1085 }
1086 } else {
1087 if sl1[i].raw() != sl2[i].raw() {
1089 return false;
1090 }
1091 }
1092 }
1093 true
1094 }
1095 (
1096 HeapValue::Closure {
1097 function_id: f1, ..
1098 },
1099 HeapValue::Closure {
1100 function_id: f2, ..
1101 },
1102 ) => f1 == f2,
1103 (HeapValue::Decimal(a), HeapValue::Decimal(b)) => a == b,
1104 (HeapValue::BigInt(a), HeapValue::BigInt(b)) => a == b,
1105 (HeapValue::BigInt(a), HeapValue::Decimal(b)) => bigint_decimal_eq(a, b),
1106 (HeapValue::Decimal(a), HeapValue::BigInt(b)) => bigint_decimal_eq(b, a),
1107 (HeapValue::DataTable(a), HeapValue::DataTable(b)) => Arc::ptr_eq(a, b),
1108 (
1109 HeapValue::TypedTable {
1110 schema_id: s1,
1111 table: t1,
1112 },
1113 HeapValue::TypedTable {
1114 schema_id: s2,
1115 table: t2,
1116 },
1117 ) => s1 == s2 && Arc::ptr_eq(t1, t2),
1118 (
1119 HeapValue::RowView {
1120 schema_id: s1,
1121 row_idx: r1,
1122 table: t1,
1123 },
1124 HeapValue::RowView {
1125 schema_id: s2,
1126 row_idx: r2,
1127 table: t2,
1128 },
1129 ) => s1 == s2 && r1 == r2 && Arc::ptr_eq(t1, t2),
1130 (
1131 HeapValue::ColumnRef {
1132 schema_id: s1,
1133 col_id: c1,
1134 table: t1,
1135 },
1136 HeapValue::ColumnRef {
1137 schema_id: s2,
1138 col_id: c2,
1139 table: t2,
1140 },
1141 ) => s1 == s2 && c1 == c2 && Arc::ptr_eq(t1, t2),
1142 (
1143 HeapValue::IndexedTable {
1144 schema_id: s1,
1145 index_col: c1,
1146 table: t1,
1147 },
1148 HeapValue::IndexedTable {
1149 schema_id: s2,
1150 index_col: c2,
1151 table: t2,
1152 },
1153 ) => s1 == s2 && c1 == c2 && Arc::ptr_eq(t1, t2),
1154 (HeapValue::HashMap(d1), HeapValue::HashMap(d2)) => {
1155 d1.keys.len() == d2.keys.len()
1156 && d1
1157 .keys
1158 .iter()
1159 .zip(d2.keys.iter())
1160 .all(|(a, b)| a.vw_equals(b))
1161 && d1
1162 .values
1163 .iter()
1164 .zip(d2.values.iter())
1165 .all(|(a, b)| a.vw_equals(b))
1166 }
1167 (HeapValue::Set(s1), HeapValue::Set(s2)) => {
1168 s1.items.len() == s2.items.len() && s1.items.iter().all(|item| s2.contains(item))
1169 }
1170 (HeapValue::Content(a), HeapValue::Content(b)) => a == b,
1171 (HeapValue::Instant(a), HeapValue::Instant(b)) => a == b,
1172 (HeapValue::IoHandle(a), HeapValue::IoHandle(b)) => {
1173 Arc::ptr_eq(&a.resource, &b.resource)
1174 }
1175 (HeapValue::Mutex(a), HeapValue::Mutex(b)) => Arc::ptr_eq(&a.inner, &b.inner),
1176 (HeapValue::Atomic(a), HeapValue::Atomic(b)) => Arc::ptr_eq(&a.inner, &b.inner),
1177 (HeapValue::Lazy(a), HeapValue::Lazy(b)) => Arc::ptr_eq(&a.value, &b.value),
1178 (HeapValue::Range { .. }, HeapValue::Range { .. }) => false,
1179 (HeapValue::Enum(a), HeapValue::Enum(b)) => {
1180 a.enum_name == b.enum_name && a.variant == b.variant
1181 }
1182 (HeapValue::Some(a), HeapValue::Some(b)) => a.vw_equals(b),
1183 (HeapValue::Ok(a), HeapValue::Ok(b)) => a.vw_equals(b),
1184 (HeapValue::Err(a), HeapValue::Err(b)) => a.vw_equals(b),
1185 (HeapValue::Future(a), HeapValue::Future(b)) => a == b,
1186 (HeapValue::Time(a), HeapValue::Time(b)) => a == b,
1187 (HeapValue::Duration(a), HeapValue::Duration(b)) => a == b,
1188 (HeapValue::TimeSpan(a), HeapValue::TimeSpan(b)) => a == b,
1189 (HeapValue::Timeframe(a), HeapValue::Timeframe(b)) => a == b,
1190 (HeapValue::FunctionRef { name: n1, .. }, HeapValue::FunctionRef { name: n2, .. }) => {
1191 n1 == n2
1192 }
1193 (HeapValue::ProjectedRef(a), HeapValue::ProjectedRef(b)) => a == b,
1194 (HeapValue::DataReference(a), HeapValue::DataReference(b)) => {
1195 a.datetime == b.datetime && a.id == b.id && a.timeframe == b.timeframe
1196 }
1197 (HeapValue::NativeScalar(a), HeapValue::NativeScalar(b)) => a == b,
1198 (HeapValue::NativeView(a), HeapValue::NativeView(b)) => native_view_eq(a, b),
1199 (HeapValue::IntArray(a), HeapValue::IntArray(b)) => a == b,
1200 (HeapValue::FloatArray(a), HeapValue::FloatArray(b)) => a == b,
1201 (HeapValue::IntArray(a), HeapValue::FloatArray(b)) => int_float_array_eq(a, b),
1202 (HeapValue::FloatArray(a), HeapValue::IntArray(b)) => int_float_array_eq(b, a),
1203 (HeapValue::BoolArray(a), HeapValue::BoolArray(b)) => a == b,
1204 (HeapValue::I8Array(a), HeapValue::I8Array(b)) => a == b,
1205 (HeapValue::I16Array(a), HeapValue::I16Array(b)) => a == b,
1206 (HeapValue::I32Array(a), HeapValue::I32Array(b)) => a == b,
1207 (HeapValue::U8Array(a), HeapValue::U8Array(b)) => a == b,
1208 (HeapValue::U16Array(a), HeapValue::U16Array(b)) => a == b,
1209 (HeapValue::U32Array(a), HeapValue::U32Array(b)) => a == b,
1210 (HeapValue::U64Array(a), HeapValue::U64Array(b)) => a == b,
1211 (HeapValue::F32Array(a), HeapValue::F32Array(b)) => a == b,
1212 (HeapValue::Matrix(a), HeapValue::Matrix(b)) => matrix_eq(a, b),
1213 (
1214 HeapValue::FloatArraySlice {
1215 parent: p1,
1216 offset: o1,
1217 len: l1,
1218 },
1219 HeapValue::FloatArraySlice {
1220 parent: p2,
1221 offset: o2,
1222 len: l2,
1223 },
1224 ) => {
1225 let s1 = &p1.data[*o1 as usize..(*o1 + *l1) as usize];
1226 let s2 = &p2.data[*o2 as usize..(*o2 + *l2) as usize];
1227 s1 == s2
1228 }
1229 (HeapValue::NativeScalar(a), HeapValue::BigInt(b)) => native_scalar_bigint_eq(a, b),
1231 (HeapValue::BigInt(a), HeapValue::NativeScalar(b)) => native_scalar_bigint_eq(b, a),
1232 (HeapValue::NativeScalar(a), HeapValue::Decimal(b)) => {
1233 native_scalar_decimal_eq(a, b)
1234 }
1235 (HeapValue::Decimal(a), HeapValue::NativeScalar(b)) => {
1236 native_scalar_decimal_eq(b, a)
1237 }
1238 _ => false,
1239 }
1240 }
1241}