// shape_vm/executor/vm_impl_part1.rs
// VirtualMachine construction, configuration, and schema/module helpers.
1use super::*;
2
3impl VirtualMachine {
    /// Build a VM from `config`: allocates the value stack, installs the
    /// stdlib type-schema registry, and registers the VM-native std modules.
    ///
    /// Schema IDs set up here are provisional — `load_program` overwrites the
    /// registry (and builtin IDs) from the loaded bytecode.
    pub fn new(config: VMConfig) -> Self {
        // The debugger is only constructed when debug mode is requested.
        let debugger = if config.debug_mode {
            Some(VMDebugger::new())
        } else {
            None
        };

        let gc = GarbageCollector::new(config.gc_config.clone());

        // Initialize builtin schema IDs (overwritten from loaded bytecode registry
        // in `load_program`).
        let (registry, builtin_schemas) =
            shape_runtime::type_schema::TypeSchemaRegistry::with_stdlib_types_and_builtin_ids();

        let mut program = BytecodeProgram::new();
        program.type_schema_registry = registry;

        let mut vm = Self {
            config,
            program,
            ip: 0,
            // Value stack is pre-filled with `none` words up to the default
            // capacity; `sp` tracks the live top.
            stack: (0..crate::constants::DEFAULT_STACK_CAPACITY)
                .map(|_| ValueWord::none())
                .collect(),
            sp: 0,
            module_bindings: Vec::new(),
            call_stack: Vec::with_capacity(crate::constants::DEFAULT_CALL_STACK_CAPACITY),
            loop_stack: Vec::new(),
            timeframe_stack: Vec::new(),
            debugger,
            gc,
            instruction_count: 0,
            exception_handlers: Vec::new(),
            builtin_schemas,
            last_error_line: None,
            last_error_file: None,
            last_uncaught_exception: None,
            module_init_done: false,
            output_buffer: None,
            module_registry: shape_runtime::module_exports::ModuleExportRegistry::new(),
            module_fn_table: Vec::new(),
            function_name_index: HashMap::new(),
            extension_methods: HashMap::new(),
            merged_schema_cache: HashMap::new(),
            // Private flag by default; may later be swapped for a shared one
            // via `set_interrupt`.
            interrupt: Arc::new(AtomicU8::new(0)),
            future_id_counter: 0,
            async_scope_stack: Vec::new(),
            task_scheduler: task_scheduler::TaskScheduler::new(),
            foreign_fn_handles: Vec::new(),
            function_hashes: Vec::new(),
            function_hash_raw: Vec::new(),
            function_id_by_hash: HashMap::new(),
            function_entry_points: Vec::new(),
            program_entry_ip: 0,
            resource_usage: None,
            time_travel: None,
            #[cfg(feature = "gc")]
            gc_heap: None,
            #[cfg(feature = "jit")]
            jit_compiled: false,
            #[cfg(feature = "jit")]
            jit_dispatch_table: std::collections::HashMap::new(),
            tier_manager: None,
            pending_resume: None,
            pending_frame_resume: None,
            metrics: None,
            feedback_vectors: Vec::new(),
            megamorphic_cache: crate::megamorphic_cache::MegamorphicCache::new(),
        };

        // VM-native std modules are always available, independent of
        // user-registered extension modules.
        vm.register_extension(state_builtins::create_state_module());
        vm.register_extension(create_transport_module_exports());
        vm.register_extension(shape_runtime::stdlib::regex::create_regex_module());
        vm.register_extension(shape_runtime::stdlib::http::create_http_module());
        vm.register_extension(shape_runtime::stdlib::crypto::create_crypto_module());
        vm.register_extension(shape_runtime::stdlib::env::create_env_module());
        vm.register_extension(shape_runtime::stdlib::log::create_log_module());
        vm.register_extension(shape_runtime::stdlib::json::create_json_module());
        vm.register_extension(shape_runtime::stdlib::toml_module::create_toml_module());
        vm.register_extension(shape_runtime::stdlib::yaml::create_yaml_module());
        vm.register_extension(shape_runtime::stdlib::xml::create_xml_module());
        vm.register_extension(shape_runtime::stdlib::compress::create_compress_module());
        vm.register_extension(shape_runtime::stdlib::archive::create_archive_module());
        vm.register_extension(shape_runtime::stdlib::parallel::create_parallel_module());
        vm.register_extension(shape_runtime::stdlib::unicode::create_unicode_module());

        // Initialise metrics collector when requested.
        if vm.config.metrics_enabled {
            vm.metrics = Some(crate::metrics::VmMetrics::new());
        }

        // Auto-initialise the tracing GC heap when requested.
        #[cfg(feature = "gc")]
        if vm.config.use_tracing_gc {
            vm.init_gc_heap();
        }

        vm
    }
105
106    /// Attach resource limits to this VM. The dispatch loop will enforce them.
107    pub fn with_resource_limits(mut self, limits: crate::resource_limits::ResourceLimits) -> Self {
108        let mut usage = crate::resource_limits::ResourceUsage::new(limits);
109        usage.start();
110        self.resource_usage = Some(usage);
111        self
112    }
113
    /// Initialize the GC heap for this VM instance (gc feature only).
    ///
    /// Sets up the GcHeap and registers it as the thread-local heap so
    /// ValueWord::heap_box() and ValueSlot::from_heap() can allocate through it.
    /// Also configures the GC threshold from the VM's GCConfig.
    ///
    /// NOTE(review): the thread-local pointer targets the `GcHeap` stored
    /// inline in `self.gc_heap`. If the `VirtualMachine` value itself is moved
    /// after this call, that pointer would dangle — presumably the VM is
    /// pinned in place for its lifetime after initialization; confirm at the
    /// call sites.
    #[cfg(feature = "gc")]
    pub fn init_gc_heap(&mut self) {
        let heap = shape_gc::GcHeap::new();
        self.gc_heap = Some(heap);
        // Set thread-local GC heap pointer AFTER the move into self.gc_heap
        // so the pointer remains valid for the VM's lifetime.
        if let Some(ref mut heap) = self.gc_heap {
            unsafe { shape_gc::set_thread_gc_heap(heap as *mut _) };
        }
    }
129
130    /// Set the interrupt flag (shared with Ctrl+C handler).
131    pub fn set_interrupt(&mut self, flag: Arc<AtomicU8>) {
132        self.interrupt = flag;
133    }
134
135    /// Enable time-travel debugging with the given capture mode and history limit.
136    pub fn enable_time_travel(&mut self, mode: time_travel::CaptureMode, max_entries: usize) {
137        self.time_travel = Some(time_travel::TimeTravel::new(mode, max_entries));
138    }
139
140    /// Disable time-travel debugging and discard history.
141    pub fn disable_time_travel(&mut self) {
142        self.time_travel = None;
143    }
144
145    /// Mark this VM as having been JIT-compiled selectively.
146    ///
147    /// Call this after using `shape_jit::JITCompiler::compile_program_selective`
148    /// externally to JIT-compile functions that benefit from native execution.
149    /// The caller is responsible for performing the compilation via `shape-jit`
150    /// (which depends on `shape-vm`, so the dependency flows one way).
151    ///
152    /// # Example (in a crate that depends on both `shape-vm` and `shape-jit`):
153    ///
154    /// ```ignore
155    /// let mut compiler = shape_jit::JITCompiler::new()?;
156    /// let (_jitted_fn, _table) = compiler.compile_program_selective("main", vm.program())?;
157    /// vm.set_jit_compiled();
158    /// ```
159    #[cfg(feature = "jit")]
160    pub fn set_jit_compiled(&mut self) {
161        self.jit_compiled = true;
162    }
163
164    /// Returns whether selective JIT compilation has been applied to this VM.
165    #[cfg(feature = "jit")]
166    pub fn is_jit_compiled(&self) -> bool {
167        self.jit_compiled
168    }
169
170    /// Register a JIT-compiled function in the dispatch table.
171    ///
172    /// After registration, calls to this function_id will attempt JIT dispatch
173    /// before falling back to bytecode interpretation.
174    #[cfg(feature = "jit")]
175    pub fn register_jit_function(&mut self, function_id: u16, ptr: JitFnPtr) {
176        self.jit_dispatch_table.insert(function_id, ptr);
177        self.jit_compiled = true;
178    }
179
180    /// Get the JIT dispatch table for inspection or external use.
181    #[cfg(feature = "jit")]
182    pub fn jit_dispatch_table(&self) -> &std::collections::HashMap<u16, JitFnPtr> {
183        &self.jit_dispatch_table
184    }
185
186    /// Enable tiered compilation for this VM.
187    ///
188    /// Must be called after `load_program()` so the function count is known.
189    /// The caller is responsible for spawning a background compilation thread
190    /// that reads from the request channel and sends results back.
191    ///
192    /// Returns `(request_rx, result_tx)` that the background thread should use.
193    pub fn enable_tiered_compilation(
194        &mut self,
195    ) -> (
196        std::sync::mpsc::Receiver<crate::tier::CompilationRequest>,
197        std::sync::mpsc::Sender<crate::tier::CompilationResult>,
198    ) {
199        let function_count = self.program.functions.len();
200        let mut mgr = crate::tier::TierManager::new(function_count, true);
201
202        let (req_tx, req_rx) = std::sync::mpsc::channel();
203        let (res_tx, res_rx) = std::sync::mpsc::channel();
204        mgr.set_channels(req_tx, res_rx);
205
206        self.tier_manager = Some(mgr);
207        (req_rx, res_tx)
208    }
209
210    /// Get a reference to the tier manager, if tiered compilation is enabled.
211    pub fn tier_manager(&self) -> Option<&crate::tier::TierManager> {
212        self.tier_manager.as_ref()
213    }
214
215    /// Poll the tier manager for completed background JIT compilations.
216    ///
217    /// Completed compilations are applied by `TierManager::poll_completions()`,
218    /// which updates its internal `native_code_table`. The JIT dispatch fast
219    /// path in `op_call` reads from `tier_mgr.get_native_code()`.
220    ///
221    /// Called every 1024 instructions from the dispatch loop (same cadence as
222    /// interrupt and GC safepoint checks).
223    pub(crate) fn poll_tier_completions(&mut self) {
224        if let Some(ref mut tier_mgr) = self.tier_manager {
225            // poll_completions() reads from the compilation_rx channel and
226            // updates native_code_table internally.
227            let completions = tier_mgr.poll_completions();
228
229            // Record tier transition events in metrics if enabled.
230            if let Some(ref mut metrics) = self.metrics {
231                for result in &completions {
232                    if result.native_code.is_some() {
233                        let from_tier = match result.compiled_tier {
234                            crate::tier::Tier::BaselineJit => 0,   // was Interpreted
235                            crate::tier::Tier::OptimizingJit => 1, // was BaselineJit
236                            crate::tier::Tier::Interpreted => continue,
237                        };
238                        let to_tier = match result.compiled_tier {
239                            crate::tier::Tier::BaselineJit => 1,
240                            crate::tier::Tier::OptimizingJit => 2,
241                            crate::tier::Tier::Interpreted => continue,
242                        };
243                        metrics.record_tier_event(crate::metrics::TierEvent {
244                            function_id: result.function_id,
245                            from_tier,
246                            to_tier,
247                            call_count: tier_mgr.get_call_count(result.function_id),
248                            timestamp_us: metrics.elapsed_us(),
249                        });
250                    }
251                }
252            }
253        }
254    }
255
256    /// Get or create a feedback vector for the current function.
257    /// Returns None if tiered compilation is disabled.
258    #[inline]
259    pub(crate) fn current_feedback_vector(
260        &mut self,
261    ) -> Option<&mut crate::feedback::FeedbackVector> {
262        let func_id = self.call_stack.last()?.function_id? as usize;
263        if func_id >= self.feedback_vectors.len() {
264            return None;
265        }
266        if self.feedback_vectors[func_id].is_none() {
267            if self.tier_manager.is_none() {
268                return None;
269            }
270            self.feedback_vectors[func_id] =
271                Some(crate::feedback::FeedbackVector::new(func_id as u16));
272        }
273        self.feedback_vectors[func_id].as_mut()
274    }
275
276    /// Access the feedback vectors (for JIT compilation).
277    pub fn feedback_vectors(&self) -> &[Option<crate::feedback::FeedbackVector>] {
278        &self.feedback_vectors
279    }
280
281    /// Get a reference to the loaded program (for external JIT compilation).
282    pub fn program(&self) -> &BytecodeProgram {
283        &self.program
284    }
285
286    /// Get a reference to the time-travel debugger, if enabled.
287    pub fn time_travel(&self) -> Option<&time_travel::TimeTravel> {
288        self.time_travel.as_ref()
289    }
290
291    /// Get a mutable reference to the time-travel debugger, if enabled.
292    pub fn time_travel_mut(&mut self) -> Option<&mut time_travel::TimeTravel> {
293        self.time_travel.as_mut()
294    }
295
296    /// Get a reference to the extension module registry.
297    pub fn module_registry(&self) -> &shape_runtime::module_exports::ModuleExportRegistry {
298        &self.module_registry
299    }
300
301    /// Register an extension module into the VM's module registry.
302    /// Also merges any method intrinsics for fast Object dispatch.
303    pub fn register_extension(&mut self, module: shape_runtime::module_exports::ModuleExports) {
304        // Merge method intrinsics
305        for (type_name, methods) in &module.method_intrinsics {
306            let entry = self.extension_methods.entry(type_name.clone()).or_default();
307            for (method_name, func) in methods {
308                entry.insert(method_name.clone(), func.clone());
309            }
310        }
311        // Expose module exports as methods on the module object type so
312        // `module.fn(...)` dispatches via CallMethod without UFCS rewrites.
313        let module_type_name = format!("__mod_{}", module.name);
314        let module_entry = self.extension_methods.entry(module_type_name).or_default();
315        for (export_name, func) in &module.exports {
316            module_entry.insert(export_name.clone(), func.clone());
317        }
318        for (export_name, async_fn) in &module.async_exports {
319            let async_fn = async_fn.clone();
320            let wrapped: shape_runtime::module_exports::ModuleFn = Arc::new(
321                move |args: &[ValueWord], _ctx: &shape_runtime::module_exports::ModuleContext| {
322                    let future = async_fn(args);
323                    tokio::task::block_in_place(|| {
324                        tokio::runtime::Handle::current().block_on(future)
325                    })
326                },
327            );
328            module_entry.insert(export_name.clone(), wrapped);
329        }
330        self.module_registry.register(module);
331    }
332
333    /// Generate a unique future ID for spawned async tasks
334    pub(crate) fn next_future_id(&mut self) -> u64 {
335        self.future_id_counter += 1;
336        self.future_id_counter
337    }
338
339    /// Register a ModuleFn in the table and return its ID (for ValueWord::ModuleFunction).
340    pub fn register_module_fn(&mut self, f: shape_runtime::module_exports::ModuleFn) -> usize {
341        let id = self.module_fn_table.len();
342        self.module_fn_table.push(f);
343        id
344    }
345
    /// Invoke a registered module function with a scoped `ModuleContext`.
    ///
    /// The context provides access to the type schema registry, a callable
    /// invoker closure, and a raw invoker that extensions can capture in
    /// long-lived structs (e.g., CFFI callback userdata).
    ///
    /// If the module function set `pending_resume` via the context, the call
    /// returns `VMError::ResumeRequested` instead of the function's own
    /// result (even an `Ok`), and the dispatch loop intercepts it.
    pub(crate) fn invoke_module_fn(
        &mut self,
        module_fn: &shape_runtime::module_exports::ModuleFn,
        args: &[ValueWord],
    ) -> Result<ValueWord, VMError> {
        // SAFETY: The module function is called synchronously and the VM pointer
        // remains valid for the duration of the call.  We use a raw pointer so
        // that: (a) the callable invoker can re-enter the VM, and (b) we can
        // simultaneously borrow the schema registry.
        unsafe {
            let vm_ptr = self as *mut VirtualMachine;

            // Closure-based invoker for the common case: lets module code call
            // back into the VM (re-entrancy through the raw pointer).
            let invoker =
                |callable: &ValueWord, call_args: &[ValueWord]| -> Result<ValueWord, String> {
                    (*vm_ptr)
                        .call_value_immediate_nb(callable, call_args, None)
                        .map_err(|e| e.to_string())
                };

            // Free-function invoker for extensions that must store a plain
            // (ctx, fn) pair (e.g. across an FFI boundary) rather than a closure.
            unsafe fn vm_callable_invoker(
                ctx: *mut std::ffi::c_void,
                callable: &ValueWord,
                args: &[ValueWord],
            ) -> Result<ValueWord, String> {
                let vm = unsafe { &mut *(ctx as *mut VirtualMachine) };
                vm.call_value_immediate_nb(callable, args, None)
                    .map_err(|err| err.to_string())
            }

            // Capture a read-only snapshot of VM state before dispatching.
            // The snapshot lives on the stack and is referenced by ModuleContext
            // for the duration of this synchronous call.
            let vm_snapshot = (*vm_ptr).capture_vm_state();

            let ctx = shape_runtime::module_exports::ModuleContext {
                schemas: &(*vm_ptr).program.type_schema_registry,
                invoke_callable: Some(&invoker),
                raw_invoker: Some(shape_runtime::module_exports::RawCallableInvoker {
                    ctx: vm_ptr as *mut std::ffi::c_void,
                    invoke: vm_callable_invoker,
                }),
                // Only expose function hashes when they were actually loaded.
                function_hashes: if (*vm_ptr).function_hash_raw.is_empty() {
                    None
                } else {
                    Some(&(*vm_ptr).function_hash_raw)
                },
                vm_state: Some(&vm_snapshot),
                granted_permissions: None,
                scope_constraints: None,
                set_pending_resume: Some(&|snapshot| {
                    // vm_ptr is valid for the duration of the module function call
                    // (outer unsafe block covers this).
                    (*vm_ptr).pending_resume = Some(snapshot);
                }),
                set_pending_frame_resume: Some(&|ip_offset, locals| {
                    // vm_ptr is valid for the duration of the module function call
                    // (outer unsafe block covers this).
                    (*vm_ptr).pending_frame_resume = Some(FrameResumeData { ip_offset, locals });
                }),
            };

            let result = module_fn(args, &ctx).map_err(VMError::RuntimeError);

            // Check if the module function requested a VM state resume.
            // If so, return a special error that the dispatch loop intercepts
            // (this deliberately discards `result`).
            if (*vm_ptr).pending_resume.is_some() {
                return Err(VMError::ResumeRequested);
            }

            result
        }
    }
423
    /// Populate extension module objects as module_bindings (json, duckdb, etc.).
    /// These are used by extension Shape code (e.g., `duckdb.query(...)`).
    /// Call this after load_program().
    ///
    /// Two-phase structure: first clone all export data out of the registry
    /// (so the registry borrow ends), then mutate `self` to build the typed
    /// module objects. Modules whose `__mod_<name>` schema is missing are
    /// skipped — schemas must be predeclared at compile time.
    pub fn populate_module_objects(&mut self) {
        // Collect module data first to avoid borrow conflicts:
        // (module name, sync exports, async exports, Shape-source export names).
        let module_data: Vec<(
            String,
            Vec<(String, shape_runtime::module_exports::ModuleFn)>,
            Vec<(String, shape_runtime::module_exports::AsyncModuleFn)>,
            Vec<String>,
        )> = self
            .module_registry
            .module_names()
            .iter()
            .filter_map(|name| {
                let module = self.module_registry.get(name)?;
                let sync_exports: Vec<_> = module
                    .exports
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone()))
                    .collect();
                let async_exports: Vec<_> = module
                    .async_exports
                    .iter()
                    .map(|(k, v)| (k.clone(), v.clone()))
                    .collect();
                // Exported function names parsed from the module's Shape
                // source artifacts (only artifacts matching this module path).
                let mut source_exports = Vec::new();
                for artifact in &module.module_artifacts {
                    if artifact.module_path != *name {
                        continue;
                    }
                    let Some(source) = artifact.source.as_deref() else {
                        continue;
                    };
                    // Parse failures are ignored: the module simply exposes
                    // fewer source-level exports.
                    if let Ok(exports) =
                        shape_runtime::module_loader::collect_exported_function_names_from_source(
                            &artifact.module_path,
                            source,
                        )
                    {
                        source_exports.extend(exports);
                    }
                }
                source_exports.sort();
                source_exports.dedup();
                Some((
                    name.to_string(),
                    sync_exports,
                    async_exports,
                    source_exports,
                ))
            })
            .collect();

        for (module_name, sync_exports, async_exports, source_exports) in module_data {
            // Find the module_binding index for this module name
            let binding_idx = self
                .program
                .module_binding_names
                .iter()
                .position(|n| n == &module_name);

            if let Some(idx) = binding_idx {
                // Field-name -> value map, later laid out per the schema order.
                let mut obj = HashMap::new();

                // Register sync exports directly
                for (export_name, module_fn) in sync_exports {
                    let fn_id = self.register_module_fn(module_fn);
                    obj.insert(export_name, ValueWord::from_module_function(fn_id as u32));
                }

                // Wrap async exports: block_in_place + block_on at call time
                for (export_name, async_fn) in async_exports {
                    let wrapped: shape_runtime::module_exports::ModuleFn =
                        Arc::new(move |args: &[ValueWord], _ctx: &shape_runtime::module_exports::ModuleContext| {
                            let future = async_fn(args);
                            tokio::task::block_in_place(|| {
                                tokio::runtime::Handle::current().block_on(future)
                            })
                        });
                    let fn_id = self.register_module_fn(wrapped);
                    obj.insert(export_name, ValueWord::from_module_function(fn_id as u32));
                }

                // Add Shape-source exported functions (compiled into bytecode).
                // These are regular VM functions, not host module functions.
                // Host exports registered above take precedence on name clash.
                for export_name in source_exports {
                    if obj.contains_key(&export_name) {
                        continue;
                    }
                    if let Some(&func_id) = self.function_name_index.get(&export_name) {
                        obj.insert(export_name, ValueWord::from_function(func_id));
                    }
                }

                // Module object schemas must be predeclared at compile time.
                let cache_name = format!("__mod_{}", module_name);
                let schema_id = if let Some(schema) = self.lookup_schema_by_name(&cache_name) {
                    schema.id
                } else {
                    // Keep execution predictable: no runtime schema synthesis.
                    // Missing module schema means compiler/loader setup is incomplete.
                    continue;
                };

                // Look up schema to get field ordering
                let Some(schema) = self.lookup_schema(schema_id) else {
                    continue;
                };
                let field_order: Vec<String> =
                    schema.fields.iter().map(|f| f.name.clone()).collect();

                // Build slots in schema order; heap_mask marks which slots
                // hold heap references (bit i <=> field i).
                let mut slots = Vec::with_capacity(field_order.len());
                let mut heap_mask: u64 = 0;
                for (i, field_name) in field_order.iter().enumerate() {
                    // Schema fields with no registered export become `none`.
                    let nb_val = obj.get(field_name).cloned().unwrap_or_else(ValueWord::none);
                    let (slot, is_heap) =
                        crate::executor::objects::object_creation::nb_to_slot_with_field_type(
                            &nb_val, None,
                        );
                    slots.push(slot);
                    if is_heap {
                        heap_mask |= 1u64 << i;
                    }
                }

                let typed_nb = ValueWord::from_heap_value(HeapValue::TypedObject {
                    schema_id: schema_id as u64,
                    slots: slots.into_boxed_slice(),
                    heap_mask,
                });
                if idx >= self.module_bindings.len() {
                    self.module_bindings.resize_with(idx + 1, ValueWord::none);
                }
                // BARRIER: heap write site — overwrites module binding during typed object initialization
                self.module_bindings[idx] = typed_nb;
            }
        }
    }
