1use crate::aligned_vec::AlignedVec;
14use crate::value_word::ValueWord;
15use chrono::{DateTime, FixedOffset};
16use shape_ast::data::Timeframe;
17use std::collections::HashMap;
18use std::sync::Arc;
19
20#[derive(Debug, Clone)]
24pub struct MatrixData {
25 pub data: AlignedVec<f64>,
26 pub rows: u32,
27 pub cols: u32,
28}
29
30impl MatrixData {
31 pub fn new(rows: u32, cols: u32) -> Self {
33 let len = (rows as usize) * (cols as usize);
34 let mut data = AlignedVec::with_capacity(len);
35 for _ in 0..len {
36 data.push(0.0);
37 }
38 Self { data, rows, cols }
39 }
40
41 pub fn from_flat(data: AlignedVec<f64>, rows: u32, cols: u32) -> Self {
43 debug_assert_eq!(data.len(), (rows as usize) * (cols as usize));
44 Self { data, rows, cols }
45 }
46
47 #[inline]
49 pub fn get(&self, row: u32, col: u32) -> f64 {
50 self.data[(row as usize) * (self.cols as usize) + (col as usize)]
51 }
52
53 #[inline]
55 pub fn set(&mut self, row: u32, col: u32, val: f64) {
56 self.data[(row as usize) * (self.cols as usize) + (col as usize)] = val;
57 }
58
59 #[inline]
61 pub fn row_slice(&self, row: u32) -> &[f64] {
62 let start = (row as usize) * (self.cols as usize);
63 &self.data[start..start + self.cols as usize]
64 }
65
66 #[inline]
68 pub fn shape(&self) -> (u32, u32) {
69 (self.rows, self.cols)
70 }
71}
72
73#[derive(Debug, Clone)]
75pub struct IteratorState {
76 pub source: ValueWord,
77 pub position: usize,
78 pub transforms: Vec<IteratorTransform>,
79 pub done: bool,
80}
81
82#[derive(Debug, Clone)]
84pub enum IteratorTransform {
85 Map(ValueWord),
86 Filter(ValueWord),
87 Take(usize),
88 Skip(usize),
89 FlatMap(ValueWord),
90}
91
92#[derive(Debug, Clone)]
94pub struct GeneratorState {
95 pub function_id: u16,
96 pub state: u16,
97 pub locals: Box<[ValueWord]>,
98 pub result: Option<Box<ValueWord>>,
99}
100
101#[derive(Debug, Clone)]
103pub struct SimulationCallData {
104 pub name: String,
105 pub params: HashMap<String, ValueWord>,
106}
107
108#[derive(Debug, Clone)]
110pub struct DataReferenceData {
111 pub datetime: DateTime<FixedOffset>,
112 pub id: String,
113 pub timeframe: Timeframe,
114}
115
116#[derive(Debug, Clone)]
122pub struct HashMapData {
123 pub keys: Vec<ValueWord>,
124 pub values: Vec<ValueWord>,
125 pub index: HashMap<u64, Vec<usize>>,
126 pub shape_id: Option<crate::shape_graph::ShapeId>,
129}
130
131impl HashMapData {
132 #[inline]
134 pub fn find_key(&self, key: &ValueWord) -> Option<usize> {
135 let hash = key.vw_hash();
136 let bucket = self.index.get(&hash)?;
137 bucket
138 .iter()
139 .copied()
140 .find(|&idx| self.keys[idx].vw_equals(key))
141 }
142
143 pub fn rebuild_index(keys: &[ValueWord]) -> HashMap<u64, Vec<usize>> {
145 let mut index: HashMap<u64, Vec<usize>> = HashMap::new();
146 for (i, k) in keys.iter().enumerate() {
147 index.entry(k.vw_hash()).or_default().push(i);
148 }
149 index
150 }
151
152 pub fn compute_shape(keys: &[ValueWord]) -> Option<crate::shape_graph::ShapeId> {
157 if keys.is_empty() || keys.len() > 64 {
158 return None;
159 }
160 let mut key_hashes = Vec::with_capacity(keys.len());
161 for k in keys {
162 if let Some(s) = k.as_str() {
163 key_hashes.push(crate::shape_graph::hash_property_name(s));
164 } else {
165 return None; }
167 }
168 crate::shape_graph::shape_for_hashmap_keys(&key_hashes)
169 }
170
171 #[inline]
176 pub fn shape_get(&self, property: &str) -> Option<&ValueWord> {
177 let shape_id = self.shape_id?;
178 let prop_hash = crate::shape_graph::hash_property_name(property);
179 let idx = crate::shape_graph::shape_property_index(shape_id, prop_hash)?;
180 self.values.get(idx)
181 }
182}
183
184#[derive(Debug, Clone)]
188pub struct SetData {
189 pub items: Vec<ValueWord>,
190 pub index: HashMap<u64, Vec<usize>>,
191}
192
193impl SetData {
194 #[inline]
196 pub fn contains(&self, item: &ValueWord) -> bool {
197 let hash = item.vw_hash();
198 if let Some(bucket) = self.index.get(&hash) {
199 bucket.iter().any(|&idx| self.items[idx].vw_equals(item))
200 } else {
201 false
202 }
203 }
204
205 pub fn insert(&mut self, item: ValueWord) -> bool {
207 if self.contains(&item) {
208 return false;
209 }
210 let hash = item.vw_hash();
211 let idx = self.items.len();
212 self.items.push(item);
213 self.index.entry(hash).or_default().push(idx);
214 true
215 }
216
217 pub fn remove(&mut self, item: &ValueWord) -> bool {
219 let hash = item.vw_hash();
220 if let Some(bucket) = self.index.get(&hash) {
221 if let Some(&idx) = bucket.iter().find(|&&idx| self.items[idx].vw_equals(item)) {
222 self.items.swap_remove(idx);
223 self.rebuild_index_from_items();
224 return true;
225 }
226 }
227 false
228 }
229
230 pub fn rebuild_index(items: &[ValueWord]) -> HashMap<u64, Vec<usize>> {
232 let mut index: HashMap<u64, Vec<usize>> = HashMap::new();
233 for (i, k) in items.iter().enumerate() {
234 index.entry(k.vw_hash()).or_default().push(i);
235 }
236 index
237 }
238
239 fn rebuild_index_from_items(&mut self) {
240 self.index = Self::rebuild_index(&self.items);
241 }
242
243 pub fn from_items(items: Vec<ValueWord>) -> Self {
245 let mut set = SetData {
246 items: Vec::with_capacity(items.len()),
247 index: HashMap::new(),
248 };
249 for item in items {
250 set.insert(item);
251 }
252 set
253 }
254}
255
256#[derive(Debug, Clone)]
261pub struct PriorityQueueData {
262 pub items: Vec<ValueWord>,
263}
264
265impl PriorityQueueData {
266 pub fn new() -> Self {
267 PriorityQueueData { items: Vec::new() }
268 }
269
270 pub fn from_items(mut items: Vec<ValueWord>) -> Self {
271 let mut pq = PriorityQueueData { items };
272 pq.heapify();
273 pq
274 }
275
276 #[inline]
279 fn cmp_items(a: &ValueWord, b: &ValueWord) -> std::cmp::Ordering {
280 match (a.as_number_coerce(), b.as_number_coerce()) {
281 (Some(fa), Some(fb)) => fa.partial_cmp(&fb).unwrap_or(std::cmp::Ordering::Equal),
282 (Some(_), None) => std::cmp::Ordering::Less,
283 (None, Some(_)) => std::cmp::Ordering::Greater,
284 (None, None) => {
285 let sa = format!("{}", a);
287 let sb = format!("{}", b);
288 sa.cmp(&sb)
289 }
290 }
291 }
292
293 pub fn push(&mut self, item: ValueWord) {
295 self.items.push(item);
296 self.sift_up(self.items.len() - 1);
297 }
298
299 pub fn pop(&mut self) -> Option<ValueWord> {
301 if self.items.is_empty() {
302 return None;
303 }
304 let last = self.items.len() - 1;
305 self.items.swap(0, last);
306 let result = self.items.pop();
307 if !self.items.is_empty() {
308 self.sift_down(0);
309 }
310 result
311 }
312
313 pub fn peek(&self) -> Option<&ValueWord> {
315 self.items.first()
316 }
317
318 fn sift_up(&mut self, mut idx: usize) {
319 while idx > 0 {
320 let parent = (idx - 1) / 2;
321 if Self::cmp_items(&self.items[idx], &self.items[parent]) == std::cmp::Ordering::Less {
322 self.items.swap(idx, parent);
323 idx = parent;
324 } else {
325 break;
326 }
327 }
328 }
329
330 fn sift_down(&mut self, mut idx: usize) {
331 let len = self.items.len();
332 loop {
333 let left = 2 * idx + 1;
334 let right = 2 * idx + 2;
335 let mut smallest = idx;
336
337 if left < len
338 && Self::cmp_items(&self.items[left], &self.items[smallest])
339 == std::cmp::Ordering::Less
340 {
341 smallest = left;
342 }
343 if right < len
344 && Self::cmp_items(&self.items[right], &self.items[smallest])
345 == std::cmp::Ordering::Less
346 {
347 smallest = right;
348 }
349
350 if smallest != idx {
351 self.items.swap(idx, smallest);
352 idx = smallest;
353 } else {
354 break;
355 }
356 }
357 }
358
359 fn heapify(&mut self) {
360 if self.items.len() <= 1 {
361 return;
362 }
363 for i in (0..self.items.len() / 2).rev() {
364 self.sift_down(i);
365 }
366 }
367}
368
369#[derive(Debug, Clone)]
371pub struct DequeData {
372 pub items: std::collections::VecDeque<ValueWord>,
373}
374
375impl DequeData {
376 pub fn new() -> Self {
377 DequeData {
378 items: std::collections::VecDeque::new(),
379 }
380 }
381
382 pub fn from_items(items: Vec<ValueWord>) -> Self {
383 DequeData {
384 items: items.into(),
385 }
386 }
387}
388
389#[derive(Debug, Clone, Copy, PartialEq)]
394pub enum NativeScalar {
395 I8(i8),
396 U8(u8),
397 I16(i16),
398 U16(u16),
399 I32(i32),
400 I64(i64),
401 U32(u32),
402 U64(u64),
403 Isize(isize),
404 Usize(usize),
405 Ptr(usize),
406 F32(f32),
407}
408
409impl NativeScalar {
410 #[inline]
411 pub fn type_name(&self) -> &'static str {
412 match self {
413 NativeScalar::I8(_) => "i8",
414 NativeScalar::U8(_) => "u8",
415 NativeScalar::I16(_) => "i16",
416 NativeScalar::U16(_) => "u16",
417 NativeScalar::I32(_) => "i32",
418 NativeScalar::I64(_) => "i64",
419 NativeScalar::U32(_) => "u32",
420 NativeScalar::U64(_) => "u64",
421 NativeScalar::Isize(_) => "isize",
422 NativeScalar::Usize(_) => "usize",
423 NativeScalar::Ptr(_) => "ptr",
424 NativeScalar::F32(_) => "f32",
425 }
426 }
427
428 #[inline]
429 pub fn is_truthy(&self) -> bool {
430 match self {
431 NativeScalar::I8(v) => *v != 0,
432 NativeScalar::U8(v) => *v != 0,
433 NativeScalar::I16(v) => *v != 0,
434 NativeScalar::U16(v) => *v != 0,
435 NativeScalar::I32(v) => *v != 0,
436 NativeScalar::I64(v) => *v != 0,
437 NativeScalar::U32(v) => *v != 0,
438 NativeScalar::U64(v) => *v != 0,
439 NativeScalar::Isize(v) => *v != 0,
440 NativeScalar::Usize(v) => *v != 0,
441 NativeScalar::Ptr(v) => *v != 0,
442 NativeScalar::F32(v) => *v != 0.0 && !v.is_nan(),
443 }
444 }
445
446 #[inline]
447 pub fn as_i64(&self) -> Option<i64> {
448 match self {
449 NativeScalar::I8(v) => Some(*v as i64),
450 NativeScalar::U8(v) => Some(*v as i64),
451 NativeScalar::I16(v) => Some(*v as i64),
452 NativeScalar::U16(v) => Some(*v as i64),
453 NativeScalar::I32(v) => Some(*v as i64),
454 NativeScalar::I64(v) => Some(*v),
455 NativeScalar::U32(v) => Some(*v as i64),
456 NativeScalar::U64(v) => i64::try_from(*v).ok(),
457 NativeScalar::Isize(v) => i64::try_from(*v).ok(),
458 NativeScalar::Usize(v) => i64::try_from(*v).ok(),
459 NativeScalar::Ptr(v) => i64::try_from(*v).ok(),
460 NativeScalar::F32(_) => None,
461 }
462 }
463
464 #[inline]
465 pub fn as_u64(&self) -> Option<u64> {
466 match self {
467 NativeScalar::U8(v) => Some(*v as u64),
468 NativeScalar::U16(v) => Some(*v as u64),
469 NativeScalar::U32(v) => Some(*v as u64),
470 NativeScalar::U64(v) => Some(*v),
471 NativeScalar::Usize(v) => Some(*v as u64),
472 NativeScalar::Ptr(v) => Some(*v as u64),
473 NativeScalar::I8(v) if *v >= 0 => Some(*v as u64),
474 NativeScalar::I16(v) if *v >= 0 => Some(*v as u64),
475 NativeScalar::I32(v) if *v >= 0 => Some(*v as u64),
476 NativeScalar::I64(v) if *v >= 0 => Some(*v as u64),
477 NativeScalar::Isize(v) if *v >= 0 => Some(*v as u64),
478 _ => None,
479 }
480 }
481
482 #[inline]
483 pub fn as_i128(&self) -> Option<i128> {
484 match self {
485 NativeScalar::I8(v) => Some(*v as i128),
486 NativeScalar::U8(v) => Some(*v as i128),
487 NativeScalar::I16(v) => Some(*v as i128),
488 NativeScalar::U16(v) => Some(*v as i128),
489 NativeScalar::I32(v) => Some(*v as i128),
490 NativeScalar::U32(v) => Some(*v as i128),
491 NativeScalar::I64(v) => Some(*v as i128),
492 NativeScalar::U64(v) => Some(*v as i128),
493 NativeScalar::Isize(v) => Some(*v as i128),
494 NativeScalar::Usize(v) => Some(*v as i128),
495 NativeScalar::Ptr(v) => Some(*v as i128),
496 NativeScalar::F32(_) => None,
497 }
498 }
499
500 #[inline]
501 pub fn as_f64(&self) -> f64 {
502 match self {
503 NativeScalar::I8(v) => *v as f64,
504 NativeScalar::U8(v) => *v as f64,
505 NativeScalar::I16(v) => *v as f64,
506 NativeScalar::U16(v) => *v as f64,
507 NativeScalar::I32(v) => *v as f64,
508 NativeScalar::I64(v) => *v as f64,
509 NativeScalar::U32(v) => *v as f64,
510 NativeScalar::U64(v) => *v as f64,
511 NativeScalar::Isize(v) => *v as f64,
512 NativeScalar::Usize(v) => *v as f64,
513 NativeScalar::Ptr(v) => *v as f64,
514 NativeScalar::F32(v) => *v as f64,
515 }
516 }
517}
518
519impl std::fmt::Display for NativeScalar {
520 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
521 match self {
522 NativeScalar::I8(v) => write!(f, "{v}"),
523 NativeScalar::U8(v) => write!(f, "{v}"),
524 NativeScalar::I16(v) => write!(f, "{v}"),
525 NativeScalar::U16(v) => write!(f, "{v}"),
526 NativeScalar::I32(v) => write!(f, "{v}"),
527 NativeScalar::I64(v) => write!(f, "{v}"),
528 NativeScalar::U32(v) => write!(f, "{v}"),
529 NativeScalar::U64(v) => write!(f, "{v}"),
530 NativeScalar::Isize(v) => write!(f, "{v}"),
531 NativeScalar::Usize(v) => write!(f, "{v}"),
532 NativeScalar::Ptr(v) => write!(f, "0x{v:x}"),
533 NativeScalar::F32(v) => write!(f, "{v}"),
534 }
535 }
536}
537
538#[derive(Debug, Clone)]
540pub struct NativeLayoutField {
541 pub name: String,
542 pub c_type: String,
543 pub offset: u32,
544 pub size: u32,
545 pub align: u32,
546}
547
548#[derive(Debug, Clone)]
550pub struct NativeTypeLayout {
551 pub name: String,
552 pub abi: String,
553 pub size: u32,
554 pub align: u32,
555 pub fields: Vec<NativeLayoutField>,
556}
557
558impl NativeTypeLayout {
559 #[inline]
560 pub fn field(&self, name: &str) -> Option<&NativeLayoutField> {
561 self.fields.iter().find(|field| field.name == name)
562 }
563}
564
565#[derive(Debug, Clone)]
567pub struct NativeViewData {
568 pub ptr: usize,
569 pub layout: Arc<NativeTypeLayout>,
570 pub mutable: bool,
571}
572
573#[derive(Debug, Clone, Copy, PartialEq, Eq)]
575#[repr(u8)]
576pub enum IoHandleKind {
577 File = 0,
578 TcpStream = 1,
579 TcpListener = 2,
580 UdpSocket = 3,
581 ChildProcess = 4,
582 PipeReader = 5,
583 PipeWriter = 6,
584 Custom = 7,
585}
586
587pub enum IoResource {
589 File(std::fs::File),
590 TcpStream(std::net::TcpStream),
591 TcpListener(std::net::TcpListener),
592 UdpSocket(std::net::UdpSocket),
593 ChildProcess(std::process::Child),
594 PipeReader(std::process::ChildStdout),
595 PipeWriter(std::process::ChildStdin),
596 PipeReaderErr(std::process::ChildStderr),
597 Custom(Box<dyn std::any::Any + Send>),
599}
600
601impl std::fmt::Debug for IoResource {
602 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
603 match self {
604 IoResource::File(_) => write!(f, "File(...)"),
605 IoResource::TcpStream(_) => write!(f, "TcpStream(...)"),
606 IoResource::TcpListener(_) => write!(f, "TcpListener(...)"),
607 IoResource::UdpSocket(_) => write!(f, "UdpSocket(...)"),
608 IoResource::ChildProcess(_) => write!(f, "ChildProcess(...)"),
609 IoResource::PipeReader(_) => write!(f, "PipeReader(...)"),
610 IoResource::PipeWriter(_) => write!(f, "PipeWriter(...)"),
611 IoResource::PipeReaderErr(_) => write!(f, "PipeReaderErr(...)"),
612 IoResource::Custom(_) => write!(f, "Custom(...)"),
613 }
614 }
615}
616
617#[derive(Clone)]
623pub struct IoHandleData {
624 pub kind: IoHandleKind,
625 pub resource: Arc<std::sync::Mutex<Option<IoResource>>>,
626 pub path: String,
627 pub mode: String,
628}
629
630impl std::fmt::Debug for IoHandleData {
631 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632 f.debug_struct("IoHandleData")
633 .field("kind", &self.kind)
634 .field("path", &self.path)
635 .field("mode", &self.mode)
636 .field(
637 "open",
638 &self.resource.lock().map(|g| g.is_some()).unwrap_or(false),
639 )
640 .finish()
641 }
642}
643
644impl IoHandleData {
645 pub fn new_file(file: std::fs::File, path: String, mode: String) -> Self {
647 Self {
648 kind: IoHandleKind::File,
649 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::File(file)))),
650 path,
651 mode,
652 }
653 }
654
655 pub fn new_tcp_stream(stream: std::net::TcpStream, addr: String) -> Self {
657 Self {
658 kind: IoHandleKind::TcpStream,
659 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::TcpStream(stream)))),
660 path: addr,
661 mode: "rw".to_string(),
662 }
663 }
664
665 pub fn new_tcp_listener(listener: std::net::TcpListener, addr: String) -> Self {
667 Self {
668 kind: IoHandleKind::TcpListener,
669 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::TcpListener(
670 listener,
671 )))),
672 path: addr,
673 mode: "listen".to_string(),
674 }
675 }
676
677 pub fn new_udp_socket(socket: std::net::UdpSocket, addr: String) -> Self {
679 Self {
680 kind: IoHandleKind::UdpSocket,
681 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::UdpSocket(socket)))),
682 path: addr,
683 mode: "rw".to_string(),
684 }
685 }
686
687 pub fn new_child_process(child: std::process::Child, cmd: String) -> Self {
689 Self {
690 kind: IoHandleKind::ChildProcess,
691 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::ChildProcess(child)))),
692 path: cmd,
693 mode: "process".to_string(),
694 }
695 }
696
697 pub fn new_pipe_reader(stdout: std::process::ChildStdout, label: String) -> Self {
699 Self {
700 kind: IoHandleKind::PipeReader,
701 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::PipeReader(stdout)))),
702 path: label,
703 mode: "r".to_string(),
704 }
705 }
706
707 pub fn new_pipe_writer(stdin: std::process::ChildStdin, label: String) -> Self {
709 Self {
710 kind: IoHandleKind::PipeWriter,
711 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::PipeWriter(stdin)))),
712 path: label,
713 mode: "w".to_string(),
714 }
715 }
716
717 pub fn new_pipe_reader_err(stderr: std::process::ChildStderr, label: String) -> Self {
719 Self {
720 kind: IoHandleKind::PipeReader,
721 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::PipeReaderErr(
722 stderr,
723 )))),
724 path: label,
725 mode: "r".to_string(),
726 }
727 }
728
729 pub fn new_custom(resource: Box<dyn std::any::Any + Send>, label: String) -> Self {
731 Self {
732 kind: IoHandleKind::Custom,
733 resource: Arc::new(std::sync::Mutex::new(Some(IoResource::Custom(resource)))),
734 path: label,
735 mode: "custom".to_string(),
736 }
737 }
738
739 pub fn is_open(&self) -> bool {
741 self.resource.lock().map(|g| g.is_some()).unwrap_or(false)
742 }
743
744 pub fn close(&self) -> bool {
746 if let Ok(mut guard) = self.resource.lock() {
747 guard.take().is_some()
748 } else {
749 false
750 }
751 }
752}
753
754#[derive(Debug, Clone)]
759pub struct MutexData {
760 pub inner: Arc<std::sync::Mutex<ValueWord>>,
761}
762
763impl MutexData {
764 pub fn new(value: ValueWord) -> Self {
765 Self {
766 inner: Arc::new(std::sync::Mutex::new(value)),
767 }
768 }
769}
770
771#[derive(Debug, Clone)]
774pub struct AtomicData {
775 pub inner: Arc<std::sync::atomic::AtomicI64>,
776}
777
778impl AtomicData {
779 pub fn new(value: i64) -> Self {
780 Self {
781 inner: Arc::new(std::sync::atomic::AtomicI64::new(value)),
782 }
783 }
784}
785
786#[derive(Debug, Clone)]
789pub struct LazyData {
790 pub initializer: Arc<std::sync::Mutex<Option<ValueWord>>>,
792 pub value: Arc<std::sync::Mutex<Option<ValueWord>>>,
794}
795
796impl LazyData {
797 pub fn new(initializer: ValueWord) -> Self {
798 Self {
799 initializer: Arc::new(std::sync::Mutex::new(Some(initializer))),
800 value: Arc::new(std::sync::Mutex::new(None)),
801 }
802 }
803
804 pub fn is_initialized(&self) -> bool {
806 self.value.lock().map(|g| g.is_some()).unwrap_or(false)
807 }
808}
809
810#[derive(Debug, Clone)]
816pub enum ChannelData {
817 Sender {
818 tx: Arc<std::sync::mpsc::Sender<ValueWord>>,
819 closed: Arc<std::sync::atomic::AtomicBool>,
820 },
821 Receiver {
822 rx: Arc<std::sync::Mutex<std::sync::mpsc::Receiver<ValueWord>>>,
823 closed: Arc<std::sync::atomic::AtomicBool>,
824 },
825}
826
827impl ChannelData {
828 pub fn new_pair() -> (Self, Self) {
830 let (tx, rx) = std::sync::mpsc::channel();
831 let closed = Arc::new(std::sync::atomic::AtomicBool::new(false));
832 (
833 ChannelData::Sender {
834 tx: Arc::new(tx),
835 closed: closed.clone(),
836 },
837 ChannelData::Receiver {
838 rx: Arc::new(std::sync::Mutex::new(rx)),
839 closed,
840 },
841 )
842 }
843
844 pub fn is_closed(&self) -> bool {
846 match self {
847 ChannelData::Sender { closed, .. } | ChannelData::Receiver { closed, .. } => {
848 closed.load(std::sync::atomic::Ordering::Relaxed)
849 }
850 }
851 }
852
853 pub fn close(&self) {
855 match self {
856 ChannelData::Sender { closed, .. } | ChannelData::Receiver { closed, .. } => {
857 closed.store(true, std::sync::atomic::Ordering::Relaxed);
858 }
859 }
860 }
861
862 pub fn is_sender(&self) -> bool {
864 matches!(self, ChannelData::Sender { .. })
865 }
866}
867
868crate::define_heap_types!();
872
873impl HeapValue {
876 pub fn structural_eq(&self, other: &HeapValue) -> bool {
881 match (self, other) {
882 (HeapValue::String(a), HeapValue::String(b)) => a == b,
883 (HeapValue::Array(a), HeapValue::Array(b)) => {
884 a.len() == b.len() && a.iter().zip(b.iter()).all(|(x, y)| x == y)
885 }
886 (HeapValue::Decimal(a), HeapValue::Decimal(b)) => a == b,
887 (HeapValue::BigInt(a), HeapValue::BigInt(b)) => a == b,
888 (HeapValue::Some(a), HeapValue::Some(b)) => a == b,
889 (HeapValue::Ok(a), HeapValue::Ok(b)) => a == b,
890 (HeapValue::Err(a), HeapValue::Err(b)) => a == b,
891 (HeapValue::NativeScalar(a), HeapValue::NativeScalar(b)) => a == b,
892 (HeapValue::NativeView(a), HeapValue::NativeView(b)) => {
893 a.ptr == b.ptr && a.mutable == b.mutable && a.layout.name == b.layout.name
894 }
895 (HeapValue::Mutex(a), HeapValue::Mutex(b)) => Arc::ptr_eq(&a.inner, &b.inner),
896 (HeapValue::Atomic(a), HeapValue::Atomic(b)) => Arc::ptr_eq(&a.inner, &b.inner),
897 (HeapValue::Lazy(a), HeapValue::Lazy(b)) => Arc::ptr_eq(&a.value, &b.value),
898 (HeapValue::Future(a), HeapValue::Future(b)) => a == b,
899 (HeapValue::ExprProxy(a), HeapValue::ExprProxy(b)) => a == b,
900 (HeapValue::Time(a), HeapValue::Time(b)) => a == b,
901 (HeapValue::HashMap(d1), HeapValue::HashMap(d2)) => {
902 d1.keys.len() == d2.keys.len()
903 && d1.keys.iter().zip(d2.keys.iter()).all(|(a, b)| a == b)
904 && d1.values.iter().zip(d2.values.iter()).all(|(a, b)| a == b)
905 }
906 (HeapValue::Set(s1), HeapValue::Set(s2)) => {
907 s1.items.len() == s2.items.len() && s1.items.iter().all(|item| s2.contains(item))
908 }
909 (HeapValue::Content(a), HeapValue::Content(b)) => a == b,
910 (HeapValue::Instant(a), HeapValue::Instant(b)) => a == b,
911 (HeapValue::IoHandle(a), HeapValue::IoHandle(b)) => {
912 std::sync::Arc::ptr_eq(&a.resource, &b.resource)
913 }
914 (HeapValue::IntArray(a), HeapValue::IntArray(b)) => a == b,
915 (HeapValue::FloatArray(a), HeapValue::FloatArray(b)) => a == b,
916 (HeapValue::IntArray(a), HeapValue::FloatArray(b)) => {
917 a.len() == b.len() && a.iter().zip(b.iter()).all(|(x, y)| (*x as f64) == *y)
918 }
919 (HeapValue::FloatArray(a), HeapValue::IntArray(b)) => {
920 a.len() == b.len() && a.iter().zip(b.iter()).all(|(x, y)| *x == (*y as f64))
921 }
922 (HeapValue::BoolArray(a), HeapValue::BoolArray(b)) => a == b,
923 (HeapValue::I8Array(a), HeapValue::I8Array(b)) => a == b,
924 (HeapValue::I16Array(a), HeapValue::I16Array(b)) => a == b,
925 (HeapValue::I32Array(a), HeapValue::I32Array(b)) => a == b,
926 (HeapValue::U8Array(a), HeapValue::U8Array(b)) => a == b,
927 (HeapValue::U16Array(a), HeapValue::U16Array(b)) => a == b,
928 (HeapValue::U32Array(a), HeapValue::U32Array(b)) => a == b,
929 (HeapValue::U64Array(a), HeapValue::U64Array(b)) => a == b,
930 (HeapValue::F32Array(a), HeapValue::F32Array(b)) => a == b,
931 (HeapValue::Matrix(a), HeapValue::Matrix(b)) => {
932 a.rows == b.rows
933 && a.cols == b.cols
934 && a.data.len() == b.data.len()
935 && a.data.iter().zip(b.data.iter()).all(|(x, y)| x == y)
936 }
937 _ => false,
938 }
939 }
940
941 #[inline]
943 pub fn equals(&self, other: &HeapValue) -> bool {
944 match (self, other) {
945 (HeapValue::String(a), HeapValue::String(b)) => a == b,
946 (HeapValue::Array(a), HeapValue::Array(b)) => {
947 a.len() == b.len() && a.iter().zip(b.iter()).all(|(x, y)| x.vw_equals(y))
948 }
949 (
950 HeapValue::TypedObject {
951 schema_id: s1,
952 slots: sl1,
953 heap_mask: hm1,
954 },
955 HeapValue::TypedObject {
956 schema_id: s2,
957 slots: sl2,
958 heap_mask: hm2,
959 },
960 ) => {
961 if s1 != s2 || sl1.len() != sl2.len() || hm1 != hm2 {
962 return false;
963 }
964 for i in 0..sl1.len() {
965 let is_heap = (hm1 & (1u64 << i)) != 0;
966 if is_heap {
967 let a_nb = sl1[i].as_heap_nb();
969 let b_nb = sl2[i].as_heap_nb();
970 if !a_nb.vw_equals(&b_nb) {
971 return false;
972 }
973 } else {
974 if sl1[i].raw() != sl2[i].raw() {
976 return false;
977 }
978 }
979 }
980 true
981 }
982 (
983 HeapValue::Closure {
984 function_id: f1, ..
985 },
986 HeapValue::Closure {
987 function_id: f2, ..
988 },
989 ) => f1 == f2,
990 (HeapValue::Decimal(a), HeapValue::Decimal(b)) => a == b,
991 (HeapValue::BigInt(a), HeapValue::BigInt(b)) => a == b,
992 (HeapValue::BigInt(a), HeapValue::Decimal(b)) => rust_decimal::Decimal::from(*a) == *b,
993 (HeapValue::Decimal(a), HeapValue::BigInt(b)) => *a == rust_decimal::Decimal::from(*b),
994 (HeapValue::DataTable(a), HeapValue::DataTable(b)) => Arc::ptr_eq(a, b),
995 (
996 HeapValue::TypedTable {
997 schema_id: s1,
998 table: t1,
999 },
1000 HeapValue::TypedTable {
1001 schema_id: s2,
1002 table: t2,
1003 },
1004 ) => s1 == s2 && Arc::ptr_eq(t1, t2),
1005 (
1006 HeapValue::RowView {
1007 schema_id: s1,
1008 row_idx: r1,
1009 table: t1,
1010 },
1011 HeapValue::RowView {
1012 schema_id: s2,
1013 row_idx: r2,
1014 table: t2,
1015 },
1016 ) => s1 == s2 && r1 == r2 && Arc::ptr_eq(t1, t2),
1017 (
1018 HeapValue::ColumnRef {
1019 schema_id: s1,
1020 col_id: c1,
1021 table: t1,
1022 },
1023 HeapValue::ColumnRef {
1024 schema_id: s2,
1025 col_id: c2,
1026 table: t2,
1027 },
1028 ) => s1 == s2 && c1 == c2 && Arc::ptr_eq(t1, t2),
1029 (
1030 HeapValue::IndexedTable {
1031 schema_id: s1,
1032 index_col: c1,
1033 table: t1,
1034 },
1035 HeapValue::IndexedTable {
1036 schema_id: s2,
1037 index_col: c2,
1038 table: t2,
1039 },
1040 ) => s1 == s2 && c1 == c2 && Arc::ptr_eq(t1, t2),
1041 (HeapValue::HashMap(d1), HeapValue::HashMap(d2)) => {
1042 d1.keys.len() == d2.keys.len()
1043 && d1
1044 .keys
1045 .iter()
1046 .zip(d2.keys.iter())
1047 .all(|(a, b)| a.vw_equals(b))
1048 && d1
1049 .values
1050 .iter()
1051 .zip(d2.values.iter())
1052 .all(|(a, b)| a.vw_equals(b))
1053 }
1054 (HeapValue::Set(s1), HeapValue::Set(s2)) => {
1055 s1.items.len() == s2.items.len() && s1.items.iter().all(|item| s2.contains(item))
1056 }
1057 (HeapValue::Content(a), HeapValue::Content(b)) => a == b,
1058 (HeapValue::Instant(a), HeapValue::Instant(b)) => a == b,
1059 (HeapValue::IoHandle(a), HeapValue::IoHandle(b)) => {
1060 Arc::ptr_eq(&a.resource, &b.resource)
1061 }
1062 (HeapValue::Mutex(a), HeapValue::Mutex(b)) => Arc::ptr_eq(&a.inner, &b.inner),
1063 (HeapValue::Atomic(a), HeapValue::Atomic(b)) => Arc::ptr_eq(&a.inner, &b.inner),
1064 (HeapValue::Lazy(a), HeapValue::Lazy(b)) => Arc::ptr_eq(&a.value, &b.value),
1065 (HeapValue::Range { .. }, HeapValue::Range { .. }) => false,
1066 (HeapValue::Enum(a), HeapValue::Enum(b)) => {
1067 a.enum_name == b.enum_name && a.variant == b.variant
1068 }
1069 (HeapValue::Some(a), HeapValue::Some(b)) => a.vw_equals(b),
1070 (HeapValue::Ok(a), HeapValue::Ok(b)) => a.vw_equals(b),
1071 (HeapValue::Err(a), HeapValue::Err(b)) => a.vw_equals(b),
1072 (HeapValue::Future(a), HeapValue::Future(b)) => a == b,
1073 (HeapValue::Time(a), HeapValue::Time(b)) => a == b,
1074 (HeapValue::Duration(a), HeapValue::Duration(b)) => a == b,
1075 (HeapValue::TimeSpan(a), HeapValue::TimeSpan(b)) => a == b,
1076 (HeapValue::Timeframe(a), HeapValue::Timeframe(b)) => a == b,
1077 (HeapValue::FunctionRef { name: n1, .. }, HeapValue::FunctionRef { name: n2, .. }) => {
1078 n1 == n2
1079 }
1080 (HeapValue::DataReference(a), HeapValue::DataReference(b)) => {
1081 a.datetime == b.datetime && a.id == b.id && a.timeframe == b.timeframe
1082 }
1083 (HeapValue::NativeScalar(a), HeapValue::NativeScalar(b)) => a == b,
1084 (HeapValue::NativeView(a), HeapValue::NativeView(b)) => {
1085 a.ptr == b.ptr && a.mutable == b.mutable && a.layout.name == b.layout.name
1086 }
1087 (HeapValue::IntArray(a), HeapValue::IntArray(b)) => a == b,
1088 (HeapValue::FloatArray(a), HeapValue::FloatArray(b)) => a == b,
1089 (HeapValue::IntArray(a), HeapValue::FloatArray(b)) => {
1090 a.len() == b.len() && a.iter().zip(b.iter()).all(|(x, y)| (*x as f64) == *y)
1091 }
1092 (HeapValue::FloatArray(a), HeapValue::IntArray(b)) => {
1093 a.len() == b.len() && a.iter().zip(b.iter()).all(|(x, y)| *x == (*y as f64))
1094 }
1095 (HeapValue::BoolArray(a), HeapValue::BoolArray(b)) => a == b,
1096 (HeapValue::I8Array(a), HeapValue::I8Array(b)) => a == b,
1097 (HeapValue::I16Array(a), HeapValue::I16Array(b)) => a == b,
1098 (HeapValue::I32Array(a), HeapValue::I32Array(b)) => a == b,
1099 (HeapValue::U8Array(a), HeapValue::U8Array(b)) => a == b,
1100 (HeapValue::U16Array(a), HeapValue::U16Array(b)) => a == b,
1101 (HeapValue::U32Array(a), HeapValue::U32Array(b)) => a == b,
1102 (HeapValue::U64Array(a), HeapValue::U64Array(b)) => a == b,
1103 (HeapValue::F32Array(a), HeapValue::F32Array(b)) => a == b,
1104 (HeapValue::Matrix(a), HeapValue::Matrix(b)) => {
1105 a.rows == b.rows
1106 && a.cols == b.cols
1107 && a.data.len() == b.data.len()
1108 && a.data.iter().zip(b.data.iter()).all(|(x, y)| x == y)
1109 }
1110 (HeapValue::NativeScalar(a), HeapValue::BigInt(b)) => {
1112 a.as_i64().is_some_and(|v| v == *b)
1113 }
1114 (HeapValue::BigInt(a), HeapValue::NativeScalar(b)) => {
1115 b.as_i64().is_some_and(|v| *a == v)
1116 }
1117 (HeapValue::NativeScalar(a), HeapValue::Decimal(b)) => match a {
1118 NativeScalar::F32(v) => {
1119 rust_decimal::Decimal::from_f64_retain(*v as f64).is_some_and(|v| v == *b)
1120 }
1121 _ => a
1122 .as_i128()
1123 .map(|n| rust_decimal::Decimal::from_i128_with_scale(n, 0))
1124 .is_some_and(|to_dec| to_dec == *b),
1125 },
1126 (HeapValue::Decimal(a), HeapValue::NativeScalar(b)) => match b {
1127 NativeScalar::F32(v) => {
1128 rust_decimal::Decimal::from_f64_retain(*v as f64).is_some_and(|v| *a == v)
1129 }
1130 _ => b
1131 .as_i128()
1132 .map(|n| rust_decimal::Decimal::from_i128_with_scale(n, 0))
1133 .is_some_and(|to_dec| *a == to_dec),
1134 },
1135 _ => false,
1136 }
1137 }
1138}