Skip to main content

harn_vm/
step_runtime.rs

1//! Per-step runtime state for `@step`-annotated persona functions.
2//!
3//! The compiler emits a call to the `__register_step` builtin after each
4//! `@step` declaration so the runtime can dispatch on the step's metadata
5//! when its function is invoked. While a step's frame is on the call
6//! stack, an [`ActiveStep`] entry tracks per-step LLM usage, defaults
7//! `llm_call`'s model when the call site doesn't override it, and bounds
8//! cumulative token and cost spend against the step's budget.
9//!
10//! This module owns three thread-locals (a per-program registry, a stack
11//! of currently-active steps, and a log of completed step summaries) but
12//! exposes only narrow helpers — `current_active_step_*` /
13//! `record_step_llm_usage` / etc. — so the call sites in
14//! `crates/harn-vm/src/llm/`, `crates/harn-vm/src/vm/`, and the compiler
15//! stay focused.
16
17use crate::value::VmDictExt;
18use std::cell::{Cell, RefCell};
19use std::collections::BTreeMap;
20use std::sync::Arc;
21
22use serde::Serialize;
23use serde_json::Value as JsonValue;
24
25use crate::orchestration::{
26    current_execution_policy, pop_execution_policy, push_execution_policy, CapabilityPolicy,
27    HookEvent,
28};
29use crate::personas::StageDecl;
30use crate::stdlib::macros::{harn_builtin, VmBuiltinDef};
31use crate::value::{VmClosure, VmError, VmValue};
32
33fn vm_str(value: &VmValue) -> Option<&str> {
34    match value {
35        VmValue::String(s) => Some(s.as_ref()),
36        _ => None,
37    }
38}
39
/// Metadata declared on a `@step(...)` attribute.
///
/// Filled in by the `__register_step` builtin (see [`register_step_from_dict`])
/// the first time the program runs, and read back by `llm_call` and the
/// frame-pop hooks while the step is on the stack.
#[derive(Clone, Debug, Default)]
pub struct StepDefinition {
    /// Step name; `register_step_from_dict` defaults it to the function name.
    pub name: String,
    /// Name of the annotated function.
    pub function: String,
    /// Model used by `llm_call` when the call site does not pin one.
    pub model: Option<String>,
    /// Ceiling on the step's cumulative input + output tokens.
    pub max_tokens: Option<u64>,
    /// Ceiling on the step's cumulative USD cost.
    pub max_usd: Option<f64>,
    /// `"fail"` (default), `"continue"`, or `"escalate"`. Selects how a
    /// `budget_exceeded` error escaping the step is handled; see
    /// `crates/harn-vm/src/vm/execution.rs`.
    pub error_boundary: Option<String>,
}
57
58#[derive(Debug, Default, Clone)]
59pub struct PersonaDefinition {
60    pub name: String,
61    /// Per-stage tool/side-effect scoping. Keyed lookups by stage name happen
62    /// every step entry; the list is small (a handful of stages per persona)
63    /// so a `Vec` keeps insertion order and matches the manifest's authored
64    /// ordering.
65    pub stages: Vec<StageDecl>,
66}
67
68impl StepDefinition {
69    pub fn boundary(&self) -> StepErrorBoundary {
70        match self.error_boundary.as_deref() {
71            Some("continue") => StepErrorBoundary::Continue,
72            Some("escalate") => StepErrorBoundary::Escalate,
73            _ => StepErrorBoundary::Fail,
74        }
75    }
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79pub enum StepErrorBoundary {
80    Fail,
81    Continue,
82    Escalate,
83}
84
85/// Tracks one in-flight step. The `frame_depth` is `Vm::frames.len()`
86/// captured immediately after `push_closure_frame` returns, so an
87/// `ActiveStep` is "alive" while `Vm::frames.len() >= frame_depth`.
88#[derive(Debug, Clone)]
89pub struct ActiveStep {
90    pub frame_depth: usize,
91    pub definition: Arc<StepDefinition>,
92    pub persona: Option<String>,
93    pub args: Vec<VmValue>,
94    pub input_tokens: u64,
95    pub output_tokens: u64,
96    pub cost_usd: f64,
97    pub llm_calls: u32,
98    pub last_model: Option<String>,
99    /// Tracing span id opened when the step's frame was pushed; ended on
100    /// completion. 0 when tracing was disabled at push time, in which
101    /// case `span_end` is a no-op anyway.
102    pub span_id: u64,
103    /// True when this step pushed a per-stage `CapabilityPolicy` onto the
104    /// execution policy stack. The runtime pops it when the step's frame
105    /// unwinds, mirroring the RAII guard pattern in
106    /// `crates/harn-serve/src/adapters/acp/modes.rs`.
107    pub stage_policy_pushed: bool,
108}
109
110impl ActiveStep {
111    fn new(
112        frame_depth: usize,
113        definition: Arc<StepDefinition>,
114        persona: Option<String>,
115        args: Vec<VmValue>,
116        span_id: u64,
117        stage_policy_pushed: bool,
118    ) -> Self {
119        Self {
120            frame_depth,
121            definition,
122            persona,
123            args,
124            input_tokens: 0,
125            output_tokens: 0,
126            cost_usd: 0.0,
127            llm_calls: 0,
128            last_model: None,
129            span_id,
130            stage_policy_pushed,
131        }
132    }
133
134    fn total_tokens(&self) -> u64 {
135        self.input_tokens.saturating_add(self.output_tokens)
136    }
137}
138
139#[derive(Debug, Clone)]
140pub struct ActivePersona {
141    pub frame_depth: usize,
142    pub definition: Arc<PersonaDefinition>,
143}
144
145/// Snapshot persisted into [`COMPLETED_STEPS`] when the step's frame
146/// unwinds. Receipts and `harn persona inspect`-style downstream consumers
147/// read it back via [`drain_completed_steps`].
148#[derive(Debug, Clone, Serialize)]
149pub struct CompletedStep {
150    pub name: String,
151    pub function: String,
152    pub model: Option<String>,
153    pub input_tokens: u64,
154    pub output_tokens: u64,
155    pub cost_usd: f64,
156    pub llm_calls: u32,
157    pub status: String,
158    pub error: Option<String>,
159}
160
161thread_local! {
162    static STEP_REGISTRY: RefCell<BTreeMap<String, Arc<StepDefinition>>> =
163        const { RefCell::new(std::collections::BTreeMap::new()) };
164    static PERSONA_REGISTRY: RefCell<BTreeMap<String, Arc<PersonaDefinition>>> =
165        const { RefCell::new(std::collections::BTreeMap::new()) };
166    static STEP_REGISTRY_LEN: Cell<usize> = const { Cell::new(0) };
167    static PERSONA_REGISTRY_LEN: Cell<usize> = const { Cell::new(0) };
168    static PERSONA_STACK: RefCell<Vec<ActivePersona>> = const { RefCell::new(Vec::new()) };
169    static STEP_STACK: RefCell<Vec<ActiveStep>> = const { RefCell::new(Vec::new()) };
170    static COMPLETED_STEPS: RefCell<Vec<CompletedStep>> = const { RefCell::new(Vec::new()) };
171    static PERSONA_HOOKS: RefCell<Vec<PersonaHookRegistration>> = const { RefCell::new(Vec::new()) };
172}
173
174/// Reset every thread-local owned by this module. Called between test
175/// runs and at the start of each top-level program execution so leftover
176/// registrations don't leak across runs.
177pub fn reset_thread_local_state() {
178    STEP_REGISTRY.with(|r| r.borrow_mut().clear());
179    PERSONA_REGISTRY.with(|r| r.borrow_mut().clear());
180    STEP_REGISTRY_LEN.with(|len| len.set(0));
181    PERSONA_REGISTRY_LEN.with(|len| len.set(0));
182    PERSONA_STACK.with(|s| s.borrow_mut().clear());
183    STEP_STACK.with(|s| s.borrow_mut().clear());
184    COMPLETED_STEPS.with(|c| c.borrow_mut().clear());
185    PERSONA_HOOKS.with(|h| h.borrow_mut().clear());
186}
187
188#[inline]
189fn step_registry_empty() -> bool {
190    STEP_REGISTRY_LEN.with(|len| len.get() == 0)
191}
192
193#[inline]
194fn persona_registry_empty() -> bool {
195    PERSONA_REGISTRY_LEN.with(|len| len.get() == 0)
196}
197
198#[inline]
199fn tracked_registries_empty() -> bool {
200    step_registry_empty() && persona_registry_empty()
201}
202
203/// Bind a `@step` function name to its declared metadata. Idempotent: a
204/// second call replaces the prior definition (matches re-evaluation
205/// semantics of `harn run` and the conformance harness).
206pub fn register_step(function: &str, definition: StepDefinition) {
207    let inserted = STEP_REGISTRY.with(|registry| {
208        registry
209            .borrow_mut()
210            .insert(function.to_string(), Arc::new(definition))
211            .is_none()
212    });
213    if inserted {
214        STEP_REGISTRY_LEN.with(|len| len.set(len.get() + 1));
215    }
216}
217
218pub fn register_persona(function: &str, definition: PersonaDefinition) {
219    let inserted = PERSONA_REGISTRY.with(|registry| {
220        registry
221            .borrow_mut()
222            .insert(function.to_string(), Arc::new(definition))
223            .is_none()
224    });
225    if inserted {
226        PERSONA_REGISTRY_LEN.with(|len| len.set(len.get() + 1));
227    }
228}
229
230pub fn register_persona_from_dict(args: Vec<VmValue>) -> Result<VmValue, VmError> {
231    let function = args
232        .first()
233        .and_then(vm_str)
234        .map(|s| s.to_string())
235        .ok_or_else(|| {
236            VmError::Thrown(VmValue::String(arcstr::ArcStr::from(
237                "__register_persona: expected (function_name, metadata_dict)",
238            )))
239        })?;
240    let meta = args
241        .get(1)
242        .and_then(VmValue::as_dict)
243        .cloned()
244        .ok_or_else(|| {
245            VmError::Thrown(VmValue::String(arcstr::ArcStr::from(
246                "__register_persona: metadata argument must be a dict",
247            )))
248        })?;
249    let definition = PersonaDefinition {
250        name: meta
251            .get("name")
252            .and_then(vm_str)
253            .map(str::to_string)
254            .unwrap_or_else(|| function.clone()),
255        stages: parse_stage_decls(meta.get("stages"))?,
256    };
257    register_persona(&function, definition);
258    Ok(VmValue::Nil)
259}
260
261fn parse_stage_decls(value: Option<&VmValue>) -> Result<Vec<StageDecl>, VmError> {
262    let Some(value) = value else {
263        return Ok(Vec::new());
264    };
265    let entries = match value {
266        VmValue::Nil => return Ok(Vec::new()),
267        VmValue::List(list) => list.as_ref(),
268        _ => {
269            return Err(VmError::Thrown(VmValue::String(arcstr::ArcStr::from(
270                "__register_persona: stages argument must be a list of dicts",
271            ))));
272        }
273    };
274    let mut out = Vec::with_capacity(entries.len());
275    for entry in entries {
276        let dict = entry.as_dict().ok_or_else(|| {
277            VmError::Thrown(VmValue::String(arcstr::ArcStr::from(
278                "__register_persona: each stage entry must be a dict",
279            )))
280        })?;
281        let Some(name) = dict.get("name").and_then(vm_str) else {
282            return Err(VmError::Thrown(VmValue::String(arcstr::ArcStr::from(
283                "__register_persona: stage dict missing required 'name'",
284            ))));
285        };
286        let allowed_tools = match dict.get("allowed_tools") {
287            None | Some(VmValue::Nil) => None,
288            Some(VmValue::List(items)) => Some(
289                items
290                    .iter()
291                    .map(|item| {
292                        vm_str(item).map(str::to_string).ok_or_else(|| {
293                            VmError::Thrown(VmValue::String(arcstr::ArcStr::from(
294                                "__register_persona: stage allowed_tools entries must be strings",
295                            )))
296                        })
297                    })
298                    .collect::<Result<Vec<_>, _>>()?,
299            ),
300            _ => {
301                return Err(VmError::Thrown(VmValue::String(arcstr::ArcStr::from(
302                    "__register_persona: stage allowed_tools must be a list of strings",
303                ))));
304            }
305        };
306        let side_effect_level = dict
307            .get("side_effect_level")
308            .and_then(vm_str)
309            .map(str::to_string)
310            .filter(|s| !s.is_empty());
311        let max_iterations = match dict.get("max_iterations") {
312            Some(VmValue::Int(n)) if *n >= 0 => Some(*n as u32),
313            Some(VmValue::Float(f)) if f.is_finite() && *f >= 0.0 => Some(*f as u32),
314            _ => None,
315        };
316        out.push(StageDecl {
317            name: name.to_string(),
318            allowed_tools,
319            side_effect_level,
320            max_iterations,
321            on_exit: None,
322        });
323    }
324    Ok(out)
325}
326
327/// Builtin entry point invoked by compiler-emitted bytecode after every
328/// `@step` function declaration. Accepts a dict mirroring
329/// `harn_modules::PersonaStepMetadata`.
330pub fn register_step_from_dict(args: Vec<VmValue>) -> Result<VmValue, VmError> {
331    let function = args
332        .first()
333        .and_then(vm_str)
334        .map(|s| s.to_string())
335        .ok_or_else(|| {
336            VmError::Thrown(VmValue::String(arcstr::ArcStr::from(
337                "__register_step: expected (function_name, metadata_dict)",
338            )))
339        })?;
340    let meta = args
341        .get(1)
342        .and_then(VmValue::as_dict)
343        .cloned()
344        .ok_or_else(|| {
345            VmError::Thrown(VmValue::String(arcstr::ArcStr::from(
346                "__register_step: metadata argument must be a dict",
347            )))
348        })?;
349
350    let mut definition = StepDefinition {
351        function: function.clone(),
352        ..StepDefinition::default()
353    };
354    definition.name = meta
355        .get("name")
356        .and_then(vm_str)
357        .map(|s| s.to_string())
358        .unwrap_or_else(|| function.clone());
359    definition.model = meta
360        .get("model")
361        .and_then(vm_str)
362        .map(|s| s.to_string())
363        .filter(|s| !s.is_empty());
364    definition.error_boundary = meta
365        .get("error_boundary")
366        .and_then(vm_str)
367        .map(|s| s.to_string());
368
369    if let Some(VmValue::Dict(budget)) = meta.get("budget") {
370        if let Some(value) = budget.get("max_tokens") {
371            definition.max_tokens = match value {
372                VmValue::Int(n) if *n > 0 => Some(*n as u64),
373                VmValue::Float(f) if f.is_finite() && *f > 0.0 => Some(*f as u64),
374                _ => None,
375            };
376        }
377        if let Some(value) = budget.get("max_usd") {
378            definition.max_usd = match value {
379                VmValue::Float(f) if f.is_finite() && *f >= 0.0 => Some(*f),
380                VmValue::Int(n) if *n >= 0 => Some(*n as f64),
381                _ => None,
382            };
383        }
384    }
385
386    register_step(&function, definition);
387    Ok(VmValue::Nil)
388}
389
390#[derive(Clone)]
391pub struct PersonaHookRegistration {
392    pub persona_pattern: String,
393    pub step_name: Option<String>,
394    pub event: HookEvent,
395    pub threshold_pct: Option<f64>,
396    pub handler: Arc<VmClosure>,
397}
398
399impl std::fmt::Debug for PersonaHookRegistration {
400    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
401        f.debug_struct("PersonaHookRegistration")
402            .field("persona_pattern", &self.persona_pattern)
403            .field("step_name", &self.step_name)
404            .field("event", &self.event)
405            .field("threshold_pct", &self.threshold_pct)
406            .field("handler", &"..")
407            .finish()
408    }
409}
410
411#[derive(Debug, Clone)]
412pub struct PersonaHookInvocation {
413    pub handler: Arc<VmClosure>,
414    pub event: HookEvent,
415}
416
417pub fn register_persona_hook(
418    persona_pattern: impl Into<String>,
419    event: HookEvent,
420    threshold_pct: Option<f64>,
421    handler: Arc<VmClosure>,
422) {
423    PERSONA_HOOKS.with(|hooks| {
424        hooks.borrow_mut().push(PersonaHookRegistration {
425            persona_pattern: persona_pattern.into(),
426            step_name: None,
427            event,
428            threshold_pct,
429            handler,
430        });
431    });
432}
433
434pub fn register_step_hook(
435    persona_pattern: impl Into<String>,
436    step_name: impl Into<String>,
437    event: HookEvent,
438    threshold_pct: Option<f64>,
439    handler: Arc<VmClosure>,
440) {
441    PERSONA_HOOKS.with(|hooks| {
442        hooks.borrow_mut().push(PersonaHookRegistration {
443            persona_pattern: persona_pattern.into(),
444            step_name: Some(step_name.into()),
445            event,
446            threshold_pct,
447            handler,
448        });
449    });
450}
451
452pub fn clear_persona_hooks() {
453    PERSONA_HOOKS.with(|hooks| hooks.borrow_mut().clear());
454}
455
456pub struct ActiveContextSnapshot {
457    steps: Vec<ActiveStep>,
458    personas: Vec<ActivePersona>,
459}
460
461pub fn take_active_context() -> ActiveContextSnapshot {
462    ActiveContextSnapshot {
463        steps: STEP_STACK.with(|stack| std::mem::take(&mut *stack.borrow_mut())),
464        personas: PERSONA_STACK.with(|stack| std::mem::take(&mut *stack.borrow_mut())),
465    }
466}
467
468pub fn restore_active_context(snapshot: ActiveContextSnapshot) {
469    STEP_STACK.with(|stack| *stack.borrow_mut() = snapshot.steps);
470    PERSONA_STACK.with(|stack| *stack.borrow_mut() = snapshot.personas);
471}
472
473pub fn is_tracked_function(function_name: &str) -> bool {
474    if tracked_registries_empty() {
475        return false;
476    }
477    (!step_registry_empty()
478        && STEP_REGISTRY.with(|registry| registry.borrow().contains_key(function_name)))
479        || (!persona_registry_empty()
480            && PERSONA_REGISTRY.with(|registry| registry.borrow().contains_key(function_name)))
481}
482
483pub fn step_definition_for_function(function_name: &str) -> Option<Arc<StepDefinition>> {
484    if step_registry_empty() {
485        return None;
486    }
487    STEP_REGISTRY.with(|registry| registry.borrow().get(function_name).cloned())
488}
489
490pub fn current_persona_name() -> Option<String> {
491    PERSONA_STACK.with(|stack| stack.borrow().last().map(|p| p.definition.name.clone()))
492}
493
494/// Resolve the per-stage policy for `step_name` against the currently
495/// active persona's stage declarations. Returns `None` when no persona is
496/// active or no stage matches the step name. Caller pushes the result onto
497/// `EXECUTION_POLICY_STACK`.
498///
499/// When an ambient policy is already active, the stage policy is
500/// intersected with it so a stage can only ever tighten the tool surface
501/// and side-effect ceiling — never widen them.
502fn stage_policy_for_active_step(step_name: &str) -> Option<CapabilityPolicy> {
503    let stage_policy = PERSONA_STACK.with(|stack| {
504        let stack = stack.borrow();
505        let persona = stack.last()?;
506        let stage = persona
507            .definition
508            .stages
509            .iter()
510            .find(|stage| stage.name == step_name)?;
511        Some(stage_decl_to_policy(stage))
512    })?;
513    let Some(parent) = current_execution_policy() else {
514        return Some(stage_policy);
515    };
516    // `intersect` is conservative: failure means the stage referenced a tool
517    // the ambient policy already denied. Fall back to a narrowed copy that
518    // drops those entries so the stage can never widen the ceiling.
519    Some(parent.intersect(&stage_policy).unwrap_or_else(|_| {
520        let intersected_tools: Vec<String> = stage_policy
521            .tools
522            .iter()
523            .filter(|tool| parent.tools.is_empty() || parent.tools.contains(*tool))
524            .cloned()
525            .collect();
526        CapabilityPolicy {
527            tools: intersected_tools,
528            ..stage_policy
529        }
530    }))
531}
532
533fn stage_decl_to_policy(stage: &StageDecl) -> CapabilityPolicy {
534    CapabilityPolicy {
535        tools: stage.allowed_tools.clone().unwrap_or_default(),
536        side_effect_level: stage.side_effect_level.clone(),
537        ..CapabilityPolicy::default()
538    }
539}
540
541fn persona_matches(pattern: &str, persona: &str) -> bool {
542    crate::orchestration::glob_match(pattern, persona)
543}
544
545pub fn matching_hooks(
546    event: HookEvent,
547    persona: Option<&str>,
548    step_name: Option<&str>,
549    budget_pct: Option<f64>,
550) -> Vec<PersonaHookInvocation> {
551    let persona = persona.unwrap_or("");
552    PERSONA_HOOKS.with(|hooks| {
553        hooks
554            .borrow()
555            .iter()
556            .filter(|hook| hook.event == event)
557            .filter(|hook| persona_matches(&hook.persona_pattern, persona))
558            .filter(|hook| match (&hook.step_name, step_name) {
559                (Some(expected), Some(actual)) => expected == actual,
560                (Some(_), None) => false,
561                (None, _) => true,
562            })
563            .filter(|hook| match (hook.threshold_pct, budget_pct) {
564                (Some(threshold), Some(pct)) => pct >= threshold,
565                (Some(_), None) => false,
566                (None, _) => true,
567            })
568            .map(|hook| PersonaHookInvocation {
569                handler: hook.handler.clone(),
570                event: hook.event,
571            })
572            .collect()
573    })
574}
575
576pub fn maybe_push_active_persona(function_name: &str, frame_depth: usize) -> bool {
577    if persona_registry_empty() {
578        return false;
579    }
580    let definition =
581        PERSONA_REGISTRY.with(|registry| registry.borrow().get(function_name).cloned());
582    let Some(definition) = definition else {
583        return false;
584    };
585    PERSONA_STACK.with(|stack| {
586        stack.borrow_mut().push(ActivePersona {
587            frame_depth,
588            definition,
589        });
590    });
591    true
592}
593
594/// Push an active step onto the stack iff `function_name` has metadata
595/// registered. Returns `true` when a frame was pushed so the call site
596/// can record that fact. Called from `Vm::push_closure_frame` after the
597/// new frame has been added.
598pub fn maybe_push_active_step(function_name: &str, frame_depth: usize, args: &[VmValue]) -> bool {
599    if step_registry_empty() {
600        return false;
601    }
602    let definition = STEP_REGISTRY.with(|registry| registry.borrow().get(function_name).cloned());
603    let Some(definition) = definition else {
604        return false;
605    };
606    let persona = current_persona_name();
607    let span_id =
608        crate::tracing::span_start(crate::tracing::SpanKind::Step, definition.name.clone());
609    if let Some(persona_name) = persona.as_deref() {
610        crate::tracing::span_set_metadata(
611            span_id,
612            "persona",
613            serde_json::Value::String(persona_name.to_string()),
614        );
615    }
616    if let Some(model) = definition.model.as_deref() {
617        crate::tracing::span_set_metadata(
618            span_id,
619            "model",
620            serde_json::Value::String(model.to_string()),
621        );
622    }
623    let step_name = definition.name.clone();
624    STEP_STACK.with(|stack| {
625        stack.borrow_mut().push(ActiveStep::new(
626            frame_depth,
627            definition,
628            persona,
629            args.to_vec(),
630            span_id,
631            false,
632        ));
633    });
634    if let Some(policy) = stage_policy_for_active_step(&step_name) {
635        push_execution_policy(policy);
636        STEP_STACK.with(|stack| {
637            if let Some(top) = stack.borrow_mut().last_mut() {
638                top.stage_policy_pushed = true;
639            }
640        });
641    }
642    true
643}
644
645/// Drop any step entries whose owning frame has already been unwound,
646/// recording a `CompletedStep` summary for each. The `current_frame_depth`
647/// is `Vm::frames.len()` at the call site — entries with
648/// `frame_depth > current_frame_depth` are stale.
649pub fn prune_below_frame(current_frame_depth: usize) {
650    let mut popped: Vec<ActiveStep> = Vec::new();
651    STEP_STACK.with(|stack| {
652        let mut stack = stack.borrow_mut();
653        while let Some(top) = stack.last() {
654            if top.frame_depth > current_frame_depth {
655                popped.push(stack.pop().unwrap());
656            } else {
657                break;
658            }
659        }
660    });
661    for step in popped {
662        finish_step(step, "completed", None);
663    }
664    PERSONA_STACK.with(|stack| {
665        let mut stack = stack.borrow_mut();
666        while stack
667            .last()
668            .is_some_and(|persona| persona.frame_depth > current_frame_depth)
669        {
670            stack.pop();
671        }
672    });
673}
674
675pub fn take_active_step(current_frame_depth: usize) -> Option<ActiveStep> {
676    STEP_STACK.with(|stack| {
677        let mut stack = stack.borrow_mut();
678        if stack
679            .last()
680            .is_some_and(|step| step.frame_depth == current_frame_depth)
681        {
682            stack.pop()
683        } else {
684            None
685        }
686    })
687}
688
689pub fn finish_active_step(step: ActiveStep, status: &str, error: Option<String>) {
690    finish_step(step, status, error);
691}
692
693/// Pop the topmost active step (if its frame is the current one) and
694/// record an explicit completion status. Used when an error boundary
695/// rewrites or absorbs an in-flight error so the receipt log reflects the
696/// outcome the persona actually saw.
697pub fn pop_and_record(current_frame_depth: usize, status: &str, error: Option<String>) -> bool {
698    let popped = STEP_STACK.with(|stack| {
699        let mut stack = stack.borrow_mut();
700        if stack
701            .last()
702            .map(|step| step.frame_depth == current_frame_depth)
703            .unwrap_or(false)
704        {
705            stack.pop()
706        } else {
707            None
708        }
709    });
710    let Some(step) = popped else {
711        return false;
712    };
713    finish_step(step, status, error);
714    true
715}
716
717fn finish_step(step: ActiveStep, status: &str, error: Option<String>) {
718    if step.stage_policy_pushed {
719        pop_execution_policy();
720    }
721    crate::tracing::span_set_metadata(
722        step.span_id,
723        "status",
724        serde_json::Value::String(status.to_string()),
725    );
726    crate::tracing::span_set_metadata(
727        step.span_id,
728        "llm_calls",
729        serde_json::Value::Number(step.llm_calls.into()),
730    );
731    crate::tracing::span_set_metadata(
732        step.span_id,
733        "input_tokens",
734        serde_json::Value::Number(step.input_tokens.into()),
735    );
736    crate::tracing::span_set_metadata(
737        step.span_id,
738        "output_tokens",
739        serde_json::Value::Number(step.output_tokens.into()),
740    );
741    if let Some(cost_n) = serde_json::Number::from_f64(step.cost_usd) {
742        crate::tracing::span_set_metadata(
743            step.span_id,
744            "cost_usd",
745            serde_json::Value::Number(cost_n),
746        );
747    }
748    crate::tracing::span_end(step.span_id);
749    let summary = CompletedStep {
750        name: step.definition.name.clone(),
751        function: step.definition.function.clone(),
752        model: step
753            .last_model
754            .clone()
755            .or_else(|| step.definition.model.clone()),
756        input_tokens: step.input_tokens,
757        output_tokens: step.output_tokens,
758        cost_usd: step.cost_usd,
759        llm_calls: step.llm_calls,
760        status: status.to_string(),
761        error,
762    };
763    COMPLETED_STEPS.with(|completed| completed.borrow_mut().push(summary));
764}
765
766/// Get a snapshot of the topmost active step, if any. Used by the
767/// llm_call path to fill in defaults — never for mutation.
768pub fn with_active_step<R>(f: impl FnOnce(&ActiveStep) -> R) -> Option<R> {
769    STEP_STACK.with(|stack| stack.borrow().last().map(f))
770}
771
772/// Mutate the topmost active step (typically to attribute LLM usage).
773pub fn with_active_step_mut<R>(f: impl FnOnce(&mut ActiveStep) -> R) -> Option<R> {
774    STEP_STACK.with(|stack| stack.borrow_mut().last_mut().map(f))
775}
776
777/// Frame depth of the topmost active step, or `None` when no step is
778/// active. Used by `handle_error` to detect "this throw is exiting a
779/// step's frame".
780pub fn active_step_frame_depth() -> Option<usize> {
781    STEP_STACK.with(|stack| stack.borrow().last().map(|s| s.frame_depth))
782}
783
784/// Default model the topmost active step should impose on `llm_call`
785/// invocations whose options dict didn't pin a model.
786pub fn active_step_model_default() -> Option<String> {
787    STEP_STACK.with(|stack| {
788        stack
789            .borrow()
790            .last()
791            .and_then(|step| step.definition.model.clone())
792    })
793}
794
795/// Record that `llm_call` consumed `input_tokens` / `output_tokens` for
796/// `cost_usd`. Updates the active step's running totals and returns a
797/// budget-exhaustion error if the step's ceiling is now breached.
798///
799/// The check is performed AFTER the call so the test fixture's first
800/// call (which fits under budget) succeeds and subsequent calls trip the
801/// limit. This matches the existing `accumulate_cost_for_provider`
802/// pattern where global budget is also checked post-hoc.
803pub fn record_step_llm_usage(
804    model: &str,
805    input_tokens: i64,
806    output_tokens: i64,
807    cost_usd: f64,
808) -> Result<(), VmError> {
809    let exhausted = STEP_STACK.with(|stack| -> Option<VmError> {
810        let mut stack = stack.borrow_mut();
811        let step = stack.last_mut()?;
812        step.input_tokens = step.input_tokens.saturating_add(input_tokens.max(0) as u64);
813        step.output_tokens = step
814            .output_tokens
815            .saturating_add(output_tokens.max(0) as u64);
816        step.cost_usd += cost_usd;
817        step.llm_calls = step.llm_calls.saturating_add(1);
818        if !model.is_empty() {
819            step.last_model = Some(model.to_string());
820        }
821
822        if let Some(max_tokens) = step.definition.max_tokens {
823            if step.total_tokens() > max_tokens {
824                return Some(budget_exhausted_error(
825                    &step.definition,
826                    "max_tokens",
827                    max_tokens as f64,
828                    step.total_tokens() as f64,
829                    step.cost_usd,
830                ));
831            }
832        }
833        if let Some(max_usd) = step.definition.max_usd {
834            if step.cost_usd > max_usd {
835                return Some(budget_exhausted_error(
836                    &step.definition,
837                    "max_usd",
838                    max_usd,
839                    step.total_tokens() as f64,
840                    step.cost_usd,
841                ));
842            }
843        }
844        None
845    });
846    if let Some(err) = exhausted {
847        return Err(err);
848    }
849    Ok(())
850}
851
852fn budget_exhausted_error(
853    definition: &StepDefinition,
854    limit: &str,
855    limit_value: f64,
856    consumed_tokens: f64,
857    consumed_cost_usd: f64,
858) -> VmError {
859    let mut dict: crate::value::DictMap = crate::value::DictMap::new();
860    dict.put_str("category", "budget_exceeded");
861    dict.put_str("kind", "budget_exhausted");
862    dict.put_str("reason", "step_budget_exhausted");
863    dict.put_str("step", definition.name.clone());
864    dict.put_str("function", definition.function.clone());
865    dict.put_str("limit", limit);
866    dict.insert(
867        crate::value::intern_key("limit_value"),
868        VmValue::Float(limit_value),
869    );
870    dict.insert(
871        crate::value::intern_key("consumed_tokens"),
872        VmValue::Float(consumed_tokens),
873    );
874    dict.insert(
875        crate::value::intern_key("consumed_cost_usd"),
876        VmValue::Float(consumed_cost_usd),
877    );
878    dict.put_str(
879        "error_boundary",
880        definition
881            .error_boundary
882            .clone()
883            .unwrap_or_else(|| "fail".to_string()),
884    );
885    dict.put_str(
886        "message",
887        format!(
888            "step `{}` exceeded {} budget ({} > {})",
889            definition.name, limit, consumed_tokens as i64, limit_value as i64
890        ),
891    );
892    VmError::Thrown(VmValue::dict(dict))
893}
894
895/// Returns true if the thrown value looks like a budget-exhausted
896/// error — either our typed step-budget dict or the existing
897/// `crates/harn-vm/src/llm/cost.rs::budget_exceeded_error` shape.
898/// Either form is treated identically by `error_boundary` because the
899/// per-step budget machinery layers onto the existing envelope; a step
900/// whose budget the preflight projection rejects is still a budget
901/// exhaustion the step authored.
902pub fn is_step_budget_exhausted(err: &VmError) -> bool {
903    let VmError::Thrown(VmValue::Dict(dict)) = err else {
904        return false;
905    };
906    let category = dict.get("category").and_then(vm_str);
907    let kind = dict.get("kind").and_then(vm_str);
908    let reason = dict.get("reason").and_then(vm_str);
909    if matches!(kind, Some("budget_exhausted")) && matches!(reason, Some("step_budget_exhausted")) {
910        return true;
911    }
912    matches!(category, Some("budget_exceeded"))
913}
914
915/// Annotate an existing budget-exhausted error with `escalated: true`
916/// and the step's identity so the persona body / handoff receiver can
917/// route on it. Returns the original error if it isn't a thrown dict.
918/// Ensures `step` and `function` keys reflect the just-finished step
919/// even when the underlying error was raised by the preflight budget
920/// machinery (which doesn't know which step it's running under).
921pub fn mark_escalated(err: VmError, step_name: Option<&str>, function: Option<&str>) -> VmError {
922    let VmError::Thrown(VmValue::Dict(dict)) = err else {
923        return err;
924    };
925    let mut next = (*dict).clone();
926    next.insert(crate::value::intern_key("escalated"), VmValue::Bool(true));
927    next.put_str("category", "handoff_escalation");
928    if let Some(step) = step_name {
929        next.entry(crate::value::intern_key("step"))
930            .or_insert_with(|| VmValue::String(arcstr::ArcStr::from(step.to_string())));
931    }
932    if let Some(function) = function {
933        next.entry(crate::value::intern_key("function"))
934            .or_insert_with(|| VmValue::String(arcstr::ArcStr::from(function.to_string())));
935    }
936    VmError::Thrown(VmValue::dict(next))
937}
938
939/// Drain the completed-step log. Used by receipt builders that want a
940/// per-step model + token + cost breakdown for the just-finished run.
941pub fn drain_completed_steps() -> Vec<CompletedStep> {
942    COMPLETED_STEPS.with(|completed| std::mem::take(&mut *completed.borrow_mut()))
943}
944
945/// Read the completed-step log without clearing it. Use when callers
946/// want a peek without disturbing the global record stream.
947pub fn peek_completed_steps() -> Vec<CompletedStep> {
948    COMPLETED_STEPS.with(|completed| completed.borrow().clone())
949}
950
951/// Lower a [`CompletedStep`] into JSON for embedding in receipts /
952/// inspect output.
953pub fn completed_step_to_json(step: &CompletedStep) -> JsonValue {
954    serde_json::to_value(step).unwrap_or(JsonValue::Null)
955}
956
957/// Register the `__register_step` and `__register_persona` host builtins.
958/// Compiler-emitted bytecode after every `@step` / persona declaration
959/// calls these with `(function_name, metadata_dict)` so the runtime can
960/// later dispatch on the step's metadata when its function is invoked.
961pub fn register_step_builtins(vm: &mut crate::vm::Vm) {
962    for def in MODULE_BUILTINS {
963        vm.register_builtin_def(def);
964    }
965}
966
/// Builtin definitions exported by this module; `register_step_builtins`
/// installs each one on a `Vm`.
// NOTE(review): the `__REGISTER_*_DEF` statics are presumably generated by
// the `#[harn_builtin]` attribute on the functions below — confirm.
pub(crate) const MODULE_BUILTINS: &[&VmBuiltinDef] =
    &[&__REGISTER_STEP_DEF, &__REGISTER_PERSONA_DEF];
