use std::collections::{HashMap, HashSet};
use std::fs;
use std::io::{Read, Write};
use std::path::PathBuf;
use anyhow::{Context, Result};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use crate::event_queue::WaitCondition;
use crate::hashing::{HashDigest, hash_bytes};
use shape_ast::ast::{DataDateTimeRef, DateTimeExpr, EnumDef, TimeReference, TypeAnnotation};
use shape_ast::data::Timeframe;
use shape_value::datatable::DataTable;
/// Version number of the snapshot format.
/// NOTE(review): presumably the value written to `ExecutionSnapshot::version`; no use is visible in this part of the file — confirm.
pub const SNAPSHOT_VERSION: u32 = 5;
/// Default element count per chunk.
/// NOTE(review): not referenced in the visible code; presumably passed as `chunk_len` to `store_chunked_vec` by callers — confirm.
pub(crate) const DEFAULT_CHUNK_LEN: usize = 4096;
/// Chunk size of 256 KiB, used by `store_chunked_bytes` and as the chunk length for Arrow IPC bytes in `serialize_datatable`.
pub(crate) const BYTE_CHUNK_LEN: usize = 256 * 1024;
/// Content-addressed on-disk store for execution snapshots and their blobs.
///
/// Files are kept under `<root>/blobs` and `<root>/snapshots`, each named
/// `<hash hex>.bin.zst` and compressed with zstd.
#[derive(Clone)]
pub struct SnapshotStore {
// Directory that contains the `blobs/` and `snapshots/` subdirectories.
root: PathBuf,
}
impl SnapshotStore {
pub fn new(root: impl Into<PathBuf>) -> Result<Self> {
let root = root.into();
fs::create_dir_all(root.join("blobs"))
.with_context(|| format!("failed to create snapshot blob dir at {}", root.display()))?;
fs::create_dir_all(root.join("snapshots"))
.with_context(|| format!("failed to create snapshot dir at {}", root.display()))?;
Ok(Self { root })
}
fn blob_path(&self, hash: &HashDigest) -> PathBuf {
self.root
.join("blobs")
.join(format!("{}.bin.zst", hash.hex()))
}
fn snapshot_path(&self, hash: &HashDigest) -> PathBuf {
self.root
.join("snapshots")
.join(format!("{}.bin.zst", hash.hex()))
}
pub fn put_blob(&self, data: &[u8]) -> Result<HashDigest> {
let hash = hash_bytes(data);
let path = self.blob_path(&hash);
if path.exists() {
return Ok(hash);
}
let compressed = zstd::stream::encode_all(data, 0)?;
let mut file = fs::File::create(&path)?;
file.write_all(&compressed)?;
Ok(hash)
}
pub fn get_blob(&self, hash: &HashDigest) -> Result<Vec<u8>> {
let path = self.blob_path(hash);
let mut file = fs::File::open(&path)
.with_context(|| format!("snapshot blob not found: {}", path.display()))?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let decompressed = zstd::stream::decode_all(&buf[..])?;
Ok(decompressed)
}
pub fn put_struct<T: Serialize>(&self, value: &T) -> Result<HashDigest> {
let bytes = bincode::serialize(value)?;
self.put_blob(&bytes)
}
pub fn get_struct<T: for<'de> Deserialize<'de>>(&self, hash: &HashDigest) -> Result<T> {
let bytes = self.get_blob(hash)?;
Ok(bincode::deserialize(&bytes)?)
}
pub fn put_snapshot(&self, snapshot: &ExecutionSnapshot) -> Result<HashDigest> {
let bytes = bincode::serialize(snapshot)?;
let hash = hash_bytes(&bytes);
let path = self.snapshot_path(&hash);
if !path.exists() {
let compressed = zstd::stream::encode_all(&bytes[..], 0)?;
let mut file = fs::File::create(&path)?;
file.write_all(&compressed)?;
}
Ok(hash)
}
pub fn get_snapshot(&self, hash: &HashDigest) -> Result<ExecutionSnapshot> {
let path = self.snapshot_path(hash);
let mut file = fs::File::open(&path)
.with_context(|| format!("snapshot not found: {}", path.display()))?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let decompressed = zstd::stream::decode_all(&buf[..])?;
Ok(bincode::deserialize(&decompressed)?)
}
pub fn list_snapshots(&self) -> Result<Vec<(HashDigest, ExecutionSnapshot)>> {
let snapshots_dir = self.root.join("snapshots");
if !snapshots_dir.exists() {
return Ok(Vec::new());
}
let mut results = Vec::new();
for entry in fs::read_dir(&snapshots_dir)? {
let entry = entry?;
let path = entry.path();
if let Some(name) = path.file_name().and_then(|n| n.to_str()) {
if let Some(hex) = name.strip_suffix(".bin.zst") {
let hash = HashDigest::from_hex(hex);
match self.get_snapshot(&hash) {
Ok(snap) => results.push((hash, snap)),
Err(_) => continue, }
}
}
}
results.sort_by(|a, b| b.1.created_at_ms.cmp(&a.1.created_at_ms));
Ok(results)
}
pub fn delete_snapshot(&self, hash: &HashDigest) -> Result<()> {
let path = self.snapshot_path(hash);
fs::remove_file(&path)
.with_context(|| format!("failed to delete snapshot: {}", path.display()))?;
Ok(())
}
}
/// Top-level snapshot record, persisted by `SnapshotStore::put_snapshot`
/// as a bincode-encoded, zstd-compressed file.
///
/// Fields are encoded in declaration order, so reordering them changes the
/// stored format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionSnapshot {
/// Format version. NOTE(review): presumably `SNAPSHOT_VERSION` at creation time — confirm.
pub version: u32,
/// Creation timestamp in milliseconds; `SnapshotStore::list_snapshots` sorts on it, newest first.
pub created_at_ms: i64,
/// NOTE(review): presumably the blob hash of a `SemanticSnapshot` — confirm against the writer.
pub semantic_hash: HashDigest,
/// NOTE(review): presumably the blob hash of a `ContextSnapshot` — confirm against the writer.
pub context_hash: HashDigest,
/// NOTE(review): presumably the blob hash of a `VmSnapshot`, when VM state was captured — confirm.
pub vm_hash: Option<HashDigest>,
/// NOTE(review): presumably the blob hash of the bytecode, when captured — confirm.
pub bytecode_hash: Option<HashDigest>,
/// Optional script path associated with the snapshot.
#[serde(default)]
pub script_path: Option<String>,
}
/// Serializable snapshot of semantic-level state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SemanticSnapshot {
/// Names of the exported symbols.
pub exported_symbols: HashSet<String>,
}
/// Serializable snapshot of the execution context: data-loading state,
/// variable scopes, time/range settings, type registries and any pending
/// suspension.
///
/// NOTE(review): the exact meaning of most fields is defined by the runtime
/// context type, which is not visible here; the notes below only restate what
/// the types show.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContextSnapshot {
pub data_load_mode: crate::context::DataLoadMode,
/// Cached data (historical frames and live buffers), if any was captured.
pub data_cache: Option<DataCacheSnapshot>,
pub current_id: Option<String>,
pub current_row_index: usize,
/// One map of variable name to captured variable per scope.
/// NOTE(review): presumably ordered outermost to innermost — confirm.
pub variable_scopes: Vec<HashMap<String, VariableSnapshot>>,
pub reference_datetime: Option<chrono::DateTime<chrono::Utc>>,
pub current_timeframe: Option<Timeframe>,
pub base_timeframe: Option<Timeframe>,
/// Pair of UTC datetimes. NOTE(review): presumably (start, end) — confirm.
pub date_range: Option<(chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)>,
pub range_start: usize,
pub range_end: usize,
pub range_active: bool,
/// Type alias entries keyed by name.
pub type_alias_registry: HashMap<String, TypeAliasRuntimeEntrySnapshot>,
/// Enum definitions keyed by name.
pub enum_registry: HashMap<String, EnumDef>,
/// Struct type definitions keyed by name.
#[serde(default)]
pub struct_type_registry: HashMap<String, shape_ast::ast::StructTypeDef>,
/// Present when execution was suspended at capture time.
pub suspension_state: Option<SuspensionStateSnapshot>,
}
/// Serializable snapshot of one variable binding.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VariableSnapshot {
/// Captured value of the variable.
pub value: SerializableVMValue,
/// Declaration kind of the variable, as defined by the AST crate.
pub kind: shape_ast::ast::VarKind,
pub is_initialized: bool,
pub is_function_scoped: bool,
/// Optional format hint for the variable.
pub format_hint: Option<String>,
/// Optional per-key format overrides.
pub format_overrides: Option<HashMap<String, SerializableVMValue>>,
}
/// Serializable snapshot of one entry of the type alias registry
/// (see `ContextSnapshot::type_alias_registry`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TypeAliasRuntimeEntrySnapshot {
/// Name of the type the alias is based on.
pub base_type: String,
/// Optional overrides keyed by name.
pub overrides: Option<HashMap<String, SerializableVMValue>>,
}
/// Serializable snapshot of a suspended execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SuspensionStateSnapshot {
/// Condition the suspended execution is waiting on.
pub waiting_for: WaitCondition,
/// Program counter to continue from on resume.
pub resume_pc: usize,
/// Local values saved at suspension.
pub saved_locals: Vec<SerializableVMValue>,
/// Stack values saved at suspension.
pub saved_stack: Vec<SerializableVMValue>,
}
/// Serializable snapshot of VM state: instruction pointer, value stack,
/// locals, module bindings and the control stacks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmSnapshot {
/// Instruction pointer at capture time.
pub ip: usize,
pub stack: Vec<SerializableVMValue>,
pub locals: Vec<SerializableVMValue>,
pub module_bindings: Vec<SerializableVMValue>,
pub call_stack: Vec<SerializableCallFrame>,
pub loop_stack: Vec<SerializableLoopContext>,
pub timeframe_stack: Vec<Option<Timeframe>>,
pub exception_handlers: Vec<SerializableExceptionHandler>,
/// NOTE(review): presumably the 32-byte hash of the code blob that `ip` lies in — confirm.
#[serde(default)]
pub ip_blob_hash: Option<[u8; 32]>,
/// NOTE(review): presumably `ip` expressed as an offset inside that blob — confirm.
#[serde(default)]
pub ip_local_offset: Option<usize>,
/// NOTE(review): presumably the id of the function containing `ip` — confirm.
#[serde(default)]
pub ip_function_id: Option<u16>,
}
/// Serializable form of one call-stack frame (see `VmSnapshot::call_stack`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableCallFrame {
/// Instruction pointer to return to.
pub return_ip: usize,
/// NOTE(review): presumably the index of the frame's first local in `VmSnapshot::locals` — confirm.
pub locals_base: usize,
/// Number of locals belonging to the frame.
pub locals_count: usize,
/// Id of the function being executed, if known.
pub function_id: Option<u16>,
/// Captured upvalues, if the frame has any.
pub upvalues: Option<Vec<SerializableVMValue>>,
/// NOTE(review): presumably the 32-byte hash of the frame's code blob — confirm.
#[serde(default)]
pub blob_hash: Option<[u8; 32]>,
/// NOTE(review): presumably an instruction offset relative to that blob — confirm.
#[serde(default)]
pub local_ip: Option<usize>,
}
/// Serializable form of one loop-stack entry (see `VmSnapshot::loop_stack`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableLoopContext {
/// NOTE(review): presumably the instruction index where the loop starts — confirm.
pub start: usize,
/// NOTE(review): presumably the instruction index where the loop ends — confirm.
pub end: usize,
}
/// Serializable form of one registered exception handler
/// (see `VmSnapshot::exception_handlers`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableExceptionHandler {
/// Instruction pointer of the catch target.
pub catch_ip: usize,
/// NOTE(review): presumably the stack size to unwind to when the handler runs — confirm.
pub stack_size: usize,
/// NOTE(review): presumably the call depth at which the handler was registered — confirm.
pub call_depth: usize,
}
/// Serde-serializable representation of a VM value.
///
/// `slot_to_serializable` projects a raw slot into this enum and
/// `serializable_to_slot` rebuilds a slot from it. Only a subset of the
/// variants is produced or consumed by those functions in this file; the
/// remaining ones are declared here without a visible conversion.
///
/// Variant order determines the encoded discriminants, so new variants should
/// be appended and existing ones not reordered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SerializableVMValue {
/// Integer value; every integer `NativeKind` is widened into this on serialization.
Int(i64),
/// Floating-point value; `Float32` slots are widened to `f64`.
Number(f64),
Decimal(rust_decimal::Decimal),
String(String),
Bool(bool),
None,
Some(Box<SerializableVMValue>),
Unit,
Timeframe(Timeframe),
Duration(shape_ast::ast::Duration),
Time(chrono::DateTime<chrono::FixedOffset>),
TimeSpan(i64), TimeReference(TimeReference),
DateTimeExpr(DateTimeExpr),
DataDateTimeRef(DataDateTimeRef),
Array(Vec<SerializableVMValue>),
Function(u16),
TypeAnnotation(TypeAnnotation),
TypeAnnotatedValue {
type_name: String,
value: Box<SerializableVMValue>,
},
Enum(EnumValueSnapshot),
Closure {
function_id: u32,
type_id: u32,
upvalues: Vec<SerializableVMValue>,
},
ModuleFunction(String),
TypedObject {
schema_id: u64,
slot_data: Vec<SerializableVMValue>,
heap_mask: u64,
},
Range {
start: Option<Box<SerializableVMValue>>,
end: Option<Box<SerializableVMValue>>,
inclusive: bool,
},
Ok(Box<SerializableVMValue>),
Err(Box<SerializableVMValue>),
PrintResult(PrintableSnapshot),
SimulationCall {
name: String,
params: HashMap<String, SerializableVMValue>,
},
FunctionRef {
name: String,
closure: Option<Box<SerializableVMValue>>,
},
DataReference {
datetime: chrono::DateTime<chrono::FixedOffset>,
id: String,
timeframe: Timeframe,
},
/// `slot_heap_to_serializable` stores the raw slot bits of a `HeapKind::Future` slot here.
Future(u64),
// The following variants reference bulk data stored out of line through a `BlobRef`.
DataTable(BlobRef),
TypedTable {
schema_id: u64,
table: BlobRef,
},
RowView {
schema_id: u64,
table: BlobRef,
row_idx: usize,
},
ColumnRef {
schema_id: u64,
table: BlobRef,
col_id: u32,
},
IndexedTable {
schema_id: u64,
table: BlobRef,
index_col: u32,
},
TypedArray {
element_kind: TypedArrayElementKind,
blob: BlobRef,
len: usize,
},
Matrix {
blob: BlobRef,
rows: u32,
cols: u32,
},
HashMap {
keys: Vec<SerializableVMValue>,
values: Vec<SerializableVMValue>,
},
SidecarRef {
sidecar_id: u32,
blob_kind: BlobKind,
original_hash: HashDigest,
meta_a: u32,
meta_b: u32,
},
/// Keys of a heap hash set, cloned out as strings.
HashSet { keys: Vec<String> },
// Variants suffixed `Opaque` record at most summary facts (length, flags) about the
// heap object; `serializable_to_heap_slot` returns an error instead of restoring them.
IteratorOpaque,
/// Heap `Result` value with a scalar payload (see `serializable_inner_kinded`).
ResultData {
is_ok: bool,
payload: Box<SerializableVMValue>,
},
/// Heap `Option` value; `payload` is expected to be `Some` exactly when `is_some` is true.
OptionData {
is_some: bool,
payload: Option<Box<SerializableVMValue>>,
},
DequeOpaque { len: usize },
ChannelOpaque { closed: bool, len: usize },
/// Contents of a priority queue; restoring pushes each element into a fresh queue.
PriorityQueueHeap { heap: Vec<i64> },
ReferenceOpaque,
FilterExprOpaque,
SharedCellOpaque,
MutexOpaque { has_value: bool },
/// Value loaded from a heap atomic at capture time.
AtomicI64 { value: i64 },
LazyOpaque { is_initialized: bool },
Char(char),
/// Produced from `HeapKind::BigInt`, whose payload is read as an `i64` in this file.
BigInt(i64),
}
/// Serializable form of an enum value: enum name, variant name and payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnumValueSnapshot {
/// Name of the enum type.
pub enum_name: String,
/// Name of the selected variant.
pub variant: String,
/// Data carried by the variant.
pub payload: EnumPayloadSnapshot,
}
/// Payload shapes an enum variant can carry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EnumPayloadSnapshot {
/// Variant without data.
Unit,
/// Positional values.
Tuple(Vec<SerializableVMValue>),
/// Named fields as (name, value) pairs, kept in list order.
Struct(Vec<(String, SerializableVMValue)>),
}
/// Serializable form of a print result: the rendered text plus the spans it
/// is made of.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrintableSnapshot {
/// Full rendered output text.
pub rendered: String,
/// Spans describing parts of the output.
/// NOTE(review): presumably `start`/`end` of each span index into `rendered` — confirm.
pub spans: Vec<PrintSpanSnapshot>,
}
/// One span of a print result: either literal text or a formatted value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PrintSpanSnapshot {
/// Literal text span.
Literal {
text: String,
// NOTE(review): units of `start`/`end` (bytes vs. chars) are not visible here — confirm.
start: usize,
end: usize,
span_id: String,
},
/// Span produced by formatting a value; keeps the raw value and the
/// formatting information alongside the rendered text.
Value {
text: String,
start: usize,
end: usize,
span_id: String,
/// Name of the variable the value came from, when known.
variable_name: Option<String>,
/// Unformatted value behind `text`.
raw_value: Box<SerializableVMValue>,
type_name: String,
current_format: String,
format_params: HashMap<String, SerializableVMValue>,
},
}
/// Reference to out-of-line data, used by the table/array/matrix variants of
/// `SerializableVMValue`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobRef {
/// Content hash. NOTE(review): presumably a key into `SnapshotStore` blobs — confirm.
pub hash: HashDigest,
/// Kind of data the blob holds.
pub kind: BlobKind,
}
/// Element type of a typed array.
///
/// Variant order determines the encoded discriminants; append new variants
/// rather than reordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TypedArrayElementKind {
I8,
I16,
I32,
I64,
U8,
U16,
U32,
U64,
F32,
F64,
Bool,
}
/// Kind of payload a `BlobRef` or sidecar reference points at.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BlobKind {
/// A data table.
DataTable,
/// A typed array with the given element kind.
TypedArray(TypedArrayElementKind),
/// A matrix.
Matrix,
}
/// Descriptor of a sequence stored as several blobs, as produced by
/// `store_chunked_vec` / `store_chunked_bytes`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedBlob {
/// Blob hashes of the chunks, in sequence order.
pub chunk_hashes: Vec<HashDigest>,
/// Total number of elements (or bytes) across all chunks; loaders truncate to this length.
pub total_len: usize,
/// Maximum number of elements (or bytes) per chunk used when storing.
pub chunk_len: usize,
}
/// Serializable form of a `DataTable`, produced by `serialize_datatable`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataTable {
/// Arrow IPC file bytes of the table, stored in chunks.
pub ipc_chunks: ChunkedBlob,
/// Type name attached to the table, if any.
pub type_name: Option<String>,
/// Schema id attached to the table, if any.
pub schema_id: Option<u32>,
}
/// Serializable form of a data frame: identifier, timeframe, a chunked
/// timestamp sequence and named chunked columns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrame {
pub id: String,
pub timeframe: Timeframe,
/// Chunked timestamps. NOTE(review): element type is not visible here — confirm against the writer.
pub timestamps: ChunkedBlob,
/// Columns of the frame.
pub columns: Vec<SerializableDataFrameColumn>,
}
/// One named column of a `SerializableDataFrame`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDataFrameColumn {
/// Column name.
pub name: String,
/// Chunked column values. NOTE(review): element type is not visible here — confirm against the writer.
pub values: ChunkedBlob,
}
/// Key identifying a data cache entry: an id plus a timeframe.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheKeySnapshot {
pub id: String,
pub timeframe: Timeframe,
}
/// Serializable form of one historical data cache entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CachedDataSnapshot {
/// Cache key of the entry.
pub key: CacheKeySnapshot,
/// Historical data frame held by the entry.
pub historical: SerializableDataFrame,
/// NOTE(review): presumably the current row position within `historical` — confirm.
pub current_index: usize,
}
/// Serializable form of one live buffer of the data cache.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LiveBufferSnapshot {
/// Cache key of the buffer.
pub key: CacheKeySnapshot,
/// Chunked buffered rows. NOTE(review): row element type is not visible here — confirm against the writer.
pub rows: ChunkedBlob,
}
/// Serializable form of the data cache (see `ContextSnapshot::data_cache`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCacheSnapshot {
/// Historical cache entries.
pub historical: Vec<CachedDataSnapshot>,
/// Live buffer entries.
pub live_buffer: Vec<LiveBufferSnapshot>,
}
pub(crate) fn store_chunked_vec<T: Serialize>(
values: &[T],
chunk_len: usize,
store: &SnapshotStore,
) -> Result<ChunkedBlob> {
let chunk_len = chunk_len.max(1);
if values.is_empty() {
return Ok(ChunkedBlob {
chunk_hashes: Vec::new(),
total_len: 0,
chunk_len,
});
}
let mut hashes = Vec::new();
for chunk in values.chunks(chunk_len) {
let bytes = bincode::serialize(chunk)?;
let hash = store.put_blob(&bytes)?;
hashes.push(hash);
}
Ok(ChunkedBlob {
chunk_hashes: hashes,
total_len: values.len(),
chunk_len,
})
}
pub(crate) fn load_chunked_vec<T: DeserializeOwned>(
chunked: &ChunkedBlob,
store: &SnapshotStore,
) -> Result<Vec<T>> {
if chunked.total_len == 0 {
return Ok(Vec::new());
}
let mut out = Vec::with_capacity(chunked.total_len);
for hash in &chunked.chunk_hashes {
let bytes = store.get_blob(hash)?;
let chunk: Vec<T> = bincode::deserialize(&bytes)?;
out.extend(chunk);
}
out.truncate(chunked.total_len);
Ok(out)
}
pub fn store_chunked_bytes(data: &[u8], store: &SnapshotStore) -> Result<ChunkedBlob> {
if data.is_empty() {
return Ok(ChunkedBlob {
chunk_hashes: Vec::new(),
total_len: 0,
chunk_len: BYTE_CHUNK_LEN,
});
}
let mut hashes = Vec::new();
for chunk in data.chunks(BYTE_CHUNK_LEN) {
let hash = store.put_blob(chunk)?;
hashes.push(hash);
}
Ok(ChunkedBlob {
chunk_hashes: hashes,
total_len: data.len(),
chunk_len: BYTE_CHUNK_LEN,
})
}
pub fn load_chunked_bytes(chunked: &ChunkedBlob, store: &SnapshotStore) -> Result<Vec<u8>> {
if chunked.total_len == 0 {
return Ok(Vec::new());
}
let mut out = Vec::with_capacity(chunked.total_len);
for hash in &chunked.chunk_hashes {
let bytes = store.get_blob(hash)?;
out.extend_from_slice(&bytes);
}
out.truncate(chunked.total_len);
Ok(out)
}
/// Reinterprets `bytes` as a slice of `T` without copying.
///
/// The caller must only use this with element types for which every bit
/// pattern is a valid value (plain integers and floats); `T: Copy` alone does
/// not guarantee that (for example `bool` and `char` have invalid patterns).
///
/// # Panics
/// Panics if `T` is zero-sized, if `bytes.len()` is not a multiple of
/// `size_of::<T>()`, or if `bytes` is not aligned for `T`. The alignment
/// check is required for soundness: building a `&[T]` from a misaligned
/// pointer is undefined behavior.
fn bytes_as_slice<T: Copy>(bytes: &[u8]) -> &[T] {
    let elem_size = std::mem::size_of::<T>();
    assert!(
        elem_size != 0,
        "bytes_as_slice does not support zero-sized element types"
    );
    assert!(
        bytes.len() % elem_size == 0,
        "byte slice length {} not a multiple of element size {}",
        bytes.len(),
        elem_size
    );
    if bytes.is_empty() {
        // An empty byte slice may carry a dangling pointer that is not
        // aligned for `T`; there is nothing to view anyway.
        return &[];
    }
    let align = std::mem::align_of::<T>();
    assert!(
        bytes.as_ptr() as usize % align == 0,
        "byte slice at {:p} is not aligned to {} bytes",
        bytes.as_ptr(),
        align
    );
    let len = bytes.len() / elem_size;
    // SAFETY: the pointer is non-null and aligned for `T` (checked above), the
    // region covers exactly `len * size_of::<T>()` initialized bytes borrowed
    // for the returned lifetime, and the caller guarantees that any bit
    // pattern is a valid `T`.
    unsafe { std::slice::from_raw_parts(bytes.as_ptr().cast::<T>(), len) }
}
/// Views the memory of `data` as a byte slice without copying.
///
/// The returned slice covers `data.len() * size_of::<T>()` bytes and borrows
/// from `data`.
// NOTE(review): for element types with padding bytes this would expose
// uninitialized memory; presumably only used with plain numeric `T` — confirm.
fn slice_as_bytes<T>(data: &[T]) -> &[u8] {
    let first_byte = data.as_ptr().cast::<u8>();
    let total_bytes = std::mem::size_of_val(data);
    // SAFETY: `first_byte` points at `total_bytes` bytes owned by `data`,
    // which stays borrowed for the lifetime of the returned slice; `u8` has
    // alignment 1.
    unsafe { std::slice::from_raw_parts(first_byte, total_bytes) }
}
use shape_value::{HeapKind, KindedSlot, NativeKind, ValueSlot};
use std::sync::Arc;
/// Projects a raw VM slot (`bits` interpreted according to `kind`) into a
/// `SerializableVMValue`.
///
/// Integer kinds are widened to `Int`, floats to `Number`; pointer kinds are
/// read through the pointer stored in `bits`. `_store` is currently unused.
///
/// # Errors
/// Returns a descriptive message for nullable scalar kinds (no wire form),
/// for an invalid `Char` code point, for pointer-like kinds whose `bits` are
/// 0, and for heap kinds that `slot_heap_to_serializable` cannot project.
///
/// NOTE(review): for pointer-like kinds this trusts that `bits` really is a
/// live pointer of the stated kind; nothing here can verify that.
pub fn slot_to_serializable(
    bits: u64,
    kind: NativeKind,
    _store: &SnapshotStore,
) -> std::result::Result<SerializableVMValue, String> {
    use SerializableVMValue as SV;
    let value = match kind {
        // Same bit pattern, reinterpreted as i64 (large u64 values wrap negative).
        NativeKind::Int64 | NativeKind::UInt64 => SV::Int(bits as i64),
        // Narrow signed kinds: truncate to the kind's width, then sign-extend.
        NativeKind::Int32 => SV::Int(bits as i32 as i64),
        NativeKind::Int16 => SV::Int(bits as i16 as i64),
        NativeKind::Int8 => SV::Int(bits as i8 as i64),
        // Narrow unsigned kinds: truncate to the kind's width, then zero-extend.
        NativeKind::UInt32 => SV::Int((bits as u32) as i64),
        NativeKind::UInt16 => SV::Int((bits as u16) as i64),
        NativeKind::UInt8 => SV::Int((bits as u8) as i64),
        NativeKind::IntSize => SV::Int(bits as isize as i64),
        NativeKind::UIntSize => SV::Int((bits as usize) as i64),
        NativeKind::Float64 => SV::Number(f64::from_bits(bits)),
        NativeKind::Float32 => SV::Number(f64::from(f32::from_bits(bits as u32))),
        NativeKind::Char => {
            let decoded = char::from_u32(bits as u32).ok_or_else(|| {
                format!(
                    "slot_to_serializable: NativeKind::Char slot has invalid \
                     codepoint bits 0x{:x} — construction-side contract violated",
                    bits,
                )
            })?;
            SV::Char(decoded)
        }
        NativeKind::Bool => SV::Bool(bits != 0),
        NativeKind::Null => SV::None,
        NativeKind::NullableInt64
        | NativeKind::NullableInt32
        | NativeKind::NullableInt16
        | NativeKind::NullableInt8
        | NativeKind::NullableUInt64
        | NativeKind::NullableUInt32
        | NativeKind::NullableUInt16
        | NativeKind::NullableUInt8
        | NativeKind::NullableIntSize
        | NativeKind::NullableUIntSize
        | NativeKind::NullableFloat64 => {
            return Err(format!(
                "slot_to_serializable: W17-snapshot-roundtrip surface — \
                 nullable-scalar kind {kind:?} has no SerializableVMValue \
                 arm at landing. The post-proof sentinel-rule amendment \
                 is the W17-snapshot-nullable follow-up. \
                 ADR-006 §2.7.5.1.",
            ));
        }
        NativeKind::String => {
            if bits == 0 {
                return Err("slot_to_serializable: String slot with null bits".into());
            }
            // SAFETY: relies on `bits` being a pointer obtained from
            // `Arc::<String>::into_raw`. The Arc is rebuilt only to read the
            // string and is handed back with `into_raw`, so the slot's
            // reference count is left unchanged.
            let text = unsafe {
                let shared = Arc::<String>::from_raw(bits as *const String);
                let text = String::clone(&shared);
                let _ = Arc::into_raw(shared);
                text
            };
            SV::String(text)
        }
        NativeKind::StringV2 => {
            if bits == 0 {
                return Err("slot_to_serializable: StringV2 slot with null bits".into());
            }
            let obj = bits as *const shape_value::v2::string_obj::StringObj;
            // SAFETY: relies on `bits` being a valid `StringObj` pointer.
            let text: &str = unsafe { shape_value::v2::string_obj::StringObj::as_str(obj) };
            SV::String(text.to_string())
        }
        NativeKind::DecimalV2 => {
            if bits == 0 {
                return Err("slot_to_serializable: DecimalV2 slot with null bits".into());
            }
            let obj = bits as *const shape_value::v2::decimal_obj::DecimalObj;
            // SAFETY: relies on `bits` being a valid `DecimalObj` pointer.
            let decimal = unsafe { shape_value::v2::decimal_obj::DecimalObj::value(obj) };
            SV::Decimal(decimal)
        }
        NativeKind::Ptr(heap_kind) => return slot_heap_to_serializable(bits, heap_kind),
    };
    Ok(value)
}
/// Projects a heap-pointer slot (`NativeKind::Ptr(expected_kind)`) into a
/// `SerializableVMValue`.
///
/// `bits` is treated as a pointer obtained from `Arc::into_raw` for the
/// payload type matching `expected_kind`, except for `Char` (the code point
/// is stored inline) and `Future` (the raw bits are recorded as-is). The
/// slot's reference count is never changed by this function.
///
/// # Errors
/// Returns a message when `bits` is 0, when a `Char` code point is invalid,
/// when a `Result`/`Option` payload is outside the scalar set handled by
/// `serializable_inner_kinded`, or when `expected_kind` has no projection.
///
/// NOTE(review): nothing here can verify that `bits` really points at a live
/// object of the expected type; that is the caller's contract.
fn slot_heap_to_serializable(
    bits: u64,
    expected_kind: HeapKind,
) -> std::result::Result<SerializableVMValue, String> {
    use SerializableVMValue as SV;
    use shape_value::heap_value::{
        AtomicData, ChannelData, DequeData, HashSetData, LazyData, MutexData,
        OptionData, PriorityQueueData, ResultData,
    };

    /// Borrows the `Arc<T>` behind a raw slot pointer without taking ownership.
    ///
    /// The handle is wrapped in `ManuallyDrop`, so it never decrements the
    /// strong count, including on early returns via `?`. The previous
    /// `from_raw` ... `into_raw` pairing dropped the Arc when the payload
    /// conversion in between failed, releasing a reference the slot still
    /// owned.
    ///
    /// # Safety
    /// `bits` must be a pointer produced by `Arc::<T>::into_raw` whose
    /// allocation is still alive.
    unsafe fn borrow_arc<T>(bits: u64) -> std::mem::ManuallyDrop<Arc<T>> {
        std::mem::ManuallyDrop::new(unsafe { Arc::from_raw(bits as *const T) })
    }

    if bits == 0 {
        return Err(format!(
            "slot_to_serializable: Ptr({expected_kind:?}) slot with null bits",
        ));
    }
    // SAFETY (all `unsafe` arms below): relies on the caller's contract that
    // `bits` came from `Arc::into_raw` for the type named in each arm.
    match expected_kind {
        HeapKind::String => unsafe {
            let arc = borrow_arc::<String>(bits);
            Ok(SV::String((**arc).clone()))
        },
        HeapKind::Decimal => unsafe {
            let arc = borrow_arc::<rust_decimal::Decimal>(bits);
            Ok(SV::Decimal(**arc))
        },
        HeapKind::BigInt => unsafe {
            let arc = borrow_arc::<i64>(bits);
            Ok(SV::BigInt(**arc))
        },
        HeapKind::Char => {
            // The code point is stored inline in the low 32 bits.
            let cp = bits as u32;
            match char::from_u32(cp) {
                Some(c) => Ok(SV::Char(c)),
                None => Err(format!(
                    "slot_to_serializable: Char arm: invalid codepoint {cp:#x}"
                )),
            }
        }
        HeapKind::HashSet => unsafe {
            let arc = borrow_arc::<HashSetData>(bits);
            let keys: Vec<String> = arc.keys.iter().map(|k| (**k).clone()).collect();
            Ok(SV::HashSet { keys })
        },
        HeapKind::PriorityQueue => unsafe {
            let arc = borrow_arc::<PriorityQueueData>(bits);
            let heap: Vec<i64> = (*arc.heap).clone();
            Ok(SV::PriorityQueueHeap { heap })
        },
        HeapKind::Atomic => unsafe {
            let arc = borrow_arc::<AtomicData>(bits);
            Ok(SV::AtomicI64 { value: arc.load() })
        },
        HeapKind::Lazy => unsafe {
            let arc = borrow_arc::<LazyData>(bits);
            Ok(SV::LazyOpaque {
                is_initialized: arc.is_initialized(),
            })
        },
        HeapKind::Mutex => unsafe {
            let arc = borrow_arc::<MutexData>(bits);
            let inner = arc.get();
            // A `Bool` slot with raw value 0 is treated as "no value".
            let has_value =
                !(matches!(inner.kind(), NativeKind::Bool) && inner.slot().raw() == 0);
            drop(inner);
            Ok(SV::MutexOpaque { has_value })
        },
        HeapKind::Channel => unsafe {
            let arc = borrow_arc::<ChannelData>(bits);
            let closed = arc.is_closed();
            let len = arc.len();
            Ok(SV::ChannelOpaque { closed, len })
        },
        HeapKind::Deque => unsafe {
            let arc = borrow_arc::<DequeData>(bits);
            Ok(SV::DequeOpaque {
                len: arc.items.len(),
            })
        },
        HeapKind::Result => unsafe {
            let arc = borrow_arc::<ResultData>(bits);
            let is_ok = arc.is_ok;
            let payload_kind = arc.payload.kind();
            let payload_bits = arc.payload.slot().raw();
            // May fail for non-scalar payloads; the borrowed Arc is not
            // released on that path.
            let inner = serializable_inner_kinded(payload_bits, payload_kind)?;
            Ok(SV::ResultData {
                is_ok,
                payload: Box::new(inner),
            })
        },
        HeapKind::Option => unsafe {
            let arc = borrow_arc::<OptionData>(bits);
            let is_some = arc.is_some;
            let payload = if is_some {
                let payload_kind = arc.payload.kind();
                let payload_bits = arc.payload.slot().raw();
                Some(Box::new(serializable_inner_kinded(payload_bits, payload_kind)?))
            } else {
                None
            };
            Ok(SV::OptionData { is_some, payload })
        },
        // These kinds are recorded as markers only; the pointer is not read.
        HeapKind::Reference => Ok(SV::ReferenceOpaque),
        HeapKind::FilterExpr => Ok(SV::FilterExprOpaque),
        HeapKind::SharedCell => Ok(SV::SharedCellOpaque),
        HeapKind::Iterator => Ok(SV::IteratorOpaque),
        HeapKind::Future => Ok(SV::Future(bits)),
        other => Err(format!(
            "slot_to_serializable: W17-snapshot-roundtrip surface — \
             HeapKind::{other:?} arm has no in-session SerializableVMValue \
             projection. Tracked as W17-snapshot-{other:?} follow-up per \
             docs/cluster-audits/phase-2d-playbook.md §3. \
             ADR-006 §2.7.5.1.",
        )),
    }
}
fn serializable_inner_kinded(
bits: u64,
kind: NativeKind,
) -> std::result::Result<SerializableVMValue, String> {
if matches!(kind, NativeKind::Bool) && bits == 0 {
return Ok(SerializableVMValue::Unit);
}
match kind {
NativeKind::Int64 => Ok(SerializableVMValue::Int(bits as i64)),
NativeKind::Float64 => Ok(SerializableVMValue::Number(f64::from_bits(bits))),
NativeKind::Bool => Ok(SerializableVMValue::Bool(bits != 0)),
NativeKind::String => {
if bits == 0 {
return Ok(SerializableVMValue::None);
}
unsafe {
let arc = Arc::<String>::from_raw(bits as *const String);
let cloned = (*arc).clone();
let _ = Arc::into_raw(arc);
Ok(SerializableVMValue::String(cloned))
}
}
_ => Err(format!(
"serializable_inner_kinded: W17-snapshot-roundtrip surface — \
inner Result/Option payload kind {kind:?} is not in the \
initial scalar set; deep payload arms land in follow-up. \
ADR-006 §2.7.5.1.",
)),
}
}
pub fn serializable_to_slot(
sv: &SerializableVMValue,
expected_kind: NativeKind,
_store: &SnapshotStore,
) -> std::result::Result<(u64, NativeKind), String> {
use SerializableVMValue as SV;
match (sv, expected_kind) {
(SV::Int(i), NativeKind::Int64) => Ok((*i as u64, NativeKind::Int64)),
(SV::Int(i), NativeKind::Int32) => Ok(((*i as i32) as u64, NativeKind::Int32)),
(SV::Int(i), NativeKind::Int16) => Ok(((*i as i16 as i32) as u64, NativeKind::Int16)),
(SV::Int(i), NativeKind::Int8) => Ok(((*i as i8 as i32) as u64, NativeKind::Int8)),
(SV::Int(i), NativeKind::UInt64) => Ok((*i as u64, NativeKind::UInt64)),
(SV::Int(i), NativeKind::UInt32) => Ok(((*i as u32) as u64, NativeKind::UInt32)),
(SV::Int(i), NativeKind::UInt16) => Ok(((*i as u16) as u64, NativeKind::UInt16)),
(SV::Int(i), NativeKind::UInt8) => Ok(((*i as u8) as u64, NativeKind::UInt8)),
(SV::Int(i), NativeKind::IntSize) => Ok((*i as isize as u64, NativeKind::IntSize)),
(SV::Int(i), NativeKind::UIntSize) => Ok((*i as u64, NativeKind::UIntSize)),
(SV::Number(f), NativeKind::Float64) => Ok((f.to_bits(), NativeKind::Float64)),
(SV::Bool(b), NativeKind::Bool) => Ok((if *b { 1 } else { 0 }, NativeKind::Bool)),
(SV::String(s), NativeKind::String) => {
let arc = Arc::new(s.clone());
let raw = Arc::into_raw(arc) as u64;
Ok((raw, NativeKind::String))
}
(SV::None | SV::Unit, NativeKind::Bool) => Ok((0, NativeKind::Bool)),
(sv, NativeKind::Ptr(hk)) => serializable_to_heap_slot(sv, hk),
(other_sv, other_kind) => Err(format!(
"serializable_to_slot: W17-snapshot-roundtrip surface — \
SerializableVMValue arm {} cannot satisfy expected kind \
{other_kind:?}. Discriminator-vs-kind mismatch is a structured \
error, not a Bool-default fallback (§2.7.5.1 forbidden). \
ADR-006 §2.7.5.1.",
serializable_arm_name(other_sv),
)),
}
}
fn serializable_to_heap_slot(
sv: &SerializableVMValue,
heap_kind: HeapKind,
) -> std::result::Result<(u64, NativeKind), String> {
use SerializableVMValue as SV;
use shape_value::heap_value::{
AtomicData, HashSetData, OptionData, PriorityQueueData, ResultData,
};
match (sv, heap_kind) {
(SV::String(s), HeapKind::String) => {
let arc = Arc::new(s.clone());
let raw = Arc::into_raw(arc) as u64;
Ok((raw, NativeKind::Ptr(HeapKind::String)))
}
(SV::Char(c), HeapKind::Char) => {
let bits = (*c as u32) as u64;
Ok((bits, NativeKind::Ptr(HeapKind::Char)))
}
(SV::BigInt(n), HeapKind::BigInt) => {
let arc = Arc::new(*n);
let raw = Arc::into_raw(arc) as u64;
Ok((raw, NativeKind::Ptr(HeapKind::BigInt)))
}
(SV::Decimal(d), HeapKind::Decimal) => {
let arc = Arc::new(*d);
let raw = Arc::into_raw(arc) as u64;
Ok((raw, NativeKind::Ptr(HeapKind::Decimal)))
}
(SV::HashSet { keys }, HeapKind::HashSet) => {
let arcs: Vec<Arc<String>> = keys.iter().map(|k| Arc::new(k.clone())).collect();
let data = HashSetData::from_keys(arcs);
let arc = Arc::new(data);
let raw = Arc::into_raw(arc) as u64;
Ok((raw, NativeKind::Ptr(HeapKind::HashSet)))
}
(SV::PriorityQueueHeap { heap }, HeapKind::PriorityQueue) => {
let mut pq = PriorityQueueData::new();
for &v in heap {
pq.push(v);
}
let arc = Arc::new(pq);
let raw = Arc::into_raw(arc) as u64;
Ok((raw, NativeKind::Ptr(HeapKind::PriorityQueue)))
}
(SV::AtomicI64 { value }, HeapKind::Atomic) => {
let arc = Arc::new(AtomicData::new(*value));
let raw = Arc::into_raw(arc) as u64;
Ok((raw, NativeKind::Ptr(HeapKind::Atomic)))
}
(SV::ResultData { is_ok, payload }, HeapKind::Result) => {
let inner_slot = inner_kinded_from_serializable(payload)?;
let data = if *is_ok {
ResultData::ok(inner_slot)
} else {
ResultData::err(inner_slot)
};
let arc = Arc::new(data);
let raw = Arc::into_raw(arc) as u64;
Ok((raw, NativeKind::Ptr(HeapKind::Result)))
}
(SV::OptionData { is_some, payload }, HeapKind::Option) => {
let data = if *is_some {
match payload {
Some(p) => OptionData::some(inner_kinded_from_serializable(p)?),
None => return Err(
"serializable_to_slot: OptionData is_some=true but payload=None — \
malformed wire shape; expected Some(SerializableVMValue) for \
is_some=true. ADR-006 §2.7.5.1."
.to_string(),
),
}
} else {
OptionData::none()
};
let arc = Arc::new(data);
let raw = Arc::into_raw(arc) as u64;
Ok((raw, NativeKind::Ptr(HeapKind::Option)))
}
(SV::IteratorOpaque, HeapKind::Iterator)
| (SV::DequeOpaque { .. }, HeapKind::Deque)
| (SV::ChannelOpaque { .. }, HeapKind::Channel)
| (SV::ReferenceOpaque, HeapKind::Reference)
| (SV::FilterExprOpaque, HeapKind::FilterExpr)
| (SV::SharedCellOpaque, HeapKind::SharedCell)
| (SV::MutexOpaque { .. }, HeapKind::Mutex)
| (SV::LazyOpaque { .. }, HeapKind::Lazy) => Err(format!(
"serializable_to_slot: W17-snapshot-roundtrip surface — \
{heap_kind:?} arm restored from opaque wire shape; \
deep payload reconstruction is the W17-snapshot-{:?} \
follow-up. ADR-006 §2.7.5.1.",
heap_kind,
)),
(other_sv, hk) => Err(format!(
"serializable_to_slot: W17-snapshot-roundtrip surface — \
SerializableVMValue arm {} cannot satisfy expected heap kind \
Ptr({hk:?}). Either the wire-format arm has no inverse \
projection (deep follow-up) or the discriminator is \
mismatched. ADR-006 §2.7.5.1.",
serializable_arm_name(other_sv),
)),
}
}
/// Rebuilds the payload slot of a heap `Result`/`Option` from `sv`.
///
/// Accepts the scalar set `Int`, `Number`, `Bool`, `String`, plus `Unit` and
/// `None`, which are both encoded as a `Bool` slot with raw value 0.
///
/// # Errors
/// Returns a message for every other variant.
fn inner_kinded_from_serializable(
    sv: &SerializableVMValue,
) -> std::result::Result<KindedSlot, String> {
    use SerializableVMValue as SV;
    let (raw, kind) = match sv {
        SV::Int(number) => (*number as u64, NativeKind::Int64),
        SV::Number(float) => (float.to_bits(), NativeKind::Float64),
        SV::Bool(flag) => (u64::from(*flag), NativeKind::Bool),
        SV::Unit | SV::None => (0, NativeKind::Bool),
        // Strings own heap data, so they go through the dedicated constructor.
        SV::String(text) => return Ok(KindedSlot::from_string_arc(Arc::new(text.clone()))),
        other => {
            return Err(format!(
                "inner_kinded_from_serializable: W17-snapshot-roundtrip surface — \
                 SerializableVMValue arm {} has no in-session inner-payload \
                 projection. Tracked as follow-up. ADR-006 §2.7.5.1.",
                serializable_arm_name(other),
            ));
        }
    };
    Ok(KindedSlot::new(ValueSlot::from_raw(raw), kind))
}
/// Returns the variant name of `sv` as a static string, for use in error
/// messages.
///
/// The match is deliberately exhaustive (no wildcard arm) so that adding a
/// variant to `SerializableVMValue` forces this table to be updated.
fn serializable_arm_name(sv: &SerializableVMValue) -> &'static str {
use SerializableVMValue as SV;
match sv {
SV::Int(_) => "Int",
SV::Number(_) => "Number",
SV::Decimal(_) => "Decimal",
SV::String(_) => "String",
SV::Bool(_) => "Bool",
SV::None => "None",
SV::Some(_) => "Some",
SV::Unit => "Unit",
SV::Timeframe(_) => "Timeframe",
SV::Duration(_) => "Duration",
SV::Time(_) => "Time",
SV::TimeSpan(_) => "TimeSpan",
SV::TimeReference(_) => "TimeReference",
SV::DateTimeExpr(_) => "DateTimeExpr",
SV::DataDateTimeRef(_) => "DataDateTimeRef",
SV::Array(_) => "Array",
SV::Function(_) => "Function",
SV::TypeAnnotation(_) => "TypeAnnotation",
SV::TypeAnnotatedValue { .. } => "TypeAnnotatedValue",
SV::Enum(_) => "Enum",
SV::Closure { .. } => "Closure",
SV::ModuleFunction(_) => "ModuleFunction",
SV::TypedObject { .. } => "TypedObject",
SV::Range { .. } => "Range",
SV::Ok(_) => "Ok",
SV::Err(_) => "Err",
SV::PrintResult(_) => "PrintResult",
SV::SimulationCall { .. } => "SimulationCall",
SV::FunctionRef { .. } => "FunctionRef",
SV::DataReference { .. } => "DataReference",
SV::Future(_) => "Future",
SV::DataTable(_) => "DataTable",
SV::TypedTable { .. } => "TypedTable",
SV::RowView { .. } => "RowView",
SV::ColumnRef { .. } => "ColumnRef",
SV::IndexedTable { .. } => "IndexedTable",
SV::TypedArray { .. } => "TypedArray",
SV::Matrix { .. } => "Matrix",
SV::HashMap { .. } => "HashMap",
SV::SidecarRef { .. } => "SidecarRef",
SV::HashSet { .. } => "HashSet",
SV::IteratorOpaque => "IteratorOpaque",
SV::ResultData { .. } => "ResultData",
SV::OptionData { .. } => "OptionData",
SV::DequeOpaque { .. } => "DequeOpaque",
SV::ChannelOpaque { .. } => "ChannelOpaque",
SV::PriorityQueueHeap { .. } => "PriorityQueueHeap",
SV::ReferenceOpaque => "ReferenceOpaque",
SV::FilterExprOpaque => "FilterExprOpaque",
SV::SharedCellOpaque => "SharedCellOpaque",
SV::MutexOpaque { .. } => "MutexOpaque",
SV::AtomicI64 { .. } => "AtomicI64",
SV::LazyOpaque { .. } => "LazyOpaque",
SV::Char(_) => "Char",
SV::BigInt(_) => "BigInt",
}
}
/// Serializes `dt` into a `SerializableDataTable`.
///
/// The table's record batch is written as an Arrow IPC file into memory, and
/// those bytes are stored through `store_chunked_vec` with `BYTE_CHUNK_LEN`
/// elements per chunk. The table's type name and schema id are carried along.
///
/// # Errors
/// Fails if the IPC writer or the chunk store fails.
fn serialize_datatable(dt: &DataTable, store: &SnapshotStore) -> Result<SerializableDataTable> {
    let batch = dt.inner();
    let schema = batch.schema();
    let mut ipc_bytes = Vec::new();
    {
        // Scoped so the writer's borrow of `ipc_bytes` ends before the bytes are read.
        let mut ipc_writer =
            arrow_ipc::writer::FileWriter::try_new(&mut ipc_bytes, schema.as_ref())?;
        ipc_writer.write(batch)?;
        ipc_writer.finish()?;
    }
    let type_name = dt.type_name().map(|name| name.to_string());
    let schema_id = dt.schema_id();
    Ok(SerializableDataTable {
        ipc_chunks: store_chunked_vec(ipc_bytes.as_slice(), BYTE_CHUNK_LEN, store)?,
        type_name,
        schema_id,
    })
}
fn deserialize_datatable(
serialized: SerializableDataTable,
store: &SnapshotStore,
) -> Result<DataTable> {
let bytes = load_chunked_vec(&serialized.ipc_chunks, store)?;
let cursor = std::io::Cursor::new(bytes);
let mut reader = arrow_ipc::reader::FileReader::try_new(cursor, None)?;
let batch = reader
.next()
.transpose()?
.context("no RecordBatch in DataTable snapshot")?;
let mut dt = DataTable::new(batch);
if let Some(name) = serialized.type_name {
dt = DataTable::with_type_name(dt.into_inner(), name);
}
if let Some(schema_id) = serialized.schema_id {
dt = dt.with_schema_id(schema_id);
}
Ok(dt)
}