1use std::collections::{HashMap, HashSet};
6use std::fs;
7use std::io::{Read, Write};
8use std::path::PathBuf;
9use anyhow::{Context, Result};
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12use crate::event_queue::WaitCondition;
19use crate::hashing::{HashDigest, hash_bytes};
20use shape_ast::ast::{DataDateTimeRef, DateTimeExpr, EnumDef, TimeReference, TypeAnnotation};
21use shape_ast::data::Timeframe;
22
23use shape_value::datatable::DataTable;
24
/// Current snapshot format version.
/// NOTE(review): presumably written into `ExecutionSnapshot::version` and bumped
/// whenever the serialized layout changes — confirm against the writer.
pub const SNAPSHOT_VERSION: u32 = 5;

/// Default number of elements per chunk for chunked element storage
/// (presumably what callers elsewhere pass to `store_chunked_vec`).
pub(crate) const DEFAULT_CHUNK_LEN: usize = 4 * 1024;
/// Chunk size for byte buffers: 256 KiB per blob in `store_chunked_bytes`, and
/// the per-chunk element count used for Arrow IPC bytes by `serialize_datatable`.
pub(crate) const BYTE_CHUNK_LEN: usize = 256 * 1024;
41
42#[derive(Clone)]
44pub struct SnapshotStore {
45 root: PathBuf,
46}
47
48impl SnapshotStore {
49 pub fn new(root: impl Into<PathBuf>) -> Result<Self> {
50 let root = root.into();
51 fs::create_dir_all(root.join("blobs"))
52 .with_context(|| format!("failed to create snapshot blob dir at {}", root.display()))?;
53 fs::create_dir_all(root.join("snapshots"))
54 .with_context(|| format!("failed to create snapshot dir at {}", root.display()))?;
55 Ok(Self { root })
56 }
57
58 fn blob_path(&self, hash: &HashDigest) -> PathBuf {
59 self.root
60 .join("blobs")
61 .join(format!("{}.bin.zst", hash.hex()))
62 }
63
64 fn snapshot_path(&self, hash: &HashDigest) -> PathBuf {
65 self.root
66 .join("snapshots")
67 .join(format!("{}.bin.zst", hash.hex()))
68 }
69
70 pub fn put_blob(&self, data: &[u8]) -> Result<HashDigest> {
71 let hash = hash_bytes(data);
72 let path = self.blob_path(&hash);
73 if path.exists() {
74 return Ok(hash);
75 }
76 let compressed = zstd::stream::encode_all(data, 0)?;
77 let mut file = fs::File::create(&path)?;
78 file.write_all(&compressed)?;
79 Ok(hash)
80 }
81
82 pub fn get_blob(&self, hash: &HashDigest) -> Result<Vec<u8>> {
83 let path = self.blob_path(hash);
84 let mut file = fs::File::open(&path)
85 .with_context(|| format!("snapshot blob not found: {}", path.display()))?;
86 let mut buf = Vec::new();
87 file.read_to_end(&mut buf)?;
88 let decompressed = zstd::stream::decode_all(&buf[..])?;
89 Ok(decompressed)
90 }
91
92 pub fn put_struct<T: Serialize>(&self, value: &T) -> Result<HashDigest> {
93 let bytes = bincode::serialize(value)?;
94 self.put_blob(&bytes)
95 }
96
97 pub fn get_struct<T: for<'de> Deserialize<'de>>(&self, hash: &HashDigest) -> Result<T> {
98 let bytes = self.get_blob(hash)?;
99 Ok(bincode::deserialize(&bytes)?)
100 }
101
102 pub fn put_snapshot(&self, snapshot: &ExecutionSnapshot) -> Result<HashDigest> {
103 let bytes = bincode::serialize(snapshot)?;
104 let hash = hash_bytes(&bytes);
105 let path = self.snapshot_path(&hash);
106 if !path.exists() {
107 let compressed = zstd::stream::encode_all(&bytes[..], 0)?;
108 let mut file = fs::File::create(&path)?;
109 file.write_all(&compressed)?;
110 }
111 Ok(hash)
112 }
113
114 pub fn get_snapshot(&self, hash: &HashDigest) -> Result<ExecutionSnapshot> {
115 let path = self.snapshot_path(hash);
116 let mut file = fs::File::open(&path)
117 .with_context(|| format!("snapshot not found: {}", path.display()))?;
118 let mut buf = Vec::new();
119 file.read_to_end(&mut buf)?;
120 let decompressed = zstd::stream::decode_all(&buf[..])?;
121 Ok(bincode::deserialize(&decompressed)?)
122 }
123
124 pub fn list_snapshots(&self) -> Result<Vec<(HashDigest, ExecutionSnapshot)>> {
134 let snapshots_dir = self.root.join("snapshots");
135 if !snapshots_dir.exists() {
136 return Ok(Vec::new());
137 }
138 let mut results = Vec::new();
139 for entry in fs::read_dir(&snapshots_dir)? {
140 let entry = entry?;
141 let path = entry.path();
142 if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
143 if let Some(hex) = name.strip_suffix(".bin.zst") {
145 let hash = HashDigest::from_hex(hex);
146 match self.get_snapshot(&hash) {
147 Ok(snap) => results.push((hash, snap)),
148 Err(_) => continue, }
150 }
151 }
152 }
153 results.sort_by(|a, b| b.1.created_at_ms.cmp(&a.1.created_at_ms));
155 Ok(results)
156 }
157
158 pub fn delete_snapshot(&self, hash: &HashDigest) -> Result<()> {
160 let path = self.snapshot_path(hash);
161 fs::remove_file(&path)
162 .with_context(|| format!("failed to delete snapshot: {}", path.display()))?;
163 Ok(())
164 }
165}
166
/// Top-level manifest for one saved execution snapshot.
///
/// Persisted by `SnapshotStore::put_snapshot` (bincode + zstd) and keyed by the
/// hash of its bincode bytes. The component hashes point at separately stored
/// state. NOTE(review): presumably blobs written via `SnapshotStore::put_struct`;
/// confirm against the writer.
///
/// Field order is part of the bincode wire format; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionSnapshot {
    /// Snapshot format version (presumably checked against `SNAPSHOT_VERSION` on load).
    pub version: u32,
    /// Creation timestamp; `list_snapshots` sorts newest-first by this value.
    /// Presumably Unix epoch milliseconds, per the name; confirm.
    pub created_at_ms: i64,
    /// Hash of the stored semantic state (presumably a `SemanticSnapshot`).
    pub semantic_hash: HashDigest,
    /// Hash of the stored execution context (presumably a `ContextSnapshot`).
    pub context_hash: HashDigest,
    /// Hash of the stored VM state (presumably a `VmSnapshot`), if captured.
    pub vm_hash: Option<HashDigest>,
    /// Hash of the stored bytecode, if captured.
    pub bytecode_hash: Option<HashDigest>,
    /// Script path associated with the snapshot (per the field name), if any.
    /// NOTE(review): bincode is not self-describing, so `#[serde(default)]` may
    /// not let older snapshots that lack this field deserialize; confirm.
    #[serde(default)]
    pub script_path: Option<String>,
}
187
/// Semantic state captured in a snapshot (presumably the value referenced by
/// `ExecutionSnapshot::semantic_hash`; confirm).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SemanticSnapshot {
    /// Names of exported symbols, per the field name.
    pub exported_symbols: HashSet<String>,
}
192
/// Serializable copy of the runtime execution context.
///
/// NOTE(review): presumably the value referenced by
/// `ExecutionSnapshot::context_hash`; confirm. Field order is part of the
/// bincode wire format; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextSnapshot {
    /// Data loading mode of the context.
    pub data_load_mode: crate::context::DataLoadMode,
    /// Cached historical and live data, if any was captured.
    pub data_cache: Option<DataCacheSnapshot>,
    /// Currently selected data id, if any.
    pub current_id: Option<String>,
    /// Current row position within the active data.
    pub current_row_index: usize,
    /// Variable scopes, one map per scope (presumably ordered outermost first; confirm).
    pub variable_scopes: Vec<HashMap<String, VariableSnapshot>>,
    /// Reference point in time, if set.
    pub reference_datetime: Option<chrono::DateTime<chrono::Utc>>,
    /// Active timeframe, if set.
    pub current_timeframe: Option<Timeframe>,
    /// Base timeframe, if set.
    pub base_timeframe: Option<Timeframe>,
    /// Date range as a `(start, end)` pair, if set.
    pub date_range: Option<(chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)>,
    /// Start of the active row range (meaningful when `range_active`; confirm).
    pub range_start: usize,
    /// End of the active row range (inclusive/exclusive not visible here).
    pub range_end: usize,
    /// Whether the row range restriction is active.
    pub range_active: bool,
    /// Registered type aliases by name.
    pub type_alias_registry: HashMap<String, TypeAliasRuntimeEntrySnapshot>,
    /// Registered enum definitions by name.
    pub enum_registry: HashMap<String, EnumDef>,
    /// Registered struct type definitions by name.
    /// NOTE(review): `#[serde(default)]` may not help bincode-encoded data
    /// that lacks this field; confirm compatibility intent.
    #[serde(default)]
    pub struct_type_registry: HashMap<String, shape_ast::ast::StructTypeDef>,
    /// Suspension state when execution is paused waiting on a condition.
    pub suspension_state: Option<SuspensionStateSnapshot>,
}
213
/// Serializable state of a single variable binding.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VariableSnapshot {
    /// Current value of the variable.
    pub value: SerializableVMValue,
    /// Declaration kind (`VarKind`) of the binding.
    pub kind: shape_ast::ast::VarKind,
    /// Whether the variable has been assigned a value.
    pub is_initialized: bool,
    /// Whether the binding is function-scoped rather than block-scoped (per the name).
    pub is_function_scoped: bool,
    /// Optional display format name.
    pub format_hint: Option<String>,
    /// Optional per-parameter overrides for the display format.
    pub format_overrides: Option<HashMap<String, SerializableVMValue>>,
}
223
/// Serializable entry of the runtime type-alias registry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeAliasRuntimeEntrySnapshot {
    /// Name of the aliased base type.
    pub base_type: String,
    /// Optional parameter overrides attached to the alias.
    pub overrides: Option<HashMap<String, SerializableVMValue>>,
}
229
/// State needed to resume a suspended execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SuspensionStateSnapshot {
    /// Condition the execution is waiting on.
    pub waiting_for: WaitCondition,
    /// Program counter to resume at.
    pub resume_pc: usize,
    /// Local slots saved at suspension time.
    pub saved_locals: Vec<SerializableVMValue>,
    /// Operand stack saved at suspension time.
    pub saved_stack: Vec<SerializableVMValue>,
}
237
/// Serializable copy of the virtual machine's execution state.
///
/// Field order is part of the bincode wire format; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmSnapshot {
    /// Instruction pointer.
    pub ip: usize,
    /// Operand stack contents.
    pub stack: Vec<SerializableVMValue>,
    /// Local slot contents.
    pub locals: Vec<SerializableVMValue>,
    /// Module-level binding values.
    pub module_bindings: Vec<SerializableVMValue>,
    /// Active call frames.
    pub call_stack: Vec<SerializableCallFrame>,
    /// Active loop contexts.
    pub loop_stack: Vec<SerializableLoopContext>,
    /// Stack of pushed timeframe scopes.
    pub timeframe_stack: Vec<Option<Timeframe>>,
    /// Installed exception handlers.
    pub exception_handlers: Vec<SerializableExceptionHandler>,
    /// 32-byte hash identifying the code blob that contains `ip`
    /// (presumably a function-blob content hash; confirm).
    #[serde(default)]
    pub ip_blob_hash: Option<[u8; 32]>,
    /// Offset of `ip` within that blob, if recorded.
    #[serde(default)]
    pub ip_local_offset: Option<usize>,
    /// Id of the function executing at `ip`, if recorded.
    #[serde(default)]
    pub ip_function_id: Option<u16>,
}
262
/// Serializable VM call frame.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableCallFrame {
    /// Instruction pointer to return to.
    pub return_ip: usize,
    /// Index of this frame's first local slot.
    pub locals_base: usize,
    /// Number of local slots owned by this frame.
    pub locals_count: usize,
    /// Id of the function executing in this frame, if known.
    pub function_id: Option<u16>,
    /// Captured upvalues, for closure frames.
    pub upvalues: Option<Vec<SerializableVMValue>>,
    /// 32-byte hash identifying the frame's code blob (presumably a
    /// content hash, as with `VmSnapshot::ip_blob_hash`; confirm).
    #[serde(default)]
    pub blob_hash: Option<[u8; 32]>,
    /// Instruction offset local to that blob, if recorded.
    #[serde(default)]
    pub local_ip: Option<usize>,
}
281
/// Serializable loop context: instruction bounds of an active loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableLoopContext {
    /// Loop start instruction.
    pub start: usize,
    /// Loop end instruction.
    pub end: usize,
}
287
/// Serializable exception handler registration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableExceptionHandler {
    /// Instruction to jump to when an exception is caught.
    pub catch_ip: usize,
    /// Operand stack depth to restore (presumably; confirm).
    pub stack_size: usize,
    /// Call-stack depth at registration (presumably; confirm).
    pub call_depth: usize,
}
294
/// Portable, serializable representation of a VM value.
///
/// Produced from raw VM slots by `slot_to_serializable` and restored by
/// `serializable_to_slot`. Variant order is part of the bincode wire format;
/// only append new variants at the end.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SerializableVMValue {
    /// Integer of any native width. `slot_to_serializable` writes unsigned
    /// 64-bit values as their bits reinterpreted as `i64`.
    Int(i64),
    /// Floating-point value (`Float32` slots are widened to `f64`).
    Number(f64),
    Decimal(rust_decimal::Decimal),
    String(String),
    Bool(bool),
    /// Absent value (e.g. from a `NativeKind::Null` slot).
    None,
    Some(Box<SerializableVMValue>),
    /// Unit value; also used for a zero Bool payload inside Result/Option.
    Unit,
    Timeframe(Timeframe),
    Duration(shape_ast::ast::Duration),
    Time(chrono::DateTime<chrono::FixedOffset>),
    TimeSpan(i64),
    TimeReference(TimeReference),
    DateTimeExpr(DateTimeExpr),
    DataDateTimeRef(DataDateTimeRef),
    Array(Vec<SerializableVMValue>),
    /// Function referenced by id.
    Function(u16),
    TypeAnnotation(TypeAnnotation),
    TypeAnnotatedValue {
        type_name: String,
        value: Box<SerializableVMValue>,
    },
    Enum(EnumValueSnapshot),
    /// Closure: function id, type id, and captured upvalues.
    Closure {
        function_id: u32,
        type_id: u32,
        upvalues: Vec<SerializableVMValue>,
    },
    ModuleFunction(String),
    /// Typed object slots.
    /// NOTE(review): `heap_mask` presumably marks which slots hold heap
    /// values; confirm.
    TypedObject {
        schema_id: u64,
        slot_data: Vec<SerializableVMValue>,
        heap_mask: u64,
    },
    Range {
        start: Option<Box<SerializableVMValue>>,
        end: Option<Box<SerializableVMValue>>,
        inclusive: bool,
    },
    Ok(Box<SerializableVMValue>),
    Err(Box<SerializableVMValue>),
    PrintResult(PrintableSnapshot),
    SimulationCall {
        name: String,
        params: HashMap<String, SerializableVMValue>,
    },
    FunctionRef {
        name: String,
        closure: Option<Box<SerializableVMValue>>,
    },
    DataReference {
        datetime: chrono::DateTime<chrono::FixedOffset>,
        id: String,
        timeframe: Timeframe,
    },
    /// Future handle; `slot_heap_to_serializable` stores the raw slot bits.
    Future(u64),
    /// Table-backed values whose payload lives in a stored blob.
    DataTable(BlobRef),
    TypedTable {
        schema_id: u64,
        table: BlobRef,
    },
    RowView {
        schema_id: u64,
        table: BlobRef,
        row_idx: usize,
    },
    ColumnRef {
        schema_id: u64,
        table: BlobRef,
        col_id: u32,
    },
    IndexedTable {
        schema_id: u64,
        table: BlobRef,
        index_col: u32,
    },
    TypedArray {
        element_kind: TypedArrayElementKind,
        blob: BlobRef,
        len: usize,
    },
    Matrix {
        blob: BlobRef,
        rows: u32,
        cols: u32,
    },
    /// Map entries as parallel key/value vectors (presumably index-aligned; confirm).
    HashMap {
        keys: Vec<SerializableVMValue>,
        values: Vec<SerializableVMValue>,
    },
    /// Reference to data held in a sidecar; `meta_a`/`meta_b` semantics are
    /// not visible here.
    SidecarRef {
        sidecar_id: u32,
        blob_kind: BlobKind,
        original_hash: HashDigest,
        meta_a: u32,
        meta_b: u32,
    },

    /// String-keyed set contents.
    HashSet { keys: Vec<String> },

    /// Placeholder for an iterator; restoring it is not yet supported.
    IteratorOpaque,

    /// Result with a scalar payload (see `serializable_inner_kinded`).
    ResultData {
        is_ok: bool,
        payload: Box<SerializableVMValue>,
    },

    /// Option with a scalar payload; `payload` must be `Some` when `is_some`.
    OptionData {
        is_some: bool,
        payload: Option<Box<SerializableVMValue>>,
    },

    /// Placeholder for a deque (length only); restore not yet supported.
    DequeOpaque { len: usize },

    /// Placeholder for a channel (state only); restore not yet supported.
    ChannelOpaque { closed: bool, len: usize },

    /// Priority-queue contents; restored by pushing each element.
    PriorityQueueHeap { heap: Vec<i64> },

    /// Placeholder for a reference; restore not yet supported.
    ReferenceOpaque,

    /// Placeholder for a filter expression; restore not yet supported.
    FilterExprOpaque,

    /// Placeholder for a shared cell; restore not yet supported.
    SharedCellOpaque,

    /// Placeholder for a mutex (presence only); restore not yet supported.
    MutexOpaque { has_value: bool },

    /// Atomic integer with its loaded value.
    AtomicI64 { value: i64 },

    /// Placeholder for a lazy value (init flag only); restore not yet supported.
    LazyOpaque { is_initialized: bool },

    /// Unicode scalar value.
    Char(char),

    /// Big integer; the heap representation visible here is an `Arc<i64>`.
    BigInt(i64),
}
566
/// Serializable enum value: enum name, variant name, and payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnumValueSnapshot {
    pub enum_name: String,
    pub variant: String,
    pub payload: EnumPayloadSnapshot,
}
573
/// Payload shape of an enum variant: none, positional, or named fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EnumPayloadSnapshot {
    Unit,
    Tuple(Vec<SerializableVMValue>),
    /// Named fields as `(name, value)` pairs, in stored order.
    Struct(Vec<(String, SerializableVMValue)>),
}
580
/// Serializable print result: the rendered text plus its annotated spans.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrintableSnapshot {
    pub rendered: String,
    pub spans: Vec<PrintSpanSnapshot>,
}
586
/// One span of a `PrintableSnapshot`.
///
/// NOTE(review): `start`/`end` are presumably offsets into
/// `PrintableSnapshot::rendered`; confirm units (bytes vs chars).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PrintSpanSnapshot {
    /// Literal text span.
    Literal {
        text: String,
        start: usize,
        end: usize,
        span_id: String,
    },
    /// Span rendered from a value, with the metadata needed to re-format it.
    Value {
        text: String,
        start: usize,
        end: usize,
        span_id: String,
        variable_name: Option<String>,
        raw_value: Box<SerializableVMValue>,
        type_name: String,
        current_format: String,
        format_params: HashMap<String, SerializableVMValue>,
    },
}
607
/// Reference to a stored blob, tagged with what kind of data it holds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobRef {
    pub hash: HashDigest,
    pub kind: BlobKind,
}
613
/// Element type of a typed array blob. Variant order is part of the bincode
/// wire format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TypedArrayElementKind {
    I8,
    I16,
    I32,
    I64,
    U8,
    U16,
    U32,
    U64,
    F32,
    F64,
    Bool,
}
629
/// Kind of data stored in a blob referenced by `BlobRef`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BlobKind {
    DataTable,
    TypedArray(TypedArrayElementKind),
    Matrix,
}
638
/// Manifest for data split across several stored blobs.
///
/// Written by `store_chunked_vec` (chunks of elements) or `store_chunked_bytes`
/// (chunks of bytes). It is read back by the matching `load_*` function,
/// which truncates the concatenated chunks to `total_len`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedBlob {
    /// Hashes of the chunk blobs, in order.
    pub chunk_hashes: Vec<HashDigest>,
    /// Total element (or byte) count across all chunks.
    pub total_len: usize,
    /// Maximum elements (or bytes) per chunk used when writing.
    pub chunk_len: usize,
}
645
/// Serialized `DataTable`: Arrow IPC file bytes plus table metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataTable {
    /// Arrow IPC bytes, chunked via `store_chunked_vec` (see `serialize_datatable`).
    pub ipc_chunks: ChunkedBlob,
    /// Optional type name reapplied on load.
    pub type_name: Option<String>,
    /// Optional schema id reapplied on load.
    pub schema_id: Option<u32>,
}
652
/// Serialized data frame: id, timeframe, chunked timestamps, and columns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrame {
    pub id: String,
    pub timeframe: Timeframe,
    /// Chunked timestamps (element type not visible here).
    pub timestamps: ChunkedBlob,
    pub columns: Vec<SerializableDataFrameColumn>,
}
660
/// One named column of a `SerializableDataFrame`, stored as a chunked blob.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrameColumn {
    pub name: String,
    pub values: ChunkedBlob,
}
666
/// Data-cache key: data id plus timeframe.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheKeySnapshot {
    pub id: String,
    pub timeframe: Timeframe,
}
672
/// Cached historical data for one key, with the current read position.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedDataSnapshot {
    pub key: CacheKeySnapshot,
    pub historical: SerializableDataFrame,
    pub current_index: usize,
}
679
/// Live-buffer rows for one key, stored as a chunked blob.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveBufferSnapshot {
    pub key: CacheKeySnapshot,
    pub rows: ChunkedBlob,
}
685
/// Serialized data cache: historical entries plus live buffers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCacheSnapshot {
    pub historical: Vec<CachedDataSnapshot>,
    pub live_buffer: Vec<LiveBufferSnapshot>,
}
691
692pub(crate) fn store_chunked_vec<T: Serialize>(
693 values: &[T],
694 chunk_len: usize,
695 store: &SnapshotStore,
696) -> Result<ChunkedBlob> {
697 let chunk_len = chunk_len.max(1);
698 if values.is_empty() {
699 return Ok(ChunkedBlob {
700 chunk_hashes: Vec::new(),
701 total_len: 0,
702 chunk_len,
703 });
704 }
705 let mut hashes = Vec::new();
706 for chunk in values.chunks(chunk_len) {
707 let bytes = bincode::serialize(chunk)?;
708 let hash = store.put_blob(&bytes)?;
709 hashes.push(hash);
710 }
711 Ok(ChunkedBlob {
712 chunk_hashes: hashes,
713 total_len: values.len(),
714 chunk_len,
715 })
716}
717
718pub(crate) fn load_chunked_vec<T: DeserializeOwned>(
719 chunked: &ChunkedBlob,
720 store: &SnapshotStore,
721) -> Result<Vec<T>> {
722 if chunked.total_len == 0 {
723 return Ok(Vec::new());
724 }
725 let mut out = Vec::with_capacity(chunked.total_len);
726 for hash in &chunked.chunk_hashes {
727 let bytes = store.get_blob(hash)?;
728 let chunk: Vec<T> = bincode::deserialize(&bytes)?;
729 out.extend(chunk);
730 }
731 out.truncate(chunked.total_len);
732 Ok(out)
733}
734
735pub fn store_chunked_bytes(data: &[u8], store: &SnapshotStore) -> Result<ChunkedBlob> {
737 if data.is_empty() {
738 return Ok(ChunkedBlob {
739 chunk_hashes: Vec::new(),
740 total_len: 0,
741 chunk_len: BYTE_CHUNK_LEN,
742 });
743 }
744 let mut hashes = Vec::new();
745 for chunk in data.chunks(BYTE_CHUNK_LEN) {
746 let hash = store.put_blob(chunk)?;
747 hashes.push(hash);
748 }
749 Ok(ChunkedBlob {
750 chunk_hashes: hashes,
751 total_len: data.len(),
752 chunk_len: BYTE_CHUNK_LEN,
753 })
754}
755
756pub fn load_chunked_bytes(chunked: &ChunkedBlob, store: &SnapshotStore) -> Result<Vec<u8>> {
758 if chunked.total_len == 0 {
759 return Ok(Vec::new());
760 }
761 let mut out = Vec::with_capacity(chunked.total_len);
762 for hash in &chunked.chunk_hashes {
763 let bytes = store.get_blob(hash)?;
764 out.extend_from_slice(&bytes);
765 }
766 out.truncate(chunked.total_len);
767 Ok(out)
768}
769
/// Reinterprets a byte slice as a slice of `T` without copying.
///
/// # Panics
/// Panics in any of these cases:
/// - `T` is zero-sized.
/// - `bytes.len()` is not a multiple of `size_of::<T>()`.
/// - A non-empty `bytes` is not aligned for `T`.
///
/// Buffers produced by decompression or deserialization (`Vec<u8>`) are only
/// guaranteed 1-byte alignment. Building a misaligned `&[T]` is undefined
/// behaviour, so alignment is checked rather than assumed.
///
/// # Caller contract
/// `T` must be a plain-old-data type for which every bit pattern is valid
/// (integers, floats). `T: Copy` alone does not guarantee this; `bool` or
/// `char` from arbitrary bytes would be undefined behaviour.
fn bytes_as_slice<T: Copy>(bytes: &[u8]) -> &[T] {
    let elem_size = std::mem::size_of::<T>();
    assert!(
        elem_size != 0,
        "bytes_as_slice: zero-sized element types are not supported"
    );
    assert!(
        bytes.len() % elem_size == 0,
        "byte slice length {} not a multiple of element size {}",
        bytes.len(),
        elem_size
    );
    let len = bytes.len() / elem_size;
    if len == 0 {
        // An empty `Vec<u8>`'s dangling pointer is only 1-aligned; never hand
        // it to `from_raw_parts` for a more strictly aligned `T`.
        return &[];
    }
    let align = std::mem::align_of::<T>();
    assert!(
        (bytes.as_ptr() as usize) % align == 0,
        "byte slice at {:p} is not aligned to the {} bytes required by the element type",
        bytes.as_ptr(),
        align
    );
    // SAFETY: `bytes` is valid for reads of `len * elem_size` bytes for the
    // lifetime of the returned slice. The pointer is non-null and aligned for
    // `T` (checked above), and the caller guarantees every bit pattern is a
    // valid `T`.
    unsafe { std::slice::from_raw_parts(bytes.as_ptr().cast::<T>(), len) }
}
785
/// Views a slice of `T` as its raw in-memory bytes (native endianness),
/// without copying.
///
/// NOTE(review): callers should only pass padding-free types (primitive
/// integers/floats). Padding bytes are uninitialized and must not be read.
fn slice_as_bytes<T>(data: &[T]) -> &[u8] {
    let start = data.as_ptr().cast::<u8>();
    // SAFETY: `start` points at `data`, which is valid for reads of
    // `size_of_val(data)` bytes for the returned lifetime. `u8` has
    // alignment 1, so any pointer is suitably aligned.
    unsafe { std::slice::from_raw_parts(start, std::mem::size_of_val(data)) }
}
791
792use shape_value::{HeapKind, KindedSlot, NativeKind, ValueSlot};
829use std::sync::Arc;
830
831pub fn slot_to_serializable(
844 bits: u64,
845 kind: NativeKind,
846 _store: &SnapshotStore,
847) -> std::result::Result<SerializableVMValue, String> {
848 use SerializableVMValue as SV;
849 match kind {
850 NativeKind::Int64 => Ok(SV::Int(bits as i64)),
851 NativeKind::Int32 => Ok(SV::Int(bits as i32 as i64)),
852 NativeKind::Int16 => Ok(SV::Int(bits as i16 as i64)),
853 NativeKind::Int8 => Ok(SV::Int(bits as i8 as i64)),
854 NativeKind::UInt64 => Ok(SV::Int(bits as i64)),
855 NativeKind::UInt32 => Ok(SV::Int((bits as u32) as i64)),
856 NativeKind::UInt16 => Ok(SV::Int((bits as u16) as i64)),
857 NativeKind::UInt8 => Ok(SV::Int((bits as u8) as i64)),
858 NativeKind::IntSize => Ok(SV::Int(bits as isize as i64)),
859 NativeKind::UIntSize => Ok(SV::Int((bits as usize) as i64)),
860 NativeKind::Float64 => Ok(SV::Number(f64::from_bits(bits))),
861 NativeKind::Float32 => Ok(SV::Number(f64::from(f32::from_bits(bits as u32)))),
867 NativeKind::Char => match char::from_u32(bits as u32) {
868 Some(c) => Ok(SV::Char(c)),
869 None => Err(format!(
870 "slot_to_serializable: NativeKind::Char slot has invalid \
871 codepoint bits 0x{:x} — construction-side contract violated",
872 bits,
873 )),
874 },
875 NativeKind::Bool => Ok(SV::Bool(bits != 0)),
876 NativeKind::Null => Ok(SV::None),
883 NativeKind::NullableInt64
884 | NativeKind::NullableInt32
885 | NativeKind::NullableInt16
886 | NativeKind::NullableInt8
887 | NativeKind::NullableUInt64
888 | NativeKind::NullableUInt32
889 | NativeKind::NullableUInt16
890 | NativeKind::NullableUInt8
891 | NativeKind::NullableIntSize
892 | NativeKind::NullableUIntSize
893 | NativeKind::NullableFloat64 => {
894 Err(format!(
901 "slot_to_serializable: W17-snapshot-roundtrip surface — \
902 nullable-scalar kind {kind:?} has no SerializableVMValue \
903 arm at landing. The post-proof sentinel-rule amendment \
904 is the W17-snapshot-nullable follow-up. \
905 ADR-006 §2.7.5.1.",
906 ))
907 }
908 NativeKind::String => {
909 if bits == 0 {
914 return Err("slot_to_serializable: String slot with null bits".into());
915 }
916 unsafe {
917 let arc = Arc::<String>::from_raw(bits as *const String);
918 let cloned = (*arc).clone();
919 let _ = Arc::into_raw(arc); Ok(SV::String(cloned))
921 }
922 }
923 NativeKind::StringV2 => {
934 if bits == 0 {
935 return Err("slot_to_serializable: StringV2 slot with null bits".into());
936 }
937 let ptr = bits as *const shape_value::v2::string_obj::StringObj;
943 let s: &str = unsafe { shape_value::v2::string_obj::StringObj::as_str(ptr) };
944 Ok(SV::String(s.to_string()))
945 }
946 NativeKind::DecimalV2 => {
952 if bits == 0 {
953 return Err("slot_to_serializable: DecimalV2 slot with null bits".into());
954 }
955 let ptr = bits as *const shape_value::v2::decimal_obj::DecimalObj;
959 let value = unsafe { shape_value::v2::decimal_obj::DecimalObj::value(ptr) };
960 Ok(SV::Decimal(value))
961 }
962 NativeKind::Ptr(heap_kind) => slot_heap_to_serializable(bits, heap_kind),
963 }
964}
965
966fn slot_heap_to_serializable(
976 bits: u64,
977 expected_kind: HeapKind,
978) -> std::result::Result<SerializableVMValue, String> {
979 use SerializableVMValue as SV;
980 use shape_value::heap_value::{
981 AtomicData, ChannelData, DequeData, HashSetData, LazyData, MutexData,
982 OptionData, PriorityQueueData, ResultData,
983 };
984 if bits == 0 {
985 return Err(format!(
986 "slot_to_serializable: Ptr({expected_kind:?}) slot with null bits",
987 ));
988 }
989 match expected_kind {
990 HeapKind::String => {
991 unsafe {
994 let arc = Arc::<String>::from_raw(bits as *const String);
995 let cloned = (*arc).clone();
996 let _ = Arc::into_raw(arc);
997 Ok(SV::String(cloned))
998 }
999 }
1000 HeapKind::Decimal => unsafe {
1001 let arc = Arc::<rust_decimal::Decimal>::from_raw(bits as *const rust_decimal::Decimal);
1002 let v = *arc;
1003 let _ = Arc::into_raw(arc);
1004 Ok(SV::Decimal(v))
1005 },
1006 HeapKind::BigInt => unsafe {
1007 let arc = Arc::<i64>::from_raw(bits as *const i64);
1008 let v = *arc;
1009 let _ = Arc::into_raw(arc);
1010 Ok(SV::BigInt(v))
1011 },
1012 HeapKind::Char => {
1013 let cp = bits as u32;
1016 match char::from_u32(cp) {
1017 Some(c) => Ok(SV::Char(c)),
1018 None => Err(format!(
1019 "slot_to_serializable: Char arm: invalid codepoint {cp:#x}"
1020 )),
1021 }
1022 }
1023 HeapKind::HashSet => unsafe {
1024 let arc = Arc::<HashSetData>::from_raw(bits as *const HashSetData);
1025 let keys: Vec<String> = arc.keys.iter().map(|k| (**k).clone()).collect();
1026 let _ = Arc::into_raw(arc);
1027 Ok(SV::HashSet { keys })
1028 },
1029 HeapKind::PriorityQueue => unsafe {
1030 let arc = Arc::<PriorityQueueData>::from_raw(bits as *const PriorityQueueData);
1031 let heap: Vec<i64> = (*arc.heap).clone();
1032 let _ = Arc::into_raw(arc);
1033 Ok(SV::PriorityQueueHeap { heap })
1034 },
1035 HeapKind::Atomic => unsafe {
1036 let arc = Arc::<AtomicData>::from_raw(bits as *const AtomicData);
1037 let v = arc.load();
1038 let _ = Arc::into_raw(arc);
1039 Ok(SV::AtomicI64 { value: v })
1040 },
1041 HeapKind::Lazy => unsafe {
1042 let arc = Arc::<LazyData>::from_raw(bits as *const LazyData);
1043 let is_init = arc.is_initialized();
1044 let _ = Arc::into_raw(arc);
1045 Ok(SV::LazyOpaque {
1046 is_initialized: is_init,
1047 })
1048 },
1049 HeapKind::Mutex => unsafe {
1050 let arc = Arc::<MutexData>::from_raw(bits as *const MutexData);
1051 let inner = arc.get();
1056 let has_value =
1057 !(matches!(inner.kind(), NativeKind::Bool) && inner.slot().raw() == 0);
1058 drop(inner);
1059 let _ = Arc::into_raw(arc);
1060 Ok(SV::MutexOpaque { has_value })
1061 },
1062 HeapKind::Channel => unsafe {
1063 let arc = Arc::<ChannelData>::from_raw(bits as *const ChannelData);
1064 let closed = arc.is_closed();
1065 let len = arc.len();
1066 let _ = Arc::into_raw(arc);
1067 Ok(SV::ChannelOpaque { closed, len })
1068 },
1069 HeapKind::Deque => unsafe {
1070 let arc = Arc::<DequeData>::from_raw(bits as *const DequeData);
1071 let len = arc.items.len();
1072 let _ = Arc::into_raw(arc);
1073 Ok(SV::DequeOpaque { len })
1074 },
1075 HeapKind::Result => unsafe {
1076 let arc = Arc::<ResultData>::from_raw(bits as *const ResultData);
1077 let is_ok = arc.is_ok;
1078 let payload_kind = arc.payload.kind();
1079 let payload_bits = arc.payload.slot().raw();
1080 let inner = serializable_inner_kinded(payload_bits, payload_kind)?;
1081 let _ = Arc::into_raw(arc);
1082 Ok(SV::ResultData {
1083 is_ok,
1084 payload: Box::new(inner),
1085 })
1086 },
1087 HeapKind::Option => unsafe {
1088 let arc = Arc::<OptionData>::from_raw(bits as *const OptionData);
1089 let is_some = arc.is_some;
1090 let payload = if is_some {
1091 let payload_kind = arc.payload.kind();
1092 let payload_bits = arc.payload.slot().raw();
1093 Some(Box::new(serializable_inner_kinded(payload_bits, payload_kind)?))
1094 } else {
1095 None
1096 };
1097 let _ = Arc::into_raw(arc);
1098 Ok(SV::OptionData { is_some, payload })
1099 },
1100 HeapKind::Reference => Ok(SV::ReferenceOpaque),
1105 HeapKind::FilterExpr => Ok(SV::FilterExprOpaque),
1106 HeapKind::SharedCell => Ok(SV::SharedCellOpaque),
1107 HeapKind::Iterator => Ok(SV::IteratorOpaque),
1108 HeapKind::Future => Ok(SV::Future(bits)),
1110
1111 other => Err(format!(
1121 "slot_to_serializable: W17-snapshot-roundtrip surface — \
1122 HeapKind::{other:?} arm has no in-session SerializableVMValue \
1123 projection. Tracked as W17-snapshot-{other:?} follow-up per \
1124 docs/cluster-audits/phase-2d-playbook.md §3. \
1125 ADR-006 §2.7.5.1.",
1126 )),
1127 }
1128}
1129
1130fn serializable_inner_kinded(
1135 bits: u64,
1136 kind: NativeKind,
1137) -> std::result::Result<SerializableVMValue, String> {
1138 if matches!(kind, NativeKind::Bool) && bits == 0 {
1139 return Ok(SerializableVMValue::Unit);
1140 }
1141 match kind {
1142 NativeKind::Int64 => Ok(SerializableVMValue::Int(bits as i64)),
1143 NativeKind::Float64 => Ok(SerializableVMValue::Number(f64::from_bits(bits))),
1144 NativeKind::Bool => Ok(SerializableVMValue::Bool(bits != 0)),
1145 NativeKind::String => {
1146 if bits == 0 {
1147 return Ok(SerializableVMValue::None);
1148 }
1149 unsafe {
1150 let arc = Arc::<String>::from_raw(bits as *const String);
1151 let cloned = (*arc).clone();
1152 let _ = Arc::into_raw(arc);
1153 Ok(SerializableVMValue::String(cloned))
1154 }
1155 }
1156 _ => Err(format!(
1157 "serializable_inner_kinded: W17-snapshot-roundtrip surface — \
1158 inner Result/Option payload kind {kind:?} is not in the \
1159 initial scalar set; deep payload arms land in follow-up. \
1160 ADR-006 §2.7.5.1.",
1161 )),
1162 }
1163}
1164
1165pub fn serializable_to_slot(
1175 sv: &SerializableVMValue,
1176 expected_kind: NativeKind,
1177 _store: &SnapshotStore,
1178) -> std::result::Result<(u64, NativeKind), String> {
1179 use SerializableVMValue as SV;
1180 match (sv, expected_kind) {
1183 (SV::Int(i), NativeKind::Int64) => Ok((*i as u64, NativeKind::Int64)),
1184 (SV::Int(i), NativeKind::Int32) => Ok(((*i as i32) as u64, NativeKind::Int32)),
1185 (SV::Int(i), NativeKind::Int16) => Ok(((*i as i16 as i32) as u64, NativeKind::Int16)),
1186 (SV::Int(i), NativeKind::Int8) => Ok(((*i as i8 as i32) as u64, NativeKind::Int8)),
1187 (SV::Int(i), NativeKind::UInt64) => Ok((*i as u64, NativeKind::UInt64)),
1188 (SV::Int(i), NativeKind::UInt32) => Ok(((*i as u32) as u64, NativeKind::UInt32)),
1189 (SV::Int(i), NativeKind::UInt16) => Ok(((*i as u16) as u64, NativeKind::UInt16)),
1190 (SV::Int(i), NativeKind::UInt8) => Ok(((*i as u8) as u64, NativeKind::UInt8)),
1191 (SV::Int(i), NativeKind::IntSize) => Ok((*i as isize as u64, NativeKind::IntSize)),
1192 (SV::Int(i), NativeKind::UIntSize) => Ok((*i as u64, NativeKind::UIntSize)),
1193 (SV::Number(f), NativeKind::Float64) => Ok((f.to_bits(), NativeKind::Float64)),
1194 (SV::Bool(b), NativeKind::Bool) => Ok((if *b { 1 } else { 0 }, NativeKind::Bool)),
1195 (SV::String(s), NativeKind::String) => {
1196 let arc = Arc::new(s.clone());
1197 let raw = Arc::into_raw(arc) as u64;
1198 Ok((raw, NativeKind::String))
1199 }
1200 (SV::None | SV::Unit, NativeKind::Bool) => Ok((0, NativeKind::Bool)),
1201
1202 (sv, NativeKind::Ptr(hk)) => serializable_to_heap_slot(sv, hk),
1206
1207 (other_sv, other_kind) => Err(format!(
1209 "serializable_to_slot: W17-snapshot-roundtrip surface — \
1210 SerializableVMValue arm {} cannot satisfy expected kind \
1211 {other_kind:?}. Discriminator-vs-kind mismatch is a structured \
1212 error, not a Bool-default fallback (§2.7.5.1 forbidden). \
1213 ADR-006 §2.7.5.1.",
1214 serializable_arm_name(other_sv),
1215 )),
1216 }
1217}
1218
1219fn serializable_to_heap_slot(
1224 sv: &SerializableVMValue,
1225 heap_kind: HeapKind,
1226) -> std::result::Result<(u64, NativeKind), String> {
1227 use SerializableVMValue as SV;
1228 use shape_value::heap_value::{
1229 AtomicData, HashSetData, OptionData, PriorityQueueData, ResultData,
1230 };
1231 match (sv, heap_kind) {
1232 (SV::String(s), HeapKind::String) => {
1233 let arc = Arc::new(s.clone());
1236 let raw = Arc::into_raw(arc) as u64;
1237 Ok((raw, NativeKind::Ptr(HeapKind::String)))
1238 }
1239 (SV::Char(c), HeapKind::Char) => {
1240 let bits = (*c as u32) as u64;
1244 Ok((bits, NativeKind::Ptr(HeapKind::Char)))
1245 }
1246 (SV::BigInt(n), HeapKind::BigInt) => {
1247 let arc = Arc::new(*n);
1248 let raw = Arc::into_raw(arc) as u64;
1249 Ok((raw, NativeKind::Ptr(HeapKind::BigInt)))
1250 }
1251 (SV::Decimal(d), HeapKind::Decimal) => {
1252 let arc = Arc::new(*d);
1253 let raw = Arc::into_raw(arc) as u64;
1254 Ok((raw, NativeKind::Ptr(HeapKind::Decimal)))
1255 }
1256 (SV::HashSet { keys }, HeapKind::HashSet) => {
1257 let arcs: Vec<Arc<String>> = keys.iter().map(|k| Arc::new(k.clone())).collect();
1258 let data = HashSetData::from_keys(arcs);
1259 let arc = Arc::new(data);
1260 let raw = Arc::into_raw(arc) as u64;
1261 Ok((raw, NativeKind::Ptr(HeapKind::HashSet)))
1262 }
1263 (SV::PriorityQueueHeap { heap }, HeapKind::PriorityQueue) => {
1264 let mut pq = PriorityQueueData::new();
1265 for &v in heap {
1270 pq.push(v);
1271 }
1272 let arc = Arc::new(pq);
1273 let raw = Arc::into_raw(arc) as u64;
1274 Ok((raw, NativeKind::Ptr(HeapKind::PriorityQueue)))
1275 }
1276 (SV::AtomicI64 { value }, HeapKind::Atomic) => {
1277 let arc = Arc::new(AtomicData::new(*value));
1278 let raw = Arc::into_raw(arc) as u64;
1279 Ok((raw, NativeKind::Ptr(HeapKind::Atomic)))
1280 }
1281 (SV::ResultData { is_ok, payload }, HeapKind::Result) => {
1282 let inner_slot = inner_kinded_from_serializable(payload)?;
1289 let data = if *is_ok {
1290 ResultData::ok(inner_slot)
1291 } else {
1292 ResultData::err(inner_slot)
1293 };
1294 let arc = Arc::new(data);
1295 let raw = Arc::into_raw(arc) as u64;
1296 Ok((raw, NativeKind::Ptr(HeapKind::Result)))
1297 }
1298 (SV::OptionData { is_some, payload }, HeapKind::Option) => {
1299 let data = if *is_some {
1300 match payload {
1301 Some(p) => OptionData::some(inner_kinded_from_serializable(p)?),
1302 None => return Err(
1303 "serializable_to_slot: OptionData is_some=true but payload=None — \
1304 malformed wire shape; expected Some(SerializableVMValue) for \
1305 is_some=true. ADR-006 §2.7.5.1."
1306 .to_string(),
1307 ),
1308 }
1309 } else {
1310 OptionData::none()
1311 };
1312 let arc = Arc::new(data);
1313 let raw = Arc::into_raw(arc) as u64;
1314 Ok((raw, NativeKind::Ptr(HeapKind::Option)))
1315 }
1316
1317 (SV::IteratorOpaque, HeapKind::Iterator)
1323 | (SV::DequeOpaque { .. }, HeapKind::Deque)
1324 | (SV::ChannelOpaque { .. }, HeapKind::Channel)
1325 | (SV::ReferenceOpaque, HeapKind::Reference)
1326 | (SV::FilterExprOpaque, HeapKind::FilterExpr)
1327 | (SV::SharedCellOpaque, HeapKind::SharedCell)
1328 | (SV::MutexOpaque { .. }, HeapKind::Mutex)
1329 | (SV::LazyOpaque { .. }, HeapKind::Lazy) => Err(format!(
1330 "serializable_to_slot: W17-snapshot-roundtrip surface — \
1331 {heap_kind:?} arm restored from opaque wire shape; \
1332 deep payload reconstruction is the W17-snapshot-{:?} \
1333 follow-up. ADR-006 §2.7.5.1.",
1334 heap_kind,
1335 )),
1336
1337 (other_sv, hk) => Err(format!(
1340 "serializable_to_slot: W17-snapshot-roundtrip surface — \
1341 SerializableVMValue arm {} cannot satisfy expected heap kind \
1342 Ptr({hk:?}). Either the wire-format arm has no inverse \
1343 projection (deep follow-up) or the discriminator is \
1344 mismatched. ADR-006 §2.7.5.1.",
1345 serializable_arm_name(other_sv),
1346 )),
1347 }
1348}
1349
1350fn inner_kinded_from_serializable(
1354 sv: &SerializableVMValue,
1355) -> std::result::Result<KindedSlot, String> {
1356 use SerializableVMValue as SV;
1357 match sv {
1358 SV::Int(i) => Ok(KindedSlot::new(
1359 ValueSlot::from_raw(*i as u64),
1360 NativeKind::Int64,
1361 )),
1362 SV::Number(f) => Ok(KindedSlot::new(
1363 ValueSlot::from_raw(f.to_bits()),
1364 NativeKind::Float64,
1365 )),
1366 SV::Bool(b) => Ok(KindedSlot::new(
1367 ValueSlot::from_raw(if *b { 1 } else { 0 }),
1368 NativeKind::Bool,
1369 )),
1370 SV::String(s) => Ok(KindedSlot::from_string_arc(Arc::new(s.clone()))),
1371 SV::Unit | SV::None => Ok(KindedSlot::new(
1372 ValueSlot::from_raw(0),
1373 NativeKind::Bool,
1374 )),
1375 other => Err(format!(
1376 "inner_kinded_from_serializable: W17-snapshot-roundtrip surface — \
1377 SerializableVMValue arm {} has no in-session inner-payload \
1378 projection. Tracked as follow-up. ADR-006 §2.7.5.1.",
1379 serializable_arm_name(other),
1380 )),
1381 }
1382}
1383
1384fn serializable_arm_name(sv: &SerializableVMValue) -> &'static str {
1386 use SerializableVMValue as SV;
1387 match sv {
1388 SV::Int(_) => "Int",
1389 SV::Number(_) => "Number",
1390 SV::Decimal(_) => "Decimal",
1391 SV::String(_) => "String",
1392 SV::Bool(_) => "Bool",
1393 SV::None => "None",
1394 SV::Some(_) => "Some",
1395 SV::Unit => "Unit",
1396 SV::Timeframe(_) => "Timeframe",
1397 SV::Duration(_) => "Duration",
1398 SV::Time(_) => "Time",
1399 SV::TimeSpan(_) => "TimeSpan",
1400 SV::TimeReference(_) => "TimeReference",
1401 SV::DateTimeExpr(_) => "DateTimeExpr",
1402 SV::DataDateTimeRef(_) => "DataDateTimeRef",
1403 SV::Array(_) => "Array",
1404 SV::Function(_) => "Function",
1405 SV::TypeAnnotation(_) => "TypeAnnotation",
1406 SV::TypeAnnotatedValue { .. } => "TypeAnnotatedValue",
1407 SV::Enum(_) => "Enum",
1408 SV::Closure { .. } => "Closure",
1409 SV::ModuleFunction(_) => "ModuleFunction",
1410 SV::TypedObject { .. } => "TypedObject",
1411 SV::Range { .. } => "Range",
1412 SV::Ok(_) => "Ok",
1413 SV::Err(_) => "Err",
1414 SV::PrintResult(_) => "PrintResult",
1415 SV::SimulationCall { .. } => "SimulationCall",
1416 SV::FunctionRef { .. } => "FunctionRef",
1417 SV::DataReference { .. } => "DataReference",
1418 SV::Future(_) => "Future",
1419 SV::DataTable(_) => "DataTable",
1420 SV::TypedTable { .. } => "TypedTable",
1421 SV::RowView { .. } => "RowView",
1422 SV::ColumnRef { .. } => "ColumnRef",
1423 SV::IndexedTable { .. } => "IndexedTable",
1424 SV::TypedArray { .. } => "TypedArray",
1425 SV::Matrix { .. } => "Matrix",
1426 SV::HashMap { .. } => "HashMap",
1427 SV::SidecarRef { .. } => "SidecarRef",
1428 SV::HashSet { .. } => "HashSet",
1429 SV::IteratorOpaque => "IteratorOpaque",
1430 SV::ResultData { .. } => "ResultData",
1431 SV::OptionData { .. } => "OptionData",
1432 SV::DequeOpaque { .. } => "DequeOpaque",
1433 SV::ChannelOpaque { .. } => "ChannelOpaque",
1434 SV::PriorityQueueHeap { .. } => "PriorityQueueHeap",
1435 SV::ReferenceOpaque => "ReferenceOpaque",
1436 SV::FilterExprOpaque => "FilterExprOpaque",
1437 SV::SharedCellOpaque => "SharedCellOpaque",
1438 SV::MutexOpaque { .. } => "MutexOpaque",
1439 SV::AtomicI64 { .. } => "AtomicI64",
1440 SV::LazyOpaque { .. } => "LazyOpaque",
1441 SV::Char(_) => "Char",
1442 SV::BigInt(_) => "BigInt",
1443 }
1444}
1445
1446fn serialize_datatable(dt: &DataTable, store: &SnapshotStore) -> Result<SerializableDataTable> {
1447 let mut buf = Vec::new();
1448 let schema = dt.inner().schema();
1449 let mut writer = arrow_ipc::writer::FileWriter::try_new(&mut buf, schema.as_ref())?;
1450 writer.write(dt.inner())?;
1451 writer.finish()?;
1452 let ipc_chunks = store_chunked_vec(&buf, BYTE_CHUNK_LEN, store)?;
1453 Ok(SerializableDataTable {
1454 ipc_chunks,
1455 type_name: dt.type_name().map(|s| s.to_string()),
1456 schema_id: dt.schema_id(),
1457 })
1458}
1459
1460fn deserialize_datatable(
1461 serialized: SerializableDataTable,
1462 store: &SnapshotStore,
1463) -> Result<DataTable> {
1464 let bytes = load_chunked_vec(&serialized.ipc_chunks, store)?;
1465 let cursor = std::io::Cursor::new(bytes);
1466 let mut reader = arrow_ipc::reader::FileReader::try_new(cursor, None)?;
1467 let batch = reader
1468 .next()
1469 .transpose()?
1470 .context("no RecordBatch in DataTable snapshot")?;
1471 let mut dt = DataTable::new(batch);
1472 if let Some(name) = serialized.type_name {
1473 dt = DataTable::with_type_name(dt.into_inner(), name);
1474 }
1475 if let Some(schema_id) = serialized.schema_id {
1476 dt = dt.with_schema_id(schema_id);
1477 }
1478 Ok(dt)
1479}
1480