1use std::collections::{HashMap, HashSet};
6use std::fs;
7use std::io::{Read, Write};
8use std::path::PathBuf;
9
10use anyhow::{Context, Result};
11use serde::de::DeserializeOwned;
12use serde::{Deserialize, Serialize};
13use shape_value::{EnumPayload, EnumValue, PrintResult, PrintSpan, Upvalue, ValueWord};
14
15use crate::event_queue::WaitCondition;
16use crate::hashing::{HashDigest, hash_bytes};
17use shape_ast::ast::{DataDateTimeRef, DateTimeExpr, EnumDef, TimeReference, TypeAnnotation};
18use shape_ast::data::Timeframe;
19
20use crate::data::DataFrame;
21use shape_value::datatable::DataTable;
22
/// On-disk snapshot format version.
///
/// NOTE(review): presumably written into `ExecutionSnapshot::version` by the
/// snapshot writer — confirm at the call site.
pub const SNAPSHOT_VERSION: u32 = 5;

/// Element count per chunk intended for `store_chunked_vec` callers.
///
/// NOTE(review): `store_chunked_vec` takes its chunk length as a parameter;
/// this is presumably the value most callers pass — confirm.
pub(crate) const DEFAULT_CHUNK_LEN: usize = 4096;

/// Chunk size in bytes used by `store_chunked_bytes` (256 KiB).
pub(crate) const BYTE_CHUNK_LEN: usize = 1 << 18;
34
35#[derive(Clone)]
37pub struct SnapshotStore {
38 root: PathBuf,
39}
40
41impl SnapshotStore {
42 pub fn new(root: impl Into<PathBuf>) -> Result<Self> {
43 let root = root.into();
44 fs::create_dir_all(root.join("blobs"))
45 .with_context(|| format!("failed to create snapshot blob dir at {}", root.display()))?;
46 fs::create_dir_all(root.join("snapshots"))
47 .with_context(|| format!("failed to create snapshot dir at {}", root.display()))?;
48 Ok(Self { root })
49 }
50
51 fn blob_path(&self, hash: &HashDigest) -> PathBuf {
52 self.root
53 .join("blobs")
54 .join(format!("{}.bin.zst", hash.hex()))
55 }
56
57 fn snapshot_path(&self, hash: &HashDigest) -> PathBuf {
58 self.root
59 .join("snapshots")
60 .join(format!("{}.bin.zst", hash.hex()))
61 }
62
63 pub fn put_blob(&self, data: &[u8]) -> Result<HashDigest> {
64 let hash = hash_bytes(data);
65 let path = self.blob_path(&hash);
66 if path.exists() {
67 return Ok(hash);
68 }
69 let compressed = zstd::stream::encode_all(data, 0)?;
70 let mut file = fs::File::create(&path)?;
71 file.write_all(&compressed)?;
72 Ok(hash)
73 }
74
75 pub fn get_blob(&self, hash: &HashDigest) -> Result<Vec<u8>> {
76 let path = self.blob_path(hash);
77 let mut file = fs::File::open(&path)
78 .with_context(|| format!("snapshot blob not found: {}", path.display()))?;
79 let mut buf = Vec::new();
80 file.read_to_end(&mut buf)?;
81 let decompressed = zstd::stream::decode_all(&buf[..])?;
82 Ok(decompressed)
83 }
84
85 pub fn put_struct<T: Serialize>(&self, value: &T) -> Result<HashDigest> {
86 let bytes = bincode::serialize(value)?;
87 self.put_blob(&bytes)
88 }
89
90 pub fn get_struct<T: for<'de> Deserialize<'de>>(&self, hash: &HashDigest) -> Result<T> {
91 let bytes = self.get_blob(hash)?;
92 Ok(bincode::deserialize(&bytes)?)
93 }
94
95 pub fn put_snapshot(&self, snapshot: &ExecutionSnapshot) -> Result<HashDigest> {
96 let bytes = bincode::serialize(snapshot)?;
97 let hash = hash_bytes(&bytes);
98 let path = self.snapshot_path(&hash);
99 if !path.exists() {
100 let compressed = zstd::stream::encode_all(&bytes[..], 0)?;
101 let mut file = fs::File::create(&path)?;
102 file.write_all(&compressed)?;
103 }
104 Ok(hash)
105 }
106
107 pub fn get_snapshot(&self, hash: &HashDigest) -> Result<ExecutionSnapshot> {
108 let path = self.snapshot_path(hash);
109 let mut file = fs::File::open(&path)
110 .with_context(|| format!("snapshot not found: {}", path.display()))?;
111 let mut buf = Vec::new();
112 file.read_to_end(&mut buf)?;
113 let decompressed = zstd::stream::decode_all(&buf[..])?;
114 Ok(bincode::deserialize(&decompressed)?)
115 }
116
117 pub fn list_snapshots(&self) -> Result<Vec<(HashDigest, ExecutionSnapshot)>> {
119 let snapshots_dir = self.root.join("snapshots");
120 if !snapshots_dir.exists() {
121 return Ok(Vec::new());
122 }
123 let mut results = Vec::new();
124 for entry in fs::read_dir(&snapshots_dir)? {
125 let entry = entry?;
126 let path = entry.path();
127 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
128 if let Some(hex) = name.strip_suffix(".bin.zst") {
130 let hash = HashDigest::from_hex(hex);
131 match self.get_snapshot(&hash) {
132 Ok(snap) => results.push((hash, snap)),
133 Err(_) => continue, }
135 }
136 }
137 }
138 results.sort_by(|a, b| b.1.created_at_ms.cmp(&a.1.created_at_ms));
140 Ok(results)
141 }
142
143 pub fn delete_snapshot(&self, hash: &HashDigest) -> Result<()> {
145 let path = self.snapshot_path(hash);
146 fs::remove_file(&path)
147 .with_context(|| format!("failed to delete snapshot: {}", path.display()))?;
148 Ok(())
149 }
150}
151
/// Top-level snapshot record persisted by `SnapshotStore::put_snapshot`.
///
/// The record is bincode-encoded, so field order is part of the on-disk
/// format. The bulky state lives in separately stored structs referenced by
/// the hash fields.
/// NOTE(review): the hashes presumably reference `SemanticSnapshot`,
/// `ContextSnapshot`, `VmSnapshot` and the bytecode stored via
/// `SnapshotStore::put_struct` — confirm against the writer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionSnapshot {
    /// Format version; presumably `SNAPSHOT_VERSION` at write time.
    pub version: u32,
    /// Creation time in milliseconds; `SnapshotStore::list_snapshots` sorts
    /// newest-first on this. NOTE(review): presumably Unix-epoch ms — confirm.
    pub created_at_ms: i64,
    pub semantic_hash: HashDigest,
    pub context_hash: HashDigest,
    pub vm_hash: Option<HashDigest>,
    pub bytecode_hash: Option<HashDigest>,
    /// Script path associated with this snapshot, if any.
    ///
    /// NOTE(review): `#[serde(default)]` only fills missing fields for
    /// self-describing formats. bincode reads struct fields positionally, so
    /// records written before this field existed probably still fail to
    /// decode — confirm.
    #[serde(default)]
    pub script_path: Option<String>,
}
164
/// Semantic-analysis state captured in a snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SemanticSnapshot {
    /// Names of exported symbols.
    pub exported_symbols: HashSet<String>,
}
169
/// Serializable copy of the runtime execution context.
///
/// NOTE(review): fields presumably mirror `crate::context` state one-to-one —
/// confirm with the capture/restore code. Field order is part of the bincode
/// format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextSnapshot {
    pub data_load_mode: crate::context::DataLoadMode,
    /// Cached market/series data, stored as chunked blobs (see `DataCacheSnapshot`).
    pub data_cache: Option<DataCacheSnapshot>,
    pub current_id: Option<String>,
    pub current_row_index: usize,
    /// One map per scope, from variable name to its captured value and metadata.
    pub variable_scopes: Vec<HashMap<String, VariableSnapshot>>,
    pub reference_datetime: Option<chrono::DateTime<chrono::Utc>>,
    pub current_timeframe: Option<Timeframe>,
    pub base_timeframe: Option<Timeframe>,
    pub date_range: Option<(chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)>,
    pub range_start: usize,
    pub range_end: usize,
    pub range_active: bool,
    pub type_alias_registry: HashMap<String, TypeAliasRuntimeEntrySnapshot>,
    pub enum_registry: HashMap<String, EnumDef>,
    /// Present when execution was suspended awaiting a condition.
    pub suspension_state: Option<SuspensionStateSnapshot>,
}
188
/// A single variable binding captured in a `ContextSnapshot` scope.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VariableSnapshot {
    pub value: SerializableVMValue,
    pub kind: shape_ast::ast::VarKind,
    pub is_initialized: bool,
    pub is_function_scoped: bool,
    pub format_hint: Option<String>,
    pub format_overrides: Option<HashMap<String, SerializableVMValue>>,
}
198
/// Snapshot of one runtime type-alias registry entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeAliasRuntimeEntrySnapshot {
    /// Name of the aliased base type.
    pub base_type: String,
    pub overrides: Option<HashMap<String, SerializableVMValue>>,
}
204
/// State needed to resume a suspended execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SuspensionStateSnapshot {
    /// Condition the execution is waiting on.
    pub waiting_for: WaitCondition,
    /// NOTE(review): presumably the bytecode offset to resume at — confirm.
    pub resume_pc: usize,
    pub saved_locals: Vec<SerializableVMValue>,
    pub saved_stack: Vec<SerializableVMValue>,
}
212
/// Serializable copy of the VM's execution state.
///
/// NOTE(review): field semantics presumably match the VM's runtime fields of
/// the same names (instruction pointer, operand stack, ...) — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmSnapshot {
    pub ip: usize,
    pub stack: Vec<SerializableVMValue>,
    pub locals: Vec<SerializableVMValue>,
    pub module_bindings: Vec<SerializableVMValue>,
    pub call_stack: Vec<SerializableCallFrame>,
    pub loop_stack: Vec<SerializableLoopContext>,
    pub timeframe_stack: Vec<Option<Timeframe>>,
    pub exception_handlers: Vec<SerializableExceptionHandler>,
}
224
/// One VM call-stack frame in snapshot form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableCallFrame {
    pub return_ip: usize,
    pub locals_base: usize,
    pub locals_count: usize,
    pub function_id: Option<u16>,
    pub upvalues: Option<Vec<SerializableVMValue>>,
    /// NOTE(review): 32-byte hash, presumably identifying the function's
    /// content-addressed bytecode blob — confirm. As with `local_ip`, the
    /// `#[serde(default)]` does not give bincode backward compatibility,
    /// because bincode decodes struct fields positionally.
    #[serde(default)]
    pub blob_hash: Option<[u8; 32]>,
    /// NOTE(review): presumably the instruction offset relative to the
    /// function blob — confirm.
    #[serde(default)]
    pub local_ip: Option<usize>,
}
243
/// A loop-stack entry in snapshot form (start/end instruction positions).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableLoopContext {
    pub start: usize,
    pub end: usize,
}
249
/// An installed exception handler in snapshot form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableExceptionHandler {
    /// Instruction position of the catch block.
    pub catch_ip: usize,
    /// NOTE(review): presumably the operand-stack depth to unwind to — confirm.
    pub stack_size: usize,
    /// NOTE(review): presumably the call-stack depth at installation — confirm.
    pub call_depth: usize,
}
256
/// Serializable form of a VM value (`ValueWord`).
///
/// Produced by `nanboxed_to_serializable` / `heap_value_to_serializable` and
/// consumed by `serializable_to_nanboxed`. Large payloads (tables, typed
/// arrays, matrices) are not inlined: they are stored in the `SnapshotStore`
/// and referenced by [`BlobRef`].
///
/// Wire-format warning: this enum is bincode-encoded, and bincode identifies
/// variants by index. Append new variants at the end; never reorder or remove
/// existing ones.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SerializableVMValue {
    /// Inline integers, `BigInt` heap values, and integer-valued native scalars.
    Int(i64),
    /// Floats, and native scalars that do not fit `as_i64`.
    Number(f64),
    Decimal(rust_decimal::Decimal),
    /// Strings; also used lossily for `Content` (its `Display` text),
    /// `Instant` (`<instant:elapsed>`), and `IoHandle` (`<io_handle:path:status>`).
    String(String),
    Bool(bool),
    /// The VM `None` value; `NanTag::Ref` values are also snapshotted as this.
    None,
    Some(Box<SerializableVMValue>),
    Unit,
    Timeframe(Timeframe),
    Duration(shape_ast::ast::Duration),
    Time(chrono::DateTime<chrono::FixedOffset>),
    /// Time span in whole milliseconds (sub-millisecond precision is dropped).
    TimeSpan(i64),
    TimeReference(TimeReference),
    DateTimeExpr(DateTimeExpr),
    DataDateTimeRef(DataDateTimeRef),
    /// Arrays; `Set`, `Deque` and `PriorityQueue` are also flattened to this.
    Array(Vec<SerializableVMValue>),
    Function(u16),
    TypeAnnotation(TypeAnnotation),
    TypeAnnotatedValue {
        type_name: String,
        value: Box<SerializableVMValue>,
    },
    Enum(EnumValueSnapshot),
    /// Closure with upvalues captured by their current value.
    Closure {
        function_id: u16,
        upvalues: Vec<SerializableVMValue>,
    },
    /// Encoded as `"native#<id>"` when snapshotting.
    /// NOTE(review): `serializable_to_nanboxed` currently restores every
    /// module function as id 0, ignoring this string — confirm intent.
    ModuleFunction(String),
    /// Typed object slots; bit `i` of `heap_mask` marks slot `i` as a heap value.
    TypedObject {
        schema_id: u64,
        slot_data: Vec<SerializableVMValue>,
        heap_mask: u64,
    },
    Range {
        start: Option<Box<SerializableVMValue>>,
        end: Option<Box<SerializableVMValue>>,
        inclusive: bool,
    },
    Ok(Box<SerializableVMValue>),
    Err(Box<SerializableVMValue>),
    PrintResult(PrintableSnapshot),
    SimulationCall {
        name: String,
        params: HashMap<String, SerializableVMValue>,
    },
    FunctionRef {
        name: String,
        closure: Option<Box<SerializableVMValue>>,
    },
    DataReference {
        datetime: chrono::DateTime<chrono::FixedOffset>,
        id: String,
        timeframe: Timeframe,
    },
    Future(u64),
    /// `BlobRef` points at a stored `SerializableDataTable`.
    DataTable(BlobRef),
    TypedTable {
        schema_id: u64,
        table: BlobRef,
    },
    RowView {
        schema_id: u64,
        table: BlobRef,
        row_idx: usize,
    },
    ColumnRef {
        schema_id: u64,
        table: BlobRef,
        col_id: u32,
    },
    IndexedTable {
        schema_id: u64,
        table: BlobRef,
        index_col: u32,
    },
    /// Typed array; `blob` points at a stored `ChunkedBlob` holding the raw
    /// native-endian element bytes, and `len` is the element count.
    TypedArray {
        element_kind: TypedArrayElementKind,
        blob: BlobRef,
        len: usize,
    },
    /// Matrix; `blob` points at a stored `ChunkedBlob` of the raw `data` bytes.
    Matrix {
        blob: BlobRef,
        rows: u32,
        cols: u32,
    },
    /// Hash map as parallel key/value vectors.
    HashMap {
        keys: Vec<SerializableVMValue>,
        values: Vec<SerializableVMValue>,
    },
    /// NOTE(review): not produced by the conversion code visible here;
    /// presumably a reference to externally stored (sidecar) blob data, with
    /// `meta_a`/`meta_b` as kind-specific metadata — confirm with the producer.
    SidecarRef {
        sidecar_id: u32,
        blob_kind: BlobKind,
        original_hash: HashDigest,
        meta_a: u32,
        meta_b: u32,
    },
}
365
/// Snapshot of a VM enum value (enum name, variant name, payload).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnumValueSnapshot {
    pub enum_name: String,
    pub variant: String,
    pub payload: EnumPayloadSnapshot,
}
372
/// Payload of an enum variant: none, positional fields, or named fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EnumPayloadSnapshot {
    Unit,
    Tuple(Vec<SerializableVMValue>),
    /// Named fields as ordered `(name, value)` pairs.
    Struct(Vec<(String, SerializableVMValue)>),
}
379
/// Snapshot of a print result: the rendered text plus its spans.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrintableSnapshot {
    pub rendered: String,
    pub spans: Vec<PrintSpanSnapshot>,
}
385
/// One span of a rendered print result.
///
/// NOTE(review): `start`/`end` presumably index into
/// `PrintableSnapshot::rendered` — confirm units (bytes vs chars).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PrintSpanSnapshot {
    /// Literal text between interpolated values.
    Literal {
        text: String,
        start: usize,
        end: usize,
        span_id: String,
    },
    /// An interpolated value, with its raw value and formatting metadata.
    Value {
        text: String,
        start: usize,
        end: usize,
        span_id: String,
        variable_name: Option<String>,
        raw_value: Box<SerializableVMValue>,
        type_name: String,
        current_format: String,
        format_params: HashMap<String, SerializableVMValue>,
    },
}
406
/// Reference to a side blob in the `SnapshotStore`.
///
/// For `BlobKind::DataTable` the hash names a stored `SerializableDataTable`.
/// For `TypedArray`/`Matrix` it names a stored `ChunkedBlob` of raw bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobRef {
    pub hash: HashDigest,
    pub kind: BlobKind,
}
412
/// Element type of a snapshotted typed array.
///
/// `Bool` arrays are stored as one byte per element.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TypedArrayElementKind {
    I8,
    I16,
    I32,
    I64,
    U8,
    U16,
    U32,
    U64,
    F32,
    F64,
    Bool,
}
428
/// What a [`BlobRef`] points at.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BlobKind {
    DataTable,
    TypedArray(TypedArrayElementKind),
    Matrix,
}
437
/// A sequence split into separately stored, content-addressed chunks.
///
/// `total_len` is the logical length. `chunk_len` is the elements per chunk
/// for `store_chunked_vec`, or the bytes per chunk (`BYTE_CHUNK_LEN`) for
/// `store_chunked_bytes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedBlob {
    pub chunk_hashes: Vec<HashDigest>,
    pub total_len: usize,
    pub chunk_len: usize,
}
444
/// Stored form of a `DataTable`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataTable {
    /// NOTE(review): presumably Arrow IPC-encoded table bytes, chunked — confirm.
    pub ipc_chunks: ChunkedBlob,
    pub type_name: Option<String>,
    pub schema_id: Option<u32>,
}
451
/// Stored form of a `DataFrame`: timestamps plus named columns, each chunked.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrame {
    pub id: String,
    pub timeframe: Timeframe,
    pub timestamps: ChunkedBlob,
    pub columns: Vec<SerializableDataFrameColumn>,
}
459
/// One named column of a `SerializableDataFrame`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrameColumn {
    pub name: String,
    pub values: ChunkedBlob,
}
465
/// Key of a data-cache entry: series id plus timeframe.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheKeySnapshot {
    pub id: String,
    pub timeframe: Timeframe,
}
471
/// A cached historical data frame and its current position.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedDataSnapshot {
    pub key: CacheKeySnapshot,
    pub historical: SerializableDataFrame,
    pub current_index: usize,
}
478
/// Buffered live rows for one cache key, stored chunked.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveBufferSnapshot {
    pub key: CacheKeySnapshot,
    pub rows: ChunkedBlob,
}
484
/// Snapshot of the whole data cache: historical entries and live buffers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCacheSnapshot {
    pub historical: Vec<CachedDataSnapshot>,
    pub live_buffer: Vec<LiveBufferSnapshot>,
}
490
491pub(crate) fn store_chunked_vec<T: Serialize>(
492 values: &[T],
493 chunk_len: usize,
494 store: &SnapshotStore,
495) -> Result<ChunkedBlob> {
496 let chunk_len = chunk_len.max(1);
497 if values.is_empty() {
498 return Ok(ChunkedBlob {
499 chunk_hashes: Vec::new(),
500 total_len: 0,
501 chunk_len,
502 });
503 }
504 let mut hashes = Vec::new();
505 for chunk in values.chunks(chunk_len) {
506 let bytes = bincode::serialize(chunk)?;
507 let hash = store.put_blob(&bytes)?;
508 hashes.push(hash);
509 }
510 Ok(ChunkedBlob {
511 chunk_hashes: hashes,
512 total_len: values.len(),
513 chunk_len,
514 })
515}
516
517pub(crate) fn load_chunked_vec<T: DeserializeOwned>(
518 chunked: &ChunkedBlob,
519 store: &SnapshotStore,
520) -> Result<Vec<T>> {
521 if chunked.total_len == 0 {
522 return Ok(Vec::new());
523 }
524 let mut out = Vec::with_capacity(chunked.total_len);
525 for hash in &chunked.chunk_hashes {
526 let bytes = store.get_blob(hash)?;
527 let chunk: Vec<T> = bincode::deserialize(&bytes)?;
528 out.extend(chunk);
529 }
530 out.truncate(chunked.total_len);
531 Ok(out)
532}
533
534pub fn store_chunked_bytes(data: &[u8], store: &SnapshotStore) -> Result<ChunkedBlob> {
536 if data.is_empty() {
537 return Ok(ChunkedBlob {
538 chunk_hashes: Vec::new(),
539 total_len: 0,
540 chunk_len: BYTE_CHUNK_LEN,
541 });
542 }
543 let mut hashes = Vec::new();
544 for chunk in data.chunks(BYTE_CHUNK_LEN) {
545 let hash = store.put_blob(chunk)?;
546 hashes.push(hash);
547 }
548 Ok(ChunkedBlob {
549 chunk_hashes: hashes,
550 total_len: data.len(),
551 chunk_len: BYTE_CHUNK_LEN,
552 })
553}
554
555pub fn load_chunked_bytes(chunked: &ChunkedBlob, store: &SnapshotStore) -> Result<Vec<u8>> {
557 if chunked.total_len == 0 {
558 return Ok(Vec::new());
559 }
560 let mut out = Vec::with_capacity(chunked.total_len);
561 for hash in &chunked.chunk_hashes {
562 let bytes = store.get_blob(hash)?;
563 out.extend_from_slice(&bytes);
564 }
565 out.truncate(chunked.total_len);
566 Ok(out)
567}
568
/// View native-endian raw bytes as a slice of `T`.
///
/// The data is borrowed without copying when `bytes` is suitably aligned for
/// `T`. Otherwise it is copied element-by-element with unaligned reads.
/// Reinterpreting misaligned memory as `&[T]` is undefined behaviour, and a
/// `Vec<u8>` only guarantees byte alignment. This includes an empty
/// `Vec<u8>`, whose dangling pointer is not aligned for wider types.
///
/// `T` must be a plain numeric type for which every bit pattern is valid
/// (integers, floats); the `Copy` bound alone does not enforce this.
///
/// # Panics
/// Panics if `bytes.len()` is not a multiple of `size_of::<T>()`, or if `T` is
/// zero-sized.
fn bytes_as_slice<'a, T: Copy>(bytes: &'a [u8]) -> std::borrow::Cow<'a, [T]> {
    let elem_size = std::mem::size_of::<T>();
    assert!(
        bytes.len() % elem_size == 0,
        "byte slice length {} not a multiple of element size {}",
        bytes.len(),
        elem_size
    );
    let len = bytes.len() / elem_size;
    if len == 0 {
        return std::borrow::Cow::Borrowed(&[]);
    }
    let base = bytes.as_ptr();
    if (base as usize) % std::mem::align_of::<T>() == 0 {
        // SAFETY: `base` is non-null and aligned for `T`; the region spans
        // `len * size_of::<T>() == bytes.len()` initialized bytes borrowed for
        // `'a`; `T` is required (see docs) to accept any bit pattern.
        let typed = unsafe { std::slice::from_raw_parts(base.cast::<T>(), len) };
        std::borrow::Cow::Borrowed(typed)
    } else {
        let copied: Vec<T> = (0..len)
            .map(|i| {
                // SAFETY: `(i + 1) * elem_size <= bytes.len()`, so the read stays
                // in bounds; `read_unaligned` has no alignment requirement.
                unsafe { std::ptr::read_unaligned(base.add(i * elem_size).cast::<T>()) }
            })
            .collect();
        std::borrow::Cow::Owned(copied)
    }
}
584
/// Expose the raw in-memory (native-endian) bytes of a slice.
///
/// NOTE(review): `T` is unbounded. Callers must only pass padding-free
/// plain-data element types (the integer/float/byte buffers used here);
/// padding bytes would be uninitialized memory.
fn slice_as_bytes<T>(data: &[T]) -> &[u8] {
    let total_bytes = std::mem::size_of_val(data);
    let base = data.as_ptr().cast::<u8>();
    // SAFETY: `base` points at `total_bytes` bytes owned by `data` for the
    // returned lifetime, and `u8` has alignment 1.
    unsafe { std::slice::from_raw_parts(base, total_bytes) }
}
590
591pub fn nanboxed_to_serializable(
599 nb: &ValueWord,
600 store: &SnapshotStore,
601) -> Result<SerializableVMValue> {
602 use shape_value::value_word::NanTag;
603
604 match nb.tag() {
605 NanTag::F64 => Ok(SerializableVMValue::Number(nb.as_f64().unwrap())),
606 NanTag::I48 => Ok(SerializableVMValue::Int(nb.as_i64().unwrap())),
607 NanTag::Bool => Ok(SerializableVMValue::Bool(nb.as_bool().unwrap())),
608 NanTag::None => Ok(SerializableVMValue::None),
609 NanTag::Unit => Ok(SerializableVMValue::Unit),
610 NanTag::Function => Ok(SerializableVMValue::Function(nb.as_function().unwrap())),
611 NanTag::ModuleFunction => Ok(SerializableVMValue::ModuleFunction(format!(
612 "native#{}",
613 nb.as_module_function().unwrap()
614 ))),
615 NanTag::Heap => {
616 let hv = nb.as_heap_ref().unwrap();
617 heap_value_to_serializable(hv, store)
618 }
619 NanTag::Ref => Ok(SerializableVMValue::None), }
621}
622
623fn heap_value_to_serializable(
625 hv: &shape_value::heap_value::HeapValue,
626 store: &SnapshotStore,
627) -> Result<SerializableVMValue> {
628 use shape_value::heap_value::HeapValue;
629
630 Ok(match hv {
631 HeapValue::String(s) => SerializableVMValue::String((**s).clone()),
632 HeapValue::Decimal(d) => SerializableVMValue::Decimal(*d),
633 HeapValue::BigInt(i) => SerializableVMValue::Int(*i),
634 HeapValue::Array(arr) => {
635 let mut out = Vec::with_capacity(arr.len());
636 for v in arr.iter() {
637 out.push(nanboxed_to_serializable(v, store)?);
638 }
639 SerializableVMValue::Array(out)
640 }
641 HeapValue::Closure {
642 function_id,
643 upvalues,
644 } => {
645 let mut ups = Vec::new();
646 for up in upvalues.iter() {
647 let nb = up.get();
648 ups.push(nanboxed_to_serializable(&nb, store)?);
649 }
650 SerializableVMValue::Closure {
651 function_id: *function_id,
652 upvalues: ups,
653 }
654 }
655 HeapValue::TypedObject {
656 schema_id,
657 slots,
658 heap_mask,
659 } => {
660 let mut slot_data = Vec::with_capacity(slots.len());
661 for i in 0..slots.len() {
662 if *heap_mask & (1u64 << i) != 0 {
663 let hv_inner = slots[i].as_heap_value();
664 slot_data.push(heap_value_to_serializable(hv_inner, store)?);
665 } else {
666 let nb = unsafe { ValueWord::clone_from_bits(slots[i].raw()) };
669 slot_data.push(nanboxed_to_serializable(&nb, store)?);
670 }
671 }
672 SerializableVMValue::TypedObject {
673 schema_id: *schema_id,
674 slot_data,
675 heap_mask: *heap_mask,
676 }
677 }
678 HeapValue::HostClosure(_)
679 | HeapValue::ExprProxy(_)
680 | HeapValue::FilterExpr(_)
681 | HeapValue::TaskGroup { .. }
682 | HeapValue::TraitObject { .. }
683 | HeapValue::NativeView(_) => {
684 return Err(anyhow::anyhow!(
685 "Cannot snapshot transient value: {}",
686 hv.type_name()
687 ));
688 }
689 HeapValue::Enum(ev) => SerializableVMValue::Enum(enum_to_snapshot(ev, store)?),
690 HeapValue::Some(inner) => {
691 SerializableVMValue::Some(Box::new(nanboxed_to_serializable(inner, store)?))
692 }
693 HeapValue::Ok(inner) => {
694 SerializableVMValue::Ok(Box::new(nanboxed_to_serializable(inner, store)?))
695 }
696 HeapValue::Err(inner) => {
697 SerializableVMValue::Err(Box::new(nanboxed_to_serializable(inner, store)?))
698 }
699 HeapValue::Range {
700 start,
701 end,
702 inclusive,
703 } => SerializableVMValue::Range {
704 start: match start {
705 Some(v) => Some(Box::new(nanboxed_to_serializable(v, store)?)),
706 None => None,
707 },
708 end: match end {
709 Some(v) => Some(Box::new(nanboxed_to_serializable(v, store)?)),
710 None => None,
711 },
712 inclusive: *inclusive,
713 },
714 HeapValue::Timeframe(tf) => SerializableVMValue::Timeframe(*tf),
715 HeapValue::Duration(d) => SerializableVMValue::Duration(d.clone()),
716 HeapValue::Time(t) => SerializableVMValue::Time(*t),
717 HeapValue::TimeSpan(span) => SerializableVMValue::TimeSpan(span.num_milliseconds()),
718 HeapValue::TimeReference(tr) => SerializableVMValue::TimeReference((**tr).clone()),
719 HeapValue::DateTimeExpr(expr) => SerializableVMValue::DateTimeExpr((**expr).clone()),
720 HeapValue::DataDateTimeRef(dref) => SerializableVMValue::DataDateTimeRef((**dref).clone()),
721 HeapValue::TypeAnnotation(ta) => SerializableVMValue::TypeAnnotation((**ta).clone()),
722 HeapValue::TypeAnnotatedValue { type_name, value } => {
723 SerializableVMValue::TypeAnnotatedValue {
724 type_name: type_name.clone(),
725 value: Box::new(nanboxed_to_serializable(value, store)?),
726 }
727 }
728 HeapValue::PrintResult(pr) => {
729 SerializableVMValue::PrintResult(print_result_to_snapshot(pr, store)?)
730 }
731 HeapValue::SimulationCall(data) => {
732 let mut out = HashMap::new();
733 for (k, v) in data.params.iter() {
734 out.insert(k.clone(), nanboxed_to_serializable(&v.clone(), store)?);
736 }
737 SerializableVMValue::SimulationCall {
738 name: data.name.clone(),
739 params: out,
740 }
741 }
742 HeapValue::FunctionRef { name, closure } => SerializableVMValue::FunctionRef {
743 name: name.clone(),
744 closure: match closure {
745 Some(c) => Some(Box::new(nanboxed_to_serializable(c, store)?)),
746 None => None,
747 },
748 },
749 HeapValue::DataReference(data) => SerializableVMValue::DataReference {
750 datetime: data.datetime,
751 id: data.id.clone(),
752 timeframe: data.timeframe,
753 },
754 HeapValue::Future(id) => SerializableVMValue::Future(*id),
755 HeapValue::DataTable(dt) => {
756 let ser = serialize_datatable(dt, store)?;
757 let hash = store.put_struct(&ser)?;
758 SerializableVMValue::DataTable(BlobRef {
759 hash,
760 kind: BlobKind::DataTable,
761 })
762 }
763 HeapValue::TypedTable { schema_id, table } => {
764 let ser = serialize_datatable(table, store)?;
765 let hash = store.put_struct(&ser)?;
766 SerializableVMValue::TypedTable {
767 schema_id: *schema_id,
768 table: BlobRef {
769 hash,
770 kind: BlobKind::DataTable,
771 },
772 }
773 }
774 HeapValue::RowView {
775 schema_id,
776 table,
777 row_idx,
778 } => {
779 let ser = serialize_datatable(table, store)?;
780 let hash = store.put_struct(&ser)?;
781 SerializableVMValue::RowView {
782 schema_id: *schema_id,
783 table: BlobRef {
784 hash,
785 kind: BlobKind::DataTable,
786 },
787 row_idx: *row_idx,
788 }
789 }
790 HeapValue::ColumnRef {
791 schema_id,
792 table,
793 col_id,
794 } => {
795 let ser = serialize_datatable(table, store)?;
796 let hash = store.put_struct(&ser)?;
797 SerializableVMValue::ColumnRef {
798 schema_id: *schema_id,
799 table: BlobRef {
800 hash,
801 kind: BlobKind::DataTable,
802 },
803 col_id: *col_id,
804 }
805 }
806 HeapValue::IndexedTable {
807 schema_id,
808 table,
809 index_col,
810 } => {
811 let ser = serialize_datatable(table, store)?;
812 let hash = store.put_struct(&ser)?;
813 SerializableVMValue::IndexedTable {
814 schema_id: *schema_id,
815 table: BlobRef {
816 hash,
817 kind: BlobKind::DataTable,
818 },
819 index_col: *index_col,
820 }
821 }
822 HeapValue::NativeScalar(v) => {
823 if let Some(i) = v.as_i64() {
824 SerializableVMValue::Int(i)
825 } else {
826 SerializableVMValue::Number(v.as_f64())
827 }
828 }
829 HeapValue::HashMap(d) => {
830 let mut out_keys = Vec::with_capacity(d.keys.len());
831 let mut out_values = Vec::with_capacity(d.values.len());
832 for k in d.keys.iter() {
833 out_keys.push(nanboxed_to_serializable(k, store)?);
834 }
835 for v in d.values.iter() {
836 out_values.push(nanboxed_to_serializable(v, store)?);
837 }
838 SerializableVMValue::HashMap {
839 keys: out_keys,
840 values: out_values,
841 }
842 }
843 HeapValue::Set(d) => {
844 let mut out = Vec::with_capacity(d.items.len());
845 for item in d.items.iter() {
846 out.push(nanboxed_to_serializable(item, store)?);
847 }
848 SerializableVMValue::Array(out)
849 }
850 HeapValue::Deque(d) => {
851 let mut out = Vec::with_capacity(d.items.len());
852 for item in d.items.iter() {
853 out.push(nanboxed_to_serializable(item, store)?);
854 }
855 SerializableVMValue::Array(out)
856 }
857 HeapValue::PriorityQueue(d) => {
858 let mut out = Vec::with_capacity(d.items.len());
859 for item in d.items.iter() {
860 out.push(nanboxed_to_serializable(item, store)?);
861 }
862 SerializableVMValue::Array(out)
863 }
864 HeapValue::Content(node) => SerializableVMValue::String(format!("{}", node)),
865 HeapValue::Instant(t) => {
866 SerializableVMValue::String(format!("<instant:{:?}>", t.elapsed()))
867 }
868 HeapValue::IoHandle(data) => {
869 let status = if data.is_open() { "open" } else { "closed" };
870 SerializableVMValue::String(format!("<io_handle:{}:{}>", data.path, status))
871 }
872 HeapValue::SharedCell(arc) => nanboxed_to_serializable(&arc.read().unwrap(), store)?,
873 HeapValue::IntArray(a) => {
874 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
875 let hash = store.put_struct(&blob)?;
876 SerializableVMValue::TypedArray {
877 element_kind: TypedArrayElementKind::I64,
878 blob: BlobRef {
879 hash,
880 kind: BlobKind::TypedArray(TypedArrayElementKind::I64),
881 },
882 len: a.len(),
883 }
884 }
885 HeapValue::FloatArray(a) => {
886 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
887 let hash = store.put_struct(&blob)?;
888 SerializableVMValue::TypedArray {
889 element_kind: TypedArrayElementKind::F64,
890 blob: BlobRef {
891 hash,
892 kind: BlobKind::TypedArray(TypedArrayElementKind::F64),
893 },
894 len: a.len(),
895 }
896 }
897 HeapValue::BoolArray(a) => {
898 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
899 let hash = store.put_struct(&blob)?;
900 SerializableVMValue::TypedArray {
901 element_kind: TypedArrayElementKind::Bool,
902 blob: BlobRef {
903 hash,
904 kind: BlobKind::TypedArray(TypedArrayElementKind::Bool),
905 },
906 len: a.len(),
907 }
908 }
909 HeapValue::I8Array(a) => {
910 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
911 let hash = store.put_struct(&blob)?;
912 SerializableVMValue::TypedArray {
913 element_kind: TypedArrayElementKind::I8,
914 blob: BlobRef {
915 hash,
916 kind: BlobKind::TypedArray(TypedArrayElementKind::I8),
917 },
918 len: a.len(),
919 }
920 }
921 HeapValue::I16Array(a) => {
922 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
923 let hash = store.put_struct(&blob)?;
924 SerializableVMValue::TypedArray {
925 element_kind: TypedArrayElementKind::I16,
926 blob: BlobRef {
927 hash,
928 kind: BlobKind::TypedArray(TypedArrayElementKind::I16),
929 },
930 len: a.len(),
931 }
932 }
933 HeapValue::I32Array(a) => {
934 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
935 let hash = store.put_struct(&blob)?;
936 SerializableVMValue::TypedArray {
937 element_kind: TypedArrayElementKind::I32,
938 blob: BlobRef {
939 hash,
940 kind: BlobKind::TypedArray(TypedArrayElementKind::I32),
941 },
942 len: a.len(),
943 }
944 }
945 HeapValue::U8Array(a) => {
946 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
947 let hash = store.put_struct(&blob)?;
948 SerializableVMValue::TypedArray {
949 element_kind: TypedArrayElementKind::U8,
950 blob: BlobRef {
951 hash,
952 kind: BlobKind::TypedArray(TypedArrayElementKind::U8),
953 },
954 len: a.len(),
955 }
956 }
957 HeapValue::U16Array(a) => {
958 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
959 let hash = store.put_struct(&blob)?;
960 SerializableVMValue::TypedArray {
961 element_kind: TypedArrayElementKind::U16,
962 blob: BlobRef {
963 hash,
964 kind: BlobKind::TypedArray(TypedArrayElementKind::U16),
965 },
966 len: a.len(),
967 }
968 }
969 HeapValue::U32Array(a) => {
970 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
971 let hash = store.put_struct(&blob)?;
972 SerializableVMValue::TypedArray {
973 element_kind: TypedArrayElementKind::U32,
974 blob: BlobRef {
975 hash,
976 kind: BlobKind::TypedArray(TypedArrayElementKind::U32),
977 },
978 len: a.len(),
979 }
980 }
981 HeapValue::U64Array(a) => {
982 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
983 let hash = store.put_struct(&blob)?;
984 SerializableVMValue::TypedArray {
985 element_kind: TypedArrayElementKind::U64,
986 blob: BlobRef {
987 hash,
988 kind: BlobKind::TypedArray(TypedArrayElementKind::U64),
989 },
990 len: a.len(),
991 }
992 }
993 HeapValue::F32Array(a) => {
994 let blob = store_chunked_bytes(slice_as_bytes(a.as_slice()), store)?;
995 let hash = store.put_struct(&blob)?;
996 SerializableVMValue::TypedArray {
997 element_kind: TypedArrayElementKind::F32,
998 blob: BlobRef {
999 hash,
1000 kind: BlobKind::TypedArray(TypedArrayElementKind::F32),
1001 },
1002 len: a.len(),
1003 }
1004 }
1005 HeapValue::Matrix(m) => {
1006 let raw_bytes = slice_as_bytes(m.data.as_slice());
1007 let blob = store_chunked_bytes(raw_bytes, store)?;
1008 let hash = store.put_struct(&blob)?;
1009 SerializableVMValue::Matrix {
1010 blob: BlobRef {
1011 hash,
1012 kind: BlobKind::Matrix,
1013 },
1014 rows: m.rows,
1015 cols: m.cols,
1016 }
1017 }
1018 HeapValue::Iterator(_)
1019 | HeapValue::Generator(_)
1020 | HeapValue::Mutex(_)
1021 | HeapValue::Atomic(_)
1022 | HeapValue::Lazy(_)
1023 | HeapValue::Channel(_) => {
1024 return Err(anyhow::anyhow!(
1025 "Cannot snapshot transient value: {}",
1026 hv.type_name()
1027 ));
1028 }
1029 })
1030}
1031
1032pub fn serializable_to_nanboxed(
1038 value: &SerializableVMValue,
1039 store: &SnapshotStore,
1040) -> Result<ValueWord> {
1041 Ok(match value {
1042 SerializableVMValue::Int(i) => ValueWord::from_i64(*i),
1043 SerializableVMValue::Number(n) => ValueWord::from_f64(*n),
1044 SerializableVMValue::Decimal(d) => ValueWord::from_decimal(*d),
1045 SerializableVMValue::String(s) => ValueWord::from_string(std::sync::Arc::new(s.clone())),
1046 SerializableVMValue::Bool(b) => ValueWord::from_bool(*b),
1047 SerializableVMValue::None => ValueWord::none(),
1048 SerializableVMValue::Unit => ValueWord::unit(),
1049 SerializableVMValue::Function(f) => ValueWord::from_function(*f),
1050 SerializableVMValue::ModuleFunction(_name) => ValueWord::from_module_function(0),
1051 SerializableVMValue::Some(v) => ValueWord::from_some(serializable_to_nanboxed(v, store)?),
1052 SerializableVMValue::Ok(v) => ValueWord::from_ok(serializable_to_nanboxed(v, store)?),
1053 SerializableVMValue::Err(v) => ValueWord::from_err(serializable_to_nanboxed(v, store)?),
1054 SerializableVMValue::Timeframe(tf) => ValueWord::from_timeframe(*tf),
1055 SerializableVMValue::Duration(d) => ValueWord::from_duration(d.clone()),
1056 SerializableVMValue::Time(t) => ValueWord::from_time(*t),
1057 SerializableVMValue::TimeSpan(ms) => {
1058 ValueWord::from_timespan(chrono::Duration::milliseconds(*ms))
1059 }
1060 SerializableVMValue::TimeReference(tr) => ValueWord::from_time_reference(tr.clone()),
1061 SerializableVMValue::DateTimeExpr(expr) => ValueWord::from_datetime_expr(expr.clone()),
1062 SerializableVMValue::DataDateTimeRef(dref) => {
1063 ValueWord::from_data_datetime_ref(dref.clone())
1064 }
1065 SerializableVMValue::Array(arr) => {
1066 let mut out = Vec::with_capacity(arr.len());
1067 for v in arr.iter() {
1068 out.push(serializable_to_nanboxed(v, store)?);
1069 }
1070 ValueWord::from_array(std::sync::Arc::new(out))
1071 }
1072 SerializableVMValue::TypeAnnotation(ta) => ValueWord::from_type_annotation(ta.clone()),
1073 SerializableVMValue::TypeAnnotatedValue { type_name, value } => {
1074 ValueWord::from_type_annotated_value(
1075 type_name.clone(),
1076 serializable_to_nanboxed(value, store)?,
1077 )
1078 }
1079 SerializableVMValue::Range {
1080 start,
1081 end,
1082 inclusive,
1083 } => ValueWord::from_range(
1084 match start {
1085 Some(v) => Some(serializable_to_nanboxed(v, store)?),
1086 None => None,
1087 },
1088 match end {
1089 Some(v) => Some(serializable_to_nanboxed(v, store)?),
1090 None => None,
1091 },
1092 *inclusive,
1093 ),
1094 SerializableVMValue::Future(id) => ValueWord::from_future(*id),
1095 SerializableVMValue::FunctionRef { name, closure } => ValueWord::from_function_ref(
1096 name.clone(),
1097 match closure {
1098 Some(c) => Some(serializable_to_nanboxed(c, store)?),
1099 None => None,
1100 },
1101 ),
1102 SerializableVMValue::DataReference {
1103 datetime,
1104 id,
1105 timeframe,
1106 } => ValueWord::from_data_reference(*datetime, id.clone(), *timeframe),
1107 SerializableVMValue::DataTable(blob) => {
1108 let ser: SerializableDataTable = store.get_struct(&blob.hash)?;
1109 ValueWord::from_datatable(std::sync::Arc::new(deserialize_datatable(ser, store)?))
1110 }
1111 SerializableVMValue::TypedTable { schema_id, table } => {
1112 let ser: SerializableDataTable = store.get_struct(&table.hash)?;
1113 ValueWord::from_typed_table(
1114 *schema_id,
1115 std::sync::Arc::new(deserialize_datatable(ser, store)?),
1116 )
1117 }
1118 SerializableVMValue::RowView {
1119 schema_id,
1120 table,
1121 row_idx,
1122 } => {
1123 let ser: SerializableDataTable = store.get_struct(&table.hash)?;
1124 ValueWord::from_row_view(
1125 *schema_id,
1126 std::sync::Arc::new(deserialize_datatable(ser, store)?),
1127 *row_idx,
1128 )
1129 }
1130 SerializableVMValue::ColumnRef {
1131 schema_id,
1132 table,
1133 col_id,
1134 } => {
1135 let ser: SerializableDataTable = store.get_struct(&table.hash)?;
1136 ValueWord::from_column_ref(
1137 *schema_id,
1138 std::sync::Arc::new(deserialize_datatable(ser, store)?),
1139 *col_id,
1140 )
1141 }
1142 SerializableVMValue::IndexedTable {
1143 schema_id,
1144 table,
1145 index_col,
1146 } => {
1147 let ser: SerializableDataTable = store.get_struct(&table.hash)?;
1148 ValueWord::from_indexed_table(
1149 *schema_id,
1150 std::sync::Arc::new(deserialize_datatable(ser, store)?),
1151 *index_col,
1152 )
1153 }
1154 SerializableVMValue::TypedArray {
1155 element_kind,
1156 blob,
1157 len,
1158 } => {
1159 let chunked: ChunkedBlob = store.get_struct(&blob.hash)?;
1160 let raw = load_chunked_bytes(&chunked, store)?;
1161 match element_kind {
1162 TypedArrayElementKind::I64 => {
1163 let data: Vec<i64> = bytes_as_slice::<i64>(&raw)[..*len].to_vec();
1164 ValueWord::from_int_array(std::sync::Arc::new(
1165 shape_value::TypedBuffer::from_vec(data),
1166 ))
1167 }
1168 TypedArrayElementKind::F64 => {
1169 let data: Vec<f64> = bytes_as_slice::<f64>(&raw)[..*len].to_vec();
1170 let aligned = shape_value::AlignedVec::from_vec(data);
1171 ValueWord::from_float_array(std::sync::Arc::new(
1172 shape_value::AlignedTypedBuffer::from_aligned(aligned),
1173 ))
1174 }
1175 TypedArrayElementKind::Bool => {
1176 let data: Vec<u8> = raw[..*len].to_vec();
1177 ValueWord::from_bool_array(std::sync::Arc::new(
1178 shape_value::TypedBuffer::from_vec(data),
1179 ))
1180 }
1181 TypedArrayElementKind::I8 => {
1182 let data: Vec<i8> = bytes_as_slice::<i8>(&raw)[..*len].to_vec();
1183 ValueWord::from_i8_array(std::sync::Arc::new(
1184 shape_value::TypedBuffer::from_vec(data),
1185 ))
1186 }
1187 TypedArrayElementKind::I16 => {
1188 let data: Vec<i16> = bytes_as_slice::<i16>(&raw)[..*len].to_vec();
1189 ValueWord::from_i16_array(std::sync::Arc::new(
1190 shape_value::TypedBuffer::from_vec(data),
1191 ))
1192 }
1193 TypedArrayElementKind::I32 => {
1194 let data: Vec<i32> = bytes_as_slice::<i32>(&raw)[..*len].to_vec();
1195 ValueWord::from_i32_array(std::sync::Arc::new(
1196 shape_value::TypedBuffer::from_vec(data),
1197 ))
1198 }
1199 TypedArrayElementKind::U8 => {
1200 let data: Vec<u8> = raw[..*len].to_vec();
1201 ValueWord::from_u8_array(std::sync::Arc::new(
1202 shape_value::TypedBuffer::from_vec(data),
1203 ))
1204 }
1205 TypedArrayElementKind::U16 => {
1206 let data: Vec<u16> = bytes_as_slice::<u16>(&raw)[..*len].to_vec();
1207 ValueWord::from_u16_array(std::sync::Arc::new(
1208 shape_value::TypedBuffer::from_vec(data),
1209 ))
1210 }
1211 TypedArrayElementKind::U32 => {
1212 let data: Vec<u32> = bytes_as_slice::<u32>(&raw)[..*len].to_vec();
1213 ValueWord::from_u32_array(std::sync::Arc::new(
1214 shape_value::TypedBuffer::from_vec(data),
1215 ))
1216 }
1217 TypedArrayElementKind::U64 => {
1218 let data: Vec<u64> = bytes_as_slice::<u64>(&raw)[..*len].to_vec();
1219 ValueWord::from_u64_array(std::sync::Arc::new(
1220 shape_value::TypedBuffer::from_vec(data),
1221 ))
1222 }
1223 TypedArrayElementKind::F32 => {
1224 let data: Vec<f32> = bytes_as_slice::<f32>(&raw)[..*len].to_vec();
1225 ValueWord::from_f32_array(std::sync::Arc::new(
1226 shape_value::TypedBuffer::from_vec(data),
1227 ))
1228 }
1229 }
1230 }
1231 SerializableVMValue::Matrix { blob, rows, cols } => {
1232 let chunked: ChunkedBlob = store.get_struct(&blob.hash)?;
1233 let raw = load_chunked_bytes(&chunked, store)?;
1234 let data: Vec<f64> = bytes_as_slice::<f64>(&raw).to_vec();
1235 let aligned = shape_value::AlignedVec::from_vec(data);
1236 let matrix = shape_value::heap_value::MatrixData::from_flat(aligned, *rows, *cols);
1237 ValueWord::from_matrix(Box::new(matrix))
1238 }
1239 SerializableVMValue::HashMap { keys, values } => {
1240 let mut k_out = Vec::with_capacity(keys.len());
1241 for k in keys.iter() {
1242 k_out.push(serializable_to_nanboxed(k, store)?);
1243 }
1244 let mut v_out = Vec::with_capacity(values.len());
1245 for v in values.iter() {
1246 v_out.push(serializable_to_nanboxed(v, store)?);
1247 }
1248 ValueWord::from_hashmap_pairs(k_out, v_out)
1249 }
1250 SerializableVMValue::SidecarRef { .. } => {
1251 return Err(anyhow::anyhow!(
1252 "SidecarRef must be reassembled before deserialization"
1253 ));
1254 }
1255 SerializableVMValue::Enum(ev) => {
1256 let enum_val = snapshot_to_enum(ev, store)?;
1258 enum_val
1259 }
1260 SerializableVMValue::Closure {
1261 function_id,
1262 upvalues,
1263 } => {
1264 let mut ups = Vec::new();
1265 for v in upvalues.iter() {
1266 ups.push(Upvalue::new(serializable_to_nanboxed(v, store)?));
1267 }
1268 ValueWord::from_heap_value(shape_value::heap_value::HeapValue::Closure {
1269 function_id: *function_id,
1270 upvalues: ups,
1271 })
1272 }
1273 SerializableVMValue::TypedObject {
1274 schema_id,
1275 slot_data,
1276 heap_mask,
1277 } => {
1278 let mut slots = Vec::with_capacity(slot_data.len());
1279 let mut new_heap_mask: u64 = 0;
1280 for (i, sv) in slot_data.iter().enumerate() {
1281 if *heap_mask & (1u64 << i) != 0 {
1282 let nb = serializable_to_nanboxed(sv, store)?;
1287 let (slot, is_heap) = crate::type_schema::nb_to_slot(&nb);
1288 slots.push(slot);
1289 if is_heap {
1290 new_heap_mask |= 1u64 << i;
1291 }
1292 } else {
1293 let n = match sv {
1295 SerializableVMValue::Number(n) => *n,
1296 _ => 0.0,
1297 };
1298 slots.push(shape_value::ValueSlot::from_number(n));
1299 }
1300 }
1301 ValueWord::from_heap_value(shape_value::heap_value::HeapValue::TypedObject {
1302 schema_id: *schema_id,
1303 slots: slots.into_boxed_slice(),
1304 heap_mask: new_heap_mask,
1305 })
1306 }
1307 SerializableVMValue::PrintResult(pr) => {
1308 let print_result = snapshot_to_print_result(pr, store)?;
1310 ValueWord::from_print_result(print_result)
1311 }
1312 SerializableVMValue::SimulationCall { name, params } => {
1313 let mut out = HashMap::new();
1315 for (k, v) in params.iter() {
1316 out.insert(k.clone(), serializable_to_nanboxed(v, store)?.clone());
1317 }
1318 ValueWord::from_simulation_call(name.clone(), out)
1319 }
1320 })
1321}
1322
1323fn enum_to_snapshot(value: &EnumValue, store: &SnapshotStore) -> Result<EnumValueSnapshot> {
1324 Ok(EnumValueSnapshot {
1325 enum_name: value.enum_name.clone(),
1326 variant: value.variant.clone(),
1327 payload: match &value.payload {
1328 EnumPayload::Unit => EnumPayloadSnapshot::Unit,
1329 EnumPayload::Tuple(values) => {
1330 let mut out = Vec::new();
1331 for v in values.iter() {
1332 out.push(nanboxed_to_serializable(v, store)?);
1333 }
1334 EnumPayloadSnapshot::Tuple(out)
1335 }
1336 EnumPayload::Struct(map) => {
1337 let mut out = Vec::new();
1338 for (k, v) in map.iter() {
1339 out.push((k.clone(), nanboxed_to_serializable(v, store)?));
1340 }
1341 EnumPayloadSnapshot::Struct(out)
1342 }
1343 },
1344 })
1345}
1346
1347fn snapshot_to_enum(snapshot: &EnumValueSnapshot, store: &SnapshotStore) -> Result<ValueWord> {
1348 Ok(ValueWord::from_enum(EnumValue {
1349 enum_name: snapshot.enum_name.clone(),
1350 variant: snapshot.variant.clone(),
1351 payload: match &snapshot.payload {
1352 EnumPayloadSnapshot::Unit => EnumPayload::Unit,
1353 EnumPayloadSnapshot::Tuple(values) => {
1354 let mut out = Vec::new();
1355 for v in values.iter() {
1356 out.push(serializable_to_nanboxed(v, store)?);
1357 }
1358 EnumPayload::Tuple(out)
1359 }
1360 EnumPayloadSnapshot::Struct(map) => {
1361 let mut out = HashMap::new();
1362 for (k, v) in map.iter() {
1363 out.insert(k.clone(), serializable_to_nanboxed(v, store)?);
1364 }
1365 EnumPayload::Struct(out)
1366 }
1367 },
1368 }))
1369}
1370
1371fn print_result_to_snapshot(
1372 result: &PrintResult,
1373 store: &SnapshotStore,
1374) -> Result<PrintableSnapshot> {
1375 let mut spans = Vec::new();
1376 for span in result.spans.iter() {
1377 match span {
1378 PrintSpan::Literal {
1379 text,
1380 start,
1381 end,
1382 span_id,
1383 } => spans.push(PrintSpanSnapshot::Literal {
1384 text: text.clone(),
1385 start: *start,
1386 end: *end,
1387 span_id: span_id.clone(),
1388 }),
1389 PrintSpan::Value {
1390 text,
1391 start,
1392 end,
1393 span_id,
1394 variable_name,
1395 raw_value,
1396 type_name,
1397 current_format,
1398 format_params,
1399 } => {
1400 let mut params = HashMap::new();
1401 for (k, v) in format_params.iter() {
1402 params.insert(k.clone(), nanboxed_to_serializable(&v.clone(), store)?);
1403 }
1404 spans.push(PrintSpanSnapshot::Value {
1405 text: text.clone(),
1406 start: *start,
1407 end: *end,
1408 span_id: span_id.clone(),
1409 variable_name: variable_name.clone(),
1410 raw_value: Box::new(nanboxed_to_serializable(raw_value.as_ref(), store)?),
1411 type_name: type_name.clone(),
1412 current_format: current_format.clone(),
1413 format_params: params,
1414 });
1415 }
1416 }
1417 }
1418 Ok(PrintableSnapshot {
1419 rendered: result.rendered.clone(),
1420 spans,
1421 })
1422}
1423
1424fn snapshot_to_print_result(
1425 snapshot: &PrintableSnapshot,
1426 store: &SnapshotStore,
1427) -> Result<PrintResult> {
1428 let mut spans = Vec::new();
1429 for span in snapshot.spans.iter() {
1430 match span {
1431 PrintSpanSnapshot::Literal {
1432 text,
1433 start,
1434 end,
1435 span_id,
1436 } => {
1437 spans.push(PrintSpan::Literal {
1438 text: text.clone(),
1439 start: *start,
1440 end: *end,
1441 span_id: span_id.clone(),
1442 });
1443 }
1444 PrintSpanSnapshot::Value {
1445 text,
1446 start,
1447 end,
1448 span_id,
1449 variable_name,
1450 raw_value,
1451 type_name,
1452 current_format,
1453 format_params,
1454 } => {
1455 let mut params = HashMap::new();
1456 for (k, v) in format_params.iter() {
1457 params.insert(k.clone(), serializable_to_nanboxed(v, store)?.clone());
1458 }
1459 spans.push(PrintSpan::Value {
1460 text: text.clone(),
1461 start: *start,
1462 end: *end,
1463 span_id: span_id.clone(),
1464 variable_name: variable_name.clone(),
1465 raw_value: Box::new(serializable_to_nanboxed(raw_value, store)?.clone()),
1466 type_name: type_name.clone(),
1467 current_format: current_format.clone(),
1468 format_params: params,
1469 });
1470 }
1471 }
1472 }
1473 Ok(PrintResult {
1474 rendered: snapshot.rendered.clone(),
1475 spans,
1476 })
1477}
1478
1479pub(crate) fn serialize_dataframe(
1480 df: &DataFrame,
1481 store: &SnapshotStore,
1482) -> Result<SerializableDataFrame> {
1483 let mut columns: Vec<_> = df.columns.iter().collect();
1484 columns.sort_by(|a, b| a.0.cmp(b.0));
1485 let mut serialized_cols = Vec::with_capacity(columns.len());
1486 for (name, values) in columns.into_iter() {
1487 let blob = store_chunked_vec(values, DEFAULT_CHUNK_LEN, store)?;
1488 serialized_cols.push(SerializableDataFrameColumn {
1489 name: name.clone(),
1490 values: blob,
1491 });
1492 }
1493 Ok(SerializableDataFrame {
1494 id: df.id.clone(),
1495 timeframe: df.timeframe,
1496 timestamps: store_chunked_vec(&df.timestamps, DEFAULT_CHUNK_LEN, store)?,
1497 columns: serialized_cols,
1498 })
1499}
1500
1501pub(crate) fn deserialize_dataframe(
1502 serialized: SerializableDataFrame,
1503 store: &SnapshotStore,
1504) -> Result<DataFrame> {
1505 let timestamps: Vec<i64> = load_chunked_vec(&serialized.timestamps, store)?;
1506 let mut columns = HashMap::new();
1507 for col in serialized.columns.into_iter() {
1508 let values: Vec<f64> = load_chunked_vec(&col.values, store)?;
1509 columns.insert(col.name, values);
1510 }
1511 Ok(DataFrame {
1512 id: serialized.id,
1513 timeframe: serialized.timeframe,
1514 timestamps,
1515 columns,
1516 })
1517}
1518
1519fn serialize_datatable(dt: &DataTable, store: &SnapshotStore) -> Result<SerializableDataTable> {
1520 let mut buf = Vec::new();
1521 let schema = dt.inner().schema();
1522 let mut writer = arrow_ipc::writer::FileWriter::try_new(&mut buf, schema.as_ref())?;
1523 writer.write(dt.inner())?;
1524 writer.finish()?;
1525 let ipc_chunks = store_chunked_vec(&buf, BYTE_CHUNK_LEN, store)?;
1526 Ok(SerializableDataTable {
1527 ipc_chunks,
1528 type_name: dt.type_name().map(|s| s.to_string()),
1529 schema_id: dt.schema_id(),
1530 })
1531}
1532
1533fn deserialize_datatable(
1534 serialized: SerializableDataTable,
1535 store: &SnapshotStore,
1536) -> Result<DataTable> {
1537 let bytes = load_chunked_vec(&serialized.ipc_chunks, store)?;
1538 let cursor = std::io::Cursor::new(bytes);
1539 let mut reader = arrow_ipc::reader::FileReader::try_new(cursor, None)?;
1540 let batch = reader
1541 .next()
1542 .transpose()?
1543 .context("no RecordBatch in DataTable snapshot")?;
1544 let mut dt = DataTable::new(batch);
1545 if let Some(name) = serialized.type_name {
1546 dt = DataTable::with_type_name(dt.into_inner(), name);
1547 }
1548 if let Some(schema_id) = serialized.schema_id {
1549 dt = dt.with_schema_id(schema_id);
1550 }
1551 Ok(dt)
1552}
1553
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{Float64Array, Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds a two-column table (`id: Int64`, `value: Float64`, both non-nullable) from
    /// the given slices.
    fn make_test_table(ids: &[i64], values: &[f64]) -> DataTable {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(ids.to_vec())),
                Arc::new(Float64Array::from(values.to_vec())),
            ],
        )
        .unwrap();
        DataTable::new(batch)
    }

    /// A plain DataTable keeps its row count across snapshot and restore.
    #[test]
    fn test_datatable_snapshot_roundtrip_preserves_original_data() {
        let dir = tempfile::tempdir().unwrap();
        let store_path = dir.path().join("store");

        let original_dt = make_test_table(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], &[150.0; 10]);
        assert_eq!(original_dt.row_count(), 10);
        let original_nb = ValueWord::from_datatable(Arc::new(original_dt));

        let store = SnapshotStore::new(&store_path).unwrap();
        let serialized = nanboxed_to_serializable(&original_nb, &store).unwrap();

        let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
        let dt = restored
            .as_datatable()
            .expect("Expected DataTable after restore");
        assert_eq!(
            dt.row_count(),
            10,
            "restored table should have original 10 rows"
        );
    }

    /// An indexed table serializes to the IndexedTable variant and restores schema id,
    /// index column and rows.
    #[test]
    fn test_indexed_table_snapshot_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let store = SnapshotStore::new(dir.path().join("store")).unwrap();

        let dt = make_test_table(&[1, 2, 3], &[10.0, 20.0, 30.0]);
        let original_nb = ValueWord::from_indexed_table(1, Arc::new(dt), 0);

        let serialized = nanboxed_to_serializable(&original_nb, &store).unwrap();

        match &serialized {
            SerializableVMValue::IndexedTable {
                schema_id,
                index_col,
                ..
            } => {
                assert_eq!(schema_id, &1);
                assert_eq!(index_col, &0);
            }
            other => panic!(
                "Expected SerializableVMValue::IndexedTable, got {:?}",
                std::mem::discriminant(other)
            ),
        }

        let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
        let (schema_id, table, index_col) = restored
            .as_indexed_table()
            .expect("Expected ValueWord::IndexedTable");
        assert_eq!(schema_id, 1);
        assert_eq!(index_col, 0);
        assert_eq!(table.row_count(), 3);
    }

    /// A typed table restores its schema id, row count and column values.
    #[test]
    fn test_typed_table_snapshot_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let store = SnapshotStore::new(dir.path().join("store")).unwrap();

        let dt = make_test_table(&[10, 20], &[1.5, 2.5]);
        let original_nb = ValueWord::from_typed_table(42, Arc::new(dt));

        let serialized = nanboxed_to_serializable(&original_nb, &store).unwrap();
        let restored = serializable_to_nanboxed(&serialized, &store).unwrap();

        let (schema_id, table) = restored
            .as_typed_table()
            .expect("Expected ValueWord::TypedTable");
        assert_eq!(schema_id, 42);
        assert_eq!(table.row_count(), 2);
        let vals = table.get_f64_column("value").unwrap();
        assert!((vals.value(0) - 1.5).abs() < f64::EPSILON);
        assert!((vals.value(1) - 2.5).abs() < f64::EPSILON);
    }

    /// A float array serializes as an F64 TypedArray and restores element-for-element.
    #[test]
    fn test_float_array_typed_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let store = SnapshotStore::new(dir.path().join("store")).unwrap();

        let data: Vec<f64> = (0..1000).map(|i| i as f64 * 1.5).collect();
        let aligned = shape_value::AlignedVec::from_vec(data.clone());
        let buf = shape_value::AlignedTypedBuffer::from_aligned(aligned);
        let nb = ValueWord::from_float_array(Arc::new(buf));

        let serialized = nanboxed_to_serializable(&nb, &store).unwrap();
        match &serialized {
            SerializableVMValue::TypedArray {
                element_kind, len, ..
            } => {
                assert_eq!(*element_kind, TypedArrayElementKind::F64);
                assert_eq!(*len, 1000);
            }
            other => panic!(
                "Expected TypedArray, got {:?}",
                std::mem::discriminant(other)
            ),
        }

        let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
        let hv = restored.as_heap_ref().unwrap();
        match hv {
            shape_value::heap_value::HeapValue::FloatArray(a) => {
                assert_eq!(a.len(), 1000);
                for i in 0..1000 {
                    assert!((a.as_slice()[i] - data[i]).abs() < f64::EPSILON);
                }
            }
            _ => panic!("Expected FloatArray after restore"),
        }
    }

    /// An int array serializes as an I64 TypedArray and restores element-for-element.
    #[test]
    fn test_int_array_typed_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let store = SnapshotStore::new(dir.path().join("store")).unwrap();

        let data: Vec<i64> = (0..500).map(|i| i * 3 - 100).collect();
        let buf = shape_value::TypedBuffer::from_vec(data.clone());
        let nb = ValueWord::from_int_array(Arc::new(buf));

        let serialized = nanboxed_to_serializable(&nb, &store).unwrap();
        match &serialized {
            SerializableVMValue::TypedArray {
                element_kind, len, ..
            } => {
                assert_eq!(*element_kind, TypedArrayElementKind::I64);
                assert_eq!(*len, 500);
            }
            other => panic!(
                "Expected TypedArray, got {:?}",
                std::mem::discriminant(other)
            ),
        }

        let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
        let hv = restored.as_heap_ref().unwrap();
        match hv {
            shape_value::heap_value::HeapValue::IntArray(a) => {
                assert_eq!(a.len(), 500);
                for i in 0..500 {
                    assert_eq!(a.as_slice()[i], data[i]);
                }
            }
            _ => panic!("Expected IntArray after restore"),
        }
    }

    /// A 3x4 matrix keeps its shape and data across snapshot and restore.
    #[test]
    fn test_matrix_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let store = SnapshotStore::new(dir.path().join("store")).unwrap();

        let data: Vec<f64> = (0..12).map(|i| i as f64).collect();
        let aligned = shape_value::AlignedVec::from_vec(data.clone());
        let matrix = shape_value::heap_value::MatrixData::from_flat(aligned, 3, 4);
        let nb = ValueWord::from_matrix(Box::new(matrix));

        let serialized = nanboxed_to_serializable(&nb, &store).unwrap();
        match &serialized {
            SerializableVMValue::Matrix { rows, cols, .. } => {
                assert_eq!(*rows, 3);
                assert_eq!(*cols, 4);
            }
            other => panic!("Expected Matrix, got {:?}", std::mem::discriminant(other)),
        }

        let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
        let hv = restored.as_heap_ref().unwrap();
        match hv {
            shape_value::heap_value::HeapValue::Matrix(m) => {
                assert_eq!(m.rows, 3);
                assert_eq!(m.cols, 4);
                for i in 0..12 {
                    assert!((m.data.as_slice()[i] - data[i]).abs() < f64::EPSILON);
                }
            }
            _ => panic!("Expected Matrix after restore"),
        }
    }

    /// A hashmap round-trips, and key lookup works on the restored map.
    #[test]
    fn test_hashmap_typed_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let store = SnapshotStore::new(dir.path().join("store")).unwrap();

        let keys = vec![
            ValueWord::from_string(Arc::new("a".to_string())),
            ValueWord::from_string(Arc::new("b".to_string())),
        ];
        let values = vec![ValueWord::from_i64(1), ValueWord::from_i64(2)];
        let nb = ValueWord::from_hashmap_pairs(keys, values);

        let serialized = nanboxed_to_serializable(&nb, &store).unwrap();
        match &serialized {
            SerializableVMValue::HashMap { keys, values } => {
                assert_eq!(keys.len(), 2);
                assert_eq!(values.len(), 2);
            }
            other => panic!("Expected HashMap, got {:?}", std::mem::discriminant(other)),
        }

        let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
        let hv = restored.as_heap_ref().unwrap();
        match hv {
            shape_value::heap_value::HeapValue::HashMap(d) => {
                assert_eq!(d.keys.len(), 2);
                assert_eq!(d.values.len(), 2);
                let key_a = ValueWord::from_string(Arc::new("a".to_string()));
                let idx = d.find_key(&key_a);
                assert!(idx.is_some(), "should find key 'a' in rebuilt index");
            }
            _ => panic!("Expected HashMap after restore"),
        }
    }

    /// A bool array serializes as a Bool TypedArray and restores byte-for-byte.
    #[test]
    fn test_bool_array_typed_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let store = SnapshotStore::new(dir.path().join("store")).unwrap();

        let data: Vec<u8> = vec![1, 0, 1, 1, 0];
        let buf = shape_value::TypedBuffer::from_vec(data.clone());
        let nb = ValueWord::from_bool_array(Arc::new(buf));

        let serialized = nanboxed_to_serializable(&nb, &store).unwrap();
        match &serialized {
            SerializableVMValue::TypedArray {
                element_kind, len, ..
            } => {
                assert_eq!(*element_kind, TypedArrayElementKind::Bool);
                assert_eq!(*len, 5);
            }
            other => panic!(
                "Expected TypedArray, got {:?}",
                std::mem::discriminant(other)
            ),
        }

        let restored = serializable_to_nanboxed(&serialized, &store).unwrap();
        let hv = restored.as_heap_ref().unwrap();
        match hv {
            shape_value::heap_value::HeapValue::BoolArray(a) => {
                assert_eq!(a.len(), 5);
                assert_eq!(a.as_slice(), &data);
            }
            _ => panic!("Expected BoolArray after restore"),
        }
    }

    /// Every top-level snapshot component (and a Decimal value) survives a bincode
    /// serialize/deserialize round trip.
    #[test]
    fn test_all_snapshot_components_bincode_roundtrip() {
        use rust_decimal::Decimal;
        use shape_ast::ast::VarKind;
        use shape_ast::data::Timeframe;

        let decimal_val = SerializableVMValue::Decimal(Decimal::new(31415, 4));
        let bytes = bincode::serialize(&decimal_val).expect("serialize Decimal ValueWord");
        let decoded: SerializableVMValue =
            bincode::deserialize(&bytes).expect("deserialize Decimal ValueWord");
        match decoded {
            SerializableVMValue::Decimal(d) => assert_eq!(d, Decimal::new(31415, 4)),
            _ => panic!("wrong variant"),
        }

        let vm_snap = VmSnapshot {
            ip: 42,
            stack: vec![
                SerializableVMValue::Int(1),
                SerializableVMValue::Decimal(Decimal::new(99999, 2)),
                SerializableVMValue::String("hello".into()),
            ],
            locals: vec![SerializableVMValue::Decimal(Decimal::new(0, 0))],
            // Any non-PI-like value: `3.14` trips clippy's deny-by-default `approx_constant`.
            module_bindings: vec![SerializableVMValue::Number(2.5)],
            call_stack: vec![],
            loop_stack: vec![],
            timeframe_stack: vec![],
            exception_handlers: vec![],
        };
        let bytes = bincode::serialize(&vm_snap).expect("serialize VmSnapshot");
        let decoded: VmSnapshot = bincode::deserialize(&bytes).expect("deserialize VmSnapshot");
        assert_eq!(decoded.ip, 42);
        assert_eq!(decoded.stack.len(), 3);

        let ctx_snap = ContextSnapshot {
            data_load_mode: crate::context::DataLoadMode::Async,
            data_cache: None,
            current_id: Some("test".into()),
            current_row_index: 0,
            variable_scopes: vec![{
                let mut scope = HashMap::new();
                scope.insert(
                    "price".into(),
                    VariableSnapshot {
                        value: SerializableVMValue::Decimal(Decimal::new(15099, 2)),
                        kind: VarKind::Let,
                        is_initialized: true,
                        is_function_scoped: false,
                        format_hint: None,
                        format_overrides: None,
                    },
                );
                scope
            }],
            reference_datetime: None,
            current_timeframe: Some(Timeframe::m1()),
            base_timeframe: None,
            date_range: None,
            range_start: 0,
            range_end: 0,
            range_active: false,
            type_alias_registry: HashMap::new(),
            enum_registry: HashMap::new(),
            suspension_state: None,
        };
        let bytes = bincode::serialize(&ctx_snap).expect("serialize ContextSnapshot");
        let decoded: ContextSnapshot =
            bincode::deserialize(&bytes).expect("deserialize ContextSnapshot");
        assert_eq!(decoded.variable_scopes.len(), 1);

        let sem_snap = SemanticSnapshot {
            exported_symbols: HashSet::new(),
        };
        let bytes = bincode::serialize(&sem_snap).expect("serialize SemanticSnapshot");
        let _decoded: SemanticSnapshot =
            bincode::deserialize(&bytes).expect("deserialize SemanticSnapshot");

        let exec_snap = ExecutionSnapshot {
            version: SNAPSHOT_VERSION,
            created_at_ms: 1234567890,
            semantic_hash: HashDigest::from_hex("abc123"),
            context_hash: HashDigest::from_hex("def456"),
            vm_hash: Some(HashDigest::from_hex("789aaa")),
            bytecode_hash: Some(HashDigest::from_hex("bbb000")),
            script_path: Some("/tmp/test.shape".into()),
        };
        let bytes = bincode::serialize(&exec_snap).expect("serialize ExecutionSnapshot");
        let decoded: ExecutionSnapshot =
            bincode::deserialize(&bytes).expect("deserialize ExecutionSnapshot");
        assert_eq!(decoded.version, SNAPSHOT_VERSION);
    }
}