1use std::collections::{HashMap, HashSet};
6use std::fs;
7use std::io::{Read, Write};
8use std::path::PathBuf;
9use anyhow::{Context, Result};
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use crate::event_queue::WaitCondition;
19use crate::hashing::{HashDigest, hash_bytes};
20use shape_ast::ast::{DataDateTimeRef, DateTimeExpr, EnumDef, TimeReference, TypeAnnotation};
21use shape_ast::data::Timeframe;
22
23use shape_value::datatable::DataTable;
24
/// Version number of the snapshot format.
///
/// NOTE(review): presumably stored in `ExecutionSnapshot::version` when a
/// snapshot is written; nothing in this file compares against it — confirm
/// at the call sites.
pub const SNAPSHOT_VERSION: u32 = 5;

/// Default chunk length (4096). Its consumers are not visible in this file;
/// NOTE(review): presumably passed as `chunk_len` to `store_chunked_vec` — confirm.
pub(crate) const DEFAULT_CHUNK_LEN: usize = 4096;
/// Chunk size (256 KiB) used by `store_chunked_bytes`, and passed as the
/// element chunk length by `serialize_datatable`.
pub(crate) const BYTE_CHUNK_LEN: usize = 256 * 1024;
41
/// Handle to an on-disk snapshot store rooted at a single directory.
///
/// Compressed blobs live under `<root>/blobs` and snapshot records under
/// `<root>/snapshots`; both directories are created by `SnapshotStore::new`.
/// Cloning copies only the root path.
#[derive(Clone)]
pub struct SnapshotStore {
    /// Directory that contains the `blobs` and `snapshots` subdirectories.
    root: PathBuf,
}
47
48impl SnapshotStore {
49 pub fn new(root: impl Into<PathBuf>) -> Result<Self> {
50 let root = root.into();
51 fs::create_dir_all(root.join("blobs"))
52 .with_context(|| format!("failed to create snapshot blob dir at {}", root.display()))?;
53 fs::create_dir_all(root.join("snapshots"))
54 .with_context(|| format!("failed to create snapshot dir at {}", root.display()))?;
55 Ok(Self { root })
56 }
57
58 fn blob_path(&self, hash: &HashDigest) -> PathBuf {
59 self.root
60 .join("blobs")
61 .join(format!("{}.bin.zst", hash.hex()))
62 }
63
64 fn snapshot_path(&self, hash: &HashDigest) -> PathBuf {
65 self.root
66 .join("snapshots")
67 .join(format!("{}.bin.zst", hash.hex()))
68 }
69
70 pub fn put_blob(&self, data: &[u8]) -> Result<HashDigest> {
71 let hash = hash_bytes(data);
72 let path = self.blob_path(&hash);
73 if path.exists() {
74 return Ok(hash);
75 }
76 let compressed = zstd::stream::encode_all(data, 0)?;
77 let mut file = fs::File::create(&path)?;
78 file.write_all(&compressed)?;
79 Ok(hash)
80 }
81
82 pub fn get_blob(&self, hash: &HashDigest) -> Result<Vec<u8>> {
83 let path = self.blob_path(hash);
84 let mut file = fs::File::open(&path)
85 .with_context(|| format!("snapshot blob not found: {}", path.display()))?;
86 let mut buf = Vec::new();
87 file.read_to_end(&mut buf)?;
88 let decompressed = zstd::stream::decode_all(&buf[..])?;
89 Ok(decompressed)
90 }
91
92 pub fn put_struct<T: Serialize>(&self, value: &T) -> Result<HashDigest> {
93 let bytes = bincode::serialize(value)?;
94 self.put_blob(&bytes)
95 }
96
97 pub fn get_struct<T: for<'de> Deserialize<'de>>(&self, hash: &HashDigest) -> Result<T> {
98 let bytes = self.get_blob(hash)?;
99 Ok(bincode::deserialize(&bytes)?)
100 }
101
102 pub fn put_snapshot(&self, snapshot: &ExecutionSnapshot) -> Result<HashDigest> {
103 let bytes = bincode::serialize(snapshot)?;
104 let hash = hash_bytes(&bytes);
105 let path = self.snapshot_path(&hash);
106 if !path.exists() {
107 let compressed = zstd::stream::encode_all(&bytes[..], 0)?;
108 let mut file = fs::File::create(&path)?;
109 file.write_all(&compressed)?;
110 }
111 Ok(hash)
112 }
113
114 pub fn get_snapshot(&self, hash: &HashDigest) -> Result<ExecutionSnapshot> {
115 let path = self.snapshot_path(hash);
116 let mut file = fs::File::open(&path)
117 .with_context(|| format!("snapshot not found: {}", path.display()))?;
118 let mut buf = Vec::new();
119 file.read_to_end(&mut buf)?;
120 let decompressed = zstd::stream::decode_all(&buf[..])?;
121 Ok(bincode::deserialize(&decompressed)?)
122 }
123
124 pub fn list_snapshots(&self) -> Result<Vec<(HashDigest, ExecutionSnapshot)>> {
134 let snapshots_dir = self.root.join("snapshots");
135 if !snapshots_dir.exists() {
136 return Ok(Vec::new());
137 }
138 let mut results = Vec::new();
139 for entry in fs::read_dir(&snapshots_dir)? {
140 let entry = entry?;
141 let path = entry.path();
142 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
143 if let Some(hex) = name.strip_suffix(".bin.zst") {
145 let hash = HashDigest::from_hex(hex);
146 match self.get_snapshot(&hash) {
147 Ok(snap) => results.push((hash, snap)),
148 Err(_) => continue, }
150 }
151 }
152 }
153 results.sort_by(|a, b| b.1.created_at_ms.cmp(&a.1.created_at_ms));
155 Ok(results)
156 }
157
158 pub fn delete_snapshot(&self, hash: &HashDigest) -> Result<()> {
160 let path = self.snapshot_path(hash);
161 fs::remove_file(&path)
162 .with_context(|| format!("failed to delete snapshot: {}", path.display()))?;
163 Ok(())
164 }
165}
166
/// Top-level snapshot record written by `SnapshotStore::put_snapshot`.
///
/// The record carries hashes rather than payloads.
/// NOTE(review): presumably each hash addresses a blob written with
/// `SnapshotStore::put_struct` — confirm against the code that builds snapshots.
///
/// Field order and types define the bincode encoding; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionSnapshot {
    /// Format version. NOTE(review): presumably `SNAPSHOT_VERSION` at write time — confirm.
    pub version: u32,
    /// Creation time in milliseconds; `SnapshotStore::list_snapshots` sorts on
    /// this field in descending order.
    pub created_at_ms: i64,
    /// Hash associated with the semantic state.
    pub semantic_hash: HashDigest,
    /// Hash associated with the context state.
    pub context_hash: HashDigest,
    /// Hash associated with the VM state, when present.
    pub vm_hash: Option<HashDigest>,
    /// Hash associated with the bytecode, when present.
    pub bytecode_hash: Option<HashDigest>,
    /// Optional script path; marked `#[serde(default)]`.
    #[serde(default)]
    pub script_path: Option<String>,
}
187
/// Serializable semantic state: currently just the set of exported symbol names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SemanticSnapshot {
    /// Names of exported symbols.
    pub exported_symbols: HashSet<String>,
}
192
/// Serializable form of the execution context.
///
/// Field order and types define the bincode encoding; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextSnapshot {
    /// Data loading mode in effect.
    pub data_load_mode: crate::context::DataLoadMode,
    /// Snapshot of the data cache, when one was captured.
    pub data_cache: Option<DataCacheSnapshot>,
    /// Current id, if any. NOTE(review): presumably the active data/instrument id — confirm.
    pub current_id: Option<String>,
    /// Current row index.
    pub current_row_index: usize,
    /// Variable scopes, each mapping a variable name to its snapshot.
    /// NOTE(review): presumably ordered outermost-first — confirm.
    pub variable_scopes: Vec<HashMap<String, VariableSnapshot>>,
    /// Reference datetime (UTC), if set.
    pub reference_datetime: Option<chrono::DateTime<chrono::Utc>>,
    /// Current timeframe, if set.
    pub current_timeframe: Option<Timeframe>,
    /// Base timeframe, if set.
    pub base_timeframe: Option<Timeframe>,
    /// Date range as a pair of UTC datetimes, if set.
    pub date_range: Option<(chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)>,
    /// Start of the row range.
    pub range_start: usize,
    /// End of the row range. NOTE(review): inclusive vs. exclusive is not visible here.
    pub range_end: usize,
    /// Whether the row range is active.
    pub range_active: bool,
    /// Type alias registry keyed by alias name.
    pub type_alias_registry: HashMap<String, TypeAliasRuntimeEntrySnapshot>,
    /// Enum definitions keyed by name.
    pub enum_registry: HashMap<String, EnumDef>,
    /// Struct type definitions keyed by name; marked `#[serde(default)]`.
    #[serde(default)]
    pub struct_type_registry: HashMap<String, shape_ast::ast::StructTypeDef>,
    /// Suspension state, when execution was suspended at snapshot time.
    pub suspension_state: Option<SuspensionStateSnapshot>,
}
213
/// Serializable form of one variable binding.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VariableSnapshot {
    /// The variable's value in wire form.
    pub value: SerializableVMValue,
    /// Declaration kind of the variable.
    pub kind: shape_ast::ast::VarKind,
    /// Whether the variable has been initialized.
    pub is_initialized: bool,
    /// Whether the variable is function-scoped.
    pub is_function_scoped: bool,
    /// Optional format hint.
    pub format_hint: Option<String>,
    /// Optional per-key format overrides.
    pub format_overrides: Option<HashMap<String, SerializableVMValue>>,
}
223
/// Serializable form of one runtime type-alias entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeAliasRuntimeEntrySnapshot {
    /// Name of the aliased base type.
    pub base_type: String,
    /// Optional overrides keyed by name.
    pub overrides: Option<HashMap<String, SerializableVMValue>>,
}
229
/// Serializable form of a suspended execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SuspensionStateSnapshot {
    /// Condition the execution is waiting on.
    pub waiting_for: WaitCondition,
    /// Program counter to resume at.
    pub resume_pc: usize,
    /// Saved local values.
    pub saved_locals: Vec<SerializableVMValue>,
    /// Saved operand stack values.
    pub saved_stack: Vec<SerializableVMValue>,
}
237
/// Serializable form of the VM state.
///
/// Field order and types define the bincode encoding; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmSnapshot {
    /// Instruction pointer.
    pub ip: usize,
    /// Operand stack.
    pub stack: Vec<SerializableVMValue>,
    /// Local variable slots.
    pub locals: Vec<SerializableVMValue>,
    /// Module-level bindings.
    pub module_bindings: Vec<SerializableVMValue>,
    /// Call frames.
    pub call_stack: Vec<SerializableCallFrame>,
    /// Active loop contexts.
    pub loop_stack: Vec<SerializableLoopContext>,
    /// Stack of timeframes (entries may be `None`).
    pub timeframe_stack: Vec<Option<Timeframe>>,
    /// Registered exception handlers.
    pub exception_handlers: Vec<SerializableExceptionHandler>,
    /// Optional 32-byte hash; marked `#[serde(default)]`.
    /// NOTE(review): presumably identifies the bytecode blob containing `ip` — confirm.
    #[serde(default)]
    pub ip_blob_hash: Option<[u8; 32]>,
    /// Optional offset; marked `#[serde(default)]`.
    /// NOTE(review): presumably `ip` relative to that blob — confirm.
    #[serde(default)]
    pub ip_local_offset: Option<usize>,
    /// Optional function id for `ip`; marked `#[serde(default)]`.
    #[serde(default)]
    pub ip_function_id: Option<u16>,
}
262
/// Serializable form of one call frame.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableCallFrame {
    /// Instruction pointer to return to.
    pub return_ip: usize,
    /// Base index of this frame's locals.
    pub locals_base: usize,
    /// Number of locals in this frame.
    pub locals_count: usize,
    /// Id of the function executing in this frame, if any.
    pub function_id: Option<u16>,
    /// Captured upvalues, if any.
    pub upvalues: Option<Vec<SerializableVMValue>>,
    /// Optional 32-byte hash; marked `#[serde(default)]`.
    /// NOTE(review): presumably identifies the function's bytecode blob — confirm.
    #[serde(default)]
    pub blob_hash: Option<[u8; 32]>,
    /// Optional instruction pointer; marked `#[serde(default)]`.
    /// NOTE(review): presumably relative to that blob — confirm.
    #[serde(default)]
    pub local_ip: Option<usize>,
}
281
/// Serializable form of one loop context: its start and end positions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableLoopContext {
    /// Start position of the loop.
    pub start: usize,
    /// End position of the loop.
    pub end: usize,
}
287
/// Serializable form of one exception handler registration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableExceptionHandler {
    /// Instruction pointer of the catch target.
    pub catch_ip: usize,
    /// Stack size recorded for the handler.
    pub stack_size: usize,
    /// Call depth recorded for the handler.
    pub call_depth: usize,
}
294
/// Wire representation of a VM value.
///
/// `slot_to_serializable` projects live slots into this enum and
/// `serializable_to_slot` rebuilds slots from it; only a subset of the
/// variants is produced or accepted by those two functions in this file.
///
/// The enum is serialized with bincode elsewhere in this file, so variant
/// order and field order are part of the on-disk format: append new variants
/// at the end and never reorder existing ones.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SerializableVMValue {
    /// Integer value; every integer `NativeKind` is projected to this variant.
    Int(i64),
    /// Floating-point value (`Float64`, and `Float32` widened to `f64`).
    Number(f64),
    /// Decimal value.
    Decimal(rust_decimal::Decimal),
    /// Owned string.
    String(String),
    /// Boolean.
    Bool(bool),
    /// Absent value.
    None,
    /// Present value wrapper.
    Some(Box<SerializableVMValue>),
    /// Unit value.
    Unit,
    /// Timeframe value.
    Timeframe(Timeframe),
    /// Duration value.
    Duration(shape_ast::ast::Duration),
    /// Point in time with a fixed UTC offset.
    Time(chrono::DateTime<chrono::FixedOffset>),
    /// Time span. NOTE(review): the unit of the `i64` is not visible in this file.
    TimeSpan(i64),
    /// Time reference AST node.
    TimeReference(TimeReference),
    /// Datetime expression AST node.
    DateTimeExpr(DateTimeExpr),
    /// Data datetime reference AST node.
    DataDateTimeRef(DataDateTimeRef),
    /// Array of values.
    Array(Vec<SerializableVMValue>),
    /// Function, identified by id.
    Function(u16),
    /// Type annotation AST node.
    TypeAnnotation(TypeAnnotation),
    /// Value tagged with a type name.
    TypeAnnotatedValue {
        type_name: String,
        value: Box<SerializableVMValue>,
    },
    /// Enum value.
    Enum(EnumValueSnapshot),
    /// Closure: function id, type id and captured upvalues.
    Closure {
        function_id: u32,
        type_id: u32,
        upvalues: Vec<SerializableVMValue>,
    },
    /// Module function, identified by name.
    ModuleFunction(String),
    /// Typed object: schema id plus per-slot values.
    /// NOTE(review): `heap_mask` presumably flags which slots hold heap values — confirm.
    TypedObject {
        schema_id: u64,
        slot_data: Vec<SerializableVMValue>,
        heap_mask: u64,
    },
    /// Range with optional bounds.
    Range {
        start: Option<Box<SerializableVMValue>>,
        end: Option<Box<SerializableVMValue>>,
        inclusive: bool,
    },
    /// Success wrapper.
    Ok(Box<SerializableVMValue>),
    /// Error wrapper.
    Err(Box<SerializableVMValue>),
    /// Rendered print output with its spans.
    PrintResult(PrintableSnapshot),
    /// Simulation call: name plus named parameters.
    SimulationCall {
        name: String,
        params: HashMap<String, SerializableVMValue>,
    },
    /// Function reference by name with an optional closure value.
    FunctionRef {
        name: String,
        closure: Option<Box<SerializableVMValue>>,
    },
    /// Reference to data identified by datetime, id and timeframe.
    DataReference {
        datetime: chrono::DateTime<chrono::FixedOffset>,
        id: String,
        timeframe: Timeframe,
    },
    /// Future; `slot_heap_to_serializable` stores the raw slot bits here.
    Future(u64),
    /// Data table stored out-of-line as a blob.
    DataTable(BlobRef),
    /// Typed table: schema id plus the table blob.
    TypedTable {
        schema_id: u64,
        table: BlobRef,
    },
    /// View of one row of a table blob.
    RowView {
        schema_id: u64,
        table: BlobRef,
        row_idx: usize,
    },
    /// Reference to one column of a table blob.
    ColumnRef {
        schema_id: u64,
        table: BlobRef,
        col_id: u32,
    },
    /// Table blob together with its index column.
    IndexedTable {
        schema_id: u64,
        table: BlobRef,
        index_col: u32,
    },
    /// Typed array stored as a blob, with its element kind and length.
    TypedArray {
        element_kind: TypedArrayElementKind,
        blob: BlobRef,
        len: usize,
    },
    /// Matrix stored as a blob, with its dimensions.
    Matrix {
        blob: BlobRef,
        rows: u32,
        cols: u32,
    },
    /// Hash map as parallel key and value lists.
    HashMap {
        keys: Vec<SerializableVMValue>,
        values: Vec<SerializableVMValue>,
    },
    /// Reference to a sidecar blob.
    /// NOTE(review): the meaning of `meta_a`/`meta_b` is not visible in this
    /// file; presumably kind-specific metadata — confirm.
    SidecarRef {
        sidecar_id: u32,
        blob_kind: BlobKind,
        original_hash: HashDigest,
        meta_a: u32,
        meta_b: u32,
    },

    /// Hash set of string keys (cloned from `HashSetData::keys`).
    HashSet { keys: Vec<String> },

    /// Iterator placeholder; no state is captured and it cannot be restored
    /// by `serializable_to_heap_slot`.
    IteratorOpaque,

    /// Result heap value: the ok/err flag plus its payload.
    ResultData {
        is_ok: bool,
        payload: Box<SerializableVMValue>,
    },

    /// Option heap value; `payload` is populated only when `is_some` is true.
    OptionData {
        is_some: bool,
        payload: Option<Box<SerializableVMValue>>,
    },

    /// Deque placeholder; only the length is captured.
    DequeOpaque { len: usize },

    /// Channel placeholder; only the closed flag and length are captured.
    ChannelOpaque { closed: bool, len: usize },

    /// Priority queue contents (a clone of `PriorityQueueData::heap`).
    PriorityQueueHeap { heap: Vec<i64> },

    /// Reference placeholder; no state is captured.
    ReferenceOpaque,

    /// Filter expression placeholder; no state is captured.
    FilterExprOpaque,

    /// Shared cell placeholder; no state is captured.
    SharedCellOpaque,

    /// Mutex placeholder; `has_value` is false only when the guarded slot is
    /// a `Bool` kind with raw bits 0.
    MutexOpaque { has_value: bool },

    /// Atomic integer with its loaded value.
    AtomicI64 { value: i64 },

    /// Lazy placeholder; only the initialization flag is captured.
    LazyOpaque { is_initialized: bool },

    /// Unicode scalar value.
    Char(char),

    /// Big integer, carried as an `i64`.
    BigInt(i64),
}
566
/// Serializable form of an enum value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnumValueSnapshot {
    /// Name of the enum type.
    pub enum_name: String,
    /// Name of the variant.
    pub variant: String,
    /// Payload carried by the variant.
    pub payload: EnumPayloadSnapshot,
}
573
/// Payload shapes an enum variant can carry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EnumPayloadSnapshot {
    /// No payload.
    Unit,
    /// Positional payload values.
    Tuple(Vec<SerializableVMValue>),
    /// Named payload fields, kept as an ordered list of `(name, value)` pairs.
    Struct(Vec<(String, SerializableVMValue)>),
}
580
/// Serializable form of print output: the rendered text plus its spans.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrintableSnapshot {
    /// Fully rendered text.
    pub rendered: String,
    /// Spans describing pieces of the rendered text.
    pub spans: Vec<PrintSpanSnapshot>,
}
586
/// One span of print output.
///
/// NOTE(review): `start`/`end` are presumably offsets into
/// `PrintableSnapshot::rendered`; whether they are byte or character offsets
/// is not visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PrintSpanSnapshot {
    /// Literal text span.
    Literal {
        text: String,
        start: usize,
        end: usize,
        span_id: String,
    },
    /// Span produced from a value, with the raw value and formatting details.
    Value {
        text: String,
        start: usize,
        end: usize,
        span_id: String,
        variable_name: Option<String>,
        raw_value: Box<SerializableVMValue>,
        type_name: String,
        current_format: String,
        format_params: HashMap<String, SerializableVMValue>,
    },
}
607
/// Reference to an out-of-line blob: its content hash and what kind of data it holds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobRef {
    /// Content hash of the blob.
    pub hash: HashDigest,
    /// Kind of data stored in the blob.
    pub kind: BlobKind,
}
613
/// Element type of a typed array.
///
/// Variant order is part of the serialized format; append only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TypedArrayElementKind {
    I8,
    I16,
    I32,
    I64,
    U8,
    U16,
    U32,
    U64,
    F32,
    F64,
    Bool,
}
629
/// Kind of data stored in a blob referenced by `BlobRef` or `SidecarRef`.
///
/// Variant order is part of the serialized format; append only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BlobKind {
    /// A data table.
    DataTable,
    /// A typed array with the given element kind.
    TypedArray(TypedArrayElementKind),
    /// A matrix.
    Matrix,
}
638
/// Descriptor of a sequence stored as a list of blobs.
///
/// Produced by `store_chunked_vec` / `store_chunked_bytes` and consumed by
/// `load_chunked_vec` / `load_chunked_bytes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedBlob {
    /// Blob hashes of the chunks, in order.
    pub chunk_hashes: Vec<HashDigest>,
    /// Total number of elements (or bytes) across all chunks.
    pub total_len: usize,
    /// Chunk length that was used when the data was split.
    pub chunk_len: usize,
}
645
/// Serializable form of a `DataTable` (see `serialize_datatable`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataTable {
    /// Arrow IPC file bytes of the table, stored in chunks.
    pub ipc_chunks: ChunkedBlob,
    /// Optional type name of the table.
    pub type_name: Option<String>,
    /// Optional schema id of the table.
    pub schema_id: Option<u32>,
}
652
/// Serializable form of a data frame: identity, timestamps and columns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrame {
    /// Identifier of the data frame.
    pub id: String,
    /// Timeframe of the data.
    pub timeframe: Timeframe,
    /// Timestamps, stored in chunks.
    pub timestamps: ChunkedBlob,
    /// Columns of the data frame.
    pub columns: Vec<SerializableDataFrameColumn>,
}
660
/// One named column of a `SerializableDataFrame`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrameColumn {
    /// Column name.
    pub name: String,
    /// Column values, stored in chunks.
    pub values: ChunkedBlob,
}
666
/// Serializable cache key: an id paired with a timeframe.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheKeySnapshot {
    /// Identifier part of the key.
    pub id: String,
    /// Timeframe part of the key.
    pub timeframe: Timeframe,
}
672
/// Serializable form of one historical cache entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedDataSnapshot {
    /// Key of the cache entry.
    pub key: CacheKeySnapshot,
    /// Historical data for the key.
    pub historical: SerializableDataFrame,
    /// Current index recorded for the entry.
    pub current_index: usize,
}
679
/// Serializable form of one live buffer entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveBufferSnapshot {
    /// Key of the buffer.
    pub key: CacheKeySnapshot,
    /// Buffered rows, stored in chunks.
    pub rows: ChunkedBlob,
}
685
/// Serializable form of the data cache: historical entries plus live buffers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCacheSnapshot {
    /// Historical cache entries.
    pub historical: Vec<CachedDataSnapshot>,
    /// Live buffer entries.
    pub live_buffer: Vec<LiveBufferSnapshot>,
}
691
692pub(crate) fn store_chunked_vec<T: Serialize>(
693 values: &[T],
694 chunk_len: usize,
695 store: &SnapshotStore,
696) -> Result<ChunkedBlob> {
697 let chunk_len = chunk_len.max(1);
698 if values.is_empty() {
699 return Ok(ChunkedBlob {
700 chunk_hashes: Vec::new(),
701 total_len: 0,
702 chunk_len,
703 });
704 }
705 let mut hashes = Vec::new();
706 for chunk in values.chunks(chunk_len) {
707 let bytes = bincode::serialize(chunk)?;
708 let hash = store.put_blob(&bytes)?;
709 hashes.push(hash);
710 }
711 Ok(ChunkedBlob {
712 chunk_hashes: hashes,
713 total_len: values.len(),
714 chunk_len,
715 })
716}
717
718pub(crate) fn load_chunked_vec<T: DeserializeOwned>(
719 chunked: &ChunkedBlob,
720 store: &SnapshotStore,
721) -> Result<Vec<T>> {
722 if chunked.total_len == 0 {
723 return Ok(Vec::new());
724 }
725 let mut out = Vec::with_capacity(chunked.total_len);
726 for hash in &chunked.chunk_hashes {
727 let bytes = store.get_blob(hash)?;
728 let chunk: Vec<T> = bincode::deserialize(&bytes)?;
729 out.extend(chunk);
730 }
731 out.truncate(chunked.total_len);
732 Ok(out)
733}
734
735pub fn store_chunked_bytes(data: &[u8], store: &SnapshotStore) -> Result<ChunkedBlob> {
737 if data.is_empty() {
738 return Ok(ChunkedBlob {
739 chunk_hashes: Vec::new(),
740 total_len: 0,
741 chunk_len: BYTE_CHUNK_LEN,
742 });
743 }
744 let mut hashes = Vec::new();
745 for chunk in data.chunks(BYTE_CHUNK_LEN) {
746 let hash = store.put_blob(chunk)?;
747 hashes.push(hash);
748 }
749 Ok(ChunkedBlob {
750 chunk_hashes: hashes,
751 total_len: data.len(),
752 chunk_len: BYTE_CHUNK_LEN,
753 })
754}
755
756pub fn load_chunked_bytes(chunked: &ChunkedBlob, store: &SnapshotStore) -> Result<Vec<u8>> {
758 if chunked.total_len == 0 {
759 return Ok(Vec::new());
760 }
761 let mut out = Vec::with_capacity(chunked.total_len);
762 for hash in &chunked.chunk_hashes {
763 let bytes = store.get_blob(hash)?;
764 out.extend_from_slice(&bytes);
765 }
766 out.truncate(chunked.total_len);
767 Ok(out)
768}
769
/// Reinterprets `bytes` as a slice of `T` without copying.
///
/// An empty input always yields an empty slice.
///
/// The caller must only use this with element types for which every bit
/// pattern is a valid value (plain integers and floats); `T: Copy` alone does
/// not guarantee that.
///
/// # Panics
/// Panics when
/// - `T` is zero-sized,
/// - `bytes.len()` is not a multiple of `size_of::<T>()`, or
/// - `bytes` does not start at an address aligned for `T`.
fn bytes_as_slice<T: Copy>(bytes: &[u8]) -> &[T] {
    let elem_size = std::mem::size_of::<T>();
    assert!(
        elem_size != 0,
        "bytes_as_slice does not support zero-sized element types"
    );
    assert!(
        bytes.len() % elem_size == 0,
        "byte slice length {} not a multiple of element size {}",
        bytes.len(),
        elem_size
    );
    // An empty byte slice may carry a dangling pointer that is only aligned
    // for `u8`; never reinterpret it.
    if bytes.is_empty() {
        return &[];
    }
    let align = std::mem::align_of::<T>();
    assert!(
        (bytes.as_ptr() as usize) % align == 0,
        "byte slice at {:p} is not aligned to {} bytes",
        bytes.as_ptr(),
        align
    );
    let len = bytes.len() / elem_size;
    // SAFETY: the pointer is non-null and, as asserted above, aligned for `T`;
    // `len * size_of::<T>()` equals `bytes.len()`, so the new slice covers
    // exactly the borrowed bytes and inherits their lifetime. Validity of the
    // bit patterns for `T` is the caller's obligation (see the doc comment).
    unsafe { std::slice::from_raw_parts(bytes.as_ptr().cast::<T>(), len) }
}
785
/// Views the memory of `data` as raw bytes without copying.
///
/// The returned slice covers `data.len() * size_of::<T>()` bytes and borrows
/// from `data`.
///
/// NOTE(review): `T` is unbounded; for element types with padding the padding
/// bytes would be exposed as `u8`. Callers presumably pass padding-free
/// primitives — confirm.
fn slice_as_bytes<T>(data: &[T]) -> &[u8] {
    let first_byte = data.as_ptr().cast::<u8>();
    let byte_len = std::mem::size_of_val(data);
    // SAFETY: `first_byte` points at the start of `data`, which is valid for
    // reads of `byte_len` bytes for as long as `data` is borrowed; `u8` has
    // no alignment requirement.
    unsafe { std::slice::from_raw_parts(first_byte, byte_len) }
}
791
792use shape_value::{HeapKind, KindedSlot, NativeKind, ValueSlot};
829use std::sync::Arc;
830
831pub fn slot_to_serializable(
844 bits: u64,
845 kind: NativeKind,
846 _store: &SnapshotStore,
847) -> std::result::Result<SerializableVMValue, String> {
848 use SerializableVMValue as SV;
849 match kind {
850 NativeKind::Int64 => Ok(SV::Int(bits as i64)),
851 NativeKind::Int32 => Ok(SV::Int(bits as i32 as i64)),
852 NativeKind::Int16 => Ok(SV::Int(bits as i16 as i64)),
853 NativeKind::Int8 => Ok(SV::Int(bits as i8 as i64)),
854 NativeKind::UInt64 => Ok(SV::Int(bits as i64)),
855 NativeKind::UInt32 => Ok(SV::Int((bits as u32) as i64)),
856 NativeKind::UInt16 => Ok(SV::Int((bits as u16) as i64)),
857 NativeKind::UInt8 => Ok(SV::Int((bits as u8) as i64)),
858 NativeKind::IntSize => Ok(SV::Int(bits as isize as i64)),
859 NativeKind::UIntSize => Ok(SV::Int((bits as usize) as i64)),
860 NativeKind::Float64 => Ok(SV::Number(f64::from_bits(bits))),
861 NativeKind::Float32 => Ok(SV::Number(f64::from(f32::from_bits(bits as u32)))),
867 NativeKind::Char => match char::from_u32(bits as u32) {
868 Some(c) => Ok(SV::Char(c)),
869 None => Err(format!(
870 "slot_to_serializable: NativeKind::Char slot has invalid \
871 codepoint bits 0x{:x} — construction-side contract violated",
872 bits,
873 )),
874 },
875 NativeKind::Bool => Ok(SV::Bool(bits != 0)),
876 NativeKind::NullableInt64
877 | NativeKind::NullableInt32
878 | NativeKind::NullableInt16
879 | NativeKind::NullableInt8
880 | NativeKind::NullableUInt64
881 | NativeKind::NullableUInt32
882 | NativeKind::NullableUInt16
883 | NativeKind::NullableUInt8
884 | NativeKind::NullableIntSize
885 | NativeKind::NullableUIntSize
886 | NativeKind::NullableFloat64 => {
887 Err(format!(
894 "slot_to_serializable: W17-snapshot-roundtrip surface — \
895 nullable-scalar kind {kind:?} has no SerializableVMValue \
896 arm at landing. The post-proof sentinel-rule amendment \
897 is the W17-snapshot-nullable follow-up. \
898 ADR-006 §2.7.5.1.",
899 ))
900 }
901 NativeKind::String => {
902 if bits == 0 {
907 return Err("slot_to_serializable: String slot with null bits".into());
908 }
909 unsafe {
910 let arc = Arc::<String>::from_raw(bits as *const String);
911 let cloned = (*arc).clone();
912 let _ = Arc::into_raw(arc); Ok(SV::String(cloned))
914 }
915 }
916 NativeKind::StringV2 => {
927 if bits == 0 {
928 return Err("slot_to_serializable: StringV2 slot with null bits".into());
929 }
930 let ptr = bits as *const shape_value::v2::string_obj::StringObj;
936 let s: &str = unsafe { shape_value::v2::string_obj::StringObj::as_str(ptr) };
937 Ok(SV::String(s.to_string()))
938 }
939 NativeKind::DecimalV2 => {
945 if bits == 0 {
946 return Err("slot_to_serializable: DecimalV2 slot with null bits".into());
947 }
948 let ptr = bits as *const shape_value::v2::decimal_obj::DecimalObj;
952 let value = unsafe { shape_value::v2::decimal_obj::DecimalObj::value(ptr) };
953 Ok(SV::Decimal(value))
954 }
955 NativeKind::Ptr(heap_kind) => slot_heap_to_serializable(bits, heap_kind),
956 }
957}
958
959fn slot_heap_to_serializable(
969 bits: u64,
970 expected_kind: HeapKind,
971) -> std::result::Result<SerializableVMValue, String> {
972 use SerializableVMValue as SV;
973 use shape_value::heap_value::{
974 AtomicData, ChannelData, DequeData, HashSetData, LazyData, MutexData,
975 OptionData, PriorityQueueData, ResultData,
976 };
977 if bits == 0 {
978 return Err(format!(
979 "slot_to_serializable: Ptr({expected_kind:?}) slot with null bits",
980 ));
981 }
982 match expected_kind {
983 HeapKind::String => {
984 unsafe {
987 let arc = Arc::<String>::from_raw(bits as *const String);
988 let cloned = (*arc).clone();
989 let _ = Arc::into_raw(arc);
990 Ok(SV::String(cloned))
991 }
992 }
993 HeapKind::Decimal => unsafe {
994 let arc = Arc::<rust_decimal::Decimal>::from_raw(bits as *const rust_decimal::Decimal);
995 let v = *arc;
996 let _ = Arc::into_raw(arc);
997 Ok(SV::Decimal(v))
998 },
999 HeapKind::BigInt => unsafe {
1000 let arc = Arc::<i64>::from_raw(bits as *const i64);
1001 let v = *arc;
1002 let _ = Arc::into_raw(arc);
1003 Ok(SV::BigInt(v))
1004 },
1005 HeapKind::Char => {
1006 let cp = bits as u32;
1009 match char::from_u32(cp) {
1010 Some(c) => Ok(SV::Char(c)),
1011 None => Err(format!(
1012 "slot_to_serializable: Char arm: invalid codepoint {cp:#x}"
1013 )),
1014 }
1015 }
1016 HeapKind::HashSet => unsafe {
1017 let arc = Arc::<HashSetData>::from_raw(bits as *const HashSetData);
1018 let keys: Vec<String> = arc.keys.iter().map(|k| (**k).clone()).collect();
1019 let _ = Arc::into_raw(arc);
1020 Ok(SV::HashSet { keys })
1021 },
1022 HeapKind::PriorityQueue => unsafe {
1023 let arc = Arc::<PriorityQueueData>::from_raw(bits as *const PriorityQueueData);
1024 let heap: Vec<i64> = (*arc.heap).clone();
1025 let _ = Arc::into_raw(arc);
1026 Ok(SV::PriorityQueueHeap { heap })
1027 },
1028 HeapKind::Atomic => unsafe {
1029 let arc = Arc::<AtomicData>::from_raw(bits as *const AtomicData);
1030 let v = arc.load();
1031 let _ = Arc::into_raw(arc);
1032 Ok(SV::AtomicI64 { value: v })
1033 },
1034 HeapKind::Lazy => unsafe {
1035 let arc = Arc::<LazyData>::from_raw(bits as *const LazyData);
1036 let is_init = arc.is_initialized();
1037 let _ = Arc::into_raw(arc);
1038 Ok(SV::LazyOpaque {
1039 is_initialized: is_init,
1040 })
1041 },
1042 HeapKind::Mutex => unsafe {
1043 let arc = Arc::<MutexData>::from_raw(bits as *const MutexData);
1044 let inner = arc.get();
1049 let has_value =
1050 !(matches!(inner.kind(), NativeKind::Bool) && inner.slot().raw() == 0);
1051 drop(inner);
1052 let _ = Arc::into_raw(arc);
1053 Ok(SV::MutexOpaque { has_value })
1054 },
1055 HeapKind::Channel => unsafe {
1056 let arc = Arc::<ChannelData>::from_raw(bits as *const ChannelData);
1057 let closed = arc.is_closed();
1058 let len = arc.len();
1059 let _ = Arc::into_raw(arc);
1060 Ok(SV::ChannelOpaque { closed, len })
1061 },
1062 HeapKind::Deque => unsafe {
1063 let arc = Arc::<DequeData>::from_raw(bits as *const DequeData);
1064 let len = arc.items.len();
1065 let _ = Arc::into_raw(arc);
1066 Ok(SV::DequeOpaque { len })
1067 },
1068 HeapKind::Result => unsafe {
1069 let arc = Arc::<ResultData>::from_raw(bits as *const ResultData);
1070 let is_ok = arc.is_ok;
1071 let payload_kind = arc.payload.kind();
1072 let payload_bits = arc.payload.slot().raw();
1073 let inner = serializable_inner_kinded(payload_bits, payload_kind)?;
1074 let _ = Arc::into_raw(arc);
1075 Ok(SV::ResultData {
1076 is_ok,
1077 payload: Box::new(inner),
1078 })
1079 },
1080 HeapKind::Option => unsafe {
1081 let arc = Arc::<OptionData>::from_raw(bits as *const OptionData);
1082 let is_some = arc.is_some;
1083 let payload = if is_some {
1084 let payload_kind = arc.payload.kind();
1085 let payload_bits = arc.payload.slot().raw();
1086 Some(Box::new(serializable_inner_kinded(payload_bits, payload_kind)?))
1087 } else {
1088 None
1089 };
1090 let _ = Arc::into_raw(arc);
1091 Ok(SV::OptionData { is_some, payload })
1092 },
1093 HeapKind::Reference => Ok(SV::ReferenceOpaque),
1098 HeapKind::FilterExpr => Ok(SV::FilterExprOpaque),
1099 HeapKind::SharedCell => Ok(SV::SharedCellOpaque),
1100 HeapKind::Iterator => Ok(SV::IteratorOpaque),
1101 HeapKind::Future => Ok(SV::Future(bits)),
1103
1104 other => Err(format!(
1114 "slot_to_serializable: W17-snapshot-roundtrip surface — \
1115 HeapKind::{other:?} arm has no in-session SerializableVMValue \
1116 projection. Tracked as W17-snapshot-{other:?} follow-up per \
1117 docs/cluster-audits/phase-2d-playbook.md §3. \
1118 ADR-006 §2.7.5.1.",
1119 )),
1120 }
1121}
1122
1123fn serializable_inner_kinded(
1128 bits: u64,
1129 kind: NativeKind,
1130) -> std::result::Result<SerializableVMValue, String> {
1131 if matches!(kind, NativeKind::Bool) && bits == 0 {
1132 return Ok(SerializableVMValue::Unit);
1133 }
1134 match kind {
1135 NativeKind::Int64 => Ok(SerializableVMValue::Int(bits as i64)),
1136 NativeKind::Float64 => Ok(SerializableVMValue::Number(f64::from_bits(bits))),
1137 NativeKind::Bool => Ok(SerializableVMValue::Bool(bits != 0)),
1138 NativeKind::String => {
1139 if bits == 0 {
1140 return Ok(SerializableVMValue::None);
1141 }
1142 unsafe {
1143 let arc = Arc::<String>::from_raw(bits as *const String);
1144 let cloned = (*arc).clone();
1145 let _ = Arc::into_raw(arc);
1146 Ok(SerializableVMValue::String(cloned))
1147 }
1148 }
1149 _ => Err(format!(
1150 "serializable_inner_kinded: W17-snapshot-roundtrip surface — \
1151 inner Result/Option payload kind {kind:?} is not in the \
1152 initial scalar set; deep payload arms land in follow-up. \
1153 ADR-006 §2.7.5.1.",
1154 )),
1155 }
1156}
1157
1158pub fn serializable_to_slot(
1168 sv: &SerializableVMValue,
1169 expected_kind: NativeKind,
1170 _store: &SnapshotStore,
1171) -> std::result::Result<(u64, NativeKind), String> {
1172 use SerializableVMValue as SV;
1173 match (sv, expected_kind) {
1176 (SV::Int(i), NativeKind::Int64) => Ok((*i as u64, NativeKind::Int64)),
1177 (SV::Int(i), NativeKind::Int32) => Ok(((*i as i32) as u64, NativeKind::Int32)),
1178 (SV::Int(i), NativeKind::Int16) => Ok(((*i as i16 as i32) as u64, NativeKind::Int16)),
1179 (SV::Int(i), NativeKind::Int8) => Ok(((*i as i8 as i32) as u64, NativeKind::Int8)),
1180 (SV::Int(i), NativeKind::UInt64) => Ok((*i as u64, NativeKind::UInt64)),
1181 (SV::Int(i), NativeKind::UInt32) => Ok(((*i as u32) as u64, NativeKind::UInt32)),
1182 (SV::Int(i), NativeKind::UInt16) => Ok(((*i as u16) as u64, NativeKind::UInt16)),
1183 (SV::Int(i), NativeKind::UInt8) => Ok(((*i as u8) as u64, NativeKind::UInt8)),
1184 (SV::Int(i), NativeKind::IntSize) => Ok((*i as isize as u64, NativeKind::IntSize)),
1185 (SV::Int(i), NativeKind::UIntSize) => Ok((*i as u64, NativeKind::UIntSize)),
1186 (SV::Number(f), NativeKind::Float64) => Ok((f.to_bits(), NativeKind::Float64)),
1187 (SV::Bool(b), NativeKind::Bool) => Ok((if *b { 1 } else { 0 }, NativeKind::Bool)),
1188 (SV::String(s), NativeKind::String) => {
1189 let arc = Arc::new(s.clone());
1190 let raw = Arc::into_raw(arc) as u64;
1191 Ok((raw, NativeKind::String))
1192 }
1193 (SV::None | SV::Unit, NativeKind::Bool) => Ok((0, NativeKind::Bool)),
1194
1195 (sv, NativeKind::Ptr(hk)) => serializable_to_heap_slot(sv, hk),
1199
1200 (other_sv, other_kind) => Err(format!(
1202 "serializable_to_slot: W17-snapshot-roundtrip surface — \
1203 SerializableVMValue arm {} cannot satisfy expected kind \
1204 {other_kind:?}. Discriminator-vs-kind mismatch is a structured \
1205 error, not a Bool-default fallback (§2.7.5.1 forbidden). \
1206 ADR-006 §2.7.5.1.",
1207 serializable_arm_name(other_sv),
1208 )),
1209 }
1210}
1211
1212fn serializable_to_heap_slot(
1217 sv: &SerializableVMValue,
1218 heap_kind: HeapKind,
1219) -> std::result::Result<(u64, NativeKind), String> {
1220 use SerializableVMValue as SV;
1221 use shape_value::heap_value::{
1222 AtomicData, HashSetData, OptionData, PriorityQueueData, ResultData,
1223 };
1224 match (sv, heap_kind) {
1225 (SV::String(s), HeapKind::String) => {
1226 let arc = Arc::new(s.clone());
1229 let raw = Arc::into_raw(arc) as u64;
1230 Ok((raw, NativeKind::Ptr(HeapKind::String)))
1231 }
1232 (SV::Char(c), HeapKind::Char) => {
1233 let bits = (*c as u32) as u64;
1237 Ok((bits, NativeKind::Ptr(HeapKind::Char)))
1238 }
1239 (SV::BigInt(n), HeapKind::BigInt) => {
1240 let arc = Arc::new(*n);
1241 let raw = Arc::into_raw(arc) as u64;
1242 Ok((raw, NativeKind::Ptr(HeapKind::BigInt)))
1243 }
1244 (SV::Decimal(d), HeapKind::Decimal) => {
1245 let arc = Arc::new(*d);
1246 let raw = Arc::into_raw(arc) as u64;
1247 Ok((raw, NativeKind::Ptr(HeapKind::Decimal)))
1248 }
1249 (SV::HashSet { keys }, HeapKind::HashSet) => {
1250 let arcs: Vec<Arc<String>> = keys.iter().map(|k| Arc::new(k.clone())).collect();
1251 let data = HashSetData::from_keys(arcs);
1252 let arc = Arc::new(data);
1253 let raw = Arc::into_raw(arc) as u64;
1254 Ok((raw, NativeKind::Ptr(HeapKind::HashSet)))
1255 }
1256 (SV::PriorityQueueHeap { heap }, HeapKind::PriorityQueue) => {
1257 let mut pq = PriorityQueueData::new();
1258 for &v in heap {
1263 pq.push(v);
1264 }
1265 let arc = Arc::new(pq);
1266 let raw = Arc::into_raw(arc) as u64;
1267 Ok((raw, NativeKind::Ptr(HeapKind::PriorityQueue)))
1268 }
1269 (SV::AtomicI64 { value }, HeapKind::Atomic) => {
1270 let arc = Arc::new(AtomicData::new(*value));
1271 let raw = Arc::into_raw(arc) as u64;
1272 Ok((raw, NativeKind::Ptr(HeapKind::Atomic)))
1273 }
1274 (SV::ResultData { is_ok, payload }, HeapKind::Result) => {
1275 let inner_slot = inner_kinded_from_serializable(payload)?;
1282 let data = if *is_ok {
1283 ResultData::ok(inner_slot)
1284 } else {
1285 ResultData::err(inner_slot)
1286 };
1287 let arc = Arc::new(data);
1288 let raw = Arc::into_raw(arc) as u64;
1289 Ok((raw, NativeKind::Ptr(HeapKind::Result)))
1290 }
1291 (SV::OptionData { is_some, payload }, HeapKind::Option) => {
1292 let data = if *is_some {
1293 match payload {
1294 Some(p) => OptionData::some(inner_kinded_from_serializable(p)?),
1295 None => return Err(
1296 "serializable_to_slot: OptionData is_some=true but payload=None — \
1297 malformed wire shape; expected Some(SerializableVMValue) for \
1298 is_some=true. ADR-006 §2.7.5.1."
1299 .to_string(),
1300 ),
1301 }
1302 } else {
1303 OptionData::none()
1304 };
1305 let arc = Arc::new(data);
1306 let raw = Arc::into_raw(arc) as u64;
1307 Ok((raw, NativeKind::Ptr(HeapKind::Option)))
1308 }
1309
1310 (SV::IteratorOpaque, HeapKind::Iterator)
1316 | (SV::DequeOpaque { .. }, HeapKind::Deque)
1317 | (SV::ChannelOpaque { .. }, HeapKind::Channel)
1318 | (SV::ReferenceOpaque, HeapKind::Reference)
1319 | (SV::FilterExprOpaque, HeapKind::FilterExpr)
1320 | (SV::SharedCellOpaque, HeapKind::SharedCell)
1321 | (SV::MutexOpaque { .. }, HeapKind::Mutex)
1322 | (SV::LazyOpaque { .. }, HeapKind::Lazy) => Err(format!(
1323 "serializable_to_slot: W17-snapshot-roundtrip surface — \
1324 {heap_kind:?} arm restored from opaque wire shape; \
1325 deep payload reconstruction is the W17-snapshot-{:?} \
1326 follow-up. ADR-006 §2.7.5.1.",
1327 heap_kind,
1328 )),
1329
1330 (other_sv, hk) => Err(format!(
1333 "serializable_to_slot: W17-snapshot-roundtrip surface — \
1334 SerializableVMValue arm {} cannot satisfy expected heap kind \
1335 Ptr({hk:?}). Either the wire-format arm has no inverse \
1336 projection (deep follow-up) or the discriminator is \
1337 mismatched. ADR-006 §2.7.5.1.",
1338 serializable_arm_name(other_sv),
1339 )),
1340 }
1341}
1342
1343fn inner_kinded_from_serializable(
1347 sv: &SerializableVMValue,
1348) -> std::result::Result<KindedSlot, String> {
1349 use SerializableVMValue as SV;
1350 match sv {
1351 SV::Int(i) => Ok(KindedSlot::new(
1352 ValueSlot::from_raw(*i as u64),
1353 NativeKind::Int64,
1354 )),
1355 SV::Number(f) => Ok(KindedSlot::new(
1356 ValueSlot::from_raw(f.to_bits()),
1357 NativeKind::Float64,
1358 )),
1359 SV::Bool(b) => Ok(KindedSlot::new(
1360 ValueSlot::from_raw(if *b { 1 } else { 0 }),
1361 NativeKind::Bool,
1362 )),
1363 SV::String(s) => Ok(KindedSlot::from_string_arc(Arc::new(s.clone()))),
1364 SV::Unit | SV::None => Ok(KindedSlot::new(
1365 ValueSlot::from_raw(0),
1366 NativeKind::Bool,
1367 )),
1368 other => Err(format!(
1369 "inner_kinded_from_serializable: W17-snapshot-roundtrip surface — \
1370 SerializableVMValue arm {} has no in-session inner-payload \
1371 projection. Tracked as follow-up. ADR-006 §2.7.5.1.",
1372 serializable_arm_name(other),
1373 )),
1374 }
1375}
1376
1377fn serializable_arm_name(sv: &SerializableVMValue) -> &'static str {
1379 use SerializableVMValue as SV;
1380 match sv {
1381 SV::Int(_) => "Int",
1382 SV::Number(_) => "Number",
1383 SV::Decimal(_) => "Decimal",
1384 SV::String(_) => "String",
1385 SV::Bool(_) => "Bool",
1386 SV::None => "None",
1387 SV::Some(_) => "Some",
1388 SV::Unit => "Unit",
1389 SV::Timeframe(_) => "Timeframe",
1390 SV::Duration(_) => "Duration",
1391 SV::Time(_) => "Time",
1392 SV::TimeSpan(_) => "TimeSpan",
1393 SV::TimeReference(_) => "TimeReference",
1394 SV::DateTimeExpr(_) => "DateTimeExpr",
1395 SV::DataDateTimeRef(_) => "DataDateTimeRef",
1396 SV::Array(_) => "Array",
1397 SV::Function(_) => "Function",
1398 SV::TypeAnnotation(_) => "TypeAnnotation",
1399 SV::TypeAnnotatedValue { .. } => "TypeAnnotatedValue",
1400 SV::Enum(_) => "Enum",
1401 SV::Closure { .. } => "Closure",
1402 SV::ModuleFunction(_) => "ModuleFunction",
1403 SV::TypedObject { .. } => "TypedObject",
1404 SV::Range { .. } => "Range",
1405 SV::Ok(_) => "Ok",
1406 SV::Err(_) => "Err",
1407 SV::PrintResult(_) => "PrintResult",
1408 SV::SimulationCall { .. } => "SimulationCall",
1409 SV::FunctionRef { .. } => "FunctionRef",
1410 SV::DataReference { .. } => "DataReference",
1411 SV::Future(_) => "Future",
1412 SV::DataTable(_) => "DataTable",
1413 SV::TypedTable { .. } => "TypedTable",
1414 SV::RowView { .. } => "RowView",
1415 SV::ColumnRef { .. } => "ColumnRef",
1416 SV::IndexedTable { .. } => "IndexedTable",
1417 SV::TypedArray { .. } => "TypedArray",
1418 SV::Matrix { .. } => "Matrix",
1419 SV::HashMap { .. } => "HashMap",
1420 SV::SidecarRef { .. } => "SidecarRef",
1421 SV::HashSet { .. } => "HashSet",
1422 SV::IteratorOpaque => "IteratorOpaque",
1423 SV::ResultData { .. } => "ResultData",
1424 SV::OptionData { .. } => "OptionData",
1425 SV::DequeOpaque { .. } => "DequeOpaque",
1426 SV::ChannelOpaque { .. } => "ChannelOpaque",
1427 SV::PriorityQueueHeap { .. } => "PriorityQueueHeap",
1428 SV::ReferenceOpaque => "ReferenceOpaque",
1429 SV::FilterExprOpaque => "FilterExprOpaque",
1430 SV::SharedCellOpaque => "SharedCellOpaque",
1431 SV::MutexOpaque { .. } => "MutexOpaque",
1432 SV::AtomicI64 { .. } => "AtomicI64",
1433 SV::LazyOpaque { .. } => "LazyOpaque",
1434 SV::Char(_) => "Char",
1435 SV::BigInt(_) => "BigInt",
1436 }
1437}
1438
1439fn serialize_datatable(dt: &DataTable, store: &SnapshotStore) -> Result<SerializableDataTable> {
1440 let mut buf = Vec::new();
1441 let schema = dt.inner().schema();
1442 let mut writer = arrow_ipc::writer::FileWriter::try_new(&mut buf, schema.as_ref())?;
1443 writer.write(dt.inner())?;
1444 writer.finish()?;
1445 let ipc_chunks = store_chunked_vec(&buf, BYTE_CHUNK_LEN, store)?;
1446 Ok(SerializableDataTable {
1447 ipc_chunks,
1448 type_name: dt.type_name().map(|s| s.to_string()),
1449 schema_id: dt.schema_id(),
1450 })
1451}
1452
1453fn deserialize_datatable(
1454 serialized: SerializableDataTable,
1455 store: &SnapshotStore,
1456) -> Result<DataTable> {
1457 let bytes = load_chunked_vec(&serialized.ipc_chunks, store)?;
1458 let cursor = std::io::Cursor::new(bytes);
1459 let mut reader = arrow_ipc::reader::FileReader::try_new(cursor, None)?;
1460 let batch = reader
1461 .next()
1462 .transpose()?
1463 .context("no RecordBatch in DataTable snapshot")?;
1464 let mut dt = DataTable::new(batch);
1465 if let Some(name) = serialized.type_name {
1466 dt = DataTable::with_type_name(dt.into_inner(), name);
1467 }
1468 if let Some(schema_id) = serialized.schema_id {
1469 dt = dt.with_schema_id(schema_id);
1470 }
1471 Ok(dt)
1472}
1473