Skip to main content

harn_vm/
step_runtime.rs

1//! Per-step runtime state for `@step`-annotated persona functions.
2//!
3//! The compiler emits a call to the `__register_step` builtin after each
4//! `@step` declaration so the runtime can dispatch on the step's metadata
5//! when its function is invoked. While a step's frame is on the call
6//! stack, an [`ActiveStep`] entry tracks per-step LLM usage, defaults
7//! `llm_call`'s model when the call site doesn't override it, and bounds
8//! cumulative token and cost spend against the step's budget.
9//!
//! This module owns a set of thread-locals — step and persona registries
//! (plus cached entry counts), stacks of the currently-active steps and
//! personas, a log of completed step summaries, and registered persona/step
//! hooks — but exposes only narrow helpers (`active_step_*`,
//! `with_active_step*`, `record_step_llm_usage`, etc.) so the call sites in
//! `crates/harn-vm/src/llm/`, `crates/harn-vm/src/vm/`, and the compiler
//! stay focused.
16
17use std::cell::{Cell, RefCell};
18use std::collections::BTreeMap;
19use std::rc::Rc;
20
21use serde::Serialize;
22use serde_json::Value as JsonValue;
23
24use crate::orchestration::{
25    current_execution_policy, pop_execution_policy, push_execution_policy, CapabilityPolicy,
26    HookEvent,
27};
28use crate::personas::StageDecl;
29use crate::stdlib::macros::{harn_builtin, VmBuiltinDef};
30use crate::value::{VmClosure, VmError, VmValue};
31
32fn vm_str(value: &VmValue) -> Option<&str> {
33    match value {
34        VmValue::String(s) => Some(s.as_ref()),
35        _ => None,
36    }
37}
38
/// Static metadata captured from a `@step(...)` attribute.
///
/// Populated by the `__register_step` builtin (see [`register_step_from_dict`])
/// when the program first runs, then consulted by `llm_call` and the
/// frame-pop hooks while the step is active.
#[derive(Debug, Default, Clone)]
pub struct StepDefinition {
    /// Display name of the step; `register_step_from_dict` falls back to the
    /// function name when the metadata has no `name`.
    pub name: String,
    /// Name of the `@step`-annotated function this metadata is keyed by.
    pub function: String,
    /// Default model for `llm_call` inside the step (never an empty string
    /// when built by `register_step_from_dict`).
    pub model: Option<String>,
    /// Ceiling on input + output tokens across the step's LLM calls.
    pub max_tokens: Option<u64>,
    /// Ceiling on cumulative LLM cost, in USD, across the step's calls.
    pub max_usd: Option<f64>,
    /// One of "fail" (default), "continue", "escalate". Drives how a
    /// `budget_exceeded` error propagating out of the step is handled —
    /// see `crates/harn-vm/src/vm/execution.rs`.
    pub error_boundary: Option<String>,
}
56
/// Metadata registered for a persona function via `__register_persona`.
#[derive(Debug, Default, Clone)]
pub struct PersonaDefinition {
    /// Persona display name; falls back to the function name when the
    /// metadata dict carries no `name`.
    pub name: String,
    /// Per-stage tool/side-effect scoping. Keyed lookups by stage name happen
    /// every step entry; the list is small (a handful of stages per persona)
    /// so a `Vec` keeps insertion order and matches the manifest's authored
    /// ordering.
    pub stages: Vec<StageDecl>,
}
66
67impl StepDefinition {
68    pub fn boundary(&self) -> StepErrorBoundary {
69        match self.error_boundary.as_deref() {
70            Some("continue") => StepErrorBoundary::Continue,
71            Some("escalate") => StepErrorBoundary::Escalate,
72            _ => StepErrorBoundary::Fail,
73        }
74    }
75}
76
/// Parsed form of [`StepDefinition::error_boundary`], produced by
/// [`StepDefinition::boundary`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepErrorBoundary {
    /// `"fail"`, absent, or any unrecognised string.
    Fail,
    /// `"continue"`.
    Continue,
    /// `"escalate"`.
    Escalate,
}
83
/// Tracks one in-flight step. The `frame_depth` is `Vm::frames.len()`
/// captured immediately after `push_closure_frame` returns, so an
/// `ActiveStep` is "alive" while `Vm::frames.len() >= frame_depth`.
#[derive(Debug, Clone)]
pub struct ActiveStep {
    pub frame_depth: usize,
    /// Registered metadata for the step's function.
    pub definition: Rc<StepDefinition>,
    /// Name of the persona on top of the persona stack when the step was
    /// pushed, if any.
    pub persona: Option<String>,
    /// Copy of the arguments the step function was invoked with.
    pub args: Vec<VmValue>,
    /// Running totals updated by `record_step_llm_usage`.
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub cost_usd: f64,
    pub llm_calls: u32,
    /// Most recent non-empty model name reported to `record_step_llm_usage`.
    pub last_model: Option<String>,
    /// Tracing span id opened when the step's frame was pushed; ended on
    /// completion. 0 when tracing was disabled at push time, in which
    /// case `span_end` is a no-op anyway.
    pub span_id: u64,
    /// True when this step pushed a per-stage `CapabilityPolicy` onto the
    /// execution policy stack. The runtime pops it when the step's frame
    /// unwinds, mirroring the RAII guard pattern in
    /// `crates/harn-serve/src/adapters/acp/modes.rs`.
    pub stage_policy_pushed: bool,
}
108
109impl ActiveStep {
110    fn new(
111        frame_depth: usize,
112        definition: Rc<StepDefinition>,
113        persona: Option<String>,
114        args: Vec<VmValue>,
115        span_id: u64,
116        stage_policy_pushed: bool,
117    ) -> Self {
118        Self {
119            frame_depth,
120            definition,
121            persona,
122            args,
123            input_tokens: 0,
124            output_tokens: 0,
125            cost_usd: 0.0,
126            llm_calls: 0,
127            last_model: None,
128            span_id,
129            stage_policy_pushed,
130        }
131    }
132
133    fn total_tokens(&self) -> u64 {
134        self.input_tokens.saturating_add(self.output_tokens)
135    }
136}
137
/// A persona function currently on the call stack.
///
/// Pushed by `maybe_push_active_persona`. It is popped by
/// `prune_below_frame` once `frame_depth` exceeds the VM's current frame
/// count.
#[derive(Debug, Clone)]
pub struct ActivePersona {
    pub frame_depth: usize,
    pub definition: Rc<PersonaDefinition>,
}
143
/// Snapshot persisted into [`COMPLETED_STEPS`] when the step's frame
/// unwinds. Receipts and `harn persona inspect`-style downstream consumers
/// read it back via [`drain_completed_steps`].
#[derive(Debug, Clone, Serialize)]
pub struct CompletedStep {
    pub name: String,
    pub function: String,
    /// Last model observed during the step, else the step's declared default.
    pub model: Option<String>,
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub cost_usd: f64,
    pub llm_calls: u32,
    /// `"completed"` when recorded by `prune_below_frame`; otherwise the
    /// status string supplied by the caller that finished the step.
    pub status: String,
    /// Error text supplied by the caller that finished the step, if any.
    pub error: Option<String>,
}
159
thread_local! {
    // `@step` metadata keyed by function name; written by `register_step`.
    static STEP_REGISTRY: RefCell<BTreeMap<String, Rc<StepDefinition>>> =
        const { RefCell::new(BTreeMap::new()) };
    // Persona metadata keyed by function name; written by `register_persona`.
    static PERSONA_REGISTRY: RefCell<BTreeMap<String, Rc<PersonaDefinition>>> =
        const { RefCell::new(BTreeMap::new()) };
    // Cached entry counts for the two registries above. They are bumped only
    // when an insert adds a new key and zeroed by `reset_thread_local_state`.
    // `step_registry_empty` / `persona_registry_empty` read these instead of
    // borrowing the maps.
    static STEP_REGISTRY_LEN: Cell<usize> = const { Cell::new(0) };
    static PERSONA_REGISTRY_LEN: Cell<usize> = const { Cell::new(0) };
    // Personas / steps whose frames are currently on the VM call stack,
    // innermost last.
    static PERSONA_STACK: RefCell<Vec<ActivePersona>> = const { RefCell::new(Vec::new()) };
    static STEP_STACK: RefCell<Vec<ActiveStep>> = const { RefCell::new(Vec::new()) };
    // Summaries appended by `finish_step`, in completion order.
    static COMPLETED_STEPS: RefCell<Vec<CompletedStep>> = const { RefCell::new(Vec::new()) };
    // Hooks registered via `register_persona_hook` / `register_step_hook`.
    static PERSONA_HOOKS: RefCell<Vec<PersonaHookRegistration>> = const { RefCell::new(Vec::new()) };
}
172
173/// Reset every thread-local owned by this module. Called between test
174/// runs and at the start of each top-level program execution so leftover
175/// registrations don't leak across runs.
176pub fn reset_thread_local_state() {
177    STEP_REGISTRY.with(|r| r.borrow_mut().clear());
178    PERSONA_REGISTRY.with(|r| r.borrow_mut().clear());
179    STEP_REGISTRY_LEN.with(|len| len.set(0));
180    PERSONA_REGISTRY_LEN.with(|len| len.set(0));
181    PERSONA_STACK.with(|s| s.borrow_mut().clear());
182    STEP_STACK.with(|s| s.borrow_mut().clear());
183    COMPLETED_STEPS.with(|c| c.borrow_mut().clear());
184    PERSONA_HOOKS.with(|h| h.borrow_mut().clear());
185}
186
187#[inline]
188fn step_registry_empty() -> bool {
189    STEP_REGISTRY_LEN.with(|len| len.get() == 0)
190}
191
192#[inline]
193fn persona_registry_empty() -> bool {
194    PERSONA_REGISTRY_LEN.with(|len| len.get() == 0)
195}
196
197#[inline]
198fn tracked_registries_empty() -> bool {
199    step_registry_empty() && persona_registry_empty()
200}
201
202/// Bind a `@step` function name to its declared metadata. Idempotent: a
203/// second call replaces the prior definition (matches re-evaluation
204/// semantics of `harn run` and the conformance harness).
205pub fn register_step(function: &str, definition: StepDefinition) {
206    let inserted = STEP_REGISTRY.with(|registry| {
207        registry
208            .borrow_mut()
209            .insert(function.to_string(), Rc::new(definition))
210            .is_none()
211    });
212    if inserted {
213        STEP_REGISTRY_LEN.with(|len| len.set(len.get() + 1));
214    }
215}
216
217pub fn register_persona(function: &str, definition: PersonaDefinition) {
218    let inserted = PERSONA_REGISTRY.with(|registry| {
219        registry
220            .borrow_mut()
221            .insert(function.to_string(), Rc::new(definition))
222            .is_none()
223    });
224    if inserted {
225        PERSONA_REGISTRY_LEN.with(|len| len.set(len.get() + 1));
226    }
227}
228
229pub fn register_persona_from_dict(args: Vec<VmValue>) -> Result<VmValue, VmError> {
230    let function = args
231        .first()
232        .and_then(vm_str)
233        .map(|s| s.to_string())
234        .ok_or_else(|| {
235            VmError::Thrown(VmValue::String(Rc::from(
236                "__register_persona: expected (function_name, metadata_dict)",
237            )))
238        })?;
239    let meta = args
240        .get(1)
241        .and_then(VmValue::as_dict)
242        .cloned()
243        .ok_or_else(|| {
244            VmError::Thrown(VmValue::String(Rc::from(
245                "__register_persona: metadata argument must be a dict",
246            )))
247        })?;
248    let definition = PersonaDefinition {
249        name: meta
250            .get("name")
251            .and_then(vm_str)
252            .map(str::to_string)
253            .unwrap_or_else(|| function.clone()),
254        stages: parse_stage_decls(meta.get("stages"))?,
255    };
256    register_persona(&function, definition);
257    Ok(VmValue::Nil)
258}
259
260fn parse_stage_decls(value: Option<&VmValue>) -> Result<Vec<StageDecl>, VmError> {
261    let Some(value) = value else {
262        return Ok(Vec::new());
263    };
264    let entries = match value {
265        VmValue::Nil => return Ok(Vec::new()),
266        VmValue::List(list) => list.as_ref(),
267        _ => {
268            return Err(VmError::Thrown(VmValue::String(Rc::from(
269                "__register_persona: stages argument must be a list of dicts",
270            ))));
271        }
272    };
273    let mut out = Vec::with_capacity(entries.len());
274    for entry in entries {
275        let dict = entry.as_dict().ok_or_else(|| {
276            VmError::Thrown(VmValue::String(Rc::from(
277                "__register_persona: each stage entry must be a dict",
278            )))
279        })?;
280        let Some(name) = dict.get("name").and_then(vm_str) else {
281            return Err(VmError::Thrown(VmValue::String(Rc::from(
282                "__register_persona: stage dict missing required 'name'",
283            ))));
284        };
285        let allowed_tools = match dict.get("allowed_tools") {
286            None | Some(VmValue::Nil) => None,
287            Some(VmValue::List(items)) => Some(
288                items
289                    .iter()
290                    .map(|item| {
291                        vm_str(item).map(str::to_string).ok_or_else(|| {
292                            VmError::Thrown(VmValue::String(Rc::from(
293                                "__register_persona: stage allowed_tools entries must be strings",
294                            )))
295                        })
296                    })
297                    .collect::<Result<Vec<_>, _>>()?,
298            ),
299            _ => {
300                return Err(VmError::Thrown(VmValue::String(Rc::from(
301                    "__register_persona: stage allowed_tools must be a list of strings",
302                ))));
303            }
304        };
305        let side_effect_level = dict
306            .get("side_effect_level")
307            .and_then(vm_str)
308            .map(str::to_string)
309            .filter(|s| !s.is_empty());
310        let max_iterations = match dict.get("max_iterations") {
311            Some(VmValue::Int(n)) if *n >= 0 => Some(*n as u32),
312            Some(VmValue::Float(f)) if f.is_finite() && *f >= 0.0 => Some(*f as u32),
313            _ => None,
314        };
315        out.push(StageDecl {
316            name: name.to_string(),
317            allowed_tools,
318            side_effect_level,
319            max_iterations,
320            on_exit: None,
321        });
322    }
323    Ok(out)
324}
325
326/// Builtin entry point invoked by compiler-emitted bytecode after every
327/// `@step` function declaration. Accepts a dict mirroring
328/// `harn_modules::PersonaStepMetadata`.
329pub fn register_step_from_dict(args: Vec<VmValue>) -> Result<VmValue, VmError> {
330    let function = args
331        .first()
332        .and_then(vm_str)
333        .map(|s| s.to_string())
334        .ok_or_else(|| {
335            VmError::Thrown(VmValue::String(Rc::from(
336                "__register_step: expected (function_name, metadata_dict)",
337            )))
338        })?;
339    let meta = args
340        .get(1)
341        .and_then(VmValue::as_dict)
342        .cloned()
343        .ok_or_else(|| {
344            VmError::Thrown(VmValue::String(Rc::from(
345                "__register_step: metadata argument must be a dict",
346            )))
347        })?;
348
349    let mut definition = StepDefinition {
350        function: function.clone(),
351        ..StepDefinition::default()
352    };
353    definition.name = meta
354        .get("name")
355        .and_then(vm_str)
356        .map(|s| s.to_string())
357        .unwrap_or_else(|| function.clone());
358    definition.model = meta
359        .get("model")
360        .and_then(vm_str)
361        .map(|s| s.to_string())
362        .filter(|s| !s.is_empty());
363    definition.error_boundary = meta
364        .get("error_boundary")
365        .and_then(vm_str)
366        .map(|s| s.to_string());
367
368    if let Some(VmValue::Dict(budget)) = meta.get("budget") {
369        if let Some(value) = budget.get("max_tokens") {
370            definition.max_tokens = match value {
371                VmValue::Int(n) if *n > 0 => Some(*n as u64),
372                VmValue::Float(f) if f.is_finite() && *f > 0.0 => Some(*f as u64),
373                _ => None,
374            };
375        }
376        if let Some(value) = budget.get("max_usd") {
377            definition.max_usd = match value {
378                VmValue::Float(f) if f.is_finite() && *f >= 0.0 => Some(*f),
379                VmValue::Int(n) if *n >= 0 => Some(*n as f64),
380                _ => None,
381            };
382        }
383    }
384
385    register_step(&function, definition);
386    Ok(VmValue::Nil)
387}
388
/// A hook registered against personas (and optionally a single step), as
/// matched by `matching_hooks`.
#[derive(Clone)]
pub struct PersonaHookRegistration {
    /// Pattern tested against the active persona name via `persona_matches`.
    pub persona_pattern: String,
    /// When set, the hook only fires for a step with exactly this name.
    pub step_name: Option<String>,
    pub event: HookEvent,
    /// When set, the hook only fires once a budget percentage is supplied and
    /// is at least this value.
    pub threshold_pct: Option<f64>,
    pub handler: Rc<VmClosure>,
}
397
398impl std::fmt::Debug for PersonaHookRegistration {
399    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400        f.debug_struct("PersonaHookRegistration")
401            .field("persona_pattern", &self.persona_pattern)
402            .field("step_name", &self.step_name)
403            .field("event", &self.event)
404            .field("threshold_pct", &self.threshold_pct)
405            .field("handler", &"..")
406            .finish()
407    }
408}
409
/// A hook selected by `matching_hooks`: the handler to run and the event it
/// was registered for.
#[derive(Debug, Clone)]
pub struct PersonaHookInvocation {
    pub handler: Rc<VmClosure>,
    pub event: HookEvent,
}
415
416pub fn register_persona_hook(
417    persona_pattern: impl Into<String>,
418    event: HookEvent,
419    threshold_pct: Option<f64>,
420    handler: Rc<VmClosure>,
421) {
422    PERSONA_HOOKS.with(|hooks| {
423        hooks.borrow_mut().push(PersonaHookRegistration {
424            persona_pattern: persona_pattern.into(),
425            step_name: None,
426            event,
427            threshold_pct,
428            handler,
429        });
430    });
431}
432
433pub fn register_step_hook(
434    persona_pattern: impl Into<String>,
435    step_name: impl Into<String>,
436    event: HookEvent,
437    threshold_pct: Option<f64>,
438    handler: Rc<VmClosure>,
439) {
440    PERSONA_HOOKS.with(|hooks| {
441        hooks.borrow_mut().push(PersonaHookRegistration {
442            persona_pattern: persona_pattern.into(),
443            step_name: Some(step_name.into()),
444            event,
445            threshold_pct,
446            handler,
447        });
448    });
449}
450
451pub fn clear_persona_hooks() {
452    PERSONA_HOOKS.with(|hooks| hooks.borrow_mut().clear());
453}
454
/// The active step and persona stacks, moved out by `take_active_context`
/// and put back by `restore_active_context`.
pub struct ActiveContextSnapshot {
    steps: Vec<ActiveStep>,
    personas: Vec<ActivePersona>,
}
459
460pub fn take_active_context() -> ActiveContextSnapshot {
461    ActiveContextSnapshot {
462        steps: STEP_STACK.with(|stack| std::mem::take(&mut *stack.borrow_mut())),
463        personas: PERSONA_STACK.with(|stack| std::mem::take(&mut *stack.borrow_mut())),
464    }
465}
466
467pub fn restore_active_context(snapshot: ActiveContextSnapshot) {
468    STEP_STACK.with(|stack| *stack.borrow_mut() = snapshot.steps);
469    PERSONA_STACK.with(|stack| *stack.borrow_mut() = snapshot.personas);
470}
471
472pub fn is_tracked_function(function_name: &str) -> bool {
473    if tracked_registries_empty() {
474        return false;
475    }
476    (!step_registry_empty()
477        && STEP_REGISTRY.with(|registry| registry.borrow().contains_key(function_name)))
478        || (!persona_registry_empty()
479            && PERSONA_REGISTRY.with(|registry| registry.borrow().contains_key(function_name)))
480}
481
482pub fn step_definition_for_function(function_name: &str) -> Option<Rc<StepDefinition>> {
483    if step_registry_empty() {
484        return None;
485    }
486    STEP_REGISTRY.with(|registry| registry.borrow().get(function_name).cloned())
487}
488
489pub fn current_persona_name() -> Option<String> {
490    PERSONA_STACK.with(|stack| stack.borrow().last().map(|p| p.definition.name.clone()))
491}
492
493/// Resolve the per-stage policy for `step_name` against the currently
494/// active persona's stage declarations. Returns `None` when no persona is
495/// active or no stage matches the step name. Caller pushes the result onto
496/// `EXECUTION_POLICY_STACK`.
497///
498/// When an ambient policy is already active, the stage policy is
499/// intersected with it so a stage can only ever tighten the tool surface
500/// and side-effect ceiling — never widen them.
501fn stage_policy_for_active_step(step_name: &str) -> Option<CapabilityPolicy> {
502    let stage_policy = PERSONA_STACK.with(|stack| {
503        let stack = stack.borrow();
504        let persona = stack.last()?;
505        let stage = persona
506            .definition
507            .stages
508            .iter()
509            .find(|stage| stage.name == step_name)?;
510        Some(stage_decl_to_policy(stage))
511    })?;
512    let Some(parent) = current_execution_policy() else {
513        return Some(stage_policy);
514    };
515    // `intersect` is conservative: failure means the stage referenced a tool
516    // the ambient policy already denied. Fall back to a narrowed copy that
517    // drops those entries so the stage can never widen the ceiling.
518    Some(parent.intersect(&stage_policy).unwrap_or_else(|_| {
519        let intersected_tools: Vec<String> = stage_policy
520            .tools
521            .iter()
522            .filter(|tool| parent.tools.is_empty() || parent.tools.contains(*tool))
523            .cloned()
524            .collect();
525        CapabilityPolicy {
526            tools: intersected_tools,
527            ..stage_policy
528        }
529    }))
530}
531
532fn stage_decl_to_policy(stage: &StageDecl) -> CapabilityPolicy {
533    CapabilityPolicy {
534        tools: stage.allowed_tools.clone().unwrap_or_default(),
535        side_effect_level: stage.side_effect_level.clone(),
536        ..CapabilityPolicy::default()
537    }
538}
539
540fn persona_matches(pattern: &str, persona: &str) -> bool {
541    crate::orchestration::glob_match(pattern, persona)
542}
543
544pub fn matching_hooks(
545    event: HookEvent,
546    persona: Option<&str>,
547    step_name: Option<&str>,
548    budget_pct: Option<f64>,
549) -> Vec<PersonaHookInvocation> {
550    let persona = persona.unwrap_or("");
551    PERSONA_HOOKS.with(|hooks| {
552        hooks
553            .borrow()
554            .iter()
555            .filter(|hook| hook.event == event)
556            .filter(|hook| persona_matches(&hook.persona_pattern, persona))
557            .filter(|hook| match (&hook.step_name, step_name) {
558                (Some(expected), Some(actual)) => expected == actual,
559                (Some(_), None) => false,
560                (None, _) => true,
561            })
562            .filter(|hook| match (hook.threshold_pct, budget_pct) {
563                (Some(threshold), Some(pct)) => pct >= threshold,
564                (Some(_), None) => false,
565                (None, _) => true,
566            })
567            .map(|hook| PersonaHookInvocation {
568                handler: hook.handler.clone(),
569                event: hook.event,
570            })
571            .collect()
572    })
573}
574
575pub fn maybe_push_active_persona(function_name: &str, frame_depth: usize) -> bool {
576    if persona_registry_empty() {
577        return false;
578    }
579    let definition =
580        PERSONA_REGISTRY.with(|registry| registry.borrow().get(function_name).cloned());
581    let Some(definition) = definition else {
582        return false;
583    };
584    PERSONA_STACK.with(|stack| {
585        stack.borrow_mut().push(ActivePersona {
586            frame_depth,
587            definition,
588        });
589    });
590    true
591}
592
593/// Push an active step onto the stack iff `function_name` has metadata
594/// registered. Returns `true` when a frame was pushed so the call site
595/// can record that fact. Called from `Vm::push_closure_frame` after the
596/// new frame has been added.
597pub fn maybe_push_active_step(function_name: &str, frame_depth: usize, args: &[VmValue]) -> bool {
598    if step_registry_empty() {
599        return false;
600    }
601    let definition = STEP_REGISTRY.with(|registry| registry.borrow().get(function_name).cloned());
602    let Some(definition) = definition else {
603        return false;
604    };
605    let persona = current_persona_name();
606    let span_id =
607        crate::tracing::span_start(crate::tracing::SpanKind::Step, definition.name.clone());
608    if let Some(persona_name) = persona.as_deref() {
609        crate::tracing::span_set_metadata(
610            span_id,
611            "persona",
612            serde_json::Value::String(persona_name.to_string()),
613        );
614    }
615    if let Some(model) = definition.model.as_deref() {
616        crate::tracing::span_set_metadata(
617            span_id,
618            "model",
619            serde_json::Value::String(model.to_string()),
620        );
621    }
622    let step_name = definition.name.clone();
623    STEP_STACK.with(|stack| {
624        stack.borrow_mut().push(ActiveStep::new(
625            frame_depth,
626            definition,
627            persona,
628            args.to_vec(),
629            span_id,
630            false,
631        ));
632    });
633    if let Some(policy) = stage_policy_for_active_step(&step_name) {
634        push_execution_policy(policy);
635        STEP_STACK.with(|stack| {
636            if let Some(top) = stack.borrow_mut().last_mut() {
637                top.stage_policy_pushed = true;
638            }
639        });
640    }
641    true
642}
643
644/// Drop any step entries whose owning frame has already been unwound,
645/// recording a `CompletedStep` summary for each. The `current_frame_depth`
646/// is `Vm::frames.len()` at the call site — entries with
647/// `frame_depth > current_frame_depth` are stale.
648pub fn prune_below_frame(current_frame_depth: usize) {
649    let mut popped: Vec<ActiveStep> = Vec::new();
650    STEP_STACK.with(|stack| {
651        let mut stack = stack.borrow_mut();
652        while let Some(top) = stack.last() {
653            if top.frame_depth > current_frame_depth {
654                popped.push(stack.pop().unwrap());
655            } else {
656                break;
657            }
658        }
659    });
660    for step in popped {
661        finish_step(step, "completed", None);
662    }
663    PERSONA_STACK.with(|stack| {
664        let mut stack = stack.borrow_mut();
665        while stack
666            .last()
667            .is_some_and(|persona| persona.frame_depth > current_frame_depth)
668        {
669            stack.pop();
670        }
671    });
672}
673
674pub fn take_active_step(current_frame_depth: usize) -> Option<ActiveStep> {
675    STEP_STACK.with(|stack| {
676        let mut stack = stack.borrow_mut();
677        if stack
678            .last()
679            .is_some_and(|step| step.frame_depth == current_frame_depth)
680        {
681            stack.pop()
682        } else {
683            None
684        }
685    })
686}
687
/// Public entry point for finishing a step obtained from `take_active_step`.
///
/// Forwards to `finish_step`, which pops the step's stage policy if it
/// pushed one, closes its tracing span, and appends a `CompletedStep` with
/// the given `status` and `error`.
pub fn finish_active_step(step: ActiveStep, status: &str, error: Option<String>) {
    finish_step(step, status, error);
}
691
692/// Pop the topmost active step (if its frame is the current one) and
693/// record an explicit completion status. Used when an error boundary
694/// rewrites or absorbs an in-flight error so the receipt log reflects the
695/// outcome the persona actually saw.
696pub fn pop_and_record(current_frame_depth: usize, status: &str, error: Option<String>) -> bool {
697    let popped = STEP_STACK.with(|stack| {
698        let mut stack = stack.borrow_mut();
699        if stack
700            .last()
701            .map(|step| step.frame_depth == current_frame_depth)
702            .unwrap_or(false)
703        {
704            stack.pop()
705        } else {
706            None
707        }
708    });
709    let Some(step) = popped else {
710        return false;
711    };
712    finish_step(step, status, error);
713    true
714}
715
716fn finish_step(step: ActiveStep, status: &str, error: Option<String>) {
717    if step.stage_policy_pushed {
718        pop_execution_policy();
719    }
720    crate::tracing::span_set_metadata(
721        step.span_id,
722        "status",
723        serde_json::Value::String(status.to_string()),
724    );
725    crate::tracing::span_set_metadata(
726        step.span_id,
727        "llm_calls",
728        serde_json::Value::Number(step.llm_calls.into()),
729    );
730    crate::tracing::span_set_metadata(
731        step.span_id,
732        "input_tokens",
733        serde_json::Value::Number(step.input_tokens.into()),
734    );
735    crate::tracing::span_set_metadata(
736        step.span_id,
737        "output_tokens",
738        serde_json::Value::Number(step.output_tokens.into()),
739    );
740    if let Some(cost_n) = serde_json::Number::from_f64(step.cost_usd) {
741        crate::tracing::span_set_metadata(
742            step.span_id,
743            "cost_usd",
744            serde_json::Value::Number(cost_n),
745        );
746    }
747    crate::tracing::span_end(step.span_id);
748    let summary = CompletedStep {
749        name: step.definition.name.clone(),
750        function: step.definition.function.clone(),
751        model: step
752            .last_model
753            .clone()
754            .or_else(|| step.definition.model.clone()),
755        input_tokens: step.input_tokens,
756        output_tokens: step.output_tokens,
757        cost_usd: step.cost_usd,
758        llm_calls: step.llm_calls,
759        status: status.to_string(),
760        error,
761    };
762    COMPLETED_STEPS.with(|completed| completed.borrow_mut().push(summary));
763}
764
765/// Get a snapshot of the topmost active step, if any. Used by the
766/// llm_call path to fill in defaults — never for mutation.
767pub fn with_active_step<R>(f: impl FnOnce(&ActiveStep) -> R) -> Option<R> {
768    STEP_STACK.with(|stack| stack.borrow().last().map(f))
769}
770
771/// Mutate the topmost active step (typically to attribute LLM usage).
772pub fn with_active_step_mut<R>(f: impl FnOnce(&mut ActiveStep) -> R) -> Option<R> {
773    STEP_STACK.with(|stack| stack.borrow_mut().last_mut().map(f))
774}
775
776/// Frame depth of the topmost active step, or `None` when no step is
777/// active. Used by `handle_error` to detect "this throw is exiting a
778/// step's frame".
779pub fn active_step_frame_depth() -> Option<usize> {
780    STEP_STACK.with(|stack| stack.borrow().last().map(|s| s.frame_depth))
781}
782
783/// Default model the topmost active step should impose on `llm_call`
784/// invocations whose options dict didn't pin a model.
785pub fn active_step_model_default() -> Option<String> {
786    STEP_STACK.with(|stack| {
787        stack
788            .borrow()
789            .last()
790            .and_then(|step| step.definition.model.clone())
791    })
792}
793
794/// Record that `llm_call` consumed `input_tokens` / `output_tokens` for
795/// `cost_usd`. Updates the active step's running totals and returns a
796/// budget-exhaustion error if the step's ceiling is now breached.
797///
798/// The check is performed AFTER the call so the test fixture's first
799/// call (which fits under budget) succeeds and subsequent calls trip the
800/// limit. This matches the existing `accumulate_cost_for_provider`
801/// pattern where global budget is also checked post-hoc.
802pub fn record_step_llm_usage(
803    model: &str,
804    input_tokens: i64,
805    output_tokens: i64,
806    cost_usd: f64,
807) -> Result<(), VmError> {
808    let exhausted = STEP_STACK.with(|stack| -> Option<VmError> {
809        let mut stack = stack.borrow_mut();
810        let step = stack.last_mut()?;
811        step.input_tokens = step.input_tokens.saturating_add(input_tokens.max(0) as u64);
812        step.output_tokens = step
813            .output_tokens
814            .saturating_add(output_tokens.max(0) as u64);
815        step.cost_usd += cost_usd;
816        step.llm_calls = step.llm_calls.saturating_add(1);
817        if !model.is_empty() {
818            step.last_model = Some(model.to_string());
819        }
820
821        if let Some(max_tokens) = step.definition.max_tokens {
822            if step.total_tokens() > max_tokens {
823                return Some(budget_exhausted_error(
824                    &step.definition,
825                    "max_tokens",
826                    max_tokens as f64,
827                    step.total_tokens() as f64,
828                    step.cost_usd,
829                ));
830            }
831        }
832        if let Some(max_usd) = step.definition.max_usd {
833            if step.cost_usd > max_usd {
834                return Some(budget_exhausted_error(
835                    &step.definition,
836                    "max_usd",
837                    max_usd,
838                    step.total_tokens() as f64,
839                    step.cost_usd,
840                ));
841            }
842        }
843        None
844    });
845    if let Some(err) = exhausted {
846        return Err(err);
847    }
848    Ok(())
849}
850
851fn budget_exhausted_error(
852    definition: &StepDefinition,
853    limit: &str,
854    limit_value: f64,
855    consumed_tokens: f64,
856    consumed_cost_usd: f64,
857) -> VmError {
858    let mut dict: BTreeMap<String, VmValue> = BTreeMap::new();
859    dict.insert(
860        "category".to_string(),
861        VmValue::String(Rc::from("budget_exceeded")),
862    );
863    dict.insert(
864        "kind".to_string(),
865        VmValue::String(Rc::from("budget_exhausted")),
866    );
867    dict.insert(
868        "reason".to_string(),
869        VmValue::String(Rc::from("step_budget_exhausted")),
870    );
871    dict.insert(
872        "step".to_string(),
873        VmValue::String(Rc::from(definition.name.clone())),
874    );
875    dict.insert(
876        "function".to_string(),
877        VmValue::String(Rc::from(definition.function.clone())),
878    );
879    dict.insert(
880        "limit".to_string(),
881        VmValue::String(Rc::from(limit.to_string())),
882    );
883    dict.insert("limit_value".to_string(), VmValue::Float(limit_value));
884    dict.insert(
885        "consumed_tokens".to_string(),
886        VmValue::Float(consumed_tokens),
887    );
888    dict.insert(
889        "consumed_cost_usd".to_string(),
890        VmValue::Float(consumed_cost_usd),
891    );
892    dict.insert(
893        "error_boundary".to_string(),
894        VmValue::String(Rc::from(
895            definition
896                .error_boundary
897                .clone()
898                .unwrap_or_else(|| "fail".to_string()),
899        )),
900    );
901    dict.insert(
902        "message".to_string(),
903        VmValue::String(Rc::from(format!(
904            "step `{}` exceeded {} budget ({} > {})",
905            definition.name, limit, consumed_tokens as i64, limit_value as i64
906        ))),
907    );
908    VmError::Thrown(VmValue::Dict(Rc::new(dict)))
909}
910
911/// Returns true if the thrown value looks like a budget-exhausted
912/// error — either our typed step-budget dict or the existing
913/// `crates/harn-vm/src/llm/cost.rs::budget_exceeded_error` shape.
914/// Either form is treated identically by `error_boundary` because the
915/// per-step budget machinery layers onto the existing envelope; a step
916/// whose budget the preflight projection rejects is still a budget
917/// exhaustion the step authored.
918pub fn is_step_budget_exhausted(err: &VmError) -> bool {
919    let VmError::Thrown(VmValue::Dict(dict)) = err else {
920        return false;
921    };
922    let category = dict.get("category").and_then(vm_str);
923    let kind = dict.get("kind").and_then(vm_str);
924    let reason = dict.get("reason").and_then(vm_str);
925    if matches!(kind, Some("budget_exhausted")) && matches!(reason, Some("step_budget_exhausted")) {
926        return true;
927    }
928    matches!(category, Some("budget_exceeded"))
929}
930
931/// Annotate an existing budget-exhausted error with `escalated: true`
932/// and the step's identity so the persona body / handoff receiver can
933/// route on it. Returns the original error if it isn't a thrown dict.
934/// Ensures `step` and `function` keys reflect the just-finished step
935/// even when the underlying error was raised by the preflight budget
936/// machinery (which doesn't know which step it's running under).
937pub fn mark_escalated(err: VmError, step_name: Option<&str>, function: Option<&str>) -> VmError {
938    let VmError::Thrown(VmValue::Dict(dict)) = err else {
939        return err;
940    };
941    let mut next = (*dict).clone();
942    next.insert("escalated".to_string(), VmValue::Bool(true));
943    next.insert(
944        "category".to_string(),
945        VmValue::String(Rc::from("handoff_escalation")),
946    );
947    if let Some(step) = step_name {
948        next.entry("step".to_string())
949            .or_insert_with(|| VmValue::String(Rc::from(step.to_string())));
950    }
951    if let Some(function) = function {
952        next.entry("function".to_string())
953            .or_insert_with(|| VmValue::String(Rc::from(function.to_string())));
954    }
955    VmError::Thrown(VmValue::Dict(Rc::new(next)))
956}
957
958/// Drain the completed-step log. Used by receipt builders that want a
959/// per-step model + token + cost breakdown for the just-finished run.
960pub fn drain_completed_steps() -> Vec<CompletedStep> {
961    COMPLETED_STEPS.with(|completed| std::mem::take(&mut *completed.borrow_mut()))
962}
963
964/// Read the completed-step log without clearing it. Use when callers
965/// want a peek without disturbing the global record stream.
966pub fn peek_completed_steps() -> Vec<CompletedStep> {
967    COMPLETED_STEPS.with(|completed| completed.borrow().clone())
968}
969
970/// Lower a [`CompletedStep`] into JSON for embedding in receipts /
971/// inspect output.
972pub fn completed_step_to_json(step: &CompletedStep) -> JsonValue {
973    serde_json::to_value(step).unwrap_or(JsonValue::Null)
974}
975
976/// Register the `__register_step` and `__register_persona` host builtins.
977/// Compiler-emitted bytecode after every `@step` / persona declaration
978/// calls these with `(function_name, metadata_dict)` so the runtime can
979/// later dispatch on the step's metadata when its function is invoked.
980pub fn register_step_builtins(vm: &mut crate::vm::Vm) {
981    for def in MODULE_BUILTINS {
982        vm.register_builtin_def(def);
983    }
984}
985
986pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] =
987    &[&__REGISTER_STEP_DEF, &__REGISTER_PERSONA_DEF];
988
989#[harn_builtin(category = "step_runtime", runtime_only = true)]
990fn __register_step(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
991    register_step_from_dict(args.to_vec())
992}
993
994#[harn_builtin(category = "step_runtime", runtime_only = true)]
995fn __register_persona(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
996    register_persona_from_dict(args.to_vec())
997}
998
#[cfg(test)]
mod tests {
    use super::*;

