harn-vm 0.7.49

Async bytecode virtual machine for the Harn programming language
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
use std::rc::Rc;
use std::sync::{Arc, LazyLock, Mutex};

use crate::agent_events::{self, AgentEvent, AgentEventSink};
use crate::value::{VmError, VmValue};

use super::agent_config::AgentLoopConfig;

mod finalize;
mod helpers;
mod llm_call;
mod post_turn;
mod skill_match;
mod state;
mod tool_dispatch;
mod tool_search_client;
mod turn_preflight;

pub use skill_match::{parse_skill_config, parse_skill_match_config_public};
pub use state::SkillMatchConfig;
#[allow(unused_imports)]
pub use state::{ActiveSkill, SkillMatchStrategy};

thread_local! {
    /// Host bridge currently bound to this thread, if any. Written by
    /// `install_current_host_bridge` / `clear_current_host_bridge` and
    /// read (as a cloned `Rc`) by `current_host_bridge`.
    static CURRENT_HOST_BRIDGE: std::cell::RefCell<Option<Rc<crate::bridge::HostBridge>>> = const { std::cell::RefCell::new(None) };
    /// Session id of the agent loop currently running on this thread.
    /// `AgentSessionGuard` swaps the value in on install and restores
    /// the previous one on drop; `current_agent_session_id` reads it.
    static CURRENT_AGENT_SESSION_ID: std::cell::RefCell<Option<String>> = const { std::cell::RefCell::new(None) };
    /// Queue of feedback items pushed via `agent_inject_feedback(session_id, kind, content)`
    /// from inside a pipeline event handler. The turn loop drains this
    /// queue at safe boundaries (before each LLM call) and appends each
    /// entry as a runtime-feedback message.
    ///
    /// Each entry is a `(session_id, kind, content)` tuple, in the order
    /// `push_pending_feedback` stores it.
    static PENDING_FEEDBACK: std::cell::RefCell<Vec<(String, String, String)>> =
        const { std::cell::RefCell::new(Vec::new()) };
    /// Stack of per-loop event sinks installed via
    /// `AgentLoopConfig.event_sink`. The agent loop pushes on entry and
    /// pops on drop (via `LoopSinkGuard`); `emit_agent_event` fans the
    /// event out to the top-of-stack sink in addition to the global
    /// `agent_events` registry. Distinct from the global registry on
    /// purpose: tests that wipe the global registry (`reset_all_sinks`,
    /// `reset_thread_local_state`) cannot race with a per-loop
    /// observation, and the host gets a non-cancellable observation
    /// path that's guaranteed to fire even when no external session
    /// subscriber is registered. Stack-shaped so nested loops (workflow
    /// stages, sub-agents) don't bleed events upward into the parent's
    /// sink.
    static CURRENT_LOOP_SINKS: std::cell::RefCell<Vec<Arc<dyn AgentEventSink>>> =
        const { std::cell::RefCell::new(Vec::new()) };
}

/// Shared session-end hook: an `Arc`-wrapped, `Send + Sync` callback that
/// receives the ending session's `session_id` string.
type SessionEndHook = Arc<dyn Fn(&str) + Send + Sync>;

/// Global (cross-thread) pending feedback queue. Background threads (e.g.
/// long-running tool monitors) push here; the turn-loop drains both this
/// and the thread-local `PENDING_FEEDBACK` at each preflight boundary.
///
/// Each entry is a `(session_id, kind, content)` tuple, in the order
/// `push_pending_feedback_global` stores it. Entries are removed by
/// `drain_pending_feedback` and `drain_global_pending_feedback`, which only
/// take the items whose `session_id` matches.
static GLOBAL_PENDING_FEEDBACK: LazyLock<Mutex<Vec<(String, String, String)>>> =
    LazyLock::new(|| Mutex::new(Vec::new()));

/// Registry of hooks called when an agent-loop session ends (normally or via
/// budget exhaustion). Each hook receives the session_id so it can release
/// resources scoped to that session (e.g. killing orphaned child processes).
///
/// Hooks are appended by `register_session_end_hook` and invoked, in
/// registration order, by `fire_session_end_hooks`. Nothing in this file
/// removes a hook once it has been registered.
static SESSION_END_HOOKS: LazyLock<Mutex<Vec<SessionEndHook>>> =
    LazyLock::new(|| Mutex::new(Vec::new()));