563    // ========================================================================
564    // Conversion and Helper Methods
565
566    /// Create a TypedObject from field name-value pairs.
567    ///
568    /// Create a TypedObject from predeclared compile-time schemas.
569    pub(crate) fn create_typed_object_from_pairs(
570        &mut self,
571        fields: &[(&str, ValueWord)],
572    ) -> Result<ValueWord, VMError> {
573        // Build field names for schema lookup
574        let field_names: Vec<&str> = fields.iter().map(|(k, _)| *k).collect();
575        let key: String = field_names.join(",");
576        let schema_name = format!("__native_{}", key);
577
578        // Runtime schema synthesis is retired: these object layouts must be
579        // predeclared in compile-time registries.
580        let schema_id = self
581            .lookup_schema_by_name(&schema_name)
582            .map(|s| s.id)
583            .ok_or_else(|| {
584                VMError::RuntimeError(format!(
585                    "Missing predeclared schema '{}'. Runtime schema generation is disabled.",
586                    schema_name
587                ))
588            })?;
589
590        let field_types = self.lookup_schema(schema_id).map(|schema| {
591            schema
592                .fields
593                .iter()
594                .map(|f| f.field_type.clone())
595                .collect::<Vec<_>>()
596        });
597
598        // Build slots and heap_mask.
599        let mut slots = Vec::with_capacity(fields.len());
600        let mut heap_mask: u64 = 0;
601        for (i, (_name, nb)) in fields.iter().enumerate() {
602            let field_type = field_types.as_ref().and_then(|types| types.get(i));
603            let (slot, is_heap) =
604                crate::executor::objects::object_creation::nb_to_slot_with_field_type(
605                    nb, field_type,
606                );
607            slots.push(slot);
608            if is_heap {
609                heap_mask |= 1u64 << i;
610            }
611        }
612
613        Ok(ValueWord::from_heap_value(HeapValue::TypedObject {
614            schema_id: schema_id as u64,
615            slots: slots.into_boxed_slice(),
616            heap_mask,
617        }))
618    }
619
620    /// Look up a schema by ID in the compiled program registry.
621    pub(crate) fn lookup_schema(
622        &self,
623        schema_id: u32,
624    ) -> Option<&shape_runtime::type_schema::TypeSchema> {
625        self.program.type_schema_registry.get_by_id(schema_id)
626    }
627
628    pub(crate) fn lookup_schema_by_name(
629        &self,
630        name: &str,
631    ) -> Option<&shape_runtime::type_schema::TypeSchema> {
632        self.program.type_schema_registry.get(name)
633    }
634
635    /// Derive a merged schema from two existing schemas.
636    /// Right fields overwrite left fields with the same name.
637    /// Caches result by (left_schema_id, right_schema_id).
638    pub(crate) fn derive_merged_schema(
639        &mut self,
640        left_id: u32,
641        right_id: u32,
642    ) -> Result<u32, VMError> {
643        if let Some(&cached) = self.merged_schema_cache.get(&(left_id, right_id)) {
644            return Ok(cached);
645        }
646
647        // Runtime schema synthesis is disabled; merged schemas must exist.
648        let merged_name = format!("__merged_{}_{}", left_id, right_id);
649        let intersection_name = format!("__intersection_{}_{}", left_id, right_id);
650        let merged_id = self
651            .lookup_schema_by_name(&merged_name)
652            .or_else(|| self.lookup_schema_by_name(&intersection_name))
653            .map(|s| s.id)
654            .ok_or_else(|| {
655                VMError::RuntimeError(format!(
656                    "Missing predeclared merged schema for {} + {} (expected '{}' or '{}').",
657                    left_id, right_id, merged_name, intersection_name
658                ))
659            })?;
660        self.merged_schema_cache
661            .insert((left_id, right_id), merged_id);
662
663        Ok(merged_id)
664    }
665
666    /// Derive a subset schema: base schema minus excluded fields.
667    /// Uses registry name-based lookup for caching.
668    pub(crate) fn derive_subset_schema(
669        &mut self,
670        base_id: u32,
671        exclude: &std::collections::HashSet<String>,
672    ) -> Result<u32, VMError> {
673        // Build deterministic cache name
674        let mut excluded_sorted: Vec<&String> = exclude.iter().collect();
675        excluded_sorted.sort();
676        let cache_name = format!(
677            "__sub_{}_exc_{}",
678            base_id,
679            excluded_sorted
680                .iter()
681                .map(|s| s.as_str())
682                .collect::<Vec<_>>()
683                .join(",")
684        );
685
686        // Runtime schema synthesis is disabled; subset schemas must be predeclared.
687        if let Some(schema) = self.lookup_schema_by_name(&cache_name) {
688            return Ok(schema.id);
689        }
690        Err(VMError::RuntimeError(format!(
691            "Missing predeclared subset schema '{}' (runtime schema derivation is disabled).",
692            cache_name
693        )))
694    }
695}