    fn fresh_state() {
        reset_thread_local_state();
    }

    /// Wraps `value` as a Harn string.
    fn string(value: &str) -> VmValue {
        VmValue::String(Rc::from(value))
    }

    /// Builds a Harn dict from `(key, value)` pairs.
    fn dict(entries: Vec<(&str, VmValue)>) -> VmValue {
        let map: BTreeMap<String, VmValue> = entries
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect();
        VmValue::Dict(Rc::new(map))
    }

    /// Builds a Harn list of strings.
    fn string_list(items: &[&str]) -> VmValue {
        VmValue::List(Rc::new(
            items.iter().map(|&item| string(item)).collect::<Vec<_>>(),
        ))
    }

    /// Registers `function` as a step with the given `@step` metadata pairs.
    fn register_test_step(function: &str, meta: Vec<(&str, VmValue)>) {
        register_step_from_dict(vec![string(function), dict(meta)])
            .expect("step registration succeeds");
    }

    /// Registers `function` as a persona named `persona` with a single stage.
    fn register_single_stage_persona(function: &str, persona: &str, stage: VmValue) {
        register_persona_from_dict(vec![
            string(function),
            dict(vec![
                ("name", string(persona)),
                ("stages", VmValue::List(Rc::new(vec![stage]))),
            ]),
        ])
        .expect("persona registration succeeds");
    }

