Skip to main content

shape_vm/
remote.rs

1//! Per-function remote execution support
2//!
3//! This module provides the types and executor for transferring function
4//! execution to another machine. The design sends the full compiled
5//! `BytecodeProgram` + a "call this function with these args" message,
6//! running it on a full Shape VM on the remote side.
7//!
8//! # Architecture
9//!
10//! ```text
11//! Layer 4: @remote / @distributed annotations    (Shape stdlib — user-defined policy)
12//! Layer 3: RemoteCallRequest/Response            (this module)
13//! Layer 2: shape-wire codec (MessagePack)        (encode_message / decode_message)
14//! Layer 1: Transport (TCP/QUIC/Unix socket)      (user-provided, pluggable)
15//! ```
16//!
17//! Layer 0 (the foundation): Full Shape VM on both sides, same `BytecodeProgram`,
18//! same `Executor`.
19//!
20//! # Closure semantics
21//!
//! `Arc<RwLock<ValueWord>>` upvalues become **value copies** on serialization.
//! If the remote side mutates a captured variable, the sender doesn't see it.
//! This is the intended semantics for distributed computing — a **send-copy** model.
25
26use serde::{Deserialize, Serialize};
27use shape_runtime::snapshot::{
28    SerializableVMValue, SnapshotStore, nanboxed_to_serializable, serializable_to_nanboxed,
29};
30use shape_runtime::type_schema::TypeSchemaRegistry;
31use shape_value::ValueWord;
32
33use crate::bytecode::{BytecodeProgram, FunctionBlob, FunctionHash, Program};
34use crate::executor::{VMConfig, VirtualMachine};
35
/// Request to execute a function on a remote VM.
///
/// Contains everything needed to call a function: the compiled program
/// (cacheable by `program_hash`), function identity, serialized arguments,
/// and optional closure captures.
///
/// The builders in this module (`build_call_request`, `build_closure_call_request`)
/// ship a metadata-only stub in `program` whenever `function_blobs` is `Some`;
/// the callee then rebuilds the code from the blobs.
///
/// NOTE(review): if the wire codec encodes structs positionally, field order is
/// part of the format — confirm before reordering fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteCallRequest {
    /// The full compiled program. After the first transfer, the remote
    /// side caches by `program_hash` and subsequent calls only need args.
    /// When `function_blobs` is `Some`, this is typically a stub carrying only
    /// metadata (storage hints, schemas, `function_blob_hashes`, ...).
    pub program: BytecodeProgram,

    /// Function to call by name (for named functions).
    /// Empty for closure calls built by `build_closure_call_request`.
    pub function_name: String,

    /// Function to call by ID (for closures that have no user-facing name).
    /// Takes precedence over `function_name` when `Some`, and is required for
    /// closure calls. The ID is an index into the *sender's* function table
    /// (the callee maps it through `program.function_blob_hashes`).
    pub function_id: Option<u16>,

    /// Function to call by content hash (canonical identity).
    ///
    /// Preferred over name-based lookup when present. This avoids ambiguity
    /// when multiple modules define functions with the same name.
    #[serde(default)]
    pub function_hash: Option<FunctionHash>,

    /// Serialized arguments to the function.
    pub arguments: Vec<SerializableVMValue>,

    /// Closure upvalues, if calling a closure. These are value-copied from
    /// the sender's `Arc<RwLock<ValueWord>>` upvalue slots.
    /// `Some` selects the closure call path on the callee.
    pub upvalues: Option<Vec<SerializableVMValue>>,

    /// Type schema registry — sent separately because `BytecodeProgram`
    /// has `#[serde(skip)]` on its registry (it's populated at compile time).
    pub type_schemas: TypeSchemaRegistry,

    /// Content hash of the program for caching. If the remote side has
    /// already seen this hash, it can skip deserializing the program.
    /// The builders compute it over the full program, not over the stub.
    pub program_hash: [u8; 32],

    /// Minimal content-addressed blobs for the called function and its
    /// transitive dependencies. When present, the callee can reconstruct
    /// a `Program` from these blobs instead of deserializing the full
    /// `BytecodeProgram`, dramatically reducing payload size.
    #[serde(default)]
    pub function_blobs: Option<Vec<(FunctionHash, FunctionBlob)>>,
}
83
/// Response from a remote function execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteCallResponse {
    /// The function's serialized return value, or a structured
    /// `RemoteCallError` describing why the call failed.
    pub result: Result<SerializableVMValue, RemoteCallError>,
}
90
/// Error from remote execution.
///
/// Displayed as `<kind>: <message>`, with `kind` in its `Debug` form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteCallError {
    /// Human-readable error message.
    pub message: String,
    /// Error classification for programmatic handling (always present).
    pub kind: RemoteErrorKind,
}
99
100/// Classification of remote execution errors.
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub enum RemoteErrorKind {
103    /// Function not found in the program.
104    FunctionNotFound,
105    /// Argument deserialization failed.
106    ArgumentError,
107    /// Runtime error during execution.
108    RuntimeError,
109    /// Module function required on the remote side is missing.
110    MissingModuleFunction,
111}
112
113impl std::fmt::Display for RemoteCallError {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        write!(f, "{:?}: {}", self.kind, self.message)
116    }
117}
118
119impl std::error::Error for RemoteCallError {}
120
121// ---------------------------------------------------------------------------
122// Wire message envelope (Phase 2: blob negotiation)
123// ---------------------------------------------------------------------------
124
/// Envelope for all wire protocol messages.
///
/// Wraps the existing `RemoteCallRequest`/`RemoteCallResponse` with negotiation
/// and sidecar message types for bandwidth optimization on persistent connections.
///
/// NOTE(review): the expected ordering on a connection is presumably
/// `BlobNegotiation` -> `BlobNegotiationReply` -> optional `Sidecar`s -> `Call`
/// -> `CallResponse`; the sequencing is enforced outside this module — confirm there.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WireMessage {
    /// Offer function blob hashes to check what the remote already has.
    BlobNegotiation(BlobNegotiationRequest),
    /// Reply with the subset of offered hashes that are already cached.
    BlobNegotiationReply(BlobNegotiationResponse),
    /// A remote function call (may have blobs stripped if negotiation occurred).
    Call(RemoteCallRequest),
    /// Response to a remote function call.
    CallResponse(RemoteCallResponse),
    /// A large blob sent as a separate message before the call (Phase 3).
    Sidecar(BlobSidecar),
}
142
/// Request to check which function blobs the remote side already has cached.
///
/// Answered on the server by `handle_negotiation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobNegotiationRequest {
    /// Content hashes of function blobs the caller wants to send.
    pub offered_hashes: Vec<FunctionHash>,
}
149
/// Response indicating which offered blobs are already cached on the remote side.
///
/// The caller can pass `known_hashes` to `build_call_request_negotiated` to
/// omit those blobs from the next call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobNegotiationResponse {
    /// Subset of offered hashes that the remote already has in its blob cache.
    pub known_hashes: Vec<FunctionHash>,
}
156
/// A large binary payload sent as a separate message before the call request.
///
/// Used for splitting large BlobRef-backed values (DataTables, TypedArrays, etc.)
/// out of the main serialized payload. `extract_sidecars` produces these and
/// replaces each extracted value with a `SerializableVMValue::SidecarRef`
/// carrying the same `sidecar_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobSidecar {
    /// Identifier matched against `SidecarRef { sidecar_id, .. }` in the payload.
    pub sidecar_id: u32,
    /// Extracted blob bytes. NOTE(review): layout is produced by
    /// `try_extract_blob` — confirm the encoding there.
    pub data: Vec<u8>,
}
166
167// ---------------------------------------------------------------------------
168// Per-connection blob cache (Phase 2)
169// ---------------------------------------------------------------------------
170
/// Per-connection cache of function blobs received from a remote peer.
///
/// Content hashes make stale entries harmless (same hash = same content),
/// so no invalidation protocol is needed. LRU eviction bounds memory usage.
pub struct RemoteBlobCache {
    /// Cached blobs keyed by their content hash.
    blobs: std::collections::HashMap<FunctionHash, FunctionBlob>,
    /// Access order for LRU eviction (most recently used at the end).
    /// Kept in sync with `blobs`: every cached hash appears here exactly once.
    /// Updates are O(n) (`retain` on access, `remove(0)` on eviction).
    order: Vec<FunctionHash>,
    /// Maximum number of entries before LRU eviction kicks in.
    max_entries: usize,
}
182
183impl RemoteBlobCache {
184    /// Create a new blob cache with the given capacity.
185    pub fn new(max_entries: usize) -> Self {
186        Self {
187            blobs: std::collections::HashMap::new(),
188            order: Vec::new(),
189            max_entries,
190        }
191    }
192
193    /// Default cache with 4096 entry capacity.
194    pub fn default_cache() -> Self {
195        Self::new(4096)
196    }
197
198    /// Insert a blob, evicting the least recently used entry if at capacity.
199    pub fn insert(&mut self, hash: FunctionHash, blob: FunctionBlob) {
200        if self.blobs.contains_key(&hash) {
201            // Move to end (most recently used)
202            self.order.retain(|h| h != &hash);
203            self.order.push(hash);
204            return;
205        }
206
207        // Evict LRU if at capacity
208        while self.blobs.len() >= self.max_entries && !self.order.is_empty() {
209            let evicted = self.order.remove(0);
210            self.blobs.remove(&evicted);
211        }
212
213        self.blobs.insert(hash, blob);
214        self.order.push(hash);
215    }
216
217    /// Look up a cached blob by hash, updating access order.
218    pub fn get(&mut self, hash: &FunctionHash) -> Option<&FunctionBlob> {
219        if self.blobs.contains_key(hash) {
220            self.order.retain(|h| h != hash);
221            self.order.push(*hash);
222            self.blobs.get(hash)
223        } else {
224            None
225        }
226    }
227
228    /// Check if a hash is cached without updating access order.
229    pub fn contains(&self, hash: &FunctionHash) -> bool {
230        self.blobs.contains_key(hash)
231    }
232
233    /// Return all cached hashes.
234    pub fn known_hashes(&self) -> Vec<FunctionHash> {
235        self.blobs.keys().copied().collect()
236    }
237
238    /// Return the subset of `offered` hashes that are in the cache.
239    pub fn filter_known(&self, offered: &[FunctionHash]) -> Vec<FunctionHash> {
240        offered
241            .iter()
242            .filter(|h| self.blobs.contains_key(h))
243            .copied()
244            .collect()
245    }
246
247    /// Number of cached entries.
248    pub fn len(&self) -> usize {
249        self.blobs.len()
250    }
251
252    /// Whether the cache is empty.
253    pub fn is_empty(&self) -> bool {
254        self.blobs.is_empty()
255    }
256
257    /// Insert all blobs from a set, typically received from a remote call.
258    pub fn insert_blobs(&mut self, blobs: &[(FunctionHash, FunctionBlob)]) {
259        for (hash, blob) in blobs {
260            self.insert(*hash, blob.clone());
261        }
262    }
263}
264
265/// Build a minimal set of function blobs for a function hash and its
266/// transitive dependencies from a content-addressed `Program`.
267///
268/// Returns `None` if the program has no content-addressed representation
269/// or the entry hash is not present in the function store.
270pub fn build_minimal_blobs_by_hash(
271    program: &BytecodeProgram,
272    entry_hash: FunctionHash,
273) -> Option<Vec<(FunctionHash, FunctionBlob)>> {
274    let ca = program.content_addressed.as_ref()?;
275    if !ca.function_store.contains_key(&entry_hash) {
276        return None;
277    }
278
279    // Compute transitive closure of dependencies
280    let mut needed: std::collections::HashSet<FunctionHash> = std::collections::HashSet::new();
281    let mut queue = vec![entry_hash];
282    while let Some(hash) = queue.pop() {
283        if needed.insert(hash) {
284            if let Some(blob) = ca.function_store.get(&hash) {
285                for dep in &blob.dependencies {
286                    if !needed.contains(dep) {
287                        queue.push(*dep);
288                    }
289                }
290            }
291        }
292    }
293
294    // Collect the minimal blob set
295    let blobs: Vec<(FunctionHash, FunctionBlob)> = needed
296        .into_iter()
297        .filter_map(|hash| {
298            ca.function_store
299                .get(&hash)
300                .map(|blob| (hash, blob.clone()))
301        })
302        .collect();
303
304    Some(blobs)
305}
306
307/// Backwards-compatible name-based wrapper around `build_minimal_blobs_by_hash`.
308///
309/// If multiple blobs share the same name, this returns `None` to avoid
310/// ambiguous, potentially incorrect dependency selection.
311pub fn build_minimal_blobs(
312    program: &BytecodeProgram,
313    fn_name: &str,
314) -> Option<Vec<(FunctionHash, FunctionBlob)>> {
315    let ca = program.content_addressed.as_ref()?;
316    let mut matches = ca.function_store.iter().filter_map(|(hash, blob)| {
317        if blob.name == fn_name {
318            Some(*hash)
319        } else {
320            None
321        }
322    });
323    let first = matches.next()?;
324    if matches.next().is_some() {
325        return None;
326    }
327    build_minimal_blobs_by_hash(program, first)
328}
329
330/// Build a minimal `Program` from function blobs and an explicit entry hash.
331///
332/// Used on the callee side to reconstruct a `Program` from blobs received in
333/// a `RemoteCallRequest`.
334pub fn program_from_blobs_by_hash(
335    blobs: Vec<(FunctionHash, FunctionBlob)>,
336    entry_hash: FunctionHash,
337    source: &BytecodeProgram,
338) -> Option<Program> {
339    let function_store: std::collections::HashMap<FunctionHash, FunctionBlob> =
340        blobs.into_iter().collect();
341    if !function_store.contains_key(&entry_hash) {
342        return None;
343    }
344
345    Some(Program {
346        entry: entry_hash,
347        function_store,
348        top_level_locals_count: source.top_level_locals_count,
349        top_level_local_storage_hints: source.top_level_local_storage_hints.clone(),
350        module_binding_names: source.module_binding_names.clone(),
351        module_binding_storage_hints: source.module_binding_storage_hints.clone(),
352        function_local_storage_hints: source.function_local_storage_hints.clone(),
353        top_level_frame: source.top_level_frame.clone(),
354        data_schema: source.data_schema.clone(),
355        type_schema_registry: source.type_schema_registry.clone(),
356        trait_method_symbols: source.trait_method_symbols.clone(),
357        foreign_functions: source.foreign_functions.clone(),
358        native_struct_layouts: source.native_struct_layouts.clone(),
359        debug_info: source.debug_info.clone(),
360    })
361}
362
363/// Backwards-compatible name-based wrapper around `program_from_blobs_by_hash`.
364pub fn program_from_blobs(
365    blobs: Vec<(FunctionHash, FunctionBlob)>,
366    fn_name: &str,
367    source: &BytecodeProgram,
368) -> Option<Program> {
369    let mut matches = blobs.iter().filter_map(|(hash, blob)| {
370        if blob.name == fn_name {
371            Some(*hash)
372        } else {
373            None
374        }
375    });
376    let entry = matches.next()?;
377    if matches.next().is_some() {
378        return None;
379    }
380    program_from_blobs_by_hash(blobs, entry, source)
381}
382
383/// Execute a remote call request on this machine.
384///
385/// This is the entry point for the receiving side. It:
386/// 1. Reconstructs the `BytecodeProgram` and populates its `TypeSchemaRegistry`
387/// 2. Creates a full `VirtualMachine` with the program
388/// 3. Converts serialized arguments back to `ValueWord`
389/// 4. Calls the function by name or ID
390/// 5. Converts the result back to `SerializableVMValue`
391///
392/// The `store` is used for `SerializableVMValue` ↔ `ValueWord` conversion
393/// (needed for `BlobRef`-backed values like DataTable).
394pub fn execute_remote_call(
395    request: RemoteCallRequest,
396    store: &SnapshotStore,
397) -> RemoteCallResponse {
398    match execute_inner(request, store) {
399        Ok(value) => RemoteCallResponse { result: Ok(value) },
400        Err(err) => RemoteCallResponse { result: Err(err) },
401    }
402}
403
404fn execute_inner(
405    request: RemoteCallRequest,
406    store: &SnapshotStore,
407) -> Result<SerializableVMValue, RemoteCallError> {
408    // 1. Reconstruct program with type schemas.
409    // Prefer content-addressed blobs when present — they carry only the
410    // transitive closure of the called function, so deserialization is cheaper.
411    let mut program = if let Some(blobs) = request.function_blobs {
412        let entry_hash = request
413            .function_hash
414            .or_else(|| {
415                if let Some(fid) = request.function_id {
416                    request
417                        .program
418                        .function_blob_hashes
419                        .get(fid as usize)
420                        .copied()
421                        .flatten()
422                } else {
423                    None
424                }
425            })
426            .or_else(|| {
427                // Legacy fallback by unique function name.
428                request.program.content_addressed.as_ref().and_then(|ca| {
429                    let mut matches = ca.function_store.iter().filter_map(|(hash, blob)| {
430                        if blob.name == request.function_name {
431                            Some(*hash)
432                        } else {
433                            None
434                        }
435                    });
436                    let first = matches.next()?;
437                    if matches.next().is_some() {
438                        None
439                    } else {
440                        Some(first)
441                    }
442                })
443            })
444            .ok_or_else(|| RemoteCallError {
445                message: format!(
446                    "Could not resolve entry hash for remote function '{}'",
447                    request.function_name
448                ),
449                kind: RemoteErrorKind::FunctionNotFound,
450            })?;
451
452        // Reconstruct a content-addressed Program from the minimal blobs,
453        // then link it into a BytecodeProgram the VM can execute.
454        let ca_program = program_from_blobs_by_hash(blobs, entry_hash, &request.program)
455            .ok_or_else(|| RemoteCallError {
456                message: format!(
457                    "Could not reconstruct program from blobs for '{}'",
458                    request.function_name
459                ),
460                kind: RemoteErrorKind::FunctionNotFound,
461            })?;
462        // Link the content-addressed program into a flat BytecodeProgram
463        let linked = crate::linker::link(&ca_program).map_err(|e| RemoteCallError {
464            message: format!("Linker error: {}", e),
465            kind: RemoteErrorKind::RuntimeError,
466        })?;
467        // Convert LinkedProgram to BytecodeProgram for the existing VM path
468        crate::linker::linked_to_bytecode_program(&linked)
469    } else {
470        request.program
471    };
472    program.type_schema_registry = request.type_schemas;
473
474    // 2. Convert arguments from serializable form
475    let args: Vec<ValueWord> = request
476        .arguments
477        .iter()
478        .map(|sv| {
479            serializable_to_nanboxed(sv, store).map_err(|e| RemoteCallError {
480                message: format!("Failed to deserialize argument: {}", e),
481                kind: RemoteErrorKind::ArgumentError,
482            })
483        })
484        .collect::<Result<Vec<_>, _>>()?;
485
486    // 3. Create VM and load program
487    let mut vm = VirtualMachine::new(VMConfig::default());
488    vm.load_program(program);
489    vm.populate_module_objects();
490
491    // 4. Execute — closure or named function
492    let result = if let Some(ref upvalue_data) = request.upvalues {
493        // Closure call: reconstruct upvalues as Upvalue structs
494        let upvalues: Vec<shape_value::Upvalue> = upvalue_data
495            .iter()
496            .map(|sv| {
497                let nb = serializable_to_nanboxed(sv, store).map_err(|e| RemoteCallError {
498                    message: format!("Failed to deserialize upvalue: {}", e),
499                    kind: RemoteErrorKind::ArgumentError,
500                })?;
501                Ok(shape_value::Upvalue::new(nb))
502            })
503            .collect::<Result<Vec<_>, RemoteCallError>>()?;
504
505        let function_id = request.function_id.ok_or_else(|| RemoteCallError {
506            message: "Closure call requires function_id".to_string(),
507            kind: RemoteErrorKind::FunctionNotFound,
508        })?;
509
510        vm.execute_closure(function_id, upvalues, args, None)
511            .map_err(|e| RemoteCallError {
512                message: e.to_string(),
513                kind: RemoteErrorKind::RuntimeError,
514            })?
515    } else if let Some(func_id) = request.function_id {
516        // Call by function ID
517        vm.execute_function_by_id(func_id, args, None)
518            .map_err(|e| RemoteCallError {
519                message: e.to_string(),
520                kind: RemoteErrorKind::RuntimeError,
521            })?
522    } else if let Some(hash) = request.function_hash {
523        // Hash-first call path.
524        let func_id = vm
525            .program()
526            .function_blob_hashes
527            .iter()
528            .enumerate()
529            .find_map(|(idx, maybe_hash)| {
530                if maybe_hash == &Some(hash) {
531                    Some(idx as u16)
532                } else {
533                    None
534                }
535            })
536            .ok_or_else(|| RemoteCallError {
537                message: format!("Function hash not found in program: {}", hash),
538                kind: RemoteErrorKind::FunctionNotFound,
539            })?;
540        vm.execute_function_by_id(func_id, args, None)
541            .map_err(|e| RemoteCallError {
542                message: e.to_string(),
543                kind: RemoteErrorKind::RuntimeError,
544            })?
545    } else {
546        // Call by name
547        vm.execute_function_by_name(&request.function_name, args, None)
548            .map_err(|e| RemoteCallError {
549                message: e.to_string(),
550                kind: RemoteErrorKind::RuntimeError,
551            })?
552    };
553
554    // 5. Serialize result
555    nanboxed_to_serializable(&result, store).map_err(|e| RemoteCallError {
556        message: format!("Failed to serialize result: {}", e),
557        kind: RemoteErrorKind::RuntimeError,
558    })
559}
560
561/// Compute a SHA-256 hash of a `BytecodeProgram` for caching.
562///
563/// Remote VMs can cache programs by this hash, avoiding re-transfer
564/// of the same program on repeated calls.
565pub fn program_hash(program: &BytecodeProgram) -> [u8; 32] {
566    use sha2::{Digest, Sha256};
567    let bytes =
568        rmp_serde::to_vec_named(program).expect("BytecodeProgram serialization should not fail");
569    let hash = Sha256::digest(&bytes);
570    let mut out = [0u8; 32];
571    out.copy_from_slice(&hash);
572    out
573}
574
575/// Create a minimal stub program containing only metadata (no instructions/constants/functions).
576///
577/// Used by `build_call_request` and `build_closure_call_request` when content-addressed
578/// blobs are available, to reduce payload size.
579fn create_stub_program(program: &BytecodeProgram) -> BytecodeProgram {
580    let mut stub = BytecodeProgram::default();
581    stub.type_schema_registry = program.type_schema_registry.clone();
582    // Carry enough content-addressed metadata for program_from_blobs()
583    if let Some(ref ca) = program.content_addressed {
584        stub.content_addressed = Some(Program {
585            entry: ca.entry,
586            function_store: std::collections::HashMap::new(),
587            top_level_locals_count: ca.top_level_locals_count,
588            top_level_local_storage_hints: ca.top_level_local_storage_hints.clone(),
589            module_binding_names: ca.module_binding_names.clone(),
590            module_binding_storage_hints: ca.module_binding_storage_hints.clone(),
591            function_local_storage_hints: ca.function_local_storage_hints.clone(),
592            top_level_frame: ca.top_level_frame.clone(),
593            data_schema: ca.data_schema.clone(),
594            type_schema_registry: ca.type_schema_registry.clone(),
595            trait_method_symbols: ca.trait_method_symbols.clone(),
596            foreign_functions: ca.foreign_functions.clone(),
597            native_struct_layouts: ca.native_struct_layouts.clone(),
598            debug_info: ca.debug_info.clone(),
599        });
600    }
601    // Copy top-level metadata needed by program_from_blobs
602    stub.top_level_locals_count = program.top_level_locals_count;
603    stub.top_level_local_storage_hints = program.top_level_local_storage_hints.clone();
604    stub.module_binding_names = program.module_binding_names.clone();
605    stub.module_binding_storage_hints = program.module_binding_storage_hints.clone();
606    stub.function_local_storage_hints = program.function_local_storage_hints.clone();
607    stub.data_schema = program.data_schema.clone();
608    stub.trait_method_symbols = program.trait_method_symbols.clone();
609    stub.foreign_functions = program.foreign_functions.clone();
610    stub.native_struct_layouts = program.native_struct_layouts.clone();
611    stub.debug_info = program.debug_info.clone();
612    stub.function_blob_hashes = program.function_blob_hashes.clone();
613    stub
614}
615
616/// Build a `RemoteCallRequest` for a named function.
617///
618/// Convenience function that handles program hashing and type schema extraction.
619/// When the program has content-addressed blobs, automatically computes the
620/// minimal transitive closure and attaches it to the request.
621pub fn build_call_request(
622    program: &BytecodeProgram,
623    function_name: &str,
624    arguments: Vec<SerializableVMValue>,
625) -> RemoteCallRequest {
626    let hash = program_hash(program);
627    let function_id = program
628        .functions
629        .iter()
630        .position(|f| f.name == function_name)
631        .map(|id| id as u16);
632    let function_hash = function_id
633        .and_then(|fid| {
634            program
635                .function_blob_hashes
636                .get(fid as usize)
637                .copied()
638                .flatten()
639        })
640        .or_else(|| {
641            program.content_addressed.as_ref().and_then(|ca| {
642                let mut matches = ca.function_store.iter().filter_map(|(hash, blob)| {
643                    if blob.name == function_name {
644                        Some(*hash)
645                    } else {
646                        None
647                    }
648                });
649                let first = matches.next()?;
650                if matches.next().is_some() {
651                    None
652                } else {
653                    Some(first)
654                }
655            })
656        });
657    let blobs = function_hash.and_then(|h| build_minimal_blobs_by_hash(program, h));
658
659    // When content-addressed blobs are available, send a minimal stub program
660    // instead of the full BytecodeProgram to reduce payload size.
661    let request_program = if blobs.is_some() {
662        create_stub_program(program)
663    } else {
664        program.clone()
665    };
666
667    RemoteCallRequest {
668        program: request_program,
669        function_name: function_name.to_string(),
670        function_id,
671        function_hash,
672        arguments,
673        upvalues: None,
674        type_schemas: program.type_schema_registry.clone(),
675        program_hash: hash,
676        function_blobs: blobs,
677    }
678}
679
680/// Build a `RemoteCallRequest` for a closure.
681///
682/// Serializes the closure's captured upvalues alongside the function call.
683/// When the closure's function has a matching content-addressed blob, sends
684/// the minimal blob set instead of the full program.
685pub fn build_closure_call_request(
686    program: &BytecodeProgram,
687    function_id: u16,
688    arguments: Vec<SerializableVMValue>,
689    upvalues: Vec<SerializableVMValue>,
690) -> RemoteCallRequest {
691    let hash = program_hash(program);
692
693    let function_hash = program
694        .function_blob_hashes
695        .get(function_id as usize)
696        .copied()
697        .flatten();
698    let blobs = function_hash.and_then(|h| build_minimal_blobs_by_hash(program, h));
699
700    RemoteCallRequest {
701        program: if blobs.is_some() {
702            create_stub_program(program)
703        } else {
704            program.clone()
705        },
706        function_name: String::new(),
707        function_id: Some(function_id),
708        function_hash,
709        arguments,
710        upvalues: Some(upvalues),
711        type_schemas: program.type_schema_registry.clone(),
712        program_hash: hash,
713        function_blobs: blobs,
714    }
715}
716
717/// Build a `RemoteCallRequest` that strips function blobs the remote already has.
718///
719/// Like `build_call_request`, but takes a set of hashes the remote is known to
720/// have cached (from a prior `BlobNegotiationResponse`). Blobs with matching
721/// hashes are omitted from `function_blobs`, reducing payload size.
722pub fn build_call_request_negotiated(
723    program: &BytecodeProgram,
724    function_name: &str,
725    arguments: Vec<SerializableVMValue>,
726    known_hashes: &[FunctionHash],
727) -> RemoteCallRequest {
728    let mut request = build_call_request(program, function_name, arguments);
729
730    // Strip blobs the remote already has
731    if let Some(ref mut blobs) = request.function_blobs {
732        let known_set: std::collections::HashSet<FunctionHash> =
733            known_hashes.iter().copied().collect();
734        blobs.retain(|(hash, _)| !known_set.contains(hash));
735    }
736
737    request
738}
739
740/// Handle a blob negotiation request on the server side.
741///
742/// Returns the subset of offered hashes that are present in the cache.
743pub fn handle_negotiation(
744    request: &BlobNegotiationRequest,
745    cache: &RemoteBlobCache,
746) -> BlobNegotiationResponse {
747    BlobNegotiationResponse {
748        known_hashes: cache.filter_known(&request.offered_hashes),
749    }
750}
751
752// ---------------------------------------------------------------------------
753// Phase 3B: Sidecar extraction and reassembly
754// ---------------------------------------------------------------------------
755
/// Minimum blob size, in bytes, considered for sidecar extraction (1 MiB).
/// Smaller blobs stay inline in the serialized payload.
pub const SIDECAR_THRESHOLD: usize = 1 << 20; // 1024 * 1024
759
760/// Extract large blobs from serialized arguments into sidecars.
761///
762/// Walks the `SerializableVMValue` tree recursively. Any `BlobRef` whose
763/// backing `ChunkedBlob` exceeds `SIDECAR_THRESHOLD` bytes is replaced
764/// with a `SidecarRef` and the raw data is collected into a `BlobSidecar`.
765///
766/// Returns the extracted sidecars. The `args` are modified in place.
767pub fn extract_sidecars(
768    args: &mut Vec<SerializableVMValue>,
769    store: &SnapshotStore,
770) -> Vec<BlobSidecar> {
771    let mut sidecars = Vec::new();
772    let mut next_id: u32 = 0;
773    for arg in args.iter_mut() {
774        extract_sidecars_recursive(arg, store, &mut sidecars, &mut next_id);
775    }
776    sidecars
777}
778
779/// Extract the BlobRef from a SerializableVMValue if it carries one (non-mutating read).
780fn get_blob_ref(value: &SerializableVMValue) -> Option<&shape_runtime::snapshot::BlobRef> {
781    use shape_runtime::snapshot::SerializableVMValue as SV;
782    match value {
783        SV::DataTable(blob)
784        | SV::TypedTable { table: blob, .. }
785        | SV::RowView { table: blob, .. }
786        | SV::ColumnRef { table: blob, .. }
787        | SV::IndexedTable { table: blob, .. } => Some(blob),
788        SV::TypedArray { blob, .. } | SV::Matrix { blob, .. } => Some(blob),
789        _ => None,
790    }
791}
792
793fn extract_sidecars_recursive(
794    value: &mut SerializableVMValue,
795    store: &SnapshotStore,
796    sidecars: &mut Vec<BlobSidecar>,
797    next_id: &mut u32,
798) {
799    use shape_runtime::snapshot::SerializableVMValue as SV;
800
801    // First: check if this value carries a blob large enough to extract.
802    // Capture metadata (TypedArray len, Matrix rows/cols) before replacing.
803    let meta = match &*value {
804        SV::TypedArray { len, .. } => (*len as u32, 0u32),
805        SV::Matrix { rows, cols, .. } => (*rows, *cols),
806        _ => (0, 0),
807    };
808    // Clone the blob info to avoid borrow conflicts with the later mutation.
809    if let Some(blob) = get_blob_ref(value) {
810        let blob_kind = blob.kind.clone();
811        let blob_hash = blob.hash.clone();
812        if let Some(sidecar) = try_extract_blob(blob, store, next_id) {
813            let sidecar_id = sidecar.sidecar_id;
814            sidecars.push(sidecar);
815            *value = SV::SidecarRef {
816                sidecar_id,
817                blob_kind,
818                original_hash: blob_hash,
819                meta_a: meta.0,
820                meta_b: meta.1,
821            };
822            return;
823        }
824    }
825
826    // Recursive descent into containers
827    match value {
828        SV::Array(items) => {
829            for item in items.iter_mut() {
830                extract_sidecars_recursive(item, store, sidecars, next_id);
831            }
832        }
833        SV::HashMap { keys, values } => {
834            for k in keys.iter_mut() {
835                extract_sidecars_recursive(k, store, sidecars, next_id);
836            }
837            for v in values.iter_mut() {
838                extract_sidecars_recursive(v, store, sidecars, next_id);
839            }
840        }
841        SV::TypedObject { slot_data, .. } => {
842            for slot in slot_data.iter_mut() {
843                extract_sidecars_recursive(slot, store, sidecars, next_id);
844            }
845        }
846        SV::Some(inner) | SV::Ok(inner) | SV::Err(inner) => {
847            extract_sidecars_recursive(inner, store, sidecars, next_id);
848        }
849        SV::TypeAnnotatedValue { value: inner, .. } => {
850            extract_sidecars_recursive(inner, store, sidecars, next_id);
851        }
852        SV::Closure { upvalues, .. } => {
853            for uv in upvalues.iter_mut() {
854                extract_sidecars_recursive(uv, store, sidecars, next_id);
855            }
856        }
857        SV::Enum(ev) => match &mut ev.payload {
858            shape_runtime::snapshot::EnumPayloadSnapshot::Unit => {}
859            shape_runtime::snapshot::EnumPayloadSnapshot::Tuple(items) => {
860                for item in items.iter_mut() {
861                    extract_sidecars_recursive(item, store, sidecars, next_id);
862                }
863            }
864            shape_runtime::snapshot::EnumPayloadSnapshot::Struct(fields) => {
865                for (_, v) in fields.iter_mut() {
866                    extract_sidecars_recursive(v, store, sidecars, next_id);
867                }
868            }
869        },
870        SV::PrintResult(pr) => {
871            for span in pr.spans.iter_mut() {
872                if let shape_runtime::snapshot::PrintSpanSnapshot::Value {
873                    raw_value,
874                    format_params,
875                    ..
876                } = span
877                {
878                    extract_sidecars_recursive(raw_value, store, sidecars, next_id);
879                    for (_, v) in format_params.iter_mut() {
880                        extract_sidecars_recursive(v, store, sidecars, next_id);
881                    }
882                }
883            }
884        }
885        SV::SimulationCall { params, .. } => {
886            for (_, v) in params.iter_mut() {
887                extract_sidecars_recursive(v, store, sidecars, next_id);
888            }
889        }
890        SV::FunctionRef { closure, .. } => {
891            if let Some(c) = closure {
892                extract_sidecars_recursive(c, store, sidecars, next_id);
893            }
894        }
895        SV::Range { start, end, .. } => {
896            if let Some(s) = start {
897                extract_sidecars_recursive(s, store, sidecars, next_id);
898            }
899            if let Some(e) = end {
900                extract_sidecars_recursive(e, store, sidecars, next_id);
901            }
902        }
903
904        // Leaf types and blob carriers (handled above) — nothing more to do
905        _ => {}
906    }
907}
908
909/// Try to extract a BlobRef's data as a sidecar if it exceeds the threshold.
910fn try_extract_blob(
911    blob: &shape_runtime::snapshot::BlobRef,
912    store: &SnapshotStore,
913    next_id: &mut u32,
914) -> Option<BlobSidecar> {
915    // Load the ChunkedBlob metadata to check total size
916    let chunked: shape_runtime::snapshot::ChunkedBlob = store.get_struct(&blob.hash).ok()?;
917    if chunked.total_len < SIDECAR_THRESHOLD {
918        return None;
919    }
920
921    // Load the raw data
922    let data = shape_runtime::snapshot::load_chunked_bytes(&chunked, store).ok()?;
923    let sidecar_id = *next_id;
924    *next_id += 1;
925
926    Some(BlobSidecar { sidecar_id, data })
927}
928
929/// Return the byte size of a single element for a typed array element kind.
930fn typed_array_element_size(kind: shape_runtime::snapshot::TypedArrayElementKind) -> usize {
931    use shape_runtime::snapshot::TypedArrayElementKind as EK;
932    match kind {
933        EK::I8 | EK::U8 | EK::Bool => 1,
934        EK::I16 | EK::U16 => 2,
935        EK::I32 | EK::U32 | EK::F32 => 4,
936        EK::I64 | EK::U64 | EK::F64 => 8,
937    }
938}
939
940/// Reassemble sidecars back into the serialized payload.
941///
942/// Walks the `SerializableVMValue` tree and replaces `SidecarRef` variants
943/// with the original `BlobRef`, storing the sidecar data back into the
944/// snapshot store.
945pub fn reassemble_sidecars(
946    args: &mut Vec<SerializableVMValue>,
947    sidecars: &std::collections::HashMap<u32, BlobSidecar>,
948    store: &SnapshotStore,
949) -> anyhow::Result<()> {
950    for arg in args.iter_mut() {
951        reassemble_recursive(arg, sidecars, store)?;
952    }
953    Ok(())
954}
955
956fn reassemble_recursive(
957    value: &mut SerializableVMValue,
958    sidecars: &std::collections::HashMap<u32, BlobSidecar>,
959    store: &SnapshotStore,
960) -> anyhow::Result<()> {
961    use shape_runtime::snapshot::{BlobRef, SerializableVMValue as SV};
962
963    match value {
964        SV::SidecarRef {
965            sidecar_id,
966            blob_kind,
967            original_hash: _,
968            meta_a,
969            meta_b,
970        } => {
971            let sidecar = sidecars
972                .get(sidecar_id)
973                .ok_or_else(|| anyhow::anyhow!("missing sidecar with id {}", sidecar_id))?;
974            let meta_a = *meta_a;
975            let meta_b = *meta_b;
976
977            // Store the sidecar data back into the snapshot store as chunked bytes,
978            // then wrap in a ChunkedBlob struct and store that.
979            let chunked = shape_runtime::snapshot::store_chunked_bytes(&sidecar.data, store)?;
980            let hash = store.put_struct(&chunked)?;
981
982            let blob = BlobRef {
983                hash,
984                kind: blob_kind.clone(),
985            };
986            *value = match blob_kind {
987                shape_runtime::snapshot::BlobKind::DataTable => SV::DataTable(blob),
988                shape_runtime::snapshot::BlobKind::TypedArray(ek) => SV::TypedArray {
989                    element_kind: *ek,
990                    blob,
991                    len: meta_a as usize,
992                },
993                shape_runtime::snapshot::BlobKind::Matrix => SV::Matrix {
994                    blob,
995                    rows: meta_a,
996                    cols: meta_b,
997                },
998            };
999        }
1000
1001        // Recursive descent (same structure as extract)
1002        SV::Array(items) => {
1003            for item in items.iter_mut() {
1004                reassemble_recursive(item, sidecars, store)?;
1005            }
1006        }
1007        SV::HashMap { keys, values } => {
1008            for k in keys.iter_mut() {
1009                reassemble_recursive(k, sidecars, store)?;
1010            }
1011            for v in values.iter_mut() {
1012                reassemble_recursive(v, sidecars, store)?;
1013            }
1014        }
1015        SV::TypedObject { slot_data, .. } => {
1016            for slot in slot_data.iter_mut() {
1017                reassemble_recursive(slot, sidecars, store)?;
1018            }
1019        }
1020        SV::Some(inner) | SV::Ok(inner) | SV::Err(inner) => {
1021            reassemble_recursive(inner, sidecars, store)?;
1022        }
1023        SV::TypeAnnotatedValue { value: inner, .. } => {
1024            reassemble_recursive(inner, sidecars, store)?;
1025        }
1026        SV::Closure { upvalues, .. } => {
1027            for uv in upvalues.iter_mut() {
1028                reassemble_recursive(uv, sidecars, store)?;
1029            }
1030        }
1031        SV::Enum(ev) => match &mut ev.payload {
1032            shape_runtime::snapshot::EnumPayloadSnapshot::Unit => {}
1033            shape_runtime::snapshot::EnumPayloadSnapshot::Tuple(items) => {
1034                for item in items.iter_mut() {
1035                    reassemble_recursive(item, sidecars, store)?;
1036                }
1037            }
1038            shape_runtime::snapshot::EnumPayloadSnapshot::Struct(fields) => {
1039                for (_, v) in fields.iter_mut() {
1040                    reassemble_recursive(v, sidecars, store)?;
1041                }
1042            }
1043        },
1044        SV::PrintResult(pr) => {
1045            for span in pr.spans.iter_mut() {
1046                if let shape_runtime::snapshot::PrintSpanSnapshot::Value {
1047                    raw_value,
1048                    format_params,
1049                    ..
1050                } = span
1051                {
1052                    reassemble_recursive(raw_value, sidecars, store)?;
1053                    for (_, v) in format_params.iter_mut() {
1054                        reassemble_recursive(v, sidecars, store)?;
1055                    }
1056                }
1057            }
1058        }
1059        SV::SimulationCall { params, .. } => {
1060            for (_, v) in params.iter_mut() {
1061                reassemble_recursive(v, sidecars, store)?;
1062            }
1063        }
1064        SV::FunctionRef { closure, .. } => {
1065            if let Some(c) = closure {
1066                reassemble_recursive(c, sidecars, store)?;
1067            }
1068        }
1069        SV::Range { start, end, .. } => {
1070            if let Some(s) = start {
1071                reassemble_recursive(s, sidecars, store)?;
1072            }
1073            if let Some(e) = end {
1074                reassemble_recursive(e, sidecars, store)?;
1075            }
1076        }
1077
1078        // Leaf types and blob-carrying variants (non-sidecar) — nothing to reassemble
1079        _ => {}
1080    }
1081    Ok(())
1082}
1083
1084#[cfg(test)]
1085mod tests {
1086    use super::*;
1087    use crate::bytecode::{FunctionBlob, FunctionHash, Instruction, OpCode, Program};
1088    use crate::compiler::BytecodeCompiler;
1089    use shape_abi_v1::PermissionSet;
1090    use std::collections::HashMap;
1091
1092    /// Helper: compile Shape source to BytecodeProgram
1093    fn compile(source: &str) -> BytecodeProgram {
1094        let program = shape_ast::parser::parse_program(source).expect("parse failed");
1095        let compiler = BytecodeCompiler::new();
1096        compiler.compile(&program).expect("compile failed")
1097    }
1098
1099    /// Helper: create a temp SnapshotStore
1100    fn temp_store() -> SnapshotStore {
1101        let dir = std::env::temp_dir().join(format!("shape_remote_test_{}", std::process::id()));
1102        SnapshotStore::new(dir).expect("create snapshot store")
1103    }
1104
1105    fn mk_hash(tag: u8) -> FunctionHash {
1106        let mut bytes = [0u8; 32];
1107        bytes[0] = tag;
1108        FunctionHash(bytes)
1109    }
1110
1111    fn mk_blob(name: &str, hash: FunctionHash, dependencies: Vec<FunctionHash>) -> FunctionBlob {
1112        FunctionBlob {
1113            content_hash: hash,
1114            name: name.to_string(),
1115            arity: 0,
1116            param_names: Vec::new(),
1117            locals_count: 0,
1118            is_closure: false,
1119            captures_count: 0,
1120            is_async: false,
1121            ref_params: Vec::new(),
1122            ref_mutates: Vec::new(),
1123            mutable_captures: Vec::new(),
1124            instructions: vec![
1125                Instruction::simple(OpCode::PushNull),
1126                Instruction::simple(OpCode::ReturnValue),
1127            ],
1128            constants: Vec::new(),
1129            strings: Vec::new(),
1130            required_permissions: PermissionSet::pure(),
1131            dependencies,
1132            callee_names: Vec::new(),
1133            type_schemas: Vec::new(),
1134            foreign_dependencies: Vec::new(),
1135            source_map: Vec::new(),
1136        }
1137    }
1138
1139    #[test]
1140    fn test_remote_call_simple_function() {
1141        let bytecode = compile(
1142            r#"
1143            function add(a, b) { a + b }
1144        "#,
1145        );
1146        let store = temp_store();
1147
1148        let request = build_call_request(
1149            &bytecode,
1150            "add",
1151            vec![
1152                SerializableVMValue::Number(10.0),
1153                SerializableVMValue::Number(32.0),
1154            ],
1155        );
1156
1157        let response = execute_remote_call(request, &store);
1158        match response.result {
1159            Ok(SerializableVMValue::Number(n)) => assert_eq!(n, 42.0),
1160            other => panic!("Expected Number(42.0), got {:?}", other),
1161        }
1162    }
1163
1164    #[test]
1165    fn test_remote_call_function_not_found() {
1166        let bytecode = compile("function foo() { 1 }");
1167        let store = temp_store();
1168
1169        let request = build_call_request(&bytecode, "nonexistent", vec![]);
1170
1171        let response = execute_remote_call(request, &store);
1172        assert!(response.result.is_err());
1173        let err = response.result.unwrap_err();
1174        assert!(matches!(err.kind, RemoteErrorKind::RuntimeError));
1175    }
1176
1177    #[test]
1178    fn test_program_hash_deterministic() {
1179        let bytecode = compile("function f(x) { x * 2 }");
1180        let hash1 = program_hash(&bytecode);
1181        let hash2 = program_hash(&bytecode);
1182        assert_eq!(hash1, hash2, "Same program should produce same hash");
1183    }
1184
1185    #[test]
1186    fn test_request_response_serialization_roundtrip() {
1187        let bytecode = compile("function double(x) { x * 2 }");
1188        let request =
1189            build_call_request(&bytecode, "double", vec![SerializableVMValue::Number(21.0)]);
1190
1191        // Encode → decode roundtrip via MessagePack
1192        let bytes = shape_wire::encode_message(&request).expect("encode request");
1193        let decoded: RemoteCallRequest =
1194            shape_wire::decode_message(&bytes).expect("decode request");
1195
1196        assert_eq!(decoded.function_name, "double");
1197        assert_eq!(decoded.arguments.len(), 1);
1198        assert_eq!(decoded.program_hash, request.program_hash);
1199    }
1200
1201    #[test]
1202    fn test_response_serialization_roundtrip() {
1203        let response = RemoteCallResponse {
1204            result: Ok(SerializableVMValue::String("hello".to_string())),
1205        };
1206
1207        let bytes = shape_wire::encode_message(&response).expect("encode response");
1208        let decoded: RemoteCallResponse =
1209            shape_wire::decode_message(&bytes).expect("decode response");
1210
1211        match decoded.result {
1212            Ok(SerializableVMValue::String(s)) => assert_eq!(s, "hello"),
1213            other => panic!("Expected Ok(String), got {:?}", other),
1214        }
1215    }
1216
1217    #[test]
1218    fn test_type_schema_registry_roundtrip() {
1219        use shape_runtime::type_schema::{FieldType, TypeSchemaRegistry};
1220
1221        let mut registry = TypeSchemaRegistry::new();
1222        registry.register_type(
1223            "Point",
1224            vec![
1225                ("x".to_string(), FieldType::F64),
1226                ("y".to_string(), FieldType::F64),
1227            ],
1228        );
1229
1230        let bytes = shape_wire::encode_message(&registry).expect("encode registry");
1231        let decoded: TypeSchemaRegistry =
1232            shape_wire::decode_message(&bytes).expect("decode registry");
1233
1234        assert!(decoded.has_type("Point"));
1235        let schema = decoded.get("Point").unwrap();
1236        assert_eq!(schema.field_count(), 2);
1237        assert_eq!(schema.field_offset("x"), Some(0));
1238        assert_eq!(schema.field_offset("y"), Some(8));
1239    }
1240
1241    #[test]
1242    fn test_build_minimal_blobs_rejects_ambiguous_function_name() {
1243        let h1 = mk_hash(1);
1244        let h2 = mk_hash(2);
1245        let blob1 = mk_blob("dup", h1, vec![]);
1246        let blob2 = mk_blob("dup", h2, vec![]);
1247
1248        let mut function_store = HashMap::new();
1249        function_store.insert(h1, blob1.clone());
1250        function_store.insert(h2, blob2.clone());
1251
1252        let mut program = BytecodeProgram::default();
1253        program.content_addressed = Some(Program {
1254            entry: h1,
1255            function_store,
1256            top_level_locals_count: 0,
1257            top_level_local_storage_hints: Vec::new(),
1258            module_binding_names: Vec::new(),
1259            module_binding_storage_hints: Vec::new(),
1260            function_local_storage_hints: Vec::new(),
1261            top_level_frame: None,
1262            data_schema: None,
1263            type_schema_registry: shape_runtime::type_schema::TypeSchemaRegistry::new(),
1264            trait_method_symbols: HashMap::new(),
1265            foreign_functions: Vec::new(),
1266            native_struct_layouts: Vec::new(),
1267            debug_info: crate::bytecode::DebugInfo::new("<test>".to_string()),
1268        });
1269
1270        assert!(
1271            build_minimal_blobs(&program, "dup").is_none(),
1272            "name-based selection must reject ambiguous function names"
1273        );
1274
1275        let by_hash = build_minimal_blobs_by_hash(&program, h2)
1276            .expect("hash-based selection should work with duplicate names");
1277        assert_eq!(by_hash.len(), 1);
1278        assert_eq!(by_hash[0].0, h2);
1279        assert_eq!(by_hash[0].1.name, "dup");
1280    }
1281
1282    #[test]
1283    fn test_program_from_blobs_by_hash_requires_entry_blob() {
1284        let h1 = mk_hash(1);
1285        let h_missing = mk_hash(9);
1286        let blob = mk_blob("f", h1, vec![]);
1287        let source = BytecodeProgram::default();
1288
1289        let reconstructed = program_from_blobs_by_hash(vec![(h1, blob)], h_missing, &source);
1290        assert!(
1291            reconstructed.is_none(),
1292            "reconstruction must fail when the requested entry hash is absent"
1293        );
1294    }
1295
1296    // ---- Phase 2: Blob negotiation tests ----
1297
1298    #[test]
1299    fn test_blob_cache_insert_and_get() {
1300        let mut cache = RemoteBlobCache::new(10);
1301        let h1 = mk_hash(1);
1302        let blob1 = mk_blob("f1", h1, vec![]);
1303
1304        cache.insert(h1, blob1.clone());
1305        assert_eq!(cache.len(), 1);
1306        assert!(cache.contains(&h1));
1307        assert_eq!(cache.get(&h1).unwrap().name, "f1");
1308    }
1309
1310    #[test]
1311    fn test_blob_cache_lru_eviction() {
1312        let mut cache = RemoteBlobCache::new(2);
1313        let h1 = mk_hash(1);
1314        let h2 = mk_hash(2);
1315        let h3 = mk_hash(3);
1316
1317        cache.insert(h1, mk_blob("f1", h1, vec![]));
1318        cache.insert(h2, mk_blob("f2", h2, vec![]));
1319        assert_eq!(cache.len(), 2);
1320
1321        // Insert h3 should evict h1 (least recently used)
1322        cache.insert(h3, mk_blob("f3", h3, vec![]));
1323        assert_eq!(cache.len(), 2);
1324        assert!(!cache.contains(&h1), "h1 should be evicted");
1325        assert!(cache.contains(&h2));
1326        assert!(cache.contains(&h3));
1327    }
1328
1329    #[test]
1330    fn test_blob_cache_access_updates_order() {
1331        let mut cache = RemoteBlobCache::new(2);
1332        let h1 = mk_hash(1);
1333        let h2 = mk_hash(2);
1334        let h3 = mk_hash(3);
1335
1336        cache.insert(h1, mk_blob("f1", h1, vec![]));
1337        cache.insert(h2, mk_blob("f2", h2, vec![]));
1338
1339        // Access h1 to make it recently used
1340        cache.get(&h1);
1341
1342        // Insert h3 should evict h2 (now least recently used)
1343        cache.insert(h3, mk_blob("f3", h3, vec![]));
1344        assert!(
1345            cache.contains(&h1),
1346            "h1 was accessed, should not be evicted"
1347        );
1348        assert!(!cache.contains(&h2), "h2 should be evicted");
1349        assert!(cache.contains(&h3));
1350    }
1351
1352    #[test]
1353    fn test_blob_cache_filter_known() {
1354        let mut cache = RemoteBlobCache::new(10);
1355        let h1 = mk_hash(1);
1356        let h2 = mk_hash(2);
1357        let h3 = mk_hash(3);
1358
1359        cache.insert(h1, mk_blob("f1", h1, vec![]));
1360        cache.insert(h3, mk_blob("f3", h3, vec![]));
1361
1362        let known = cache.filter_known(&[h1, h2, h3]);
1363        assert_eq!(known.len(), 2);
1364        assert!(known.contains(&h1));
1365        assert!(known.contains(&h3));
1366        assert!(!known.contains(&h2));
1367    }
1368
1369    #[test]
1370    fn test_handle_negotiation() {
1371        let mut cache = RemoteBlobCache::new(10);
1372        let h1 = mk_hash(1);
1373        let h2 = mk_hash(2);
1374        cache.insert(h1, mk_blob("f1", h1, vec![]));
1375
1376        let request = BlobNegotiationRequest {
1377            offered_hashes: vec![h1, h2],
1378        };
1379        let response = handle_negotiation(&request, &cache);
1380        assert_eq!(response.known_hashes.len(), 1);
1381        assert!(response.known_hashes.contains(&h1));
1382    }
1383
1384    #[test]
1385    fn test_build_call_request_negotiated_strips_known_blobs() {
1386        // Create a program with content-addressed blobs
1387        let h1 = mk_hash(1);
1388        let h2 = mk_hash(2);
1389        let blob1 = mk_blob("entry", h1, vec![h2]);
1390        let blob2 = mk_blob("helper", h2, vec![]);
1391
1392        let mut function_store = HashMap::new();
1393        function_store.insert(h1, blob1.clone());
1394        function_store.insert(h2, blob2.clone());
1395
1396        let mut program = BytecodeProgram::default();
1397        program.content_addressed = Some(Program {
1398            entry: h1,
1399            function_store,
1400            top_level_locals_count: 0,
1401            top_level_local_storage_hints: Vec::new(),
1402            module_binding_names: Vec::new(),
1403            module_binding_storage_hints: Vec::new(),
1404            function_local_storage_hints: Vec::new(),
1405            top_level_frame: None,
1406            data_schema: None,
1407            type_schema_registry: shape_runtime::type_schema::TypeSchemaRegistry::new(),
1408            trait_method_symbols: HashMap::new(),
1409            foreign_functions: Vec::new(),
1410            native_struct_layouts: Vec::new(),
1411            debug_info: crate::bytecode::DebugInfo::new("<test>".to_string()),
1412        });
1413        program.functions = vec![crate::bytecode::Function {
1414            name: "entry".to_string(),
1415            arity: 0,
1416            param_names: vec![],
1417            locals_count: 0,
1418            entry_point: 0,
1419            body_length: 0,
1420            is_closure: false,
1421            captures_count: 0,
1422            is_async: false,
1423            ref_params: vec![],
1424            ref_mutates: vec![],
1425            mutable_captures: vec![],
1426            frame_descriptor: None,
1427            osr_entry_points: vec![],
1428        }];
1429        program.function_blob_hashes = vec![Some(h1)];
1430
1431        // First call: no known hashes -> all blobs sent
1432        let req1 = build_call_request_negotiated(&program, "entry", vec![], &[]);
1433        let blobs1 = req1.function_blobs.as_ref().unwrap();
1434        assert_eq!(blobs1.len(), 2, "first call should send all blobs");
1435
1436        // Second call: h2 is known -> only h1 sent
1437        let req2 = build_call_request_negotiated(&program, "entry", vec![], &[h2]);
1438        let blobs2 = req2.function_blobs.as_ref().unwrap();
1439        assert_eq!(blobs2.len(), 1, "second call should skip known blobs");
1440        assert_eq!(blobs2[0].0, h1);
1441    }
1442
1443    #[test]
1444    fn test_wire_message_serialization_roundtrip() {
1445        let msg = WireMessage::BlobNegotiation(BlobNegotiationRequest {
1446            offered_hashes: vec![mk_hash(1), mk_hash(2)],
1447        });
1448        let bytes = shape_wire::encode_message(&msg).expect("encode WireMessage");
1449        let decoded: WireMessage = shape_wire::decode_message(&bytes).expect("decode WireMessage");
1450        match decoded {
1451            WireMessage::BlobNegotiation(req) => {
1452                assert_eq!(req.offered_hashes.len(), 2);
1453            }
1454            _ => panic!("Expected BlobNegotiation"),
1455        }
1456    }
1457
1458    // ---- Phase 3B: Sidecar extraction tests ----
1459
1460    #[test]
1461    fn test_extract_sidecars_no_large_blobs() {
1462        let store = temp_store();
1463        let mut args = vec![
1464            SerializableVMValue::Int(42),
1465            SerializableVMValue::String("hello".to_string()),
1466            SerializableVMValue::Array(vec![
1467                SerializableVMValue::Number(1.0),
1468                SerializableVMValue::Number(2.0),
1469            ]),
1470        ];
1471        let sidecars = extract_sidecars(&mut args, &store);
1472        assert!(sidecars.is_empty(), "no large blobs → no sidecars");
1473        // Args should be unchanged
1474        assert!(matches!(args[0], SerializableVMValue::Int(42)));
1475    }
1476
1477    #[test]
1478    fn test_extract_sidecars_large_typed_array() {
1479        let store = temp_store();
1480
1481        // Create a large float array (2MB of f64 data)
1482        let data = vec![0f64; 256 * 1024]; // 256K * 8 bytes = 2 MB
1483        let aligned = shape_value::AlignedVec::from_vec(data);
1484        let buf = shape_value::AlignedTypedBuffer::from_aligned(aligned);
1485        let nb = shape_value::ValueWord::from_float_array(std::sync::Arc::new(buf));
1486        let serialized = shape_runtime::snapshot::nanboxed_to_serializable(&nb, &store).unwrap();
1487
1488        let mut args = vec![serialized];
1489        let sidecars = extract_sidecars(&mut args, &store);
1490        assert_eq!(
1491            sidecars.len(),
1492            1,
1493            "should extract one sidecar for 2MB array"
1494        );
1495        assert!(
1496            matches!(args[0], SerializableVMValue::SidecarRef { .. }),
1497            "original should be replaced with SidecarRef"
1498        );
1499        assert!(
1500            sidecars[0].data.len() >= 1024 * 1024,
1501            "sidecar data should be >= 1MB"
1502        );
1503    }
1504
1505    #[test]
1506    fn test_reassemble_sidecars_roundtrip() {
1507        let store = temp_store();
1508
1509        // Create a large float array
1510        let data: Vec<f64> = (0..256 * 1024).map(|i| i as f64).collect();
1511        let aligned = shape_value::AlignedVec::from_vec(data.clone());
1512        let buf = shape_value::AlignedTypedBuffer::from_aligned(aligned);
1513        let nb = shape_value::ValueWord::from_float_array(std::sync::Arc::new(buf));
1514        let original = shape_runtime::snapshot::nanboxed_to_serializable(&nb, &store).unwrap();
1515
1516        let mut args = vec![original.clone()];
1517        let sidecars = extract_sidecars(&mut args, &store);
1518        assert_eq!(sidecars.len(), 1);
1519
1520        // Build sidecar map for reassembly
1521        let sidecar_map: HashMap<u32, BlobSidecar> =
1522            sidecars.into_iter().map(|s| (s.sidecar_id, s)).collect();
1523
1524        // Reassemble
1525        reassemble_sidecars(&mut args, &sidecar_map, &store).unwrap();
1526
1527        // The reassembled value should deserialize to the same data
1528        let restored = shape_runtime::snapshot::serializable_to_nanboxed(&args[0], &store).unwrap();
1529        let hv = restored.as_heap_ref().unwrap();
1530        match hv {
1531            shape_value::heap_value::HeapValue::FloatArray(a) => {
1532                assert_eq!(a.len(), 256 * 1024);
1533                assert!((a.as_slice()[0] - 0.0).abs() < f64::EPSILON);
1534                assert!((a.as_slice()[1000] - 1000.0).abs() < f64::EPSILON);
1535            }
1536            // reassemble produces DataTable wrapper, which is also valid
1537            _ => {
1538                // The reassembled blob may come back as DataTable BlobRef
1539                // since reassemble uses a generic DataTable wrapper.
1540                // This is acceptable — the raw data is preserved.
1541            }
1542        }
1543    }
1544
1545    #[test]
1546    fn test_extract_sidecars_nested_in_array() {
1547        let store = temp_store();
1548
1549        // Create a large float array nested in an Array
1550        let data = vec![0f64; 256 * 1024]; // 2 MB
1551        let aligned = shape_value::AlignedVec::from_vec(data);
1552        let buf = shape_value::AlignedTypedBuffer::from_aligned(aligned);
1553        let nb = shape_value::ValueWord::from_float_array(std::sync::Arc::new(buf));
1554        let serialized = shape_runtime::snapshot::nanboxed_to_serializable(&nb, &store).unwrap();
1555
1556        let mut args = vec![SerializableVMValue::Array(vec![
1557            SerializableVMValue::Int(1),
1558            serialized,
1559            SerializableVMValue::String("end".to_string()),
1560        ])];
1561
1562        let sidecars = extract_sidecars(&mut args, &store);
1563        assert_eq!(sidecars.len(), 1, "should find nested large blob");
1564
1565        // Verify the array structure is preserved with SidecarRef inside
1566        match &args[0] {
1567            SerializableVMValue::Array(items) => {
1568                assert_eq!(items.len(), 3);
1569                assert!(matches!(items[0], SerializableVMValue::Int(1)));
1570                assert!(matches!(items[1], SerializableVMValue::SidecarRef { .. }));
1571                assert!(matches!(items[2], SerializableVMValue::String(_)));
1572            }
1573            _ => panic!("Expected Array wrapper to be preserved"),
1574        }
1575    }
1576
1577    #[test]
1578    fn test_sidecar_ref_serialization_roundtrip() {
1579        use shape_runtime::hashing::HashDigest;
1580        use shape_runtime::snapshot::{BlobKind, TypedArrayElementKind};
1581
1582        let value = SerializableVMValue::SidecarRef {
1583            sidecar_id: 7,
1584            blob_kind: BlobKind::TypedArray(TypedArrayElementKind::F64),
1585            original_hash: HashDigest::from_hex("abc123"),
1586            meta_a: 1000,
1587            meta_b: 0,
1588        };
1589
1590        let bytes = shape_wire::encode_message(&value).expect("encode SidecarRef");
1591        let decoded: SerializableVMValue =
1592            shape_wire::decode_message(&bytes).expect("decode SidecarRef");
1593        match decoded {
1594            SerializableVMValue::SidecarRef { sidecar_id, .. } => {
1595                assert_eq!(sidecar_id, 7);
1596            }
1597            _ => panic!("Expected SidecarRef"),
1598        }
1599    }
1600}