/// Scope guard for the per-loop event sink stack (`CURRENT_LOOP_SINKS`).
///
/// Installing the guard with `Some(sink)` pushes that sink onto the
/// thread-local stack; dropping the guard pops the top entry again. When
/// built from `None` (no `AgentLoopConfig.event_sink` configured) the guard
/// touches nothing, neither on install nor on drop.
pub(crate) struct LoopSinkGuard {
    /// Whether `install` pushed a sink and `drop` therefore owes a pop.
    pushed: bool,
}

impl LoopSinkGuard {
    /// Push `sink` (when present) onto the thread's sink stack and return
    /// the guard that will pop it.
    pub(crate) fn install(sink: Option<Arc<dyn AgentEventSink>>) -> Self {
        let pushed = match sink {
            Some(sink) => {
                CURRENT_LOOP_SINKS.with(|sinks| sinks.borrow_mut().push(sink));
                true
            }
            None => false,
        };
        Self { pushed }
    }
}

impl Drop for LoopSinkGuard {
    /// Pop the top-of-stack sink, but only if this guard pushed one.
    fn drop(&mut self) {
        if !self.pushed {
            return;
        }
        CURRENT_LOOP_SINKS.with(|sinks| {
            drop(sinks.borrow_mut().pop());
        });
    }
}

/// Synchronously emit an event to external sinks (the global registry)
/// and to the top-of-stack per-loop sink installed by `LoopSinkGuard`.
/// Skips closure subscribers because they are async + VM-bound and
/// cannot be safely awaited from sites that may run outside the agent
/// loop's `LocalSet` task — currently the SSE transport (#693) which
/// fires `ToolCall(Pending)` / `ToolCallUpdate(Pending, raw_input)` per
/// streamed delta.
///
/// Closure subscribers still see the canonical lifecycle (`Pending →
/// InProgress → Completed/Failed`) emitted later by `tool_dispatch.rs`
/// via `emit_agent_event` — this sync path is for the streaming-args
/// observation surface only.
pub(crate) fn emit_agent_event_sync(event: &AgentEvent) {
    agent_events::emit_event(event);
    let loop_sink = CURRENT_LOOP_SINKS.with(|stack| stack.borrow().last().cloned());
    if let Some(sink) = loop_sink {
        sink.handle_event(event);
    }
}

/// Deliver `event` to every observer: the external sinks (synchronously),
/// the top-of-stack per-loop sink, and finally the session's closure
/// subscribers (asynchronously, through a child VM). The turn loop calls
/// this at every phase.
///
/// **Thread-local invariant.** Pipeline closure subscribers live on the
/// session's `SessionState.subscribers` in `crate::agent_sessions`,
/// which is a `thread_local!` because `VmValue` wraps `Rc` and can't
/// cross threads. The agent loop runs on a tokio `LocalSet`-pinned
/// task, and `agent_subscribe` (the host builtin that appends to the
/// session) runs on that same task, so the invariant holds. If a
/// future VM embedder runs the loop from a multi-thread runtime
/// without a `LocalSet`, closure subscribers will silently decouple
/// from their emit site.
pub(crate) async fn emit_agent_event(event: &AgentEvent) {
    agent_events::emit_event(event);

    // The per-loop sink (pushed by `LoopSinkGuard` from
    // `AgentLoopConfig.event_sink`) sees the event after the global
    // registry. The top entry is cloned out of the stack first, so the
    // borrow is gone by the time the sink runs and the sink may re-enter
    // `emit_agent_event` without a `RefCell` panic.
    let top_sink = CURRENT_LOOP_SINKS.with(|sinks| sinks.borrow().last().cloned());
    if let Some(top_sink) = top_sink {
        top_sink.handle_event(event);
    }

    let listeners = crate::agent_sessions::subscribers_for(event.session_id());
    if listeners.is_empty() {
        return;
    }

    // Serialize once; a serialization failure degrades to a JSON null
    // payload instead of aborting delivery.
    let payload = serde_json::to_value(event).unwrap_or(serde_json::Value::Null);
    for listener in listeners {
        // Only closure values are callable; anything else is skipped.
        let callback = match listener {
            VmValue::Closure(callback) => callback,
            _ => continue,
        };
        // Without a child VM there is nothing to run the closure on.
        let mut child_vm = match crate::vm::clone_async_builtin_child_vm() {
            Some(child_vm) => child_vm,
            None => continue,
        };
        let arg = crate::stdlib::json_to_vm_value(&payload);
        let outcome = child_vm.call_closure_pub(&callback, &[arg]).await;
        // A failing subscriber is logged and otherwise ignored so that it
        // cannot tear down the agent loop.
        if let Err(err) = outcome {
            let message = format!(
                "session={} event={:?} subscriber error: {}",
                event.session_id(),
                std::mem::discriminant(event),
                err
            );
            crate::events::log_warn("agent.subscriber", &message);
        }
    }
}

