Skip to main content

harness_loop/
lib.rs

1//! ReAct agent loop with self-correction.
2//!
3//! Minimal v0.0.1 implementation:
4//! - Applies guides once at the start.
5//! - Sends `Context` (with `tools`) to the model.
6//! - Dispatches each returned tool call via [`ToolRegistry`].
7//! - Runs `Sensor::SelfCorrect` sensors after each action; auto-fix patches are
8//!   applied directly to the world, blocking signals are fed back to the model.
9//! - Stops when the model returns no tool calls, or when `policy.max_iters` is hit.
10
11pub mod learning;
12pub mod memory_layer;
13pub mod profile_guide;
14pub mod recall_layer;
15pub mod registry;
16pub mod replay;
17pub mod subagent;
18
19pub use learning::*;
20pub use memory_layer::*;
21pub use profile_guide::*;
22pub use recall_layer::*;
23pub use registry::*;
24pub use replay::*;
25pub use subagent::*;
26
27use harness_compactor::DefaultCompactor;
28use harness_core::{
29    Action, Block, Compactor, Context, Event, Guide, HarnessError, HookOutcome, Model, ModelDelta,
30    ModelOutput, ResponseFormat, Sensor, SessionSource, SignalSet, Stage, StopReason, Task,
31    ToolCall, ToolResult, Turn, TurnRole, Usage, World,
32};
33use harness_hooks::HookBus;
34use std::collections::HashMap;
35use std::sync::Arc;
36
37/// Where a run finished. Marked `#[non_exhaustive]` so future fields don't break
38/// downstream matches — always include `..` when destructuring.
39#[derive(Debug, Clone)]
40pub enum Outcome {
41    /// Model returned text with no tool calls (natural end).
42    #[non_exhaustive]
43    Done {
44        text: Option<String>,
45        iters: u32,
46        tools_called: u32,
47        usage: harness_core::Usage,
48    },
49    /// Policy budget exhausted before the model stopped requesting tools.
50    /// Carries everything we know so the caller can recover partial work
51    /// (saved notes, files written by tools, the last assistant text, etc.)
52    /// instead of seeing a single bare "budget out" string.
53    #[non_exhaustive]
54    BudgetExhausted {
55        iters: u32,
56        last_text: Option<String>,
57        tools_called: u32,
58        usage: harness_core::Usage,
59    },
60}
61
62/// The agent loop.
63pub struct AgentLoop<M: Model> {
64    pub model: M,
65    pub tools: ToolRegistry,
66    pub guides: Vec<Arc<dyn Guide>>,
67    pub sensors: Vec<Arc<dyn Sensor>>,
68    pub hooks: HookBus,
69    pub compactor: Arc<dyn Compactor>,
70    /// Default response format applied to every run unless overridden by
71    /// `run_typed`. See [`ResponseFormat`].
72    pub response_format: ResponseFormat,
73    /// When `true`, the loop drives each model turn via `Model::stream()`
74    /// instead of `complete()`, firing `Event::ModelTokenDelta` for each
75    /// text fragment. Tool-call deltas are still assembled inside the loop;
76    /// only the terminal `ModelOutput` shape is observable downstream.
77    pub streaming: bool,
78    /// Optional cross-session recall store. When set, the loop captures every
79    /// turn and the `session_search` tool is registered. See `with_recall`.
80    pub recall: Option<Arc<dyn harness_core::RecallStore>>,
81    /// When true (and `recall` is set), a `RecallGuide` auto-injects top-k
82    /// past context at session start.
83    pub recall_auto_inject: bool,
84    pub learning: Option<LearningConfig>,
85}
86
87impl<M: Model> AgentLoop<M> {
88    pub fn new(model: M) -> Self {
89        Self {
90            model,
91            tools: ToolRegistry::new(),
92            guides: Vec::new(),
93            sensors: Vec::new(),
94            hooks: HookBus::new(),
95            compactor: Arc::new(DefaultCompactor::new()),
96            response_format: ResponseFormat::Free,
97            streaming: false,
98            recall: None,
99            recall_auto_inject: false,
100            learning: None,
101        }
102    }
103
104    /// Opt in to streaming the model's terminal turn token-by-token via
105    /// `Model::stream()`. Hooks subscribed to `Event::ModelTokenDelta` see
106    /// each fragment as it arrives; the rest of the loop is unchanged.
107    pub fn with_streaming(mut self, enable: bool) -> Self {
108        self.streaming = enable;
109        self
110    }
111
112    pub fn with_compactor(mut self, c: Arc<dyn Compactor>) -> Self {
113        self.compactor = c;
114        self
115    }
116
117    pub fn with_tool(mut self, t: Arc<dyn harness_core::Tool>) -> Self {
118        self.tools.insert(t);
119        self
120    }
121
122    pub fn with_guide(mut self, g: Arc<dyn Guide>) -> Self {
123        self.guides.push(g);
124        self
125    }
126
127    pub fn with_sensor(mut self, s: Arc<dyn Sensor>) -> Self {
128        self.sensors.push(s);
129        self
130    }
131
132    pub fn with_hook(mut self, h: Arc<dyn harness_core::Hook>) -> Self {
133        self.hooks.register(h);
134        self
135    }
136
137    /// Pull in every `#[hook]`-registered hook.
138    pub fn with_macro_hooks(mut self) -> Self {
139        self.hooks = self.hooks.with_macro_hooks_take();
140        self
141    }
142
143    /// Enable cross-session recall: capture every turn into `store` and
144    /// register the `session_search` tool. Owner + session id are read from
145    /// `world.profile.extra["recall_owner"|"recall_session"]` at run time.
146    pub fn with_recall(mut self, store: Arc<dyn harness_core::RecallStore>) -> Self {
147        self.tools
148            .insert(Arc::new(crate::SessionSearchTool::new(store.clone())));
149        self.recall = Some(store);
150        self
151    }
152
153    /// After `with_recall`, also auto-inject top-k relevant past context at
154    /// session start (off by default — tool-only is prompt-cache friendly).
155    pub fn auto_inject(mut self) -> Self {
156        self.recall_auto_inject = true;
157        self
158    }
159
160    /// Enable the self-evolving learning loop: after a session that made
161    /// `>= cfg.nudge_interval` tool calls, fork a review subagent (white-listed to
162    /// `cfg.tools`) to update skills + memory from the transcript. Best-effort.
163    pub fn with_learning_loop(mut self, cfg: LearningConfig) -> Self {
164        self.learning = Some(cfg);
165        self
166    }
167
168    /// Set the default response format for all runs through this loop. See
169    /// [`ResponseFormat`]. For typed deserialisation, prefer `run_typed::<T>()`.
170    pub fn with_response_format(mut self, fmt: ResponseFormat) -> Self {
171        self.response_format = fmt;
172        self
173    }
174
175    /// Shortcut for `with_response_format(ResponseFormat::JsonSchema { name, schema })`.
176    /// Accepts a raw `serde_json::Value` so callers can hand-roll the schema or
177    /// pull it from `schemars::schema_for!(T)`.
178    pub fn with_response_schema(self, name: impl Into<String>, schema: serde_json::Value) -> Self {
179        self.with_response_format(ResponseFormat::JsonSchema {
180            name: name.into(),
181            schema,
182        })
183    }
184
185    pub async fn run(&self, task: Task, world: &mut World) -> Result<Outcome, HarnessError> {
186        let max = harness_core::Policy::default().max_iters;
187        self.run_with_max_iters(task, world, max).await
188    }
189
190    pub async fn run_with_max_iters(
191        &self,
192        task: Task,
193        world: &mut World,
194        max_iters: u32,
195    ) -> Result<Outcome, HarnessError> {
196        self.run_with_seed_history(task, Vec::new(), world, max_iters)
197            .await
198    }
199
200    /// Run the agent and deserialise the terminal reply into `T`.
201    ///
202    /// The schema for `T` is derived via `schemars::schema_for!(T)` and
203    /// installed as `ResponseFormat::JsonSchema` for this run only — any
204    /// pre-existing `self.response_format` is ignored. On success the
205    /// returned `T` is parsed from `Outcome::Done.text` (or, on budget
206    /// exhaustion, from `Outcome::BudgetExhausted.last_text`).
207    ///
208    /// Errors:
209    /// - `HarnessError::Other` if the model returns no text at all
210    /// - `HarnessError::Other` if `serde_json::from_str::<T>(text)` fails —
211    ///   the original text is included in the message for debugging.
212    pub async fn run_typed<T>(&self, task: Task, world: &mut World) -> Result<T, HarnessError>
213    where
214        T: serde::de::DeserializeOwned + schemars::JsonSchema + 'static,
215    {
216        let max = harness_core::Policy::default().max_iters;
217        self.run_typed_with_max_iters::<T>(task, world, max).await
218    }
219
220    /// Like `run_typed` but with explicit `max_iters`.
221    pub async fn run_typed_with_max_iters<T>(
222        &self,
223        task: Task,
224        world: &mut World,
225        max_iters: u32,
226    ) -> Result<T, HarnessError>
227    where
228        T: serde::de::DeserializeOwned + schemars::JsonSchema + 'static,
229    {
230        let schema_root = schemars::schema_for!(T);
231        let schema = serde_json::to_value(&schema_root)
232            .map_err(|e| HarnessError::Other(format!("response schema: {e}")))?;
233        let name = std::any::type_name::<T>()
234            .rsplit("::")
235            .next()
236            .unwrap_or("response")
237            .to_string();
238        let fmt = ResponseFormat::JsonSchema { name, schema };
239        let outcome = self
240            .run_with_response_format(task, world, max_iters, fmt)
241            .await?;
242        let text = match outcome {
243            Outcome::Done { text: Some(t), .. }
244            | Outcome::BudgetExhausted {
245                last_text: Some(t), ..
246            } => t,
247            Outcome::Done { text: None, .. } => {
248                return Err(HarnessError::Other(
249                    "run_typed: model returned no text".into(),
250                ));
251            }
252            Outcome::BudgetExhausted {
253                last_text: None, ..
254            } => {
255                return Err(HarnessError::Other(
256                    "run_typed: budget exhausted with no text".into(),
257                ));
258            }
259        };
260        serde_json::from_str::<T>(&text).map_err(|e| {
261            HarnessError::Other(format!(
262                "run_typed: decode {} failed: {e} — raw text was: {text}",
263                std::any::type_name::<T>()
264            ))
265        })
266    }
267
268    /// Run with a one-off `ResponseFormat` override (doesn't touch `self`).
269    pub async fn run_with_response_format(
270        &self,
271        task: Task,
272        world: &mut World,
273        max_iters: u32,
274        fmt: ResponseFormat,
275    ) -> Result<Outcome, HarnessError> {
276        // Borrow checker won't let us swap `self.response_format` because
277        // `self` is `&`. Easiest workaround: hand-roll the same setup that
278        // `run_with_seed_history` does, but with our `fmt`. We do this by
279        // calling through a private helper.
280        self.run_with_seed_history_and_format(task, Vec::new(), world, max_iters, Some(fmt))
281            .await
282    }
283
284    async fn run_with_seed_history_and_format(
285        &self,
286        task: Task,
287        seed: Vec<Turn>,
288        world: &mut World,
289        max_iters: u32,
290        fmt_override: Option<ResponseFormat>,
291    ) -> Result<Outcome, HarnessError> {
292        let mut ctx = Context::new(task);
293        ctx.policy.max_iters = max_iters;
294        ctx.tools = self.tools.schemas();
295        ctx.history = seed;
296        ctx.response_format = fmt_override.unwrap_or_else(|| self.response_format.clone());
297        self.run_built_context(ctx, world).await
298    }
299
300    /// Like `run_with_max_iters` but seeds `ctx.history` with `seed` **before**
301    /// the current user task is appended. Use this for multi-turn REPLs so
302    /// prior conversation lives in `ctx.history` (where the Compactor can see
303    /// it) instead of being concatenated into `task.description` (where it
304    /// previously bypassed compaction entirely — see audit #2).
305    pub async fn run_with_seed_history(
306        &self,
307        task: Task,
308        seed: Vec<Turn>,
309        world: &mut World,
310        max_iters: u32,
311    ) -> Result<Outcome, HarnessError> {
312        let mut ctx = Context::new(task);
313        ctx.policy.max_iters = max_iters;
314        ctx.tools = self.tools.schemas();
315        ctx.history = seed;
316        ctx.response_format = self.response_format.clone();
317        self.run_built_context(ctx, world).await
318    }
319
320    /// Inner ReAct loop on an already-prepared `Context`. Use the public
321    /// `run*` methods unless you need to inject a non-standard `Context`
322    /// (e.g. `run_with_response_format` does to apply a one-off
323    /// `ResponseFormat` without mutating `self`).
324    async fn run_built_context(
325        &self,
326        mut ctx: Context,
327        world: &mut World,
328    ) -> Result<Outcome, HarnessError> {
329        self.hooks.fire(
330            &Event::SessionStart {
331                source: SessionSource::Startup,
332            },
333            world,
334        );
335
336        // ── recall: resolve owner/session, ensure the session row ──
337        let (recall_owner, recall_session) = if self.recall.is_some() {
338            use std::sync::atomic::Ordering;
339            let owner = crate::recall_owner(world);
340            let session = world
341                .profile
342                .extra
343                .get("recall_session")
344                .and_then(|v| v.as_str())
345                .map(|s| s.to_string())
346                .unwrap_or_else(|| {
347                    format!(
348                        "sess-{}-{}",
349                        world.clock.now_ms(),
350                        RECALL_SEQ.fetch_add(1, Ordering::SeqCst)
351                    )
352                });
353            if let Some(store) = &self.recall {
354                let meta = harness_core::SessionMeta::new(&session, world.clock.now_ms());
355                if let Err(e) = store.ensure_session(&owner, &session, &meta).await {
356                    tracing::warn!(error = %e, "recall ensure_session failed");
357                }
358            }
359            (owner, session)
360        } else {
361            (String::new(), String::new())
362        };
363
364        let recall_guide: Option<Arc<dyn Guide>> = if self.recall_auto_inject {
365            if self.recall.is_none() {
366                tracing::warn!(
367                    "auto_inject() set but no recall store — call with_recall(store) first; skipping recall guide"
368                );
369                None
370            } else {
371                self.recall
372                    .clone()
373                    .map(|s| Arc::new(crate::RecallGuide::new(s)) as Arc<dyn Guide>)
374            }
375        } else {
376            None
377        };
378        let all_guides: Vec<&Arc<dyn Guide>> =
379            self.guides.iter().chain(recall_guide.iter()).collect();
380        for g in &all_guides {
381            if g.scope().matches(&ctx.task) {
382                self.hooks.fire(&Event::PreGuide { guide: g.id() }, world);
383                g.apply(&mut ctx, world).await?;
384                self.hooks.fire(&Event::PostGuide { guide: g.id() }, world);
385            }
386        }
387
388        ctx.history.push(Turn {
389            role: TurnRole::User,
390            blocks: vec![Block::Text(ctx.task.description.clone())],
391        });
392
393        if self.recall.is_some() {
394            self.recall_append(
395                &recall_owner,
396                &recall_session,
397                harness_core::RecallMessage::new(
398                    "user",
399                    ctx.task.description.clone(),
400                    world.clock.now_ms(),
401                ),
402            )
403            .await;
404        }
405
406        // Running totals — surface to caller even on BudgetExhausted.
407        let mut tools_called: u32 = 0;
408        let mut total_usage = harness_core::Usage::default();
409        let mut last_text: Option<String> = None;
410
411        for iter in 0..ctx.policy.max_iters {
412            self.hooks.fire(&Event::Heartbeat { iter }, world);
413
414            // Compaction: run every stage required by current budget.
415            let stages = self.compactor.budget(&ctx).required_stages();
416            for stage in stages {
417                self.hooks.fire(&Event::PreCompact { stage }, world);
418                self.compactor.compact(stage, &mut ctx).await?;
419                self.hooks.fire(&Event::PostCompact { stage }, world);
420            }
421
422            // Per-iteration guides — recall-style adapters that want to
423            // refresh their injected context every turn (e.g. MemoryGuide
424            // re-recalling against the latest user message). Default
425            // `apply_before_iter` is a no-op, so this loop is cheap for
426            // guides that don't override it.
427            for g in &all_guides {
428                if g.scope().matches(&ctx.task)
429                    && let Err(e) = g.apply_before_iter(&mut ctx, world).await
430                {
431                    tracing::warn!(guide = %g.id(), error = %e, "apply_before_iter failed; continuing");
432                }
433            }
434
435            self.hooks.fire(&Event::PreModel { ctx: &ctx }, world);
436            let out = if self.streaming {
437                self.complete_via_stream(&ctx, world).await?
438            } else {
439                self.model.complete(&ctx).await?
440            };
441            self.hooks.fire(&Event::PostModel { out: &out }, world);
442            // Accumulate usage even if the run later exhausts budget.
443            total_usage.input_tokens += out.usage.input_tokens;
444            total_usage.output_tokens += out.usage.output_tokens;
445            total_usage.cached_input_tokens += out.usage.cached_input_tokens;
446            if let Some(t) = &out.text {
447                last_text = Some(t.clone());
448            }
449            ctx.push_model_output(&out);
450
451            if self.recall.is_some() {
452                let calls = if out.tool_calls.is_empty() {
453                    None
454                } else {
455                    serde_json::to_string(&out.tool_calls).ok()
456                };
457                let mut m = harness_core::RecallMessage::new(
458                    "assistant",
459                    out.text.clone().unwrap_or_default(),
460                    world.clock.now_ms(),
461                );
462                m.tool_calls = calls;
463                self.recall_append(&recall_owner, &recall_session, m).await;
464            }
465
466            if out.tool_calls.is_empty() {
467                self.hooks.fire(&Event::TaskCompleted, world);
468                self.hooks.fire(&Event::SessionEnd, world);
469                self.run_learning_review(&ctx, world, tools_called).await;
470                // Thinking models (e.g. Qwen3 via Ollama) sometimes emit the
471                // whole answer into the reasoning channel and leave `text`
472                // empty. Fall back to the reasoning so the turn isn't blank.
473                let text = out
474                    .text
475                    .filter(|t| !t.trim().is_empty())
476                    .or_else(|| out.reasoning.filter(|r| !r.trim().is_empty()));
477                return Ok(Outcome::Done {
478                    text,
479                    iters: iter + 1,
480                    tools_called,
481                    usage: total_usage,
482                });
483            }
484
485            for call in &out.tool_calls {
486                let action = Action {
487                    tool: call.name.clone(),
488                    call_id: call.id.clone(),
489                    args: call.args.clone(),
490                };
491
492                // PreToolUse hook can deny destructive actions
493                if let HookOutcome::Deny { reason } = self
494                    .hooks
495                    .fire(&Event::PreToolUse { action: &action }, world)
496                {
497                    ctx.history.push(Turn {
498                        role: TurnRole::Tool,
499                        blocks: vec![Block::ToolResult {
500                            call_id: action.call_id.clone(),
501                            content: serde_json::json!({
502                                "ok": false,
503                                "denied_by_hook": reason,
504                            }),
505                        }],
506                    });
507                    if self.recall.is_some() {
508                        self.recall_append(
509                            &recall_owner,
510                            &recall_session,
511                            harness_core::RecallMessage::new(
512                                "tool",
513                                format!("[denied by hook] {reason}"),
514                                world.clock.now_ms(),
515                            )
516                            .with_tool_name(action.tool.clone()),
517                        )
518                        .await;
519                    }
520                    continue;
521                }
522
523                let result = match self.tools.dispatch(&action, world).await {
524                    Ok(r) => r,
525                    Err(e) => ToolResult {
526                        ok: false,
527                        content: serde_json::json!({"error": e.to_string()}),
528                        trace: None,
529                    },
530                };
531                tools_called += 1;
532                self.hooks.fire(
533                    &Event::PostToolUse {
534                        action: &action,
535                        result: &result,
536                    },
537                    world,
538                );
539
540                ctx.history.push(Turn {
541                    role: TurnRole::Tool,
542                    blocks: vec![Block::ToolResult {
543                        call_id: action.call_id.clone(),
544                        content: result.content.clone(),
545                    }],
546                });
547
548                if self.recall.is_some() {
549                    let body = serde_json::to_string(&result.content).unwrap_or_default();
550                    self.recall_append(
551                        &recall_owner,
552                        &recall_session,
553                        harness_core::RecallMessage::new("tool", body, world.clock.now_ms())
554                            .with_tool_name(action.tool.clone()),
555                    )
556                    .await;
557                }
558
559                // run self-correct sensors
560                let mut all_signals = Vec::new();
561                for s in &self.sensors {
562                    if s.stage() != Stage::SelfCorrect {
563                        continue;
564                    }
565                    self.hooks.fire(&Event::PreSensor { sensor: s.id() }, world);
566                    let sigs = s.observe(&action, world).await.unwrap_or_else(|e| {
567                        tracing::warn!(?e, "sensor failed");
568                        Vec::new()
569                    });
570                    self.hooks.fire(
571                        &Event::PostSensor {
572                            sensor: s.id(),
573                            signals: &sigs,
574                        },
575                        world,
576                    );
577                    all_signals.extend(sigs);
578                }
579                if !all_signals.is_empty() {
580                    let bundle = SignalSet::new(all_signals);
581                    let (patches, remaining) = bundle.partition_auto_fix();
582
583                    // audit #7: each patch goes through PreAutoFix.
584                    // Hooks can Deny (skip silently). Default safelist on
585                    // RunCommand catches the obvious misuses with no hook.
586                    let approved: Vec<harness_core::FixPatch> = patches.into_iter().filter(|p| {
587                        if !is_default_safe_fix(p) {
588                            tracing::warn!(?p, "auto-fix rejected by default safelist (use PreAutoFix hook to override)");
589                            self.hooks.fire(&Event::PostAutoFix { patch: p, applied: false }, world);
590                            return false;
591                        }
592                        match self.hooks.fire(&Event::PreAutoFix { patch: p }, world) {
593                            HookOutcome::Deny { reason } => {
594                                tracing::warn!(?p, %reason, "auto-fix denied by hook");
595                                self.hooks.fire(&Event::PostAutoFix { patch: p, applied: false }, world);
596                                false
597                            }
598                            _ => true,
599                        }
600                    }).collect();
601
602                    let applied = apply_patches(&approved, world).await;
603                    // Emit PostAutoFix for each approved patch with the application result.
604                    for (i, p) in approved.iter().enumerate() {
605                        self.hooks.fire(
606                            &Event::PostAutoFix {
607                                patch: p,
608                                applied: i < applied.len(),
609                            },
610                            world,
611                        );
612                    }
613                    if !applied.is_empty() {
614                        ctx.push_feedback(vec![harness_core::Signal {
615                            severity: harness_core::Severity::Hint,
616                            origin: "auto-fix".into(),
617                            message: format!(
618                                "applied {} auto-fix patch(es): {applied:?}",
619                                applied.len()
620                            ),
621                            agent_hint: Some(
622                                "re-check the affected files before continuing".into(),
623                            ),
624                            auto_fix: None,
625                            location: None,
626                        }]);
627                    }
628                    if remaining.has_blocking() {
629                        ctx.push_feedback(remaining.signals);
630                    }
631                }
632            }
633        }
634        // ── Budget exhausted ─────────────────────────────────────────
635        // Force a final synthesis pass with tools DISABLED. Otherwise the
636        // model often spins on tool calls right up to the budget cap and
637        // never emits a text conclusion, leaving the caller with nothing
638        // but `last_text` from some earlier intermediate turn (or None).
639        //
640        // The synthesis call is "free" — it costs one extra model call
641        // beyond max_iters but doesn't count toward `iters`. The result
642        // lands in `last_text` so callers display it as the answer.
643        let synthesised = self
644            .force_final_synthesis(&mut ctx, world, &mut total_usage)
645            .await;
646        if let Some(t) = synthesised {
647            last_text = Some(t);
648        }
649
650        self.hooks.fire(&Event::SessionEnd, world);
651        self.run_learning_review(&ctx, world, tools_called).await;
652        Ok(Outcome::BudgetExhausted {
653            iters: ctx.policy.max_iters,
654            last_text,
655            tools_called,
656            usage: total_usage,
657        })
658    }
659
660    /// Drive `Model::stream()` and assemble the result into a `ModelOutput`,
661    /// firing `Event::ModelTokenDelta` for each text fragment along the way.
662    ///
663    /// Adapters that don't implement real streaming (e.g. `GeminiNative` /
664    /// `AnthropicNative` today) fall back to the default trait impl, which
665    /// runs `complete()` and emits the whole reply as a single delta. That
666    /// works — the loop sees one big `ModelDelta::Text(...)` followed by
667    /// `Stop`, fires one big `ModelTokenDelta`, and proceeds. So enabling
668    /// `streaming` is safe regardless of which provider the user picked.
669    async fn complete_via_stream(
670        &self,
671        ctx: &Context,
672        world: &mut World,
673    ) -> Result<ModelOutput, HarnessError> {
674        use futures::StreamExt;
675        let mut stream = self
676            .model
677            .stream(ctx)
678            .await
679            .map_err(harness_core::HarnessError::Model)?;
680        let mut text = String::new();
681        let mut reasoning = String::new();
682        let mut usage = Usage::default();
683        let mut stop_reason = StopReason::EndTurn;
684        // Insertion-ordered map: index → (id, name, args). We can't use the
685        // tool-call id as the primary key because the stream may emit args
686        // chunks before the first chunk that carries the id; the OpenAI-compat
687        // SSE parser already does its own buffering and surfaces `id` in
688        // ToolCallStart, but be lenient with adapters that may interleave.
689        let mut tool_starts: HashMap<String, (String, String)> = HashMap::new();
690        let mut tool_order: Vec<String> = Vec::new();
691        while let Some(item) = stream.next().await {
692            let delta = item.map_err(harness_core::HarnessError::Model)?;
693            match delta {
694                ModelDelta::Text(t) => {
695                    if !t.is_empty() {
696                        self.hooks.fire(&Event::ModelTokenDelta { text: &t }, world);
697                        text.push_str(&t);
698                    }
699                }
700                ModelDelta::ToolCallStart { id, name } => {
701                    if !tool_starts.contains_key(&id) {
702                        tool_order.push(id.clone());
703                    }
704                    tool_starts
705                        .entry(id)
706                        .or_insert_with(|| (name, String::new()));
707                }
708                ModelDelta::ToolCallArgs { id, partial_json } => {
709                    let entry = tool_starts
710                        .entry(id.clone())
711                        .or_insert_with(|| (String::new(), String::new()));
712                    if !tool_order.iter().any(|k| k == &id) {
713                        tool_order.push(id);
714                    }
715                    entry.1.push_str(&partial_json);
716                }
717                ModelDelta::ToolCallEnd { .. } => {}
718                ModelDelta::Usage(u) => usage = u,
719                ModelDelta::Stop(r) => stop_reason = r,
720                ModelDelta::Reasoning(s) => {
721                    // Streamed reasoning arrives as token fragments, not lines —
722                    // concatenate verbatim (same as `text`), don't insert newlines.
723                    reasoning.push_str(&s);
724                }
725                // ModelDelta is `#[non_exhaustive]`; ignore future variants
726                // we don't yet understand.
727                _ => {}
728            }
729        }
730        let tool_calls: Vec<ToolCall> = tool_order
731            .into_iter()
732            .filter_map(|id| {
733                tool_starts.remove(&id).map(|(name, args)| {
734                    let args_v = serde_json::from_str::<serde_json::Value>(&args)
735                        .unwrap_or(serde_json::Value::String(args));
736                    ToolCall {
737                        id,
738                        name,
739                        args: args_v,
740                    }
741                })
742            })
743            .collect();
744        // Reconcile stop_reason with what actually came out — adapters
745        // sometimes emit `Stop(EndTurn)` even after tool_calls, which would
746        // confuse downstream consumers that branch on stop_reason alone.
747        let stop_reason = if !tool_calls.is_empty() {
748            StopReason::ToolUse
749        } else {
750            stop_reason
751        };
752        Ok(ModelOutput {
753            text: if text.is_empty() { None } else { Some(text) },
754            tool_calls,
755            usage,
756            stop_reason,
757            reasoning: if reasoning.is_empty() {
758                None
759            } else {
760                Some(reasoning)
761            },
762        })
763    }
764
765    /// Best-effort append to the recall store. Never fails the turn.
766    async fn recall_append(&self, owner: &str, session: &str, msg: harness_core::RecallMessage) {
767        if let Some(store) = &self.recall
768            && let Err(e) = store.append(owner, session, &msg).await
769        {
770            tracing::warn!(error = %e, "recall append failed");
771        }
772    }
773
774    /// Best-effort post-session review. Never affects the finished run.
775    async fn run_learning_review(&self, ctx: &Context, world: &mut World, tools_called: u32) {
776        let Some(cfg) = &self.learning else { return };
777        if tools_called < cfg.nudge_interval {
778            return;
779        }
780        let transcript = crate::render_transcript(&ctx.history, 12_000);
781        let task = harness_core::Task {
782            description: format!(
783                "{}\n\n## Conversation transcript\n{}",
784                cfg.review_prompt, transcript
785            ),
786            source: None,
787            deadline: None,
788        };
789        let mut spec =
790            crate::SubagentSpec::new("learning-review", task).with_max_iters(cfg.max_iters);
791        for t in &cfg.tools {
792            spec = spec.with_tool(t.clone());
793        }
794        let sub = crate::Subagent::new(harness_core::DynModel(cfg.review_model.clone()), spec);
795        // Box::pin breaks the recursive async-future cycle: AgentLoop<M> →
796        // run_learning_review → Subagent<DynModel>::run →
797        // AgentLoop<Arc<dyn Model>>::run_built_context. Without pinning the
798        // compiler rejects the infinite-sized future.
799        if let Err(e) = Box::pin(sub.run(world)).await {
800            tracing::warn!(error = %e, "learning review failed");
801        }
802    }
803
804    /// One final model call with tools removed, asking it to write the
805    /// best-effort conclusion from whatever it has already gathered.
806    ///
807    /// Errors from the model are swallowed — observability is best-effort
808    /// here, and a transport blip during synthesis should not turn a
809    /// near-complete run into a hard failure.
810    async fn force_final_synthesis(
811        &self,
812        ctx: &mut Context,
813        world: &mut World,
814        total_usage: &mut harness_core::Usage,
815    ) -> Option<String> {
816        const SYNTHESIS_PROMPT: &str = "[system: iteration budget exhausted] \
817            You have run out of tool-calling iterations. Write your final answer \
818            NOW using only the tool results already in this conversation. Do not \
819            request more tools. Mark facts you could not verify as UNKNOWN. \
820            Include source URLs for every claim that is not UNKNOWN.";
821
822        // Signal to any observer (LiveProgressHook, SessionRecorder, custom
823        // hooks) that we've used 100% of the budget and are about to force
824        // synthesis. Pre-existing `BudgetWarning` event was unused; this is
825        // its natural home.
826        self.hooks.fire(&Event::BudgetWarning { ratio: 1.0 }, world);
827
828        // Snapshot + clear tool schemas so the model has no choice but text.
829        let saved_tools = std::mem::take(&mut ctx.tools);
830        ctx.history.push(Turn {
831            role: TurnRole::User,
832            blocks: vec![Block::Text(SYNTHESIS_PROMPT.into())],
833        });
834
835        self.hooks.fire(&Event::PreModel { ctx }, world);
836        let result = self.model.complete(ctx).await;
837        ctx.tools = saved_tools;
838
839        match result {
840            Ok(out) => {
841                self.hooks.fire(&Event::PostModel { out: &out }, world);
842                total_usage.input_tokens += out.usage.input_tokens;
843                total_usage.output_tokens += out.usage.output_tokens;
844                total_usage.cached_input_tokens += out.usage.cached_input_tokens;
845                ctx.push_model_output(&out);
846                out.text
847            }
848            Err(_) => None,
849        }
850    }
851}
852
853/// Audit #7: default safelist for `FixPatch::RunCommand`.
854///
855/// Sensors emitting `RunCommand` patches would otherwise be a silent
856/// arbitrary-code-execution channel. We restrict the *program* by name to a
857/// short list of well-known, side-effect-bounded formatters/fixers. Anything
858/// else returns false and the patch is rejected (write your own `PreAutoFix`
859/// hook returning `HookOutcome::Allow` to widen the policy).
860///
861/// `ReplaceFile` and `UnifiedDiff` are not restricted here — they only touch
862/// files inside the workspace and are covered by the symlink-safe path
863/// resolution in `harness-tools-fs`.
864pub fn is_default_safe_fix(patch: &harness_core::FixPatch) -> bool {
865    use harness_core::FixPatch;
866    match patch {
867        FixPatch::ReplaceFile { .. } | FixPatch::UnifiedDiff { .. } => true,
868        FixPatch::RunCommand { program, args, .. } => match program.as_str() {
869            // Cargo subcommands proven side-effect-bounded.
870            "cargo" => matches!(
871                args.first().map(String::as_str),
872                Some("fmt" | "clippy" | "fix"),
873            ),
874            "rustfmt" | "gofmt" | "prettier" | "ruff" | "black" => true,
875            _ => false,
876        },
877        // Future FixPatch variants: deny by default — review and add to the list above.
878        _ => false,
879    }
880}
881
/// Monotonic counter for `.harness-patch-*.diff` temp filenames — millisecond
/// resolution alone collides under parallel agent runs.
///
/// Starts at zero and is bumped with `fetch_add` by `try_apply_diff`, which
/// combines it with the process id and the clock reading to build the name.
static PATCH_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
885
/// Monotonic counter for fallback recall session ids (no `uuid` dep).
///
/// Starts at zero. NOTE(review): the code that bumps this counter lies
/// outside this part of the file — confirm the id format at that call site.
static RECALL_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
888
889/// Apply auto-fix patches; return short descriptions of those that succeeded.
890///
891/// Made `pub` (was `pub(crate)`) so integration tests can call it directly.
892pub async fn apply_patches(patches: &[harness_core::FixPatch], world: &mut World) -> Vec<String> {
893    use harness_core::FixPatch;
894    let mut applied = Vec::new();
895    for p in patches {
896        match p {
897            FixPatch::ReplaceFile { path, content } => {
898                let abs = world.repo.root.join(path);
899                if let Some(parent) = abs.parent() {
900                    let _ = tokio::fs::create_dir_all(parent).await;
901                }
902                if tokio::fs::write(&abs, content).await.is_ok() {
903                    applied.push(format!("replaced {}", path.display()));
904                }
905            }
906            FixPatch::UnifiedDiff { diff } => {
907                if try_apply_diff(world, diff).await {
908                    applied.push("unified diff applied".into());
909                }
910            }
911            FixPatch::RunCommand { program, args, cwd } => {
912                let cwd_ref = cwd.as_deref().unwrap_or(world.repo.root.as_path());
913                let args_ref: Vec<&str> = args.iter().map(String::as_str).collect();
914                if let Ok(out) = world.runner.exec(program, &args_ref, Some(cwd_ref)).await
915                    && out.status == 0
916                {
917                    applied.push(format!("ran `{program} {}`", args.join(" ")));
918                }
919            }
920            // FixPatch is `#[non_exhaustive]`; unknown variants are skipped.
921            _ => tracing::warn!("apply_patches: unknown FixPatch variant — skipped"),
922        }
923    }
924    applied
925}
926
927/// Write `diff` to a unique temp file and try `patch -p1` first, then `-p0`.
928/// Returns whether either succeeded. The `-p1`-then-`-p0` order matches the
929/// reality that most agent-emitted diffs are git-style (need `-p1`) but some
930/// hand-rolled diffs use repo-relative paths (need `-p0`).
931async fn try_apply_diff(world: &mut World, diff: &str) -> bool {
932    use std::sync::atomic::Ordering;
933    use tokio::io::AsyncWriteExt;
934
935    let seq = PATCH_SEQ.fetch_add(1, Ordering::SeqCst);
936    let pid = std::process::id();
937    let now = world.clock.now_ms();
938    let tmp = world
939        .repo
940        .root
941        .join(format!(".harness-patch-{pid}-{now}-{seq}.diff"));
942
943    let mut f = match tokio::fs::File::create(&tmp).await {
944        Ok(f) => f,
945        Err(e) => {
946            tracing::warn!(error=%e, path=%tmp.display(), "could not create patch tempfile");
947            return false;
948        }
949    };
950    if let Err(e) = f.write_all(diff.as_bytes()).await {
951        tracing::warn!(error=%e, "could not write patch tempfile");
952        let _ = tokio::fs::remove_file(&tmp).await;
953        return false;
954    }
955    drop(f);
956
957    let tmp_str = tmp.to_string_lossy().to_string();
958    let mut applied = false;
959    for strip in ["-p1", "-p0"] {
960        match world
961            .runner
962            .exec(
963                "patch",
964                &[strip, "--silent", "-i", tmp_str.as_str()],
965                Some(world.repo.root.as_path()),
966            )
967            .await
968        {
969            Ok(out) if out.status == 0 => {
970                tracing::info!(strip, "patch applied");
971                applied = true;
972                break;
973            }
974            Ok(out) => {
975                tracing::debug!(strip, stderr=%out.stderr, "patch failed; trying next strip level");
976            }
977            Err(e) => {
978                tracing::warn!(error=%e, "patch command not available");
979                break; // patch tool missing — no point trying other strip
980            }
981        }
982    }
983    let _ = tokio::fs::remove_file(&tmp).await;
984    applied
985}