Skip to main content

shape_runtime/
snapshot.rs

1//! Snapshotting and resumability support
2//!
3//! Provides binary, diff-friendly snapshots via a content-addressed store.
4
5use std::collections::{HashMap, HashSet};
6use std::fs;
7use std::io::{Read, Write};
8use std::path::PathBuf;
9use anyhow::{Context, Result};
10use serde::de::DeserializeOwned;
11use serde::{Deserialize, Serialize};
12// EnumPayload, EnumValue, PrintResult, PrintSpan, Upvalue, ValueWord,
13// ValueWordExt imports removed alongside the deleted value-(de)serialization
14// functions (Phase 2b). The strict-typed slot-serialization API will land
15// in a follow-up commit using (bits, NativeKind) pairs threaded from the
16// FunctionBlob's per-slot kind metadata.
17
18use crate::event_queue::WaitCondition;
19use crate::hashing::{HashDigest, hash_bytes};
20use shape_ast::ast::{DataDateTimeRef, DateTimeExpr, EnumDef, TimeReference, TypeAnnotation};
21use shape_ast::data::Timeframe;
22
23use shape_value::datatable::DataTable;
24
/// Schema version for the snapshot binary format.
///
/// This version is embedded in every [`ExecutionSnapshot`] via the `version`
/// field. Readers should check this value to determine whether they can
/// decode a snapshot or need migration logic.
///
/// Version history:
/// - v5 (current): ValueWord-native serialization — `nanboxed_to_serializable`
///   and `serializable_to_nanboxed` operate on ValueWord directly, with no
///   intermediate conversion step. Format is wire-compatible with v4
///   (same `SerializableVMValue` enum), so v4 snapshots deserialize
///   correctly without migration.
///
/// NOTE(review): the comments in the "Conversion helpers" section of this file
/// say those two functions were deleted in Phase 2b; confirm whether this
/// history entry (and the version number) still describes the current format.
pub const SNAPSHOT_VERSION: u32 = 5;

