Skip to main content

harness_loop/
lib.rs

1//! ReAct agent loop with self-correction.
2//!
3//! Minimal v0.0.1 implementation:
4//! - Applies guides once at the start.
5//! - Sends `Context` (with `tools`) to the model.
6//! - Dispatches each returned tool call via [`ToolRegistry`].
7//! - Runs `Sensor::SelfCorrect` sensors after each action; auto-fix patches are
8//!   applied directly to the world, blocking signals are fed back to the model.
//! - Stops when the model returns no tool calls, or when `policy.max_iters` is hit
//!   (in that case one extra model call with tools disabled asks for a final answer).
10
11pub mod memory_layer;
12pub mod profile_guide;
13pub mod registry;
14pub mod replay;
15pub mod subagent;
16
17pub use memory_layer::*;
18pub use profile_guide::*;
19pub use registry::*;
20pub use replay::*;
21pub use subagent::*;
22
23use harness_compactor::DefaultCompactor;
24use harness_core::{
25    Action, Block, Compactor, Context, Event, Guide, HarnessError, HookOutcome, Model, Sensor,
26    SessionSource, SignalSet, Stage, Task, ToolResult, Turn, TurnRole, World,
27};
28use harness_hooks::HookBus;
29use std::sync::Arc;
30
31/// Where a run finished. Marked `#[non_exhaustive]` so future fields don't break
32/// downstream matches — always include `..` when destructuring.
33#[derive(Debug, Clone)]
34pub enum Outcome {
35    /// Model returned text with no tool calls (natural end).
36    #[non_exhaustive]
37    Done {
38        text: Option<String>,
39        iters: u32,
40        tools_called: u32,
41        usage: harness_core::Usage,
42    },
43    /// Policy budget exhausted before the model stopped requesting tools.
44    /// Carries everything we know so the caller can recover partial work
45    /// (saved notes, files written by tools, the last assistant text, etc.)
46    /// instead of seeing a single bare "budget out" string.
47    #[non_exhaustive]
48    BudgetExhausted {
49        iters: u32,
50        last_text: Option<String>,
51        tools_called: u32,
52        usage: harness_core::Usage,
53    },
54}
55
56/// The agent loop.
57pub struct AgentLoop<M: Model> {
58    pub model: M,
59    pub tools: ToolRegistry,
60    pub guides: Vec<Arc<dyn Guide>>,
61    pub sensors: Vec<Arc<dyn Sensor>>,
62    pub hooks: HookBus,
63    pub compactor: Arc<dyn Compactor>,
64}
65
66impl<M: Model> AgentLoop<M> {
67    pub fn new(model: M) -> Self {
68        Self {
69            model,
70            tools: ToolRegistry::new(),
71            guides: Vec::new(),
72            sensors: Vec::new(),
73            hooks: HookBus::new(),
74            compactor: Arc::new(DefaultCompactor::new()),
75        }
76    }
77
78    pub fn with_compactor(mut self, c: Arc<dyn Compactor>) -> Self {
79        self.compactor = c;
80        self
81    }
82
83    pub fn with_tool(mut self, t: Arc<dyn harness_core::Tool>) -> Self {
84        self.tools.insert(t);
85        self
86    }
87
88    pub fn with_guide(mut self, g: Arc<dyn Guide>) -> Self {
89        self.guides.push(g);
90        self
91    }
92
93    pub fn with_sensor(mut self, s: Arc<dyn Sensor>) -> Self {
94        self.sensors.push(s);
95        self
96    }
97
98    pub fn with_hook(mut self, h: Arc<dyn harness_core::Hook>) -> Self {
99        self.hooks.register(h);
100        self
101    }
102
103    /// Pull in every `#[hook]`-registered hook.
104    pub fn with_macro_hooks(mut self) -> Self {
105        self.hooks = self.hooks.with_macro_hooks_take();
106        self
107    }
108
109    pub async fn run(&self, task: Task, world: &mut World) -> Result<Outcome, HarnessError> {
110        let max = harness_core::Policy::default().max_iters;
111        self.run_with_max_iters(task, world, max).await
112    }
113
114    pub async fn run_with_max_iters(
115        &self,
116        task: Task,
117        world: &mut World,
118        max_iters: u32,
119    ) -> Result<Outcome, HarnessError> {
120        self.run_with_seed_history(task, Vec::new(), world, max_iters)
121            .await
122    }
123
124    /// Like `run_with_max_iters` but seeds `ctx.history` with `seed` **before**
125    /// the current user task is appended. Use this for multi-turn REPLs so
126    /// prior conversation lives in `ctx.history` (where the Compactor can see
127    /// it) instead of being concatenated into `task.description` (where it
128    /// previously bypassed compaction entirely — see audit #2).
129    pub async fn run_with_seed_history(
130        &self,
131        task: Task,
132        seed: Vec<Turn>,
133        world: &mut World,
134        max_iters: u32,
135    ) -> Result<Outcome, HarnessError> {
136        let mut ctx = Context::new(task);
137        ctx.policy.max_iters = max_iters;
138        ctx.tools = self.tools.schemas();
139        ctx.history = seed;
140
141        self.hooks.fire(
142            &Event::SessionStart {
143                source: SessionSource::Startup,
144            },
145            world,
146        );
147
148        for g in &self.guides {
149            if g.scope().matches(&ctx.task) {
150                self.hooks.fire(&Event::PreGuide { guide: g.id() }, world);
151                g.apply(&mut ctx, world).await?;
152                self.hooks.fire(&Event::PostGuide { guide: g.id() }, world);
153            }
154        }
155
156        ctx.history.push(Turn {
157            role: TurnRole::User,
158            blocks: vec![Block::Text(ctx.task.description.clone())],
159        });
160
161        // Running totals — surface to caller even on BudgetExhausted.
162        let mut tools_called: u32 = 0;
163        let mut total_usage = harness_core::Usage::default();
164        let mut last_text: Option<String> = None;
165
166        for iter in 0..ctx.policy.max_iters {
167            self.hooks.fire(&Event::Heartbeat { iter }, world);
168
169            // Compaction: run every stage required by current budget.
170            let stages = self.compactor.budget(&ctx).required_stages();
171            for stage in stages {
172                self.hooks.fire(&Event::PreCompact { stage }, world);
173                self.compactor.compact(stage, &mut ctx).await?;
174                self.hooks.fire(&Event::PostCompact { stage }, world);
175            }
176
177            self.hooks.fire(&Event::PreModel { ctx: &ctx }, world);
178            let out = self.model.complete(&ctx).await?;
179            self.hooks.fire(&Event::PostModel { out: &out }, world);
180            // Accumulate usage even if the run later exhausts budget.
181            total_usage.input_tokens += out.usage.input_tokens;
182            total_usage.output_tokens += out.usage.output_tokens;
183            total_usage.cached_input_tokens += out.usage.cached_input_tokens;
184            if let Some(t) = &out.text {
185                last_text = Some(t.clone());
186            }
187            ctx.push_model_output(&out);
188
189            if out.tool_calls.is_empty() {
190                self.hooks.fire(&Event::TaskCompleted, world);
191                self.hooks.fire(&Event::SessionEnd, world);
192                return Ok(Outcome::Done {
193                    text: out.text,
194                    iters: iter + 1,
195                    tools_called,
196                    usage: total_usage,
197                });
198            }
199
200            for call in &out.tool_calls {
201                let action = Action {
202                    tool: call.name.clone(),
203                    call_id: call.id.clone(),
204                    args: call.args.clone(),
205                };
206
207                // PreToolUse hook can deny destructive actions
208                if let HookOutcome::Deny { reason } = self
209                    .hooks
210                    .fire(&Event::PreToolUse { action: &action }, world)
211                {
212                    ctx.history.push(Turn {
213                        role: TurnRole::Tool,
214                        blocks: vec![Block::ToolResult {
215                            call_id: action.call_id.clone(),
216                            content: serde_json::json!({
217                                "ok": false,
218                                "denied_by_hook": reason,
219                            }),
220                        }],
221                    });
222                    continue;
223                }
224
225                let result = match self.tools.dispatch(&action, world).await {
226                    Ok(r) => r,
227                    Err(e) => ToolResult {
228                        ok: false,
229                        content: serde_json::json!({"error": e.to_string()}),
230                        trace: None,
231                    },
232                };
233                tools_called += 1;
234                self.hooks.fire(
235                    &Event::PostToolUse {
236                        action: &action,
237                        result: &result,
238                    },
239                    world,
240                );
241
242                ctx.history.push(Turn {
243                    role: TurnRole::Tool,
244                    blocks: vec![Block::ToolResult {
245                        call_id: action.call_id.clone(),
246                        content: result.content.clone(),
247                    }],
248                });
249
250                // run self-correct sensors
251                let mut all_signals = Vec::new();
252                for s in &self.sensors {
253                    if s.stage() != Stage::SelfCorrect {
254                        continue;
255                    }
256                    self.hooks.fire(&Event::PreSensor { sensor: s.id() }, world);
257                    let sigs = s.observe(&action, world).await.unwrap_or_else(|e| {
258                        tracing::warn!(?e, "sensor failed");
259                        Vec::new()
260                    });
261                    self.hooks.fire(
262                        &Event::PostSensor {
263                            sensor: s.id(),
264                            signals: &sigs,
265                        },
266                        world,
267                    );
268                    all_signals.extend(sigs);
269                }
270                if !all_signals.is_empty() {
271                    let bundle = SignalSet::new(all_signals);
272                    let (patches, remaining) = bundle.partition_auto_fix();
273
274                    // audit #7: each patch goes through PreAutoFix.
275                    // Hooks can Deny (skip silently). Default safelist on
276                    // RunCommand catches the obvious misuses with no hook.
277                    let approved: Vec<harness_core::FixPatch> = patches.into_iter().filter(|p| {
278                        if !is_default_safe_fix(p) {
279                            tracing::warn!(?p, "auto-fix rejected by default safelist (use PreAutoFix hook to override)");
280                            self.hooks.fire(&Event::PostAutoFix { patch: p, applied: false }, world);
281                            return false;
282                        }
283                        match self.hooks.fire(&Event::PreAutoFix { patch: p }, world) {
284                            HookOutcome::Deny { reason } => {
285                                tracing::warn!(?p, %reason, "auto-fix denied by hook");
286                                self.hooks.fire(&Event::PostAutoFix { patch: p, applied: false }, world);
287                                false
288                            }
289                            _ => true,
290                        }
291                    }).collect();
292
293                    let applied = apply_patches(&approved, world).await;
294                    // Emit PostAutoFix for each approved patch with the application result.
295                    for (i, p) in approved.iter().enumerate() {
296                        self.hooks.fire(
297                            &Event::PostAutoFix {
298                                patch: p,
299                                applied: i < applied.len(),
300                            },
301                            world,
302                        );
303                    }
304                    if !applied.is_empty() {
305                        ctx.push_feedback(vec![harness_core::Signal {
306                            severity: harness_core::Severity::Hint,
307                            origin: "auto-fix".into(),
308                            message: format!(
309                                "applied {} auto-fix patch(es): {applied:?}",
310                                applied.len()
311                            ),
312                            agent_hint: Some(
313                                "re-check the affected files before continuing".into(),
314                            ),
315                            auto_fix: None,
316                            location: None,
317                        }]);
318                    }
319                    if remaining.has_blocking() {
320                        ctx.push_feedback(remaining.signals);
321                    }
322                }
323            }
324        }
325        // ── Budget exhausted ─────────────────────────────────────────
326        // Force a final synthesis pass with tools DISABLED. Otherwise the
327        // model often spins on tool calls right up to the budget cap and
328        // never emits a text conclusion, leaving the caller with nothing
329        // but `last_text` from some earlier intermediate turn (or None).
330        //
331        // The synthesis call is "free" — it costs one extra model call
332        // beyond max_iters but doesn't count toward `iters`. The result
333        // lands in `last_text` so callers display it as the answer.
334        let synthesised = self
335            .force_final_synthesis(&mut ctx, world, &mut total_usage)
336            .await;
337        if let Some(t) = synthesised {
338            last_text = Some(t);
339        }
340
341        self.hooks.fire(&Event::SessionEnd, world);
342        Ok(Outcome::BudgetExhausted {
343            iters: ctx.policy.max_iters,
344            last_text,
345            tools_called,
346            usage: total_usage,
347        })
348    }
349
350    /// One final model call with tools removed, asking it to write the
351    /// best-effort conclusion from whatever it has already gathered.
352    ///
353    /// Errors from the model are swallowed — observability is best-effort
354    /// here, and a transport blip during synthesis should not turn a
355    /// near-complete run into a hard failure.
356    async fn force_final_synthesis(
357        &self,
358        ctx: &mut Context,
359        world: &mut World,
360        total_usage: &mut harness_core::Usage,
361    ) -> Option<String> {
362        const SYNTHESIS_PROMPT: &str = "[system: iteration budget exhausted] \
363            You have run out of tool-calling iterations. Write your final answer \
364            NOW using only the tool results already in this conversation. Do not \
365            request more tools. Mark facts you could not verify as UNKNOWN. \
366            Include source URLs for every claim that is not UNKNOWN.";
367
368        // Signal to any observer (LiveProgressHook, SessionRecorder, custom
369        // hooks) that we've used 100% of the budget and are about to force
370        // synthesis. Pre-existing `BudgetWarning` event was unused; this is
371        // its natural home.
372        self.hooks.fire(&Event::BudgetWarning { ratio: 1.0 }, world);
373
374        // Snapshot + clear tool schemas so the model has no choice but text.
375        let saved_tools = std::mem::take(&mut ctx.tools);
376        ctx.history.push(Turn {
377            role: TurnRole::User,
378            blocks: vec![Block::Text(SYNTHESIS_PROMPT.into())],
379        });
380
381        self.hooks.fire(&Event::PreModel { ctx }, world);
382        let result = self.model.complete(ctx).await;
383        ctx.tools = saved_tools;
384
385        match result {
386            Ok(out) => {
387                self.hooks.fire(&Event::PostModel { out: &out }, world);
388                total_usage.input_tokens += out.usage.input_tokens;
389                total_usage.output_tokens += out.usage.output_tokens;
390                total_usage.cached_input_tokens += out.usage.cached_input_tokens;
391                ctx.push_model_output(&out);
392                out.text
393            }
394            Err(_) => None,
395        }
396    }
397}
398
399/// Audit #7: default safelist for `FixPatch::RunCommand`.
400///
401/// Sensors emitting `RunCommand` patches would otherwise be a silent
402/// arbitrary-code-execution channel. We restrict the *program* by name to a
403/// short list of well-known, side-effect-bounded formatters/fixers. Anything
404/// else returns false and the patch is rejected (write your own `PreAutoFix`
405/// hook returning `HookOutcome::Allow` to widen the policy).
406///
407/// `ReplaceFile` and `UnifiedDiff` are not restricted here — they only touch
408/// files inside the workspace and are covered by the symlink-safe path
409/// resolution in `harness-tools-fs`.
410pub fn is_default_safe_fix(patch: &harness_core::FixPatch) -> bool {
411    use harness_core::FixPatch;
412    match patch {
413        FixPatch::ReplaceFile { .. } | FixPatch::UnifiedDiff { .. } => true,
414        FixPatch::RunCommand { program, args, .. } => match program.as_str() {
415            // Cargo subcommands proven side-effect-bounded.
416            "cargo" => matches!(
417                args.first().map(String::as_str),
418                Some("fmt" | "clippy" | "fix"),
419            ),
420            "rustfmt" | "gofmt" | "prettier" | "ruff" | "black" => true,
421            _ => false,
422        },
423        // Future FixPatch variants: deny by default — review and add to the list above.
424        _ => false,
425    }
426}
427
/// Process-wide sequence number mixed into `.harness-patch-*.diff` temp
/// filenames. A millisecond timestamp alone collides when several agent runs
/// write patches in parallel; this counter keeps the names distinct.
static PATCH_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
431
432/// Apply auto-fix patches; return short descriptions of those that succeeded.
433///
434/// Made `pub` (was `pub(crate)`) so integration tests can call it directly.
435pub async fn apply_patches(patches: &[harness_core::FixPatch], world: &mut World) -> Vec<String> {
436    use harness_core::FixPatch;
437    let mut applied = Vec::new();
438    for p in patches {
439        match p {
440            FixPatch::ReplaceFile { path, content } => {
441                let abs = world.repo.root.join(path);
442                if let Some(parent) = abs.parent() {
443                    let _ = tokio::fs::create_dir_all(parent).await;
444                }
445                if tokio::fs::write(&abs, content).await.is_ok() {
446                    applied.push(format!("replaced {}", path.display()));
447                }
448            }
449            FixPatch::UnifiedDiff { diff } => {
450                if try_apply_diff(world, diff).await {
451                    applied.push("unified diff applied".into());
452                }
453            }
454            FixPatch::RunCommand { program, args, cwd } => {
455                let cwd_ref = cwd.as_deref().unwrap_or(world.repo.root.as_path());
456                let args_ref: Vec<&str> = args.iter().map(String::as_str).collect();
457                if let Ok(out) = world.runner.exec(program, &args_ref, Some(cwd_ref)).await
458                    && out.status == 0
459                {
460                    applied.push(format!("ran `{program} {}`", args.join(" ")));
461                }
462            }
463            // FixPatch is `#[non_exhaustive]`; unknown variants are skipped.
464            _ => tracing::warn!("apply_patches: unknown FixPatch variant — skipped"),
465        }
466    }
467    applied
468}
469
470/// Write `diff` to a unique temp file and try `patch -p1` first, then `-p0`.
471/// Returns whether either succeeded. The `-p1`-then-`-p0` order matches the
472/// reality that most agent-emitted diffs are git-style (need `-p1`) but some
473/// hand-rolled diffs use repo-relative paths (need `-p0`).
474async fn try_apply_diff(world: &mut World, diff: &str) -> bool {
475    use std::sync::atomic::Ordering;
476    use tokio::io::AsyncWriteExt;
477
478    let seq = PATCH_SEQ.fetch_add(1, Ordering::SeqCst);
479    let pid = std::process::id();
480    let now = world.clock.now_ms();
481    let tmp = world
482        .repo
483        .root
484        .join(format!(".harness-patch-{pid}-{now}-{seq}.diff"));
485
486    let mut f = match tokio::fs::File::create(&tmp).await {
487        Ok(f) => f,
488        Err(e) => {
489            tracing::warn!(error=%e, path=%tmp.display(), "could not create patch tempfile");
490            return false;
491        }
492    };
493    if let Err(e) = f.write_all(diff.as_bytes()).await {
494        tracing::warn!(error=%e, "could not write patch tempfile");
495        let _ = tokio::fs::remove_file(&tmp).await;
496        return false;
497    }
498    drop(f);
499
500    let tmp_str = tmp.to_string_lossy().to_string();
501    let mut applied = false;
502    for strip in ["-p1", "-p0"] {
503        match world
504            .runner
505            .exec(
506                "patch",
507                &[strip, "--silent", "-i", tmp_str.as_str()],
508                Some(world.repo.root.as_path()),
509            )
510            .await
511        {
512            Ok(out) if out.status == 0 => {
513                tracing::info!(strip, "patch applied");
514                applied = true;
515                break;
516            }
517            Ok(out) => {
518                tracing::debug!(strip, stderr=%out.stderr, "patch failed; trying next strip level");
519            }
520            Err(e) => {
521                tracing::warn!(error=%e, "patch command not available");
522                break; // patch tool missing — no point trying other strip
523            }
524        }
525    }
526    let _ = tokio::fs::remove_file(&tmp).await;
527    applied
528}