Skip to main content

harness_loop/
lib.rs

1//! ReAct agent loop with self-correction.
2//!
3//! Minimal v0.0.1 implementation:
4//! - Applies guides once at the start.
5//! - Sends `Context` (with `tools`) to the model.
6//! - Dispatches each returned tool call via [`ToolRegistry`].
7//! - Runs `Sensor::SelfCorrect` sensors after each action; auto-fix patches are
8//!   applied directly to the world, blocking signals are fed back to the model.
9//! - Stops when the model returns no tool calls, or when `policy.max_iters` is hit.
10
11pub mod learning;
12pub mod memory_layer;
13pub mod profile_guide;
14pub mod recall_layer;
15pub mod registry;
16pub mod replay;
17pub mod subagent;
18
19pub use learning::*;
20pub use memory_layer::*;
21pub use profile_guide::*;
22pub use recall_layer::*;
23pub use registry::*;
24pub use replay::*;
25pub use subagent::*;
26
27use harness_compactor::DefaultCompactor;
28use harness_core::{
29    Action, Block, Compactor, Context, Event, Guide, HarnessError, HookOutcome, Model, ModelDelta,
30    ModelOutput, ResponseFormat, Sensor, SessionSource, SignalSet, Stage, StopReason, Task,
31    ToolCall, ToolResult, Turn, TurnRole, Usage, World,
32};
33use harness_hooks::HookBus;
34use std::collections::HashMap;
35use std::sync::Arc;
36
37/// Where a run finished. Marked `#[non_exhaustive]` so future fields don't break
38/// downstream matches — always include `..` when destructuring.
39#[derive(Debug, Clone)]
40pub enum Outcome {
41    /// Model returned text with no tool calls (natural end).
42    #[non_exhaustive]
43    Done {
44        text: Option<String>,
45        iters: u32,
46        tools_called: u32,
47        usage: harness_core::Usage,
48    },
49    /// Policy budget exhausted before the model stopped requesting tools.
50    /// Carries everything we know so the caller can recover partial work
51    /// (saved notes, files written by tools, the last assistant text, etc.)
52    /// instead of seeing a single bare "budget out" string.
53    #[non_exhaustive]
54    BudgetExhausted {
55        iters: u32,
56        last_text: Option<String>,
57        tools_called: u32,
58        usage: harness_core::Usage,
59    },
60}
61
62/// The agent loop.
63pub struct AgentLoop<M: Model> {
64    pub model: M,
65    pub tools: ToolRegistry,
66    pub guides: Vec<Arc<dyn Guide>>,
67    pub sensors: Vec<Arc<dyn Sensor>>,
68    pub hooks: HookBus,
69    pub compactor: Arc<dyn Compactor>,
70    /// Default response format applied to every run unless overridden by
71    /// `run_typed`. See [`ResponseFormat`].
72    pub response_format: ResponseFormat,
73    /// When `true`, the loop drives each model turn via `Model::stream()`
74    /// instead of `complete()`, firing `Event::ModelTokenDelta` for each
75    /// text fragment. Tool-call deltas are still assembled inside the loop;
76    /// only the terminal `ModelOutput` shape is observable downstream.
77    pub streaming: bool,
78    /// Optional cross-session recall store. When set, the loop captures every
79    /// turn and the `session_search` tool is registered. See `with_recall`.
80    pub recall: Option<Arc<dyn harness_core::RecallStore>>,
81    /// When true (and `recall` is set), a `RecallGuide` auto-injects top-k
82    /// past context at session start.
83    pub recall_auto_inject: bool,
84    pub learning: Option<LearningConfig>,
85}
86
87impl<M: Model> AgentLoop<M> {
88    pub fn new(model: M) -> Self {
89        Self {
90            model,
91            tools: ToolRegistry::new(),
92            guides: Vec::new(),
93            sensors: Vec::new(),
94            hooks: HookBus::new(),
95            compactor: Arc::new(DefaultCompactor::new()),
96            response_format: ResponseFormat::Free,
97            streaming: false,
98            recall: None,
99            recall_auto_inject: false,
100            learning: None,
101        }
102    }
103
104    /// Opt in to streaming the model's terminal turn token-by-token via
105    /// `Model::stream()`. Hooks subscribed to `Event::ModelTokenDelta` see
106    /// each fragment as it arrives; the rest of the loop is unchanged.
107    pub fn with_streaming(mut self, enable: bool) -> Self {
108        self.streaming = enable;
109        self
110    }
111
112    pub fn with_compactor(mut self, c: Arc<dyn Compactor>) -> Self {
113        self.compactor = c;
114        self
115    }
116
117    pub fn with_tool(mut self, t: Arc<dyn harness_core::Tool>) -> Self {
118        self.tools.insert(t);
119        self
120    }
121
122    pub fn with_guide(mut self, g: Arc<dyn Guide>) -> Self {
123        self.guides.push(g);
124        self
125    }
126
127    pub fn with_sensor(mut self, s: Arc<dyn Sensor>) -> Self {
128        self.sensors.push(s);
129        self
130    }
131
132    pub fn with_hook(mut self, h: Arc<dyn harness_core::Hook>) -> Self {
133        self.hooks.register(h);
134        self
135    }
136
137    /// Pull in every `#[hook]`-registered hook.
138    pub fn with_macro_hooks(mut self) -> Self {
139        self.hooks = self.hooks.with_macro_hooks_take();
140        self
141    }
142
143    /// Enable cross-session recall: capture every turn into `store` and
144    /// register the `session_search` tool. Owner + session id are read from
145    /// `world.profile.extra["recall_owner"|"recall_session"]` at run time.
146    pub fn with_recall(mut self, store: Arc<dyn harness_core::RecallStore>) -> Self {
147        self.tools
148            .insert(Arc::new(crate::SessionSearchTool::new(store.clone())));
149        self.recall = Some(store);
150        self
151    }
152
153    /// After `with_recall`, also auto-inject top-k relevant past context at
154    /// session start (off by default — tool-only is prompt-cache friendly).
155    pub fn auto_inject(mut self) -> Self {
156        self.recall_auto_inject = true;
157        self
158    }
159
160    /// Enable the self-evolving learning loop: after a session that made
161    /// `>= cfg.nudge_interval` tool calls, fork a review subagent (white-listed to
162    /// `cfg.tools`) to update skills + memory from the transcript. Best-effort.
163    pub fn with_learning_loop(mut self, cfg: LearningConfig) -> Self {
164        self.learning = Some(cfg);
165        self
166    }
167
168    /// Set the default response format for all runs through this loop. See
169    /// [`ResponseFormat`]. For typed deserialisation, prefer `run_typed::<T>()`.
170    pub fn with_response_format(mut self, fmt: ResponseFormat) -> Self {
171        self.response_format = fmt;
172        self
173    }
174
175    /// Shortcut for `with_response_format(ResponseFormat::JsonSchema { name, schema })`.
176    /// Accepts a raw `serde_json::Value` so callers can hand-roll the schema or
177    /// pull it from `schemars::schema_for!(T)`.
178    pub fn with_response_schema(self, name: impl Into<String>, schema: serde_json::Value) -> Self {
179        self.with_response_format(ResponseFormat::JsonSchema {
180            name: name.into(),
181            schema,
182        })
183    }
184
185    pub async fn run(&self, task: Task, world: &mut World) -> Result<Outcome, HarnessError> {
186        let max = harness_core::Policy::default().max_iters;
187        self.run_with_max_iters(task, world, max).await
188    }
189
190    pub async fn run_with_max_iters(
191        &self,
192        task: Task,
193        world: &mut World,
194        max_iters: u32,
195    ) -> Result<Outcome, HarnessError> {
196        self.run_with_seed_history(task, Vec::new(), world, max_iters)
197            .await
198    }
199
200    /// Run the agent and deserialise the terminal reply into `T`.
201    ///
202    /// The schema for `T` is derived via `schemars::schema_for!(T)` and
203    /// installed as `ResponseFormat::JsonSchema` for this run only — any
204    /// pre-existing `self.response_format` is ignored. On success the
205    /// returned `T` is parsed from `Outcome::Done.text` (or, on budget
206    /// exhaustion, from `Outcome::BudgetExhausted.last_text`).
207    ///
208    /// Errors:
209    /// - `HarnessError::Other` if the model returns no text at all
210    /// - `HarnessError::Other` if `serde_json::from_str::<T>(text)` fails —
211    ///   the original text is included in the message for debugging.
212    pub async fn run_typed<T>(&self, task: Task, world: &mut World) -> Result<T, HarnessError>
213    where
214        T: serde::de::DeserializeOwned + schemars::JsonSchema + 'static,
215    {
216        let max = harness_core::Policy::default().max_iters;
217        self.run_typed_with_max_iters::<T>(task, world, max).await
218    }
219
220    /// Like `run_typed` but with explicit `max_iters`.
221    pub async fn run_typed_with_max_iters<T>(
222        &self,
223        task: Task,
224        world: &mut World,
225        max_iters: u32,
226    ) -> Result<T, HarnessError>
227    where
228        T: serde::de::DeserializeOwned + schemars::JsonSchema + 'static,
229    {
230        let schema_root = schemars::schema_for!(T);
231        let schema = serde_json::to_value(&schema_root)
232            .map_err(|e| HarnessError::Other(format!("response schema: {e}")))?;
233        let name = std::any::type_name::<T>()
234            .rsplit("::")
235            .next()
236            .unwrap_or("response")
237            .to_string();
238        let fmt = ResponseFormat::JsonSchema { name, schema };
239        let outcome = self
240            .run_with_response_format(task, world, max_iters, fmt)
241            .await?;
242        let text = match outcome {
243            Outcome::Done { text: Some(t), .. }
244            | Outcome::BudgetExhausted {
245                last_text: Some(t), ..
246            } => t,
247            Outcome::Done { text: None, .. } => {
248                return Err(HarnessError::Other(
249                    "run_typed: model returned no text".into(),
250                ));
251            }
252            Outcome::BudgetExhausted {
253                last_text: None, ..
254            } => {
255                return Err(HarnessError::Other(
256                    "run_typed: budget exhausted with no text".into(),
257                ));
258            }
259        };
260        serde_json::from_str::<T>(&text).map_err(|e| {
261            HarnessError::Other(format!(
262                "run_typed: decode {} failed: {e} — raw text was: {text}",
263                std::any::type_name::<T>()
264            ))
265        })
266    }
267
268    /// Run with a one-off `ResponseFormat` override (doesn't touch `self`).
269    pub async fn run_with_response_format(
270        &self,
271        task: Task,
272        world: &mut World,
273        max_iters: u32,
274        fmt: ResponseFormat,
275    ) -> Result<Outcome, HarnessError> {
276        // Borrow checker won't let us swap `self.response_format` because
277        // `self` is `&`. Easiest workaround: hand-roll the same setup that
278        // `run_with_seed_history` does, but with our `fmt`. We do this by
279        // calling through a private helper.
280        self.run_with_seed_history_and_format(task, Vec::new(), world, max_iters, Some(fmt))
281            .await
282    }
283
284    async fn run_with_seed_history_and_format(
285        &self,
286        task: Task,
287        seed: Vec<Turn>,
288        world: &mut World,
289        max_iters: u32,
290        fmt_override: Option<ResponseFormat>,
291    ) -> Result<Outcome, HarnessError> {
292        let mut ctx = Context::new(task);
293        ctx.policy.max_iters = max_iters;
294        ctx.tools = self.tools.schemas();
295        ctx.history = seed;
296        ctx.response_format = fmt_override.unwrap_or_else(|| self.response_format.clone());
297        self.run_built_context(ctx, world).await
298    }
299
300    /// Like `run_with_max_iters` but seeds `ctx.history` with `seed` **before**
301    /// the current user task is appended. Use this for multi-turn REPLs so
302    /// prior conversation lives in `ctx.history` (where the Compactor can see
303    /// it) instead of being concatenated into `task.description` (where it
304    /// previously bypassed compaction entirely — see audit #2).
305    pub async fn run_with_seed_history(
306        &self,
307        task: Task,
308        seed: Vec<Turn>,
309        world: &mut World,
310        max_iters: u32,
311    ) -> Result<Outcome, HarnessError> {
312        let mut ctx = Context::new(task);
313        ctx.policy.max_iters = max_iters;
314        ctx.tools = self.tools.schemas();
315        ctx.history = seed;
316        ctx.response_format = self.response_format.clone();
317        self.run_built_context(ctx, world).await
318    }
319
320    /// Inner ReAct loop on an already-prepared `Context`. Use the public
321    /// `run*` methods unless you need to inject a non-standard `Context`
322    /// (e.g. `run_with_response_format` does to apply a one-off
323    /// `ResponseFormat` without mutating `self`).
324    async fn run_built_context(
325        &self,
326        mut ctx: Context,
327        world: &mut World,
328    ) -> Result<Outcome, HarnessError> {
329        self.hooks.fire(
330            &Event::SessionStart {
331                source: SessionSource::Startup,
332            },
333            world,
334        );
335
336        // ── recall: resolve owner/session, ensure the session row ──
337        let (recall_owner, recall_session) = if self.recall.is_some() {
338            use std::sync::atomic::Ordering;
339            let owner = crate::recall_owner(world);
340            let session = world
341                .profile
342                .extra
343                .get("recall_session")
344                .and_then(|v| v.as_str())
345                .map(|s| s.to_string())
346                .unwrap_or_else(|| {
347                    format!(
348                        "sess-{}-{}",
349                        world.clock.now_ms(),
350                        RECALL_SEQ.fetch_add(1, Ordering::SeqCst)
351                    )
352                });
353            if let Some(store) = &self.recall {
354                let meta = harness_core::SessionMeta::new(&session, world.clock.now_ms());
355                if let Err(e) = store.ensure_session(&owner, &session, &meta).await {
356                    tracing::warn!(error = %e, "recall ensure_session failed");
357                }
358            }
359            (owner, session)
360        } else {
361            (String::new(), String::new())
362        };
363
364        let recall_guide: Option<Arc<dyn Guide>> = if self.recall_auto_inject {
365            if self.recall.is_none() {
366                tracing::warn!(
367                    "auto_inject() set but no recall store — call with_recall(store) first; skipping recall guide"
368                );
369                None
370            } else {
371                self.recall
372                    .clone()
373                    .map(|s| Arc::new(crate::RecallGuide::new(s)) as Arc<dyn Guide>)
374            }
375        } else {
376            None
377        };
378        let all_guides: Vec<&Arc<dyn Guide>> =
379            self.guides.iter().chain(recall_guide.iter()).collect();
380        for g in &all_guides {
381            if g.scope().matches(&ctx.task) {
382                self.hooks.fire(&Event::PreGuide { guide: g.id() }, world);
383                g.apply(&mut ctx, world).await?;
384                self.hooks.fire(&Event::PostGuide { guide: g.id() }, world);
385            }
386        }
387
388        ctx.history.push(Turn {
389            role: TurnRole::User,
390            blocks: vec![Block::Text(ctx.task.description.clone())],
391        });
392
393        if self.recall.is_some() {
394            self.recall_append(
395                &recall_owner,
396                &recall_session,
397                harness_core::RecallMessage::new(
398                    "user",
399                    ctx.task.description.clone(),
400                    world.clock.now_ms(),
401                ),
402            )
403            .await;
404        }
405
406        // Running totals — surface to caller even on BudgetExhausted.
407        let mut tools_called: u32 = 0;
408        let mut total_usage = harness_core::Usage::default();
409        let mut last_text: Option<String> = None;
410
411        for iter in 0..ctx.policy.max_iters {
412            self.hooks.fire(&Event::Heartbeat { iter }, world);
413
414            // Compaction: run every stage required by current budget.
415            let stages = self.compactor.budget(&ctx).required_stages();
416            for stage in stages {
417                self.hooks.fire(&Event::PreCompact { stage }, world);
418                self.compactor.compact(stage, &mut ctx).await?;
419                self.hooks.fire(&Event::PostCompact { stage }, world);
420            }
421
422            // Per-iteration guides — recall-style adapters that want to
423            // refresh their injected context every turn (e.g. MemoryGuide
424            // re-recalling against the latest user message). Default
425            // `apply_before_iter` is a no-op, so this loop is cheap for
426            // guides that don't override it.
427            for g in &all_guides {
428                if g.scope().matches(&ctx.task)
429                    && let Err(e) = g.apply_before_iter(&mut ctx, world).await
430                {
431                    tracing::warn!(guide = %g.id(), error = %e, "apply_before_iter failed; continuing");
432                }
433            }
434
435            self.hooks.fire(&Event::PreModel { ctx: &ctx }, world);
436            let out = if self.streaming {
437                self.complete_via_stream(&ctx, world).await?
438            } else {
439                self.model.complete(&ctx).await?
440            };
441            self.hooks.fire(&Event::PostModel { out: &out }, world);
442            // Accumulate usage even if the run later exhausts budget.
443            total_usage.input_tokens += out.usage.input_tokens;
444            total_usage.output_tokens += out.usage.output_tokens;
445            total_usage.cached_input_tokens += out.usage.cached_input_tokens;
446            if let Some(t) = &out.text {
447                last_text = Some(t.clone());
448            }
449            ctx.push_model_output(&out);
450
451            if self.recall.is_some() {
452                let calls = if out.tool_calls.is_empty() {
453                    None
454                } else {
455                    serde_json::to_string(&out.tool_calls).ok()
456                };
457                let mut m = harness_core::RecallMessage::new(
458                    "assistant",
459                    out.text.clone().unwrap_or_default(),
460                    world.clock.now_ms(),
461                );
462                m.tool_calls = calls;
463                self.recall_append(&recall_owner, &recall_session, m).await;
464            }
465
466            if out.tool_calls.is_empty() {
467                self.hooks.fire(&Event::TaskCompleted, world);
468                self.hooks.fire(&Event::SessionEnd, world);
469                self.run_learning_review(&ctx, world, tools_called).await;
470                return Ok(Outcome::Done {
471                    text: out.text,
472                    iters: iter + 1,
473                    tools_called,
474                    usage: total_usage,
475                });
476            }
477
478            for call in &out.tool_calls {
479                let action = Action {
480                    tool: call.name.clone(),
481                    call_id: call.id.clone(),
482                    args: call.args.clone(),
483                };
484
485                // PreToolUse hook can deny destructive actions
486                if let HookOutcome::Deny { reason } = self
487                    .hooks
488                    .fire(&Event::PreToolUse { action: &action }, world)
489                {
490                    ctx.history.push(Turn {
491                        role: TurnRole::Tool,
492                        blocks: vec![Block::ToolResult {
493                            call_id: action.call_id.clone(),
494                            content: serde_json::json!({
495                                "ok": false,
496                                "denied_by_hook": reason,
497                            }),
498                        }],
499                    });
500                    if self.recall.is_some() {
501                        self.recall_append(
502                            &recall_owner,
503                            &recall_session,
504                            harness_core::RecallMessage::new(
505                                "tool",
506                                format!("[denied by hook] {reason}"),
507                                world.clock.now_ms(),
508                            )
509                            .with_tool_name(action.tool.clone()),
510                        )
511                        .await;
512                    }
513                    continue;
514                }
515
516                let result = match self.tools.dispatch(&action, world).await {
517                    Ok(r) => r,
518                    Err(e) => ToolResult {
519                        ok: false,
520                        content: serde_json::json!({"error": e.to_string()}),
521                        trace: None,
522                    },
523                };
524                tools_called += 1;
525                self.hooks.fire(
526                    &Event::PostToolUse {
527                        action: &action,
528                        result: &result,
529                    },
530                    world,
531                );
532
533                ctx.history.push(Turn {
534                    role: TurnRole::Tool,
535                    blocks: vec![Block::ToolResult {
536                        call_id: action.call_id.clone(),
537                        content: result.content.clone(),
538                    }],
539                });
540
541                if self.recall.is_some() {
542                    let body = serde_json::to_string(&result.content).unwrap_or_default();
543                    self.recall_append(
544                        &recall_owner,
545                        &recall_session,
546                        harness_core::RecallMessage::new("tool", body, world.clock.now_ms())
547                            .with_tool_name(action.tool.clone()),
548                    )
549                    .await;
550                }
551
552                // run self-correct sensors
553                let mut all_signals = Vec::new();
554                for s in &self.sensors {
555                    if s.stage() != Stage::SelfCorrect {
556                        continue;
557                    }
558                    self.hooks.fire(&Event::PreSensor { sensor: s.id() }, world);
559                    let sigs = s.observe(&action, world).await.unwrap_or_else(|e| {
560                        tracing::warn!(?e, "sensor failed");
561                        Vec::new()
562                    });
563                    self.hooks.fire(
564                        &Event::PostSensor {
565                            sensor: s.id(),
566                            signals: &sigs,
567                        },
568                        world,
569                    );
570                    all_signals.extend(sigs);
571                }
572                if !all_signals.is_empty() {
573                    let bundle = SignalSet::new(all_signals);
574                    let (patches, remaining) = bundle.partition_auto_fix();
575
576                    // audit #7: each patch goes through PreAutoFix.
577                    // Hooks can Deny (skip silently). Default safelist on
578                    // RunCommand catches the obvious misuses with no hook.
579                    let approved: Vec<harness_core::FixPatch> = patches.into_iter().filter(|p| {
580                        if !is_default_safe_fix(p) {
581                            tracing::warn!(?p, "auto-fix rejected by default safelist (use PreAutoFix hook to override)");
582                            self.hooks.fire(&Event::PostAutoFix { patch: p, applied: false }, world);
583                            return false;
584                        }
585                        match self.hooks.fire(&Event::PreAutoFix { patch: p }, world) {
586                            HookOutcome::Deny { reason } => {
587                                tracing::warn!(?p, %reason, "auto-fix denied by hook");
588                                self.hooks.fire(&Event::PostAutoFix { patch: p, applied: false }, world);
589                                false
590                            }
591                            _ => true,
592                        }
593                    }).collect();
594
595                    let applied = apply_patches(&approved, world).await;
596                    // Emit PostAutoFix for each approved patch with the application result.
597                    for (i, p) in approved.iter().enumerate() {
598                        self.hooks.fire(
599                            &Event::PostAutoFix {
600                                patch: p,
601                                applied: i < applied.len(),
602                            },
603                            world,
604                        );
605                    }
606                    if !applied.is_empty() {
607                        ctx.push_feedback(vec![harness_core::Signal {
608                            severity: harness_core::Severity::Hint,
609                            origin: "auto-fix".into(),
610                            message: format!(
611                                "applied {} auto-fix patch(es): {applied:?}",
612                                applied.len()
613                            ),
614                            agent_hint: Some(
615                                "re-check the affected files before continuing".into(),
616                            ),
617                            auto_fix: None,
618                            location: None,
619                        }]);
620                    }
621                    if remaining.has_blocking() {
622                        ctx.push_feedback(remaining.signals);
623                    }
624                }
625            }
626        }
627        // ── Budget exhausted ─────────────────────────────────────────
628        // Force a final synthesis pass with tools DISABLED. Otherwise the
629        // model often spins on tool calls right up to the budget cap and
630        // never emits a text conclusion, leaving the caller with nothing
631        // but `last_text` from some earlier intermediate turn (or None).
632        //
633        // The synthesis call is "free" — it costs one extra model call
634        // beyond max_iters but doesn't count toward `iters`. The result
635        // lands in `last_text` so callers display it as the answer.
636        let synthesised = self
637            .force_final_synthesis(&mut ctx, world, &mut total_usage)
638            .await;
639        if let Some(t) = synthesised {
640            last_text = Some(t);
641        }
642
643        self.hooks.fire(&Event::SessionEnd, world);
644        self.run_learning_review(&ctx, world, tools_called).await;
645        Ok(Outcome::BudgetExhausted {
646            iters: ctx.policy.max_iters,
647            last_text,
648            tools_called,
649            usage: total_usage,
650        })
651    }
652
653    /// Drive `Model::stream()` and assemble the result into a `ModelOutput`,
654    /// firing `Event::ModelTokenDelta` for each text fragment along the way.
655    ///
656    /// Adapters that don't implement real streaming (e.g. `GeminiNative` /
657    /// `AnthropicNative` today) fall back to the default trait impl, which
658    /// runs `complete()` and emits the whole reply as a single delta. That
659    /// works — the loop sees one big `ModelDelta::Text(...)` followed by
660    /// `Stop`, fires one big `ModelTokenDelta`, and proceeds. So enabling
661    /// `streaming` is safe regardless of which provider the user picked.
662    async fn complete_via_stream(
663        &self,
664        ctx: &Context,
665        world: &mut World,
666    ) -> Result<ModelOutput, HarnessError> {
667        use futures::StreamExt;
668        let mut stream = self
669            .model
670            .stream(ctx)
671            .await
672            .map_err(harness_core::HarnessError::Model)?;
673        let mut text = String::new();
674        let mut reasoning_lines: Vec<String> = Vec::new();
675        let mut usage = Usage::default();
676        let mut stop_reason = StopReason::EndTurn;
677        // Insertion-ordered map: index → (id, name, args). We can't use the
678        // tool-call id as the primary key because the stream may emit args
679        // chunks before the first chunk that carries the id; the OpenAI-compat
680        // SSE parser already does its own buffering and surfaces `id` in
681        // ToolCallStart, but be lenient with adapters that may interleave.
682        let mut tool_starts: HashMap<String, (String, String)> = HashMap::new();
683        let mut tool_order: Vec<String> = Vec::new();
684        while let Some(item) = stream.next().await {
685            let delta = item.map_err(harness_core::HarnessError::Model)?;
686            match delta {
687                ModelDelta::Text(t) => {
688                    if !t.is_empty() {
689                        self.hooks.fire(&Event::ModelTokenDelta { text: &t }, world);
690                        text.push_str(&t);
691                    }
692                }
693                ModelDelta::ToolCallStart { id, name } => {
694                    if !tool_starts.contains_key(&id) {
695                        tool_order.push(id.clone());
696                    }
697                    tool_starts
698                        .entry(id)
699                        .or_insert_with(|| (name, String::new()));
700                }
701                ModelDelta::ToolCallArgs { id, partial_json } => {
702                    let entry = tool_starts
703                        .entry(id.clone())
704                        .or_insert_with(|| (String::new(), String::new()));
705                    if !tool_order.iter().any(|k| k == &id) {
706                        tool_order.push(id);
707                    }
708                    entry.1.push_str(&partial_json);
709                }
710                ModelDelta::ToolCallEnd { .. } => {}
711                ModelDelta::Usage(u) => usage = u,
712                ModelDelta::Stop(r) => stop_reason = r,
713                ModelDelta::Reasoning(s) => {
714                    if !s.is_empty() {
715                        reasoning_lines.push(s);
716                    }
717                }
718                // ModelDelta is `#[non_exhaustive]`; ignore future variants
719                // we don't yet understand.
720                _ => {}
721            }
722        }
723        let tool_calls: Vec<ToolCall> = tool_order
724            .into_iter()
725            .filter_map(|id| {
726                tool_starts.remove(&id).map(|(name, args)| {
727                    let args_v = serde_json::from_str::<serde_json::Value>(&args)
728                        .unwrap_or(serde_json::Value::String(args));
729                    ToolCall {
730                        id,
731                        name,
732                        args: args_v,
733                    }
734                })
735            })
736            .collect();
737        // Reconcile stop_reason with what actually came out — adapters
738        // sometimes emit `Stop(EndTurn)` even after tool_calls, which would
739        // confuse downstream consumers that branch on stop_reason alone.
740        let stop_reason = if !tool_calls.is_empty() {
741            StopReason::ToolUse
742        } else {
743            stop_reason
744        };
745        Ok(ModelOutput {
746            text: if text.is_empty() { None } else { Some(text) },
747            tool_calls,
748            usage,
749            stop_reason,
750            reasoning: if reasoning_lines.is_empty() {
751                None
752            } else {
753                Some(reasoning_lines.join("\n"))
754            },
755        })
756    }
757
758    /// Best-effort append to the recall store. Never fails the turn.
759    async fn recall_append(&self, owner: &str, session: &str, msg: harness_core::RecallMessage) {
760        if let Some(store) = &self.recall
761            && let Err(e) = store.append(owner, session, &msg).await
762        {
763            tracing::warn!(error = %e, "recall append failed");
764        }
765    }
766
767    /// Best-effort post-session review. Never affects the finished run.
768    async fn run_learning_review(&self, ctx: &Context, world: &mut World, tools_called: u32) {
769        let Some(cfg) = &self.learning else { return };
770        if tools_called < cfg.nudge_interval {
771            return;
772        }
773        let transcript = crate::render_transcript(&ctx.history, 12_000);
774        let task = harness_core::Task {
775            description: format!(
776                "{}\n\n## Conversation transcript\n{}",
777                cfg.review_prompt, transcript
778            ),
779            source: None,
780            deadline: None,
781        };
782        let mut spec =
783            crate::SubagentSpec::new("learning-review", task).with_max_iters(cfg.max_iters);
784        for t in &cfg.tools {
785            spec = spec.with_tool(t.clone());
786        }
787        let sub = crate::Subagent::new(harness_core::DynModel(cfg.review_model.clone()), spec);
788        // Box::pin breaks the recursive async-future cycle: AgentLoop<M> →
789        // run_learning_review → Subagent<DynModel>::run →
790        // AgentLoop<Arc<dyn Model>>::run_built_context. Without pinning the
791        // compiler rejects the infinite-sized future.
792        if let Err(e) = Box::pin(sub.run(world)).await {
793            tracing::warn!(error = %e, "learning review failed");
794        }
795    }
796
797    /// One final model call with tools removed, asking it to write the
798    /// best-effort conclusion from whatever it has already gathered.
799    ///
800    /// Errors from the model are swallowed — observability is best-effort
801    /// here, and a transport blip during synthesis should not turn a
802    /// near-complete run into a hard failure.
803    async fn force_final_synthesis(
804        &self,
805        ctx: &mut Context,
806        world: &mut World,
807        total_usage: &mut harness_core::Usage,
808    ) -> Option<String> {
809        const SYNTHESIS_PROMPT: &str = "[system: iteration budget exhausted] \
810            You have run out of tool-calling iterations. Write your final answer \
811            NOW using only the tool results already in this conversation. Do not \
812            request more tools. Mark facts you could not verify as UNKNOWN. \
813            Include source URLs for every claim that is not UNKNOWN.";
814
815        // Signal to any observer (LiveProgressHook, SessionRecorder, custom
816        // hooks) that we've used 100% of the budget and are about to force
817        // synthesis. Pre-existing `BudgetWarning` event was unused; this is
818        // its natural home.
819        self.hooks.fire(&Event::BudgetWarning { ratio: 1.0 }, world);
820
821        // Snapshot + clear tool schemas so the model has no choice but text.
822        let saved_tools = std::mem::take(&mut ctx.tools);
823        ctx.history.push(Turn {
824            role: TurnRole::User,
825            blocks: vec![Block::Text(SYNTHESIS_PROMPT.into())],
826        });
827
828        self.hooks.fire(&Event::PreModel { ctx }, world);
829        let result = self.model.complete(ctx).await;
830        ctx.tools = saved_tools;
831
832        match result {
833            Ok(out) => {
834                self.hooks.fire(&Event::PostModel { out: &out }, world);
835                total_usage.input_tokens += out.usage.input_tokens;
836                total_usage.output_tokens += out.usage.output_tokens;
837                total_usage.cached_input_tokens += out.usage.cached_input_tokens;
838                ctx.push_model_output(&out);
839                out.text
840            }
841            Err(_) => None,
842        }
843    }
844}
845
846/// Audit #7: default safelist for `FixPatch::RunCommand`.
847///
848/// Sensors emitting `RunCommand` patches would otherwise be a silent
849/// arbitrary-code-execution channel. We restrict the *program* by name to a
850/// short list of well-known, side-effect-bounded formatters/fixers. Anything
851/// else returns false and the patch is rejected (write your own `PreAutoFix`
852/// hook returning `HookOutcome::Allow` to widen the policy).
853///
854/// `ReplaceFile` and `UnifiedDiff` are not restricted here — they only touch
855/// files inside the workspace and are covered by the symlink-safe path
856/// resolution in `harness-tools-fs`.
857pub fn is_default_safe_fix(patch: &harness_core::FixPatch) -> bool {
858    use harness_core::FixPatch;
859    match patch {
860        FixPatch::ReplaceFile { .. } | FixPatch::UnifiedDiff { .. } => true,
861        FixPatch::RunCommand { program, args, .. } => match program.as_str() {
862            // Cargo subcommands proven side-effect-bounded.
863            "cargo" => matches!(
864                args.first().map(String::as_str),
865                Some("fmt" | "clippy" | "fix"),
866            ),
867            "rustfmt" | "gofmt" | "prettier" | "ruff" | "black" => true,
868            _ => false,
869        },
870        // Future FixPatch variants: deny by default — review and add to the list above.
871        _ => false,
872    }
873}
874
/// Process-wide sequence number embedded in `.harness-patch-*.diff` temp
/// filenames. Starts at zero. A millisecond timestamp alone collides when
/// agent runs apply patches in parallel, so each temp file also takes the
/// next value of this counter.
static PATCH_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
878
/// Process-wide sequence number for fallback recall session ids. Starts at
/// zero. A plain atomic counter is used so the crate needs no `uuid`
/// dependency.
static RECALL_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
881
882/// Apply auto-fix patches; return short descriptions of those that succeeded.
883///
884/// Made `pub` (was `pub(crate)`) so integration tests can call it directly.
885pub async fn apply_patches(patches: &[harness_core::FixPatch], world: &mut World) -> Vec<String> {
886    use harness_core::FixPatch;
887    let mut applied = Vec::new();
888    for p in patches {
889        match p {
890            FixPatch::ReplaceFile { path, content } => {
891                let abs = world.repo.root.join(path);
892                if let Some(parent) = abs.parent() {
893                    let _ = tokio::fs::create_dir_all(parent).await;
894                }
895                if tokio::fs::write(&abs, content).await.is_ok() {
896                    applied.push(format!("replaced {}", path.display()));
897                }
898            }
899            FixPatch::UnifiedDiff { diff } => {
900                if try_apply_diff(world, diff).await {
901                    applied.push("unified diff applied".into());
902                }
903            }
904            FixPatch::RunCommand { program, args, cwd } => {
905                let cwd_ref = cwd.as_deref().unwrap_or(world.repo.root.as_path());
906                let args_ref: Vec<&str> = args.iter().map(String::as_str).collect();
907                if let Ok(out) = world.runner.exec(program, &args_ref, Some(cwd_ref)).await
908                    && out.status == 0
909                {
910                    applied.push(format!("ran `{program} {}`", args.join(" ")));
911                }
912            }
913            // FixPatch is `#[non_exhaustive]`; unknown variants are skipped.
914            _ => tracing::warn!("apply_patches: unknown FixPatch variant — skipped"),
915        }
916    }
917    applied
918}
919
920/// Write `diff` to a unique temp file and try `patch -p1` first, then `-p0`.
921/// Returns whether either succeeded. The `-p1`-then-`-p0` order matches the
922/// reality that most agent-emitted diffs are git-style (need `-p1`) but some
923/// hand-rolled diffs use repo-relative paths (need `-p0`).
924async fn try_apply_diff(world: &mut World, diff: &str) -> bool {
925    use std::sync::atomic::Ordering;
926    use tokio::io::AsyncWriteExt;
927
928    let seq = PATCH_SEQ.fetch_add(1, Ordering::SeqCst);
929    let pid = std::process::id();
930    let now = world.clock.now_ms();
931    let tmp = world
932        .repo
933        .root
934        .join(format!(".harness-patch-{pid}-{now}-{seq}.diff"));
935
936    let mut f = match tokio::fs::File::create(&tmp).await {
937        Ok(f) => f,
938        Err(e) => {
939            tracing::warn!(error=%e, path=%tmp.display(), "could not create patch tempfile");
940            return false;
941        }
942    };
943    if let Err(e) = f.write_all(diff.as_bytes()).await {
944        tracing::warn!(error=%e, "could not write patch tempfile");
945        let _ = tokio::fs::remove_file(&tmp).await;
946        return false;
947    }
948    drop(f);
949
950    let tmp_str = tmp.to_string_lossy().to_string();
951    let mut applied = false;
952    for strip in ["-p1", "-p0"] {
953        match world
954            .runner
955            .exec(
956                "patch",
957                &[strip, "--silent", "-i", tmp_str.as_str()],
958                Some(world.repo.root.as_path()),
959            )
960            .await
961        {
962            Ok(out) if out.status == 0 => {
963                tracing::info!(strip, "patch applied");
964                applied = true;
965                break;
966            }
967            Ok(out) => {
968                tracing::debug!(strip, stderr=%out.stderr, "patch failed; trying next strip level");
969            }
970            Err(e) => {
971                tracing::warn!(error=%e, "patch command not available");
972                break; // patch tool missing — no point trying other strip
973            }
974        }
975    }
976    let _ = tokio::fs::remove_file(&tmp).await;
977    applied
978}