/// Queue one feedback item on the current thread's `PENDING_FEEDBACK`
/// queue. This is the entry point used by the `agent_inject_feedback`
/// host builtin; the turn loop drains the queue later.
///
/// The item is stored as an owned `(session_id, kind, content)` tuple.
pub(crate) fn push_pending_feedback(session_id: &str, kind: &str, content: &str) {
    let entry = (
        session_id.to_owned(),
        kind.to_owned(),
        content.to_owned(),
    );
    PENDING_FEEDBACK.with(|queue| queue.borrow_mut().push(entry));
}

/// Queue one feedback item on the process-wide `GLOBAL_PENDING_FEEDBACK`
/// queue. Unlike `push_pending_feedback` this may be called from any
/// thread, which is how background tasks (e.g. long-running tool monitors)
/// deliver results; the turn loop drains the queue at each preflight
/// boundary.
///
/// If the queue's mutex cannot be locked (it is poisoned), the item is
/// dropped without any error being reported.
pub fn push_pending_feedback_global(session_id: &str, kind: &str, content: &str) {
    let Ok(mut queue) = GLOBAL_PENDING_FEEDBACK.lock() else {
        return;
    };
    queue.push((
        String::from(session_id),
        String::from(kind),
        String::from(content),
    ));
}

/// Register a hook that fires when any agent-loop session ends. The hook
/// receives the session_id and must be `Send + Sync` so it can be stored
/// across threads. Idempotent registration is the caller's responsibility:
/// registering the same hook twice stores (and later fires) it twice.
///
/// If the registry mutex is poisoned (a thread panicked while holding it),
/// the lock is recovered instead of silently discarding the hook. The
/// registry is a plain `Vec` of `Arc`s, so a panic elsewhere cannot leave it
/// in a state that makes appending unsound.
pub fn register_session_end_hook(hook: SessionEndHook) {
    SESSION_END_HOOKS
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .push(hook);
}

fn fire_session_end_hooks(session_id: &str) {
    if let Ok(hooks) = SESSION_END_HOOKS.lock() {
        for hook in hooks.iter() {
            hook(session_id);
        }
    }
}

/// Remove and return every item queued for `session_id` on the global
/// (cross-thread) queue, leaving other sessions' items queued in their
/// original order. The thread-local queue is not touched.
///
/// Intended for integration tests that want to inspect feedback pushed by
/// background threads without running a full agent loop.
///
/// Returns `(kind, content)` pairs in queue order; the result is empty when
/// nothing matches or when the queue's mutex cannot be locked.
pub fn drain_global_pending_feedback(session_id: &str) -> Vec<(String, String)> {
    let Ok(mut queue) = GLOBAL_PENDING_FEEDBACK.lock() else {
        return Vec::new();
    };
    // Split the whole queue in one pass: matching entries leave, the rest
    // go back in unchanged order.
    let (matching, remaining): (Vec<_>, Vec<_>) = std::mem::take(&mut *queue)
        .into_iter()
        .partition(|entry| entry.0 == session_id);
    *queue = remaining;
    matching
        .into_iter()
        .map(|(_, kind, content)| (kind, content))
        .collect()
}