969
970#[harn_builtin(category = "step_runtime", runtime_only = true)]
971fn __register_step(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
972    register_step_from_dict(args.to_vec())
973}
974
975#[harn_builtin(category = "step_runtime", runtime_only = true)]
976fn __register_persona(args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
977    register_persona_from_dict(args.to_vec())
978}
979
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::VmDictExt;

    fn fresh_state() {
        reset_thread_local_state();
    }

    #[test]
    fn registers_and_pops_step_from_dict() {
        fresh_state();
        let mut budget: crate::value::DictMap = crate::value::DictMap::new();
        budget.insert(crate::value::intern_key("max_tokens"), VmValue::Int(100));
        budget.insert(crate::value::intern_key("max_usd"), VmValue::Float(0.05));
        let mut meta: crate::value::DictMap = crate::value::DictMap::new();
        meta.put_str("name", "plan");
        meta.put_str("model", "claude-haiku-4-5");
        meta.put_str("error_boundary", "continue");
        meta.insert(crate::value::intern_key("budget"), VmValue::dict(budget));

        register_step_from_dict(vec![
            VmValue::String(arcstr::ArcStr::from("plan_step")),
            VmValue::dict(meta),
        ])
        .expect("registration succeeds");

        assert!(maybe_push_active_step("plan_step", 3, &[]));
        assert_eq!(active_step_frame_depth(), Some(3));
        assert_eq!(
            active_step_model_default().as_deref(),
            Some("claude-haiku-4-5")
        );

        record_step_llm_usage("claude-haiku-4-5", 10, 20, 0.001).expect("under budget");
        with_active_step(|step| {
            assert_eq!(step.input_tokens, 10);
            assert_eq!(step.output_tokens, 20);
            assert!((step.cost_usd - 0.001).abs() < 1e-9);
        });

        let err =
            record_step_llm_usage("claude-haiku-4-5", 50, 50, 0.0).expect_err("should exhaust");
        assert!(is_step_budget_exhausted(&err));

        prune_below_frame(2);
        let completed = drain_completed_steps();
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].llm_calls, 2);
    }

    #[test]
    fn unregistered_function_does_not_push() {
        fresh_state();
        assert!(!maybe_push_active_step("not_a_step", 1, &[]));
        assert!(active_step_frame_depth().is_none());
    }

    #[test]
    fn tracked_registry_empty_fast_path_tracks_registrations_and_reset() {
        fresh_state();
        assert!(tracked_registries_empty());
        assert!(!is_tracked_function("plan_step"));

        register_step(
            "plan_step",
            StepDefinition {
                name: "plan".to_string(),
                function: "plan_step".to_string(),
                ..StepDefinition::default()
            },
        );
        assert!(!tracked_registries_empty());
        assert!(is_tracked_function("plan_step"));
        assert!(step_definition_for_function("plan_step").is_some());

        register_step(
            "plan_step",
            StepDefinition {
                name: "plan_v2".to_string(),
                function: "plan_step".to_string(),
                ..StepDefinition::default()
            },
        );
        assert!(is_tracked_function("plan_step"));

        fresh_state();
        assert!(tracked_registries_empty());
        assert!(!is_tracked_function("plan_step"));
    }

    #[test]
    fn stage_policy_narrows_but_does_not_widen_parent_policy() {
        fresh_state();
        let mut meta: crate::value::DictMap = crate::value::DictMap::new();
        meta.put_str("name", "research");
        register_step_from_dict(vec![
            VmValue::String(arcstr::ArcStr::from("research_step")),
            VmValue::dict(meta),
        ])
        .expect("step registration");

        let mut stage_dict: crate::value::DictMap = crate::value::DictMap::new();
        stage_dict.put_str("name", "research");
        // Stage tries to add `edit` on top of a parent that only allowed `read`.
        stage_dict.insert(
            crate::value::intern_key("allowed_tools"),
            VmValue::List(std::sync::Arc::new(vec![
                VmValue::String(arcstr::ArcStr::from("read")),
                VmValue::String(arcstr::ArcStr::from("edit")),
            ])),
        );
        let mut persona_meta: crate::value::DictMap = crate::value::DictMap::new();
        persona_meta.put_str("name", "scoped");
        persona_meta.insert(
            crate::value::intern_key("stages"),
            VmValue::List(std::sync::Arc::new(vec![VmValue::Dict(
                std::sync::Arc::new(stage_dict),
            )])),
        );
        register_persona_from_dict(vec![
            VmValue::String(arcstr::ArcStr::from("scoped_persona")),
            VmValue::dict(persona_meta),
        ])
        .expect("persona registration");

        push_execution_policy(CapabilityPolicy {
            tools: vec!["read".to_string()],
            ..CapabilityPolicy::default()
        });
        assert!(maybe_push_active_persona("scoped_persona", 1));
        assert!(maybe_push_active_step("research_step", 2, &[]));
        let policy = current_execution_policy().expect("stage policy active");
        // `edit` is filtered out because the parent already denied it.
        assert_eq!(policy.tools, vec!["read".to_string()]);

        prune_below_frame(0);
        pop_execution_policy();
        assert!(current_execution_policy().is_none());
    }

    #[test]
    fn stage_policy_is_pushed_and_popped_around_step() {
        fresh_state();
        let mut meta: crate::value::DictMap = crate::value::DictMap::new();
        meta.put_str("name", "research");
        register_step_from_dict(vec![
            VmValue::String(arcstr::ArcStr::from("research_step")),
            VmValue::dict(meta),
        ])
        .expect("step registration succeeds");

        let mut stage_dict: crate::value::DictMap = crate::value::DictMap::new();
        stage_dict.put_str("name", "research");
        stage_dict.insert(
            crate::value::intern_key("allowed_tools"),
            VmValue::List(std::sync::Arc::new(vec![VmValue::String(
                arcstr::ArcStr::from("read"),
            )])),
        );
        let mut persona_meta: crate::value::DictMap = crate::value::DictMap::new();
        persona_meta.put_str("name", "scoped");
        persona_meta.insert(
            crate::value::intern_key("stages"),
            VmValue::List(std::sync::Arc::new(vec![VmValue::Dict(
                std::sync::Arc::new(stage_dict),
            )])),
        );
        register_persona_from_dict(vec![
            VmValue::String(arcstr::ArcStr::from("scoped_persona")),
            VmValue::dict(persona_meta),
        ])
        .expect("persona registration succeeds");

        assert!(maybe_push_active_persona("scoped_persona", 1));
        assert!(crate::orchestration::current_execution_policy().is_none());
        assert!(maybe_push_active_step("research_step", 2, &[]));
        let policy = crate::orchestration::current_execution_policy()
            .expect("stage policy is active inside step");
        assert_eq!(policy.tools, vec!["read".to_string()]);

        prune_below_frame(0);
        assert!(crate::orchestration::current_execution_policy().is_none());
    }

    #[test]
    fn mark_escalated_keeps_existing_step_identity_and_budget_classification() {
        fresh_state();
        let definition = StepDefinition {
            name: "plan".to_string(),
            function: "plan_step".to_string(),
            ..StepDefinition::default()
        };
        let err = budget_exhausted_error(&definition, "max_tokens", 100.0, 120.0, 0.0);
        assert!(is_step_budget_exhausted(&err));

        let escalated = mark_escalated(err, Some("other_step"), Some("other_fn"));
        // `category` is rewritten, but the typed kind/reason markers still
        // classify the error as a step budget exhaustion.
        assert!(is_step_budget_exhausted(&escalated));
        let VmError::Thrown(VmValue::Dict(dict)) = &escalated else {
            panic!("escalated error should remain a thrown dict");
        };
        assert!(matches!(dict.get("escalated"), Some(VmValue::Bool(true))));
        assert_eq!(
            dict.get("category").and_then(vm_str),
            Some("handoff_escalation")
        );
        // Identity already present on the error wins over the fallbacks.
        assert_eq!(dict.get("step").and_then(vm_str), Some("plan"));
        assert_eq!(dict.get("function").and_then(vm_str), Some("plan_step"));
    }

    #[test]
    fn mark_escalated_fills_identity_missing_from_preflight_budget_errors() {
        fresh_state();
        let mut preflight: crate::value::DictMap = crate::value::DictMap::new();
        preflight.put_str("category", "budget_exceeded");
        let err = VmError::Thrown(VmValue::dict(preflight));
        // The generic budget envelope counts as a budget exhaustion too.
        assert!(is_step_budget_exhausted(&err));

        let escalated = mark_escalated(err, Some("plan"), Some("plan_step"));
        let VmError::Thrown(VmValue::Dict(dict)) = &escalated else {
            panic!("escalated error should remain a thrown dict");
        };
        assert!(matches!(dict.get("escalated"), Some(VmValue::Bool(true))));
        assert_eq!(dict.get("step").and_then(vm_str), Some("plan"));
        assert_eq!(dict.get("function").and_then(vm_str), Some("plan_step"));
    }

    #[test]
    fn mark_escalated_passes_non_dict_errors_through() {
        fresh_state();
        let err = VmError::Thrown(VmValue::String(arcstr::ArcStr::from("boom")));
        assert!(!is_step_budget_exhausted(&err));
        let passthrough = mark_escalated(err, Some("plan"), Some("plan_step"));
        assert!(matches!(
            &passthrough,
            VmError::Thrown(VmValue::String(s)) if s.as_str() == "boom"
        ));
    }
}