/// Default chunk length for chunked storage.
///
/// NOTE(review): presumably an element count handed to `store_chunked_vec`
/// as its `chunk_len` argument — confirm against callers.
pub(crate) const DEFAULT_CHUNK_LEN: usize = 4096;
/// Chunk size, in bytes, used by `store_chunked_bytes` (256 KiB).
pub(crate) const BYTE_CHUNK_LEN: usize = 256 * 1024;
41
/// Content-addressed snapshot store
///
/// Files are kept under two subdirectories of `root`: `blobs/` for arbitrary
/// payloads and `snapshots/` for [`ExecutionSnapshot`] records. Each file is
/// named after the hex digest of its uncompressed content and stored
/// zstd-compressed.
#[derive(Clone)]
pub struct SnapshotStore {
    /// Directory that contains the `blobs/` and `snapshots/` subdirectories.
    root: PathBuf,
}
47
48impl SnapshotStore {
49    pub fn new(root: impl Into<PathBuf>) -> Result<Self> {
50        let root = root.into();
51        fs::create_dir_all(root.join("blobs"))
52            .with_context(|| format!("failed to create snapshot blob dir at {}", root.display()))?;
53        fs::create_dir_all(root.join("snapshots"))
54            .with_context(|| format!("failed to create snapshot dir at {}", root.display()))?;
55        Ok(Self { root })
56    }
57
58    fn blob_path(&self, hash: &HashDigest) -> PathBuf {
59        self.root
60            .join("blobs")
61            .join(format!("{}.bin.zst", hash.hex()))
62    }
63
64    fn snapshot_path(&self, hash: &HashDigest) -> PathBuf {
65        self.root
66            .join("snapshots")
67            .join(format!("{}.bin.zst", hash.hex()))
68    }
69
70    pub fn put_blob(&self, data: &[u8]) -> Result<HashDigest> {
71        let hash = hash_bytes(data);
72        let path = self.blob_path(&hash);
73        if path.exists() {
74            return Ok(hash);
75        }
76        let compressed = zstd::stream::encode_all(data, 0)?;
77        let mut file = fs::File::create(&path)?;
78        file.write_all(&compressed)?;
79        Ok(hash)
80    }
81
82    pub fn get_blob(&self, hash: &HashDigest) -> Result<Vec<u8>> {
83        let path = self.blob_path(hash);
84        let mut file = fs::File::open(&path)
85            .with_context(|| format!("snapshot blob not found: {}", path.display()))?;
86        let mut buf = Vec::new();
87        file.read_to_end(&mut buf)?;
88        let decompressed = zstd::stream::decode_all(&buf[..])?;
89        Ok(decompressed)
90    }
91
92    pub fn put_struct<T: Serialize>(&self, value: &T) -> Result<HashDigest> {
93        let bytes = bincode::serialize(value)?;
94        self.put_blob(&bytes)
95    }
96
97    pub fn get_struct<T: for<'de> Deserialize<'de>>(&self, hash: &HashDigest) -> Result<T> {
98        let bytes = self.get_blob(hash)?;
99        Ok(bincode::deserialize(&bytes)?)
100    }
101
102    pub fn put_snapshot(&self, snapshot: &ExecutionSnapshot) -> Result<HashDigest> {
103        let bytes = bincode::serialize(snapshot)?;
104        let hash = hash_bytes(&bytes);
105        let path = self.snapshot_path(&hash);
106        if !path.exists() {
107            let compressed = zstd::stream::encode_all(&bytes[..], 0)?;
108            let mut file = fs::File::create(&path)?;
109            file.write_all(&compressed)?;
110        }
111        Ok(hash)
112    }
113
114    pub fn get_snapshot(&self, hash: &HashDigest) -> Result<ExecutionSnapshot> {
115        let path = self.snapshot_path(hash);
116        let mut file = fs::File::open(&path)
117            .with_context(|| format!("snapshot not found: {}", path.display()))?;
118        let mut buf = Vec::new();
119        file.read_to_end(&mut buf)?;
120        let decompressed = zstd::stream::decode_all(&buf[..])?;
121        Ok(bincode::deserialize(&decompressed)?)
122    }
123
124    /// List all snapshots in the store, returning (hash, snapshot) pairs.
125    ///
126    /// **Note:** This method eagerly loads and deserializes every snapshot in the
127    /// store directory into memory. For stores with many snapshots this may
128    /// become a bottleneck. A future improvement could return a lazy iterator
129    /// that streams snapshot metadata (hash + `created_at_ms`) without
130    /// deserializing full payloads until requested — e.g. via a
131    /// `SnapshotEntry { hash, created_at_ms }` header read, deferring full
132    /// `ExecutionSnapshot` deserialization to an explicit `.load()` call.
133    pub fn list_snapshots(&self) -> Result<Vec<(HashDigest, ExecutionSnapshot)>> {
134        let snapshots_dir = self.root.join("snapshots");
135        if !snapshots_dir.exists() {
136            return Ok(Vec::new());
137        }
138        let mut results = Vec::new();
139        for entry in fs::read_dir(&snapshots_dir)? {
140            let entry = entry?;
141            let path = entry.path();
142            if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
143                // Files are named "<hex>.bin.zst"
144                if let Some(hex) = name.strip_suffix(".bin.zst") {
145                    let hash = HashDigest::from_hex(hex);
146                    match self.get_snapshot(&hash) {
147                        Ok(snap) => results.push((hash, snap)),
148                        Err(_) => continue, // skip corrupt entries
149                    }
150                }
151            }
152        }
153        // Sort by creation time, newest first
154        results.sort_by(|a, b| b.1.created_at_ms.cmp(&a.1.created_at_ms));
155        Ok(results)
156    }
157
158    /// Delete a snapshot file by hash.
159    pub fn delete_snapshot(&self, hash: &HashDigest) -> Result<()> {
160        let path = self.snapshot_path(hash);
161        fs::remove_file(&path)
162            .with_context(|| format!("failed to delete snapshot: {}", path.display()))?;
163        Ok(())
164    }
165}
166
/// A serializable snapshot of a Shape program's execution state.
///
/// The `version` field records which [`SNAPSHOT_VERSION`] was used to
/// produce this snapshot. Readers must check this value before
/// deserializing the referenced sub-snapshots (semantic, context, VM)
/// to ensure binary compatibility or apply migration logic.
///
/// `SnapshotStore::put_snapshot` encodes this struct with bincode, so the
/// declared field order is presumably part of the on-disk format — do not
/// reorder fields without a migration (confirm against bincode's encoding rules).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionSnapshot {
    /// Schema version — should equal [`SNAPSHOT_VERSION`] at write time.
    /// Used by readers to detect format changes and apply migrations.
    pub version: u32,
    /// Creation timestamp in milliseconds; `SnapshotStore::list_snapshots`
    /// sorts on it, newest first. Presumably Unix-epoch based — confirm at the
    /// write site.
    pub created_at_ms: i64,
    /// Hash referencing the semantic sub-snapshot (presumably a stored
    /// [`SemanticSnapshot`] — confirm against the writer).
    pub semantic_hash: HashDigest,
    /// Hash referencing the context sub-snapshot (presumably a stored
    /// [`ContextSnapshot`] — confirm against the writer).
    pub context_hash: HashDigest,
    /// Hash referencing the VM sub-snapshot, when one was captured.
    pub vm_hash: Option<HashDigest>,
    /// Hash referencing the bytecode in effect, when recorded.
    pub bytecode_hash: Option<HashDigest>,
    /// Path of the script that was executing when the snapshot was taken
    #[serde(default)]
    pub script_path: Option<String>,
}
187
/// Semantic-level state captured in a snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SemanticSnapshot {
    /// Names of the exported symbols.
    pub exported_symbols: HashSet<String>,
}
192
/// Serializable capture of the execution context: data-loading configuration,
/// variable scopes, time/range cursors, type registries and any pending
/// suspension.
///
/// NOTE(review): field meanings below are read off the names and types only;
/// confirm against the code that builds and restores this struct.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextSnapshot {
    pub data_load_mode: crate::context::DataLoadMode,
    /// Captured data cache, if any.
    pub data_cache: Option<DataCacheSnapshot>,
    pub current_id: Option<String>,
    pub current_row_index: usize,
    /// One map of variable name → captured variable per scope.
    pub variable_scopes: Vec<HashMap<String, VariableSnapshot>>,
    pub reference_datetime: Option<chrono::DateTime<chrono::Utc>>,
    pub current_timeframe: Option<Timeframe>,
    pub base_timeframe: Option<Timeframe>,
    /// A pair of UTC datetimes; presumably (start, end) — confirm.
    pub date_range: Option<(chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)>,
    pub range_start: usize,
    pub range_end: usize,
    pub range_active: bool,
    /// Type-alias registry keyed by name.
    pub type_alias_registry: HashMap<String, TypeAliasRuntimeEntrySnapshot>,
    /// Enum definitions keyed by name.
    pub enum_registry: HashMap<String, EnumDef>,
    /// Struct type definitions keyed by name.
    #[serde(default)]
    pub struct_type_registry: HashMap<String, shape_ast::ast::StructTypeDef>,
    /// Suspension state, present when one was captured.
    pub suspension_state: Option<SuspensionStateSnapshot>,
}
213
/// Serializable capture of a single variable binding.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VariableSnapshot {
    /// The variable's value in serializable form.
    pub value: SerializableVMValue,
    /// Declaration kind of the variable, as defined by the AST.
    pub kind: shape_ast::ast::VarKind,
    pub is_initialized: bool,
    pub is_function_scoped: bool,
    /// Optional formatting hint attached to the variable.
    pub format_hint: Option<String>,
    /// Optional per-key format overrides.
    pub format_overrides: Option<HashMap<String, SerializableVMValue>>,
}
223
/// Serializable capture of one type-alias registry entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeAliasRuntimeEntrySnapshot {
    /// Name of the type the alias is based on.
    pub base_type: String,
    /// Optional overrides keyed by name.
    pub overrides: Option<HashMap<String, SerializableVMValue>>,
}
229
/// Serializable capture of a suspended execution: what it is waiting for and
/// the state saved for resumption.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SuspensionStateSnapshot {
    /// Condition the suspended execution is waiting on.
    pub waiting_for: WaitCondition,
    /// Program counter to resume at.
    pub resume_pc: usize,
    /// Local values saved at suspension.
    pub saved_locals: Vec<SerializableVMValue>,
    /// Stack values saved at suspension.
    pub saved_stack: Vec<SerializableVMValue>,
}
237
/// Serializable capture of VM state: instruction pointer, value stacks,
/// bindings, and the call/loop/timeframe/exception-handler stacks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmSnapshot {
    /// Top-level instruction pointer (see `ip_blob_hash` / `ip_local_offset`
    /// for the relocatable form).
    pub ip: usize,
    pub stack: Vec<SerializableVMValue>,
    pub locals: Vec<SerializableVMValue>,
    pub module_bindings: Vec<SerializableVMValue>,
    pub call_stack: Vec<SerializableCallFrame>,
    pub loop_stack: Vec<SerializableLoopContext>,
    pub timeframe_stack: Vec<Option<Timeframe>>,
    pub exception_handlers: Vec<SerializableExceptionHandler>,
    /// Content hash of the function blob that the top-level IP belongs to.
    /// Used for relocating the IP after recompilation.
    #[serde(default)]
    pub ip_blob_hash: Option<[u8; 32]>,
    /// Instruction offset within the function blob for the top-level IP.
    /// Computed as `ip - function_entry_point` when saving; reconstructed
    /// to absolute IP on restore. Only meaningful when `ip_blob_hash` is `Some`.
    #[serde(default)]
    pub ip_local_offset: Option<usize>,
    /// Function ID that the top-level IP belongs to.
    /// Used as a fallback when `ip_blob_hash` is not available.
    #[serde(default)]
    pub ip_function_id: Option<u16>,
}
262
/// Serializable form of one call-stack frame.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableCallFrame {
    /// Instruction pointer to return to when the frame exits.
    pub return_ip: usize,
    /// Base index and count of this frame's locals (presumably into the VM's
    /// locals storage — confirm against the VM restore code).
    pub locals_base: usize,
    pub locals_count: usize,
    /// Function executing in this frame, if known.
    pub function_id: Option<u16>,
    /// Captured upvalues, if the frame has any.
    pub upvalues: Option<Vec<SerializableVMValue>>,
    /// Content hash of the function blob (for content-addressed state capture).
    /// When present, `local_ip` stores the instruction offset relative to the
    /// function's entry point rather than an absolute IP.
    #[serde(default)]
    pub blob_hash: Option<[u8; 32]>,
    /// Instruction offset within the function blob.
    /// Computed as `ip - function_entry_point` when saving; reconstructed to
    /// absolute IP on restore. Only meaningful when `blob_hash` is `Some`.
    #[serde(default)]
    pub local_ip: Option<usize>,
}
281
/// Serializable form of one loop-stack entry.
///
/// NOTE(review): `start` / `end` are presumably instruction offsets bounding
/// the loop — confirm against the VM.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableLoopContext {
    pub start: usize,
    pub end: usize,
}
287
/// Serializable form of one registered exception handler.
///
/// NOTE(review): presumably `stack_size` and `call_depth` are the depths to
/// unwind to before jumping to `catch_ip` — confirm against the VM.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableExceptionHandler {
    pub catch_ip: usize,
    pub stack_size: usize,
    pub call_depth: usize,
}
294
/// Wire-format representation of a VM value inside a snapshot.
///
/// Each variant is one serializable value shape. Large binary payloads
/// (tables, typed arrays, matrices) are not embedded; they are referenced
/// through a [`BlobRef`]. The enum is serde-derived, so the variant set is
/// part of the snapshot format; append rather than reorder (presumably the
/// encoding identifies variants by position — confirm against the encoder).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SerializableVMValue {
    Int(i64),
    Number(f64),
    Decimal(rust_decimal::Decimal),
    String(String),
    Bool(bool),
    None,
    Some(Box<SerializableVMValue>),
    Unit,
    Timeframe(Timeframe),
    Duration(shape_ast::ast::Duration),
    Time(chrono::DateTime<chrono::FixedOffset>),
    TimeSpan(i64), // millis
    TimeReference(TimeReference),
    DateTimeExpr(DateTimeExpr),
    DataDateTimeRef(DataDateTimeRef),
    Array(Vec<SerializableVMValue>),
    Function(u16),
    TypeAnnotation(TypeAnnotation),
    TypeAnnotatedValue {
        type_name: String,
        value: Box<SerializableVMValue>,
    },
    Enum(EnumValueSnapshot),
    /// A closure value carrying the function body id, its capture signature
    /// id, and the raw capture payloads.
    ///
    /// Track A.2A: `function_id` is widened to `u32` (from `u16`) to match
    /// the raw `TypedClosureHeader` field width, and a new `type_id: u32`
    /// carries the `ClosureTypeId` needed to re-resolve the
    /// `ClosureLayout` side-table on the receiver. Deserialization hard-
    /// errors when no layout is available — no legacy
    /// `HeapValue::Closure` fallback exists per the v2 closure closeout
    /// directive.
    Closure {
        function_id: u32,
        type_id: u32,
        upvalues: Vec<SerializableVMValue>,
    },
    ModuleFunction(String),
    // ADR-005 §Forbidden / Q10 forward pointer: snapshot must NOT
    // re-introduce Box<HeapValue> slot wrapping. The current schema-+-slot
    // serialization layout aligns with ADR-005 §3 (typed slot bits + the
    // schema; deserialization reconstructs the typed pointer; no
    // intermediate HeapValue materialization). Audit of this path for full
    // ADR-005 conformance is queued for a future cluster (cluster #1
    // audit Q10). See docs/adr/005-typed-slot-construction.md.
    TypedObject {
        schema_id: u64,
        /// Serialized slots: each slot is 8 bytes (raw bits for simple, serialized heap values for heap slots)
        slot_data: Vec<SerializableVMValue>,
        heap_mask: u64,
    },
    Range {
        start: Option<Box<SerializableVMValue>>,
        end: Option<Box<SerializableVMValue>>,
        inclusive: bool,
    },
    Ok(Box<SerializableVMValue>),
    Err(Box<SerializableVMValue>),
    PrintResult(PrintableSnapshot),
    SimulationCall {
        name: String,
        params: HashMap<String, SerializableVMValue>,
    },
    FunctionRef {
        name: String,
        closure: Option<Box<SerializableVMValue>>,
    },
    DataReference {
        datetime: chrono::DateTime<chrono::FixedOffset>,
        id: String,
        timeframe: Timeframe,
    },
    Future(u64),
    DataTable(BlobRef),
    TypedTable {
        schema_id: u64,
        table: BlobRef,
    },
    RowView {
        schema_id: u64,
        table: BlobRef,
        row_idx: usize,
    },
    ColumnRef {
        schema_id: u64,
        table: BlobRef,
        col_id: u32,
    },
    IndexedTable {
        schema_id: u64,
        table: BlobRef,
        index_col: u32,
    },
    /// Binary-serialized typed array (raw bytes via BlobRef).
    TypedArray {
        element_kind: TypedArrayElementKind,
        blob: BlobRef,
        len: usize,
    },
    /// Binary-serialized matrix (raw f64 bytes, row-major).
    Matrix {
        blob: BlobRef,
        rows: u32,
        cols: u32,
    },
    /// Dedicated HashMap variant preserving type identity.
    HashMap {
        keys: Vec<SerializableVMValue>,
        values: Vec<SerializableVMValue>,
    },
    /// Placeholder for sidecar-split large blobs (Phase 3B).
    /// Metadata fields preserve TypedArray len and Matrix rows/cols
    /// so reassembly can reconstruct the exact original variant.
    SidecarRef {
        sidecar_id: u32,
        blob_kind: BlobKind,
        original_hash: HashDigest,
        /// For TypedArray: element count. For Matrix: row count. Otherwise 0.
        meta_a: u32,
        /// For Matrix: column count. Otherwise 0.
        meta_b: u32,
    },

    // ── W17-snapshot-roundtrip extension (ADR-006 §2.7.5.1, 2026-05-11) ──
    //
    // Wire-format arms for the post-W14/W15/W16/Wave-2.5 HeapKinds that
    // had no `SerializableVMValue` arm pre-W17-snapshot-roundtrip. Each
    // arm pairs 1:1 with a `HeapKind` ordinal and is post-proof per
    // §2.7.5.1: the discriminator (variant tag) carries the kind, the
    // payload carries the per-kind serialized data. Adding a new
    // `HeapKind` variant requires extending this enum in lockstep.
    //
    // Arm-by-arm coverage policy (per §2.7.5.1):
    //
    // - **Full payload round-trip** when the inner state is trivially
    //   serializable (HashSet keys are Arc<String>; PriorityQueue heap
    //   is Vec<i64>; Atomic value is i64; Char/BigInt are scalar).
    // - **Opaque-stub round-trip** when the inner state carries cross-
    //   value references that would re-introduce the deleted
    //   Arc<HeapValue> generic serializer shape (Iterator carries a
    //   closure-self share; Channel/Deque queues carry KindedSlot
    //   payloads of arbitrary kinds; FilterExpr is an AST tree of
    //   query nodes; Reference points into another heap object;
    //   SharedCell is binding-storage with parallel-kind track; Mutex
    //   and Lazy each carry a nested KindedSlot payload). The opaque
    //   stub carries the kind discriminator plus a per-arm descriptor
    //   string and surfaces a structured runtime error on resume —
    //   no silent corruption (the §2.7.4 invariant).
    //
    // Adding deep payload serialization for the opaque-stub arms lands
    // in follow-up sub-clusters per CLAUDE.md "Forbidden rationalizations"
    // (no `Arc<HeapValue>` generic serializer, no Bool-default
    // fallback — surface-and-stop is the right shape for the deep arms).

    /// `HeapKind::HashSet` — string-keyed insertion-ordered set (Wave 13).
    /// Round-trips the key array verbatim (per ADR-006 §2.7.15 string-
    /// only keyspace).
    HashSet { keys: Vec<String> },

    /// `HeapKind::Iterator` — lazy iterator carrier (W13 §2.7.16).
    /// Iterator state carries (a) a closure-self share for transform
    /// closures and (b) source-buffer references; serializing the
    /// graph requires walking the closure capture set, which re-
    /// introduces the Arc<HeapValue> generic serializer shape
    /// §2.7.5.1 forbids. Stored opaquely; resume surfaces an error
    /// citing the W17-snapshot-iterator follow-up.
    IteratorOpaque,

    /// `HeapKind::Result` — typed-Arc Result<T,E> carrier (Wave 14 §2.7.17).
    /// `Ok` / `Err` arms already exist for the pre-bulldozer
    /// scalar-payload form; this arm wraps a `KindedSlot`-payloaded
    /// `ResultData` per the post-§2.7.17 typed-Arc shape: the
    /// discriminator (is_ok) plus the inner serializable payload.
    ResultData {
        is_ok: bool,
        payload: Box<SerializableVMValue>,
    },

    /// `HeapKind::Option` — typed-Arc Option<T> carrier (Wave 14 §2.7.17).
    /// Mirror of `ResultData`: the discriminator (is_some) plus the
    /// inner payload (or `None` sentinel when is_some == false).
    OptionData {
        is_some: bool,
        payload: Option<Box<SerializableVMValue>>,
    },

    /// `HeapKind::Deque` — heterogeneous-element double-ended queue
    /// (Wave 15 §2.7.19). The element-payload storage is
    /// `Arc<HeapValue>` per the ADR-005 §1 single-discriminator
    /// shape; the items array is round-trippable as `Vec<SerializableVMValue>`
    /// once each element is projected through `slot_to_serializable`.
    /// Opaque at landing — per-element projection over an arbitrary
    /// `Arc<HeapValue>` walks the same generic-serializer shape
    /// §2.7.5.1 forbids; the per-element kinded path lands when the
    /// Deque method-tier wires its KindedSlot return-shape.
    DequeOpaque { len: usize },

    /// `HeapKind::Channel` — concurrency-primitive carrier
    /// (Wave 15 §2.7.20). The inner queue holds `KindedSlot` payloads
    /// of arbitrary kinds; same per-element-projection blocker as
    /// `DequeOpaque`. Closed-flag round-trips; queue contents land
    /// in the W17-snapshot-channel-queue follow-up.
    ChannelOpaque { closed: bool, len: usize },

    /// `HeapKind::PriorityQueue` — i64-priority min-heap
    /// (Wave 15 §2.7.18). i64-priority-only storage means full
    /// payload round-trip — the heap-ordered i64 vec encodes losslessly.
    PriorityQueueHeap { heap: Vec<i64> },

    /// `HeapKind::Reference` — `&expr` / `&mut expr` reference handle
    /// (Wave 8). The reference's target lives inside the same VM's
    /// heap; round-tripping requires tracking target identity across
    /// snapshot boundaries which is unspecified by ADR-006 §2.7. The
    /// W17-snapshot-references follow-up answers the identity question.
    ReferenceOpaque,

    /// `HeapKind::FilterExpr` — query-DSL AST tree (Wave-γ §2.7.9).
    /// Carries `Arc<FilterNode>` whose `And/Or/Not` branches recurse
    /// into other `FilterExpr` shares — round-tripping requires a
    /// dedicated AST serializer per the §2.7.9 pure-discriminator
    /// shape. Opaque at landing; the W17-snapshot-filter-expr
    /// follow-up lands the tree serializer.
    FilterExprOpaque,

    /// `HeapKind::SharedCell` — binding-storage interior-mutability
    /// carrier (Wave 8 §2.7.8). Carries the parallel-kind track and
    /// reaches into the cell payload's HeapKind dispatch; same per-
    /// element-projection blocker as Deque/Channel. Round-tripping
    /// also bumps into the binding-identity question (two `var x`
    /// bindings that share a cell observe each other's mutations,
    /// so cell identity must survive the snapshot).
    SharedCellOpaque,

    /// `HeapKind::Mutex` — single-typed-payload exclusion cell
    /// (Wave 2.5 §2.7.25). Inner `Option<KindedSlot>` payload of
    /// arbitrary kind; round-trip requires per-kind projection. The
    /// `has_value` flag distinguishes "no inner payload"
    /// (transient post-`take`) from "payload present, opaque on
    /// landing".
    MutexOpaque { has_value: bool },

    /// `HeapKind::Atomic` — atomic i64 cell (Wave 2.5 §2.7.25).
    /// Full payload round-trip — `AtomicI64::load(SeqCst)` reads the
    /// value, `AtomicI64::new(value)` restores. Memory ordering is
    /// `SeqCst` per the §2.7.25 ruling.
    AtomicI64 { value: i64 },

    /// `HeapKind::Lazy` — initialize-once cell (Wave 2.5 §2.7.25).
    /// Carries an initializer-closure `KindedSlot` (kind
    /// `Ptr(HeapKind::Closure)`) and a cached-value slot. Both halves
    /// land opaque pending the W17-snapshot-closure follow-up which
    /// also blocks `SerializableVMValue::Closure` deep round-trip
    /// (the existing Closure arm carries function_id + type_id +
    /// upvalues as `Vec<SerializableVMValue>` — restoration requires
    /// the ClosureLayout side-table on the receiver, which is itself
    /// part of the snapshot's program payload).
    LazyOpaque { is_initialized: bool },

    /// `HeapKind::Char` — single Unicode scalar value (Wave 12 §2.7.13).
    /// Full payload round-trip — `char` already serializes via serde.
    Char(char),

    /// `HeapKind::BigInt` — arbitrary-precision int (currently `Arc<i64>`
    /// per the Phase-2 deletion of the full bigint impl). Round-trips
    /// as i64; a future typed-payload BigInt rebuild updates the wire
    /// format.
    BigInt(i64),
}
566
/// Serializable form of an enum value: the enum's name, the selected variant,
/// and that variant's payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnumValueSnapshot {
    pub enum_name: String,
    pub variant: String,
    pub payload: EnumPayloadSnapshot,
}
573
/// Payload carried by an [`EnumValueSnapshot`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EnumPayloadSnapshot {
    /// Variant without data.
    Unit,
    /// Positional payload values.
    Tuple(Vec<SerializableVMValue>),
    /// Named payload values as (field name, value) pairs.
    Struct(Vec<(String, SerializableVMValue)>),
}
580
/// Serializable form of a print result: the rendered text plus the spans that
/// make it up.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrintableSnapshot {
    /// The fully rendered output string.
    pub rendered: String,
    /// Spans describing the pieces of `rendered`.
    pub spans: Vec<PrintSpanSnapshot>,
}
586
/// One span of a [`PrintableSnapshot`].
///
/// NOTE(review): `start` / `end` are presumably offsets into
/// `PrintableSnapshot::rendered` — confirm whether they are byte or character
/// offsets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PrintSpanSnapshot {
    /// Fixed text.
    Literal {
        text: String,
        start: usize,
        end: usize,
        span_id: String,
    },
    /// Text produced from a value, with the information needed to describe
    /// how it was formatted.
    Value {
        text: String,
        start: usize,
        end: usize,
        span_id: String,
        /// Name of the variable the value came from, if any.
        variable_name: Option<String>,
        /// The unformatted value.
        raw_value: Box<SerializableVMValue>,
        type_name: String,
        current_format: String,
        format_params: HashMap<String, SerializableVMValue>,
    },
}
607
/// Reference to a binary payload by content hash, tagged with the kind of
/// data it contains.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobRef {
    /// Content hash identifying the payload.
    pub hash: HashDigest,
    /// What the referenced payload contains.
    pub kind: BlobKind,
}
613
/// Element type for typed array binary serialization.
///
/// The enum derives serde's `Serialize`/`Deserialize`; presumably variant
/// order is significant on the wire, so append rather than reorder (confirm
/// against the encoder in use).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TypedArrayElementKind {
    I8,
    I16,
    I32,
    I64,
    U8,
    U16,
    U32,
    U64,
    F32,
    F64,
    Bool,
}
629
/// Kind of payload a [`BlobRef`] (or a `SerializableVMValue::SidecarRef`)
/// refers to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BlobKind {
    /// A data table payload.
    DataTable,
    /// Raw typed array bytes (element type encoded separately).
    TypedArray(TypedArrayElementKind),
    /// Raw f64 bytes in row-major order.
    Matrix,
}
638
/// Descriptor of a payload that was split into content-addressed chunks.
///
/// Produced by `store_chunked_vec` / `store_chunked_bytes` and consumed by
/// `load_chunked_vec` / `load_chunked_bytes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedBlob {
    /// Blob hash of each chunk, in order.
    pub chunk_hashes: Vec<HashDigest>,
    /// Total length of the original payload: element count for
    /// `store_chunked_vec`, byte count for `store_chunked_bytes`.
    pub total_len: usize,
    /// Maximum length of a single chunk, in the same unit as `total_len`.
    pub chunk_len: usize,
}
645
/// Serializable form of a data table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataTable {
    /// Chunked table payload (presumably Arrow IPC bytes, going by the field
    /// name — confirm against the writer).
    pub ipc_chunks: ChunkedBlob,
    /// Optional type name associated with the table.
    pub type_name: Option<String>,
    /// Optional schema identifier associated with the table.
    pub schema_id: Option<u32>,
}
652
/// Serializable form of a data frame: identity, timeframe, a chunked
/// timestamp column and the chunked value columns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrame {
    pub id: String,
    pub timeframe: Timeframe,
    /// Chunked timestamps (element type is determined by the writer).
    pub timestamps: ChunkedBlob,
    /// Named value columns.
    pub columns: Vec<SerializableDataFrameColumn>,
}
660
/// One named column of a [`SerializableDataFrame`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrameColumn {
    /// Column name.
    pub name: String,
    /// Chunked column values (element type is determined by the writer).
    pub values: ChunkedBlob,
}
666
/// Serializable data-cache key: an identifier paired with a timeframe.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheKeySnapshot {
    pub id: String,
    pub timeframe: Timeframe,
}
672
/// Serializable form of one historical data-cache entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedDataSnapshot {
    /// Key the entry is cached under.
    pub key: CacheKeySnapshot,
    /// The cached historical data frame.
    pub historical: SerializableDataFrame,
    /// Current position within the cached data (presumably a row index —
    /// confirm against the cache implementation).
    pub current_index: usize,
}
679
/// Serializable form of one live-buffer entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveBufferSnapshot {
    /// Key the buffer belongs to.
    pub key: CacheKeySnapshot,
    /// Chunked buffered rows (row type is determined by the writer).
    pub rows: ChunkedBlob,
}
685
/// Serializable form of the data cache: historical entries plus live buffers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCacheSnapshot {
    pub historical: Vec<CachedDataSnapshot>,
    pub live_buffer: Vec<LiveBufferSnapshot>,
}
691
692pub(crate) fn store_chunked_vec<T: Serialize>(
693    values: &[T],
694    chunk_len: usize,
695    store: &SnapshotStore,
696) -> Result<ChunkedBlob> {
697    let chunk_len = chunk_len.max(1);
698    if values.is_empty() {
699        return Ok(ChunkedBlob {
700            chunk_hashes: Vec::new(),
701            total_len: 0,
702            chunk_len,
703        });
704    }
705    let mut hashes = Vec::new();
706    for chunk in values.chunks(chunk_len) {
707        let bytes = bincode::serialize(chunk)?;
708        let hash = store.put_blob(&bytes)?;
709        hashes.push(hash);
710    }
711    Ok(ChunkedBlob {
712        chunk_hashes: hashes,
713        total_len: values.len(),
714        chunk_len,
715    })
716}
717
718pub(crate) fn load_chunked_vec<T: DeserializeOwned>(
719    chunked: &ChunkedBlob,
720    store: &SnapshotStore,
721) -> Result<Vec<T>> {
722    if chunked.total_len == 0 {
723        return Ok(Vec::new());
724    }
725    let mut out = Vec::with_capacity(chunked.total_len);
726    for hash in &chunked.chunk_hashes {
727        let bytes = store.get_blob(hash)?;
728        let chunk: Vec<T> = bincode::deserialize(&bytes)?;
729        out.extend(chunk);
730    }
731    out.truncate(chunked.total_len);
732    Ok(out)
733}
734
735/// Store raw bytes in content-addressed chunks (256 KB each).
736pub fn store_chunked_bytes(data: &[u8], store: &SnapshotStore) -> Result<ChunkedBlob> {
737    if data.is_empty() {
738        return Ok(ChunkedBlob {
739            chunk_hashes: Vec::new(),
740            total_len: 0,
741            chunk_len: BYTE_CHUNK_LEN,
742        });
743    }
744    let mut hashes = Vec::new();
745    for chunk in data.chunks(BYTE_CHUNK_LEN) {
746        let hash = store.put_blob(chunk)?;
747        hashes.push(hash);
748    }
749    Ok(ChunkedBlob {
750        chunk_hashes: hashes,
751        total_len: data.len(),
752        chunk_len: BYTE_CHUNK_LEN,
753    })
754}
755
756/// Load raw bytes from content-addressed chunks.
757pub fn load_chunked_bytes(chunked: &ChunkedBlob, store: &SnapshotStore) -> Result<Vec<u8>> {
758    if chunked.total_len == 0 {
759        return Ok(Vec::new());
760    }
761    let mut out = Vec::with_capacity(chunked.total_len);
762    for hash in &chunked.chunk_hashes {
763        let bytes = store.get_blob(hash)?;
764        out.extend_from_slice(&bytes);
765    }
766    out.truncate(chunked.total_len);
767    Ok(out)
768}
769
/// Reinterpret a byte slice as a slice of `T`.
///
/// Empty input yields an empty slice regardless of the pointer's alignment.
///
/// NOTE(review): `T: Copy` does not guarantee that every bit pattern is a
/// valid `T` (e.g. `bool`, `char`); callers are presumably expected to use
/// plain numeric element types only — confirm at the call sites.
///
/// # Panics
/// Panics when `T` is zero-sized, when the byte length is not a multiple of
/// `size_of::<T>()`, or when the slice's address is not aligned for `T`.
/// These conditions are checked rather than assumed because this function is
/// safe to call and `slice::from_raw_parts` requires them.
fn bytes_as_slice<T: Copy>(bytes: &[u8]) -> &[T] {
    if bytes.is_empty() {
        // An empty `Vec<u8>` carries a dangling pointer that is only aligned
        // for `u8`; avoid touching it at all.
        return &[];
    }
    let elem_size = std::mem::size_of::<T>();
    assert!(
        elem_size != 0,
        "cannot reinterpret bytes as a zero-sized element type"
    );
    assert!(
        bytes.len() % elem_size == 0,
        "byte slice length {} not a multiple of element size {}",
        bytes.len(),
        elem_size
    );
    let align = std::mem::align_of::<T>();
    assert!(
        (bytes.as_ptr() as usize) % align == 0,
        "byte slice at {:p} is not aligned to {} bytes",
        bytes.as_ptr(),
        align
    );
    let len = bytes.len() / elem_size;
    // SAFETY: the pointer is non-null and (checked above) aligned for `T`, the
    // region spans exactly `len * size_of::<T>()` initialized bytes borrowed
    // for the returned lifetime, and the shared borrow forbids mutation.
    unsafe { std::slice::from_raw_parts(bytes.as_ptr().cast::<T>(), len) }
}
785
/// View the memory backing `data` as raw bytes.
///
/// The returned slice covers `data.len() * size_of::<T>()` bytes and borrows
/// from `data`.
///
/// NOTE(review): `T` is unconstrained, so an element type with padding would
/// expose uninitialized bytes; callers presumably pass plain numeric types
/// only — confirm at the call sites.
fn slice_as_bytes<T>(data: &[T]) -> &[u8] {
    let start = data.as_ptr().cast::<u8>();
    let total_bytes = std::mem::size_of_val(data);
    // SAFETY: `start` points at the first byte of `data`, which is valid for
    // `total_bytes` bytes for the duration of the borrow, and `u8` has no
    // alignment requirement.
    unsafe { std::slice::from_raw_parts(start, total_bytes) }
}
791
792// ===================== Conversion helpers =====================
793//
794// The slot-(de)serialization functions (`nanboxed_to_serializable`,
795// `serializable_to_nanboxed`, `serializable_to_nanboxed_with_layouts`,
796// plus `enum_*`/`print_result_*` adapters) were deleted in Phase 2b
797// alongside the `ValueWord` imports. Their replacement is a kind-
798// threaded `slot_to_serializable(bits, kind, store)` /
799// `serializable_to_slot(sv, expected_kind, store)` pair (mirrors the
800// wire_conversion shape). The new API lands in a follow-up commit when
801// stdlib mass migration (Phase 2c) and shape-vm cascade reveal the
802// concrete consumer needs.
803//
804// W17-snapshot-roundtrip (Phase 2d Wave 2.6, 2026-05-11): the
805// kind-threaded `slot_to_serializable` / `serializable_to_slot` pair
806// lands here per ADR-006 §2.7.5.1. The contract is:
807//
808// - `slot_to_serializable(bits, kind, store)`:
809//   dispatch on `kind` to project the slot's raw u64 bits into the
810//   matching `SerializableVMValue` arm. Scalar kinds project trivially
811//   (`Int64` → `Int`, `Float64` → `Number`, `Bool` → `Bool`, etc.).
812//   Heap kinds (`Ptr(HeapKind::*)`) dispatch via
813//   `slot.as_heap_value()` + `HeapValue::*` match per the §2.7.6 / Q8
814//   carrier-API bound — no `Arc<HeapValue>` generic serializer.
815//
816// - `serializable_to_slot(sv, expected_kind, store)`:
817//   inverse projection — discriminator must match `expected_kind` (or
818//   the function returns a structured kind-mismatch error). On success
819//   returns `(bits, NativeKind)` ready to push to a stack/local slot
820//   via `clone_with_kind` discipline.
821//
822// Both functions return `Result<_, String>` with structured error
823// messages; the §2.7.5.1 forbidden shapes (Bool-default fallback,
824// `Arc<HeapValue>` generic serializer, silent Option wrapping) are
825// refused on sight. Unsupported heap kinds surface clean — the
826// caller observes a runtime error rather than corrupted state.
827
828use shape_value::{HeapKind, KindedSlot, NativeKind, ValueSlot};
829use std::sync::Arc;
830
831/// Project a `(bits, kind)` slot pair into its `SerializableVMValue` arm.
832///
833/// Per ADR-006 §2.7.5.1: scalar kinds project from raw u64 bits via the
834/// canonical sign-extension / bitcast rules; heap kinds (`Ptr(HeapKind::*)`)
835/// recover their typed `Arc<T>` via `ValueSlot::from_raw(bits).as_heap_value()`
836/// + `HeapValue::*` match, then serialize per-arm. Heap kinds whose deep
837/// payload requires the `Arc<HeapValue>` generic serializer shape
838/// (§2.7.5.1 forbidden) project to their opaque-stub arm instead.
839///
840/// The `_store` parameter is reserved for chunked-blob arms that
841/// off-line large payloads (DataTable IPC, TypedArray binary). Scalar
842/// + simple-heap arms do not touch the store.
843pub fn slot_to_serializable(
844    bits: u64,
845    kind: NativeKind,
846    _store: &SnapshotStore,
847) -> std::result::Result<SerializableVMValue, String> {
848    use SerializableVMValue as SV;
849    match kind {
850        NativeKind::Int64 => Ok(SV::Int(bits as i64)),
851        NativeKind::Int32 => Ok(SV::Int(bits as i32 as i64)),
852        NativeKind::Int16 => Ok(SV::Int(bits as i16 as i64)),
853        NativeKind::Int8 => Ok(SV::Int(bits as i8 as i64)),
854        NativeKind::UInt64 => Ok(SV::Int(bits as i64)),
855        NativeKind::UInt32 => Ok(SV::Int((bits as u32) as i64)),
856        NativeKind::UInt16 => Ok(SV::Int((bits as u16) as i64)),
857        NativeKind::UInt8 => Ok(SV::Int((bits as u8) as i64)),
858        NativeKind::IntSize => Ok(SV::Int(bits as isize as i64)),
859        NativeKind::UIntSize => Ok(SV::Int((bits as usize) as i64)),
860        NativeKind::Float64 => Ok(SV::Number(f64::from_bits(bits))),
861        // Round 19 S1.5 W12-nativekind-scalar-additions (2026-05-14):
862        // ADR-006 §2.7.5 amendment adds F32 + Char as 4-byte scalar
863        // variants. Wire-format projection: F32 widens to `SV::Number`
864        // via `f64::from(f32)` (lossless); Char projects to `SV::Char`
865        // by recovering the codepoint from the low 32 bits.
866        NativeKind::Float32 => Ok(SV::Number(f64::from(f32::from_bits(bits as u32)))),
867        NativeKind::Char => match char::from_u32(bits as u32) {
868            Some(c) => Ok(SV::Char(c)),
869            None => Err(format!(
870                "slot_to_serializable: NativeKind::Char slot has invalid \
871                 codepoint bits 0x{:x} — construction-side contract violated",
872                bits,
873            )),
874        },
875        NativeKind::Bool => Ok(SV::Bool(bits != 0)),
876        // R5b-2-bool-null-sentinel-cluster (ADR-006 §2.7 + §2.7.5 +
877        // §2.7.7/Q9, 2026-05-19): `NativeKind::Null` is the canonical
878        // absence-of-value discriminator. Pre-disposition this was
879        // encoded as `(0u64, NativeKind::Bool)` colliding with
880        // legitimate `false` bool values; post-disposition the kind
881        // IS the discriminator. Project to `SV::None`.
882        NativeKind::Null => Ok(SV::None),
883        NativeKind::NullableInt64
884        | NativeKind::NullableInt32
885        | NativeKind::NullableInt16
886        | NativeKind::NullableInt8
887        | NativeKind::NullableUInt64
888        | NativeKind::NullableUInt32
889        | NativeKind::NullableUInt16
890        | NativeKind::NullableUInt8
891        | NativeKind::NullableIntSize
892        | NativeKind::NullableUIntSize
893        | NativeKind::NullableFloat64 => {
894            // Nullable scalar wire-format: surface-and-stop. The
895            // canonical None sentinel (NaN for Float, MIN for signed,
896            // MAX for unsigned) differs per kind; the §2.7.5.1
897            // post-proof shape needs an explicit `Nullable<T>`
898            // amendment with the sentinel rule. Tracked as
899            // W17-snapshot-nullable follow-up.
900            Err(format!(
901                "slot_to_serializable: W17-snapshot-roundtrip surface — \
902                 nullable-scalar kind {kind:?} has no SerializableVMValue \
903                 arm at landing. The post-proof sentinel-rule amendment \
904                 is the W17-snapshot-nullable follow-up. \
905                 ADR-006 §2.7.5.1.",
906            ))
907        }
908        NativeKind::String => {
909            // String kind: bits is `Arc::into_raw(Arc<String>)`.
910            // SAFETY: per the §2.7.6 String-arm construction contract,
911            // a kind=String slot's bits encode a strong-count share on
912            // an `Arc<String>` allocation. Reconstruct, clone, restore.
913            if bits == 0 {
914                return Err("slot_to_serializable: String slot with null bits".into());
915            }
916            unsafe {
917                let arc = Arc::<String>::from_raw(bits as *const String);
918                let cloned = (*arc).clone();
919                let _ = Arc::into_raw(arc); // restore the original share
920                Ok(SV::String(cloned))
921            }
922        }
923        // Wave 2 Agent B W12-StringV2-DecimalV2-NativeKind-additions
924        // (ADR-006 §2.7.5 amendment, 2026-05-14): the v2-raw `*const StringObj`
925        // carrier projects to the same `SV::String` wire shape as
926        // `NativeKind::String` — `StringObj::as_str` reads the UTF-8 payload
927        // directly off the `repr(C)` carrier. The slot bits are NOT an
928        // `Arc<T>` pointer, so we do not reconstruct + clone an `Arc`; we
929        // borrow the inner `&str` (`StringObj::as_str` is `unsafe fn`
930        // returning a `'static` borrow tied to the carrier's lifetime; the
931        // slot owns one v2-retain share so the carrier is live for the
932        // duration of this call).
933        NativeKind::StringV2 => {
934            if bits == 0 {
935                return Err("slot_to_serializable: StringV2 slot with null bits".into());
936            }
937            // SAFETY: per the §2.7.5 amendment construction contract,
938            // kind=StringV2 means bits = `ptr as u64` where `ptr` points to
939            // a live `StringObj` whose refcount has been bumped to claim
940            // this share. We borrow the inner UTF-8 bytes via
941            // `StringObj::as_str`.
942            let ptr = bits as *const shape_value::v2::string_obj::StringObj;
943            let s: &str = unsafe { shape_value::v2::string_obj::StringObj::as_str(ptr) };
944            Ok(SV::String(s.to_string()))
945        }
946        // Wave 2 Agent B: the v2-raw `*const DecimalObj` carrier projects
947        // to the same `SV::Decimal` wire shape as
948        // `NativeKind::Ptr(HeapKind::Decimal)` — `DecimalObj::value` returns
949        // the inline `rust_decimal::Decimal` directly off the `repr(C)`
950        // carrier. Same construction-side contract as `StringV2`.
951        NativeKind::DecimalV2 => {
952            if bits == 0 {
953                return Err("slot_to_serializable: DecimalV2 slot with null bits".into());
954            }
955            // SAFETY: per the §2.7.5 amendment construction contract,
956            // kind=DecimalV2 means bits = `ptr as u64` pointing to a live
957            // `DecimalObj` with bumped refcount.
958            let ptr = bits as *const shape_value::v2::decimal_obj::DecimalObj;
959            let value = unsafe { shape_value::v2::decimal_obj::DecimalObj::value(ptr) };
960            Ok(SV::Decimal(value))
961        }
962        NativeKind::Ptr(heap_kind) => slot_heap_to_serializable(bits, heap_kind),
963    }
964}
965
966/// Project a heap-kinded slot to its `SerializableVMValue` arm.
967///
968/// Per the canonical typed-pointer recovery pattern (CLAUDE.md
969/// "The 5-arm receiver-recovery soundness rule"): the slot bits for
970/// `kind=Ptr(HeapKind::X)` are `Arc::into_raw(Arc<XData>) as u64`,
971/// NOT `*const HeapValue`. Casting to `*const HeapValue` is wrong-
972/// type recovery and segfaults. Each arm reconstructs the typed
973/// `Arc<T>`, clones it (bumping the strong count for our read),
974/// rebuilds the original share, and reads through the cloned Arc.
975fn slot_heap_to_serializable(
976    bits: u64,
977    expected_kind: HeapKind,
978) -> std::result::Result<SerializableVMValue, String> {
979    use SerializableVMValue as SV;
980    use shape_value::heap_value::{
981        AtomicData, ChannelData, DequeData, HashSetData, LazyData, MutexData,
982        OptionData, PriorityQueueData, ResultData,
983    };
984    if bits == 0 {
985        return Err(format!(
986            "slot_to_serializable: Ptr({expected_kind:?}) slot with null bits",
987        ));
988    }
989    match expected_kind {
990        HeapKind::String => {
991            // SAFETY: bits = `Arc::into_raw(Arc<String>)` per the
992            // ValueSlot::from_string_arc construction contract.
993            unsafe {
994                let arc = Arc::<String>::from_raw(bits as *const String);
995                let cloned = (*arc).clone();
996                let _ = Arc::into_raw(arc);
997                Ok(SV::String(cloned))
998            }
999        }
1000        HeapKind::Decimal => unsafe {
1001            let arc = Arc::<rust_decimal::Decimal>::from_raw(bits as *const rust_decimal::Decimal);
1002            let v = *arc;
1003            let _ = Arc::into_raw(arc);
1004            Ok(SV::Decimal(v))
1005        },
1006        HeapKind::BigInt => unsafe {
1007            let arc = Arc::<i64>::from_raw(bits as *const i64);
1008            let v = *arc;
1009            let _ = Arc::into_raw(arc);
1010            Ok(SV::BigInt(v))
1011        },
1012        HeapKind::Char => {
1013            // Char is inline-scalar per the §2.7 raw-bits encoding
1014            // (the bits are the u32 codepoint).
1015            let cp = bits as u32;
1016            match char::from_u32(cp) {
1017                Some(c) => Ok(SV::Char(c)),
1018                None => Err(format!(
1019                    "slot_to_serializable: Char arm: invalid codepoint {cp:#x}"
1020                )),
1021            }
1022        }
1023        HeapKind::HashSet => unsafe {
1024            let arc = Arc::<HashSetData>::from_raw(bits as *const HashSetData);
1025            let keys: Vec<String> = arc.keys.iter().map(|k| (**k).clone()).collect();
1026            let _ = Arc::into_raw(arc);
1027            Ok(SV::HashSet { keys })
1028        },
1029        HeapKind::PriorityQueue => unsafe {
1030            let arc = Arc::<PriorityQueueData>::from_raw(bits as *const PriorityQueueData);
1031            let heap: Vec<i64> = (*arc.heap).clone();
1032            let _ = Arc::into_raw(arc);
1033            Ok(SV::PriorityQueueHeap { heap })
1034        },
1035        HeapKind::Atomic => unsafe {
1036            let arc = Arc::<AtomicData>::from_raw(bits as *const AtomicData);
1037            let v = arc.load();
1038            let _ = Arc::into_raw(arc);
1039            Ok(SV::AtomicI64 { value: v })
1040        },
1041        HeapKind::Lazy => unsafe {
1042            let arc = Arc::<LazyData>::from_raw(bits as *const LazyData);
1043            let is_init = arc.is_initialized();
1044            let _ = Arc::into_raw(arc);
1045            Ok(SV::LazyOpaque {
1046                is_initialized: is_init,
1047            })
1048        },
1049        HeapKind::Mutex => unsafe {
1050            let arc = Arc::<MutexData>::from_raw(bits as *const MutexData);
1051            // get() always returns Some — MutexData::new always
1052            // installs a payload. has_value is true unless the
1053            // inner is a Bool-zero (the canonical no-op None
1054            // sentinel per §2.7.25).
1055            let inner = arc.get();
1056            let has_value =
1057                !(matches!(inner.kind(), NativeKind::Bool) && inner.slot().raw() == 0);
1058            drop(inner);
1059            let _ = Arc::into_raw(arc);
1060            Ok(SV::MutexOpaque { has_value })
1061        },
1062        HeapKind::Channel => unsafe {
1063            let arc = Arc::<ChannelData>::from_raw(bits as *const ChannelData);
1064            let closed = arc.is_closed();
1065            let len = arc.len();
1066            let _ = Arc::into_raw(arc);
1067            Ok(SV::ChannelOpaque { closed, len })
1068        },
1069        HeapKind::Deque => unsafe {
1070            let arc = Arc::<DequeData>::from_raw(bits as *const DequeData);
1071            let len = arc.items.len();
1072            let _ = Arc::into_raw(arc);
1073            Ok(SV::DequeOpaque { len })
1074        },
1075        HeapKind::Result => unsafe {
1076            let arc = Arc::<ResultData>::from_raw(bits as *const ResultData);
1077            let is_ok = arc.is_ok;
1078            let payload_kind = arc.payload.kind();
1079            let payload_bits = arc.payload.slot().raw();
1080            let inner = serializable_inner_kinded(payload_bits, payload_kind)?;
1081            let _ = Arc::into_raw(arc);
1082            Ok(SV::ResultData {
1083                is_ok,
1084                payload: Box::new(inner),
1085            })
1086        },
1087        HeapKind::Option => unsafe {
1088            let arc = Arc::<OptionData>::from_raw(bits as *const OptionData);
1089            let is_some = arc.is_some;
1090            let payload = if is_some {
1091                let payload_kind = arc.payload.kind();
1092                let payload_bits = arc.payload.slot().raw();
1093                Some(Box::new(serializable_inner_kinded(payload_bits, payload_kind)?))
1094            } else {
1095                None
1096            };
1097            let _ = Arc::into_raw(arc);
1098            Ok(SV::OptionData { is_some, payload })
1099        },
1100        // Reference/FilterExpr/SharedCell/Iterator: discriminator-only
1101        // round-trip. The typed Arcs exist but their deep walks are
1102        // follow-up work; we don't even need to touch the Arc on the
1103        // way out for the opaque round-trip path.
1104        HeapKind::Reference => Ok(SV::ReferenceOpaque),
1105        HeapKind::FilterExpr => Ok(SV::FilterExprOpaque),
1106        HeapKind::SharedCell => Ok(SV::SharedCellOpaque),
1107        HeapKind::Iterator => Ok(SV::IteratorOpaque),
1108        // Future is inline u64 per §2.7.4.
1109        HeapKind::Future => Ok(SV::Future(bits)),
1110
1111        // Pre-existing complex shapes: surface-and-stop per §2.7.5.1.
1112        // These have rich pre-bulldozer SerializableVMValue arms whose
1113        // construction requires more than typed-Arc recovery (Range
1114        // bounds carry KindedSlot endpoints; TypedObject carries the
1115        // schema_id + parallel-kind track over its fields; TypedArray
1116        // carries a typed-buffer payload and lands in a sidecar blob;
1117        // DataTable / TableView / HashMap / Temporal / TaskGroup /
1118        // IoHandle / NativeView / NativeScalar / Content / ClosureRaw
1119        // each have their own multi-step landing path).
1120        other => Err(format!(
1121            "slot_to_serializable: W17-snapshot-roundtrip surface — \
1122             HeapKind::{other:?} arm has no in-session SerializableVMValue \
1123             projection. Tracked as W17-snapshot-{other:?} follow-up per \
1124             docs/cluster-audits/phase-2d-playbook.md §3. \
1125             ADR-006 §2.7.5.1.",
1126        )),
1127    }
1128}
1129
1130/// Inner KindedSlot serialization for Result/Option payloads.
1131/// Bool-zero short-circuit (unit-shape None marker) returns
1132/// `SerializableVMValue::Unit`; other kinds route through the
1133/// canonical `slot_to_serializable` path with a sentinel store.
1134fn serializable_inner_kinded(
1135    bits: u64,
1136    kind: NativeKind,
1137) -> std::result::Result<SerializableVMValue, String> {
1138    if matches!(kind, NativeKind::Bool) && bits == 0 {
1139        return Ok(SerializableVMValue::Unit);
1140    }
1141    match kind {
1142        NativeKind::Int64 => Ok(SerializableVMValue::Int(bits as i64)),
1143        NativeKind::Float64 => Ok(SerializableVMValue::Number(f64::from_bits(bits))),
1144        NativeKind::Bool => Ok(SerializableVMValue::Bool(bits != 0)),
1145        NativeKind::String => {
1146            if bits == 0 {
1147                return Ok(SerializableVMValue::None);
1148            }
1149            unsafe {
1150                let arc = Arc::<String>::from_raw(bits as *const String);
1151                let cloned = (*arc).clone();
1152                let _ = Arc::into_raw(arc);
1153                Ok(SerializableVMValue::String(cloned))
1154            }
1155        }
1156        _ => Err(format!(
1157            "serializable_inner_kinded: W17-snapshot-roundtrip surface — \
1158             inner Result/Option payload kind {kind:?} is not in the \
1159             initial scalar set; deep payload arms land in follow-up. \
1160             ADR-006 §2.7.5.1.",
1161        )),
1162    }
1163}
1164
1165/// Inverse of [`slot_to_serializable`] — project a `SerializableVMValue`
1166/// back into a `(bits, NativeKind)` pair for placement into a stack
1167/// or local slot.
1168///
1169/// `expected_kind` is the post-proof kind the caller has already
1170/// committed to (from `FrameDescriptor.slots[i]` or the parallel
1171/// stack-kind track). A discriminator-vs-expected-kind mismatch
1172/// surfaces as a structured error rather than a Bool-default
1173/// fallback (§2.7.7 #9 / §2.7.5.1 forbidden).
1174pub fn serializable_to_slot(
1175    sv: &SerializableVMValue,
1176    expected_kind: NativeKind,
1177    _store: &SnapshotStore,
1178) -> std::result::Result<(u64, NativeKind), String> {
1179    use SerializableVMValue as SV;
1180    // Scalar projections — discriminator must match `expected_kind`'s
1181    // family (signed/unsigned/float/bool/string/heap).
1182    match (sv, expected_kind) {
1183        (SV::Int(i), NativeKind::Int64) => Ok((*i as u64, NativeKind::Int64)),
1184        (SV::Int(i), NativeKind::Int32) => Ok(((*i as i32) as u64, NativeKind::Int32)),
1185        (SV::Int(i), NativeKind::Int16) => Ok(((*i as i16 as i32) as u64, NativeKind::Int16)),
1186        (SV::Int(i), NativeKind::Int8) => Ok(((*i as i8 as i32) as u64, NativeKind::Int8)),
1187        (SV::Int(i), NativeKind::UInt64) => Ok((*i as u64, NativeKind::UInt64)),
1188        (SV::Int(i), NativeKind::UInt32) => Ok(((*i as u32) as u64, NativeKind::UInt32)),
1189        (SV::Int(i), NativeKind::UInt16) => Ok(((*i as u16) as u64, NativeKind::UInt16)),
1190        (SV::Int(i), NativeKind::UInt8) => Ok(((*i as u8) as u64, NativeKind::UInt8)),
1191        (SV::Int(i), NativeKind::IntSize) => Ok((*i as isize as u64, NativeKind::IntSize)),
1192        (SV::Int(i), NativeKind::UIntSize) => Ok((*i as u64, NativeKind::UIntSize)),
1193        (SV::Number(f), NativeKind::Float64) => Ok((f.to_bits(), NativeKind::Float64)),
1194        (SV::Bool(b), NativeKind::Bool) => Ok((if *b { 1 } else { 0 }, NativeKind::Bool)),
1195        (SV::String(s), NativeKind::String) => {
1196            let arc = Arc::new(s.clone());
1197            let raw = Arc::into_raw(arc) as u64;
1198            Ok((raw, NativeKind::String))
1199        }
1200        (SV::None | SV::Unit, NativeKind::Bool) => Ok((0, NativeKind::Bool)),
1201
1202        // Heap kinds — discriminator must align with `expected_kind`'s
1203        // `HeapKind::*`. Reconstructing typed-Arc payloads is the
1204        // inverse of `slot_heap_to_serializable`; see per-arm coverage.
1205        (sv, NativeKind::Ptr(hk)) => serializable_to_heap_slot(sv, hk),
1206
1207        // Wildcards — surface-and-stop. No Bool-default fallback.
1208        (other_sv, other_kind) => Err(format!(
1209            "serializable_to_slot: W17-snapshot-roundtrip surface — \
1210             SerializableVMValue arm {} cannot satisfy expected kind \
1211             {other_kind:?}. Discriminator-vs-kind mismatch is a structured \
1212             error, not a Bool-default fallback (§2.7.5.1 forbidden). \
1213             ADR-006 §2.7.5.1.",
1214            serializable_arm_name(other_sv),
1215        )),
1216    }
1217}
1218
1219/// Inverse of [`slot_heap_to_serializable`] — reconstruct a heap-kinded
1220/// slot from its serialized arm. Returns `(bits, NativeKind)` ready
1221/// to push to a slot. The reconstructed slot owns one strong-count
1222/// share on the typed `Arc<T>` carrier.
1223fn serializable_to_heap_slot(
1224    sv: &SerializableVMValue,
1225    heap_kind: HeapKind,
1226) -> std::result::Result<(u64, NativeKind), String> {
1227    use SerializableVMValue as SV;
1228    use shape_value::heap_value::{
1229        AtomicData, HashSetData, OptionData, PriorityQueueData, ResultData,
1230    };
1231    match (sv, heap_kind) {
1232        (SV::String(s), HeapKind::String) => {
1233            // String can flow via either the dedicated NativeKind::String
1234            // or as Ptr(HeapKind::String) per ADR-005 §2 String exception.
1235            let arc = Arc::new(s.clone());
1236            let raw = Arc::into_raw(arc) as u64;
1237            Ok((raw, NativeKind::Ptr(HeapKind::String)))
1238        }
1239        (SV::Char(c), HeapKind::Char) => {
1240            // Char is an inline scalar in the HeapValue arm (Arc<char>
1241            // would be wasteful for a 4-byte value); slot ABI carries
1242            // it as raw bits.
1243            let bits = (*c as u32) as u64;
1244            Ok((bits, NativeKind::Ptr(HeapKind::Char)))
1245        }
1246        (SV::BigInt(n), HeapKind::BigInt) => {
1247            let arc = Arc::new(*n);
1248            let raw = Arc::into_raw(arc) as u64;
1249            Ok((raw, NativeKind::Ptr(HeapKind::BigInt)))
1250        }
1251        (SV::Decimal(d), HeapKind::Decimal) => {
1252            let arc = Arc::new(*d);
1253            let raw = Arc::into_raw(arc) as u64;
1254            Ok((raw, NativeKind::Ptr(HeapKind::Decimal)))
1255        }
1256        (SV::HashSet { keys }, HeapKind::HashSet) => {
1257            let arcs: Vec<Arc<String>> = keys.iter().map(|k| Arc::new(k.clone())).collect();
1258            let data = HashSetData::from_keys(arcs);
1259            let arc = Arc::new(data);
1260            let raw = Arc::into_raw(arc) as u64;
1261            Ok((raw, NativeKind::Ptr(HeapKind::HashSet)))
1262        }
1263        (SV::PriorityQueueHeap { heap }, HeapKind::PriorityQueue) => {
1264            let mut pq = PriorityQueueData::new();
1265            // Push values back through the public API to maintain
1266            // heap invariant. The serialized array is heap-order, not
1267            // sorted-order, so direct copy would be equivalent here —
1268            // but going through `push` is the safer canonical path.
1269            for &v in heap {
1270                pq.push(v);
1271            }
1272            let arc = Arc::new(pq);
1273            let raw = Arc::into_raw(arc) as u64;
1274            Ok((raw, NativeKind::Ptr(HeapKind::PriorityQueue)))
1275        }
1276        (SV::AtomicI64 { value }, HeapKind::Atomic) => {
1277            let arc = Arc::new(AtomicData::new(*value));
1278            let raw = Arc::into_raw(arc) as u64;
1279            Ok((raw, NativeKind::Ptr(HeapKind::Atomic)))
1280        }
1281        (SV::ResultData { is_ok, payload }, HeapKind::Result) => {
1282            // Inner payload kind: we need the expected kind for the
1283            // inner slot to dispatch. The §2.7.17 typed-Arc Result
1284            // carrier doesn't statically pin the inner kind; the
1285            // serialized discriminator is used to pick the inner kind
1286            // (Int→Int64, String→String, Bool→Bool, Number→Float64,
1287            // Unit→Bool-zero placeholder).
1288            let inner_slot = inner_kinded_from_serializable(payload)?;
1289            let data = if *is_ok {
1290                ResultData::ok(inner_slot)
1291            } else {
1292                ResultData::err(inner_slot)
1293            };
1294            let arc = Arc::new(data);
1295            let raw = Arc::into_raw(arc) as u64;
1296            Ok((raw, NativeKind::Ptr(HeapKind::Result)))
1297        }
1298        (SV::OptionData { is_some, payload }, HeapKind::Option) => {
1299            let data = if *is_some {
1300                match payload {
1301                    Some(p) => OptionData::some(inner_kinded_from_serializable(p)?),
1302                    None => return Err(
1303                        "serializable_to_slot: OptionData is_some=true but payload=None — \
1304                         malformed wire shape; expected Some(SerializableVMValue) for \
1305                         is_some=true. ADR-006 §2.7.5.1."
1306                            .to_string(),
1307                    ),
1308                }
1309            } else {
1310                OptionData::none()
1311            };
1312            let arc = Arc::new(data);
1313            let raw = Arc::into_raw(arc) as u64;
1314            Ok((raw, NativeKind::Ptr(HeapKind::Option)))
1315        }
1316
1317        // Opaque arms — surface-and-stop on restore. These produced
1318        // discriminator-only wire shapes; the inner payload is lost.
1319        // Restoring as a structured runtime error rather than a
1320        // placeholder lets the caller observe the missing capability
1321        // cleanly (§2.7.4 invariant).
1322        (SV::IteratorOpaque, HeapKind::Iterator)
1323        | (SV::DequeOpaque { .. }, HeapKind::Deque)
1324        | (SV::ChannelOpaque { .. }, HeapKind::Channel)
1325        | (SV::ReferenceOpaque, HeapKind::Reference)
1326        | (SV::FilterExprOpaque, HeapKind::FilterExpr)
1327        | (SV::SharedCellOpaque, HeapKind::SharedCell)
1328        | (SV::MutexOpaque { .. }, HeapKind::Mutex)
1329        | (SV::LazyOpaque { .. }, HeapKind::Lazy) => Err(format!(
1330            "serializable_to_slot: W17-snapshot-roundtrip surface — \
1331             {heap_kind:?} arm restored from opaque wire shape; \
1332             deep payload reconstruction is the W17-snapshot-{:?} \
1333             follow-up. ADR-006 §2.7.5.1.",
1334            heap_kind,
1335        )),
1336
1337        // Anything else: the discriminator doesn't pair with the
1338        // expected heap_kind. Surface-and-stop, no fabrication.
1339        (other_sv, hk) => Err(format!(
1340            "serializable_to_slot: W17-snapshot-roundtrip surface — \
1341             SerializableVMValue arm {} cannot satisfy expected heap kind \
1342             Ptr({hk:?}). Either the wire-format arm has no inverse \
1343             projection (deep follow-up) or the discriminator is \
1344             mismatched. ADR-006 §2.7.5.1.",
1345            serializable_arm_name(other_sv),
1346        )),
1347    }
1348}
1349
1350/// Inverse for Result/Option inner payloads — discriminator-driven
1351/// scalar projection (Int→Int64, String→String, Bool→Bool,
1352/// Number→Float64, Unit→Bool-zero).
1353fn inner_kinded_from_serializable(
1354    sv: &SerializableVMValue,
1355) -> std::result::Result<KindedSlot, String> {
1356    use SerializableVMValue as SV;
1357    match sv {
1358        SV::Int(i) => Ok(KindedSlot::new(
1359            ValueSlot::from_raw(*i as u64),
1360            NativeKind::Int64,
1361        )),
1362        SV::Number(f) => Ok(KindedSlot::new(
1363            ValueSlot::from_raw(f.to_bits()),
1364            NativeKind::Float64,
1365        )),
1366        SV::Bool(b) => Ok(KindedSlot::new(
1367            ValueSlot::from_raw(if *b { 1 } else { 0 }),
1368            NativeKind::Bool,
1369        )),
1370        SV::String(s) => Ok(KindedSlot::from_string_arc(Arc::new(s.clone()))),
1371        SV::Unit | SV::None => Ok(KindedSlot::new(
1372            ValueSlot::from_raw(0),
1373            NativeKind::Bool,
1374        )),
1375        other => Err(format!(
1376            "inner_kinded_from_serializable: W17-snapshot-roundtrip surface — \
1377             SerializableVMValue arm {} has no in-session inner-payload \
1378             projection. Tracked as follow-up. ADR-006 §2.7.5.1.",
1379            serializable_arm_name(other),
1380        )),
1381    }
1382}
1383
1384/// One-line discriminator name for diagnostic messages.
1385fn serializable_arm_name(sv: &SerializableVMValue) -> &'static str {
1386    use SerializableVMValue as SV;
1387    match sv {
1388        SV::Int(_) => "Int",
1389        SV::Number(_) => "Number",
1390        SV::Decimal(_) => "Decimal",
1391        SV::String(_) => "String",
1392        SV::Bool(_) => "Bool",
1393        SV::None => "None",
1394        SV::Some(_) => "Some",
1395        SV::Unit => "Unit",
1396        SV::Timeframe(_) => "Timeframe",
1397        SV::Duration(_) => "Duration",
1398        SV::Time(_) => "Time",
1399        SV::TimeSpan(_) => "TimeSpan",
1400        SV::TimeReference(_) => "TimeReference",
1401        SV::DateTimeExpr(_) => "DateTimeExpr",
1402        SV::DataDateTimeRef(_) => "DataDateTimeRef",
1403        SV::Array(_) => "Array",
1404        SV::Function(_) => "Function",
1405        SV::TypeAnnotation(_) => "TypeAnnotation",
1406        SV::TypeAnnotatedValue { .. } => "TypeAnnotatedValue",
1407        SV::Enum(_) => "Enum",
1408        SV::Closure { .. } => "Closure",
1409        SV::ModuleFunction(_) => "ModuleFunction",
1410        SV::TypedObject { .. } => "TypedObject",
1411        SV::Range { .. } => "Range",
1412        SV::Ok(_) => "Ok",
1413        SV::Err(_) => "Err",
1414        SV::PrintResult(_) => "PrintResult",
1415        SV::SimulationCall { .. } => "SimulationCall",
1416        SV::FunctionRef { .. } => "FunctionRef",
1417        SV::DataReference { .. } => "DataReference",
1418        SV::Future(_) => "Future",
1419        SV::DataTable(_) => "DataTable",
1420        SV::TypedTable { .. } => "TypedTable",
1421        SV::RowView { .. } => "RowView",
1422        SV::ColumnRef { .. } => "ColumnRef",
1423        SV::IndexedTable { .. } => "IndexedTable",
1424        SV::TypedArray { .. } => "TypedArray",
1425        SV::Matrix { .. } => "Matrix",
1426        SV::HashMap { .. } => "HashMap",
1427        SV::SidecarRef { .. } => "SidecarRef",
1428        SV::HashSet { .. } => "HashSet",
1429        SV::IteratorOpaque => "IteratorOpaque",
1430        SV::ResultData { .. } => "ResultData",
1431        SV::OptionData { .. } => "OptionData",
1432        SV::DequeOpaque { .. } => "DequeOpaque",
1433        SV::ChannelOpaque { .. } => "ChannelOpaque",
1434        SV::PriorityQueueHeap { .. } => "PriorityQueueHeap",
1435        SV::ReferenceOpaque => "ReferenceOpaque",
1436        SV::FilterExprOpaque => "FilterExprOpaque",
1437        SV::SharedCellOpaque => "SharedCellOpaque",
1438        SV::MutexOpaque { .. } => "MutexOpaque",
1439        SV::AtomicI64 { .. } => "AtomicI64",
1440        SV::LazyOpaque { .. } => "LazyOpaque",
1441        SV::Char(_) => "Char",
1442        SV::BigInt(_) => "BigInt",
1443    }
1444}
1445
1446fn serialize_datatable(dt: &DataTable, store: &SnapshotStore) -> Result<SerializableDataTable> {
1447    let mut buf = Vec::new();
1448    let schema = dt.inner().schema();
1449    let mut writer = arrow_ipc::writer::FileWriter::try_new(&mut buf, schema.as_ref())?;
1450    writer.write(dt.inner())?;
1451    writer.finish()?;
1452    let ipc_chunks = store_chunked_vec(&buf, BYTE_CHUNK_LEN, store)?;
1453    Ok(SerializableDataTable {
1454        ipc_chunks,
1455        type_name: dt.type_name().map(|s| s.to_string()),
1456        schema_id: dt.schema_id(),
1457    })
1458}
1459
1460fn deserialize_datatable(
1461    serialized: SerializableDataTable,
1462    store: &SnapshotStore,
1463) -> Result<DataTable> {
1464    let bytes = load_chunked_vec(&serialized.ipc_chunks, store)?;
1465    let cursor = std::io::Cursor::new(bytes);
1466    let mut reader = arrow_ipc::reader::FileReader::try_new(cursor, None)?;
1467    let batch = reader
1468        .next()
1469        .transpose()?
1470        .context("no RecordBatch in DataTable snapshot")?;
1471    let mut dt = DataTable::new(batch);
1472    if let Some(name) = serialized.type_name {
1473        dt = DataTable::with_type_name(dt.into_inner(), name);
1474    }
1475    if let Some(schema_id) = serialized.schema_id {
1476        dt = dt.with_schema_id(schema_id);
1477    }
1478    Ok(dt)
1479}
1480