/// Remove and return every pending-feedback item queued for `session_id`.
/// The turn loop calls this at injection boundaries.
///
/// Two queues are consulted, in this order: the thread-local
/// `PENDING_FEEDBACK` (items pushed from the agent-loop thread) and then
/// the global `GLOBAL_PENDING_FEEDBACK` (items pushed from background
/// threads). Items belonging to other sessions stay queued in their
/// original order. If the global mutex cannot be locked, only the
/// thread-local items are returned.
///
/// Returns `(kind, content)` pairs.
pub(super) fn drain_pending_feedback(session_id: &str) -> Vec<(String, String)> {
    // Thread-local queue first.
    let mut collected: Vec<(String, String)> = PENDING_FEEDBACK.with(|cell| {
        let mut local_queue = cell.borrow_mut();
        let (matching, remaining): (Vec<_>, Vec<_>) = std::mem::take(&mut *local_queue)
            .into_iter()
            .partition(|entry| entry.0 == session_id);
        *local_queue = remaining;
        matching
            .into_iter()
            .map(|(_, kind, content)| (kind, content))
            .collect::<Vec<_>>()
    });

    // Then the global (cross-thread) queue, appended after the local items.
    if let Ok(mut shared_queue) = GLOBAL_PENDING_FEEDBACK.lock() {
        let (matching, remaining): (Vec<_>, Vec<_>) = std::mem::take(&mut *shared_queue)
            .into_iter()
            .partition(|entry| entry.0 == session_id);
        *shared_queue = remaining;
        collected.extend(
            matching
                .into_iter()
                .map(|(_, kind, content)| (kind, content)),
        );
    }

    collected
}

/// Scope guard around the thread's "current tool registry" binding, which
/// `tool_ref` / `tool_def` lookups consult. Installing the guard binds the
/// agent loop's registry; dropping it re-installs whatever binding was in
/// place before.
struct ToolRegistryGuard {
    /// Binding handed back by `install_current_tool_registry` at install
    /// time; re-installed on drop.
    previous: Option<VmValue>,
}

impl ToolRegistryGuard {
    /// Bind `registry` as the current tool registry and remember the
    /// binding it replaced.
    fn install(registry: Option<VmValue>) -> Self {
        Self {
            previous: crate::stdlib::tools::install_current_tool_registry(registry),
        }
    }
}

impl Drop for ToolRegistryGuard {
    /// Put the remembered binding back.
    fn drop(&mut self) {
        let restored = self.previous.take();
        crate::stdlib::tools::install_current_tool_registry(restored);
    }
}

/// Bind `bridge` as this thread's current host bridge, replacing any
/// bridge that was bound before.
pub(crate) fn install_current_host_bridge(bridge: Rc<crate::bridge::HostBridge>) {
    CURRENT_HOST_BRIDGE.with(|cell| {
        let mut current = cell.borrow_mut();
        *current = Some(bridge);
    });
}

/// Unbind this thread's current host bridge. A no-op when none is bound.
pub(crate) fn clear_current_host_bridge() {
    CURRENT_HOST_BRIDGE.with(|cell| {
        let mut current = cell.borrow_mut();
        *current = None;
    });
}

/// Return a new `Rc` handle to this thread's current host bridge, or
/// `None` when no bridge is bound.
pub(crate) fn current_host_bridge() -> Option<Rc<crate::bridge::HostBridge>> {
    CURRENT_HOST_BRIDGE.with(|cell| cell.borrow().as_ref().map(Rc::clone))
}

/// Return a copy of the session id of the agent loop currently running on
/// this thread, or `None` when no agent loop has one installed.
pub fn current_agent_session_id() -> Option<String> {
    CURRENT_AGENT_SESSION_ID.with(|cell| cell.borrow().as_deref().map(str::to_owned))
}

/// Scope guard for the thread-local `CURRENT_AGENT_SESSION_ID`. Installing
/// it makes `session_id` the current agent session; dropping it restores
/// the id (or absence of one) that was current before.
struct AgentSessionGuard {
    /// Value that was current at install time; written back on drop.
    previous: Option<String>,
}

