Skip to main content

harness_loop/
lib.rs

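//! ReAct agent loop with self-correction.
//!
//! Minimal v0.0.1 implementation:
//! - Applies guides once at the start; guides may also refresh the context
//!   before every iteration via `apply_before_iter` (a no-op by default).
//! - Sends `Context` (with `tools`) to the model.
//! - Dispatches each returned tool call via [`ToolRegistry`], unless a
//!   `PreToolUse` hook denies it.
//! - Runs `Stage::SelfCorrect` sensors after each action; auto-fix patches that
//!   pass the default safelist and any `PreAutoFix` hook are applied directly
//!   to the world, and blocking signals are fed back to the model.
//! - Stops when the model returns no tool calls, or when `policy.max_iters` is
//!   hit — in that case one extra tool-less synthesis call is made first so the
//!   caller still gets a final answer.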
10
11pub mod learning;
12pub mod memory_layer;
13pub mod profile_guide;
14pub mod recall_layer;
15pub mod registry;
16pub mod replay;
17pub mod subagent;
18
19pub use learning::*;
20pub use memory_layer::*;
21pub use profile_guide::*;
22pub use recall_layer::*;
23pub use registry::*;
24pub use replay::*;
25pub use subagent::*;
26
27use harness_compactor::DefaultCompactor;
28use harness_core::{
29    Action, Block, Compactor, Context, Event, Guide, HarnessError, HookOutcome, Model,
30    ModelDelta, ModelOutput, ResponseFormat, Sensor, SessionSource, SignalSet, Stage, StopReason,
31    Task, ToolCall, ToolResult, Turn, TurnRole, Usage, World,
32};
33use harness_hooks::HookBus;
34use std::collections::HashMap;
35use std::sync::Arc;
36
37/// Where a run finished. Marked `#[non_exhaustive]` so future fields don't break
38/// downstream matches — always include `..` when destructuring.
39#[derive(Debug, Clone)]
40pub enum Outcome {
41    /// Model returned text with no tool calls (natural end).
42    #[non_exhaustive]
43    Done {
44        text: Option<String>,
45        iters: u32,
46        tools_called: u32,
47        usage: harness_core::Usage,
48    },
49    /// Policy budget exhausted before the model stopped requesting tools.
50    /// Carries everything we know so the caller can recover partial work
51    /// (saved notes, files written by tools, the last assistant text, etc.)
52    /// instead of seeing a single bare "budget out" string.
53    #[non_exhaustive]
54    BudgetExhausted {
55        iters: u32,
56        last_text: Option<String>,
57        tools_called: u32,
58        usage: harness_core::Usage,
59    },
60}
61
62/// The agent loop.
63pub struct AgentLoop<M: Model> {
64    pub model: M,
65    pub tools: ToolRegistry,
66    pub guides: Vec<Arc<dyn Guide>>,
67    pub sensors: Vec<Arc<dyn Sensor>>,
68    pub hooks: HookBus,
69    pub compactor: Arc<dyn Compactor>,
70    /// Default response format applied to every run unless overridden by
71    /// `run_typed`. See [`ResponseFormat`].
72    pub response_format: ResponseFormat,
73    /// When `true`, the loop drives each model turn via `Model::stream()`
74    /// instead of `complete()`, firing `Event::ModelTokenDelta` for each
75    /// text fragment. Tool-call deltas are still assembled inside the loop;
76    /// only the terminal `ModelOutput` shape is observable downstream.
77    pub streaming: bool,
78    /// Optional cross-session recall store. When set, the loop captures every
79    /// turn and the `session_search` tool is registered. See `with_recall`.
80    pub recall: Option<Arc<dyn harness_core::RecallStore>>,
81    /// When true (and `recall` is set), a `RecallGuide` auto-injects top-k
82    /// past context at session start.
83    pub recall_auto_inject: bool,
84    pub learning: Option<LearningConfig>,
85}
86
87impl<M: Model> AgentLoop<M> {
88    pub fn new(model: M) -> Self {
89        Self {
90            model,
91            tools: ToolRegistry::new(),
92            guides: Vec::new(),
93            sensors: Vec::new(),
94            hooks: HookBus::new(),
95            compactor: Arc::new(DefaultCompactor::new()),
96            response_format: ResponseFormat::Free,
97            streaming: false,
98            recall: None,
99            recall_auto_inject: false,
100            learning: None,
101        }
102    }
103
104    /// Opt in to streaming the model's terminal turn token-by-token via
105    /// `Model::stream()`. Hooks subscribed to `Event::ModelTokenDelta` see
106    /// each fragment as it arrives; the rest of the loop is unchanged.
107    pub fn with_streaming(mut self, enable: bool) -> Self {
108        self.streaming = enable;
109        self
110    }
111
112    pub fn with_compactor(mut self, c: Arc<dyn Compactor>) -> Self {
113        self.compactor = c;
114        self
115    }
116
117    pub fn with_tool(mut self, t: Arc<dyn harness_core::Tool>) -> Self {
118        self.tools.insert(t);
119        self
120    }
121
122    pub fn with_guide(mut self, g: Arc<dyn Guide>) -> Self {
123        self.guides.push(g);
124        self
125    }
126
127    pub fn with_sensor(mut self, s: Arc<dyn Sensor>) -> Self {
128        self.sensors.push(s);
129        self
130    }
131
132    pub fn with_hook(mut self, h: Arc<dyn harness_core::Hook>) -> Self {
133        self.hooks.register(h);
134        self
135    }
136
137    /// Pull in every `#[hook]`-registered hook.
138    pub fn with_macro_hooks(mut self) -> Self {
139        self.hooks = self.hooks.with_macro_hooks_take();
140        self
141    }
142
143    /// Enable cross-session recall: capture every turn into `store` and
144    /// register the `session_search` tool. Owner + session id are read from
145    /// `world.profile.extra["recall_owner"|"recall_session"]` at run time.
146    pub fn with_recall(mut self, store: Arc<dyn harness_core::RecallStore>) -> Self {
147        self.tools.insert(Arc::new(crate::SessionSearchTool::new(store.clone())));
148        self.recall = Some(store);
149        self
150    }
151
152    /// After `with_recall`, also auto-inject top-k relevant past context at
153    /// session start (off by default — tool-only is prompt-cache friendly).
154    pub fn auto_inject(mut self) -> Self {
155        self.recall_auto_inject = true;
156        self
157    }
158
159    /// Enable the self-evolving learning loop: after a session that made
160    /// `>= cfg.nudge_interval` tool calls, fork a review subagent (white-listed to
161    /// `cfg.tools`) to update skills + memory from the transcript. Best-effort.
162    pub fn with_learning_loop(mut self, cfg: LearningConfig) -> Self {
163        self.learning = Some(cfg);
164        self
165    }
166
167    /// Set the default response format for all runs through this loop. See
168    /// [`ResponseFormat`]. For typed deserialisation, prefer `run_typed::<T>()`.
169    pub fn with_response_format(mut self, fmt: ResponseFormat) -> Self {
170        self.response_format = fmt;
171        self
172    }
173
174    /// Shortcut for `with_response_format(ResponseFormat::JsonSchema { name, schema })`.
175    /// Accepts a raw `serde_json::Value` so callers can hand-roll the schema or
176    /// pull it from `schemars::schema_for!(T)`.
177    pub fn with_response_schema(
178        self,
179        name: impl Into<String>,
180        schema: serde_json::Value,
181    ) -> Self {
182        self.with_response_format(ResponseFormat::JsonSchema {
183            name: name.into(),
184            schema,
185        })
186    }
187
188    pub async fn run(&self, task: Task, world: &mut World) -> Result<Outcome, HarnessError> {
189        let max = harness_core::Policy::default().max_iters;
190        self.run_with_max_iters(task, world, max).await
191    }
192
193    pub async fn run_with_max_iters(
194        &self,
195        task: Task,
196        world: &mut World,
197        max_iters: u32,
198    ) -> Result<Outcome, HarnessError> {
199        self.run_with_seed_history(task, Vec::new(), world, max_iters)
200            .await
201    }
202
203    /// Run the agent and deserialise the terminal reply into `T`.
204    ///
205    /// The schema for `T` is derived via `schemars::schema_for!(T)` and
206    /// installed as `ResponseFormat::JsonSchema` for this run only — any
207    /// pre-existing `self.response_format` is ignored. On success the
208    /// returned `T` is parsed from `Outcome::Done.text` (or, on budget
209    /// exhaustion, from `Outcome::BudgetExhausted.last_text`).
210    ///
211    /// Errors:
212    /// - `HarnessError::Other` if the model returns no text at all
213    /// - `HarnessError::Other` if `serde_json::from_str::<T>(text)` fails —
214    ///   the original text is included in the message for debugging.
215    pub async fn run_typed<T>(&self, task: Task, world: &mut World) -> Result<T, HarnessError>
216    where
217        T: serde::de::DeserializeOwned + schemars::JsonSchema + 'static,
218    {
219        let max = harness_core::Policy::default().max_iters;
220        self.run_typed_with_max_iters::<T>(task, world, max).await
221    }
222
223    /// Like `run_typed` but with explicit `max_iters`.
224    pub async fn run_typed_with_max_iters<T>(
225        &self,
226        task: Task,
227        world: &mut World,
228        max_iters: u32,
229    ) -> Result<T, HarnessError>
230    where
231        T: serde::de::DeserializeOwned + schemars::JsonSchema + 'static,
232    {
233        let schema_root = schemars::schema_for!(T);
234        let schema = serde_json::to_value(&schema_root)
235            .map_err(|e| HarnessError::Other(format!("response schema: {e}")))?;
236        let name = std::any::type_name::<T>()
237            .rsplit("::")
238            .next()
239            .unwrap_or("response")
240            .to_string();
241        let fmt = ResponseFormat::JsonSchema { name, schema };
242        let outcome = self
243            .run_with_response_format(task, world, max_iters, fmt)
244            .await?;
245        let text = match outcome {
246            Outcome::Done {
247                text: Some(t),
248                ..
249            }
250            | Outcome::BudgetExhausted {
251                last_text: Some(t),
252                ..
253            } => t,
254            Outcome::Done { text: None, .. } => {
255                return Err(HarnessError::Other(
256                    "run_typed: model returned no text".into(),
257                ));
258            }
259            Outcome::BudgetExhausted {
260                last_text: None, ..
261            } => {
262                return Err(HarnessError::Other(
263                    "run_typed: budget exhausted with no text".into(),
264                ));
265            }
266        };
267        serde_json::from_str::<T>(&text).map_err(|e| {
268            HarnessError::Other(format!(
269                "run_typed: decode {} failed: {e} — raw text was: {text}",
270                std::any::type_name::<T>()
271            ))
272        })
273    }
274
275    /// Run with a one-off `ResponseFormat` override (doesn't touch `self`).
276    pub async fn run_with_response_format(
277        &self,
278        task: Task,
279        world: &mut World,
280        max_iters: u32,
281        fmt: ResponseFormat,
282    ) -> Result<Outcome, HarnessError> {
283        // Borrow checker won't let us swap `self.response_format` because
284        // `self` is `&`. Easiest workaround: hand-roll the same setup that
285        // `run_with_seed_history` does, but with our `fmt`. We do this by
286        // calling through a private helper.
287        self.run_with_seed_history_and_format(task, Vec::new(), world, max_iters, Some(fmt))
288            .await
289    }
290
291    async fn run_with_seed_history_and_format(
292        &self,
293        task: Task,
294        seed: Vec<Turn>,
295        world: &mut World,
296        max_iters: u32,
297        fmt_override: Option<ResponseFormat>,
298    ) -> Result<Outcome, HarnessError> {
299        let mut ctx = Context::new(task);
300        ctx.policy.max_iters = max_iters;
301        ctx.tools = self.tools.schemas();
302        ctx.history = seed;
303        ctx.response_format = fmt_override.unwrap_or_else(|| self.response_format.clone());
304        self.run_built_context(ctx, world).await
305    }
306
307    /// Like `run_with_max_iters` but seeds `ctx.history` with `seed` **before**
308    /// the current user task is appended. Use this for multi-turn REPLs so
309    /// prior conversation lives in `ctx.history` (where the Compactor can see
310    /// it) instead of being concatenated into `task.description` (where it
311    /// previously bypassed compaction entirely — see audit #2).
312    pub async fn run_with_seed_history(
313        &self,
314        task: Task,
315        seed: Vec<Turn>,
316        world: &mut World,
317        max_iters: u32,
318    ) -> Result<Outcome, HarnessError> {
319        let mut ctx = Context::new(task);
320        ctx.policy.max_iters = max_iters;
321        ctx.tools = self.tools.schemas();
322        ctx.history = seed;
323        ctx.response_format = self.response_format.clone();
324        self.run_built_context(ctx, world).await
325    }
326
327    /// Inner ReAct loop on an already-prepared `Context`. Use the public
328    /// `run*` methods unless you need to inject a non-standard `Context`
329    /// (e.g. `run_with_response_format` does to apply a one-off
330    /// `ResponseFormat` without mutating `self`).
331    async fn run_built_context(
332        &self,
333        mut ctx: Context,
334        world: &mut World,
335    ) -> Result<Outcome, HarnessError> {
336        self.hooks.fire(
337            &Event::SessionStart {
338                source: SessionSource::Startup,
339            },
340            world,
341        );
342
343        // ── recall: resolve owner/session, ensure the session row ──
344        let (recall_owner, recall_session) = if self.recall.is_some() {
345            use std::sync::atomic::Ordering;
346            let owner = crate::recall_owner(world);
347            let session = world
348                .profile
349                .extra
350                .get("recall_session")
351                .and_then(|v| v.as_str())
352                .map(|s| s.to_string())
353                .unwrap_or_else(|| {
354                    format!("sess-{}-{}", world.clock.now_ms(), RECALL_SEQ.fetch_add(1, Ordering::SeqCst))
355                });
356            if let Some(store) = &self.recall {
357                let meta = harness_core::SessionMeta::new(&session, world.clock.now_ms());
358                if let Err(e) = store.ensure_session(&owner, &session, &meta).await {
359                    tracing::warn!(error = %e, "recall ensure_session failed");
360                }
361            }
362            (owner, session)
363        } else {
364            (String::new(), String::new())
365        };
366
367        let recall_guide: Option<Arc<dyn Guide>> = if self.recall_auto_inject {
368            if self.recall.is_none() {
369                tracing::warn!("auto_inject() set but no recall store — call with_recall(store) first; skipping recall guide");
370                None
371            } else {
372                self.recall.clone().map(|s| Arc::new(crate::RecallGuide::new(s)) as Arc<dyn Guide>)
373            }
374        } else {
375            None
376        };
377        let all_guides: Vec<&Arc<dyn Guide>> =
378            self.guides.iter().chain(recall_guide.iter()).collect();
379        for g in &all_guides {
380            if g.scope().matches(&ctx.task) {
381                self.hooks.fire(&Event::PreGuide { guide: g.id() }, world);
382                g.apply(&mut ctx, world).await?;
383                self.hooks.fire(&Event::PostGuide { guide: g.id() }, world);
384            }
385        }
386
387        ctx.history.push(Turn {
388            role: TurnRole::User,
389            blocks: vec![Block::Text(ctx.task.description.clone())],
390        });
391
392        if self.recall.is_some() {
393            self.recall_append(
394                &recall_owner,
395                &recall_session,
396                harness_core::RecallMessage::new("user", ctx.task.description.clone(), world.clock.now_ms()),
397            )
398            .await;
399        }
400
401        // Running totals — surface to caller even on BudgetExhausted.
402        let mut tools_called: u32 = 0;
403        let mut total_usage = harness_core::Usage::default();
404        let mut last_text: Option<String> = None;
405
406        for iter in 0..ctx.policy.max_iters {
407            self.hooks.fire(&Event::Heartbeat { iter }, world);
408
409            // Compaction: run every stage required by current budget.
410            let stages = self.compactor.budget(&ctx).required_stages();
411            for stage in stages {
412                self.hooks.fire(&Event::PreCompact { stage }, world);
413                self.compactor.compact(stage, &mut ctx).await?;
414                self.hooks.fire(&Event::PostCompact { stage }, world);
415            }
416
417            // Per-iteration guides — recall-style adapters that want to
418            // refresh their injected context every turn (e.g. MemoryGuide
419            // re-recalling against the latest user message). Default
420            // `apply_before_iter` is a no-op, so this loop is cheap for
421            // guides that don't override it.
422            for g in &all_guides {
423                if g.scope().matches(&ctx.task)
424                    && let Err(e) = g.apply_before_iter(&mut ctx, world).await
425                {
426                    tracing::warn!(guide = %g.id(), error = %e, "apply_before_iter failed; continuing");
427                }
428            }
429
430            self.hooks.fire(&Event::PreModel { ctx: &ctx }, world);
431            let out = if self.streaming {
432                self.complete_via_stream(&ctx, world).await?
433            } else {
434                self.model.complete(&ctx).await?
435            };
436            self.hooks.fire(&Event::PostModel { out: &out }, world);
437            // Accumulate usage even if the run later exhausts budget.
438            total_usage.input_tokens += out.usage.input_tokens;
439            total_usage.output_tokens += out.usage.output_tokens;
440            total_usage.cached_input_tokens += out.usage.cached_input_tokens;
441            if let Some(t) = &out.text {
442                last_text = Some(t.clone());
443            }
444            ctx.push_model_output(&out);
445
446            if self.recall.is_some() {
447                let calls = if out.tool_calls.is_empty() {
448                    None
449                } else {
450                    serde_json::to_string(&out.tool_calls).ok()
451                };
452                let mut m = harness_core::RecallMessage::new(
453                    "assistant",
454                    out.text.clone().unwrap_or_default(),
455                    world.clock.now_ms(),
456                );
457                m.tool_calls = calls;
458                self.recall_append(&recall_owner, &recall_session, m).await;
459            }
460
461            if out.tool_calls.is_empty() {
462                self.hooks.fire(&Event::TaskCompleted, world);
463                self.hooks.fire(&Event::SessionEnd, world);
464                self.run_learning_review(&ctx, world, tools_called).await;
465                return Ok(Outcome::Done {
466                    text: out.text,
467                    iters: iter + 1,
468                    tools_called,
469                    usage: total_usage,
470                });
471            }
472
473            for call in &out.tool_calls {
474                let action = Action {
475                    tool: call.name.clone(),
476                    call_id: call.id.clone(),
477                    args: call.args.clone(),
478                };
479
480                // PreToolUse hook can deny destructive actions
481                if let HookOutcome::Deny { reason } = self
482                    .hooks
483                    .fire(&Event::PreToolUse { action: &action }, world)
484                {
485                    ctx.history.push(Turn {
486                        role: TurnRole::Tool,
487                        blocks: vec![Block::ToolResult {
488                            call_id: action.call_id.clone(),
489                            content: serde_json::json!({
490                                "ok": false,
491                                "denied_by_hook": reason,
492                            }),
493                        }],
494                    });
495                    if self.recall.is_some() {
496                        self.recall_append(
497                            &recall_owner,
498                            &recall_session,
499                            harness_core::RecallMessage::new(
500                                "tool",
501                                format!("[denied by hook] {reason}"),
502                                world.clock.now_ms(),
503                            )
504                            .with_tool_name(action.tool.clone()),
505                        )
506                        .await;
507                    }
508                    continue;
509                }
510
511                let result = match self.tools.dispatch(&action, world).await {
512                    Ok(r) => r,
513                    Err(e) => ToolResult {
514                        ok: false,
515                        content: serde_json::json!({"error": e.to_string()}),
516                        trace: None,
517                    },
518                };
519                tools_called += 1;
520                self.hooks.fire(
521                    &Event::PostToolUse {
522                        action: &action,
523                        result: &result,
524                    },
525                    world,
526                );
527
528                ctx.history.push(Turn {
529                    role: TurnRole::Tool,
530                    blocks: vec![Block::ToolResult {
531                        call_id: action.call_id.clone(),
532                        content: result.content.clone(),
533                    }],
534                });
535
536                if self.recall.is_some() {
537                    let body = serde_json::to_string(&result.content).unwrap_or_default();
538                    self.recall_append(
539                        &recall_owner,
540                        &recall_session,
541                        harness_core::RecallMessage::new("tool", body, world.clock.now_ms())
542                            .with_tool_name(action.tool.clone()),
543                    )
544                    .await;
545                }
546
547                // run self-correct sensors
548                let mut all_signals = Vec::new();
549                for s in &self.sensors {
550                    if s.stage() != Stage::SelfCorrect {
551                        continue;
552                    }
553                    self.hooks.fire(&Event::PreSensor { sensor: s.id() }, world);
554                    let sigs = s.observe(&action, world).await.unwrap_or_else(|e| {
555                        tracing::warn!(?e, "sensor failed");
556                        Vec::new()
557                    });
558                    self.hooks.fire(
559                        &Event::PostSensor {
560                            sensor: s.id(),
561                            signals: &sigs,
562                        },
563                        world,
564                    );
565                    all_signals.extend(sigs);
566                }
567                if !all_signals.is_empty() {
568                    let bundle = SignalSet::new(all_signals);
569                    let (patches, remaining) = bundle.partition_auto_fix();
570
571                    // audit #7: each patch goes through PreAutoFix.
572                    // Hooks can Deny (skip silently). Default safelist on
573                    // RunCommand catches the obvious misuses with no hook.
574                    let approved: Vec<harness_core::FixPatch> = patches.into_iter().filter(|p| {
575                        if !is_default_safe_fix(p) {
576                            tracing::warn!(?p, "auto-fix rejected by default safelist (use PreAutoFix hook to override)");
577                            self.hooks.fire(&Event::PostAutoFix { patch: p, applied: false }, world);
578                            return false;
579                        }
580                        match self.hooks.fire(&Event::PreAutoFix { patch: p }, world) {
581                            HookOutcome::Deny { reason } => {
582                                tracing::warn!(?p, %reason, "auto-fix denied by hook");
583                                self.hooks.fire(&Event::PostAutoFix { patch: p, applied: false }, world);
584                                false
585                            }
586                            _ => true,
587                        }
588                    }).collect();
589
590                    let applied = apply_patches(&approved, world).await;
591                    // Emit PostAutoFix for each approved patch with the application result.
592                    for (i, p) in approved.iter().enumerate() {
593                        self.hooks.fire(
594                            &Event::PostAutoFix {
595                                patch: p,
596                                applied: i < applied.len(),
597                            },
598                            world,
599                        );
600                    }
601                    if !applied.is_empty() {
602                        ctx.push_feedback(vec![harness_core::Signal {
603                            severity: harness_core::Severity::Hint,
604                            origin: "auto-fix".into(),
605                            message: format!(
606                                "applied {} auto-fix patch(es): {applied:?}",
607                                applied.len()
608                            ),
609                            agent_hint: Some(
610                                "re-check the affected files before continuing".into(),
611                            ),
612                            auto_fix: None,
613                            location: None,
614                        }]);
615                    }
616                    if remaining.has_blocking() {
617                        ctx.push_feedback(remaining.signals);
618                    }
619                }
620            }
621        }
622        // ── Budget exhausted ─────────────────────────────────────────
623        // Force a final synthesis pass with tools DISABLED. Otherwise the
624        // model often spins on tool calls right up to the budget cap and
625        // never emits a text conclusion, leaving the caller with nothing
626        // but `last_text` from some earlier intermediate turn (or None).
627        //
628        // The synthesis call is "free" — it costs one extra model call
629        // beyond max_iters but doesn't count toward `iters`. The result
630        // lands in `last_text` so callers display it as the answer.
631        let synthesised = self
632            .force_final_synthesis(&mut ctx, world, &mut total_usage)
633            .await;
634        if let Some(t) = synthesised {
635            last_text = Some(t);
636        }
637
638        self.hooks.fire(&Event::SessionEnd, world);
639        self.run_learning_review(&ctx, world, tools_called).await;
640        Ok(Outcome::BudgetExhausted {
641            iters: ctx.policy.max_iters,
642            last_text,
643            tools_called,
644            usage: total_usage,
645        })
646    }
647
648    /// Drive `Model::stream()` and assemble the result into a `ModelOutput`,
649    /// firing `Event::ModelTokenDelta` for each text fragment along the way.
650    ///
651    /// Adapters that don't implement real streaming (e.g. `GeminiNative` /
652    /// `AnthropicNative` today) fall back to the default trait impl, which
653    /// runs `complete()` and emits the whole reply as a single delta. That
654    /// works — the loop sees one big `ModelDelta::Text(...)` followed by
655    /// `Stop`, fires one big `ModelTokenDelta`, and proceeds. So enabling
656    /// `streaming` is safe regardless of which provider the user picked.
657    async fn complete_via_stream(
658        &self,
659        ctx: &Context,
660        world: &mut World,
661    ) -> Result<ModelOutput, HarnessError> {
662        use futures::StreamExt;
663        let mut stream = self
664            .model
665            .stream(ctx)
666            .await
667            .map_err(harness_core::HarnessError::Model)?;
668        let mut text = String::new();
669        let mut reasoning_lines: Vec<String> = Vec::new();
670        let mut usage = Usage::default();
671        let mut stop_reason = StopReason::EndTurn;
672        // Insertion-ordered map: index → (id, name, args). We can't use the
673        // tool-call id as the primary key because the stream may emit args
674        // chunks before the first chunk that carries the id; the OpenAI-compat
675        // SSE parser already does its own buffering and surfaces `id` in
676        // ToolCallStart, but be lenient with adapters that may interleave.
677        let mut tool_starts: HashMap<String, (String, String)> = HashMap::new();
678        let mut tool_order: Vec<String> = Vec::new();
679        while let Some(item) = stream.next().await {
680            let delta = item.map_err(harness_core::HarnessError::Model)?;
681            match delta {
682                ModelDelta::Text(t) => {
683                    if !t.is_empty() {
684                        self.hooks
685                            .fire(&Event::ModelTokenDelta { text: &t }, world);
686                        text.push_str(&t);
687                    }
688                }
689                ModelDelta::ToolCallStart { id, name } => {
690                    if !tool_starts.contains_key(&id) {
691                        tool_order.push(id.clone());
692                    }
693                    tool_starts.entry(id).or_insert_with(|| (name, String::new()));
694                }
695                ModelDelta::ToolCallArgs { id, partial_json } => {
696                    let entry = tool_starts
697                        .entry(id.clone())
698                        .or_insert_with(|| (String::new(), String::new()));
699                    if !tool_order.iter().any(|k| k == &id) {
700                        tool_order.push(id);
701                    }
702                    entry.1.push_str(&partial_json);
703                }
704                ModelDelta::ToolCallEnd { .. } => {}
705                ModelDelta::Usage(u) => usage = u,
706                ModelDelta::Stop(r) => stop_reason = r,
707                ModelDelta::Reasoning(s) => {
708                    if !s.is_empty() {
709                        reasoning_lines.push(s);
710                    }
711                }
712                // ModelDelta is `#[non_exhaustive]`; ignore future variants
713                // we don't yet understand.
714                _ => {}
715            }
716        }
717        let tool_calls: Vec<ToolCall> = tool_order
718            .into_iter()
719            .filter_map(|id| {
720                tool_starts.remove(&id).map(|(name, args)| {
721                    let args_v = serde_json::from_str::<serde_json::Value>(&args)
722                        .unwrap_or_else(|_| serde_json::Value::String(args));
723                    ToolCall {
724                        id,
725                        name,
726                        args: args_v,
727                    }
728                })
729            })
730            .collect();
731        // Reconcile stop_reason with what actually came out — adapters
732        // sometimes emit `Stop(EndTurn)` even after tool_calls, which would
733        // confuse downstream consumers that branch on stop_reason alone.
734        let stop_reason = if !tool_calls.is_empty() {
735            StopReason::ToolUse
736        } else {
737            stop_reason
738        };
739        Ok(ModelOutput {
740            text: if text.is_empty() { None } else { Some(text) },
741            tool_calls,
742            usage,
743            stop_reason,
744            reasoning: if reasoning_lines.is_empty() {
745                None
746            } else {
747                Some(reasoning_lines.join("\n"))
748            },
749        })
750    }
751
752    /// Best-effort append to the recall store. Never fails the turn.
753    async fn recall_append(&self, owner: &str, session: &str, msg: harness_core::RecallMessage) {
754        if let Some(store) = &self.recall {
755            if let Err(e) = store.append(owner, session, &msg).await {
756                tracing::warn!(error = %e, "recall append failed");
757            }
758        }
759    }
760
761    /// Best-effort post-session review. Never affects the finished run.
762    async fn run_learning_review(&self, ctx: &Context, world: &mut World, tools_called: u32) {
763        let Some(cfg) = &self.learning else { return };
764        if tools_called < cfg.nudge_interval { return; }
765        let transcript = crate::render_transcript(&ctx.history, 12_000);
766        let task = harness_core::Task {
767            description: format!("{}\n\n## Conversation transcript\n{}", cfg.review_prompt, transcript),
768            source: None,
769            deadline: None,
770        };
771        let mut spec = crate::SubagentSpec::new("learning-review", task).with_max_iters(cfg.max_iters);
772        for t in &cfg.tools {
773            spec = spec.with_tool(t.clone());
774        }
775        let sub = crate::Subagent::new(harness_core::DynModel(cfg.review_model.clone()), spec);
776        // Box::pin breaks the recursive async-future cycle: AgentLoop<M> →
777        // run_learning_review → Subagent<DynModel>::run →
778        // AgentLoop<Arc<dyn Model>>::run_built_context. Without pinning the
779        // compiler rejects the infinite-sized future.
780        if let Err(e) = Box::pin(sub.run(world)).await {
781            tracing::warn!(error = %e, "learning review failed");
782        }
783    }
784
785    /// One final model call with tools removed, asking it to write the
786    /// best-effort conclusion from whatever it has already gathered.
787    ///
788    /// Errors from the model are swallowed — observability is best-effort
789    /// here, and a transport blip during synthesis should not turn a
790    /// near-complete run into a hard failure.
791    async fn force_final_synthesis(
792        &self,
793        ctx: &mut Context,
794        world: &mut World,
795        total_usage: &mut harness_core::Usage,
796    ) -> Option<String> {
797        const SYNTHESIS_PROMPT: &str = "[system: iteration budget exhausted] \
798            You have run out of tool-calling iterations. Write your final answer \
799            NOW using only the tool results already in this conversation. Do not \
800            request more tools. Mark facts you could not verify as UNKNOWN. \
801            Include source URLs for every claim that is not UNKNOWN.";
802
803        // Signal to any observer (LiveProgressHook, SessionRecorder, custom
804        // hooks) that we've used 100% of the budget and are about to force
805        // synthesis. Pre-existing `BudgetWarning` event was unused; this is
806        // its natural home.
807        self.hooks.fire(&Event::BudgetWarning { ratio: 1.0 }, world);
808
809        // Snapshot + clear tool schemas so the model has no choice but text.
810        let saved_tools = std::mem::take(&mut ctx.tools);
811        ctx.history.push(Turn {
812            role: TurnRole::User,
813            blocks: vec![Block::Text(SYNTHESIS_PROMPT.into())],
814        });
815
816        self.hooks.fire(&Event::PreModel { ctx }, world);
817        let result = self.model.complete(ctx).await;
818        ctx.tools = saved_tools;
819
820        match result {
821            Ok(out) => {
822                self.hooks.fire(&Event::PostModel { out: &out }, world);
823                total_usage.input_tokens += out.usage.input_tokens;
824                total_usage.output_tokens += out.usage.output_tokens;
825                total_usage.cached_input_tokens += out.usage.cached_input_tokens;
826                ctx.push_model_output(&out);
827                out.text
828            }
829            Err(_) => None,
830        }
831    }
832}
833
834/// Audit #7: default safelist for `FixPatch::RunCommand`.
835///
836/// Sensors emitting `RunCommand` patches would otherwise be a silent
837/// arbitrary-code-execution channel. We restrict the *program* by name to a
838/// short list of well-known, side-effect-bounded formatters/fixers. Anything
839/// else returns false and the patch is rejected (write your own `PreAutoFix`
840/// hook returning `HookOutcome::Allow` to widen the policy).
841///
842/// `ReplaceFile` and `UnifiedDiff` are not restricted here — they only touch
843/// files inside the workspace and are covered by the symlink-safe path
844/// resolution in `harness-tools-fs`.
845pub fn is_default_safe_fix(patch: &harness_core::FixPatch) -> bool {
846    use harness_core::FixPatch;
847    match patch {
848        FixPatch::ReplaceFile { .. } | FixPatch::UnifiedDiff { .. } => true,
849        FixPatch::RunCommand { program, args, .. } => match program.as_str() {
850            // Cargo subcommands proven side-effect-bounded.
851            "cargo" => matches!(
852                args.first().map(String::as_str),
853                Some("fmt" | "clippy" | "fix"),
854            ),
855            "rustfmt" | "gofmt" | "prettier" | "ruff" | "black" => true,
856            _ => false,
857        },
858        // Future FixPatch variants: deny by default — review and add to the list above.
859        _ => false,
860    }
861}
862
/// Process-wide sequence number mixed into `.harness-patch-*.diff` temp-file
/// names. The millisecond timestamp alone collides when agent runs overlap.
static PATCH_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

/// Process-wide sequence number for fallback recall session ids, used in
/// place of a `uuid` dependency.
static RECALL_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
869
870/// Apply auto-fix patches; return short descriptions of those that succeeded.
871///
872/// Made `pub` (was `pub(crate)`) so integration tests can call it directly.
873pub async fn apply_patches(patches: &[harness_core::FixPatch], world: &mut World) -> Vec<String> {
874    use harness_core::FixPatch;
875    let mut applied = Vec::new();
876    for p in patches {
877        match p {
878            FixPatch::ReplaceFile { path, content } => {
879                let abs = world.repo.root.join(path);
880                if let Some(parent) = abs.parent() {
881                    let _ = tokio::fs::create_dir_all(parent).await;
882                }
883                if tokio::fs::write(&abs, content).await.is_ok() {
884                    applied.push(format!("replaced {}", path.display()));
885                }
886            }
887            FixPatch::UnifiedDiff { diff } => {
888                if try_apply_diff(world, diff).await {
889                    applied.push("unified diff applied".into());
890                }
891            }
892            FixPatch::RunCommand { program, args, cwd } => {
893                let cwd_ref = cwd.as_deref().unwrap_or(world.repo.root.as_path());
894                let args_ref: Vec<&str> = args.iter().map(String::as_str).collect();
895                if let Ok(out) = world.runner.exec(program, &args_ref, Some(cwd_ref)).await
896                    && out.status == 0
897                {
898                    applied.push(format!("ran `{program} {}`", args.join(" ")));
899                }
900            }
901            // FixPatch is `#[non_exhaustive]`; unknown variants are skipped.
902            _ => tracing::warn!("apply_patches: unknown FixPatch variant — skipped"),
903        }
904    }
905    applied
906}
907
908/// Write `diff` to a unique temp file and try `patch -p1` first, then `-p0`.
909/// Returns whether either succeeded. The `-p1`-then-`-p0` order matches the
910/// reality that most agent-emitted diffs are git-style (need `-p1`) but some
911/// hand-rolled diffs use repo-relative paths (need `-p0`).
912async fn try_apply_diff(world: &mut World, diff: &str) -> bool {
913    use std::sync::atomic::Ordering;
914    use tokio::io::AsyncWriteExt;
915
916    let seq = PATCH_SEQ.fetch_add(1, Ordering::SeqCst);
917    let pid = std::process::id();
918    let now = world.clock.now_ms();
919    let tmp = world
920        .repo
921        .root
922        .join(format!(".harness-patch-{pid}-{now}-{seq}.diff"));
923
924    let mut f = match tokio::fs::File::create(&tmp).await {
925        Ok(f) => f,
926        Err(e) => {
927            tracing::warn!(error=%e, path=%tmp.display(), "could not create patch tempfile");
928            return false;
929        }
930    };
931    if let Err(e) = f.write_all(diff.as_bytes()).await {
932        tracing::warn!(error=%e, "could not write patch tempfile");
933        let _ = tokio::fs::remove_file(&tmp).await;
934        return false;
935    }
936    drop(f);
937
938    let tmp_str = tmp.to_string_lossy().to_string();
939    let mut applied = false;
940    for strip in ["-p1", "-p0"] {
941        match world
942            .runner
943            .exec(
944                "patch",
945                &[strip, "--silent", "-i", tmp_str.as_str()],
946                Some(world.repo.root.as_path()),
947            )
948            .await
949        {
950            Ok(out) if out.status == 0 => {
951                tracing::info!(strip, "patch applied");
952                applied = true;
953                break;
954            }
955            Ok(out) => {
956                tracing::debug!(strip, stderr=%out.stderr, "patch failed; trying next strip level");
957            }
958            Err(e) => {
959                tracing::warn!(error=%e, "patch command not available");
960                break; // patch tool missing — no point trying other strip
961            }
962        }
963    }
964    let _ = tokio::fs::remove_file(&tmp).await;
965    applied
966}