Skip to main content

harn_vm/vm/
dispatch.rs

1use std::future::Future;
2use std::rc::Rc;
3
4use crate::value::{ErrorCategory, VmBuiltinFn, VmClosure, VmError, VmValue};
5use crate::BuiltinId;
6
7use super::async_builtin::CURRENT_ASYNC_BUILTIN_CHILD_VM;
8use super::{ScopeSpan, Vm, VmBuiltinDispatch, VmBuiltinEntry, VmBuiltinKind, VmBuiltinMetadata};
9
10impl Vm {
11    fn builtin_span_kind(name: &str) -> Option<crate::tracing::SpanKind> {
12        match name {
13            "llm_call" | "llm_stream" | "llm_stream_call" | "agent_loop" | "agent_turn" => {
14                Some(crate::tracing::SpanKind::LlmCall)
15            }
16            "mcp_call" => Some(crate::tracing::SpanKind::ToolCall),
17            _ => None,
18        }
19    }
20
21    fn is_runtime_context_builtin(name: &str) -> bool {
22        matches!(
23            name,
24            "runtime_context"
25                | "task_current"
26                | "runtime_context_values"
27                | "runtime_context_get"
28                | "runtime_context_set"
29                | "runtime_context_clear"
30        )
31    }
32
33    fn resolve_sync_builtin_id_or_name(
34        &self,
35        direct_id: Option<BuiltinId>,
36        name: &str,
37    ) -> Option<Result<VmBuiltinFn, VmError>> {
38        if crate::autonomy::needs_async_side_effect_enforcement(name)
39            || Self::is_runtime_context_builtin(name)
40        {
41            return None;
42        }
43
44        let dispatch = if let Some(id) = direct_id {
45            self.builtins_by_id
46                .get(&id)
47                .filter(|entry| entry.name.as_ref() == name)
48                .map(|entry| entry.dispatch.clone())
49        } else {
50            None
51        }
52        .or_else(|| {
53            self.builtins
54                .get(name)
55                .cloned()
56                .map(VmBuiltinDispatch::Sync)
57        });
58
59        let Some(dispatch) = dispatch else {
60            if self.async_builtins.contains_key(name) || self.bridge.is_some() {
61                return None;
62            }
63            let all_builtins = self
64                .builtins
65                .keys()
66                .chain(self.async_builtins.keys())
67                .map(|s| s.as_str());
68            return Some(
69                if let Some(suggestion) = crate::value::closest_match(name, all_builtins) {
70                    Err(VmError::Runtime(format!(
71                        "Undefined builtin: {name} (did you mean `{suggestion}`?)"
72                    )))
73                } else {
74                    Err(VmError::UndefinedBuiltin(name.to_string()))
75                },
76            );
77        };
78
79        match dispatch {
80            VmBuiltinDispatch::Sync(builtin) => Some(Ok(builtin)),
81            VmBuiltinDispatch::Async(_) => None,
82        }
83    }
84
85    fn validate_sync_builtin_args(&self, name: &str, args: &[VmValue]) -> Result<(), VmError> {
86        if self.denied_builtins.contains(name) {
87            return Err(VmError::CategorizedError {
88                message: format!("Tool '{}' is not permitted.", name),
89                category: ErrorCategory::ToolRejected,
90            });
91        }
92        crate::orchestration::enforce_current_policy_for_builtin(name, args)?;
93        crate::typecheck::validate_builtin_call(name, args, None)
94    }
95
96    fn index_builtin_id(&mut self, name: &str, dispatch: VmBuiltinDispatch) {
97        let id = BuiltinId::from_name(name);
98        if self.builtin_id_collisions.contains(&id) {
99            return;
100        }
101        if let Some(existing) = self.builtins_by_id.get(&id) {
102            if existing.name.as_ref() != name {
103                Rc::make_mut(&mut self.builtins_by_id).remove(&id);
104                Rc::make_mut(&mut self.builtin_id_collisions).insert(id);
105                return;
106            }
107        }
108        Rc::make_mut(&mut self.builtins_by_id).insert(
109            id,
110            VmBuiltinEntry {
111                name: Rc::from(name),
112                dispatch,
113            },
114        );
115    }
116
117    fn refresh_builtin_id(&mut self, name: &str) {
118        if let Some(builtin) = self.builtins.get(name).cloned() {
119            self.index_builtin_id(name, VmBuiltinDispatch::Sync(builtin));
120        } else if let Some(async_builtin) = self.async_builtins.get(name).cloned() {
121            self.index_builtin_id(name, VmBuiltinDispatch::Async(async_builtin));
122        } else {
123            let id = BuiltinId::from_name(name);
124            if self
125                .builtins_by_id
126                .get(&id)
127                .is_some_and(|entry| entry.name.as_ref() == name)
128            {
129                Rc::make_mut(&mut self.builtins_by_id).remove(&id);
130            }
131        }
132    }
133
134    /// Register a sync builtin function.
135    pub fn register_builtin<F>(&mut self, name: &str, f: F)
136    where
137        F: Fn(&[VmValue], &mut String) -> Result<VmValue, VmError> + 'static,
138    {
139        Rc::make_mut(&mut self.builtins).insert(name.to_string(), Rc::new(f));
140        Rc::make_mut(&mut self.builtin_metadata)
141            .insert(name.to_string(), VmBuiltinMetadata::sync(name.to_string()));
142        Rc::make_mut(&mut self.deferred_builtin_registrars).remove(name);
143        self.refresh_builtin_id(name);
144    }
145
146    /// Register a sync builtin function with discoverable metadata.
147    pub fn register_builtin_with_metadata<F>(&mut self, metadata: VmBuiltinMetadata, f: F)
148    where
149        F: Fn(&[VmValue], &mut String) -> Result<VmValue, VmError> + 'static,
150    {
151        let name = metadata.name().to_string();
152        Rc::make_mut(&mut self.builtins).insert(name.clone(), Rc::new(f));
153        Rc::make_mut(&mut self.builtin_metadata)
154            .insert(name.clone(), metadata.with_kind(VmBuiltinKind::Sync));
155        Rc::make_mut(&mut self.deferred_builtin_registrars).remove(&name);
156        self.refresh_builtin_id(&name);
157    }
158
159    /// Remove a sync builtin (so an async version can take precedence).
160    pub fn unregister_builtin(&mut self, name: &str) {
161        Rc::make_mut(&mut self.builtins).remove(name);
162        if self.async_builtins.contains_key(name) {
163            Rc::make_mut(&mut self.builtin_metadata).insert(
164                name.to_string(),
165                VmBuiltinMetadata::async_builtin(name.to_string()),
166            );
167        } else {
168            Rc::make_mut(&mut self.builtin_metadata).remove(name);
169        }
170        self.refresh_builtin_id(name);
171    }
172
173    /// Register an async builtin function.
174    pub fn register_async_builtin<F, Fut>(&mut self, name: &str, f: F)
175    where
176        F: Fn(Vec<VmValue>) -> Fut + 'static,
177        Fut: Future<Output = Result<VmValue, VmError>> + 'static,
178    {
179        Rc::make_mut(&mut self.async_builtins)
180            .insert(name.to_string(), Rc::new(move |args| Box::pin(f(args))));
181        Rc::make_mut(&mut self.builtin_metadata).insert(
182            name.to_string(),
183            VmBuiltinMetadata::async_builtin(name.to_string()),
184        );
185        Rc::make_mut(&mut self.deferred_builtin_registrars).remove(name);
186        self.refresh_builtin_id(name);
187    }
188
189    /// Register an async builtin function with discoverable metadata.
190    pub fn register_async_builtin_with_metadata<F, Fut>(
191        &mut self,
192        metadata: VmBuiltinMetadata,
193        f: F,
194    ) where
195        F: Fn(Vec<VmValue>) -> Fut + 'static,
196        Fut: Future<Output = Result<VmValue, VmError>> + 'static,
197    {
198        let name = metadata.name().to_string();
199        Rc::make_mut(&mut self.async_builtins)
200            .insert(name.clone(), Rc::new(move |args| Box::pin(f(args))));
201        Rc::make_mut(&mut self.builtin_metadata)
202            .insert(name.clone(), metadata.with_kind(VmBuiltinKind::Async));
203        Rc::make_mut(&mut self.deferred_builtin_registrars).remove(&name);
204        self.refresh_builtin_id(&name);
205    }
206
207    /// Register a builtin name whose implementation should be installed only
208    /// if a script actually resolves that name.
209    pub(crate) fn register_deferred_builtin(&mut self, name: &str, registrar: fn(&mut Vm)) {
210        if self.builtins.contains_key(name) || self.async_builtins.contains_key(name) {
211            return;
212        }
213        Rc::make_mut(&mut self.deferred_builtin_registrars).insert(name.to_string(), registrar);
214    }
215
216    pub(crate) fn ensure_deferred_builtin(&mut self, name: &str) -> bool {
217        let Some(registrar) = self.deferred_builtin_registrars.get(name).copied() else {
218            return false;
219        };
220        registrar(self);
221        Rc::make_mut(&mut self.deferred_builtin_registrars).remove(name);
222        self.builtins.contains_key(name) || self.async_builtins.contains_key(name)
223    }
224
225    pub(crate) fn registered_builtin_id(&self, name: &str) -> Option<BuiltinId> {
226        let id = BuiltinId::from_name(name);
227        if self
228            .builtins_by_id
229            .get(&id)
230            .is_some_and(|entry| entry.name.as_ref() == name)
231        {
232            Some(id)
233        } else {
234            None
235        }
236    }
237
238    /// Invoke a closure inline against the existing VM frame stack.
239    ///
240    /// Dispatch path for every callback-taking method on lists/dicts/sets
241    /// (`.map`, `.filter`, `.reduce`, `.each`, `.sort_by`, …) via
242    /// [`call_callable_value`]. The closure's frame is pushed onto
243    /// `self.frames` using the same machinery as `Op::Call`, and the
244    /// shared dispatch loop ([`Vm::drive_until_frame_depth`]) drains the
245    /// sub-execution back to the caller's depth.
246    ///
247    /// This avoids the per-invocation `Pin<Box<dyn Future>>` heap
248    /// allocation a recursive `async fn` would require — the recursion
249    /// cycle (closure → `.map` → callback → closure) is broken instead at
250    /// [`Vm::call_method`], which keeps a single boxed future per
251    /// method-call site rather than per callback element.
252    ///
253    /// Exception handlers are saved and cleared before the sub-execution
254    /// so an unhandled throw inside the body propagates as a Rust
255    /// `Result::Err` to the caller's dispatch loop. Iterators, deadlines,
256    /// and frames are scoped by `CallFrame::saved_iterator_depth` and the
257    /// per-frame deadline tags.
258    pub(crate) async fn call_closure(
259        &mut self,
260        closure: &VmClosure,
261        args: &[VmValue],
262    ) -> Result<VmValue, VmError> {
263        let saved_handlers = std::mem::take(&mut self.exception_handlers);
264        let active_context = (!crate::step_runtime::is_tracked_function(&closure.func.name))
265            .then(crate::step_runtime::take_active_context);
266
267        let target_frame_depth = self.frames.len();
268        let result = match self.push_closure_frame(closure, args) {
269            Ok(()) => self.drive_until_frame_depth(target_frame_depth).await,
270            Err(e) => Err(e),
271        };
272
273        self.exception_handlers = saved_handlers;
274        if let Some(ctx) = active_context {
275            crate::step_runtime::restore_active_context(ctx);
276        }
277
278        result
279    }
280
281    /// Invoke a value as a callable. Supports `VmValue::Closure` and
282    /// `VmValue::BuiltinRef`, so builtin names passed by reference (e.g.
283    /// `dict.rekey(snake_to_camel)`) dispatch through the same code path as
284    /// user-defined closures.
285    pub(crate) async fn call_callable_value(
286        &mut self,
287        callable: &VmValue,
288        args: &[VmValue],
289    ) -> Result<VmValue, VmError> {
290        match callable {
291            VmValue::Closure(closure) => self.call_closure(closure, args).await,
292            VmValue::BuiltinRef(name) => {
293                if !crate::autonomy::needs_async_side_effect_enforcement(name) {
294                    if let Some(result) = self.call_sync_builtin_by_ref(name, args) {
295                        return result;
296                    }
297                }
298                self.call_named_builtin(name, args.to_vec()).await
299            }
300            VmValue::BuiltinRefId { id, name } => {
301                self.call_builtin_id_or_name(*id, name, args.to_vec()).await
302            }
303            other => Err(VmError::TypeError(format!(
304                "expected callable, got {}",
305                other.type_name()
306            ))),
307        }
308    }
309
310    fn call_sync_builtin_by_ref(
311        &mut self,
312        name: &str,
313        args: &[VmValue],
314    ) -> Option<Result<VmValue, VmError>> {
315        self.try_call_sync_builtin_id_or_name(None, name, args)
316    }
317
318    /// Returns true if `v` is callable via `call_callable_value`.
319    pub(crate) fn is_callable_value(v: &VmValue) -> bool {
320        matches!(
321            v,
322            VmValue::Closure(_) | VmValue::BuiltinRef(_) | VmValue::BuiltinRefId { .. }
323        )
324    }
325
326    /// Public wrapper for `call_closure`, used by the MCP server to invoke
327    /// tool handler closures from outside the VM execution loop.
328    pub async fn call_closure_pub(
329        &mut self,
330        closure: &VmClosure,
331        args: &[VmValue],
332    ) -> Result<VmValue, VmError> {
333        self.cancel_grace_instructions_remaining = None;
334        self.call_closure(closure, args).await
335    }
336
337    /// Resolve a named builtin: sync builtins → async builtins → bridge → error.
338    /// Used by Call, TailCall, and Pipe handlers to avoid duplicating this lookup.
339    pub(crate) async fn call_named_builtin(
340        &mut self,
341        name: &str,
342        args: Vec<VmValue>,
343    ) -> Result<VmValue, VmError> {
344        self.call_builtin_impl(name, args, None).await
345    }
346
347    pub(crate) async fn call_builtin_id_or_name(
348        &mut self,
349        id: BuiltinId,
350        name: &str,
351        args: Vec<VmValue>,
352    ) -> Result<VmValue, VmError> {
353        self.call_builtin_impl(name, args, Some(id)).await
354    }
355
356    pub(crate) fn try_call_sync_builtin_id_or_name(
357        &mut self,
358        direct_id: Option<BuiltinId>,
359        name: &str,
360        args: &[VmValue],
361    ) -> Option<Result<VmValue, VmError>> {
362        if self.denied_builtins.contains(name) {
363            return Some(Err(VmError::CategorizedError {
364                message: format!("Tool '{}' is not permitted.", name),
365                category: ErrorCategory::ToolRejected,
366            }));
367        }
368        self.ensure_deferred_builtin(name);
369        let builtin = match self.resolve_sync_builtin_id_or_name(direct_id, name)? {
370            Ok(builtin) => builtin,
371            Err(error) => return Some(Err(error)),
372        };
373        let _span =
374            Self::builtin_span_kind(name).map(|kind| ScopeSpan::new(kind, name.to_string()));
375        if let Err(error) = self.validate_sync_builtin_args(name, args) {
376            return Some(Err(error));
377        }
378
379        Some(builtin(args, &mut self.output))
380    }
381
382    pub(crate) fn try_call_sync_builtin_id_or_name_from_stack_args(
383        &mut self,
384        direct_id: Option<BuiltinId>,
385        name: &str,
386        args_start: usize,
387    ) -> Option<Result<VmValue, VmError>> {
388        if self.denied_builtins.contains(name) {
389            return Some(Err(VmError::CategorizedError {
390                message: format!("Tool '{}' is not permitted.", name),
391                category: ErrorCategory::ToolRejected,
392            }));
393        }
394        self.ensure_deferred_builtin(name);
395        let builtin = match self.resolve_sync_builtin_id_or_name(direct_id, name)? {
396            Ok(builtin) => builtin,
397            Err(error) => return Some(Err(error)),
398        };
399        if args_start > self.stack.len() {
400            return Some(Err(VmError::Runtime(
401                "call argument stack underflow".to_string(),
402            )));
403        }
404
405        let _span =
406            Self::builtin_span_kind(name).map(|kind| ScopeSpan::new(kind, name.to_string()));
407        let args = &self.stack[args_start..];
408        if let Err(error) = self.validate_sync_builtin_args(name, args) {
409            return Some(Err(error));
410        }
411
412        Some(builtin(args, &mut self.output))
413    }
414
415    async fn call_builtin_impl(
416        &mut self,
417        name: &str,
418        args: Vec<VmValue>,
419        direct_id: Option<BuiltinId>,
420    ) -> Result<VmValue, VmError> {
421        // Auto-trace LLM calls and tool calls.
422        let _span =
423            Self::builtin_span_kind(name).map(|kind| ScopeSpan::new(kind, name.to_string()));
424
425        // Sandbox check: deny builtins blocked by --deny/--allow flags.
426        if self.denied_builtins.contains(name) {
427            return Err(VmError::CategorizedError {
428                message: format!("Tool '{}' is not permitted.", name),
429                category: ErrorCategory::ToolRejected,
430            });
431        }
432        let autonomy = if crate::autonomy::needs_async_side_effect_enforcement(name) {
433            crate::autonomy::enforce_builtin_side_effect_boxed(name, &args).await?
434        } else {
435            None
436        };
437        if let Some(crate::autonomy::AutonomyDecision::Skip(value)) = autonomy {
438            return Ok(value);
439        }
440        if !matches!(
441            autonomy,
442            Some(crate::autonomy::AutonomyDecision::AllowApproved)
443        ) {
444            crate::orchestration::enforce_current_policy_for_builtin(name, &args)?;
445        }
446        crate::typecheck::validate_builtin_call(name, &args, None)?;
447
448        if let Some(result) =
449            crate::runtime_context::dispatch_runtime_context_builtin(self, name, &args)
450        {
451            return result;
452        }
453
454        self.ensure_deferred_builtin(name);
455
456        if let Some(id) = direct_id {
457            if let Some(entry) = self.builtins_by_id.get(&id).cloned() {
458                if entry.name.as_ref() == name {
459                    return self.call_builtin_entry(entry.dispatch, args).await;
460                }
461            }
462        }
463
464        if let Some(builtin) = self.builtins.get(name).cloned() {
465            self.call_builtin_entry(VmBuiltinDispatch::Sync(builtin), args)
466                .await
467        } else if let Some(async_builtin) = self.async_builtins.get(name).cloned() {
468            self.call_builtin_entry(VmBuiltinDispatch::Async(async_builtin), args)
469                .await
470        } else if let Some(bridge) = &self.bridge {
471            crate::orchestration::enforce_current_policy_for_bridge_builtin(name)?;
472            let args_json: Vec<serde_json::Value> =
473                args.iter().map(crate::llm::vm_value_to_json).collect();
474            let result = bridge
475                .call(
476                    "builtin_call",
477                    serde_json::json!({"name": name, "args": args_json}),
478                )
479                .await?;
480            Ok(crate::bridge::json_result_to_vm_value(&result))
481        } else {
482            let all_builtins = self
483                .builtins
484                .keys()
485                .chain(self.async_builtins.keys())
486                .chain(self.deferred_builtin_registrars.keys())
487                .map(|s| s.as_str());
488            if let Some(suggestion) = crate::value::closest_match(name, all_builtins) {
489                return Err(VmError::Runtime(format!(
490                    "Undefined builtin: {name} (did you mean `{suggestion}`?)"
491                )));
492            }
493            Err(VmError::UndefinedBuiltin(name.to_string()))
494        }
495    }
496
497    async fn call_builtin_entry(
498        &mut self,
499        dispatch: VmBuiltinDispatch,
500        args: Vec<VmValue>,
501    ) -> Result<VmValue, VmError> {
502        match dispatch {
503            VmBuiltinDispatch::Sync(builtin) => builtin(&args, &mut self.output),
504            VmBuiltinDispatch::Async(async_builtin) => {
505                CURRENT_ASYNC_BUILTIN_CHILD_VM.with(|slot| {
506                    slot.borrow_mut().push(self.child_vm());
507                });
508                let result = async_builtin(args).await;
509                let captured = CURRENT_ASYNC_BUILTIN_CHILD_VM.with(|slot| {
510                    let mut stack = slot.borrow_mut();
511                    let mut top = stack.pop();
512                    top.as_mut().map(|vm| vm.take_output()).unwrap_or_default()
513                });
514                if !captured.is_empty() {
515                    self.output.push_str(&captured);
516                }
517                result
518            }
519        }
520    }
521}