impl AgentSessionGuard {
    /// Swap `session_id` in as the current session id and remember the
    /// value it displaced.
    fn install(session_id: String) -> Self {
        CURRENT_AGENT_SESSION_ID.with(|slot| Self {
            previous: slot.replace(Some(session_id)),
        })
    }
}

impl Drop for AgentSessionGuard {
    /// Write the remembered value back into the thread-local slot.
    fn drop(&mut self) {
        let restored = self.previous.take();
        CURRENT_AGENT_SESSION_ID.with(|slot| {
            *slot.borrow_mut() = restored;
        });
    }
}

/// Drop guard that fires the registered session-end hooks for `session_id`
/// exactly once, whenever the guard goes out of scope.
///
/// Using a guard instead of a trailing call means the hooks also run when
/// the agent loop leaves through an early `?` return or when its future is
/// dropped mid-`await`, so session-scoped resources are not leaked on those
/// paths.
struct SessionEndHookGuard {
    session_id: String,
}

impl Drop for SessionEndHookGuard {
    fn drop(&mut self) {
        fire_session_end_hooks(&self.session_id);
    }
}

/// Run one agent-loop session and return its finalized result.
///
/// The function builds the loop state from `opts` and `config`, validates
/// the tool registry and the tool surface, and then runs at most
/// `max_iterations` turns. Each turn consists of skill matching (when due),
/// turn preflight, the LLM call, tool dispatch (only when the call produced
/// tool calls) and post-turn processing. The loop ends when post-turn
/// processing asks to break, when the optional token budget is used up, or
/// when the iteration limit is reached. Afterwards `finalize::run_finalize`
/// produces the value that is returned.
///
/// While the loop runs, the session id and the tool registry are bound as
/// the thread's current ones and restored on exit. Session-end hooks fire
/// once the loop has started and then ends for any reason: normal
/// completion, an error from one of the phases, or the future being dropped.
///
/// # Errors
///
/// Returns a `VmError` if the state cannot be constructed, if a registered
/// tool has no executable backend, if tool-surface validation reports an
/// error-severity diagnostic, or if any phase (skill match, preflight, LLM
/// call, tool dispatch, post-turn, finalize) fails.
pub async fn run_agent_loop_internal(
    opts: &mut super::api::LlmCallOptions,
    config: AgentLoopConfig,
) -> Result<serde_json::Value, VmError> {
    let mut state = state::AgentLoopState::new(opts, config)?;
    let _session_guard = AgentSessionGuard::install(state.session_id.clone());

    let tools_owned = opts.tools.clone();
    let tools_val = tools_owned.as_ref();

    // harn#743: refuse to start the loop if any registered tool has
    // no executable backend. This converts the historical
    // `[builtin_call] unhandled: <name>` runtime failure into a clear,
    // pre-flight error that names the culprit.
    super::agent_tools::validate_tool_registry_executors(tools_val)?;

    let surface_diagnostics = crate::tool_surface::validate_tool_surface_diagnostics(
        &crate::tool_surface::ToolSurfaceInput {
            tools: tools_owned.clone(),
            native_tools: opts.native_tools.clone(),
            policy: crate::orchestration::current_execution_policy(),
            approval_policy: crate::orchestration::current_approval_policy(),
            prompt_texts: state
                .base_system
                .clone()
                .into_iter()
                .chain(state.config.tool_examples.clone())
                .collect(),
            tool_search_active: opts.tool_search.is_some(),
        },
    );
    // Warnings are logged; the first error-severity diagnostic aborts.
    for diagnostic in &surface_diagnostics {
        match diagnostic.severity {
            crate::tool_surface::ToolSurfaceSeverity::Warning => crate::events::log_warn(
                "tool_surface.validate",
                &format!("{}: {}", diagnostic.code, diagnostic.message),
            ),
            crate::tool_surface::ToolSurfaceSeverity::Error => {
                return Err(VmError::Runtime(format!(
                    "agent_loop tool surface validation failed: {}: {}",
                    diagnostic.code, diagnostic.message
                )));
            }
        }
    }

    let _tool_registry_guard = ToolRegistryGuard::install(tools_owned.clone());

    // Snapshot config/state fields as locals so phase contexts can hold
    // them without fighting the `&mut state` borrow in the loop body.
    let llm_retries = state.config.llm_retries;
    let llm_backoff_ms = state.config.llm_backoff_ms;
    let token_budget = state.config.token_budget;
    let turn_policy = state.config.turn_policy.clone();
    let stop_after_successful_tools = state.config.stop_after_successful_tools.clone();
    let post_turn_callback = state.config.post_turn_callback.clone();
    let bridge = state.bridge.clone();
    let max_iterations = state.max_iterations;
    let max_nudges = state.max_nudges;
    let tool_retries = state.tool_retries;
    let tool_backoff_ms = state.tool_backoff_ms;
    let exit_when_verified = state.exit_when_verified;
    let persistent = state.persistent;
    let daemon = state.daemon;
    let has_tools = state.has_tools;
    let loop_detect_enabled = state.loop_detect_enabled;
    let resumed_iterations = state.resumed_iterations;
    let tool_format = state.tool_format.clone();
    let native_tool_fallback = state.config.native_tool_fallback;
    let done_sentinel = state.done_sentinel.clone();
    let break_unless_phase = state.break_unless_phase.clone();
    let loop_start = state.loop_start;
    let tool_contract_prompt = state.tool_contract_prompt.clone();
    let base_system = state.base_system.clone();
    let persistent_system_prompt = state.persistent_system_prompt.clone();
    let auto_compact = state.auto_compact.clone();
    let daemon_config = state.daemon_config.clone();
    let custom_nudge = state.custom_nudge.clone();
    let session_id = state.session_id.clone();

    // Warn on unknown `stop_after_successful_tools` names: they're
    // tolerated (forward-compat with late-declared tools) but silently
    // never stopping is the failure mode to guard against.
    if let Some(stop_tools) = stop_after_successful_tools.as_ref() {
        let declared = super::tools::collect_tool_schemas(tools_val, opts.native_tools.as_deref());
        let declared_names: std::collections::BTreeSet<&str> =
            declared.iter().map(|schema| schema.name.as_str()).collect();
        let unknown: Vec<&str> = stop_tools
            .iter()
            .filter(|name| !declared_names.contains(name.as_str()))
            .map(String::as_str)
            .collect();
        if !unknown.is_empty() {
            crate::events::log_warn(
                "agent.stop_after_successful_tools",
                &format!(
                    "name(s) not in declared tool schema: {} — will never trigger a stop unless declared later",
                    unknown.join(", ")
                ),
            );
        }
    }

    // From here on the session may acquire external resources (e.g.
    // long-running tool handles). The guard notifies external resource
    // managers that the session has ended on every exit path, including
    // `?` early returns from a phase and cancellation of this future, so
    // they can clean up orphaned processes.
    let session_end_guard = SessionEndHookGuard {
        session_id: session_id.clone(),
    };

    let mut iteration_exited_via_break = false;
    let mut loop_tokens_used = 0i64;
    for iteration in 0..max_iterations {
        // Skill matching runs at the head of iteration 0 (always) and,
        // when sticky=false, again before each subsequent iteration.
        // Reassess-in-place keeps the active skill when nothing
        // changed, so sticky=true + a still-applicable skill stays hot
        // for the rest of the loop.
        //
        // Exception: if this loop resumed a persisted session whose
        // previous run left skills active, iteration 0 preserves that
        // set instead of re-matching from a cold prompt. sticky=false
        // still lets the post-turn reassess run after turn 1.
        let skip_initial_match =
            iteration == 0 && state.rehydrated_from_session && state.skill_match.sticky;
        let should_match = (iteration == 0 || !state.skill_match.sticky) && !skip_initial_match;
        if should_match {
            skill_match::run_skill_match(
                &mut state,
                opts,
                &bridge,
                &session_id,
                iteration,
                iteration > 0,
            )
            .await?;
        }

        // If any active skill narrows the tool surface via
        // `allowed_tools`, compute the scoped registry for this turn.
        // Downstream phases see the narrowed view; the original
        // `tools_owned` stays intact so deactivation restores the full
        // surface on a later iteration.
        let scoped_tools = state.skill_scoped_tools_val(tools_val);
        let effective_tools_val: Option<&crate::value::VmValue> =
            scoped_tools.as_ref().or(tools_val);

        turn_preflight::run_turn_preflight(
            &mut state,
            opts,
            turn_preflight::PreflightContext {
                bridge: &bridge,
                session_id: &session_id,
                resumed_iterations,
                iteration,
                base_system: base_system.as_deref(),
                tool_contract_prompt: tool_contract_prompt.as_deref(),
                persistent_system_prompt: persistent_system_prompt.as_deref(),
                scoped_tools_val: scoped_tools.as_ref(),
            },
        )
        .await?;
        state.sync_session_store();

        let mut call_result = llm_call::run_llm_call(
            &mut state,
            opts,
            &llm_call::LlmCallContext {
                bridge: &bridge,
                tool_format: &tool_format,
                native_tool_fallback,
                done_sentinel: &done_sentinel,
                break_unless_phase: break_unless_phase.as_deref(),
                exit_when_verified,
                persistent,
                has_tools,
                turn_policy: turn_policy.as_ref(),
                llm_retries,
                llm_backoff_ms,
                tools_val: effective_tools_val,
            },
            iteration,
        )
        .await?;

        // Tool dispatch only runs when the LLM call produced tool calls.
        let dispatch = if !call_result.tool_calls.is_empty() {
            Some(
                tool_dispatch::run_tool_dispatch(
                    &mut state,
                    opts,
                    &tool_dispatch::ToolDispatchContext {
                        bridge: &bridge,
                        tool_format: &tool_format,
                        tools_val: effective_tools_val,
                        tool_retries,
                        tool_backoff_ms,
                        loop_detect_enabled,
                        session_id: &session_id,
                        iteration,
                        exit_when_verified,
                        auto_compact: &auto_compact,
                    },
                    &call_result,
                )
                .await?,
            )
        } else {
            None
        };

        let iteration_outcome = post_turn::run_post_turn(
            &mut state,
            opts,
            &post_turn::PostTurnContext {
                bridge: &bridge,
                session_id: &session_id,
                tool_format: &tool_format,
                has_tools,
                max_nudges,
                persistent,
                daemon,
                turn_policy: turn_policy.as_ref(),
                stop_after_successful_tools: &stop_after_successful_tools,
                post_turn_callback: &post_turn_callback,
                auto_compact: &auto_compact,
                daemon_config: &daemon_config,
                custom_nudge: &custom_nudge,
                iteration,
            },
            &mut call_result,
            dispatch,
        )
        .await?;
        state.sync_session_store();

        // Token budget: accumulate this turn's input + output tokens
        // (saturating, so the counter cannot wrap) and stop once the
        // running total reaches the budget. This check runs before the
        // post-turn outcome is honored.
        if let Some(token_budget) = token_budget {
            loop_tokens_used = loop_tokens_used
                .saturating_add(call_result.input_tokens)
                .saturating_add(call_result.output_tokens);
            if loop_tokens_used >= token_budget {
                iteration_exited_via_break = true;
                state.final_status = "budget_exhausted";
                break;
            }
        }

        match iteration_outcome {
            post_turn::IterationOutcome::Continue => continue,
            post_turn::IterationOutcome::Break => {
                iteration_exited_via_break = true;
                break;
            }
        }
    }

    // Hit the iteration budget rather than breaking — signal distinctly
    // so hosts can tell "done" from "ran out of rope".
    if !iteration_exited_via_break && max_iterations > 0 {
        state.final_status = "budget_exhausted";
        emit_agent_event(&AgentEvent::BudgetExhausted {
            session_id: session_id.clone(),
            max_iterations,
        })
        .await;
    }

    let result = finalize::run_finalize(
        &mut state,
        opts,
        bridge,
        daemon,
        &daemon_config,
        &tool_format,
        loop_start,
    )
    .await;

    // Fire the session-end hooks right after finalize, while the session
    // and tool-registry bindings are still installed. Dropping explicitly
    // keeps that ordering on the normal path; the guard's `Drop` covers
    // every other exit.
    drop(session_end_guard);

    result
}

#[cfg(test)]
mod tests;