    #[test]
    fn registers_and_pops_step_from_dict() {
        fresh_state();
        let budget = dict(vec![
            ("max_tokens", VmValue::Int(100)),
            ("max_usd", VmValue::Float(0.05)),
        ]);
        register_test_step(
            "plan_step",
            vec![
                ("name", string("plan")),
                ("model", string("claude-haiku-4-5")),
                ("error_boundary", string("continue")),
                ("budget", budget),
            ],
        );

        assert!(maybe_push_active_step("plan_step", 3, &[]));
        assert_eq!(active_step_frame_depth(), Some(3));
        assert_eq!(
            active_step_model_default().as_deref(),
            Some("claude-haiku-4-5")
        );

        record_step_llm_usage("claude-haiku-4-5", 10, 20, 0.001).expect("under budget");
        with_active_step(|step| {
            assert_eq!(step.input_tokens, 10);
            assert_eq!(step.output_tokens, 20);
            assert!((step.cost_usd - 0.001).abs() < 1e-9);
        });

        let err =
            record_step_llm_usage("claude-haiku-4-5", 50, 50, 0.0).expect_err("should exhaust");
        assert!(is_step_budget_exhausted(&err));

        prune_below_frame(2);
        let completed = drain_completed_steps();
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].llm_calls, 2);
    }

    // Exercises the `max_usd` branch of the budget check, which the test above
    // never reaches because its token limit trips first.
    #[test]
    fn max_usd_budget_exhausts_without_a_token_limit() {
        fresh_state();
        register_test_step(
            "cheap_step",
            vec![
                ("name", string("cheap")),
                ("budget", dict(vec![("max_usd", VmValue::Float(0.01))])),
            ],
        );
        assert!(maybe_push_active_step("cheap_step", 1, &[]));

        record_step_llm_usage("claude-haiku-4-5", 1, 1, 0.004).expect("under budget");
        let err = record_step_llm_usage("claude-haiku-4-5", 1, 1, 0.008)
            .expect_err("cumulative cost should exhaust max_usd");
        assert!(is_step_budget_exhausted(&err));
        let VmError::Thrown(VmValue::Dict(payload)) = &err else {
            panic!("budget exhaustion should throw a dict, got {err:?}");
        };
        assert_eq!(payload.get("limit").and_then(vm_str), Some("max_usd"));
        assert_eq!(payload.get("step").and_then(vm_str), Some("cheap"));

        prune_below_frame(0);
        assert!(active_step_frame_depth().is_none());
    }

    #[test]
    fn unregistered_function_does_not_push() {
        fresh_state();
        assert!(!maybe_push_active_step("not_a_step", 1, &[]));
        assert!(active_step_frame_depth().is_none());
    }

    #[test]
    fn tracked_registry_empty_fast_path_tracks_registrations_and_reset() {
        fresh_state();
        assert!(tracked_registries_empty());
        assert!(!is_tracked_function("plan_step"));

        register_step(
            "plan_step",
            StepDefinition {
                name: "plan".to_string(),
                function: "plan_step".to_string(),
                ..StepDefinition::default()
            },
        );
        assert!(!tracked_registries_empty());
        assert!(is_tracked_function("plan_step"));
        assert!(step_definition_for_function("plan_step").is_some());

        register_step(
            "plan_step",
            StepDefinition {
                name: "plan_v2".to_string(),
                function: "plan_step".to_string(),
                ..StepDefinition::default()
            },
        );
        assert!(is_tracked_function("plan_step"));

        fresh_state();
        assert!(tracked_registries_empty());
        assert!(!is_tracked_function("plan_step"));
    }

    #[test]
    fn stage_policy_narrows_but_does_not_widen_parent_policy() {
        fresh_state();
        register_test_step("research_step", vec![("name", string("research"))]);
        // Stage tries to add `edit` on top of a parent that only allowed `read`.
        let stage = dict(vec![
            ("name", string("research")),
            ("allowed_tools", string_list(&["read", "edit"])),
        ]);
        register_single_stage_persona("scoped_persona", "scoped", stage);

        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            ..CapabilityPolicy::default()
        });
        assert!(maybe_push_active_persona("scoped_persona", 1));
        assert!(maybe_push_active_step("research_step", 2, &[]));
        let policy = current_execution_policy().expect("stage policy active");
        // `edit` is filtered out because the parent already denied it.
        assert_eq!(policy.tools, vec!["read".to_string()]);

        prune_below_frame(0);
        pop_execution_policy();
        assert!(current_execution_policy().is_none());
    }

    #[test]
    fn stage_policy_is_pushed_and_popped_around_step() {
        fresh_state();
        register_test_step("research_step", vec![("name", string("research"))]);
        let stage = dict(vec![
            ("name", string("research")),
            ("allowed_tools", string_list(&["read"])),
        ]);
        register_single_stage_persona("scoped_persona", "scoped", stage);

        assert!(maybe_push_active_persona("scoped_persona", 1));
        assert!(current_execution_policy().is_none());
        assert!(maybe_push_active_step("research_step", 2, &[]));
        let policy = current_execution_policy().expect("stage policy is active inside step");
        assert_eq!(policy.tools, vec!["read".to_string()]);

        prune_below_frame(0);
        assert!(current_execution_policy().is_none());
    }
}