Skip to main content

harness_loop/
lib.rs

1//! ReAct agent loop with self-correction.
2//!
3//! Minimal v0.0.1 implementation:
4//! - Applies guides once at the start.
5//! - Sends `Context` (with `tools`) to the model.
6//! - Dispatches each returned tool call via [`ToolRegistry`].
7//! - Runs `Sensor::SelfCorrect` sensors after each action; auto-fix patches are
8//!   applied directly to the world, blocking signals are fed back to the model.
9//! - Stops when the model returns no tool calls, or when `policy.max_iters` is hit.
10
11pub mod profile_guide;
12pub mod registry;
13pub mod replay;
14pub mod subagent;
15
16pub use profile_guide::*;
17pub use registry::*;
18pub use replay::*;
19pub use subagent::*;
20
21use harness_compactor::DefaultCompactor;
22use harness_core::{
23    Action, Block, Compactor, Context, Event, Guide, HarnessError, HookOutcome, Model, Sensor,
24    SessionSource, SignalSet, Stage, Task, ToolResult, Turn, TurnRole, World,
25};
26use harness_hooks::HookBus;
27use std::sync::Arc;
28
/// Where a run finished.
///
/// The enum itself is not `#[non_exhaustive]` — each of its *variants* is. Outside this
/// crate the variants therefore cannot be built with literal syntax, and patterns must
/// include `..` when destructuring them, so new fields can be added later without
/// breaking downstream matches.
#[derive(Debug, Clone)]
pub enum Outcome {
    /// Model returned text with no tool calls (natural end).
    #[non_exhaustive]
    Done {
        /// Text of the final model response, if the model produced any.
        text: Option<String>,
        /// Number of model calls made, counting the final one.
        iters: u32,
        /// Tool dispatches attempted. Calls denied by a `PreToolUse` hook are not
        /// counted; dispatches that returned an error are.
        tools_called: u32,
        /// Input, output and cached-input token counts summed over every model call.
        usage: harness_core::Usage,
    },
    /// Policy budget exhausted before the model stopped requesting tools.
    /// Carries everything we know so the caller can recover partial work
    /// (saved notes, files written by tools, the last assistant text, etc.)
    /// instead of seeing a single bare "budget out" string.
    #[non_exhaustive]
    BudgetExhausted {
        /// The iteration budget (`policy.max_iters`) that was used up.
        iters: u32,
        /// Text of the most recent model response that contained any text.
        last_text: Option<String>,
        /// Same meaning as `Outcome::Done::tools_called`.
        tools_called: u32,
        /// Same meaning as `Outcome::Done::usage`.
        usage: harness_core::Usage,
    },
}
53
/// The agent loop: a model plus the tools, guides, sensors, hooks and compactor
/// that each run drives it with.
///
/// Build one with `AgentLoop::new` and the `with_*` builder methods, then call
/// `run` / `run_with_max_iters` / `run_with_seed_history`.
pub struct AgentLoop<M: Model> {
    /// Model queried once per iteration via `Model::complete`.
    pub model: M,
    /// Tools whose schemas are advertised to the model and through which each
    /// returned tool call is dispatched.
    pub tools: ToolRegistry,
    /// Guides applied once, before the first model call, when their scope matches the task.
    pub guides: Vec<Arc<dyn Guide>>,
    /// Sensors; only those whose stage is `Stage::SelfCorrect` are consulted, after
    /// each dispatched tool call.
    pub sensors: Vec<Arc<dyn Sensor>>,
    /// Lifecycle hooks (session, guide, model, tool, sensor, compaction, auto-fix events).
    pub hooks: HookBus,
    /// Compactor consulted at the start of every iteration; each stage its budget
    /// reports as required is run against the context.
    pub compactor: Arc<dyn Compactor>,
}
63
64impl<M: Model> AgentLoop<M> {
65    pub fn new(model: M) -> Self {
66        Self {
67            model,
68            tools: ToolRegistry::new(),
69            guides: Vec::new(),
70            sensors: Vec::new(),
71            hooks: HookBus::new(),
72            compactor: Arc::new(DefaultCompactor::new()),
73        }
74    }
75
76    pub fn with_compactor(mut self, c: Arc<dyn Compactor>) -> Self {
77        self.compactor = c;
78        self
79    }
80
81    pub fn with_tool(mut self, t: Arc<dyn harness_core::Tool>) -> Self {
82        self.tools.insert(t);
83        self
84    }
85
86    pub fn with_guide(mut self, g: Arc<dyn Guide>) -> Self {
87        self.guides.push(g);
88        self
89    }
90
91    pub fn with_sensor(mut self, s: Arc<dyn Sensor>) -> Self {
92        self.sensors.push(s);
93        self
94    }
95
96    pub fn with_hook(mut self, h: Arc<dyn harness_core::Hook>) -> Self {
97        self.hooks.register(h);
98        self
99    }
100
101    /// Pull in every `#[hook]`-registered hook.
102    pub fn with_macro_hooks(mut self) -> Self {
103        self.hooks = self.hooks.with_macro_hooks_take();
104        self
105    }
106
107    pub async fn run(&self, task: Task, world: &mut World) -> Result<Outcome, HarnessError> {
108        let max = harness_core::Policy::default().max_iters;
109        self.run_with_max_iters(task, world, max).await
110    }
111
112    pub async fn run_with_max_iters(
113        &self,
114        task: Task,
115        world: &mut World,
116        max_iters: u32,
117    ) -> Result<Outcome, HarnessError> {
118        self.run_with_seed_history(task, Vec::new(), world, max_iters)
119            .await
120    }
121
122    /// Like `run_with_max_iters` but seeds `ctx.history` with `seed` **before**
123    /// the current user task is appended. Use this for multi-turn REPLs so
124    /// prior conversation lives in `ctx.history` (where the Compactor can see
125    /// it) instead of being concatenated into `task.description` (where it
126    /// previously bypassed compaction entirely — see audit #2).
127    pub async fn run_with_seed_history(
128        &self,
129        task: Task,
130        seed: Vec<Turn>,
131        world: &mut World,
132        max_iters: u32,
133    ) -> Result<Outcome, HarnessError> {
134        let mut ctx = Context::new(task);
135        ctx.policy.max_iters = max_iters;
136        ctx.tools = self.tools.schemas();
137        ctx.history = seed;
138
139        self.hooks.fire(
140            &Event::SessionStart {
141                source: SessionSource::Startup,
142            },
143            world,
144        );
145
146        for g in &self.guides {
147            if g.scope().matches(&ctx.task) {
148                self.hooks.fire(&Event::PreGuide { guide: g.id() }, world);
149                g.apply(&mut ctx, world).await?;
150                self.hooks.fire(&Event::PostGuide { guide: g.id() }, world);
151            }
152        }
153
154        ctx.history.push(Turn {
155            role: TurnRole::User,
156            blocks: vec![Block::Text(ctx.task.description.clone())],
157        });
158
159        // Running totals — surface to caller even on BudgetExhausted.
160        let mut tools_called: u32 = 0;
161        let mut total_usage = harness_core::Usage::default();
162        let mut last_text: Option<String> = None;
163
164        for iter in 0..ctx.policy.max_iters {
165            self.hooks.fire(&Event::Heartbeat { iter }, world);
166
167            // Compaction: run every stage required by current budget.
168            let stages = self.compactor.budget(&ctx).required_stages();
169            for stage in stages {
170                self.hooks.fire(&Event::PreCompact { stage }, world);
171                self.compactor.compact(stage, &mut ctx).await?;
172                self.hooks.fire(&Event::PostCompact { stage }, world);
173            }
174
175            self.hooks.fire(&Event::PreModel { ctx: &ctx }, world);
176            let out = self.model.complete(&ctx).await?;
177            self.hooks.fire(&Event::PostModel { out: &out }, world);
178            // Accumulate usage even if the run later exhausts budget.
179            total_usage.input_tokens += out.usage.input_tokens;
180            total_usage.output_tokens += out.usage.output_tokens;
181            total_usage.cached_input_tokens += out.usage.cached_input_tokens;
182            if let Some(t) = &out.text {
183                last_text = Some(t.clone());
184            }
185            ctx.push_model_output(&out);
186
187            if out.tool_calls.is_empty() {
188                self.hooks.fire(&Event::TaskCompleted, world);
189                self.hooks.fire(&Event::SessionEnd, world);
190                return Ok(Outcome::Done {
191                    text: out.text,
192                    iters: iter + 1,
193                    tools_called,
194                    usage: total_usage,
195                });
196            }
197
198            for call in &out.tool_calls {
199                let action = Action {
200                    tool: call.name.clone(),
201                    call_id: call.id.clone(),
202                    args: call.args.clone(),
203                };
204
205                // PreToolUse hook can deny destructive actions
206                if let HookOutcome::Deny { reason } = self
207                    .hooks
208                    .fire(&Event::PreToolUse { action: &action }, world)
209                {
210                    ctx.history.push(Turn {
211                        role: TurnRole::Tool,
212                        blocks: vec![Block::ToolResult {
213                            call_id: action.call_id.clone(),
214                            content: serde_json::json!({
215                                "ok": false,
216                                "denied_by_hook": reason,
217                            }),
218                        }],
219                    });
220                    continue;
221                }
222
223                let result = match self.tools.dispatch(&action, world).await {
224                    Ok(r) => r,
225                    Err(e) => ToolResult {
226                        ok: false,
227                        content: serde_json::json!({"error": e.to_string()}),
228                        trace: None,
229                    },
230                };
231                tools_called += 1;
232                self.hooks.fire(
233                    &Event::PostToolUse {
234                        action: &action,
235                        result: &result,
236                    },
237                    world,
238                );
239
240                ctx.history.push(Turn {
241                    role: TurnRole::Tool,
242                    blocks: vec![Block::ToolResult {
243                        call_id: action.call_id.clone(),
244                        content: result.content.clone(),
245                    }],
246                });
247
248                // run self-correct sensors
249                let mut all_signals = Vec::new();
250                for s in &self.sensors {
251                    if s.stage() != Stage::SelfCorrect {
252                        continue;
253                    }
254                    self.hooks.fire(&Event::PreSensor { sensor: s.id() }, world);
255                    let sigs = s.observe(&action, world).await.unwrap_or_else(|e| {
256                        tracing::warn!(?e, "sensor failed");
257                        Vec::new()
258                    });
259                    self.hooks.fire(
260                        &Event::PostSensor {
261                            sensor: s.id(),
262                            signals: &sigs,
263                        },
264                        world,
265                    );
266                    all_signals.extend(sigs);
267                }
268                if !all_signals.is_empty() {
269                    let bundle = SignalSet::new(all_signals);
270                    let (patches, remaining) = bundle.partition_auto_fix();
271
272                    // audit #7: each patch goes through PreAutoFix.
273                    // Hooks can Deny (skip silently). Default safelist on
274                    // RunCommand catches the obvious misuses with no hook.
275                    let approved: Vec<harness_core::FixPatch> = patches.into_iter().filter(|p| {
276                        if !is_default_safe_fix(p) {
277                            tracing::warn!(?p, "auto-fix rejected by default safelist (use PreAutoFix hook to override)");
278                            self.hooks.fire(&Event::PostAutoFix { patch: p, applied: false }, world);
279                            return false;
280                        }
281                        match self.hooks.fire(&Event::PreAutoFix { patch: p }, world) {
282                            HookOutcome::Deny { reason } => {
283                                tracing::warn!(?p, %reason, "auto-fix denied by hook");
284                                self.hooks.fire(&Event::PostAutoFix { patch: p, applied: false }, world);
285                                false
286                            }
287                            _ => true,
288                        }
289                    }).collect();
290
291                    let applied = apply_patches(&approved, world).await;
292                    // Emit PostAutoFix for each approved patch with the application result.
293                    for (i, p) in approved.iter().enumerate() {
294                        self.hooks.fire(
295                            &Event::PostAutoFix {
296                                patch: p,
297                                applied: i < applied.len(),
298                            },
299                            world,
300                        );
301                    }
302                    if !applied.is_empty() {
303                        ctx.push_feedback(vec![harness_core::Signal {
304                            severity: harness_core::Severity::Hint,
305                            origin: "auto-fix".into(),
306                            message: format!(
307                                "applied {} auto-fix patch(es): {applied:?}",
308                                applied.len()
309                            ),
310                            agent_hint: Some(
311                                "re-check the affected files before continuing".into(),
312                            ),
313                            auto_fix: None,
314                            location: None,
315                        }]);
316                    }
317                    if remaining.has_blocking() {
318                        ctx.push_feedback(remaining.signals);
319                    }
320                }
321            }
322        }
323        self.hooks.fire(&Event::SessionEnd, world);
324        Ok(Outcome::BudgetExhausted {
325            iters: ctx.policy.max_iters,
326            last_text,
327            tools_called,
328            usage: total_usage,
329        })
330    }
331}
332
333/// Audit #7: default safelist for `FixPatch::RunCommand`.
334///
335/// Sensors emitting `RunCommand` patches would otherwise be a silent
336/// arbitrary-code-execution channel. We restrict the *program* by name to a
337/// short list of well-known, side-effect-bounded formatters/fixers. Anything
338/// else returns false and the patch is rejected (write your own `PreAutoFix`
339/// hook returning `HookOutcome::Allow` to widen the policy).
340///
341/// `ReplaceFile` and `UnifiedDiff` are not restricted here — they only touch
342/// files inside the workspace and are covered by the symlink-safe path
343/// resolution in `harness-tools-fs`.
344pub fn is_default_safe_fix(patch: &harness_core::FixPatch) -> bool {
345    use harness_core::FixPatch;
346    match patch {
347        FixPatch::ReplaceFile { .. } | FixPatch::UnifiedDiff { .. } => true,
348        FixPatch::RunCommand { program, args, .. } => match program.as_str() {
349            // Cargo subcommands proven side-effect-bounded.
350            "cargo" => matches!(
351                args.first().map(String::as_str),
352                Some("fmt" | "clippy" | "fix"),
353            ),
354            "rustfmt" | "gofmt" | "prettier" | "ruff" | "black" => true,
355            _ => false,
356        },
357        // Future FixPatch variants: deny by default — review and add to the list above.
358        _ => false,
359    }
360}
361
/// Process-wide sequence number mixed into `.harness-patch-*.diff` temp filenames.
///
/// Millisecond timestamps alone collide when several agent runs apply diffs
/// concurrently. `fetch_add` hands out a distinct value per call within this
/// process; the PID in the filename distinguishes processes.
static PATCH_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
365
366/// Apply auto-fix patches; return short descriptions of those that succeeded.
367///
368/// Made `pub` (was `pub(crate)`) so integration tests can call it directly.
369pub async fn apply_patches(patches: &[harness_core::FixPatch], world: &mut World) -> Vec<String> {
370    use harness_core::FixPatch;
371    let mut applied = Vec::new();
372    for p in patches {
373        match p {
374            FixPatch::ReplaceFile { path, content } => {
375                let abs = world.repo.root.join(path);
376                if let Some(parent) = abs.parent() {
377                    let _ = tokio::fs::create_dir_all(parent).await;
378                }
379                if tokio::fs::write(&abs, content).await.is_ok() {
380                    applied.push(format!("replaced {}", path.display()));
381                }
382            }
383            FixPatch::UnifiedDiff { diff } => {
384                if try_apply_diff(world, diff).await {
385                    applied.push("unified diff applied".into());
386                }
387            }
388            FixPatch::RunCommand { program, args, cwd } => {
389                let cwd_ref = cwd.as_deref().unwrap_or(world.repo.root.as_path());
390                let args_ref: Vec<&str> = args.iter().map(String::as_str).collect();
391                if let Ok(out) = world.runner.exec(program, &args_ref, Some(cwd_ref)).await
392                    && out.status == 0
393                {
394                    applied.push(format!("ran `{program} {}`", args.join(" ")));
395                }
396            }
397            // FixPatch is `#[non_exhaustive]`; unknown variants are skipped.
398            _ => tracing::warn!("apply_patches: unknown FixPatch variant — skipped"),
399        }
400    }
401    applied
402}
403
404/// Write `diff` to a unique temp file and try `patch -p1` first, then `-p0`.
405/// Returns whether either succeeded. The `-p1`-then-`-p0` order matches the
406/// reality that most agent-emitted diffs are git-style (need `-p1`) but some
407/// hand-rolled diffs use repo-relative paths (need `-p0`).
408async fn try_apply_diff(world: &mut World, diff: &str) -> bool {
409    use std::sync::atomic::Ordering;
410    use tokio::io::AsyncWriteExt;
411
412    let seq = PATCH_SEQ.fetch_add(1, Ordering::SeqCst);
413    let pid = std::process::id();
414    let now = world.clock.now_ms();
415    let tmp = world
416        .repo
417        .root
418        .join(format!(".harness-patch-{pid}-{now}-{seq}.diff"));
419
420    let mut f = match tokio::fs::File::create(&tmp).await {
421        Ok(f) => f,
422        Err(e) => {
423            tracing::warn!(error=%e, path=%tmp.display(), "could not create patch tempfile");
424            return false;
425        }
426    };
427    if let Err(e) = f.write_all(diff.as_bytes()).await {
428        tracing::warn!(error=%e, "could not write patch tempfile");
429        let _ = tokio::fs::remove_file(&tmp).await;
430        return false;
431    }
432    drop(f);
433
434    let tmp_str = tmp.to_string_lossy().to_string();
435    let mut applied = false;
436    for strip in ["-p1", "-p0"] {
437        match world
438            .runner
439            .exec(
440                "patch",
441                &[strip, "--silent", "-i", tmp_str.as_str()],
442                Some(world.repo.root.as_path()),
443            )
444            .await
445        {
446            Ok(out) if out.status == 0 => {
447                tracing::info!(strip, "patch applied");
448                applied = true;
449                break;
450            }
451            Ok(out) => {
452                tracing::debug!(strip, stderr=%out.stderr, "patch failed; trying next strip level");
453            }
454            Err(e) => {
455                tracing::warn!(error=%e, "patch command not available");
456                break; // patch tool missing — no point trying other strip
457            }
458        }
459    }
460    let _ = tokio::fs::remove_file(&tmp).await;
461    applied
462}