Skip to main content

shape_vm/
remote.rs

1//! Per-function remote execution support
2//!
3//! This module provides the types and executor for transferring function
4//! execution to another machine. The design sends the full compiled
5//! `BytecodeProgram` + a "call this function with these args" message,
6//! running it on a full Shape VM on the remote side.
7//!
8//! # Architecture
9//!
10//! ```text
11//! Layer 4: @remote / @distributed annotations    (Shape stdlib — user-defined policy)
12//! Layer 3: RemoteCallRequest/Response            (this module)
13//! Layer 2: shape-wire codec (MessagePack)        (encode_message / decode_message)
14//! Layer 1: Transport (TCP/QUIC/Unix socket)      (user-provided, pluggable)
15//! ```
16//!
17//! Layer 0 (the foundation): Full Shape VM on both sides, same `BytecodeProgram`,
18//! same `Executor`.
19//!
20//! # Closure semantics
21//!
22//! `Arc<RwLock<ValueWord>>` upvalues become **value copies** on serialization.
23//! If the remote side mutates a captured variable, the sender doesn't see it.
24//! This is the correct semantic for distributed computing — a **send-copy** model.
25
26use serde::{Deserialize, Serialize};
27use shape_runtime::snapshot::{
28    SerializableVMValue, SnapshotStore, nanboxed_to_serializable, serializable_to_nanboxed,
29};
30use shape_runtime::type_schema::TypeSchemaRegistry;
31use shape_value::ValueWord;
32
33use shape_wire::WireValue;
34
35use crate::bytecode::{BytecodeProgram, FunctionBlob, FunctionHash, Program};
36use crate::executor::{VMConfig, VirtualMachine};
37
38/// Request to execute a function on a remote VM.
39///
40/// Contains everything needed to call a function: the full compiled program
41/// (cacheable by `program_hash`), function identity, serialized arguments,
42/// and optional closure captures.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct RemoteCallRequest {
45    /// The full compiled program. After the first transfer, the remote
46    /// side caches by `program_hash` and subsequent calls only need args.
47    pub program: BytecodeProgram,
48
49    /// Function to call by name (for named functions).
50    pub function_name: String,
51
52    /// Function to call by ID (for closures that have no user-facing name).
53    /// Takes precedence over `function_name` when `Some`.
54    pub function_id: Option<u16>,
55
56    /// Function to call by content hash (canonical identity).
57    ///
58    /// Preferred over name-based lookup when present. This avoids ambiguity
59    /// when multiple modules define functions with the same name.
60    #[serde(default)]
61    pub function_hash: Option<FunctionHash>,
62
63    /// Serialized arguments to the function.
64    pub arguments: Vec<SerializableVMValue>,
65
66    /// Closure upvalues, if calling a closure. These are value-copied from
67    /// the sender's `Arc<RwLock<ValueWord>>` upvalue slots.
68    pub upvalues: Option<Vec<SerializableVMValue>>,
69
70    /// Type schema registry — sent separately because `BytecodeProgram`
71    /// has `#[serde(skip)]` on its registry (it's populated at compile time).
72    pub type_schemas: TypeSchemaRegistry,
73
74    /// Content hash of the program for caching. If the remote side has
75    /// already seen this hash, it can skip deserializing the program.
76    pub program_hash: [u8; 32],
77
78    /// Minimal content-addressed blobs for the called function and its
79    /// transitive dependencies. When present, the callee can reconstruct
80    /// a `Program` from these blobs instead of deserializing the full
81    /// `BytecodeProgram`, dramatically reducing payload size.
82    #[serde(default)]
83    pub function_blobs: Option<Vec<(FunctionHash, FunctionBlob)>>,
84}
85
86/// Response from a remote function execution.
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct RemoteCallResponse {
89    /// The function's return value, or an error message.
90    pub result: Result<SerializableVMValue, RemoteCallError>,
91}
92
93/// Error from remote execution.
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct RemoteCallError {
96    /// Human-readable error message.
97    pub message: String,
98    /// Optional error kind for programmatic handling.
99    pub kind: RemoteErrorKind,
100}
101
102/// Classification of remote execution errors.
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub enum RemoteErrorKind {
105    /// Function not found in the program.
106    FunctionNotFound,
107    /// Argument deserialization failed.
108    ArgumentError,
109    /// Runtime error during execution.
110    RuntimeError,
111    /// Module function required on the remote side is missing.
112    MissingModuleFunction,
113}
114
115impl std::fmt::Display for RemoteCallError {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        write!(f, "{:?}: {}", self.kind, self.message)
118    }
119}
120
121impl std::error::Error for RemoteCallError {}
122
123// ---------------------------------------------------------------------------
124// Wire message envelope (Phase 2: blob negotiation)
125// ---------------------------------------------------------------------------
126
127/// Envelope for all wire protocol messages.
128///
129/// Wraps the existing `RemoteCallRequest`/`RemoteCallResponse` with negotiation
130/// and sidecar message types for bandwidth optimization on persistent connections.
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub enum WireMessage {
133    /// Offer function blob hashes to check what the remote already has.
134    BlobNegotiation(BlobNegotiationRequest),
135    /// Reply with the subset of offered hashes that are already cached.
136    BlobNegotiationReply(BlobNegotiationResponse),
137    /// A remote function call (may have blobs stripped if negotiation occurred).
138    Call(RemoteCallRequest),
139    /// Response to a remote function call.
140    CallResponse(RemoteCallResponse),
141    /// A large blob sent as a separate message before the call (Phase 3).
142    Sidecar(BlobSidecar),
143
144    // --- Execution server messages (V2) ---
145    /// Execute Shape source code on the server.
146    Execute(ExecuteRequest),
147    /// Response to an Execute request.
148    ExecuteResponse(ExecuteResponse),
149    /// Validate Shape source code (parse + type-check) without executing.
150    Validate(ValidateRequest),
151    /// Response to a Validate request.
152    ValidateResponse(ValidateResponse),
153    /// Authenticate with the server (required for non-localhost).
154    Auth(AuthRequest),
155    /// Response to an Auth request.
156    AuthResponse(AuthResponse),
157    /// Execute a Shape file on the server.
158    ExecuteFile(ExecuteFileRequest),
159    /// Execute a Shape project (shape.toml) on the server.
160    ExecuteProject(ExecuteProjectRequest),
161    /// Validate a Shape file or project (parse + type-check) without executing.
162    ValidatePath(ValidatePathRequest),
163    /// Ping the server for liveness / capability discovery.
164    Ping(PingRequest),
165    /// Pong reply with server info.
166    Pong(ServerInfo),
167}
168
169/// Ping request (empty payload for wire format consistency).
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct PingRequest {}
172
173/// Request to check which function blobs the remote side already has cached.
174#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct BlobNegotiationRequest {
176    /// Content hashes of function blobs the caller wants to send.
177    pub offered_hashes: Vec<FunctionHash>,
178}
179
180/// Response indicating which offered blobs are already cached on the remote side.
181#[derive(Debug, Clone, Serialize, Deserialize)]
182pub struct BlobNegotiationResponse {
183    /// Subset of offered hashes that the remote already has in its blob cache.
184    pub known_hashes: Vec<FunctionHash>,
185}
186
187/// A large binary payload sent as a separate message before the call request.
188///
189/// Used for splitting large BlobRef-backed values (DataTables, TypedArrays, etc.)
190/// out of the main serialized payload.
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct BlobSidecar {
193    pub sidecar_id: u32,
194    pub data: Vec<u8>,
195}
196
197// ---------------------------------------------------------------------------
198// Execution server message types (V2)
199// ---------------------------------------------------------------------------
200
201/// Request to execute Shape source code on the server.
202#[derive(Debug, Clone, Serialize, Deserialize)]
203pub struct ExecuteRequest {
204    /// Shape source code to execute.
205    pub code: String,
206    /// Client-assigned request ID for correlation.
207    pub request_id: u64,
208}
209
210/// Response from executing Shape source code.
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct ExecuteResponse {
213    /// The request ID this response corresponds to.
214    pub request_id: u64,
215    /// Whether execution completed successfully.
216    pub success: bool,
217    /// Structured return value from execution.
218    pub value: WireValue,
219    /// Print/log output captured during execution (NOT the return value).
220    pub stdout: Option<String>,
221    /// Error message (if execution failed).
222    pub error: Option<String>,
223    /// Pre-rendered Content terminal representation (if value is Content).
224    #[serde(skip_serializing_if = "Option::is_none", default)]
225    pub content_terminal: Option<String>,
226    /// Pre-rendered Content HTML representation (if value is Content).
227    #[serde(skip_serializing_if = "Option::is_none", default)]
228    pub content_html: Option<String>,
229    /// Diagnostics (parse errors, type errors, warnings).
230    pub diagnostics: Vec<WireDiagnostic>,
231    /// Execution metrics (if available).
232    pub metrics: Option<ExecutionMetrics>,
233    /// Structured print output with rendered strings (MsgPack-serialized).
234    #[serde(skip_serializing_if = "Option::is_none", default)]
235    pub print_output: Option<Vec<shape_wire::print_result::WirePrintResult>>,
236}
237
238/// Request to validate Shape source code without executing it.
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct ValidateRequest {
241    /// Shape source code to validate.
242    pub code: String,
243    /// Client-assigned request ID for correlation.
244    pub request_id: u64,
245}
246
247/// Response from validating Shape source code.
248#[derive(Debug, Clone, Serialize, Deserialize)]
249pub struct ValidateResponse {
250    /// The request ID this response corresponds to.
251    pub request_id: u64,
252    /// Whether the code is valid (no errors).
253    pub success: bool,
254    /// Diagnostics (parse errors, type errors, warnings).
255    pub diagnostics: Vec<WireDiagnostic>,
256}
257
258/// Request to execute a Shape file on the server.
259#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct ExecuteFileRequest {
261    /// Absolute path to the .shape file.
262    pub path: String,
263    /// Optional working directory (defaults to file's parent).
264    pub cwd: Option<String>,
265    /// Client-assigned request ID for correlation.
266    pub request_id: u64,
267}
268
269/// Request to execute a Shape project (shape.toml) on the server.
270#[derive(Debug, Clone, Serialize, Deserialize)]
271pub struct ExecuteProjectRequest {
272    /// Absolute path to the project directory (must contain shape.toml).
273    pub project_dir: String,
274    /// Client-assigned request ID for correlation.
275    pub request_id: u64,
276}
277
278/// Request to validate a Shape file or project without executing.
279#[derive(Debug, Clone, Serialize, Deserialize)]
280pub struct ValidatePathRequest {
281    /// Path to a .shape file or a project directory (containing shape.toml).
282    pub path: String,
283    /// Client-assigned request ID for correlation.
284    pub request_id: u64,
285}
286
287/// Authentication request for non-localhost connections.
288#[derive(Debug, Clone, Serialize, Deserialize)]
289pub struct AuthRequest {
290    /// Bearer token for authentication.
291    pub token: String,
292}
293
294/// Authentication response.
295#[derive(Debug, Clone, Serialize, Deserialize)]
296pub struct AuthResponse {
297    /// Whether authentication succeeded.
298    pub authenticated: bool,
299    /// Error message if authentication failed.
300    pub error: Option<String>,
301}
302
303/// Server information returned in Pong responses.
304#[derive(Debug, Clone, Serialize, Deserialize)]
305pub struct ServerInfo {
306    /// Shape language version.
307    pub shape_version: String,
308    /// Wire protocol version.
309    pub wire_protocol: u32,
310    /// Server capabilities (e.g., "execute", "validate", "call", "blob-negotiation").
311    pub capabilities: Vec<String>,
312}
313
314/// A diagnostic message (error, warning, info).
315#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct WireDiagnostic {
317    /// Severity: "error", "warning", "info".
318    pub severity: String,
319    /// Human-readable diagnostic message.
320    pub message: String,
321    /// Source line number (1-indexed), if available.
322    pub line: Option<u32>,
323    /// Source column number (1-indexed), if available.
324    pub column: Option<u32>,
325}
326
327/// Execution performance metrics.
328#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct ExecutionMetrics {
330    /// Number of VM instructions executed.
331    pub instructions_executed: u64,
332    /// Wall-clock time in milliseconds.
333    pub wall_time_ms: u64,
334    /// Peak memory usage in bytes.
335    pub memory_bytes_peak: u64,
336}
337
338// ---------------------------------------------------------------------------
339// Per-connection blob cache (Phase 2)
340// ---------------------------------------------------------------------------
341
342/// Per-connection cache of function blobs received from a remote peer.
343///
344/// Content hashes make stale entries harmless (same hash = same content),
345/// so no invalidation protocol is needed. LRU eviction bounds memory usage.
346pub struct RemoteBlobCache {
347    blobs: std::collections::HashMap<FunctionHash, FunctionBlob>,
348    /// Access order for LRU eviction (most recently used at the end).
349    order: Vec<FunctionHash>,
350    /// Maximum number of entries before LRU eviction kicks in.
351    max_entries: usize,
352}
353
354impl RemoteBlobCache {
355    /// Create a new blob cache with the given capacity.
356    pub fn new(max_entries: usize) -> Self {
357        Self {
358            blobs: std::collections::HashMap::new(),
359            order: Vec::new(),
360            max_entries,
361        }
362    }
363
364    /// Default cache with 4096 entry capacity.
365    pub fn default_cache() -> Self {
366        Self::new(4096)
367    }
368
369    /// Insert a blob, evicting the least recently used entry if at capacity.
370    pub fn insert(&mut self, hash: FunctionHash, blob: FunctionBlob) {
371        if self.blobs.contains_key(&hash) {
372            // Move to end (most recently used)
373            self.order.retain(|h| h != &hash);
374            self.order.push(hash);
375            return;
376        }
377
378        // Evict LRU if at capacity
379        while self.blobs.len() >= self.max_entries && !self.order.is_empty() {
380            let evicted = self.order.remove(0);
381            self.blobs.remove(&evicted);
382        }
383
384        self.blobs.insert(hash, blob);
385        self.order.push(hash);
386    }
387
388    /// Look up a cached blob by hash, updating access order.
389    pub fn get(&mut self, hash: &FunctionHash) -> Option<&FunctionBlob> {
390        if self.blobs.contains_key(hash) {
391            self.order.retain(|h| h != hash);
392            self.order.push(*hash);
393            self.blobs.get(hash)
394        } else {
395            None
396        }
397    }
398
399    /// Check if a hash is cached without updating access order.
400    pub fn contains(&self, hash: &FunctionHash) -> bool {
401        self.blobs.contains_key(hash)
402    }
403
404    /// Return all cached hashes.
405    pub fn known_hashes(&self) -> Vec<FunctionHash> {
406        self.blobs.keys().copied().collect()
407    }
408
409    /// Return the subset of `offered` hashes that are in the cache.
410    pub fn filter_known(&self, offered: &[FunctionHash]) -> Vec<FunctionHash> {
411        offered
412            .iter()
413            .filter(|h| self.blobs.contains_key(h))
414            .copied()
415            .collect()
416    }
417
418    /// Number of cached entries.
419    pub fn len(&self) -> usize {
420        self.blobs.len()
421    }
422
423    /// Whether the cache is empty.
424    pub fn is_empty(&self) -> bool {
425        self.blobs.is_empty()
426    }
427
428    /// Insert all blobs from a set, typically received from a remote call.
429    pub fn insert_blobs(&mut self, blobs: &[(FunctionHash, FunctionBlob)]) {
430        for (hash, blob) in blobs {
431            self.insert(*hash, blob.clone());
432        }
433    }
434}
435
436/// Build a minimal set of function blobs for a function hash and its
437/// transitive dependencies from a content-addressed `Program`.
438///
439/// Returns `None` if the program has no content-addressed representation
440/// or the entry hash is not present in the function store.
441pub fn build_minimal_blobs_by_hash(
442    program: &BytecodeProgram,
443    entry_hash: FunctionHash,
444) -> Option<Vec<(FunctionHash, FunctionBlob)>> {
445    let ca = program.content_addressed.as_ref()?;
446    if !ca.function_store.contains_key(&entry_hash) {
447        return None;
448    }
449
450    // Compute transitive closure of dependencies
451    let mut needed: std::collections::HashSet<FunctionHash> = std::collections::HashSet::new();
452    let mut queue = vec![entry_hash];
453    while let Some(hash) = queue.pop() {
454        if needed.insert(hash) {
455            if let Some(blob) = ca.function_store.get(&hash) {
456                for dep in &blob.dependencies {
457                    if !needed.contains(dep) {
458                        queue.push(*dep);
459                    }
460                }
461            }
462        }
463    }
464
465    // Collect the minimal blob set
466    let blobs: Vec<(FunctionHash, FunctionBlob)> = needed
467        .into_iter()
468        .filter_map(|hash| {
469            ca.function_store
470                .get(&hash)
471                .map(|blob| (hash, blob.clone()))
472        })
473        .collect();
474
475    Some(blobs)
476}
477
478/// Backwards-compatible name-based wrapper around `build_minimal_blobs_by_hash`.
479///
480/// If multiple blobs share the same name, this returns `None` to avoid
481/// ambiguous, potentially incorrect dependency selection.
482pub fn build_minimal_blobs(
483    program: &BytecodeProgram,
484    fn_name: &str,
485) -> Option<Vec<(FunctionHash, FunctionBlob)>> {
486    let ca = program.content_addressed.as_ref()?;
487    let mut matches = ca.function_store.iter().filter_map(|(hash, blob)| {
488        if blob.name == fn_name {
489            Some(*hash)
490        } else {
491            None
492        }
493    });
494    let first = matches.next()?;
495    if matches.next().is_some() {
496        return None;
497    }
498    build_minimal_blobs_by_hash(program, first)
499}
500
501/// Build a minimal `Program` from function blobs and an explicit entry hash.
502///
503/// Used on the callee side to reconstruct a `Program` from blobs received in
504/// a `RemoteCallRequest`.
505pub fn program_from_blobs_by_hash(
506    blobs: Vec<(FunctionHash, FunctionBlob)>,
507    entry_hash: FunctionHash,
508    source: &BytecodeProgram,
509) -> Option<Program> {
510    let function_store: std::collections::HashMap<FunctionHash, FunctionBlob> =
511        blobs.into_iter().collect();
512    if !function_store.contains_key(&entry_hash) {
513        return None;
514    }
515
516    Some(Program {
517        entry: entry_hash,
518        function_store,
519        top_level_locals_count: source.top_level_locals_count,
520        top_level_local_storage_hints: source.top_level_local_storage_hints.clone(),
521        module_binding_names: source.module_binding_names.clone(),
522        module_binding_storage_hints: source.module_binding_storage_hints.clone(),
523        function_local_storage_hints: source.function_local_storage_hints.clone(),
524        top_level_frame: source.top_level_frame.clone(),
525        data_schema: source.data_schema.clone(),
526        type_schema_registry: source.type_schema_registry.clone(),
527        trait_method_symbols: source.trait_method_symbols.clone(),
528        foreign_functions: source.foreign_functions.clone(),
529        native_struct_layouts: source.native_struct_layouts.clone(),
530        debug_info: source.debug_info.clone(),
531    })
532}
533
534/// Backwards-compatible name-based wrapper around `program_from_blobs_by_hash`.
535pub fn program_from_blobs(
536    blobs: Vec<(FunctionHash, FunctionBlob)>,
537    fn_name: &str,
538    source: &BytecodeProgram,
539) -> Option<Program> {
540    let mut matches = blobs.iter().filter_map(|(hash, blob)| {
541        if blob.name == fn_name {
542            Some(*hash)
543        } else {
544            None
545        }
546    });
547    let entry = matches.next()?;
548    if matches.next().is_some() {
549        return None;
550    }
551    program_from_blobs_by_hash(blobs, entry, source)
552}
553
554/// Execute a remote call request on this machine.
555///
556/// This is the entry point for the receiving side. It:
557/// 1. Reconstructs the `BytecodeProgram` and populates its `TypeSchemaRegistry`
558/// 2. Creates a full `VirtualMachine` with the program
559/// 3. Converts serialized arguments back to `ValueWord`
560/// 4. Calls the function by name or ID
561/// 5. Converts the result back to `SerializableVMValue`
562///
563/// The `store` is used for `SerializableVMValue` ↔ `ValueWord` conversion
564/// (needed for `BlobRef`-backed values like DataTable).
565pub fn execute_remote_call(
566    request: RemoteCallRequest,
567    store: &SnapshotStore,
568) -> RemoteCallResponse {
569    match execute_inner(request, store) {
570        Ok(value) => RemoteCallResponse { result: Ok(value) },
571        Err(err) => RemoteCallResponse { result: Err(err) },
572    }
573}
574
575/// Execute a remote call with pre-loaded language runtime extensions.
576///
577/// `language_runtimes` maps language IDs (e.g. "python") to pre-loaded
578/// runtime handles. The server loads these once at startup from installed
579/// extensions. The bytecode carries foreign function source text; the
580/// runtime on the server compiles and executes it.
581pub fn execute_remote_call_with_runtimes(
582    request: RemoteCallRequest,
583    store: &SnapshotStore,
584    language_runtimes: &std::collections::HashMap<
585        String,
586        std::sync::Arc<shape_runtime::plugins::language_runtime::PluginLanguageRuntime>,
587    >,
588) -> RemoteCallResponse {
589    match execute_inner_with_runtimes(request, store, language_runtimes) {
590        Ok(value) => RemoteCallResponse { result: Ok(value) },
591        Err(err) => RemoteCallResponse { result: Err(err) },
592    }
593}
594
595fn execute_inner(
596    request: RemoteCallRequest,
597    store: &SnapshotStore,
598) -> Result<SerializableVMValue, RemoteCallError> {
599    // 1. Reconstruct program with type schemas.
600    // Prefer content-addressed blobs when present — they carry only the
601    // transitive closure of the called function, so deserialization is cheaper.
602    let mut program = if let Some(blobs) = request.function_blobs {
603        let entry_hash = request
604            .function_hash
605            .or_else(|| {
606                if let Some(fid) = request.function_id {
607                    request
608                        .program
609                        .function_blob_hashes
610                        .get(fid as usize)
611                        .copied()
612                        .flatten()
613                } else {
614                    None
615                }
616            })
617            .or_else(|| {
618                // Legacy fallback by unique function name.
619                request.program.content_addressed.as_ref().and_then(|ca| {
620                    let mut matches = ca.function_store.iter().filter_map(|(hash, blob)| {
621                        if blob.name == request.function_name {
622                            Some(*hash)
623                        } else {
624                            None
625                        }
626                    });
627                    let first = matches.next()?;
628                    if matches.next().is_some() {
629                        None
630                    } else {
631                        Some(first)
632                    }
633                })
634            })
635            .ok_or_else(|| RemoteCallError {
636                message: format!(
637                    "Could not resolve entry hash for remote function '{}'",
638                    request.function_name
639                ),
640                kind: RemoteErrorKind::FunctionNotFound,
641            })?;
642
643        // Reconstruct a content-addressed Program from the minimal blobs,
644        // then link it into a BytecodeProgram the VM can execute.
645        let ca_program = program_from_blobs_by_hash(blobs, entry_hash, &request.program)
646            .ok_or_else(|| RemoteCallError {
647                message: format!(
648                    "Could not reconstruct program from blobs for '{}'",
649                    request.function_name
650                ),
651                kind: RemoteErrorKind::FunctionNotFound,
652            })?;
653        // Link the content-addressed program into a flat BytecodeProgram
654        let linked = crate::linker::link(&ca_program).map_err(|e| RemoteCallError {
655            message: format!("Linker error: {}", e),
656            kind: RemoteErrorKind::RuntimeError,
657        })?;
658        // Convert LinkedProgram to BytecodeProgram for the existing VM path
659        crate::linker::linked_to_bytecode_program(&linked)
660    } else {
661        request.program
662    };
663    program.type_schema_registry = request.type_schemas;
664
665    // 2. Convert arguments from serializable form
666    let args: Vec<ValueWord> = request
667        .arguments
668        .iter()
669        .map(|sv| {
670            serializable_to_nanboxed(sv, store).map_err(|e| RemoteCallError {
671                message: format!("Failed to deserialize argument: {}", e),
672                kind: RemoteErrorKind::ArgumentError,
673            })
674        })
675        .collect::<Result<Vec<_>, _>>()?;
676
677    // 3. Create VM and load program
678    let mut vm = VirtualMachine::new(VMConfig::default());
679    vm.load_program(program);
680    vm.populate_module_objects();
681
682    // 4. Execute — closure or named function
683    //
684    // Dispatch priority:
685    //   1. Closure call (upvalues present) — needs function_id for upvalue binding
686    //   2. Hash-first — preferred for non-closure calls because function_id from the
687    //      client may be stale after blob-based relinking on the server
688    //   3. Function ID — fallback when no hash is available (legacy path)
689    //   4. Function name — last resort
690    let result = if let Some(ref upvalue_data) = request.upvalues {
691        // Closure call: reconstruct upvalues as Upvalue structs
692        let upvalues: Vec<shape_value::Upvalue> = upvalue_data
693            .iter()
694            .map(|sv| {
695                let nb = serializable_to_nanboxed(sv, store).map_err(|e| RemoteCallError {
696                    message: format!("Failed to deserialize upvalue: {}", e),
697                    kind: RemoteErrorKind::ArgumentError,
698                })?;
699                Ok(shape_value::Upvalue::new(nb))
700            })
701            .collect::<Result<Vec<_>, RemoteCallError>>()?;
702
703        let function_id = request.function_id.ok_or_else(|| RemoteCallError {
704            message: "Closure call requires function_id".to_string(),
705            kind: RemoteErrorKind::FunctionNotFound,
706        })?;
707
708        vm.execute_closure(function_id, upvalues, args, None)
709            .map_err(|e| RemoteCallError {
710                message: e.to_string(),
711                kind: RemoteErrorKind::RuntimeError,
712            })?
713    } else if let Some(hash) = request.function_hash {
714        // Hash-first call path — resolve the function by its content hash
715        // in the server's linked program. This is correct even when the program
716        // was reconstructed from blobs (where client-side function_ids are stale).
717        let func_id = vm
718            .program()
719            .function_blob_hashes
720            .iter()
721            .enumerate()
722            .find_map(|(idx, maybe_hash)| {
723                if maybe_hash == &Some(hash) {
724                    Some(idx as u16)
725                } else {
726                    None
727                }
728            })
729            .ok_or_else(|| RemoteCallError {
730                message: format!("Function hash not found in program: {}", hash),
731                kind: RemoteErrorKind::FunctionNotFound,
732            })?;
733        vm.execute_function_by_id(func_id, args, None)
734            .map_err(|e| RemoteCallError {
735                message: e.to_string(),
736                kind: RemoteErrorKind::RuntimeError,
737            })?
738    } else if let Some(func_id) = request.function_id {
739        // Call by function ID (legacy — only valid when program was not reconstructed)
740        vm.execute_function_by_id(func_id, args, None)
741            .map_err(|e| RemoteCallError {
742                message: e.to_string(),
743                kind: RemoteErrorKind::RuntimeError,
744            })?
745    } else {
746        // Call by name
747        vm.execute_function_by_name(&request.function_name, args, None)
748            .map_err(|e| RemoteCallError {
749                message: e.to_string(),
750                kind: RemoteErrorKind::RuntimeError,
751            })?
752    };
753
754    // 5. Serialize result
755    nanboxed_to_serializable(&result, store).map_err(|e| RemoteCallError {
756        message: format!("Failed to serialize result: {}", e),
757        kind: RemoteErrorKind::RuntimeError,
758    })
759}
760
761fn execute_inner_with_runtimes(
762    request: RemoteCallRequest,
763    store: &SnapshotStore,
764    language_runtimes: &std::collections::HashMap<
765        String,
766        std::sync::Arc<shape_runtime::plugins::language_runtime::PluginLanguageRuntime>,
767    >,
768) -> Result<SerializableVMValue, RemoteCallError> {
769    // 1. Reconstruct program with type schemas (same logic as execute_inner)
770    let mut program = if let Some(blobs) = request.function_blobs {
771        let entry_hash = request
772            .function_hash
773            .or_else(|| {
774                if let Some(fid) = request.function_id {
775                    request
776                        .program
777                        .function_blob_hashes
778                        .get(fid as usize)
779                        .copied()
780                        .flatten()
781                } else {
782                    None
783                }
784            })
785            .or_else(|| {
786                request.program.content_addressed.as_ref().and_then(|ca| {
787                    let mut matches = ca.function_store.iter().filter_map(|(hash, blob)| {
788                        if blob.name == request.function_name {
789                            Some(*hash)
790                        } else {
791                            None
792                        }
793                    });
794                    let first = matches.next()?;
795                    if matches.next().is_some() {
796                        None
797                    } else {
798                        Some(first)
799                    }
800                })
801            })
802            .ok_or_else(|| RemoteCallError {
803                message: format!(
804                    "Could not resolve entry hash for remote function '{}'",
805                    request.function_name
806                ),
807                kind: RemoteErrorKind::FunctionNotFound,
808            })?;
809
810        let ca_program = program_from_blobs_by_hash(blobs, entry_hash, &request.program)
811            .ok_or_else(|| RemoteCallError {
812                message: format!(
813                    "Could not reconstruct program from blobs for '{}'",
814                    request.function_name
815                ),
816                kind: RemoteErrorKind::FunctionNotFound,
817            })?;
818        let linked = crate::linker::link(&ca_program).map_err(|e| RemoteCallError {
819            message: format!("Linker error: {}", e),
820            kind: RemoteErrorKind::RuntimeError,
821        })?;
822        crate::linker::linked_to_bytecode_program(&linked)
823    } else {
824        request.program
825    };
826    program.type_schema_registry = request.type_schemas;
827
828    // 2. Convert arguments from serializable form
829    let args: Vec<ValueWord> = request
830        .arguments
831        .iter()
832        .map(|sv| {
833            serializable_to_nanboxed(sv, store).map_err(|e| RemoteCallError {
834                message: format!("Failed to deserialize argument: {}", e),
835                kind: RemoteErrorKind::ArgumentError,
836            })
837        })
838        .collect::<Result<Vec<_>, _>>()?;
839
840    // 3. Create VM and load program
841    let mut vm = VirtualMachine::new(VMConfig::default());
842    vm.load_program(program);
843    vm.populate_module_objects();
844
845    // 4. Link foreign functions from pre-loaded language runtimes
846    if !vm.program.foreign_functions.is_empty() && !language_runtimes.is_empty() {
847        let entries = vm.program.foreign_functions.clone();
848        let mut handles = Vec::with_capacity(entries.len());
849
850        for (idx, entry) in entries.iter().enumerate() {
851            // Skip native ABI entries (not supported in remote context)
852            if entry.native_abi.is_some() {
853                handles.push(None);
854                continue;
855            }
856
857            if let Some(lang_runtime) = language_runtimes.get(&entry.language) {
858                vm.program.foreign_functions[idx].dynamic_errors =
859                    lang_runtime.has_dynamic_errors();
860
861                let compiled = lang_runtime
862                    .compile(
863                        &entry.name,
864                        &entry.body_text,
865                        &entry.param_names,
866                        &entry.param_types,
867                        entry.return_type.as_deref(),
868                        entry.is_async,
869                    )
870                    .map_err(|e| RemoteCallError {
871                        message: format!(
872                            "Failed to compile foreign function '{}': {}",
873                            entry.name, e
874                        ),
875                        kind: RemoteErrorKind::RuntimeError,
876                    })?;
877                handles.push(Some(crate::executor::ForeignFunctionHandle::Runtime {
878                    runtime: std::sync::Arc::clone(lang_runtime),
879                    compiled,
880                }));
881            } else {
882                return Err(RemoteCallError {
883                    message: format!(
884                        "No language runtime for '{}' on this server. \
885                         Install the {} extension.",
886                        entry.language, entry.language
887                    ),
888                    kind: RemoteErrorKind::RuntimeError,
889                });
890            }
891        }
892        vm.foreign_fn_handles = handles;
893    }
894
895    // 5. Execute — closure or named function (same dispatch priority as execute_inner)
896    let result = if let Some(ref upvalue_data) = request.upvalues {
897        let upvalues: Vec<shape_value::Upvalue> = upvalue_data
898            .iter()
899            .map(|sv| {
900                let nb = serializable_to_nanboxed(sv, store).map_err(|e| RemoteCallError {
901                    message: format!("Failed to deserialize upvalue: {}", e),
902                    kind: RemoteErrorKind::ArgumentError,
903                })?;
904                Ok(shape_value::Upvalue::new(nb))
905            })
906            .collect::<Result<Vec<_>, RemoteCallError>>()?;
907
908        let function_id = request.function_id.ok_or_else(|| RemoteCallError {
909            message: "Closure call requires function_id".to_string(),
910            kind: RemoteErrorKind::FunctionNotFound,
911        })?;
912
913        vm.execute_closure(function_id, upvalues, args, None)
914            .map_err(|e| RemoteCallError {
915                message: e.to_string(),
916                kind: RemoteErrorKind::RuntimeError,
917            })?
918    } else if let Some(hash) = request.function_hash {
919        // Hash-first: resolve by content hash in the server's linked program.
920        // Client-side function_ids are stale after blob-based relinking.
921        let func_id = vm
922            .program()
923            .function_blob_hashes
924            .iter()
925            .enumerate()
926            .find_map(|(idx, maybe_hash)| {
927                if maybe_hash == &Some(hash) {
928                    Some(idx as u16)
929                } else {
930                    None
931                }
932            })
933            .ok_or_else(|| RemoteCallError {
934                message: format!("Function hash not found in program: {}", hash),
935                kind: RemoteErrorKind::FunctionNotFound,
936            })?;
937        vm.execute_function_by_id(func_id, args, None)
938            .map_err(|e| RemoteCallError {
939                message: e.to_string(),
940                kind: RemoteErrorKind::RuntimeError,
941            })?
942    } else if let Some(func_id) = request.function_id {
943        vm.execute_function_by_id(func_id, args, None)
944            .map_err(|e| RemoteCallError {
945                message: e.to_string(),
946                kind: RemoteErrorKind::RuntimeError,
947            })?
948    } else {
949        vm.execute_function_by_name(&request.function_name, args, None)
950            .map_err(|e| RemoteCallError {
951                message: e.to_string(),
952                kind: RemoteErrorKind::RuntimeError,
953            })?
954    };
955
956    // 6. Serialize result
957    nanboxed_to_serializable(&result, store).map_err(|e| RemoteCallError {
958        message: format!("Failed to serialize result: {}", e),
959        kind: RemoteErrorKind::RuntimeError,
960    })
961}
962
963/// Compute a SHA-256 hash of a `BytecodeProgram` for caching.
964///
965/// Remote VMs can cache programs by this hash, avoiding re-transfer
966/// of the same program on repeated calls.
967pub fn program_hash(program: &BytecodeProgram) -> [u8; 32] {
968    use sha2::{Digest, Sha256};
969    let bytes =
970        rmp_serde::to_vec_named(program).expect("BytecodeProgram serialization should not fail");
971    let hash = Sha256::digest(&bytes);
972    let mut out = [0u8; 32];
973    out.copy_from_slice(&hash);
974    out
975}
976
977/// Create a minimal stub program containing only metadata (no instructions/constants/functions).
978///
979/// Used by `build_call_request` and `build_closure_call_request` when content-addressed
980/// blobs are available, to reduce payload size.
981fn create_stub_program(program: &BytecodeProgram) -> BytecodeProgram {
982    let mut stub = BytecodeProgram::default();
983    stub.type_schema_registry = program.type_schema_registry.clone();
984    // Carry enough content-addressed metadata for program_from_blobs()
985    if let Some(ref ca) = program.content_addressed {
986        stub.content_addressed = Some(Program {
987            entry: ca.entry,
988            function_store: std::collections::HashMap::new(),
989            top_level_locals_count: ca.top_level_locals_count,
990            top_level_local_storage_hints: ca.top_level_local_storage_hints.clone(),
991            module_binding_names: ca.module_binding_names.clone(),
992            module_binding_storage_hints: ca.module_binding_storage_hints.clone(),
993            function_local_storage_hints: ca.function_local_storage_hints.clone(),
994            top_level_frame: ca.top_level_frame.clone(),
995            data_schema: ca.data_schema.clone(),
996            type_schema_registry: ca.type_schema_registry.clone(),
997            trait_method_symbols: ca.trait_method_symbols.clone(),
998            foreign_functions: ca.foreign_functions.clone(),
999            native_struct_layouts: ca.native_struct_layouts.clone(),
1000            debug_info: ca.debug_info.clone(),
1001        });
1002    }
1003    // Copy top-level metadata needed by program_from_blobs
1004    stub.top_level_locals_count = program.top_level_locals_count;
1005    stub.top_level_local_storage_hints = program.top_level_local_storage_hints.clone();
1006    stub.module_binding_names = program.module_binding_names.clone();
1007    stub.module_binding_storage_hints = program.module_binding_storage_hints.clone();
1008    stub.function_local_storage_hints = program.function_local_storage_hints.clone();
1009    stub.data_schema = program.data_schema.clone();
1010    stub.trait_method_symbols = program.trait_method_symbols.clone();
1011    stub.foreign_functions = program.foreign_functions.clone();
1012    stub.native_struct_layouts = program.native_struct_layouts.clone();
1013    stub.debug_info = program.debug_info.clone();
1014    stub.function_blob_hashes = program.function_blob_hashes.clone();
1015    stub
1016}
1017
1018/// Perform blob negotiation before sending a call request.
1019///
1020/// Creates a `BlobNegotiationRequest` with the hashes from the blob set,
1021/// checks which blobs the remote already has (via the provided cache as a
1022/// local stand-in), and returns the set of known hashes that can be stripped
1023/// from the outgoing request.
1024///
1025/// In a real transport scenario the `BlobNegotiationRequest` would be sent
1026/// over the wire and the `BlobNegotiationResponse` received from the remote.
1027/// Currently this performs the negotiation locally against the provided cache.
1028///
1029/// # Example flow
1030/// ```text
1031/// 1. Caller builds blob set for function
1032/// 2. negotiate_blobs() → BlobNegotiationRequest with offered hashes
1033/// 3. Remote replies with BlobNegotiationResponse (known_hashes)
1034/// 4. Caller strips known blobs from the request
1035/// ```
1036pub fn negotiate_blobs(
1037    blobs: &[(FunctionHash, FunctionBlob)],
1038    remote_cache: &RemoteBlobCache,
1039) -> BlobNegotiationResponse {
1040    let request = BlobNegotiationRequest {
1041        offered_hashes: blobs.iter().map(|(h, _)| *h).collect(),
1042    };
1043    // TODO: Wire this to actual transport — currently performs negotiation
1044    // locally against the provided cache. In production, `request` would be
1045    // serialized, sent over the wire, and the response deserialized.
1046    handle_negotiation(&request, remote_cache)
1047}
1048
1049/// Build a `RemoteCallRequest` for a named function, with blob negotiation.
1050///
1051/// Performs a negotiation step against the provided `remote_cache` to discover
1052/// which blobs the remote already has, then strips those from the request.
1053/// If `remote_cache` is `None`, sends all blobs (no negotiation).
1054pub fn build_call_request_with_negotiation(
1055    program: &BytecodeProgram,
1056    function_name: &str,
1057    arguments: Vec<SerializableVMValue>,
1058    remote_cache: Option<&RemoteBlobCache>,
1059) -> RemoteCallRequest {
1060    let mut request = build_call_request(program, function_name, arguments);
1061
1062    if let (Some(cache), Some(blobs)) = (remote_cache, &mut request.function_blobs) {
1063        let response = negotiate_blobs(blobs, cache);
1064        let known_set: std::collections::HashSet<FunctionHash> =
1065            response.known_hashes.into_iter().collect();
1066        blobs.retain(|(hash, _)| !known_set.contains(hash));
1067    }
1068
1069    request
1070}
1071
1072/// Build a `RemoteCallRequest` for a named function.
1073///
1074/// Convenience function that handles program hashing and type schema extraction.
1075/// When the program has content-addressed blobs, automatically computes the
1076/// minimal transitive closure and attaches it to the request.
1077pub fn build_call_request(
1078    program: &BytecodeProgram,
1079    function_name: &str,
1080    arguments: Vec<SerializableVMValue>,
1081) -> RemoteCallRequest {
1082    let hash = program_hash(program);
1083    let function_id = program
1084        .functions
1085        .iter()
1086        .position(|f| f.name == function_name)
1087        .map(|id| id as u16);
1088    let function_hash = function_id
1089        .and_then(|fid| {
1090            program
1091                .function_blob_hashes
1092                .get(fid as usize)
1093                .copied()
1094                .flatten()
1095        })
1096        .or_else(|| {
1097            program.content_addressed.as_ref().and_then(|ca| {
1098                let mut matches = ca.function_store.iter().filter_map(|(hash, blob)| {
1099                    if blob.name == function_name {
1100                        Some(*hash)
1101                    } else {
1102                        None
1103                    }
1104                });
1105                let first = matches.next()?;
1106                if matches.next().is_some() {
1107                    None
1108                } else {
1109                    Some(first)
1110                }
1111            })
1112        });
1113    let blobs = function_hash.and_then(|h| build_minimal_blobs_by_hash(program, h));
1114
1115    // When content-addressed blobs are available, send a minimal stub program
1116    // instead of the full BytecodeProgram to reduce payload size.
1117    let request_program = if blobs.is_some() {
1118        create_stub_program(program)
1119    } else {
1120        program.clone()
1121    };
1122
1123    RemoteCallRequest {
1124        program: request_program,
1125        function_name: function_name.to_string(),
1126        function_id,
1127        function_hash,
1128        arguments,
1129        upvalues: None,
1130        type_schemas: program.type_schema_registry.clone(),
1131        program_hash: hash,
1132        function_blobs: blobs,
1133    }
1134}
1135
1136/// Build a `RemoteCallRequest` for a closure.
1137///
1138/// Serializes the closure's captured upvalues alongside the function call.
1139/// When the closure's function has a matching content-addressed blob, sends
1140/// the minimal blob set instead of the full program.
1141pub fn build_closure_call_request(
1142    program: &BytecodeProgram,
1143    function_id: u16,
1144    arguments: Vec<SerializableVMValue>,
1145    upvalues: Vec<SerializableVMValue>,
1146) -> RemoteCallRequest {
1147    let hash = program_hash(program);
1148
1149    let function_hash = program
1150        .function_blob_hashes
1151        .get(function_id as usize)
1152        .copied()
1153        .flatten();
1154    let blobs = function_hash.and_then(|h| build_minimal_blobs_by_hash(program, h));
1155
1156    RemoteCallRequest {
1157        program: if blobs.is_some() {
1158            create_stub_program(program)
1159        } else {
1160            program.clone()
1161        },
1162        function_name: String::new(),
1163        function_id: Some(function_id),
1164        function_hash,
1165        arguments,
1166        upvalues: Some(upvalues),
1167        type_schemas: program.type_schema_registry.clone(),
1168        program_hash: hash,
1169        function_blobs: blobs,
1170    }
1171}
1172
1173/// Build a `RemoteCallRequest` that strips function blobs the remote already has.
1174///
1175/// Like `build_call_request`, but takes a set of hashes the remote is known to
1176/// have cached (from a prior `BlobNegotiationResponse`). Blobs with matching
1177/// hashes are omitted from `function_blobs`, reducing payload size.
1178pub fn build_call_request_negotiated(
1179    program: &BytecodeProgram,
1180    function_name: &str,
1181    arguments: Vec<SerializableVMValue>,
1182    known_hashes: &[FunctionHash],
1183) -> RemoteCallRequest {
1184    let mut request = build_call_request(program, function_name, arguments);
1185
1186    // Strip blobs the remote already has
1187    if let Some(ref mut blobs) = request.function_blobs {
1188        let known_set: std::collections::HashSet<FunctionHash> =
1189            known_hashes.iter().copied().collect();
1190        blobs.retain(|(hash, _)| !known_set.contains(hash));
1191    }
1192
1193    request
1194}
1195
1196/// Handle a blob negotiation request on the server side.
1197///
1198/// Returns the subset of offered hashes that are present in the cache.
1199pub fn handle_negotiation(
1200    request: &BlobNegotiationRequest,
1201    cache: &RemoteBlobCache,
1202) -> BlobNegotiationResponse {
1203    BlobNegotiationResponse {
1204        known_hashes: cache.filter_known(&request.offered_hashes),
1205    }
1206}
1207
1208// ---------------------------------------------------------------------------
1209// Wire message dispatch (V1 + V2 handlers)
1210// ---------------------------------------------------------------------------
1211
1212/// Handle a `WireMessage` by dispatching to the appropriate handler.
1213///
1214/// V1 messages (BlobNegotiation, Call, CallResponse, Sidecar) are fully handled.
1215/// V2 messages (Execute, Validate, Auth, Ping, file/project operations) return
1216/// stub error responses until the execution server is implemented.
1217pub fn handle_wire_message(
1218    msg: WireMessage,
1219    store: &SnapshotStore,
1220    cache: &mut RemoteBlobCache,
1221) -> WireMessage {
1222    match msg {
1223        WireMessage::BlobNegotiation(req) => {
1224            let response = handle_negotiation(&req, cache);
1225            WireMessage::BlobNegotiationReply(response)
1226        }
1227        WireMessage::BlobNegotiationReply(_) => {
1228            // Client-side message — server should not receive this.
1229            // Return an error wrapped in an ExecuteResponse as a generic error channel.
1230            WireMessage::ExecuteResponse(ExecuteResponse {
1231                request_id: 0,
1232                success: false,
1233                value: WireValue::Null,
1234                stdout: None,
1235                error: Some("Unexpected BlobNegotiationReply on server side".to_string()),
1236                content_terminal: None,
1237                content_html: None,
1238                diagnostics: vec![],
1239                metrics: None,
1240                print_output: None,
1241            })
1242        }
1243        WireMessage::Call(req) => {
1244            // Cache any incoming blobs for future negotiation
1245            if let Some(ref blobs) = req.function_blobs {
1246                cache.insert_blobs(blobs);
1247            }
1248            let response = execute_remote_call(req, store);
1249            WireMessage::CallResponse(response)
1250        }
1251        WireMessage::CallResponse(_) => {
1252            // Client-side message — server should not receive this.
1253            WireMessage::ExecuteResponse(ExecuteResponse {
1254                request_id: 0,
1255                success: false,
1256                value: WireValue::Null,
1257                stdout: None,
1258                error: Some("Unexpected CallResponse on server side".to_string()),
1259                content_terminal: None,
1260                content_html: None,
1261                diagnostics: vec![],
1262                metrics: None,
1263                print_output: None,
1264            })
1265        }
1266        WireMessage::Sidecar(_sidecar) => {
1267            // Sidecars are buffered by the transport layer and reassembled
1268            // before the Call message is dispatched. If we receive one here,
1269            // it means the transport did not buffer it.
1270            WireMessage::ExecuteResponse(ExecuteResponse {
1271                request_id: 0,
1272                success: false,
1273                value: WireValue::Null,
1274                stdout: None,
1275                error: Some("Unexpected standalone Sidecar message".to_string()),
1276                content_terminal: None,
1277                content_html: None,
1278                diagnostics: vec![],
1279                metrics: None,
1280                print_output: None,
1281            })
1282        }
1283
1284        // --- V2 message stubs ---
1285
1286        WireMessage::Execute(req) => WireMessage::ExecuteResponse(ExecuteResponse {
1287            request_id: req.request_id,
1288            success: false,
1289            value: WireValue::Null,
1290            stdout: None,
1291            error: Some("V2 Execute not yet implemented".to_string()),
1292            content_terminal: None,
1293            content_html: None,
1294            diagnostics: vec![WireDiagnostic {
1295                severity: "error".to_string(),
1296                message: "V2 Execute handler not yet implemented".to_string(),
1297                line: None,
1298                column: None,
1299            }],
1300            metrics: None,
1301            print_output: None,
1302        }),
1303        WireMessage::ExecuteResponse(_) => {
1304            // Client-side message — should not arrive at server.
1305            WireMessage::ExecuteResponse(ExecuteResponse {
1306                request_id: 0,
1307                success: false,
1308                value: WireValue::Null,
1309                stdout: None,
1310                error: Some("Unexpected ExecuteResponse on server side".to_string()),
1311                content_terminal: None,
1312                content_html: None,
1313                diagnostics: vec![],
1314                metrics: None,
1315                print_output: None,
1316            })
1317        }
1318        WireMessage::Validate(req) => WireMessage::ValidateResponse(ValidateResponse {
1319            request_id: req.request_id,
1320            success: false,
1321            diagnostics: vec![WireDiagnostic {
1322                severity: "error".to_string(),
1323                message: "V2 Validate handler not yet implemented".to_string(),
1324                line: None,
1325                column: None,
1326            }],
1327        }),
1328        WireMessage::ValidateResponse(_) => {
1329            WireMessage::ExecuteResponse(ExecuteResponse {
1330                request_id: 0,
1331                success: false,
1332                value: WireValue::Null,
1333                stdout: None,
1334                error: Some("Unexpected ValidateResponse on server side".to_string()),
1335                content_terminal: None,
1336                content_html: None,
1337                diagnostics: vec![],
1338                metrics: None,
1339                print_output: None,
1340            })
1341        }
1342        WireMessage::Auth(_req) => WireMessage::AuthResponse(AuthResponse {
1343            authenticated: false,
1344            error: Some("V2 Auth handler not yet implemented".to_string()),
1345        }),
1346        WireMessage::AuthResponse(_) => {
1347            WireMessage::ExecuteResponse(ExecuteResponse {
1348                request_id: 0,
1349                success: false,
1350                value: WireValue::Null,
1351                stdout: None,
1352                error: Some("Unexpected AuthResponse on server side".to_string()),
1353                content_terminal: None,
1354                content_html: None,
1355                diagnostics: vec![],
1356                metrics: None,
1357                print_output: None,
1358            })
1359        }
1360        WireMessage::ExecuteFile(req) => WireMessage::ExecuteResponse(ExecuteResponse {
1361            request_id: req.request_id,
1362            success: false,
1363            value: WireValue::Null,
1364            stdout: None,
1365            error: Some("V2 ExecuteFile handler not yet implemented".to_string()),
1366            content_terminal: None,
1367            content_html: None,
1368            diagnostics: vec![WireDiagnostic {
1369                severity: "error".to_string(),
1370                message: "V2 ExecuteFile handler not yet implemented".to_string(),
1371                line: None,
1372                column: None,
1373            }],
1374            metrics: None,
1375            print_output: None,
1376        }),
1377        WireMessage::ExecuteProject(req) => WireMessage::ExecuteResponse(ExecuteResponse {
1378            request_id: req.request_id,
1379            success: false,
1380            value: WireValue::Null,
1381            stdout: None,
1382            error: Some("V2 ExecuteProject handler not yet implemented".to_string()),
1383            content_terminal: None,
1384            content_html: None,
1385            diagnostics: vec![WireDiagnostic {
1386                severity: "error".to_string(),
1387                message: "V2 ExecuteProject handler not yet implemented".to_string(),
1388                line: None,
1389                column: None,
1390            }],
1391            metrics: None,
1392            print_output: None,
1393        }),
1394        WireMessage::ValidatePath(req) => WireMessage::ValidateResponse(ValidateResponse {
1395            request_id: req.request_id,
1396            success: false,
1397            diagnostics: vec![WireDiagnostic {
1398                severity: "error".to_string(),
1399                message: "V2 ValidatePath handler not yet implemented".to_string(),
1400                line: None,
1401                column: None,
1402            }],
1403        }),
1404        WireMessage::Ping(_) => WireMessage::Pong(ServerInfo {
1405            shape_version: env!("CARGO_PKG_VERSION").to_string(),
1406            wire_protocol: shape_wire::WIRE_PROTOCOL_V2,
1407            capabilities: vec![
1408                "call".to_string(),
1409                "blob-negotiation".to_string(),
1410                "sidecar".to_string(),
1411            ],
1412        }),
1413        WireMessage::Pong(_) => {
1414            // Client-side message — should not arrive at server.
1415            WireMessage::ExecuteResponse(ExecuteResponse {
1416                request_id: 0,
1417                success: false,
1418                value: WireValue::Null,
1419                stdout: None,
1420                error: Some("Unexpected Pong on server side".to_string()),
1421                content_terminal: None,
1422                content_html: None,
1423                diagnostics: vec![],
1424                metrics: None,
1425                print_output: None,
1426            })
1427        }
1428    }
1429}
1430
1431// ---------------------------------------------------------------------------
1432// Phase 3B: Sidecar extraction and reassembly
1433// ---------------------------------------------------------------------------
1434
1435/// Minimum blob size (in bytes) to extract as a sidecar.
1436/// Blobs smaller than this are left inline in the serialized payload.
1437pub const SIDECAR_THRESHOLD: usize = 1024 * 1024; // 1 MB
1438
1439/// Extract large blobs from serialized arguments into sidecars.
1440///
1441/// Walks the `SerializableVMValue` tree recursively. Any `BlobRef` whose
1442/// backing `ChunkedBlob` exceeds `SIDECAR_THRESHOLD` bytes is replaced
1443/// with a `SidecarRef` and the raw data is collected into a `BlobSidecar`.
1444///
1445/// Returns the extracted sidecars. The `args` are modified in place.
1446pub fn extract_sidecars(
1447    args: &mut Vec<SerializableVMValue>,
1448    store: &SnapshotStore,
1449) -> Vec<BlobSidecar> {
1450    let mut sidecars = Vec::new();
1451    let mut next_id: u32 = 0;
1452    for arg in args.iter_mut() {
1453        extract_sidecars_recursive(arg, store, &mut sidecars, &mut next_id);
1454    }
1455    sidecars
1456}
1457
1458/// Extract the BlobRef from a SerializableVMValue if it carries one (non-mutating read).
1459fn get_blob_ref(value: &SerializableVMValue) -> Option<&shape_runtime::snapshot::BlobRef> {
1460    use shape_runtime::snapshot::SerializableVMValue as SV;
1461    match value {
1462        SV::DataTable(blob)
1463        | SV::TypedTable { table: blob, .. }
1464        | SV::RowView { table: blob, .. }
1465        | SV::ColumnRef { table: blob, .. }
1466        | SV::IndexedTable { table: blob, .. } => Some(blob),
1467        SV::TypedArray { blob, .. } | SV::Matrix { blob, .. } => Some(blob),
1468        _ => None,
1469    }
1470}
1471
1472fn extract_sidecars_recursive(
1473    value: &mut SerializableVMValue,
1474    store: &SnapshotStore,
1475    sidecars: &mut Vec<BlobSidecar>,
1476    next_id: &mut u32,
1477) {
1478    use shape_runtime::snapshot::SerializableVMValue as SV;
1479
1480    // First: check if this value carries a blob large enough to extract.
1481    // Capture metadata (TypedArray len, Matrix rows/cols) before replacing.
1482    let meta = match &*value {
1483        SV::TypedArray { len, .. } => (*len as u32, 0u32),
1484        SV::Matrix { rows, cols, .. } => (*rows, *cols),
1485        _ => (0, 0),
1486    };
1487    // Clone the blob info to avoid borrow conflicts with the later mutation.
1488    if let Some(blob) = get_blob_ref(value) {
1489        let blob_kind = blob.kind.clone();
1490        let blob_hash = blob.hash.clone();
1491        if let Some(sidecar) = try_extract_blob(blob, store, next_id) {
1492            let sidecar_id = sidecar.sidecar_id;
1493            sidecars.push(sidecar);
1494            *value = SV::SidecarRef {
1495                sidecar_id,
1496                blob_kind,
1497                original_hash: blob_hash,
1498                meta_a: meta.0,
1499                meta_b: meta.1,
1500            };
1501            return;
1502        }
1503    }
1504
1505    // Recursive descent into containers
1506    match value {
1507        SV::Array(items) => {
1508            for item in items.iter_mut() {
1509                extract_sidecars_recursive(item, store, sidecars, next_id);
1510            }
1511        }
1512        SV::HashMap { keys, values } => {
1513            for k in keys.iter_mut() {
1514                extract_sidecars_recursive(k, store, sidecars, next_id);
1515            }
1516            for v in values.iter_mut() {
1517                extract_sidecars_recursive(v, store, sidecars, next_id);
1518            }
1519        }
1520        SV::TypedObject { slot_data, .. } => {
1521            for slot in slot_data.iter_mut() {
1522                extract_sidecars_recursive(slot, store, sidecars, next_id);
1523            }
1524        }
1525        SV::Some(inner) | SV::Ok(inner) | SV::Err(inner) => {
1526            extract_sidecars_recursive(inner, store, sidecars, next_id);
1527        }
1528        SV::TypeAnnotatedValue { value: inner, .. } => {
1529            extract_sidecars_recursive(inner, store, sidecars, next_id);
1530        }
1531        SV::Closure { upvalues, .. } => {
1532            for uv in upvalues.iter_mut() {
1533                extract_sidecars_recursive(uv, store, sidecars, next_id);
1534            }
1535        }
1536        SV::Enum(ev) => match &mut ev.payload {
1537            shape_runtime::snapshot::EnumPayloadSnapshot::Unit => {}
1538            shape_runtime::snapshot::EnumPayloadSnapshot::Tuple(items) => {
1539                for item in items.iter_mut() {
1540                    extract_sidecars_recursive(item, store, sidecars, next_id);
1541                }
1542            }
1543            shape_runtime::snapshot::EnumPayloadSnapshot::Struct(fields) => {
1544                for (_, v) in fields.iter_mut() {
1545                    extract_sidecars_recursive(v, store, sidecars, next_id);
1546                }
1547            }
1548        },
1549        SV::PrintResult(pr) => {
1550            for span in pr.spans.iter_mut() {
1551                if let shape_runtime::snapshot::PrintSpanSnapshot::Value {
1552                    raw_value,
1553                    format_params,
1554                    ..
1555                } = span
1556                {
1557                    extract_sidecars_recursive(raw_value, store, sidecars, next_id);
1558                    for (_, v) in format_params.iter_mut() {
1559                        extract_sidecars_recursive(v, store, sidecars, next_id);
1560                    }
1561                }
1562            }
1563        }
1564        SV::SimulationCall { params, .. } => {
1565            for (_, v) in params.iter_mut() {
1566                extract_sidecars_recursive(v, store, sidecars, next_id);
1567            }
1568        }
1569        SV::FunctionRef { closure, .. } => {
1570            if let Some(c) = closure {
1571                extract_sidecars_recursive(c, store, sidecars, next_id);
1572            }
1573        }
1574        SV::Range { start, end, .. } => {
1575            if let Some(s) = start {
1576                extract_sidecars_recursive(s, store, sidecars, next_id);
1577            }
1578            if let Some(e) = end {
1579                extract_sidecars_recursive(e, store, sidecars, next_id);
1580            }
1581        }
1582
1583        // Leaf types and blob carriers (handled above) — nothing more to do
1584        _ => {}
1585    }
1586}
1587
1588/// Try to extract a BlobRef's data as a sidecar if it exceeds the threshold.
1589fn try_extract_blob(
1590    blob: &shape_runtime::snapshot::BlobRef,
1591    store: &SnapshotStore,
1592    next_id: &mut u32,
1593) -> Option<BlobSidecar> {
1594    // Load the ChunkedBlob metadata to check total size
1595    let chunked: shape_runtime::snapshot::ChunkedBlob = store.get_struct(&blob.hash).ok()?;
1596    if chunked.total_len < SIDECAR_THRESHOLD {
1597        return None;
1598    }
1599
1600    // Load the raw data
1601    let data = shape_runtime::snapshot::load_chunked_bytes(&chunked, store).ok()?;
1602    let sidecar_id = *next_id;
1603    *next_id += 1;
1604
1605    Some(BlobSidecar { sidecar_id, data })
1606}
1607
1608/// Reassemble sidecars back into the serialized payload.
1609///
1610/// Walks the `SerializableVMValue` tree and replaces `SidecarRef` variants
1611/// with the original `BlobRef`, storing the sidecar data back into the
1612/// snapshot store.
1613pub fn reassemble_sidecars(
1614    args: &mut Vec<SerializableVMValue>,
1615    sidecars: &std::collections::HashMap<u32, BlobSidecar>,
1616    store: &SnapshotStore,
1617) -> anyhow::Result<()> {
1618    for arg in args.iter_mut() {
1619        reassemble_recursive(arg, sidecars, store)?;
1620    }
1621    Ok(())
1622}
1623
1624fn reassemble_recursive(
1625    value: &mut SerializableVMValue,
1626    sidecars: &std::collections::HashMap<u32, BlobSidecar>,
1627    store: &SnapshotStore,
1628) -> anyhow::Result<()> {
1629    use shape_runtime::snapshot::{BlobRef, SerializableVMValue as SV};
1630
1631    match value {
1632        SV::SidecarRef {
1633            sidecar_id,
1634            blob_kind,
1635            original_hash: _,
1636            meta_a,
1637            meta_b,
1638        } => {
1639            let sidecar = sidecars
1640                .get(sidecar_id)
1641                .ok_or_else(|| anyhow::anyhow!("missing sidecar with id {}", sidecar_id))?;
1642            let meta_a = *meta_a;
1643            let meta_b = *meta_b;
1644
1645            // Store the sidecar data back into the snapshot store as chunked bytes,
1646            // then wrap in a ChunkedBlob struct and store that.
1647            let chunked = shape_runtime::snapshot::store_chunked_bytes(&sidecar.data, store)?;
1648            let hash = store.put_struct(&chunked)?;
1649
1650            let blob = BlobRef {
1651                hash,
1652                kind: blob_kind.clone(),
1653            };
1654            *value = match blob_kind {
1655                shape_runtime::snapshot::BlobKind::DataTable => SV::DataTable(blob),
1656                shape_runtime::snapshot::BlobKind::TypedArray(ek) => SV::TypedArray {
1657                    element_kind: *ek,
1658                    blob,
1659                    len: meta_a as usize,
1660                },
1661                shape_runtime::snapshot::BlobKind::Matrix => SV::Matrix {
1662                    blob,
1663                    rows: meta_a,
1664                    cols: meta_b,
1665                },
1666            };
1667        }
1668
1669        // Recursive descent (same structure as extract)
1670        SV::Array(items) => {
1671            for item in items.iter_mut() {
1672                reassemble_recursive(item, sidecars, store)?;
1673            }
1674        }
1675        SV::HashMap { keys, values } => {
1676            for k in keys.iter_mut() {
1677                reassemble_recursive(k, sidecars, store)?;
1678            }
1679            for v in values.iter_mut() {
1680                reassemble_recursive(v, sidecars, store)?;
1681            }
1682        }
1683        SV::TypedObject { slot_data, .. } => {
1684            for slot in slot_data.iter_mut() {
1685                reassemble_recursive(slot, sidecars, store)?;
1686            }
1687        }
1688        SV::Some(inner) | SV::Ok(inner) | SV::Err(inner) => {
1689            reassemble_recursive(inner, sidecars, store)?;
1690        }
1691        SV::TypeAnnotatedValue { value: inner, .. } => {
1692            reassemble_recursive(inner, sidecars, store)?;
1693        }
1694        SV::Closure { upvalues, .. } => {
1695            for uv in upvalues.iter_mut() {
1696                reassemble_recursive(uv, sidecars, store)?;
1697            }
1698        }
1699        SV::Enum(ev) => match &mut ev.payload {
1700            shape_runtime::snapshot::EnumPayloadSnapshot::Unit => {}
1701            shape_runtime::snapshot::EnumPayloadSnapshot::Tuple(items) => {
1702                for item in items.iter_mut() {
1703                    reassemble_recursive(item, sidecars, store)?;
1704                }
1705            }
1706            shape_runtime::snapshot::EnumPayloadSnapshot::Struct(fields) => {
1707                for (_, v) in fields.iter_mut() {
1708                    reassemble_recursive(v, sidecars, store)?;
1709                }
1710            }
1711        },
1712        SV::PrintResult(pr) => {
1713            for span in pr.spans.iter_mut() {
1714                if let shape_runtime::snapshot::PrintSpanSnapshot::Value {
1715                    raw_value,
1716                    format_params,
1717                    ..
1718                } = span
1719                {
1720                    reassemble_recursive(raw_value, sidecars, store)?;
1721                    for (_, v) in format_params.iter_mut() {
1722                        reassemble_recursive(v, sidecars, store)?;
1723                    }
1724                }
1725            }
1726        }
1727        SV::SimulationCall { params, .. } => {
1728            for (_, v) in params.iter_mut() {
1729                reassemble_recursive(v, sidecars, store)?;
1730            }
1731        }
1732        SV::FunctionRef { closure, .. } => {
1733            if let Some(c) = closure {
1734                reassemble_recursive(c, sidecars, store)?;
1735            }
1736        }
1737        SV::Range { start, end, .. } => {
1738            if let Some(s) = start {
1739                reassemble_recursive(s, sidecars, store)?;
1740            }
1741            if let Some(e) = end {
1742                reassemble_recursive(e, sidecars, store)?;
1743            }
1744        }
1745
1746        // Leaf types and blob-carrying variants (non-sidecar) — nothing to reassemble
1747        _ => {}
1748    }
1749    Ok(())
1750}
1751
1752#[cfg(test)]
1753mod tests {
1754    use super::*;
1755    use crate::bytecode::{FunctionBlob, FunctionHash, Instruction, OpCode, Program};
1756    use crate::compiler::BytecodeCompiler;
1757    use shape_abi_v1::PermissionSet;
1758    use std::collections::HashMap;
1759
1760    /// Helper: compile Shape source to BytecodeProgram
1761    fn compile(source: &str) -> BytecodeProgram {
1762        let program = shape_ast::parser::parse_program(source).expect("parse failed");
1763        let compiler = BytecodeCompiler::new();
1764        compiler.compile(&program).expect("compile failed")
1765    }
1766
1767    /// Helper: create a temp SnapshotStore
1768    fn temp_store() -> SnapshotStore {
1769        let dir = std::env::temp_dir().join(format!("shape_remote_test_{}", std::process::id()));
1770        SnapshotStore::new(dir).expect("create snapshot store")
1771    }
1772
1773    fn mk_hash(tag: u8) -> FunctionHash {
1774        let mut bytes = [0u8; 32];
1775        bytes[0] = tag;
1776        FunctionHash(bytes)
1777    }
1778
1779    fn mk_blob(name: &str, hash: FunctionHash, dependencies: Vec<FunctionHash>) -> FunctionBlob {
1780        FunctionBlob {
1781            content_hash: hash,
1782            name: name.to_string(),
1783            arity: 0,
1784            param_names: Vec::new(),
1785            locals_count: 0,
1786            is_closure: false,
1787            captures_count: 0,
1788            is_async: false,
1789            ref_params: Vec::new(),
1790            ref_mutates: Vec::new(),
1791            mutable_captures: Vec::new(),
1792            frame_descriptor: None,
1793            instructions: vec![
1794                Instruction::simple(OpCode::PushNull),
1795                Instruction::simple(OpCode::ReturnValue),
1796            ],
1797            constants: Vec::new(),
1798            strings: Vec::new(),
1799            required_permissions: PermissionSet::pure(),
1800            dependencies,
1801            callee_names: Vec::new(),
1802            type_schemas: Vec::new(),
1803            foreign_dependencies: Vec::new(),
1804            source_map: Vec::new(),
1805        }
1806    }
1807
1808    #[test]
1809    fn test_remote_call_simple_function() {
1810        let bytecode = compile(
1811            r#"
1812            function add(a, b) { a + b }
1813        "#,
1814        );
1815        let store = temp_store();
1816
1817        let request = build_call_request(
1818            &bytecode,
1819            "add",
1820            vec![
1821                SerializableVMValue::Number(10.0),
1822                SerializableVMValue::Number(32.0),
1823            ],
1824        );
1825
1826        let response = execute_remote_call(request, &store);
1827        match response.result {
1828            Ok(SerializableVMValue::Number(n)) => assert_eq!(n, 42.0),
1829            other => panic!("Expected Number(42.0), got {:?}", other),
1830        }
1831    }
1832
1833    #[test]
1834    fn test_remote_call_function_not_found() {
1835        let bytecode = compile("function foo() { 1 }");
1836        let store = temp_store();
1837
1838        let request = build_call_request(&bytecode, "nonexistent", vec![]);
1839
1840        let response = execute_remote_call(request, &store);
1841        assert!(response.result.is_err());
1842        let err = response.result.unwrap_err();
1843        assert!(matches!(err.kind, RemoteErrorKind::RuntimeError));
1844    }
1845
1846    #[test]
1847    fn test_program_hash_deterministic() {
1848        let bytecode = compile("function f(x) { x * 2 }");
1849        let hash1 = program_hash(&bytecode);
1850        let hash2 = program_hash(&bytecode);
1851        assert_eq!(hash1, hash2, "Same program should produce same hash");
1852    }
1853
1854    #[test]
1855    fn test_request_response_serialization_roundtrip() {
1856        let bytecode = compile("function double(x) { x * 2 }");
1857        let request =
1858            build_call_request(&bytecode, "double", vec![SerializableVMValue::Number(21.0)]);
1859
1860        // Encode → decode roundtrip via MessagePack
1861        let bytes = shape_wire::encode_message(&request).expect("encode request");
1862        let decoded: RemoteCallRequest =
1863            shape_wire::decode_message(&bytes).expect("decode request");
1864
1865        assert_eq!(decoded.function_name, "double");
1866        assert_eq!(decoded.arguments.len(), 1);
1867        assert_eq!(decoded.program_hash, request.program_hash);
1868    }
1869
1870    #[test]
1871    fn test_response_serialization_roundtrip() {
1872        let response = RemoteCallResponse {
1873            result: Ok(SerializableVMValue::String("hello".to_string())),
1874        };
1875
1876        let bytes = shape_wire::encode_message(&response).expect("encode response");
1877        let decoded: RemoteCallResponse =
1878            shape_wire::decode_message(&bytes).expect("decode response");
1879
1880        match decoded.result {
1881            Ok(SerializableVMValue::String(s)) => assert_eq!(s, "hello"),
1882            other => panic!("Expected Ok(String), got {:?}", other),
1883        }
1884    }
1885
1886    #[test]
1887    fn test_type_schema_registry_roundtrip() {
1888        use shape_runtime::type_schema::{FieldType, TypeSchemaRegistry};
1889
1890        let mut registry = TypeSchemaRegistry::new();
1891        registry.register_type(
1892            "Point",
1893            vec![
1894                ("x".to_string(), FieldType::F64),
1895                ("y".to_string(), FieldType::F64),
1896            ],
1897        );
1898
1899        let bytes = shape_wire::encode_message(&registry).expect("encode registry");
1900        let decoded: TypeSchemaRegistry =
1901            shape_wire::decode_message(&bytes).expect("decode registry");
1902
1903        assert!(decoded.has_type("Point"));
1904        let schema = decoded.get("Point").unwrap();
1905        assert_eq!(schema.field_count(), 2);
1906        assert_eq!(schema.field_offset("x"), Some(0));
1907        assert_eq!(schema.field_offset("y"), Some(8));
1908    }
1909
1910    #[test]
1911    fn test_build_minimal_blobs_rejects_ambiguous_function_name() {
1912        let h1 = mk_hash(1);
1913        let h2 = mk_hash(2);
1914        let blob1 = mk_blob("dup", h1, vec![]);
1915        let blob2 = mk_blob("dup", h2, vec![]);
1916
1917        let mut function_store = HashMap::new();
1918        function_store.insert(h1, blob1.clone());
1919        function_store.insert(h2, blob2.clone());
1920
1921        let mut program = BytecodeProgram::default();
1922        program.content_addressed = Some(Program {
1923            entry: h1,
1924            function_store,
1925            top_level_locals_count: 0,
1926            top_level_local_storage_hints: Vec::new(),
1927            module_binding_names: Vec::new(),
1928            module_binding_storage_hints: Vec::new(),
1929            function_local_storage_hints: Vec::new(),
1930            top_level_frame: None,
1931            data_schema: None,
1932            type_schema_registry: shape_runtime::type_schema::TypeSchemaRegistry::new(),
1933            trait_method_symbols: HashMap::new(),
1934            foreign_functions: Vec::new(),
1935            native_struct_layouts: Vec::new(),
1936            debug_info: crate::bytecode::DebugInfo::new("<test>".to_string()),
1937        });
1938
1939        assert!(
1940            build_minimal_blobs(&program, "dup").is_none(),
1941            "name-based selection must reject ambiguous function names"
1942        );
1943
1944        let by_hash = build_minimal_blobs_by_hash(&program, h2)
1945            .expect("hash-based selection should work with duplicate names");
1946        assert_eq!(by_hash.len(), 1);
1947        assert_eq!(by_hash[0].0, h2);
1948        assert_eq!(by_hash[0].1.name, "dup");
1949    }
1950
1951    #[test]
1952    fn test_program_from_blobs_by_hash_requires_entry_blob() {
1953        let h1 = mk_hash(1);
1954        let h_missing = mk_hash(9);
1955        let blob = mk_blob("f", h1, vec![]);
1956        let source = BytecodeProgram::default();
1957
1958        let reconstructed = program_from_blobs_by_hash(vec![(h1, blob)], h_missing, &source);
1959        assert!(
1960            reconstructed.is_none(),
1961            "reconstruction must fail when the requested entry hash is absent"
1962        );
1963    }
1964
1965    // ---- Phase 2: Blob negotiation tests ----
1966
1967    #[test]
1968    fn test_blob_cache_insert_and_get() {
1969        let mut cache = RemoteBlobCache::new(10);
1970        let h1 = mk_hash(1);
1971        let blob1 = mk_blob("f1", h1, vec![]);
1972
1973        cache.insert(h1, blob1.clone());
1974        assert_eq!(cache.len(), 1);
1975        assert!(cache.contains(&h1));
1976        assert_eq!(cache.get(&h1).unwrap().name, "f1");
1977    }
1978
1979    #[test]
1980    fn test_blob_cache_lru_eviction() {
1981        let mut cache = RemoteBlobCache::new(2);
1982        let h1 = mk_hash(1);
1983        let h2 = mk_hash(2);
1984        let h3 = mk_hash(3);
1985
1986        cache.insert(h1, mk_blob("f1", h1, vec![]));
1987        cache.insert(h2, mk_blob("f2", h2, vec![]));
1988        assert_eq!(cache.len(), 2);
1989
1990        // Insert h3 should evict h1 (least recently used)
1991        cache.insert(h3, mk_blob("f3", h3, vec![]));
1992        assert_eq!(cache.len(), 2);
1993        assert!(!cache.contains(&h1), "h1 should be evicted");
1994        assert!(cache.contains(&h2));
1995        assert!(cache.contains(&h3));
1996    }
1997
1998    #[test]
1999    fn test_blob_cache_access_updates_order() {
2000        let mut cache = RemoteBlobCache::new(2);
2001        let h1 = mk_hash(1);
2002        let h2 = mk_hash(2);
2003        let h3 = mk_hash(3);
2004
2005        cache.insert(h1, mk_blob("f1", h1, vec![]));
2006        cache.insert(h2, mk_blob("f2", h2, vec![]));
2007
2008        // Access h1 to make it recently used
2009        cache.get(&h1);
2010
2011        // Insert h3 should evict h2 (now least recently used)
2012        cache.insert(h3, mk_blob("f3", h3, vec![]));
2013        assert!(
2014            cache.contains(&h1),
2015            "h1 was accessed, should not be evicted"
2016        );
2017        assert!(!cache.contains(&h2), "h2 should be evicted");
2018        assert!(cache.contains(&h3));
2019    }
2020
2021    #[test]
2022    fn test_blob_cache_filter_known() {
2023        let mut cache = RemoteBlobCache::new(10);
2024        let h1 = mk_hash(1);
2025        let h2 = mk_hash(2);
2026        let h3 = mk_hash(3);
2027
2028        cache.insert(h1, mk_blob("f1", h1, vec![]));
2029        cache.insert(h3, mk_blob("f3", h3, vec![]));
2030
2031        let known = cache.filter_known(&[h1, h2, h3]);
2032        assert_eq!(known.len(), 2);
2033        assert!(known.contains(&h1));
2034        assert!(known.contains(&h3));
2035        assert!(!known.contains(&h2));
2036    }
2037
2038    #[test]
2039    fn test_handle_negotiation() {
2040        let mut cache = RemoteBlobCache::new(10);
2041        let h1 = mk_hash(1);
2042        let h2 = mk_hash(2);
2043        cache.insert(h1, mk_blob("f1", h1, vec![]));
2044
2045        let request = BlobNegotiationRequest {
2046            offered_hashes: vec![h1, h2],
2047        };
2048        let response = handle_negotiation(&request, &cache);
2049        assert_eq!(response.known_hashes.len(), 1);
2050        assert!(response.known_hashes.contains(&h1));
2051    }
2052
2053    #[test]
2054    fn test_build_call_request_negotiated_strips_known_blobs() {
2055        // Create a program with content-addressed blobs
2056        let h1 = mk_hash(1);
2057        let h2 = mk_hash(2);
2058        let blob1 = mk_blob("entry", h1, vec![h2]);
2059        let blob2 = mk_blob("helper", h2, vec![]);
2060
2061        let mut function_store = HashMap::new();
2062        function_store.insert(h1, blob1.clone());
2063        function_store.insert(h2, blob2.clone());
2064
2065        let mut program = BytecodeProgram::default();
2066        program.content_addressed = Some(Program {
2067            entry: h1,
2068            function_store,
2069            top_level_locals_count: 0,
2070            top_level_local_storage_hints: Vec::new(),
2071            module_binding_names: Vec::new(),
2072            module_binding_storage_hints: Vec::new(),
2073            function_local_storage_hints: Vec::new(),
2074            top_level_frame: None,
2075            data_schema: None,
2076            type_schema_registry: shape_runtime::type_schema::TypeSchemaRegistry::new(),
2077            trait_method_symbols: HashMap::new(),
2078            foreign_functions: Vec::new(),
2079            native_struct_layouts: Vec::new(),
2080            debug_info: crate::bytecode::DebugInfo::new("<test>".to_string()),
2081        });
2082        program.functions = vec![crate::bytecode::Function {
2083            name: "entry".to_string(),
2084            arity: 0,
2085            param_names: vec![],
2086            locals_count: 0,
2087            entry_point: 0,
2088            body_length: 0,
2089            is_closure: false,
2090            captures_count: 0,
2091            is_async: false,
2092            ref_params: vec![],
2093            ref_mutates: vec![],
2094            mutable_captures: vec![],
2095            frame_descriptor: None,
2096            osr_entry_points: vec![],
2097        }];
2098        program.function_blob_hashes = vec![Some(h1)];
2099
2100        // First call: no known hashes -> all blobs sent
2101        let req1 = build_call_request_negotiated(&program, "entry", vec![], &[]);
2102        let blobs1 = req1.function_blobs.as_ref().unwrap();
2103        assert_eq!(blobs1.len(), 2, "first call should send all blobs");
2104
2105        // Second call: h2 is known -> only h1 sent
2106        let req2 = build_call_request_negotiated(&program, "entry", vec![], &[h2]);
2107        let blobs2 = req2.function_blobs.as_ref().unwrap();
2108        assert_eq!(blobs2.len(), 1, "second call should skip known blobs");
2109        assert_eq!(blobs2[0].0, h1);
2110    }
2111
2112    #[test]
2113    fn test_wire_message_serialization_roundtrip() {
2114        let msg = WireMessage::BlobNegotiation(BlobNegotiationRequest {
2115            offered_hashes: vec![mk_hash(1), mk_hash(2)],
2116        });
2117        let bytes = shape_wire::encode_message(&msg).expect("encode WireMessage");
2118        let decoded: WireMessage = shape_wire::decode_message(&bytes).expect("decode WireMessage");
2119        match decoded {
2120            WireMessage::BlobNegotiation(req) => {
2121                assert_eq!(req.offered_hashes.len(), 2);
2122            }
2123            _ => panic!("Expected BlobNegotiation"),
2124        }
2125    }
2126
2127    // ---- V2 execution server message tests ----
2128
2129    #[test]
2130    fn test_execute_request_roundtrip() {
2131        let msg = WireMessage::Execute(ExecuteRequest {
2132            code: "fn main() { 42 }".to_string(),
2133            request_id: 7,
2134        });
2135        let bytes = shape_wire::encode_message(&msg).expect("encode Execute");
2136        let decoded: WireMessage = shape_wire::decode_message(&bytes).expect("decode Execute");
2137        match decoded {
2138            WireMessage::Execute(req) => {
2139                assert_eq!(req.code, "fn main() { 42 }");
2140                assert_eq!(req.request_id, 7);
2141            }
2142            _ => panic!("Expected Execute"),
2143        }
2144    }
2145
2146    #[test]
2147    fn test_execute_response_roundtrip() {
2148        let msg = WireMessage::ExecuteResponse(ExecuteResponse {
2149            request_id: 7,
2150            success: true,
2151            value: WireValue::Number(42.0),
2152            stdout: Some("hello\n".to_string()),
2153            error: None,
2154            content_terminal: None,
2155            content_html: None,
2156            diagnostics: vec![WireDiagnostic {
2157                severity: "warning".to_string(),
2158                message: "unused variable".to_string(),
2159                line: Some(1),
2160                column: Some(5),
2161            }],
2162            metrics: Some(ExecutionMetrics {
2163                instructions_executed: 100,
2164                wall_time_ms: 3,
2165                memory_bytes_peak: 4096,
2166            }),
2167            print_output: None,
2168        });
2169        let bytes = shape_wire::encode_message(&msg).expect("encode ExecuteResponse");
2170        let decoded: WireMessage =
2171            shape_wire::decode_message(&bytes).expect("decode ExecuteResponse");
2172        match decoded {
2173            WireMessage::ExecuteResponse(resp) => {
2174                assert_eq!(resp.request_id, 7);
2175                assert!(resp.success);
2176                assert!(matches!(resp.value, WireValue::Number(n) if n == 42.0));
2177                assert_eq!(resp.stdout.as_deref(), Some("hello\n"));
2178                assert!(resp.error.is_none());
2179                assert_eq!(resp.diagnostics.len(), 1);
2180                assert_eq!(resp.diagnostics[0].severity, "warning");
2181                assert_eq!(resp.diagnostics[0].line, Some(1));
2182                let m = resp.metrics.unwrap();
2183                assert_eq!(m.instructions_executed, 100);
2184                assert_eq!(m.wall_time_ms, 3);
2185            }
2186            _ => panic!("Expected ExecuteResponse"),
2187        }
2188    }
2189
2190    #[test]
2191    fn test_ping_pong_roundtrip() {
2192        let ping = WireMessage::Ping(PingRequest {});
2193        let bytes = shape_wire::encode_message(&ping).expect("encode Ping");
2194        let decoded: WireMessage = shape_wire::decode_message(&bytes).expect("decode Ping");
2195        assert!(matches!(decoded, WireMessage::Ping(_)));
2196
2197        let pong = WireMessage::Pong(ServerInfo {
2198            shape_version: "0.1.3".to_string(),
2199            wire_protocol: 2,
2200            capabilities: vec!["execute".to_string(), "validate".to_string()],
2201        });
2202        let bytes = shape_wire::encode_message(&pong).expect("encode Pong");
2203        let decoded: WireMessage = shape_wire::decode_message(&bytes).expect("decode Pong");
2204        match decoded {
2205            WireMessage::Pong(info) => {
2206                assert_eq!(info.shape_version, "0.1.3");
2207                assert_eq!(info.wire_protocol, 2);
2208                assert_eq!(info.capabilities.len(), 2);
2209            }
2210            _ => panic!("Expected Pong"),
2211        }
2212    }
2213
2214    #[test]
2215    fn test_auth_roundtrip() {
2216        let msg = WireMessage::Auth(AuthRequest {
2217            token: "secret-token".to_string(),
2218        });
2219        let bytes = shape_wire::encode_message(&msg).expect("encode Auth");
2220        let decoded: WireMessage = shape_wire::decode_message(&bytes).expect("decode Auth");
2221        match decoded {
2222            WireMessage::Auth(req) => assert_eq!(req.token, "secret-token"),
2223            _ => panic!("Expected Auth"),
2224        }
2225
2226        let resp = WireMessage::AuthResponse(AuthResponse {
2227            authenticated: true,
2228            error: None,
2229        });
2230        let bytes = shape_wire::encode_message(&resp).expect("encode AuthResponse");
2231        let decoded: WireMessage = shape_wire::decode_message(&bytes).expect("decode AuthResponse");
2232        match decoded {
2233            WireMessage::AuthResponse(r) => {
2234                assert!(r.authenticated);
2235                assert!(r.error.is_none());
2236            }
2237            _ => panic!("Expected AuthResponse"),
2238        }
2239    }
2240
2241    #[test]
2242    fn test_validate_roundtrip() {
2243        let msg = WireMessage::Validate(ValidateRequest {
2244            code: "let x = 1".to_string(),
2245            request_id: 99,
2246        });
2247        let bytes = shape_wire::encode_message(&msg).expect("encode Validate");
2248        let decoded: WireMessage = shape_wire::decode_message(&bytes).expect("decode Validate");
2249        match decoded {
2250            WireMessage::Validate(req) => {
2251                assert_eq!(req.code, "let x = 1");
2252                assert_eq!(req.request_id, 99);
2253            }
2254            _ => panic!("Expected Validate"),
2255        }
2256
2257        let resp = WireMessage::ValidateResponse(ValidateResponse {
2258            request_id: 99,
2259            success: false,
2260            diagnostics: vec![WireDiagnostic {
2261                severity: "error".to_string(),
2262                message: "parse error".to_string(),
2263                line: None,
2264                column: None,
2265            }],
2266        });
2267        let bytes = shape_wire::encode_message(&resp).expect("encode ValidateResponse");
2268        let decoded: WireMessage =
2269            shape_wire::decode_message(&bytes).expect("decode ValidateResponse");
2270        match decoded {
2271            WireMessage::ValidateResponse(r) => {
2272                assert_eq!(r.request_id, 99);
2273                assert!(!r.success);
2274                assert_eq!(r.diagnostics.len(), 1);
2275            }
2276            _ => panic!("Expected ValidateResponse"),
2277        }
2278    }
2279
2280    #[test]
2281    fn test_ping_framing_roundtrip() {
2282        use shape_wire::transport::framing::{decode_framed, encode_framed};
2283
2284        let ping = WireMessage::Ping(PingRequest {});
2285        let mp = shape_wire::encode_message(&ping).expect("encode Ping");
2286        eprintln!("Ping msgpack bytes ({} bytes): {:02x?}", mp.len(), &mp);
2287
2288        let framed = encode_framed(&mp);
2289        eprintln!("Framed bytes ({} bytes): {:02x?}", framed.len(), &framed);
2290
2291        let decompressed = decode_framed(&framed).expect("decode_framed");
2292        assert_eq!(mp, decompressed, "framing roundtrip should preserve bytes");
2293
2294        let decoded: WireMessage =
2295            shape_wire::decode_message(&decompressed).expect("decode Ping after framing");
2296        assert!(matches!(decoded, WireMessage::Ping(_)));
2297    }
2298
2299    #[test]
2300    fn test_execute_framing_roundtrip() {
2301        use shape_wire::transport::framing::{decode_framed, encode_framed};
2302
2303        let exec = WireMessage::Execute(ExecuteRequest {
2304            code: "42".to_string(),
2305            request_id: 1,
2306        });
2307        let mp = shape_wire::encode_message(&exec).expect("encode Execute");
2308        eprintln!("Execute msgpack bytes ({} bytes): {:02x?}", mp.len(), &mp);
2309
2310        let framed = encode_framed(&mp);
2311        let decompressed = decode_framed(&framed).expect("decode_framed");
2312        let decoded: WireMessage =
2313            shape_wire::decode_message(&decompressed).expect("decode Execute after framing");
2314        match decoded {
2315            WireMessage::Execute(req) => {
2316                assert_eq!(req.code, "42");
2317                assert_eq!(req.request_id, 1);
2318            }
2319            _ => panic!("Expected Execute"),
2320        }
2321    }
2322
2323    // ---- Phase 3B: Sidecar extraction tests ----
2324
2325    #[test]
2326    fn test_extract_sidecars_no_large_blobs() {
2327        let store = temp_store();
2328        let mut args = vec![
2329            SerializableVMValue::Int(42),
2330            SerializableVMValue::String("hello".to_string()),
2331            SerializableVMValue::Array(vec![
2332                SerializableVMValue::Number(1.0),
2333                SerializableVMValue::Number(2.0),
2334            ]),
2335        ];
2336        let sidecars = extract_sidecars(&mut args, &store);
2337        assert!(sidecars.is_empty(), "no large blobs → no sidecars");
2338        // Args should be unchanged
2339        assert!(matches!(args[0], SerializableVMValue::Int(42)));
2340    }
2341
2342    #[test]
2343    fn test_extract_sidecars_large_typed_array() {
2344        let store = temp_store();
2345
2346        // Create a large float array (2MB of f64 data)
2347        let data = vec![0f64; 256 * 1024]; // 256K * 8 bytes = 2 MB
2348        let aligned = shape_value::AlignedVec::from_vec(data);
2349        let buf = shape_value::AlignedTypedBuffer::from_aligned(aligned);
2350        let nb = shape_value::ValueWord::from_float_array(std::sync::Arc::new(buf));
2351        let serialized = shape_runtime::snapshot::nanboxed_to_serializable(&nb, &store).unwrap();
2352
2353        let mut args = vec![serialized];
2354        let sidecars = extract_sidecars(&mut args, &store);
2355        assert_eq!(
2356            sidecars.len(),
2357            1,
2358            "should extract one sidecar for 2MB array"
2359        );
2360        assert!(
2361            matches!(args[0], SerializableVMValue::SidecarRef { .. }),
2362            "original should be replaced with SidecarRef"
2363        );
2364        assert!(
2365            sidecars[0].data.len() >= 1024 * 1024,
2366            "sidecar data should be >= 1MB"
2367        );
2368    }
2369
2370    #[test]
2371    fn test_reassemble_sidecars_roundtrip() {
2372        let store = temp_store();
2373
2374        // Create a large float array
2375        let data: Vec<f64> = (0..256 * 1024).map(|i| i as f64).collect();
2376        let aligned = shape_value::AlignedVec::from_vec(data.clone());
2377        let buf = shape_value::AlignedTypedBuffer::from_aligned(aligned);
2378        let nb = shape_value::ValueWord::from_float_array(std::sync::Arc::new(buf));
2379        let original = shape_runtime::snapshot::nanboxed_to_serializable(&nb, &store).unwrap();
2380
2381        let mut args = vec![original.clone()];
2382        let sidecars = extract_sidecars(&mut args, &store);
2383        assert_eq!(sidecars.len(), 1);
2384
2385        // Build sidecar map for reassembly
2386        let sidecar_map: HashMap<u32, BlobSidecar> =
2387            sidecars.into_iter().map(|s| (s.sidecar_id, s)).collect();
2388
2389        // Reassemble
2390        reassemble_sidecars(&mut args, &sidecar_map, &store).unwrap();
2391
2392        // The reassembled value should deserialize to the same data
2393        let restored = shape_runtime::snapshot::serializable_to_nanboxed(&args[0], &store).unwrap();
2394        let hv = restored.as_heap_ref().unwrap();
2395        match hv {
2396            shape_value::heap_value::HeapValue::FloatArray(a) => {
2397                assert_eq!(a.len(), 256 * 1024);
2398                assert!((a.as_slice()[0] - 0.0).abs() < f64::EPSILON);
2399                assert!((a.as_slice()[1000] - 1000.0).abs() < f64::EPSILON);
2400            }
2401            // reassemble produces DataTable wrapper, which is also valid
2402            _ => {
2403                // The reassembled blob may come back as DataTable BlobRef
2404                // since reassemble uses a generic DataTable wrapper.
2405                // This is acceptable — the raw data is preserved.
2406            }
2407        }
2408    }
2409
2410    #[test]
2411    fn test_extract_sidecars_nested_in_array() {
2412        let store = temp_store();
2413
2414        // Create a large float array nested in an Array
2415        let data = vec![0f64; 256 * 1024]; // 2 MB
2416        let aligned = shape_value::AlignedVec::from_vec(data);
2417        let buf = shape_value::AlignedTypedBuffer::from_aligned(aligned);
2418        let nb = shape_value::ValueWord::from_float_array(std::sync::Arc::new(buf));
2419        let serialized = shape_runtime::snapshot::nanboxed_to_serializable(&nb, &store).unwrap();
2420
2421        let mut args = vec![SerializableVMValue::Array(vec![
2422            SerializableVMValue::Int(1),
2423            serialized,
2424            SerializableVMValue::String("end".to_string()),
2425        ])];
2426
2427        let sidecars = extract_sidecars(&mut args, &store);
2428        assert_eq!(sidecars.len(), 1, "should find nested large blob");
2429
2430        // Verify the array structure is preserved with SidecarRef inside
2431        match &args[0] {
2432            SerializableVMValue::Array(items) => {
2433                assert_eq!(items.len(), 3);
2434                assert!(matches!(items[0], SerializableVMValue::Int(1)));
2435                assert!(matches!(items[1], SerializableVMValue::SidecarRef { .. }));
2436                assert!(matches!(items[2], SerializableVMValue::String(_)));
2437            }
2438            _ => panic!("Expected Array wrapper to be preserved"),
2439        }
2440    }
2441
2442    #[test]
2443    fn test_sidecar_ref_serialization_roundtrip() {
2444        use shape_runtime::hashing::HashDigest;
2445        use shape_runtime::snapshot::{BlobKind, TypedArrayElementKind};
2446
2447        let value = SerializableVMValue::SidecarRef {
2448            sidecar_id: 7,
2449            blob_kind: BlobKind::TypedArray(TypedArrayElementKind::F64),
2450            original_hash: HashDigest::from_hex("abc123"),
2451            meta_a: 1000,
2452            meta_b: 0,
2453        };
2454
2455        let bytes = shape_wire::encode_message(&value).expect("encode SidecarRef");
2456        let decoded: SerializableVMValue =
2457            shape_wire::decode_message(&bytes).expect("decode SidecarRef");
2458        match decoded {
2459            SerializableVMValue::SidecarRef { sidecar_id, .. } => {
2460                assert_eq!(sidecar_id, 7);
2461            }
2462            _ => panic!("Expected SidecarRef"),
2463        }
2464    }
2465
2466    // ---- Blob negotiation integration tests ----
2467
2468    #[test]
2469    fn test_negotiate_blobs_returns_known_hashes() {
2470        let h1 = mk_hash(1);
2471        let h2 = mk_hash(2);
2472        let h3 = mk_hash(3);
2473
2474        let mut cache = RemoteBlobCache::new(10);
2475        cache.insert(h1, mk_blob("f1", h1, vec![]));
2476        cache.insert(h3, mk_blob("f3", h3, vec![]));
2477
2478        let blobs = vec![
2479            (h1, mk_blob("f1", h1, vec![])),
2480            (h2, mk_blob("f2", h2, vec![])),
2481            (h3, mk_blob("f3", h3, vec![])),
2482        ];
2483        let response = negotiate_blobs(&blobs, &cache);
2484        assert_eq!(response.known_hashes.len(), 2);
2485        assert!(response.known_hashes.contains(&h1));
2486        assert!(response.known_hashes.contains(&h3));
2487        assert!(!response.known_hashes.contains(&h2));
2488    }
2489
2490    #[test]
2491    fn test_build_call_request_with_negotiation_strips_known() {
2492        let h1 = mk_hash(1);
2493        let h2 = mk_hash(2);
2494        let blob1 = mk_blob("entry", h1, vec![h2]);
2495        let blob2 = mk_blob("helper", h2, vec![]);
2496
2497        let mut function_store = HashMap::new();
2498        function_store.insert(h1, blob1.clone());
2499        function_store.insert(h2, blob2.clone());
2500
2501        let mut program = BytecodeProgram::default();
2502        program.content_addressed = Some(Program {
2503            entry: h1,
2504            function_store,
2505            top_level_locals_count: 0,
2506            top_level_local_storage_hints: Vec::new(),
2507            module_binding_names: Vec::new(),
2508            module_binding_storage_hints: Vec::new(),
2509            function_local_storage_hints: Vec::new(),
2510            top_level_frame: None,
2511            data_schema: None,
2512            type_schema_registry: shape_runtime::type_schema::TypeSchemaRegistry::new(),
2513            trait_method_symbols: HashMap::new(),
2514            foreign_functions: Vec::new(),
2515            native_struct_layouts: Vec::new(),
2516            debug_info: crate::bytecode::DebugInfo::new("<test>".to_string()),
2517        });
2518        program.functions = vec![crate::bytecode::Function {
2519            name: "entry".to_string(),
2520            arity: 0,
2521            param_names: vec![],
2522            locals_count: 0,
2523            entry_point: 0,
2524            body_length: 0,
2525            is_closure: false,
2526            captures_count: 0,
2527            is_async: false,
2528            ref_params: vec![],
2529            ref_mutates: vec![],
2530            mutable_captures: vec![],
2531            frame_descriptor: None,
2532            osr_entry_points: vec![],
2533        }];
2534        program.function_blob_hashes = vec![Some(h1)];
2535
2536        // Cache has h2 -> negotiation should strip it
2537        let mut cache = RemoteBlobCache::new(10);
2538        cache.insert(h2, blob2.clone());
2539
2540        let req = build_call_request_with_negotiation(&program, "entry", vec![], Some(&cache));
2541        let blobs = req.function_blobs.as_ref().unwrap();
2542        assert_eq!(blobs.len(), 1, "should strip known blob h2");
2543        assert_eq!(blobs[0].0, h1, "only h1 should remain");
2544    }
2545
2546    #[test]
2547    fn test_build_call_request_with_negotiation_no_cache() {
2548        let h1 = mk_hash(1);
2549        let blob1 = mk_blob("entry", h1, vec![]);
2550
2551        let mut function_store = HashMap::new();
2552        function_store.insert(h1, blob1.clone());
2553
2554        let mut program = BytecodeProgram::default();
2555        program.content_addressed = Some(Program {
2556            entry: h1,
2557            function_store,
2558            top_level_locals_count: 0,
2559            top_level_local_storage_hints: Vec::new(),
2560            module_binding_names: Vec::new(),
2561            module_binding_storage_hints: Vec::new(),
2562            function_local_storage_hints: Vec::new(),
2563            top_level_frame: None,
2564            data_schema: None,
2565            type_schema_registry: shape_runtime::type_schema::TypeSchemaRegistry::new(),
2566            trait_method_symbols: HashMap::new(),
2567            foreign_functions: Vec::new(),
2568            native_struct_layouts: Vec::new(),
2569            debug_info: crate::bytecode::DebugInfo::new("<test>".to_string()),
2570        });
2571        program.functions = vec![crate::bytecode::Function {
2572            name: "entry".to_string(),
2573            arity: 0,
2574            param_names: vec![],
2575            locals_count: 0,
2576            entry_point: 0,
2577            body_length: 0,
2578            is_closure: false,
2579            captures_count: 0,
2580            is_async: false,
2581            ref_params: vec![],
2582            ref_mutates: vec![],
2583            mutable_captures: vec![],
2584            frame_descriptor: None,
2585            osr_entry_points: vec![],
2586        }];
2587        program.function_blob_hashes = vec![Some(h1)];
2588
2589        // No cache -> all blobs sent
2590        let req = build_call_request_with_negotiation(&program, "entry", vec![], None);
2591        let blobs = req.function_blobs.as_ref().unwrap();
2592        assert_eq!(blobs.len(), 1, "all blobs should be sent when no cache");
2593    }
2594
2595    // ---- V2 handler stub tests ----
2596
2597    #[test]
2598    fn test_handle_wire_message_ping_returns_pong() {
2599        let store = temp_store();
2600        let mut cache = RemoteBlobCache::default_cache();
2601        let msg = WireMessage::Ping(PingRequest {});
2602        let response = handle_wire_message(msg, &store, &mut cache);
2603        match response {
2604            WireMessage::Pong(info) => {
2605                assert_eq!(info.wire_protocol, shape_wire::WIRE_PROTOCOL_V2);
2606                assert!(info.capabilities.contains(&"call".to_string()));
2607                assert!(info.capabilities.contains(&"blob-negotiation".to_string()));
2608            }
2609            _ => panic!("Expected Pong response"),
2610        }
2611    }
2612
2613    #[test]
2614    fn test_handle_wire_message_execute_returns_v2_stub() {
2615        let store = temp_store();
2616        let mut cache = RemoteBlobCache::default_cache();
2617        let msg = WireMessage::Execute(ExecuteRequest {
2618            code: "42".to_string(),
2619            request_id: 5,
2620        });
2621        let response = handle_wire_message(msg, &store, &mut cache);
2622        match response {
2623            WireMessage::ExecuteResponse(resp) => {
2624                assert_eq!(resp.request_id, 5);
2625                assert!(!resp.success);
2626                assert!(resp.error.as_ref().unwrap().contains("not yet implemented"));
2627            }
2628            _ => panic!("Expected ExecuteResponse"),
2629        }
2630    }
2631
2632    #[test]
2633    fn test_handle_wire_message_validate_returns_v2_stub() {
2634        let store = temp_store();
2635        let mut cache = RemoteBlobCache::default_cache();
2636        let msg = WireMessage::Validate(ValidateRequest {
2637            code: "let x = 1".to_string(),
2638            request_id: 10,
2639        });
2640        let response = handle_wire_message(msg, &store, &mut cache);
2641        match response {
2642            WireMessage::ValidateResponse(resp) => {
2643                assert_eq!(resp.request_id, 10);
2644                assert!(!resp.success);
2645                assert!(resp.diagnostics[0].message.contains("not yet implemented"));
2646            }
2647            _ => panic!("Expected ValidateResponse"),
2648        }
2649    }
2650
2651    #[test]
2652    fn test_handle_wire_message_auth_returns_v2_stub() {
2653        let store = temp_store();
2654        let mut cache = RemoteBlobCache::default_cache();
2655        let msg = WireMessage::Auth(AuthRequest {
2656            token: "test".to_string(),
2657        });
2658        let response = handle_wire_message(msg, &store, &mut cache);
2659        match response {
2660            WireMessage::AuthResponse(resp) => {
2661                assert!(!resp.authenticated);
2662                assert!(resp.error.as_ref().unwrap().contains("not yet implemented"));
2663            }
2664            _ => panic!("Expected AuthResponse"),
2665        }
2666    }
2667
2668    #[test]
2669    fn test_handle_wire_message_blob_negotiation() {
2670        let store = temp_store();
2671        let mut cache = RemoteBlobCache::new(10);
2672        let h1 = mk_hash(1);
2673        let h2 = mk_hash(2);
2674        cache.insert(h1, mk_blob("f1", h1, vec![]));
2675
2676        let msg = WireMessage::BlobNegotiation(BlobNegotiationRequest {
2677            offered_hashes: vec![h1, h2],
2678        });
2679        let response = handle_wire_message(msg, &store, &mut cache);
2680        match response {
2681            WireMessage::BlobNegotiationReply(resp) => {
2682                assert_eq!(resp.known_hashes.len(), 1);
2683                assert!(resp.known_hashes.contains(&h1));
2684            }
2685            _ => panic!("Expected BlobNegotiationReply"),
2686        }
2687    }
2688}