1use std::collections::{HashMap, HashSet};
6use std::fs;
7use std::io::{Read, Write};
8use std::path::PathBuf;
9
10use anyhow::{Context, Result};
11use serde::de::DeserializeOwned;
12use serde::{Deserialize, Serialize};
13use shape_value::{EnumPayload, EnumValue, PrintResult, PrintSpan, Upvalue, ValueWord};
14
15use crate::event_queue::WaitCondition;
16use crate::hashing::{HashDigest, hash_bytes};
17use shape_ast::ast::{DataDateTimeRef, DateTimeExpr, EnumDef, TimeReference, TypeAnnotation};
18use shape_ast::data::Timeframe;
19
20use crate::data::DataFrame;
21use crate::semantic::symbol_table::SymbolTable;
22use shape_value::datatable::DataTable;
23
/// Snapshot format version number.
// NOTE(review): presumably the value stored in `ExecutionSnapshot::version`;
// confirm at the site that builds the snapshot.
pub const SNAPSHOT_VERSION: u32 = 5;

/// Default chunk length (4096).
// NOTE(review): not referenced in this part of the file; presumably the element
// count handed to `store_chunked_vec` — confirm against callers.
pub(crate) const DEFAULT_CHUNK_LEN: usize = 4096;
/// Chunk size in bytes (256 KiB) that `store_chunked_bytes` uses when splitting a
/// raw byte buffer into blobs.
pub(crate) const BYTE_CHUNK_LEN: usize = 256 * 1024;
35
/// Content-addressed, on-disk store for snapshot data.
///
/// Files live under `root/blobs` and `root/snapshots`; each one is
/// zstd-compressed and named `<hex hash>.bin.zst`, where the hash is computed
/// over the uncompressed bytes.
#[derive(Clone)]
pub struct SnapshotStore {
    /// Directory that contains the `blobs` and `snapshots` subdirectories.
    root: PathBuf,
}
41
42impl SnapshotStore {
43 pub fn new(root: impl Into<PathBuf>) -> Result<Self> {
44 let root = root.into();
45 fs::create_dir_all(root.join("blobs"))
46 .with_context(|| format!("failed to create snapshot blob dir at {}", root.display()))?;
47 fs::create_dir_all(root.join("snapshots"))
48 .with_context(|| format!("failed to create snapshot dir at {}", root.display()))?;
49 Ok(Self { root })
50 }
51
52 fn blob_path(&self, hash: &HashDigest) -> PathBuf {
53 self.root
54 .join("blobs")
55 .join(format!("{}.bin.zst", hash.hex()))
56 }
57
58 fn snapshot_path(&self, hash: &HashDigest) -> PathBuf {
59 self.root
60 .join("snapshots")
61 .join(format!("{}.bin.zst", hash.hex()))
62 }
63
64 pub fn put_blob(&self, data: &[u8]) -> Result<HashDigest> {
65 let hash = hash_bytes(data);
66 let path = self.blob_path(&hash);
67 if path.exists() {
68 return Ok(hash);
69 }
70 let compressed = zstd::stream::encode_all(data, 0)?;
71 let mut file = fs::File::create(&path)?;
72 file.write_all(&compressed)?;
73 Ok(hash)
74 }
75
76 pub fn get_blob(&self, hash: &HashDigest) -> Result<Vec<u8>> {
77 let path = self.blob_path(hash);
78 let mut file = fs::File::open(&path)
79 .with_context(|| format!("snapshot blob not found: {}", path.display()))?;
80 let mut buf = Vec::new();
81 file.read_to_end(&mut buf)?;
82 let decompressed = zstd::stream::decode_all(&buf[..])?;
83 Ok(decompressed)
84 }
85
86 pub fn put_struct<T: Serialize>(&self, value: &T) -> Result<HashDigest> {
87 let bytes = bincode::serialize(value)?;
88 self.put_blob(&bytes)
89 }
90
91 pub fn get_struct<T: for<'de> Deserialize<'de>>(&self, hash: &HashDigest) -> Result<T> {
92 let bytes = self.get_blob(hash)?;
93 Ok(bincode::deserialize(&bytes)?)
94 }
95
96 pub fn put_snapshot(&self, snapshot: &ExecutionSnapshot) -> Result<HashDigest> {
97 let bytes = bincode::serialize(snapshot)?;
98 let hash = hash_bytes(&bytes);
99 let path = self.snapshot_path(&hash);
100 if !path.exists() {
101 let compressed = zstd::stream::encode_all(&bytes[..], 0)?;
102 let mut file = fs::File::create(&path)?;
103 file.write_all(&compressed)?;
104 }
105 Ok(hash)
106 }
107
108 pub fn get_snapshot(&self, hash: &HashDigest) -> Result<ExecutionSnapshot> {
109 let path = self.snapshot_path(hash);
110 let mut file = fs::File::open(&path)
111 .with_context(|| format!("snapshot not found: {}", path.display()))?;
112 let mut buf = Vec::new();
113 file.read_to_end(&mut buf)?;
114 let decompressed = zstd::stream::decode_all(&buf[..])?;
115 Ok(bincode::deserialize(&decompressed)?)
116 }
117
118 pub fn list_snapshots(&self) -> Result<Vec<(HashDigest, ExecutionSnapshot)>> {
120 let snapshots_dir = self.root.join("snapshots");
121 if !snapshots_dir.exists() {
122 return Ok(Vec::new());
123 }
124 let mut results = Vec::new();
125 for entry in fs::read_dir(&snapshots_dir)? {
126 let entry = entry?;
127 let path = entry.path();
128 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
129 if let Some(hex) = name.strip_suffix(".bin.zst") {
131 let hash = HashDigest::from_hex(hex);
132 match self.get_snapshot(&hash) {
133 Ok(snap) => results.push((hash, snap)),
134 Err(_) => continue, }
136 }
137 }
138 }
139 results.sort_by(|a, b| b.1.created_at_ms.cmp(&a.1.created_at_ms));
141 Ok(results)
142 }
143
144 pub fn delete_snapshot(&self, hash: &HashDigest) -> Result<()> {
146 let path = self.snapshot_path(hash);
147 fs::remove_file(&path)
148 .with_context(|| format!("failed to delete snapshot: {}", path.display()))?;
149 Ok(())
150 }
151}
152
/// Top-level snapshot record written by `SnapshotStore::put_snapshot`.
///
/// The record holds hashes rather than the captured state itself.
/// NOTE(review): the hash fields presumably name blobs stored through
/// `SnapshotStore::put_struct` — confirm against the code that builds this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionSnapshot {
    /// Snapshot format version. NOTE(review): presumably `SNAPSHOT_VERSION` at
    /// creation time — confirm.
    pub version: u32,
    /// Creation timestamp in milliseconds (per the field name);
    /// `SnapshotStore::list_snapshots` sorts newest-first on this value.
    pub created_at_ms: i64,
    /// Hash for the semantic state (presumably a `SemanticSnapshot` — confirm).
    pub semantic_hash: HashDigest,
    /// Hash for the context state (presumably a `ContextSnapshot` — confirm).
    pub context_hash: HashDigest,
    /// Hash for the VM state, if one was captured (presumably a `VmSnapshot` — confirm).
    pub vm_hash: Option<HashDigest>,
    /// Hash for the bytecode, if one was captured.
    pub bytecode_hash: Option<HashDigest>,
    /// Optional script path associated with the snapshot.
    #[serde(default)]
    pub script_path: Option<String>,
}
165
/// Serializable capture of semantic-analysis state: the symbol table plus the
/// set of exported symbol names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SemanticSnapshot {
    /// The captured symbol table.
    pub symbol_table: SymbolTable,
    /// Names of exported symbols.
    pub exported_symbols: HashSet<String>,
}
171
/// Serializable capture of execution-context state.
///
/// NOTE(review): the per-field notes below are inferred from field names and
/// types only; the code that fills this struct is not in this part of the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextSnapshot {
    /// Data loading mode of the context.
    pub data_load_mode: crate::context::DataLoadMode,
    /// Captured data cache, if any.
    pub data_cache: Option<DataCacheSnapshot>,
    /// Current id, if set (presumably the active data/instrument id — confirm).
    pub current_id: Option<String>,
    /// Current row index.
    pub current_row_index: usize,
    /// Variable scopes, each mapping a variable name to its captured state.
    /// NOTE(review): scope ordering (outermost vs. innermost first) is not visible here.
    pub variable_scopes: Vec<HashMap<String, VariableSnapshot>>,
    /// Reference datetime (UTC), if set.
    pub reference_datetime: Option<chrono::DateTime<chrono::Utc>>,
    /// Current timeframe, if set.
    pub current_timeframe: Option<Timeframe>,
    /// Base timeframe, if set.
    pub base_timeframe: Option<Timeframe>,
    /// Date range as a pair of UTC datetimes (presumably start, end — confirm).
    pub date_range: Option<(chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)>,
    /// Start of the active range (presumably a row index — confirm).
    pub range_start: usize,
    /// End of the active range (inclusive/exclusive not visible here — confirm).
    pub range_end: usize,
    /// Whether the range is active.
    pub range_active: bool,
    /// Type alias entries keyed by alias name.
    pub type_alias_registry: HashMap<String, TypeAliasRuntimeEntrySnapshot>,
    /// Enum definitions keyed by name.
    pub enum_registry: HashMap<String, EnumDef>,
    /// Captured suspension state, if execution was suspended.
    pub suspension_state: Option<SuspensionStateSnapshot>,
}
190
/// Serializable capture of a single variable: its value plus declaration metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VariableSnapshot {
    /// The variable's value in serializable form.
    pub value: SerializableVMValue,
    /// Declaration kind of the variable.
    pub kind: shape_ast::ast::VarKind,
    /// Whether the variable has been initialized.
    pub is_initialized: bool,
    /// Whether the variable is function-scoped.
    pub is_function_scoped: bool,
    /// Optional format hint.
    pub format_hint: Option<String>,
    /// Optional format overrides, keyed by name.
    pub format_overrides: Option<HashMap<String, SerializableVMValue>>,
}
200
/// Serializable capture of a runtime type-alias entry: the base type name and
/// optional overrides.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeAliasRuntimeEntrySnapshot {
    /// Name of the aliased base type.
    pub base_type: String,
    /// Optional override values keyed by name.
    pub overrides: Option<HashMap<String, SerializableVMValue>>,
}
206
/// Serializable capture of a suspended execution: what it waits for and the
/// state saved for resumption.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SuspensionStateSnapshot {
    /// Condition the suspended execution is waiting on.
    pub waiting_for: WaitCondition,
    /// Program counter to resume at (per the field name).
    pub resume_pc: usize,
    /// Saved local values.
    pub saved_locals: Vec<SerializableVMValue>,
    /// Saved stack values.
    pub saved_stack: Vec<SerializableVMValue>,
}
214
/// Serializable capture of VM state.
///
/// NOTE(review): field meanings are inferred from names; the capture/restore
/// code is not in this part of the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmSnapshot {
    /// Instruction pointer (presumably — confirm).
    pub ip: usize,
    /// Operand stack values.
    pub stack: Vec<SerializableVMValue>,
    /// Local variable slots.
    pub locals: Vec<SerializableVMValue>,
    /// Module-level binding values.
    pub module_bindings: Vec<SerializableVMValue>,
    /// Call frames.
    pub call_stack: Vec<SerializableCallFrame>,
    /// Loop contexts.
    pub loop_stack: Vec<SerializableLoopContext>,
    /// Timeframe stack; entries may be `None`.
    pub timeframe_stack: Vec<Option<Timeframe>>,
    /// Registered exception handlers.
    pub exception_handlers: Vec<SerializableExceptionHandler>,
}
226
/// Serializable form of a VM call frame.
///
/// NOTE(review): field meanings are inferred from names — confirm against the
/// VM's call-frame type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableCallFrame {
    /// Instruction pointer to return to.
    pub return_ip: usize,
    /// Base index of this frame's locals.
    pub locals_base: usize,
    /// Number of locals in this frame.
    pub locals_count: usize,
    /// Id of the executing function, if any.
    pub function_id: Option<u16>,
    /// Captured upvalue values, if any.
    pub upvalues: Option<Vec<SerializableVMValue>>,
    /// Optional 32-byte hash (presumably of the function's bytecode blob — confirm).
    // NOTE(review): `#[serde(default)]` only helps formats that can detect a
    // missing field; snapshots here are bincode-encoded, so confirm the
    // attribute has the intended effect.
    #[serde(default)]
    pub blob_hash: Option<[u8; 32]>,
    /// Optional instruction pointer local to the function (presumably relative
    /// to the blob named by `blob_hash` — confirm).
    #[serde(default)]
    pub local_ip: Option<usize>,
}
245
/// Serializable form of a loop context: its `start` and `end` positions.
// NOTE(review): presumably instruction offsets of the loop's bounds — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableLoopContext {
    /// Start position of the loop.
    pub start: usize,
    /// End position of the loop.
    pub end: usize,
}
251
/// Serializable form of a registered exception handler.
// NOTE(review): field meanings inferred from names — confirm against the VM.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableExceptionHandler {
    /// Instruction pointer of the catch target.
    pub catch_ip: usize,
    /// Stack size recorded for the handler.
    pub stack_size: usize,
    /// Call depth recorded for the handler.
    pub call_depth: usize,
}
258
/// Serializable mirror of a VM value.
///
/// Produced by `nanboxed_to_serializable` / `heap_value_to_serializable` and
/// turned back into a `ValueWord` by `serializable_to_nanboxed`. Bulk payloads
/// (tables, typed arrays, matrices) are not embedded; they are written to the
/// `SnapshotStore` and referenced through a `BlobRef`.
///
/// The variant order is part of the bincode encoding; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SerializableVMValue {
    /// Integer. Also the target for heap `BigInt` values and for native scalars
    /// that report an `i64`.
    Int(i64),
    /// Floating-point number. Also the target for native scalars without an `i64` form.
    Number(f64),
    Decimal(rust_decimal::Decimal),
    /// String. Also used for lossy textual stand-ins of content nodes, instants
    /// and I/O handles.
    String(String),
    Bool(bool),
    /// Absent value. `Ref`-tagged words are also stored as `None`.
    None,
    Some(Box<SerializableVMValue>),
    Unit,
    Timeframe(Timeframe),
    Duration(shape_ast::ast::Duration),
    Time(chrono::DateTime<chrono::FixedOffset>),
    /// Time span in whole milliseconds.
    TimeSpan(i64),
    TimeReference(TimeReference),
    DateTimeExpr(DateTimeExpr),
    DataDateTimeRef(DataDateTimeRef),
    /// Array of values. Sets, deques and priority queues are also stored as
    /// plain arrays of their items.
    Array(Vec<SerializableVMValue>),
    /// Function id.
    Function(u16),
    TypeAnnotation(TypeAnnotation),
    TypeAnnotatedValue {
        type_name: String,
        value: Box<SerializableVMValue>,
    },
    Enum(EnumValueSnapshot),
    /// Closure: function id plus the current value of each upvalue.
    Closure {
        function_id: u16,
        upvalues: Vec<SerializableVMValue>,
    },
    /// Module function; `nanboxed_to_serializable` writes it as `native#<id>`.
    ModuleFunction(String),
    TypedObject {
        schema_id: u64,
        /// One entry per slot, in slot order.
        slot_data: Vec<SerializableVMValue>,
        /// Bit `i` set means slot `i` was serialized from a heap value.
        heap_mask: u64,
    },
    Range {
        start: Option<Box<SerializableVMValue>>,
        end: Option<Box<SerializableVMValue>>,
        inclusive: bool,
    },
    Ok(Box<SerializableVMValue>),
    Err(Box<SerializableVMValue>),
    PrintResult(PrintableSnapshot),
    SimulationCall {
        name: String,
        params: HashMap<String, SerializableVMValue>,
    },
    FunctionRef {
        name: String,
        closure: Option<Box<SerializableVMValue>>,
    },
    DataReference {
        datetime: chrono::DateTime<chrono::FixedOffset>,
        id: String,
        timeframe: Timeframe,
    },
    /// Future id.
    Future(u64),
    /// Table stored out-of-line; the blob holds a `SerializableDataTable`.
    DataTable(BlobRef),
    TypedTable {
        schema_id: u64,
        table: BlobRef,
    },
    RowView {
        schema_id: u64,
        table: BlobRef,
        row_idx: usize,
    },
    ColumnRef {
        schema_id: u64,
        table: BlobRef,
        col_id: u32,
    },
    IndexedTable {
        schema_id: u64,
        table: BlobRef,
        index_col: u32,
    },
    /// Primitive array stored out-of-line as raw element bytes.
    TypedArray {
        /// Element type of the array.
        element_kind: TypedArrayElementKind,
        /// Blob holding a `ChunkedBlob` that describes the raw bytes.
        blob: BlobRef,
        /// Number of elements.
        len: usize,
    },
    /// Matrix stored out-of-line as the raw bytes of its data buffer.
    Matrix {
        /// Blob holding a `ChunkedBlob` that describes the raw bytes.
        blob: BlobRef,
        rows: u32,
        cols: u32,
    },
    /// Hash map stored as two vectors, filled from the map's `keys` and `values`
    /// in iteration order.
    HashMap {
        keys: Vec<SerializableVMValue>,
        values: Vec<SerializableVMValue>,
    },
    /// Reference to data kept in a sidecar.
    // NOTE(review): nothing in this part of the file produces or consumes this
    // variant; the meaning of `meta_a` / `meta_b` must be confirmed elsewhere.
    SidecarRef {
        sidecar_id: u32,
        blob_kind: BlobKind,
        original_hash: HashDigest,
        meta_a: u32,
        meta_b: u32,
    },
}
367
/// Serializable form of an enum value: enum name, variant name and payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnumValueSnapshot {
    /// Name of the enum type.
    pub enum_name: String,
    /// Name of the variant.
    pub variant: String,
    /// Payload carried by the variant.
    pub payload: EnumPayloadSnapshot,
}
374
/// Serializable form of an enum variant's payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EnumPayloadSnapshot {
    /// No payload.
    Unit,
    /// Positional payload values.
    Tuple(Vec<SerializableVMValue>),
    /// Named payload fields as `(name, value)` pairs.
    Struct(Vec<(String, SerializableVMValue)>),
}
381
/// Serializable form of a print result: the rendered text and its spans.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrintableSnapshot {
    /// The rendered output text.
    pub rendered: String,
    /// Spans describing parts of `rendered`.
    pub spans: Vec<PrintSpanSnapshot>,
}
387
/// Serializable form of one span of a print result.
// NOTE(review): `start`/`end` are presumably offsets into the rendered text;
// whether they are byte or character offsets is not visible here — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PrintSpanSnapshot {
    /// Span of literal text.
    Literal {
        text: String,
        start: usize,
        end: usize,
        span_id: String,
    },
    /// Span produced from a value, with the information needed to re-format it.
    Value {
        text: String,
        start: usize,
        end: usize,
        span_id: String,
        /// Name of the variable the value came from, if known.
        variable_name: Option<String>,
        /// The underlying value.
        raw_value: Box<SerializableVMValue>,
        type_name: String,
        current_format: String,
        format_params: HashMap<String, SerializableVMValue>,
    },
}
408
/// Reference to a blob in the `SnapshotStore`, tagged with the kind of data it holds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobRef {
    /// Hash under which the blob was stored.
    pub hash: HashDigest,
    /// What the blob contains.
    pub kind: BlobKind,
}
414
/// Element type of a typed array stored as raw bytes.
///
/// The variant order is part of the bincode encoding; do not reorder.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TypedArrayElementKind {
    I8,
    I16,
    I32,
    I64,
    U8,
    U16,
    U32,
    U64,
    F32,
    F64,
    /// Boolean elements; restored as one `u8` per element.
    Bool,
}
430
/// Kind of data a `BlobRef` points to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BlobKind {
    /// A serialized data table description.
    DataTable,
    /// Raw bytes of a typed array with the given element type.
    TypedArray(TypedArrayElementKind),
    /// Raw bytes of a matrix data buffer.
    Matrix,
}
439
/// Description of a sequence that was split into chunks, each stored as its own blob.
///
/// Built by `store_chunked_vec` / `store_chunked_bytes` and read back by
/// `load_chunked_vec` / `load_chunked_bytes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedBlob {
    /// Hashes of the chunk blobs, in sequence order.
    pub chunk_hashes: Vec<HashDigest>,
    /// Total length of the original sequence: elements for `store_chunked_vec`,
    /// bytes for `store_chunked_bytes`.
    pub total_len: usize,
    /// Chunk size used when storing, in the same unit as `total_len`.
    pub chunk_len: usize,
}
446
/// Serializable description of a data table whose contents are stored as chunked bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataTable {
    /// Chunked table bytes. NOTE(review): presumably an Arrow IPC encoding, going
    /// by the name — confirm in `serialize_datatable`.
    pub ipc_chunks: ChunkedBlob,
    /// Optional type name attached to the table.
    pub type_name: Option<String>,
    /// Optional schema id attached to the table.
    pub schema_id: Option<u32>,
}
453
/// Serializable description of a data frame: identity, timeframe, and chunked
/// timestamp and column data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrame {
    /// Identifier of the data frame.
    pub id: String,
    /// Timeframe of the data.
    pub timeframe: Timeframe,
    /// Chunked timestamps. NOTE(review): element type/unit not visible here — confirm.
    pub timestamps: ChunkedBlob,
    /// Columns of the frame.
    pub columns: Vec<SerializableDataFrameColumn>,
}
461
/// Serializable description of one data frame column.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrameColumn {
    /// Column name.
    pub name: String,
    /// Chunked column values. NOTE(review): element type not visible here — confirm.
    pub values: ChunkedBlob,
}
467
/// Serializable form of a data-cache key: an id paired with a timeframe.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheKeySnapshot {
    /// Identifier part of the key.
    pub id: String,
    /// Timeframe part of the key.
    pub timeframe: Timeframe,
}
473
/// Serializable form of one cached historical data entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedDataSnapshot {
    /// Cache key of the entry.
    pub key: CacheKeySnapshot,
    /// The cached historical data frame.
    pub historical: SerializableDataFrame,
    /// Current index associated with the entry (presumably a row position in
    /// `historical` — confirm).
    pub current_index: usize,
}
480
/// Serializable form of one live-buffer entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveBufferSnapshot {
    /// Cache key of the entry.
    pub key: CacheKeySnapshot,
    /// Chunked buffered rows. NOTE(review): row type not visible here — confirm.
    pub rows: ChunkedBlob,
}
486
/// Serializable capture of the data cache: historical entries plus live-buffer entries.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCacheSnapshot {
    /// Cached historical data entries.
    pub historical: Vec<CachedDataSnapshot>,
    /// Live-buffer entries.
    pub live_buffer: Vec<LiveBufferSnapshot>,
}
492
493pub(crate) fn store_chunked_vec<T: Serialize>(
494 values: &[T],
495 chunk_len: usize,
496 store: &SnapshotStore,
497) -> Result<ChunkedBlob> {
498 let chunk_len = chunk_len.max(1);
499 if values.is_empty() {
500 return Ok(ChunkedBlob {
501 chunk_hashes: Vec::new(),
502 total_len: 0,
503 chunk_len,
504 });
505 }
506 let mut hashes = Vec::new();
507 for chunk in values.chunks(chunk_len) {
508 let bytes = bincode::serialize(chunk)?;
509 let hash = store.put_blob(&bytes)?;
510 hashes.push(hash);
511 }
512 Ok(ChunkedBlob {
513 chunk_hashes: hashes,
514 total_len: values.len(),
515 chunk_len,
516 })
517}
518
519pub(crate) fn load_chunked_vec<T: DeserializeOwned>(
520 chunked: &ChunkedBlob,
521 store: &SnapshotStore,
522) -> Result<Vec<T>> {
523 if chunked.total_len == 0 {
524 return Ok(Vec::new());
525 }
526 let mut out = Vec::with_capacity(chunked.total_len);
527 for hash in &chunked.chunk_hashes {
528 let bytes = store.get_blob(hash)?;
529 let chunk: Vec<T> = bincode::deserialize(&bytes)?;
530 out.extend(chunk);
531 }
532 out.truncate(chunked.total_len);
533 Ok(out)
534}
535
536pub fn store_chunked_bytes(data: &[u8], store: &SnapshotStore) -> Result<ChunkedBlob> {
538 if data.is_empty() {
539 return Ok(ChunkedBlob {
540 chunk_hashes: Vec::new(),
541 total_len: 0,
542 chunk_len: BYTE_CHUNK_LEN,
543 });
544 }
545 let mut hashes = Vec::new();
546 for chunk in data.chunks(BYTE_CHUNK_LEN) {
547 let hash = store.put_blob(chunk)?;
548 hashes.push(hash);
549 }
550 Ok(ChunkedBlob {
551 chunk_hashes: hashes,
552 total_len: data.len(),
553 chunk_len: BYTE_CHUNK_LEN,
554 })
555}
556
557pub fn load_chunked_bytes(chunked: &ChunkedBlob, store: &SnapshotStore) -> Result<Vec<u8>> {
559 if chunked.total_len == 0 {
560 return Ok(Vec::new());
561 }
562 let mut out = Vec::with_capacity(chunked.total_len);
563 for hash in &chunked.chunk_hashes {
564 let bytes = store.get_blob(hash)?;
565 out.extend_from_slice(&bytes);
566 }
567 out.truncate(chunked.total_len);
568 Ok(out)
569}
570
/// Reinterprets `bytes` as a slice of `T` without copying.
///
/// Intended for plain numeric element types (integers and floats), for which
/// every bit pattern is a valid value. `T: Copy` alone does not guarantee that,
/// so do not use this with types such as `bool` or `char`.
///
/// An empty input yields an empty slice regardless of the pointer's alignment
/// (an empty `Vec<u8>` has a dangling, byte-aligned pointer).
///
/// # Panics
/// - if `T` is zero-sized,
/// - if `bytes.len()` is not a multiple of `size_of::<T>()`,
/// - if `bytes` is non-empty and not aligned for `T`. Building a `&[T]` from a
///   misaligned pointer is undefined behaviour, so this is checked up front.
fn bytes_as_slice<T: Copy>(bytes: &[u8]) -> &[T] {
    let elem_size = std::mem::size_of::<T>();
    assert!(
        elem_size != 0,
        "bytes_as_slice does not support zero-sized element types"
    );
    assert!(
        bytes.len() % elem_size == 0,
        "byte slice length {} not a multiple of element size {}",
        bytes.len(),
        elem_size
    );
    if bytes.is_empty() {
        return &[];
    }
    let align = std::mem::align_of::<T>();
    assert!(
        (bytes.as_ptr() as usize) % align == 0,
        "byte slice at {:p} is not aligned to {} bytes",
        bytes.as_ptr(),
        align
    );
    let len = bytes.len() / elem_size;
    // SAFETY: the pointer is non-null and aligned for `T` (checked above), the
    // region covers exactly `len * size_of::<T>()` initialized bytes borrowed
    // for the returned lifetime, and callers only use element types for which
    // any bit pattern is valid.
    unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const T, len) }
}
586
/// Views the memory of `data` as a byte slice without copying.
///
/// The result covers `data.len() * size_of::<T>()` bytes in native byte order.
fn slice_as_bytes<T>(data: &[T]) -> &[u8] {
    let start = data.as_ptr() as *const u8;
    let total_bytes = std::mem::size_of_val(data);
    // SAFETY: `start` points at `total_bytes` bytes owned by `data`, which stays
    // borrowed for the returned lifetime; `u8` has no alignment requirement.
    // NOTE(review): this assumes `T` has no padding bytes, which holds for the
    // primitive numeric element types — confirm for any other caller.
    unsafe { std::slice::from_raw_parts(start, total_bytes) }
}
592
593pub fn nanboxed_to_serializable(
601 nb: &ValueWord,
602 store: &SnapshotStore,
603) -> Result<SerializableVMValue> {
604 use shape_value::value_word::NanTag;
605
606 match nb.tag() {
607 NanTag::F64 => Ok(SerializableVMValue::Number(nb.as_f64().unwrap())),
608 NanTag::I48 => Ok(SerializableVMValue::Int(nb.as_i64().unwrap())),
609 NanTag::Bool => Ok(SerializableVMValue::Bool(nb.as_bool().unwrap())),
610 NanTag::None => Ok(SerializableVMValue::None),
611 NanTag::Unit => Ok(SerializableVMValue::Unit),
612 NanTag::Function => Ok(SerializableVMValue::Function(nb.as_function().unwrap())),
613 NanTag::ModuleFunction => Ok(SerializableVMValue::ModuleFunction(format!(
614 "native#{}",
615 nb.as_module_function().unwrap()
616 ))),
617 NanTag::Heap => {
618 let hv = nb.as_heap_ref().unwrap();
619 heap_value_to_serializable(hv, store)
620 }
621 NanTag::Ref => Ok(SerializableVMValue::None), }
623}
624
625fn heap_value_to_serializable(
627 hv: &shape_value::heap_value::HeapValue,
628 store: &SnapshotStore,
629) -> Result<SerializableVMValue> {
630 use shape_value::heap_value::HeapValue;
631
632 Ok(match hv {
633 HeapValue::String(s) => SerializableVMValue::String((**s).clone()),
634 HeapValue::Decimal(d) => SerializableVMValue::Decimal(*d),
635 HeapValue::BigInt(i) => SerializableVMValue::Int(*i),
636 HeapValue::Array(arr) => {
637 let mut out = Vec::with_capacity(arr.len());
638 for v in arr.iter() {
639 out.push(nanboxed_to_serializable(v, store)?);
640 }
641 SerializableVMValue::Array(out)
642 }
643 HeapValue::Closure {
644 function_id,
645 upvalues,
646 } => {
647 let mut ups = Vec::new();
648 for up in upvalues.iter() {
649 let nb = up.get();
650 ups.push(nanboxed_to_serializable(&nb, store)?);
651 }
652 SerializableVMValue::Closure {
653 function_id: *function_id,
654 upvalues: ups,
655 }
656 }
657 HeapValue::TypedObject {
658 schema_id,
659 slots,
660 heap_mask,
661 } => {
662 let mut slot_data = Vec::with_capacity(slots.len());
663 for i in 0..slots.len() {
664 if *heap_mask & (1u64 << i) != 0 {
665 let hv_inner = slots[i].as_heap_value();
666 slot_data.push(heap_value_to_serializable(hv_inner, store)?);
667 } else {
668 let nb = unsafe { ValueWord::clone_from_bits(slots[i].raw()) };
671 slot_data.push(nanboxed_to_serializable(&nb, store)?);
672 }
673 }
674 SerializableVMValue::TypedObject {
675 schema_id: *schema_id,
676 slot_data,
677 heap_mask: *heap_mask,
678 }
679 }
680 HeapValue::HostClosure(_)
681 | HeapValue::ExprProxy(_)
682 | HeapValue::FilterExpr(_)
683 | HeapValue::TaskGroup { .. }
684 | HeapValue::TraitObject { .. }
685 | HeapValue::NativeView(_) => {
686 return Err(anyhow::anyhow!(
687 "Cannot snapshot transient value: {}",
688 hv.type_name()
689 ));
690 }
691 HeapValue::Enum(ev) => SerializableVMValue::Enum(enum_to_snapshot(ev, store)?),
692 HeapValue::Some(inner) => {
693 SerializableVMValue::Some(Box::new(nanboxed_to_serializable(inner, store)?))
694 }
695 HeapValue::Ok(inner) => {
696 SerializableVMValue::Ok(Box::new(nanboxed_to_serializable(inner, store)?))
697 }
698 HeapValue::Err(inner) => {
699 SerializableVMValue::Err(Box::new(nanboxed_to_serializable(inner, store)?))
700 }
701 HeapValue::Range {
702 start,
703 end,
704 inclusive,
705 } => SerializableVMValue::Range {
706 start: match start {
707 Some(v) => Some(Box::new(nanboxed_to_serializable(v, store)?)),
708 None => None,
709 },
710 end: match end {
711 Some(v) => Some(Box::new(nanboxed_to_serializable(v, store)?)),
712 None => None,
713 },
714 inclusive: *inclusive,
715 },
716 HeapValue::Timeframe(tf) => SerializableVMValue::Timeframe(*tf),
717 HeapValue::Duration(d) => SerializableVMValue::Duration(d.clone()),
718 HeapValue::Time(t) => SerializableVMValue::Time(*t),
719 HeapValue::TimeSpan(span) => SerializableVMValue::TimeSpan(span.num_milliseconds()),
720 HeapValue::TimeReference(tr) => SerializableVMValue::TimeReference((**tr).clone()),
721 HeapValue::DateTimeExpr(expr) => SerializableVMValue::DateTimeExpr((**expr).clone()),
722 HeapValue::DataDateTimeRef(dref) => SerializableVMValue::DataDateTimeRef((**dref).clone()),
723 HeapValue::TypeAnnotation(ta) => SerializableVMValue::TypeAnnotation((**ta).clone()),
724 HeapValue::TypeAnnotatedValue { type_name, value } => {
725 SerializableVMValue::TypeAnnotatedValue {
726 type_name: type_name.clone(),
727 value: Box::new(nanboxed_to_serializable(value, store)?),
728 }
729 }
730 HeapValue::PrintResult(pr) => {
731 SerializableVMValue::PrintResult(print_result_to_snapshot(pr, store)?)
732 }
733 HeapValue::SimulationCall(data) => {
734 let mut out = HashMap::new();
735 for (k, v) in data.params.iter() {
736 out.insert(k.clone(), nanboxed_to_serializable(&v.clone(), store)?);
738 }
739 SerializableVMValue::SimulationCall {
740 name: data.name.clone(),
741 params: out,
742 }
743 }
744 HeapValue::FunctionRef { name, closure } => SerializableVMValue::FunctionRef {
745 name: name.clone(),
746 closure: match closure {
747 Some(c) => Some(Box::new(nanboxed_to_serializable(c, store)?)),
748 None => None,
749 },
750 },
751 HeapValue::DataReference(data) => SerializableVMValue::DataReference {
752 datetime: data.datetime,
753 id: data.id.clone(),
754 timeframe: data.timeframe,
755 },
756 HeapValue::Future(id) => SerializableVMValue::Future(*id),
757 HeapValue::DataTable(dt) => {
758 let ser = serialize_datatable(dt, store)?;
759 let hash = store.put_struct(&ser)?;
760 SerializableVMValue::DataTable(BlobRef {
761 hash,
762 kind: BlobKind::DataTable,
763 })
764 }
765 HeapValue::TypedTable { schema_id, table } => {
766 let ser = serialize_datatable(table, store)?;
767 let hash = store.put_struct(&ser)?;
768 SerializableVMValue::TypedTable {
769 schema_id: *schema_id,
770 table: BlobRef {
771 hash,
772 kind: BlobKind::DataTable,
773 },
774 }
775 }
776 HeapValue::RowView {
777 schema_id,
778 table,
779 row_idx,
780 } => {
781 let ser = serialize_datatable(table, store)?;
782 let hash = store.put_struct(&ser)?;
783 SerializableVMValue::RowView {
784 schema_id: *schema_id,
785 table: BlobRef {
786 hash,
787 kind: BlobKind::DataTable,
788 },
789 row_idx: *row_idx,
790 }
791 }
792 HeapValue::ColumnRef {
793 schema_id,
794 table,
795 col_id,
796 } => {
797 let ser = serialize_datatable(table, store)?;
798 let hash = store.put_struct(&ser)?;
799 SerializableVMValue::ColumnRef {
800 schema_id: *schema_id,
801 table: BlobRef {
802 hash,
803 kind: BlobKind::DataTable,
804 },
805 col_id: *col_id,
806 }
807 }
808 HeapValue::IndexedTable {
809 schema_id,
810 table,
811 index_col,
812 } => {
813 let ser = serialize_datatable(table, store)?;
814 let hash = store.put_struct(&ser)?;
815 SerializableVMValue::IndexedTable {
816 schema_id: *schema_id,
817 table: BlobRef {
818 hash,
819 kind: BlobKind::DataTable,
820 },
821 index_col: *index_col,
822 }
823 }
824 HeapValue::NativeScalar(v) => {
825 if let Some(i) = v.as_i64() {
826 SerializableVMValue::Int(i)
827 } else {
828 SerializableVMValue::Number(v.as_f64())
829 }
830 }
831 HeapValue::HashMap(d) => {
832 let mut out_keys = Vec::with_capacity(d.keys.len());
833 let mut out_values = Vec::with_capacity(d.values.len());
834 for k in d.keys.iter() {
835 out_keys.push(nanboxed_to_serializable(k, store)?);
836 }
837 for v in d.values.iter() {
838 out_values.push(nanboxed_to_serializable(v, store)?);
839 }
840 SerializableVMValue::HashMap {
841 keys: out_keys,
842 values: out_values,
843 }
844 }
845 HeapValue::Set(d) => {
846 let mut out = Vec::with_capacity(d.items.len());
847 for item in d.items.iter() {
848 out.push(nanboxed_to_serializable(item, store)?);
849 }
850 SerializableVMValue::Array(out)
851 }
852 HeapValue::Deque(d) => {
853 let mut out = Vec::with_capacity(d.items.len());
854 for item in d.items.iter() {
855 out.push(nanboxed_to_serializable(item, store)?);
856 }
857 SerializableVMValue::Array(out)
858 }
859 HeapValue::PriorityQueue(d) => {
860 let mut out = Vec::with_capacity(d.items.len());
861 for item in d.items.iter() {
862 out.push(nanboxed_to_serializable(item, store)?);
863 }
864 SerializableVMValue::Array(out)
865 }
866 HeapValue::Content(node) => SerializableVMValue::String(format!("{}", node)),
867 HeapValue::Instant(t) => {
868 SerializableVMValue::String(format!("<instant:{:?}>", t.elapsed()))
869 }
870 HeapValue::IoHandle(data) => {
871 let status = if data.is_open() { "open" } else { "closed" };
872 SerializableVMValue::String(format!("<io_handle:{}:{}>", data.path, status))
873 }
874 HeapValue::SharedCell(arc) => nanboxed_to_serializable(&arc.read().unwrap(), store)?,
875 HeapValue::IntArray(a) => {
876 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
877 let hash = store.put_struct(&blob)?;
878 SerializableVMValue::TypedArray {
879 element_kind: TypedArrayElementKind::I64,
880 blob: BlobRef {
881 hash,
882 kind: BlobKind::TypedArray(TypedArrayElementKind::I64),
883 },
884 len: a.len(),
885 }
886 }
887 HeapValue::FloatArray(a) => {
888 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
889 let hash = store.put_struct(&blob)?;
890 SerializableVMValue::TypedArray {
891 element_kind: TypedArrayElementKind::F64,
892 blob: BlobRef {
893 hash,
894 kind: BlobKind::TypedArray(TypedArrayElementKind::F64),
895 },
896 len: a.len(),
897 }
898 }
899 HeapValue::BoolArray(a) => {
900 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
901 let hash = store.put_struct(&blob)?;
902 SerializableVMValue::TypedArray {
903 element_kind: TypedArrayElementKind::Bool,
904 blob: BlobRef {
905 hash,
906 kind: BlobKind::TypedArray(TypedArrayElementKind::Bool),
907 },
908 len: a.len(),
909 }
910 }
911 HeapValue::I8Array(a) => {
912 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
913 let hash = store.put_struct(&blob)?;
914 SerializableVMValue::TypedArray {
915 element_kind: TypedArrayElementKind::I8,
916 blob: BlobRef {
917 hash,
918 kind: BlobKind::TypedArray(TypedArrayElementKind::I8),
919 },
920 len: a.len(),
921 }
922 }
923 HeapValue::I16Array(a) => {
924 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
925 let hash = store.put_struct(&blob)?;
926 SerializableVMValue::TypedArray {
927 element_kind: TypedArrayElementKind::I16,
928 blob: BlobRef {
929 hash,
930 kind: BlobKind::TypedArray(TypedArrayElementKind::I16),
931 },
932 len: a.len(),
933 }
934 }
935 HeapValue::I32Array(a) => {
936 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
937 let hash = store.put_struct(&blob)?;
938 SerializableVMValue::TypedArray {
939 element_kind: TypedArrayElementKind::I32,
940 blob: BlobRef {
941 hash,
942 kind: BlobKind::TypedArray(TypedArrayElementKind::I32),
943 },
944 len: a.len(),
945 }
946 }
947 HeapValue::U8Array(a) => {
948 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
949 let hash = store.put_struct(&blob)?;
950 SerializableVMValue::TypedArray {
951 element_kind: TypedArrayElementKind::U8,
952 blob: BlobRef {
953 hash,
954 kind: BlobKind::TypedArray(TypedArrayElementKind::U8),
955 },
956 len: a.len(),
957 }
958 }
959 HeapValue::U16Array(a) => {
960 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
961 let hash = store.put_struct(&blob)?;
962 SerializableVMValue::TypedArray {
963 element_kind: TypedArrayElementKind::U16,
964 blob: BlobRef {
965 hash,
966 kind: BlobKind::TypedArray(TypedArrayElementKind::U16),
967 },
968 len: a.len(),
969 }
970 }
971 HeapValue::U32Array(a) => {
972 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
973 let hash = store.put_struct(&blob)?;
974 SerializableVMValue::TypedArray {
975 element_kind: TypedArrayElementKind::U32,
976 blob: BlobRef {
977 hash,
978 kind: BlobKind::TypedArray(TypedArrayElementKind::U32),
979 },
980 len: a.len(),
981 }
982 }
983 HeapValue::U64Array(a) => {
984 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
985 let hash = store.put_struct(&blob)?;
986 SerializableVMValue::TypedArray {
987 element_kind: TypedArrayElementKind::U64,
988 blob: BlobRef {
989 hash,
990 kind: BlobKind::TypedArray(TypedArrayElementKind::U64),
991 },
992 len: a.len(),
993 }
994 }
995 HeapValue::F32Array(a) => {
996 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
997 let hash = store.put_struct(&blob)?;
998 SerializableVMValue::TypedArray {
999 element_kind: TypedArrayElementKind::F32,
1000 blob: BlobRef {
1001 hash,
1002 kind: BlobKind::TypedArray(TypedArrayElementKind::F32),
1003 },
1004 len: a.len(),
1005 }
1006 }
1007 HeapValue::Matrix(m) => {
1008 let raw_bytes = slice_as_bytes(m.data.as_slice());
1009 let blob = store_chunked_bytes(raw_bytes, store)?;
1010 let hash = store.put_struct(&blob)?;
1011 SerializableVMValue::Matrix {
1012 blob: BlobRef {
1013 hash,
1014 kind: BlobKind::Matrix,
1015 },
1016 rows: m.rows,
1017 cols: m.cols,
1018 }
1019 }
1020 HeapValue::Iterator(_)
1021 | HeapValue::Generator(_)
1022 | HeapValue::Mutex(_)
1023 | HeapValue::Atomic(_)
1024 | HeapValue::Lazy(_)
1025 | HeapValue::Channel(_) => {
1026 return Err(anyhow::anyhow!(
1027 "Cannot snapshot transient value: {}",
1028 hv.type_name()
1029 ));
1030 }
1031 })
1032}
1033
1034pub fn serializable_to_nanboxed(
1040 value: &SerializableVMValue,
1041 store: &SnapshotStore,
1042) -> Result<ValueWord> {
1043 Ok(match value {
1044 SerializableVMValue::Int(i) => ValueWord::from_i64(*i),
1045 SerializableVMValue::Number(n) => ValueWord::from_f64(*n),
1046 SerializableVMValue::Decimal(d) => ValueWord::from_decimal(*d),
1047 SerializableVMValue::String(s) => ValueWord::from_string(std::sync::Arc::new(s.clone())),
1048 SerializableVMValue::Bool(b) => ValueWord::from_bool(*b),
1049 SerializableVMValue::None => ValueWord::none(),
1050 SerializableVMValue::Unit => ValueWord::unit(),
1051 SerializableVMValue::Function(f) => ValueWord::from_function(*f),
1052 SerializableVMValue::ModuleFunction(_name) => ValueWord::from_module_function(0),
1053 SerializableVMValue::Some(v) => ValueWord::from_some(serializable_to_nanboxed(v, store)?),
1054 SerializableVMValue::Ok(v) => ValueWord::from_ok(serializable_to_nanboxed(v, store)?),
1055 SerializableVMValue::Err(v) => ValueWord::from_err(serializable_to_nanboxed(v, store)?),
1056 SerializableVMValue::Timeframe(tf) => ValueWord::from_timeframe(*tf),
1057 SerializableVMValue::Duration(d) => ValueWord::from_duration(d.clone()),
1058 SerializableVMValue::Time(t) => ValueWord::from_time(*t),
1059 SerializableVMValue::TimeSpan(ms) => {
1060 ValueWord::from_timespan(chrono::Duration::milliseconds(*ms))
1061 }
1062 SerializableVMValue::TimeReference(tr) => ValueWord::from_time_reference(tr.clone()),
1063 SerializableVMValue::DateTimeExpr(expr) => ValueWord::from_datetime_expr(expr.clone()),
1064 SerializableVMValue::DataDateTimeRef(dref) => {
1065 ValueWord::from_data_datetime_ref(dref.clone())
1066 }
1067 SerializableVMValue::Array(arr) => {
1068 let mut out = Vec::with_capacity(arr.len());
1069 for v in arr.iter() {
1070 out.push(serializable_to_nanboxed(v, store)?);
1071 }
1072 ValueWord::from_array(std::sync::Arc::new(out))
1073 }
1074 SerializableVMValue::TypeAnnotation(ta) => ValueWord::from_type_annotation(ta.clone()),
1075 SerializableVMValue::TypeAnnotatedValue { type_name, value } => {
1076 ValueWord::from_type_annotated_value(
1077 type_name.clone(),
1078 serializable_to_nanboxed(value, store)?,
1079 )
1080 }
1081 SerializableVMValue::Range {
1082 start,
1083 end,
1084 inclusive,
1085 } => ValueWord::from_range(
1086 match start {
1087 Some(v) => Some(serializable_to_nanboxed(v, store)?),
1088 None => None,
1089 },
1090 match end {
1091 Some(v) => Some(serializable_to_nanboxed(v, store)?),
1092 None => None,
1093 },
1094 *inclusive,
1095 ),
1096 SerializableVMValue::Future(id) => ValueWord::from_future(*id),
1097 SerializableVMValue::FunctionRef { name, closure } => ValueWord::from_function_ref(
1098 name.clone(),
1099 match closure {
1100 Some(c) => Some(serializable_to_nanboxed(c, store)?),
1101 None => None,
1102 },
1103 ),
1104 SerializableVMValue::DataReference {
1105 datetime,
1106 id,
1107 timeframe,
1108 } => ValueWord::from_data_reference(*datetime, id.clone(), *timeframe),
1109 SerializableVMValue::DataTable(blob) => {
1110 let ser: SerializableDataTable = store.get_struct(&blob.hash)?;
1111 ValueWord::from_datatable(std::sync::Arc::new(deserialize_datatable(ser, store)?))
1112 }
1113 SerializableVMValue::TypedTable { schema_id, table } => {
1114 let ser: SerializableDataTable = store.get_struct(&table.hash)?;
1115 ValueWord::from_typed_table(
1116 *schema_id,
1117 std::sync::Arc::new(deserialize_datatable(ser, store)?),
1118 )
1119 }
1120 SerializableVMValue::RowView {
1121 schema_id,
1122 table,
1123 row_idx,
1124 } => {
1125 let ser: SerializableDataTable = store.get_struct(&table.hash)?;
1126 ValueWord::from_row_view(
1127 *schema_id,
1128 std::sync::Arc::new(deserialize_datatable(ser, store)?),
1129 *row_idx,
1130 )
1131 }
1132 SerializableVMValue::ColumnRef {
1133 schema_id,
1134 table,
1135 col_id,
1136 } => {
1137 let ser: SerializableDataTable = store.get_struct(&table.hash)?;
1138 ValueWord::from_column_ref(
1139 *schema_id,
1140 std::sync::Arc::new(deserialize_datatable(ser, store)?),
1141 *col_id,
1142 )
1143 }
1144 SerializableVMValue::IndexedTable {
1145 schema_id,
1146 table,
1147 index_col,
1148 } => {
1149 let ser: SerializableDataTable = store.get_struct(&table.hash)?;
1150 ValueWord::from_indexed_table(
1151 *schema_id,
1152 std::sync::Arc::new(deserialize_datatable(ser, store)?),
1153 *index_col,
1154 )
1155 }
1156 SerializableVMValue::TypedArray {
1157 element_kind,
1158 blob,
1159 len,
1160 } => {
1161 let chunked: ChunkedBlob = store.get_struct(&blob.hash)?;
1162 let raw = load_chunked_bytes(&chunked, store)?;
1163 match element_kind {
1164 TypedArrayElementKind::I64 => {
1165 let data: Vec<i64> = bytes_as_slice::<i64>(&raw)[..*len].to_vec();
1166 ValueWord::from_int_array(std::sync::Arc::new(
1167 shape_value::TypedBuffer::from_vec(data),
1168 ))
1169 }
1170 TypedArrayElementKind::F64 => {
1171 let data: Vec<f64> = bytes_as_slice::<f64>(&raw)[..*len].to_vec();
1172 let aligned = shape_value::AlignedVec::from_vec(data);
1173 ValueWord::from_float_array(std::sync::Arc::new(
1174 shape_value::AlignedTypedBuffer::from_aligned(aligned),
1175 ))
1176 }
1177 TypedArrayElementKind::Bool => {
1178 let data: Vec<u8> = raw[..*len].to_vec();
1179 ValueWord::from_bool_array(std::sync::Arc::new(
1180 shape_value::TypedBuffer::from_vec(data),
1181 ))
1182 }
1183 TypedArrayElementKind::I8 => {
1184 let data: Vec<i8> = bytes_as_slice::<i8>(&raw)[..*len].to_vec();
1185 ValueWord::from_i8_array(std::sync::Arc::new(
1186 shape_value::TypedBuffer::from_vec(data),
1187 ))
1188 }
1189 TypedArrayElementKind::I16 => {
1190 let data: Vec<i16> = bytes_as_slice::<i16>(&raw)[..*len].to_vec();
1191 ValueWord::from_i16_array(std::sync::Arc::new(
1192 shape_value::TypedBuffer::from_vec(data),
1193 ))
1194 }
1195 TypedArrayElementKind::I32 => {
1196 let data: Vec<i32> = bytes_as_slice::<i32>(&raw)[..*len].to_vec();
1197 ValueWord::from_i32_array(std::sync::Arc::new(
1198 shape_value::TypedBuffer::from_vec(data),
1199 ))
1200 }
1201 TypedArrayElementKind::U8 => {
1202 let data: Vec<u8> = raw[..*len].to_vec();
1203 ValueWord::from_u8_array(std::sync::Arc::new(
1204 shape_value::TypedBuffer::from_vec(data),
1205 ))
1206 }
1207 TypedArrayElementKind::U16 => {
1208 let data: Vec<u16> = bytes_as_slice::<u16>(&raw)[..*len].to_vec();
1209 ValueWord::from_u16_array(std::sync::Arc::new(
1210 shape_value::TypedBuffer::from_vec(data),
1211 ))
1212 }
1213 TypedArrayElementKind::U32 => {
1214 let data: Vec<u32> = bytes_as_slice::<u32>(&raw)[..*len].to_vec();
1215 ValueWord::from_u32_array(std::sync::Arc::new(
1216 shape_value::TypedBuffer::from_vec(data),
1217 ))
1218 }
1219 TypedArrayElementKind::U64 => {
1220 let data: Vec<u64> = bytes_as_slice::<u64>(&raw)[..*len].to_vec();
1221 ValueWord::from_u64_array(std::sync::Arc::new(
1222 shape_value::TypedBuffer::from_vec(data),
1223 ))
1224 }
1225 TypedArrayElementKind::F32 => {
1226 let data: Vec<f32> = bytes_as_slice::<f32>(&raw)[..*len].to_vec();
1227 ValueWord::from_f32_array(std::sync::Arc::new(
1228 shape_value::TypedBuffer::from_vec(data),
1229 ))
1230 }
1231 }
1232 }
1233 SerializableVMValue::Matrix { blob, rows, cols } => {
1234 let chunked: ChunkedBlob = store.get_struct(&blob.hash)?;
1235 let raw = load_chunked_bytes(&chunked, store)?;
1236 let data: Vec<f64> = bytes_as_slice::<f64>(&raw).to_vec();
1237 let aligned = shape_value::AlignedVec::from_vec(data);
1238 let matrix = shape_value::heap_value::MatrixData::from_flat(aligned, *rows, *cols);
1239 ValueWord::from_matrix(Box::new(matrix))
1240 }
1241 SerializableVMValue::HashMap { keys, values } => {
1242 let mut k_out = Vec::with_capacity(keys.len());
1243 for k in keys.iter() {
1244 k_out.push(serializable_to_nanboxed(k, store)?);
1245 }
1246 let mut v_out = Vec::with_capacity(values.len());
1247 for v in values.iter() {
1248 v_out.push(serializable_to_nanboxed(v, store)?);
1249 }
1250 ValueWord::from_hashmap_pairs(k_out, v_out)
1251 }
1252 SerializableVMValue::SidecarRef { .. } => {
1253 return Err(anyhow::anyhow!(
1254 "SidecarRef must be reassembled before deserialization"
1255 ));
1256 }
1257 SerializableVMValue::Enum(ev) => {
1258 let enum_val = snapshot_to_enum(ev, store)?;
1260 enum_val
1261 }
1262 SerializableVMValue::Closure {
1263 function_id,
1264 upvalues,
1265 } => {
1266 let mut ups = Vec::new();
1267 for v in upvalues.iter() {
1268 ups.push(Upvalue::new(serializable_to_nanboxed(v, store)?));
1269 }
1270 ValueWord::from_heap_value(shape_value::heap_value::HeapValue::Closure {
1271 function_id: *function_id,
1272 upvalues: ups,
1273 })
1274 }
1275 SerializableVMValue::TypedObject {
1276 schema_id,
1277 slot_data,
1278 heap_mask,
1279 } => {
1280 let mut slots = Vec::with_capacity(slot_data.len());
1281 let mut new_heap_mask: u64 = 0;
1282 for (i, sv) in slot_data.iter().enumerate() {
1283 if *heap_mask & (1u64 << i) != 0 {
1284 let nb = serializable_to_nanboxed(sv, store)?;
1289 let (slot, is_heap) = crate::type_schema::nb_to_slot(&nb);
1290 slots.push(slot);
1291 if is_heap {
1292 new_heap_mask |= 1u64 << i;
1293 }
1294 } else {
1295 let n = match sv {
1297 SerializableVMValue::Number(n) => *n,
1298 _ => 0.0,
1299 };
1300 slots.push(shape_value::ValueSlot::from_number(n));
1301 }
1302 }
1303 ValueWord::from_heap_value(shape_value::heap_value::HeapValue::TypedObject {
1304 schema_id: *schema_id,
1305 slots: slots.into_boxed_slice(),
1306 heap_mask: new_heap_mask,
1307 })
1308 }
1309 SerializableVMValue::PrintResult(pr) => {
1310 let print_result = snapshot_to_print_result(pr, store)?;
1312 ValueWord::from_print_result(print_result)
1313 }
1314 SerializableVMValue::SimulationCall { name, params } => {
1315 let mut out = HashMap::new();
1317 for (k, v) in params.iter() {
1318 out.insert(k.clone(), serializable_to_nanboxed(v, store)?.clone());
1319 }
1320 ValueWord::from_simulation_call(name.clone(), out)
1321 }
1322 })
1323}
1324
1325fn enum_to_snapshot(value: &EnumValue, store: &SnapshotStore) -> Result<EnumValueSnapshot> {
1326 Ok(EnumValueSnapshot {
1327 enum_name: value.enum_name.clone(),
1328 variant: value.variant.clone(),
1329 payload: match &value.payload {
1330 EnumPayload::Unit => EnumPayloadSnapshot::Unit,
1331 EnumPayload::Tuple(values) => {
1332 let mut out = Vec::new();
1333 for v in values.iter() {
1334 out.push(nanboxed_to_serializable(v, store)?);
1335 }
1336 EnumPayloadSnapshot::Tuple(out)
1337 }
1338 EnumPayload::Struct(map) => {
1339 let mut out = Vec::new();
1340 for (k, v) in map.iter() {
1341 out.push((k.clone(), nanboxed_to_serializable(v, store)?));
1342 }
1343 EnumPayloadSnapshot::Struct(out)
1344 }
1345 },
1346 })
1347}
1348
1349fn snapshot_to_enum(snapshot: &EnumValueSnapshot, store: &SnapshotStore) -> Result<ValueWord> {
1350 Ok(ValueWord::from_enum(EnumValue {
1351 enum_name: snapshot.enum_name.clone(),
1352 variant: snapshot.variant.clone(),
1353 payload: match &snapshot.payload {
1354 EnumPayloadSnapshot::Unit => EnumPayload::Unit,
1355 EnumPayloadSnapshot::Tuple(values) => {
1356 let mut out = Vec::new();
1357 for v in values.iter() {
1358 out.push(serializable_to_nanboxed(v, store)?);
1359 }
1360 EnumPayload::Tuple(out)
1361 }
1362 EnumPayloadSnapshot::Struct(map) => {
1363 let mut out = HashMap::new();
1364 for (k, v) in map.iter() {
1365 out.insert(k.clone(), serializable_to_nanboxed(v, store)?);
1366 }
1367 EnumPayload::Struct(out)
1368 }
1369 },
1370 }))
1371}
1372
1373fn print_result_to_snapshot(
1374 result: &PrintResult,
1375 store: &SnapshotStore,
1376) -> Result<PrintableSnapshot> {
1377 let mut spans = Vec::new();
1378 for span in result.spans.iter() {
1379 match span {
1380 PrintSpan::Literal {
1381 text,
1382 start,
1383 end,
1384 span_id,
1385 } => spans.push(PrintSpanSnapshot::Literal {
1386 text: text.clone(),
1387 start: *start,
1388 end: *end,
1389 span_id: span_id.clone(),
1390 }),
1391 PrintSpan::Value {
1392 text,
1393 start,
1394 end,
1395 span_id,
1396 variable_name,
1397 raw_value,
1398 type_name,
1399 current_format,
1400 format_params,
1401 } => {
1402 let mut params = HashMap::new();
1403 for (k, v) in format_params.iter() {
1404 params.insert(k.clone(), nanboxed_to_serializable(&v.clone(), store)?);
1405 }
1406 spans.push(PrintSpanSnapshot::Value {
1407 text: text.clone(),
1408 start: *start,
1409 end: *end,
1410 span_id: span_id.clone(),
1411 variable_name: variable_name.clone(),
1412 raw_value: Box::new(nanboxed_to_serializable(raw_value.as_ref(), store)?),
1413 type_name: type_name.clone(),
1414 current_format: current_format.clone(),
1415 format_params: params,
1416 });
1417 }
1418 }
1419 }
1420 Ok(PrintableSnapshot {
1421 rendered: result.rendered.clone(),
1422 spans,
1423 })
1424}
1425
1426fn snapshot_to_print_result(
1427 snapshot: &PrintableSnapshot,
1428 store: &SnapshotStore,
1429) -> Result<PrintResult> {
1430 let mut spans = Vec::new();
1431 for span in snapshot.spans.iter() {
1432 match span {
1433 PrintSpanSnapshot::Literal {
1434 text,
1435 start,
1436 end,
1437 span_id,
1438 } => {
1439 spans.push(PrintSpan::Literal {
1440 text: text.clone(),
1441 start: *start,
1442 end: *end,
1443 span_id: span_id.clone(),
1444 });
1445 }
1446 PrintSpanSnapshot::Value {
1447 text,
1448 start,
1449 end,
1450 span_id,
1451 variable_name,
1452 raw_value,
1453 type_name,
1454 current_format,
1455 format_params,
1456 } => {
1457 let mut params = HashMap::new();
1458 for (k, v) in format_params.iter() {
1459 params.insert(k.clone(), serializable_to_nanboxed(v, store)?.clone());
1460 }
1461 spans.push(PrintSpan::Value {
1462 text: text.clone(),
1463 start: *start,
1464 end: *end,
1465 span_id: span_id.clone(),
1466 variable_name: variable_name.clone(),
1467 raw_value: Box::new(serializable_to_nanboxed(raw_value, store)?.clone()),
1468 type_name: type_name.clone(),
1469 current_format: current_format.clone(),
1470 format_params: params,
1471 });
1472 }
1473 }
1474 }
1475 Ok(PrintResult {
1476 rendered: snapshot.rendered.clone(),
1477 spans,
1478 })
1479}
1480
1481pub(crate) fn serialize_dataframe(
1482 df: &DataFrame,
1483 store: &SnapshotStore,
1484) -> Result<SerializableDataFrame> {
1485 let mut columns: Vec<_> = df.columns.iter().collect();
1486 columns.sort_by(|a, b| a.0.cmp(b.0));
1487 let mut serialized_cols = Vec::with_capacity(columns.len());
1488 for (name, values) in columns.into_iter() {
1489 let blob = store_chunked_vec(values, DEFAULT_CHUNK_LEN, store)?;
1490 serialized_cols.push(SerializableDataFrameColumn {
1491 name: name.clone(),
1492 values: blob,
1493 });
1494 }
1495 Ok(SerializableDataFrame {
1496 id: df.id.clone(),
1497 timeframe: df.timeframe,
1498 timestamps: store_chunked_vec(&df.timestamps, DEFAULT_CHUNK_LEN, store)?,
1499 columns: serialized_cols,
1500 })
1501}
1502
1503pub(crate) fn deserialize_dataframe(
1504 serialized: SerializableDataFrame,
1505 store: &SnapshotStore,
1506) -> Result<DataFrame> {
1507 let timestamps: Vec<i64> = load_chunked_vec(&serialized.timestamps, store)?;
1508 let mut columns = HashMap::new();
1509 for col in serialized.columns.into_iter() {
1510 let values: Vec<f64> = load_chunked_vec(&col.values, store)?;
1511 columns.insert(col.name, values);
1512 }
1513 Ok(DataFrame {
1514 id: serialized.id,
1515 timeframe: serialized.timeframe,
1516 timestamps,
1517 columns,
1518 })
1519}
1520
1521fn serialize_datatable(dt: &DataTable, store: &SnapshotStore) -> Result<SerializableDataTable> {
1522 let mut buf = Vec::new();
1523 let schema = dt.inner().schema();
1524 let mut writer = arrow_ipc::writer::FileWriter::try_new(&mut buf, schema.as_ref())?;
1525 writer.write(dt.inner())?;
1526 writer.finish()?;
1527 let ipc_chunks = store_chunked_vec(&buf, BYTE_CHUNK_LEN, store)?;
1528 Ok(SerializableDataTable {
1529 ipc_chunks,
1530 type_name: dt.type_name().map(|s| s.to_string()),
1531 schema_id: dt.schema_id(),
1532 })
1533}
1534
1535fn deserialize_datatable(
1536 serialized: SerializableDataTable,
1537 store: &SnapshotStore,
1538) -> Result<DataTable> {
1539 let bytes = load_chunked_vec(&serialized.ipc_chunks, store)?;
1540 let cursor = std::io::Cursor::new(bytes);
1541 let mut reader = arrow_ipc::reader::FileReader::try_new(cursor, None)?;
1542 let batch = reader
1543 .next()
1544 .transpose()?
1545 .context("no RecordBatch in DataTable snapshot")?;
1546 let mut dt = DataTable::new(batch);
1547 if let Some(name) = serialized.type_name {
1548 dt = DataTable::with_type_name(dt.into_inner(), name);
1549 }
1550 if let Some(schema_id) = serialized.schema_id {
1551 dt = dt.with_schema_id(schema_id);
1552 }
1553 Ok(dt)
1554}
1555
1556#[cfg(test)]
1557mod tests {
1558 use super::*;
1559 use arrow_array::{Float64Array, Int64Array, RecordBatch};
1560 use arrow_schema::{DataType, Field, Schema};
1561 use std::sync::Arc;
1562
1563 fn make_test_table(ids: &[i64], values: &[f64]) -> DataTable {
1565 let schema = Arc::new(Schema::new(vec![
1566 Field::new("id", DataType::Int64, false),
1567 Field::new("value", DataType::Float64, false),
1568 ]));
1569 let batch = RecordBatch::try_new(
1570 schema,
1571 vec![
1572 Arc::new(Int64Array::from(ids.to_vec())),
1573 Arc::new(Float64Array::from(values.to_vec())),
1574 ],
1575 )
1576 .unwrap();
1577 DataTable::new(batch)
1578 }
1579
1580 #[test]
1581 fn test_datatable_snapshot_roundtrip_preserves_original_data() {
1582 let dir = tempfile::tempdir().unwrap();
1583 let store_path = dir.path().join("store");
1584
1585 let original_dt = make_test_table(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], &[150.0; 10]);
1586 assert_eq!(original_dt.row_count(), 10);
1587 let original_nb = ValueWord::from_datatable(Arc::new(original_dt));
1588
1589 let store = SnapshotStore::new(&store_path).unwrap();
1591 let serialized = nanboxed_to_serializable(&original_nb, &store).unwrap();
1592
1593 let restored_nb = serializable_to_nanboxed(&serialized, &store).unwrap();
1595 let restored = restored_nb.clone();
1596 let dt = restored
1597 .as_datatable()
1598 .expect("Expected DataTable after restore");
1599 assert_eq!(
1600 dt.row_count(),
1601 10,
1602 "restored table should have original 10 rows"
1603 );
1604 }
1605
1606 #[test]
1607 fn test_indexed_table_snapshot_roundtrip() {
1608 let dir = tempfile::tempdir().unwrap();
1609 let store = SnapshotStore::new(dir.path().join("store")).unwrap();
1610
1611 let dt = make_test_table(&[1, 2, 3], &[10.0, 20.0, 30.0]);
1612 let original_nb = ValueWord::from_indexed_table(1, Arc::new(dt), 0);
1613
1614 let serialized = nanboxed_to_serializable(&original_nb, &store).unwrap();
1615
1616 match &serialized {
1618 SerializableVMValue::IndexedTable {
1619 schema_id,
1620 index_col,
1621 ..
1622 } => {
1623 assert_eq!(schema_id, &1);
1624 assert_eq!(index_col, &0);
1625 }
1626 other => panic!(
1627 "Expected SerializableVMValue::IndexedTable, got {:?}",
1628 std::mem::discriminant(other)
1629 ),
1630 }
1631
1632 let restored = serializable_to_nanboxed(&serialized, &store)
1633 .unwrap()
1634 .clone();
1635 let (schema_id, table, index_col) = restored
1636 .as_indexed_table()
1637 .expect("Expected ValueWord::IndexedTable");
1638 assert_eq!(schema_id, 1);
1639 assert_eq!(index_col, 0);
1640 assert_eq!(table.row_count(), 3);
1641 }
1642
1643 #[test]
1644 fn test_typed_table_snapshot_roundtrip() {
1645 let dir = tempfile::tempdir().unwrap();
1646 let store = SnapshotStore::new(dir.path().join("store")).unwrap();
1647
1648 let dt = make_test_table(&[10, 20], &[1.5, 2.5]);
1649 let original_nb = ValueWord::from_typed_table(42, Arc::new(dt));
1650
1651 let serialized = nanboxed_to_serializable(&original_nb, &store).unwrap();
1652 let restored = serializable_to_nanboxed(&serialized, &store)
1653 .unwrap()
1654 .clone();
1655
1656 let (schema_id, table) = restored
1657 .as_typed_table()
1658 .expect("Expected ValueWord::TypedTable");
1659 assert_eq!(schema_id, 42);
1660 assert_eq!(table.row_count(), 2);
1661 let vals = table.get_f64_column("value").unwrap();
1662 assert!((vals.value(0) - 1.5).abs() < f64::EPSILON);
1663 assert!((vals.value(1) - 2.5).abs() < f64::EPSILON);
1664 }
1665
1666 #[test]
1669 fn test_float_array_typed_roundtrip() {
1670 let dir = tempfile::tempdir().unwrap();
1671 let store = SnapshotStore::new(dir.path().join("store")).unwrap();
1672
1673 let data: Vec<f64> = (0..1000).map(|i| i as f64 * 1.5).collect();
1674 let aligned = shape_value::AlignedVec::from_vec(data.clone());
1675 let buf = shape_value::AlignedTypedBuffer::from_aligned(aligned);
1676 let nb = ValueWord::from_float_array(Arc::new(buf));
1677
1678 let serialized = nanboxed_to_serializable(&nb, &store).unwrap();
1679 match &serialized {
1680 SerializableVMValue::TypedArray {
1681 element_kind, len, ..
1682 } => {
1683 assert_eq!(*element_kind, TypedArrayElementKind::F64);
1684 assert_eq!(*len, 1000);
1685 }
1686 other => panic!(
1687 "Expected TypedArray, got {:?}",
1688 std::mem::discriminant(other)
1689 ),
1690 }
1691
1692 let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
1693 let hv = restored.as_heap_ref().unwrap();
1694 match hv {
1695 shape_value::heap_value::HeapValue::FloatArray(a) => {
1696 assert_eq!(a.len(), 1000);
1697 for i in 0..1000 {
1698 assert!((a.as_slice()[i] - data[i]).abs() < f64::EPSILON);
1699 }
1700 }
1701 _ => panic!("Expected FloatArray after restore"),
1702 }
1703 }
1704
1705 #[test]
1706 fn test_int_array_typed_roundtrip() {
1707 let dir = tempfile::tempdir().unwrap();
1708 let store = SnapshotStore::new(dir.path().join("store")).unwrap();
1709
1710 let data: Vec<i64> = (0..500).map(|i| i * 3 - 100).collect();
1711 let buf = shape_value::TypedBuffer::from_vec(data.clone());
1712 let nb = ValueWord::from_int_array(Arc::new(buf));
1713
1714 let serialized = nanboxed_to_serializable(&nb, &store).unwrap();
1715 match &serialized {
1716 SerializableVMValue::TypedArray {
1717 element_kind, len, ..
1718 } => {
1719 assert_eq!(*element_kind, TypedArrayElementKind::I64);
1720 assert_eq!(*len, 500);
1721 }
1722 other => panic!(
1723 "Expected TypedArray, got {:?}",
1724 std::mem::discriminant(other)
1725 ),
1726 }
1727
1728 let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
1729 let hv = restored.as_heap_ref().unwrap();
1730 match hv {
1731 shape_value::heap_value::HeapValue::IntArray(a) => {
1732 assert_eq!(a.len(), 500);
1733 for i in 0..500 {
1734 assert_eq!(a.as_slice()[i], data[i]);
1735 }
1736 }
1737 _ => panic!("Expected IntArray after restore"),
1738 }
1739 }
1740
1741 #[test]
1742 fn test_matrix_roundtrip() {
1743 let dir = tempfile::tempdir().unwrap();
1744 let store = SnapshotStore::new(dir.path().join("store")).unwrap();
1745
1746 let data: Vec<f64> = (0..12).map(|i| i as f64).collect();
1747 let aligned = shape_value::AlignedVec::from_vec(data.clone());
1748 let matrix = shape_value::heap_value::MatrixData::from_flat(aligned, 3, 4);
1749 let nb = ValueWord::from_matrix(Box::new(matrix));
1750
1751 let serialized = nanboxed_to_serializable(&nb, &store).unwrap();
1752 match &serialized {
1753 SerializableVMValue::Matrix { rows, cols, .. } => {
1754 assert_eq!(*rows, 3);
1755 assert_eq!(*cols, 4);
1756 }
1757 other => panic!("Expected Matrix, got {:?}", std::mem::discriminant(other)),
1758 }
1759
1760 let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
1761 let hv = restored.as_heap_ref().unwrap();
1762 match hv {
1763 shape_value::heap_value::HeapValue::Matrix(m) => {
1764 assert_eq!(m.rows, 3);
1765 assert_eq!(m.cols, 4);
1766 for i in 0..12 {
1767 assert!((m.data.as_slice()[i] - data[i]).abs() < f64::EPSILON);
1768 }
1769 }
1770 _ => panic!("Expected Matrix after restore"),
1771 }
1772 }
1773
1774 #[test]
1775 fn test_hashmap_typed_roundtrip() {
1776 let dir = tempfile::tempdir().unwrap();
1777 let store = SnapshotStore::new(dir.path().join("store")).unwrap();
1778
1779 let keys = vec![
1780 ValueWord::from_string(Arc::new("a".to_string())),
1781 ValueWord::from_string(Arc::new("b".to_string())),
1782 ];
1783 let values = vec![ValueWord::from_i64(1), ValueWord::from_i64(2)];
1784 let nb = ValueWord::from_hashmap_pairs(keys, values);
1785
1786 let serialized = nanboxed_to_serializable(&nb, &store).unwrap();
1787 match &serialized {
1788 SerializableVMValue::HashMap { keys, values } => {
1789 assert_eq!(keys.len(), 2);
1790 assert_eq!(values.len(), 2);
1791 }
1792 other => panic!("Expected HashMap, got {:?}", std::mem::discriminant(other)),
1793 }
1794
1795 let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
1796 let hv = restored.as_heap_ref().unwrap();
1797 match hv {
1798 shape_value::heap_value::HeapValue::HashMap(d) => {
1799 assert_eq!(d.keys.len(), 2);
1800 assert_eq!(d.values.len(), 2);
1801 let key_a = ValueWord::from_string(Arc::new("a".to_string()));
1803 let idx = d.find_key(&key_a);
1804 assert!(idx.is_some(), "should find key 'a' in rebuilt index");
1805 }
1806 _ => panic!("Expected HashMap after restore"),
1807 }
1808 }
1809
1810 #[test]
1811 fn test_bool_array_typed_roundtrip() {
1812 let dir = tempfile::tempdir().unwrap();
1813 let store = SnapshotStore::new(dir.path().join("store")).unwrap();
1814
1815 let data: Vec<u8> = vec![1, 0, 1, 1, 0];
1816 let buf = shape_value::TypedBuffer::from_vec(data.clone());
1817 let nb = ValueWord::from_bool_array(Arc::new(buf));
1818
1819 let serialized = nanboxed_to_serializable(&nb, &store).unwrap();
1820 match &serialized {
1821 SerializableVMValue::TypedArray {
1822 element_kind, len, ..
1823 } => {
1824 assert_eq!(*element_kind, TypedArrayElementKind::Bool);
1825 assert_eq!(*len, 5);
1826 }
1827 other => panic!(
1828 "Expected TypedArray, got {:?}",
1829 std::mem::discriminant(other)
1830 ),
1831 }
1832
1833 let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
1834 let hv = restored.as_heap_ref().unwrap();
1835 match hv {
1836 shape_value::heap_value::HeapValue::BoolArray(a) => {
1837 assert_eq!(a.len(), 5);
1838 assert_eq!(a.as_slice(), &data);
1839 }
1840 _ => panic!("Expected BoolArray after restore"),
1841 }
1842 }
1843
1844 #[test]
1847 fn test_all_snapshot_components_bincode_roundtrip() {
1848 use rust_decimal::Decimal;
1849 use shape_ast::ast::VarKind;
1850 use shape_ast::data::Timeframe;
1851
1852 let decimal_val = SerializableVMValue::Decimal(Decimal::new(31415, 4)); let bytes = bincode::serialize(&decimal_val).expect("serialize Decimal ValueWord");
1855 let decoded: SerializableVMValue =
1856 bincode::deserialize(&bytes).expect("deserialize Decimal ValueWord");
1857 match decoded {
1858 SerializableVMValue::Decimal(d) => assert_eq!(d, Decimal::new(31415, 4)),
1859 _ => panic!("wrong variant"),
1860 }
1861
1862 let vm_snap = VmSnapshot {
1864 ip: 42,
1865 stack: vec![
1866 SerializableVMValue::Int(1),
1867 SerializableVMValue::Decimal(Decimal::new(99999, 2)),
1868 SerializableVMValue::String("hello".into()),
1869 ],
1870 locals: vec![SerializableVMValue::Decimal(Decimal::new(0, 0))],
1871 module_bindings: vec![SerializableVMValue::Number(3.14)],
1872 call_stack: vec![],
1873 loop_stack: vec![],
1874 timeframe_stack: vec![],
1875 exception_handlers: vec![],
1876 };
1877 let bytes = bincode::serialize(&vm_snap).expect("serialize VmSnapshot");
1878 let decoded: VmSnapshot = bincode::deserialize(&bytes).expect("deserialize VmSnapshot");
1879 assert_eq!(decoded.ip, 42);
1880 assert_eq!(decoded.stack.len(), 3);
1881
1882 let ctx_snap = ContextSnapshot {
1884 data_load_mode: crate::context::DataLoadMode::Async,
1885 data_cache: None,
1886 current_id: Some("test".into()),
1887 current_row_index: 0,
1888 variable_scopes: vec![{
1889 let mut scope = HashMap::new();
1890 scope.insert(
1891 "price".into(),
1892 VariableSnapshot {
1893 value: SerializableVMValue::Decimal(Decimal::new(15099, 2)),
1894 kind: VarKind::Let,
1895 is_initialized: true,
1896 is_function_scoped: false,
1897 format_hint: None,
1898 format_overrides: None,
1899 },
1900 );
1901 scope
1902 }],
1903 reference_datetime: None,
1904 current_timeframe: Some(Timeframe::m1()),
1905 base_timeframe: None,
1906 date_range: None,
1907 range_start: 0,
1908 range_end: 0,
1909 range_active: false,
1910 type_alias_registry: HashMap::new(),
1911 enum_registry: HashMap::new(),
1912 suspension_state: None,
1913 };
1914 let bytes = bincode::serialize(&ctx_snap).expect("serialize ContextSnapshot");
1915 let decoded: ContextSnapshot =
1916 bincode::deserialize(&bytes).expect("deserialize ContextSnapshot");
1917 assert_eq!(decoded.variable_scopes.len(), 1);
1918
1919 let sem_snap = SemanticSnapshot {
1921 symbol_table: crate::semantic::symbol_table::SymbolTable::new(),
1922 exported_symbols: HashSet::new(),
1923 };
1924 let bytes = bincode::serialize(&sem_snap).expect("serialize SemanticSnapshot");
1925 let _decoded: SemanticSnapshot =
1926 bincode::deserialize(&bytes).expect("deserialize SemanticSnapshot");
1927
1928 let exec_snap = ExecutionSnapshot {
1930 version: SNAPSHOT_VERSION,
1931 created_at_ms: 1234567890,
1932 semantic_hash: HashDigest::from_hex("abc123"),
1933 context_hash: HashDigest::from_hex("def456"),
1934 vm_hash: Some(HashDigest::from_hex("789aaa")),
1935 bytecode_hash: Some(HashDigest::from_hex("bbb000")),
1936 script_path: Some("/tmp/test.shape".into()),
1937 };
1938 let bytes = bincode::serialize(&exec_snap).expect("serialize ExecutionSnapshot");
1939 let decoded: ExecutionSnapshot =
1940 bincode::deserialize(&bytes).expect("deserialize ExecutionSnapshot");
1941 assert_eq!(decoded.version, SNAPSHOT_VERSION);
1942 }
1943}