Skip to main content

harn_vm/
step_runtime.rs

1//! Per-step runtime state for `@step`-annotated persona functions.
2//!
3//! The compiler emits a call to the `__register_step` builtin after each
4//! `@step` declaration so the runtime can dispatch on the step's metadata
5//! when its function is invoked. While a step's frame is on the call
6//! stack, an [`ActiveStep`] entry tracks per-step LLM usage, defaults
7//! `llm_call`'s model when the call site doesn't override it, and bounds
8//! cumulative token and cost spend against the step's budget.
9//!
//! This module owns several thread-locals (per-program step and persona
//! registries with cached entry counts, stacks of currently-active
//! personas and steps, a log of completed step summaries, and the
//! registered persona hooks) but exposes only narrow helpers —
//! `current_active_step_*` / `record_step_llm_usage` / etc. — so the
//! call sites in `crates/harn-vm/src/llm/`, `crates/harn-vm/src/vm/`, and
//! the compiler stay focused.
16
17use std::cell::{Cell, RefCell};
18use std::collections::BTreeMap;
19use std::rc::Rc;
20
21use serde::Serialize;
22use serde_json::Value as JsonValue;
23
24use crate::orchestration::{
25    current_execution_policy, pop_execution_policy, push_execution_policy, CapabilityPolicy,
26    HookEvent,
27};
28use crate::personas::StageDecl;
29use crate::value::{VmClosure, VmError, VmValue};
30
31fn vm_str(value: &VmValue) -> Option<&str> {
32    match value {
33        VmValue::String(s) => Some(s.as_ref()),
34        _ => None,
35    }
36}
37
/// Declarative metadata taken from a `@step(...)` attribute.
///
/// The `__register_step` builtin (see [`register_step_from_dict`]) fills
/// this in when the program first runs; `llm_call` and the frame-pop
/// hooks read it while the step is active.
#[derive(Clone, Debug, Default)]
pub struct StepDefinition {
    /// Step name from the metadata dict; registration falls back to the
    /// function name when the dict has no `name`.
    pub name: String,
    /// Name of the function the step is bound to (the registry key).
    pub function: String,
    /// Default model for the step, if one was declared.
    pub model: Option<String>,
    /// Ceiling on cumulative input + output tokens, if budgeted.
    pub max_tokens: Option<u64>,
    /// Ceiling on cumulative spend in USD, if budgeted.
    pub max_usd: Option<f64>,
    /// One of "fail" (default), "continue", "escalate". Decides what
    /// happens to a `budget_exceeded` error propagating out of the step —
    /// see `crates/harn-vm/src/vm/execution.rs`.
    pub error_boundary: Option<String>,
}
55
56#[derive(Debug, Default, Clone)]
57pub struct PersonaDefinition {
58    pub name: String,
59    /// Per-stage tool/side-effect scoping. Keyed lookups by stage name happen
60    /// every step entry; the list is small (a handful of stages per persona)
61    /// so a `Vec` keeps insertion order and matches the manifest's authored
62    /// ordering.
63    pub stages: Vec<StageDecl>,
64}
65
66impl StepDefinition {
67    pub fn boundary(&self) -> StepErrorBoundary {
68        match self.error_boundary.as_deref() {
69            Some("continue") => StepErrorBoundary::Continue,
70            Some("escalate") => StepErrorBoundary::Escalate,
71            _ => StepErrorBoundary::Fail,
72        }
73    }
74}
75
/// Parsed form of a step's `error_boundary` setting.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StepErrorBoundary {
    /// The default: used when no boundary, or an unrecognised one, is set.
    Fail,
    /// Selected by the string "continue".
    Continue,
    /// Selected by the string "escalate".
    Escalate,
}
82
83/// Tracks one in-flight step. The `frame_depth` is `Vm::frames.len()`
84/// captured immediately after `push_closure_frame` returns, so an
85/// `ActiveStep` is "alive" while `Vm::frames.len() >= frame_depth`.
86#[derive(Debug, Clone)]
87pub struct ActiveStep {
88    pub frame_depth: usize,
89    pub definition: Rc<StepDefinition>,
90    pub persona: Option<String>,
91    pub args: Vec<VmValue>,
92    pub input_tokens: u64,
93    pub output_tokens: u64,
94    pub cost_usd: f64,
95    pub llm_calls: u32,
96    pub last_model: Option<String>,
97    /// Tracing span id opened when the step's frame was pushed; ended on
98    /// completion. 0 when tracing was disabled at push time, in which
99    /// case `span_end` is a no-op anyway.
100    pub span_id: u64,
101    /// True when this step pushed a per-stage `CapabilityPolicy` onto the
102    /// execution policy stack. The runtime pops it when the step's frame
103    /// unwinds, mirroring the RAII guard pattern in
104    /// `crates/harn-serve/src/adapters/acp/modes.rs`.
105    pub stage_policy_pushed: bool,
106}
107
108impl ActiveStep {
109    fn new(
110        frame_depth: usize,
111        definition: Rc<StepDefinition>,
112        persona: Option<String>,
113        args: Vec<VmValue>,
114        span_id: u64,
115        stage_policy_pushed: bool,
116    ) -> Self {
117        Self {
118            frame_depth,
119            definition,
120            persona,
121            args,
122            input_tokens: 0,
123            output_tokens: 0,
124            cost_usd: 0.0,
125            llm_calls: 0,
126            last_model: None,
127            span_id,
128            stage_policy_pushed,
129        }
130    }
131
132    fn total_tokens(&self) -> u64 {
133        self.input_tokens.saturating_add(self.output_tokens)
134    }
135}
136
137#[derive(Debug, Clone)]
138pub struct ActivePersona {
139    pub frame_depth: usize,
140    pub definition: Rc<PersonaDefinition>,
141}
142
143/// Snapshot persisted into [`COMPLETED_STEPS`] when the step's frame
144/// unwinds. Receipts and `harn persona inspect`-style downstream consumers
145/// read it back via [`drain_completed_steps`].
146#[derive(Debug, Clone, Serialize)]
147pub struct CompletedStep {
148    pub name: String,
149    pub function: String,
150    pub model: Option<String>,
151    pub input_tokens: u64,
152    pub output_tokens: u64,
153    pub cost_usd: f64,
154    pub llm_calls: u32,
155    pub status: String,
156    pub error: Option<String>,
157}
158
159thread_local! {
160    static STEP_REGISTRY: RefCell<BTreeMap<String, Rc<StepDefinition>>> =
161        const { RefCell::new(BTreeMap::new()) };
162    static PERSONA_REGISTRY: RefCell<BTreeMap<String, Rc<PersonaDefinition>>> =
163        const { RefCell::new(BTreeMap::new()) };
164    static STEP_REGISTRY_LEN: Cell<usize> = const { Cell::new(0) };
165    static PERSONA_REGISTRY_LEN: Cell<usize> = const { Cell::new(0) };
166    static PERSONA_STACK: RefCell<Vec<ActivePersona>> = const { RefCell::new(Vec::new()) };
167    static STEP_STACK: RefCell<Vec<ActiveStep>> = const { RefCell::new(Vec::new()) };
168    static COMPLETED_STEPS: RefCell<Vec<CompletedStep>> = const { RefCell::new(Vec::new()) };
169    static PERSONA_HOOKS: RefCell<Vec<PersonaHookRegistration>> = const { RefCell::new(Vec::new()) };
170}
171
172/// Reset every thread-local owned by this module. Called between test
173/// runs and at the start of each top-level program execution so leftover
174/// registrations don't leak across runs.
175pub fn reset_thread_local_state() {
176    STEP_REGISTRY.with(|r| r.borrow_mut().clear());
177    PERSONA_REGISTRY.with(|r| r.borrow_mut().clear());
178    STEP_REGISTRY_LEN.with(|len| len.set(0));
179    PERSONA_REGISTRY_LEN.with(|len| len.set(0));
180    PERSONA_STACK.with(|s| s.borrow_mut().clear());
181    STEP_STACK.with(|s| s.borrow_mut().clear());
182    COMPLETED_STEPS.with(|c| c.borrow_mut().clear());
183    PERSONA_HOOKS.with(|h| h.borrow_mut().clear());
184}
185
186#[inline]
187fn step_registry_empty() -> bool {
188    STEP_REGISTRY_LEN.with(|len| len.get() == 0)
189}
190
191#[inline]
192fn persona_registry_empty() -> bool {
193    PERSONA_REGISTRY_LEN.with(|len| len.get() == 0)
194}
195
196#[inline]
197fn tracked_registries_empty() -> bool {
198    step_registry_empty() && persona_registry_empty()
199}
200
201/// Bind a `@step` function name to its declared metadata. Idempotent: a
202/// second call replaces the prior definition (matches re-evaluation
203/// semantics of `harn run` and the conformance harness).
204pub fn register_step(function: &str, definition: StepDefinition) {
205    let inserted = STEP_REGISTRY.with(|registry| {
206        registry
207            .borrow_mut()
208            .insert(function.to_string(), Rc::new(definition))
209            .is_none()
210    });
211    if inserted {
212        STEP_REGISTRY_LEN.with(|len| len.set(len.get() + 1));
213    }
214}
215
216pub fn register_persona(function: &str, definition: PersonaDefinition) {
217    let inserted = PERSONA_REGISTRY.with(|registry| {
218        registry
219            .borrow_mut()
220            .insert(function.to_string(), Rc::new(definition))
221            .is_none()
222    });
223    if inserted {
224        PERSONA_REGISTRY_LEN.with(|len| len.set(len.get() + 1));
225    }
226}
227
228pub fn register_persona_from_dict(args: Vec<VmValue>) -> Result<VmValue, VmError> {
229    let function = args
230        .first()
231        .and_then(vm_str)
232        .map(|s| s.to_string())
233        .ok_or_else(|| {
234            VmError::Thrown(VmValue::String(Rc::from(
235                "__register_persona: expected (function_name, metadata_dict)",
236            )))
237        })?;
238    let meta = args
239        .get(1)
240        .and_then(VmValue::as_dict)
241        .cloned()
242        .ok_or_else(|| {
243            VmError::Thrown(VmValue::String(Rc::from(
244                "__register_persona: metadata argument must be a dict",
245            )))
246        })?;
247    let definition = PersonaDefinition {
248        name: meta
249            .get("name")
250            .and_then(vm_str)
251            .map(str::to_string)
252            .unwrap_or_else(|| function.clone()),
253        stages: parse_stage_decls(meta.get("stages"))?,
254    };
255    register_persona(&function, definition);
256    Ok(VmValue::Nil)
257}
258
259fn parse_stage_decls(value: Option<&VmValue>) -> Result<Vec<StageDecl>, VmError> {
260    let Some(value) = value else {
261        return Ok(Vec::new());
262    };
263    let entries = match value {
264        VmValue::Nil => return Ok(Vec::new()),
265        VmValue::List(list) => list.as_ref(),
266        _ => {
267            return Err(VmError::Thrown(VmValue::String(Rc::from(
268                "__register_persona: stages argument must be a list of dicts",
269            ))));
270        }
271    };
272    let mut out = Vec::with_capacity(entries.len());
273    for entry in entries {
274        let dict = entry.as_dict().ok_or_else(|| {
275            VmError::Thrown(VmValue::String(Rc::from(
276                "__register_persona: each stage entry must be a dict",
277            )))
278        })?;
279        let Some(name) = dict.get("name").and_then(vm_str) else {
280            return Err(VmError::Thrown(VmValue::String(Rc::from(
281                "__register_persona: stage dict missing required 'name'",
282            ))));
283        };
284        let allowed_tools = match dict.get("allowed_tools") {
285            None | Some(VmValue::Nil) => None,
286            Some(VmValue::List(items)) => Some(
287                items
288                    .iter()
289                    .map(|item| {
290                        vm_str(item).map(str::to_string).ok_or_else(|| {
291                            VmError::Thrown(VmValue::String(Rc::from(
292                                "__register_persona: stage allowed_tools entries must be strings",
293                            )))
294                        })
295                    })
296                    .collect::<Result<Vec<_>, _>>()?,
297            ),
298            _ => {
299                return Err(VmError::Thrown(VmValue::String(Rc::from(
300                    "__register_persona: stage allowed_tools must be a list of strings",
301                ))));
302            }
303        };
304        let side_effect_level = dict
305            .get("side_effect_level")
306            .and_then(vm_str)
307            .map(str::to_string)
308            .filter(|s| !s.is_empty());
309        let max_iterations = match dict.get("max_iterations") {
310            Some(VmValue::Int(n)) if *n >= 0 => Some(*n as u32),
311            Some(VmValue::Float(f)) if f.is_finite() && *f >= 0.0 => Some(*f as u32),
312            _ => None,
313        };
314        out.push(StageDecl {
315            name: name.to_string(),
316            allowed_tools,
317            side_effect_level,
318            max_iterations,
319            on_exit: None,
320        });
321    }
322    Ok(out)
323}
324
325/// Builtin entry point invoked by compiler-emitted bytecode after every
326/// `@step` function declaration. Accepts a dict mirroring
327/// `harn_modules::PersonaStepMetadata`.
328pub fn register_step_from_dict(args: Vec<VmValue>) -> Result<VmValue, VmError> {
329    let function = args
330        .first()
331        .and_then(vm_str)
332        .map(|s| s.to_string())
333        .ok_or_else(|| {
334            VmError::Thrown(VmValue::String(Rc::from(
335                "__register_step: expected (function_name, metadata_dict)",
336            )))
337        })?;
338    let meta = args
339        .get(1)
340        .and_then(VmValue::as_dict)
341        .cloned()
342        .ok_or_else(|| {
343            VmError::Thrown(VmValue::String(Rc::from(
344                "__register_step: metadata argument must be a dict",
345            )))
346        })?;
347
348    let mut definition = StepDefinition {
349        function: function.clone(),
350        ..StepDefinition::default()
351    };
352    definition.name = meta
353        .get("name")
354        .and_then(vm_str)
355        .map(|s| s.to_string())
356        .unwrap_or_else(|| function.clone());
357    definition.model = meta
358        .get("model")
359        .and_then(vm_str)
360        .map(|s| s.to_string())
361        .filter(|s| !s.is_empty());
362    definition.error_boundary = meta
363        .get("error_boundary")
364        .and_then(vm_str)
365        .map(|s| s.to_string());
366
367    if let Some(VmValue::Dict(budget)) = meta.get("budget") {
368        if let Some(value) = budget.get("max_tokens") {
369            definition.max_tokens = match value {
370                VmValue::Int(n) if *n > 0 => Some(*n as u64),
371                VmValue::Float(f) if f.is_finite() && *f > 0.0 => Some(*f as u64),
372                _ => None,
373            };
374        }
375        if let Some(value) = budget.get("max_usd") {
376            definition.max_usd = match value {
377                VmValue::Float(f) if f.is_finite() && *f >= 0.0 => Some(*f),
378                VmValue::Int(n) if *n >= 0 => Some(*n as f64),
379                _ => None,
380            };
381        }
382    }
383
384    register_step(&function, definition);
385    Ok(VmValue::Nil)
386}
387
388#[derive(Clone)]
389pub struct PersonaHookRegistration {
390    pub persona_pattern: String,
391    pub step_name: Option<String>,
392    pub event: HookEvent,
393    pub threshold_pct: Option<f64>,
394    pub handler: Rc<VmClosure>,
395}
396
397impl std::fmt::Debug for PersonaHookRegistration {
398    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
399        f.debug_struct("PersonaHookRegistration")
400            .field("persona_pattern", &self.persona_pattern)
401            .field("step_name", &self.step_name)
402            .field("event", &self.event)
403            .field("threshold_pct", &self.threshold_pct)
404            .field("handler", &"..")
405            .finish()
406    }
407}
408
409#[derive(Debug, Clone)]
410pub struct PersonaHookInvocation {
411    pub handler: Rc<VmClosure>,
412    pub event: HookEvent,
413}
414
415pub fn register_persona_hook(
416    persona_pattern: impl Into<String>,
417    event: HookEvent,
418    threshold_pct: Option<f64>,
419    handler: Rc<VmClosure>,
420) {
421    PERSONA_HOOKS.with(|hooks| {
422        hooks.borrow_mut().push(PersonaHookRegistration {
423            persona_pattern: persona_pattern.into(),
424            step_name: None,
425            event,
426            threshold_pct,
427            handler,
428        });
429    });
430}
431
432pub fn register_step_hook(
433    persona_pattern: impl Into<String>,
434    step_name: impl Into<String>,
435    event: HookEvent,
436    threshold_pct: Option<f64>,
437    handler: Rc<VmClosure>,
438) {
439    PERSONA_HOOKS.with(|hooks| {
440        hooks.borrow_mut().push(PersonaHookRegistration {
441            persona_pattern: persona_pattern.into(),
442            step_name: Some(step_name.into()),
443            event,
444            threshold_pct,
445            handler,
446        });
447    });
448}
449
450pub fn clear_persona_hooks() {
451    PERSONA_HOOKS.with(|hooks| hooks.borrow_mut().clear());
452}
453
454pub struct ActiveContextSnapshot {
455    steps: Vec<ActiveStep>,
456    personas: Vec<ActivePersona>,
457}
458
459pub fn take_active_context() -> ActiveContextSnapshot {
460    ActiveContextSnapshot {
461        steps: STEP_STACK.with(|stack| std::mem::take(&mut *stack.borrow_mut())),
462        personas: PERSONA_STACK.with(|stack| std::mem::take(&mut *stack.borrow_mut())),
463    }
464}
465
466pub fn restore_active_context(snapshot: ActiveContextSnapshot) {
467    STEP_STACK.with(|stack| *stack.borrow_mut() = snapshot.steps);
468    PERSONA_STACK.with(|stack| *stack.borrow_mut() = snapshot.personas);
469}
470
471pub fn is_tracked_function(function_name: &str) -> bool {
472    if tracked_registries_empty() {
473        return false;
474    }
475    (!step_registry_empty()
476        && STEP_REGISTRY.with(|registry| registry.borrow().contains_key(function_name)))
477        || (!persona_registry_empty()
478            && PERSONA_REGISTRY.with(|registry| registry.borrow().contains_key(function_name)))
479}
480
481pub fn step_definition_for_function(function_name: &str) -> Option<Rc<StepDefinition>> {
482    if step_registry_empty() {
483        return None;
484    }
485    STEP_REGISTRY.with(|registry| registry.borrow().get(function_name).cloned())
486}
487
488pub fn current_persona_name() -> Option<String> {
489    PERSONA_STACK.with(|stack| stack.borrow().last().map(|p| p.definition.name.clone()))
490}
491
492/// Resolve the per-stage policy for `step_name` against the currently
493/// active persona's stage declarations. Returns `None` when no persona is
494/// active or no stage matches the step name. Caller pushes the result onto
495/// `EXECUTION_POLICY_STACK`.
496///
497/// When an ambient policy is already active, the stage policy is
498/// intersected with it so a stage can only ever tighten the tool surface
499/// and side-effect ceiling — never widen them.
500fn stage_policy_for_active_step(step_name: &str) -> Option<CapabilityPolicy> {
501    let stage_policy = PERSONA_STACK.with(|stack| {
502        let stack = stack.borrow();
503        let persona = stack.last()?;
504        let stage = persona
505            .definition
506            .stages
507            .iter()
508            .find(|stage| stage.name == step_name)?;
509        Some(stage_decl_to_policy(stage))
510    })?;
511    let Some(parent) = current_execution_policy() else {
512        return Some(stage_policy);
513    };
514    // `intersect` is conservative: failure means the stage referenced a tool
515    // the ambient policy already denied. Fall back to a narrowed copy that
516    // drops those entries so the stage can never widen the ceiling.
517    Some(parent.intersect(&stage_policy).unwrap_or_else(|_| {
518        let intersected_tools: Vec<String> = stage_policy
519            .tools
520            .iter()
521            .filter(|tool| parent.tools.is_empty() || parent.tools.contains(*tool))
522            .cloned()
523            .collect();
524        CapabilityPolicy {
525            tools: intersected_tools,
526            ..stage_policy
527        }
528    }))
529}
530
531fn stage_decl_to_policy(stage: &StageDecl) -> CapabilityPolicy {
532    CapabilityPolicy {
533        tools: stage.allowed_tools.clone().unwrap_or_default(),
534        side_effect_level: stage.side_effect_level.clone(),
535        ..CapabilityPolicy::default()
536    }
537}
538
539fn persona_matches(pattern: &str, persona: &str) -> bool {
540    crate::orchestration::glob_match(pattern, persona)
541}
542
543pub fn matching_hooks(
544    event: HookEvent,
545    persona: Option<&str>,
546    step_name: Option<&str>,
547    budget_pct: Option<f64>,
548) -> Vec<PersonaHookInvocation> {
549    let persona = persona.unwrap_or("");
550    PERSONA_HOOKS.with(|hooks| {
551        hooks
552            .borrow()
553            .iter()
554            .filter(|hook| hook.event == event)
555            .filter(|hook| persona_matches(&hook.persona_pattern, persona))
556            .filter(|hook| match (&hook.step_name, step_name) {
557                (Some(expected), Some(actual)) => expected == actual,
558                (Some(_), None) => false,
559                (None, _) => true,
560            })
561            .filter(|hook| match (hook.threshold_pct, budget_pct) {
562                (Some(threshold), Some(pct)) => pct >= threshold,
563                (Some(_), None) => false,
564                (None, _) => true,
565            })
566            .map(|hook| PersonaHookInvocation {
567                handler: hook.handler.clone(),
568                event: hook.event,
569            })
570            .collect()
571    })
572}
573
574pub fn maybe_push_active_persona(function_name: &str, frame_depth: usize) -> bool {
575    if persona_registry_empty() {
576        return false;
577    }
578    let definition =
579        PERSONA_REGISTRY.with(|registry| registry.borrow().get(function_name).cloned());
580    let Some(definition) = definition else {
581        return false;
582    };
583    PERSONA_STACK.with(|stack| {
584        stack.borrow_mut().push(ActivePersona {
585            frame_depth,
586            definition,
587        });
588    });
589    true
590}
591
592/// Push an active step onto the stack iff `function_name` has metadata
593/// registered. Returns `true` when a frame was pushed so the call site
594/// can record that fact. Called from `Vm::push_closure_frame` after the
595/// new frame has been added.
596pub fn maybe_push_active_step(function_name: &str, frame_depth: usize, args: &[VmValue]) -> bool {
597    if step_registry_empty() {
598        return false;
599    }
600    let definition = STEP_REGISTRY.with(|registry| registry.borrow().get(function_name).cloned());
601    let Some(definition) = definition else {
602        return false;
603    };
604    let persona = current_persona_name();
605    let span_id =
606        crate::tracing::span_start(crate::tracing::SpanKind::Step, definition.name.clone());
607    if let Some(persona_name) = persona.as_deref() {
608        crate::tracing::span_set_metadata(
609            span_id,
610            "persona",
611            serde_json::Value::String(persona_name.to_string()),
612        );
613    }
614    if let Some(model) = definition.model.as_deref() {
615        crate::tracing::span_set_metadata(
616            span_id,
617            "model",
618            serde_json::Value::String(model.to_string()),
619        );
620    }
621    let step_name = definition.name.clone();
622    STEP_STACK.with(|stack| {
623        stack.borrow_mut().push(ActiveStep::new(
624            frame_depth,
625            definition,
626            persona,
627            args.to_vec(),
628            span_id,
629            false,
630        ));
631    });
632    if let Some(policy) = stage_policy_for_active_step(&step_name) {
633        push_execution_policy(policy);
634        STEP_STACK.with(|stack| {
635            if let Some(top) = stack.borrow_mut().last_mut() {
636                top.stage_policy_pushed = true;
637            }
638        });
639    }
640    true
641}
642
643/// Drop any step entries whose owning frame has already been unwound,
644/// recording a `CompletedStep` summary for each. The `current_frame_depth`
645/// is `Vm::frames.len()` at the call site — entries with
646/// `frame_depth > current_frame_depth` are stale.
647pub fn prune_below_frame(current_frame_depth: usize) {
648    let mut popped: Vec<ActiveStep> = Vec::new();
649    STEP_STACK.with(|stack| {
650        let mut stack = stack.borrow_mut();
651        while let Some(top) = stack.last() {
652            if top.frame_depth > current_frame_depth {
653                popped.push(stack.pop().unwrap());
654            } else {
655                break;
656            }
657        }
658    });
659    for step in popped {
660        finish_step(step, "completed", None);
661    }
662    PERSONA_STACK.with(|stack| {
663        let mut stack = stack.borrow_mut();
664        while stack
665            .last()
666            .is_some_and(|persona| persona.frame_depth > current_frame_depth)
667        {
668            stack.pop();
669        }
670    });
671}
672
673pub fn take_active_step(current_frame_depth: usize) -> Option<ActiveStep> {
674    STEP_STACK.with(|stack| {
675        let mut stack = stack.borrow_mut();
676        if stack
677            .last()
678            .is_some_and(|step| step.frame_depth == current_frame_depth)
679        {
680            stack.pop()
681        } else {
682            None
683        }
684    })
685}
686
687pub fn finish_active_step(step: ActiveStep, status: &str, error: Option<String>) {
688    finish_step(step, status, error);
689}
690
691/// Pop the topmost active step (if its frame is the current one) and
692/// record an explicit completion status. Used when an error boundary
693/// rewrites or absorbs an in-flight error so the receipt log reflects the
694/// outcome the persona actually saw.
695pub fn pop_and_record(current_frame_depth: usize, status: &str, error: Option<String>) -> bool {
696    let popped = STEP_STACK.with(|stack| {
697        let mut stack = stack.borrow_mut();
698        if stack
699            .last()
700            .map(|step| step.frame_depth == current_frame_depth)
701            .unwrap_or(false)
702        {
703            stack.pop()
704        } else {
705            None
706        }
707    });
708    let Some(step) = popped else {
709        return false;
710    };
711    finish_step(step, status, error);
712    true
713}
714
715fn finish_step(step: ActiveStep, status: &str, error: Option<String>) {
716    if step.stage_policy_pushed {
717        pop_execution_policy();
718    }
719    crate::tracing::span_set_metadata(
720        step.span_id,
721        "status",
722        serde_json::Value::String(status.to_string()),
723    );
724    crate::tracing::span_set_metadata(
725        step.span_id,
726        "llm_calls",
727        serde_json::Value::Number(step.llm_calls.into()),
728    );
729    crate::tracing::span_set_metadata(
730        step.span_id,
731        "input_tokens",
732        serde_json::Value::Number(step.input_tokens.into()),
733    );
734    crate::tracing::span_set_metadata(
735        step.span_id,
736        "output_tokens",
737        serde_json::Value::Number(step.output_tokens.into()),
738    );
739    if let Some(cost_n) = serde_json::Number::from_f64(step.cost_usd) {
740        crate::tracing::span_set_metadata(
741            step.span_id,
742            "cost_usd",
743            serde_json::Value::Number(cost_n),
744        );
745    }
746    crate::tracing::span_end(step.span_id);
747    let summary = CompletedStep {
748        name: step.definition.name.clone(),
749        function: step.definition.function.clone(),
750        model: step
751            .last_model
752            .clone()
753            .or_else(|| step.definition.model.clone()),
754        input_tokens: step.input_tokens,
755        output_tokens: step.output_tokens,
756        cost_usd: step.cost_usd,
757        llm_calls: step.llm_calls,
758        status: status.to_string(),
759        error,
760    };
761    COMPLETED_STEPS.with(|completed| completed.borrow_mut().push(summary));
762}
763
764/// Get a snapshot of the topmost active step, if any. Used by the
765/// llm_call path to fill in defaults — never for mutation.
766pub fn with_active_step<R>(f: impl FnOnce(&ActiveStep) -> R) -> Option<R> {
767    STEP_STACK.with(|stack| stack.borrow().last().map(f))
768}
769
770/// Mutate the topmost active step (typically to attribute LLM usage).
771pub fn with_active_step_mut<R>(f: impl FnOnce(&mut ActiveStep) -> R) -> Option<R> {
772    STEP_STACK.with(|stack| stack.borrow_mut().last_mut().map(f))
773}
774
775/// Frame depth of the topmost active step, or `None` when no step is
776/// active. Used by `handle_error` to detect "this throw is exiting a
777/// step's frame".
778pub fn active_step_frame_depth() -> Option<usize> {
779    STEP_STACK.with(|stack| stack.borrow().last().map(|s| s.frame_depth))
780}
781
782/// Default model the topmost active step should impose on `llm_call`
783/// invocations whose options dict didn't pin a model.
784pub fn active_step_model_default() -> Option<String> {
785    STEP_STACK.with(|stack| {
786        stack
787            .borrow()
788            .last()
789            .and_then(|step| step.definition.model.clone())
790    })
791}
792
793/// Record that `llm_call` consumed `input_tokens` / `output_tokens` for
794/// `cost_usd`. Updates the active step's running totals and returns a
795/// budget-exhaustion error if the step's ceiling is now breached.
796///
797/// The check is performed AFTER the call so the test fixture's first
798/// call (which fits under budget) succeeds and subsequent calls trip the
799/// limit. This matches the existing `accumulate_cost_for_provider`
800/// pattern where global budget is also checked post-hoc.
801pub fn record_step_llm_usage(
802    model: &str,
803    input_tokens: i64,
804    output_tokens: i64,
805    cost_usd: f64,
806) -> Result<(), VmError> {
807    let exhausted = STEP_STACK.with(|stack| -> Option<VmError> {
808        let mut stack = stack.borrow_mut();
809        let step = stack.last_mut()?;
810        step.input_tokens = step.input_tokens.saturating_add(input_tokens.max(0) as u64);
811        step.output_tokens = step
812            .output_tokens
813            .saturating_add(output_tokens.max(0) as u64);
814        step.cost_usd += cost_usd;
815        step.llm_calls = step.llm_calls.saturating_add(1);
816        if !model.is_empty() {
817            step.last_model = Some(model.to_string());
818        }
819
820        if let Some(max_tokens) = step.definition.max_tokens {
821            if step.total_tokens() > max_tokens {
822                return Some(budget_exhausted_error(
823                    &step.definition,
824                    "max_tokens",
825                    max_tokens as f64,
826                    step.total_tokens() as f64,
827                    step.cost_usd,
828                ));
829            }
830        }
831        if let Some(max_usd) = step.definition.max_usd {
832            if step.cost_usd > max_usd {
833                return Some(budget_exhausted_error(
834                    &step.definition,
835                    "max_usd",
836                    max_usd,
837                    step.total_tokens() as f64,
838                    step.cost_usd,
839                ));
840            }
841        }
842        None
843    });
844    if let Some(err) = exhausted {
845        return Err(err);
846    }
847    Ok(())
848}
849
850fn budget_exhausted_error(
851    definition: &StepDefinition,
852    limit: &str,
853    limit_value: f64,
854    consumed_tokens: f64,
855    consumed_cost_usd: f64,
856) -> VmError {
857    let mut dict: BTreeMap<String, VmValue> = BTreeMap::new();
858    dict.insert(
859        "category".to_string(),
860        VmValue::String(Rc::from("budget_exceeded")),
861    );
862    dict.insert(
863        "kind".to_string(),
864        VmValue::String(Rc::from("budget_exhausted")),
865    );
866    dict.insert(
867        "reason".to_string(),
868        VmValue::String(Rc::from("step_budget_exhausted")),
869    );
870    dict.insert(
871        "step".to_string(),
872        VmValue::String(Rc::from(definition.name.clone())),
873    );
874    dict.insert(
875        "function".to_string(),
876        VmValue::String(Rc::from(definition.function.clone())),
877    );
878    dict.insert(
879        "limit".to_string(),
880        VmValue::String(Rc::from(limit.to_string())),
881    );
882    dict.insert("limit_value".to_string(), VmValue::Float(limit_value));
883    dict.insert(
884        "consumed_tokens".to_string(),
885        VmValue::Float(consumed_tokens),
886    );
887    dict.insert(
888        "consumed_cost_usd".to_string(),
889        VmValue::Float(consumed_cost_usd),
890    );
891    dict.insert(
892        "error_boundary".to_string(),
893        VmValue::String(Rc::from(
894            definition
895                .error_boundary
896                .clone()
897                .unwrap_or_else(|| "fail".to_string()),
898        )),
899    );
900    dict.insert(
901        "message".to_string(),
902        VmValue::String(Rc::from(format!(
903            "step `{}` exceeded {} budget ({} > {})",
904            definition.name, limit, consumed_tokens as i64, limit_value as i64
905        ))),
906    );
907    VmError::Thrown(VmValue::Dict(Rc::new(dict)))
908}
909
910/// Returns true if the thrown value looks like a budget-exhausted
911/// error — either our typed step-budget dict or the existing
912/// `crates/harn-vm/src/llm/cost.rs::budget_exceeded_error` shape.
913/// Either form is treated identically by `error_boundary` because the
914/// per-step budget machinery layers onto the existing envelope; a step
915/// whose budget the preflight projection rejects is still a budget
916/// exhaustion the step authored.
917pub fn is_step_budget_exhausted(err: &VmError) -> bool {
918    let VmError::Thrown(VmValue::Dict(dict)) = err else {
919        return false;
920    };
921    let category = dict.get("category").and_then(vm_str);
922    let kind = dict.get("kind").and_then(vm_str);
923    let reason = dict.get("reason").and_then(vm_str);
924    if matches!(kind, Some("budget_exhausted")) && matches!(reason, Some("step_budget_exhausted")) {
925        return true;
926    }
927    matches!(category, Some("budget_exceeded"))
928}
929
930/// Annotate an existing budget-exhausted error with `escalated: true`
931/// and the step's identity so the persona body / handoff receiver can
932/// route on it. Returns the original error if it isn't a thrown dict.
933/// Ensures `step` and `function` keys reflect the just-finished step
934/// even when the underlying error was raised by the preflight budget
935/// machinery (which doesn't know which step it's running under).
936pub fn mark_escalated(err: VmError, step_name: Option<&str>, function: Option<&str>) -> VmError {
937    let VmError::Thrown(VmValue::Dict(dict)) = err else {
938        return err;
939    };
940    let mut next = (*dict).clone();
941    next.insert("escalated".to_string(), VmValue::Bool(true));
942    next.insert(
943        "category".to_string(),
944        VmValue::String(Rc::from("handoff_escalation")),
945    );
946    if let Some(step) = step_name {
947        next.entry("step".to_string())
948            .or_insert_with(|| VmValue::String(Rc::from(step.to_string())));
949    }
950    if let Some(function) = function {
951        next.entry("function".to_string())
952            .or_insert_with(|| VmValue::String(Rc::from(function.to_string())));
953    }
954    VmError::Thrown(VmValue::Dict(Rc::new(next)))
955}
956
957/// Drain the completed-step log. Used by receipt builders that want a
958/// per-step model + token + cost breakdown for the just-finished run.
959pub fn drain_completed_steps() -> Vec<CompletedStep> {
960    COMPLETED_STEPS.with(|completed| std::mem::take(&mut *completed.borrow_mut()))
961}
962
963/// Read the completed-step log without clearing it. Use when callers
964/// want a peek without disturbing the global record stream.
965pub fn peek_completed_steps() -> Vec<CompletedStep> {
966    COMPLETED_STEPS.with(|completed| completed.borrow().clone())
967}
968
969/// Lower a [`CompletedStep`] into JSON for embedding in receipts /
970/// inspect output.
971pub fn completed_step_to_json(step: &CompletedStep) -> JsonValue {
972    serde_json::to_value(step).unwrap_or(JsonValue::Null)
973}
974
975/// Register the `__register_step` host builtin. Compiler-emitted
976/// bytecode after every `@step` declaration calls it with
977/// `(function_name, metadata_dict)` so the runtime can later dispatch on
978/// the step's metadata when its function is invoked.
979pub fn register_step_builtins(vm: &mut crate::vm::Vm) {
980    vm.register_builtin("__register_step", |args, _out| {
981        register_step_from_dict(args.to_vec())
982    });
983    vm.register_builtin("__register_persona", |args, _out| {
984        register_persona_from_dict(args.to_vec())
985    });
986}
987
// Unit tests for step/persona registration, per-step usage accounting,
// and stage-scoped execution policies. Every test begins by resetting this
// module's thread-local state via `fresh_state`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Reset the module's thread-local state so a test starts clean.
    fn fresh_state() {
        reset_thread_local_state();
    }

    /// Registers a step from a metadata dict, activates it, records LLM
    /// usage under and then over its token budget, and checks that pruning
    /// moves it into the completed-step log.
    #[test]
    fn registers_and_pops_step_from_dict() {
        fresh_state();
        // Metadata in the dict shape handed to `register_step_from_dict`:
        // name, model, error_boundary, plus a nested `budget` dict.
        let mut budget: BTreeMap<String, VmValue> = BTreeMap::new();
        budget.insert("max_tokens".to_string(), VmValue::Int(100));
        budget.insert("max_usd".to_string(), VmValue::Float(0.05));
        let mut meta: BTreeMap<String, VmValue> = BTreeMap::new();
        meta.insert("name".to_string(), VmValue::String(Rc::from("plan")));
        meta.insert(
            "model".to_string(),
            VmValue::String(Rc::from("claude-haiku-4-5")),
        );
        meta.insert(
            "error_boundary".to_string(),
            VmValue::String(Rc::from("continue")),
        );
        meta.insert("budget".to_string(), VmValue::Dict(Rc::new(budget)));

        register_step_from_dict(vec![
            VmValue::String(Rc::from("plan_step")),
            VmValue::Dict(Rc::new(meta)),
        ])
        .expect("registration succeeds");

        // Entering the registered function at frame depth 3 activates the
        // step and exposes its model as the default.
        assert!(maybe_push_active_step("plan_step", 3, &[]));
        assert_eq!(active_step_frame_depth(), Some(3));
        assert_eq!(
            active_step_model_default().as_deref(),
            Some("claude-haiku-4-5")
        );

        // 10 + 20 tokens: expected to stay within the 100-token budget.
        record_step_llm_usage("claude-haiku-4-5", 10, 20, 0.001).expect("under budget");
        with_active_step(|step| {
            assert_eq!(step.input_tokens, 10);
            assert_eq!(step.output_tokens, 20);
            assert!((step.cost_usd - 0.001).abs() < 1e-9);
        });

        // A further 50 + 50 tokens: the test expects the budget check to
        // reject this call with a budget-exhausted error.
        let err =
            record_step_llm_usage("claude-haiku-4-5", 50, 50, 0.0).expect_err("should exhaust");
        assert!(is_step_budget_exhausted(&err));

        // Pruning below frame 2 is expected to retire the step (pushed at
        // depth 3) into the completed log, with both calls — including the
        // rejected one — counted in `llm_calls`.
        prune_below_frame(2);
        let completed = drain_completed_steps();
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].llm_calls, 2);
    }

    /// A function name that was never registered must not activate a step.
    #[test]
    fn unregistered_function_does_not_push() {
        fresh_state();
        assert!(!maybe_push_active_step("not_a_step", 1, &[]));
        assert!(active_step_frame_depth().is_none());
    }

    /// `tracked_registries_empty` / `is_tracked_function` must follow
    /// registration, re-registration under the same function name, and a
    /// state reset.
    #[test]
    fn tracked_registry_empty_fast_path_tracks_registrations_and_reset() {
        fresh_state();
        assert!(tracked_registries_empty());
        assert!(!is_tracked_function("plan_step"));

        register_step(
            "plan_step",
            StepDefinition {
                name: "plan".to_string(),
                function: "plan_step".to_string(),
                ..StepDefinition::default()
            },
        );
        assert!(!tracked_registries_empty());
        assert!(is_tracked_function("plan_step"));
        assert!(step_definition_for_function("plan_step").is_some());

        // Registering the same function again (with a different step name)
        // must keep it tracked.
        register_step(
            "plan_step",
            StepDefinition {
                name: "plan_v2".to_string(),
                function: "plan_step".to_string(),
                ..StepDefinition::default()
            },
        );
        assert!(is_tracked_function("plan_step"));

        // Resetting clears the registries again.
        fresh_state();
        assert!(tracked_registries_empty());
        assert!(!is_tracked_function("plan_step"));
    }

    /// A stage's `allowed_tools` may narrow the enclosing execution policy
    /// but must not add tools the parent policy did not allow.
    #[test]
    fn stage_policy_narrows_but_does_not_widen_parent_policy() {
        fresh_state();
        let mut meta: BTreeMap<String, VmValue> = BTreeMap::new();
        meta.insert("name".to_string(), VmValue::String(Rc::from("research")));
        register_step_from_dict(vec![
            VmValue::String(Rc::from("research_step")),
            VmValue::Dict(Rc::new(meta)),
        ])
        .expect("step registration");

        // Persona with a single stage named like the step above.
        let mut stage_dict: BTreeMap<String, VmValue> = BTreeMap::new();
        stage_dict.insert("name".to_string(), VmValue::String(Rc::from("research")));
        // Stage tries to add `edit` on top of a parent that only allowed `read`.
        stage_dict.insert(
            "allowed_tools".to_string(),
            VmValue::List(Rc::new(vec![
                VmValue::String(Rc::from("read")),
                VmValue::String(Rc::from("edit")),
            ])),
        );
        let mut persona_meta: BTreeMap<String, VmValue> = BTreeMap::new();
        persona_meta.insert("name".to_string(), VmValue::String(Rc::from("scoped")));
        persona_meta.insert(
            "stages".to_string(),
            VmValue::List(Rc::new(vec![VmValue::Dict(Rc::new(stage_dict))])),
        );
        register_persona_from_dict(vec![
            VmValue::String(Rc::from("scoped_persona")),
            VmValue::Dict(Rc::new(persona_meta)),
        ])
        .expect("persona registration");

        // Parent policy in force before the persona/step are entered:
        // only `read` is allowed.
        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            ..CapabilityPolicy::default()
        });
        assert!(maybe_push_active_persona("scoped_persona", 1));
        assert!(maybe_push_active_step("research_step", 2, &[]));
        let policy = current_execution_policy().expect("stage policy active");
        // `edit` is filtered out because the parent already denied it.
        assert_eq!(policy.tools, vec!["read".to_string()]);

        // After pruning the step/persona and popping the parent policy
        // pushed above, no execution policy should remain.
        prune_below_frame(0);
        pop_execution_policy();
        assert!(current_execution_policy().is_none());
    }

    /// The stage policy should only be in effect while the step is active:
    /// absent before the step is entered and gone again after pruning.
    #[test]
    fn stage_policy_is_pushed_and_popped_around_step() {
        fresh_state();
        let mut meta: BTreeMap<String, VmValue> = BTreeMap::new();
        meta.insert("name".to_string(), VmValue::String(Rc::from("research")));
        register_step_from_dict(vec![
            VmValue::String(Rc::from("research_step")),
            VmValue::Dict(Rc::new(meta)),
        ])
        .expect("step registration succeeds");

        let mut stage_dict: BTreeMap<String, VmValue> = BTreeMap::new();
        stage_dict.insert("name".to_string(), VmValue::String(Rc::from("research")));
        stage_dict.insert(
            "allowed_tools".to_string(),
            VmValue::List(Rc::new(vec![VmValue::String(Rc::from("read"))])),
        );
        let mut persona_meta: BTreeMap<String, VmValue> = BTreeMap::new();
        persona_meta.insert("name".to_string(), VmValue::String(Rc::from("scoped")));
        persona_meta.insert(
            "stages".to_string(),
            VmValue::List(Rc::new(vec![VmValue::Dict(Rc::new(stage_dict))])),
        );
        register_persona_from_dict(vec![
            VmValue::String(Rc::from("scoped_persona")),
            VmValue::Dict(Rc::new(persona_meta)),
        ])
        .expect("persona registration succeeds");

        // Activating the persona alone is expected to leave no execution
        // policy in place; the policy appears once the step is entered.
        assert!(maybe_push_active_persona("scoped_persona", 1));
        assert!(crate::orchestration::current_execution_policy().is_none());
        assert!(maybe_push_active_step("research_step", 2, &[]));
        let policy = crate::orchestration::current_execution_policy()
            .expect("stage policy is active inside step");
        assert_eq!(policy.tools, vec!["read".to_string()]);

        // Pruning the step is expected to remove the stage policy again.
        prune_below_frame(0);
        assert!(crate::orchestration::current_execution_policy().is_none());
    }
}