Skip to main content

harness_rs_loop/
lib.rs

//! ReAct agent loop with self-correction.
//!
//! Minimal v0.0.1 implementation:
//! - Applies guides once at the start.
//! - Runs every compaction stage the compactor's budget requires at the top of
//!   each iteration.
//! - Sends `Context` (with `tools`) to the model.
//! - Dispatches each returned tool call via [`ToolRegistry`], unless a
//!   `PreToolUse` hook denies it.
//! - Runs `Stage::SelfCorrect` sensors after each action; auto-fix patches are
//!   applied directly to the world, and the remaining signals are fed back to
//!   the model when any of them is blocking.
//! - Stops when the model returns no tool calls, or when `policy.max_iters` is hit.
10
11pub mod registry;
12pub mod replay;
13pub mod subagent;
14
15pub use registry::*;
16pub use replay::*;
17pub use subagent::*;
18
19use harness_compactor::DefaultCompactor;
20use harness_core::{
21    Action, Block, Compactor, Context, Event, Guide, HarnessError, HookOutcome, Model, Sensor,
22    SessionSource, SignalSet, Stage, Task, ToolResult, Turn, TurnRole, World,
23};
24use harness_hooks::HookBus;
25use std::sync::Arc;
26
/// Where a run finished.
///
/// Implements `PartialEq`/`Eq` so callers and tests can compare outcomes
/// directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Outcome {
    /// Model returned text with no tool calls.
    ///
    /// `text` is the text of that final model output, if any; `iters` is the
    /// number of loop iterations performed, counting the final one.
    Done { text: Option<String>, iters: u32 },
    /// Policy budget exhausted.
    ///
    /// `iters` is the iteration budget (`policy.max_iters`) that was used up.
    BudgetExhausted { iters: u32 },
}
35
36/// The agent loop.
37pub struct AgentLoop<M: Model> {
38    pub model:      M,
39    pub tools:      ToolRegistry,
40    pub guides:     Vec<Arc<dyn Guide>>,
41    pub sensors:    Vec<Arc<dyn Sensor>>,
42    pub hooks:      HookBus,
43    pub compactor:  Arc<dyn Compactor>,
44}
45
46impl<M: Model> AgentLoop<M> {
47    pub fn new(model: M) -> Self {
48        Self {
49            model,
50            tools:     ToolRegistry::new(),
51            guides:    Vec::new(),
52            sensors:   Vec::new(),
53            hooks:     HookBus::new(),
54            compactor: Arc::new(DefaultCompactor::new()),
55        }
56    }
57
58    pub fn with_compactor(mut self, c: Arc<dyn Compactor>) -> Self {
59        self.compactor = c;
60        self
61    }
62
63    pub fn with_tool(mut self, t: Arc<dyn harness_core::Tool>) -> Self {
64        self.tools.insert(t);
65        self
66    }
67
68    pub fn with_guide(mut self, g: Arc<dyn Guide>) -> Self {
69        self.guides.push(g);
70        self
71    }
72
73    pub fn with_sensor(mut self, s: Arc<dyn Sensor>) -> Self {
74        self.sensors.push(s);
75        self
76    }
77
78    pub fn with_hook(mut self, h: Arc<dyn harness_core::Hook>) -> Self {
79        self.hooks.register(h);
80        self
81    }
82
83    /// Pull in every `#[hook]`-registered hook.
84    pub fn with_macro_hooks(mut self) -> Self {
85        self.hooks = self.hooks.with_macro_hooks_take();
86        self
87    }
88
89    pub async fn run(&self, task: Task, world: &mut World) -> Result<Outcome, HarnessError> {
90        let max = harness_core::Policy::default().max_iters;
91        self.run_with_max_iters(task, world, max).await
92    }
93
94    pub async fn run_with_max_iters(
95        &self,
96        task: Task,
97        world: &mut World,
98        max_iters: u32,
99    ) -> Result<Outcome, HarnessError> {
100        let mut ctx = Context::new(task);
101        ctx.policy.max_iters = max_iters;
102        ctx.tools = self.tools.schemas();
103
104        self.hooks.fire(&Event::SessionStart { source: SessionSource::Startup }, world);
105
106        for g in &self.guides {
107            if g.scope().matches(&ctx.task) {
108                self.hooks.fire(&Event::PreGuide { guide: g.id() }, world);
109                g.apply(&mut ctx, world).await?;
110                self.hooks.fire(&Event::PostGuide { guide: g.id() }, world);
111            }
112        }
113
114        ctx.history.push(Turn {
115            role:   TurnRole::User,
116            blocks: vec![Block::Text(ctx.task.description.clone())],
117        });
118
119        for iter in 0..ctx.policy.max_iters {
120            self.hooks.fire(&Event::Heartbeat { iter }, world);
121
122            // Compaction: run every stage required by current budget.
123            let stages = self.compactor.budget(&ctx).required_stages();
124            for stage in stages {
125                self.hooks.fire(&Event::PreCompact { stage }, world);
126                self.compactor.compact(stage, &mut ctx).await?;
127                self.hooks.fire(&Event::PostCompact { stage }, world);
128            }
129
130            self.hooks.fire(&Event::PreModel { ctx: &ctx }, world);
131            let out = self.model.complete(&ctx).await?;
132            self.hooks.fire(&Event::PostModel { out: &out }, world);
133            ctx.push_model_output(&out);
134
135            if out.tool_calls.is_empty() {
136                self.hooks.fire(&Event::TaskCompleted, world);
137                self.hooks.fire(&Event::SessionEnd, world);
138                return Ok(Outcome::Done { text: out.text, iters: iter + 1 });
139            }
140
141            for call in &out.tool_calls {
142                let action = Action {
143                    tool:    call.name.clone(),
144                    call_id: call.id.clone(),
145                    args:    call.args.clone(),
146                };
147
148                // PreToolUse hook can deny destructive actions
149                if let HookOutcome::Deny { reason } =
150                    self.hooks.fire(&Event::PreToolUse { action: &action }, world)
151                {
152                    ctx.history.push(Turn {
153                        role: TurnRole::Tool,
154                        blocks: vec![Block::ToolResult {
155                            call_id: action.call_id.clone(),
156                            content: serde_json::json!({
157                                "ok": false,
158                                "denied_by_hook": reason,
159                            }),
160                        }],
161                    });
162                    continue;
163                }
164
165                let result = match self.tools.dispatch(&action, world).await {
166                    Ok(r) => r,
167                    Err(e) => ToolResult {
168                        ok: false,
169                        content: serde_json::json!({"error": e.to_string()}),
170                        trace: None,
171                    },
172                };
173                self.hooks.fire(&Event::PostToolUse { action: &action, result: &result }, world);
174
175                ctx.history.push(Turn {
176                    role:   TurnRole::Tool,
177                    blocks: vec![Block::ToolResult {
178                        call_id: action.call_id.clone(),
179                        content: result.content.clone(),
180                    }],
181                });
182
183                // run self-correct sensors
184                let mut all_signals = Vec::new();
185                for s in &self.sensors {
186                    if s.stage() != Stage::SelfCorrect { continue; }
187                    self.hooks.fire(&Event::PreSensor { sensor: s.id() }, world);
188                    let sigs = s.observe(&action, world).await.unwrap_or_else(|e| {
189                        tracing::warn!(?e, "sensor failed");
190                        Vec::new()
191                    });
192                    self.hooks.fire(&Event::PostSensor { sensor: s.id(), signals: &sigs }, world);
193                    all_signals.extend(sigs);
194                }
195                if !all_signals.is_empty() {
196                    let bundle = SignalSet::new(all_signals);
197                    let (patches, remaining) = bundle.partition_auto_fix();
198
199                    let applied = apply_patches(&patches, world).await;
200                    if !applied.is_empty() {
201                        ctx.push_feedback(vec![harness_core::Signal {
202                            severity:   harness_core::Severity::Hint,
203                            origin:     "auto-fix".into(),
204                            message:    format!("applied {} auto-fix patch(es): {applied:?}", applied.len()),
205                            agent_hint: Some("re-check the affected files before continuing".into()),
206                            auto_fix:   None,
207                            location:   None,
208                        }]);
209                    }
210                    if remaining.has_blocking() {
211                        ctx.push_feedback(remaining.signals);
212                    }
213                }
214            }
215        }
216        self.hooks.fire(&Event::SessionEnd, world);
217        Ok(Outcome::BudgetExhausted { iters: ctx.policy.max_iters })
218    }
219}
220
/// Process-wide sequence number mixed into `.harness-patch-*.diff` temp
/// filenames. A millisecond timestamp on its own is not unique when several
/// agent runs create patch files in parallel, so each filename also carries
/// the next value of this counter.
static PATCH_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
224
225/// Apply auto-fix patches; return short descriptions of those that succeeded.
226///
227/// Made `pub` (was `pub(crate)`) so integration tests can call it directly.
228pub async fn apply_patches(
229    patches: &[harness_core::FixPatch],
230    world: &mut World,
231) -> Vec<String> {
232    use harness_core::FixPatch;
233    let mut applied = Vec::new();
234    for p in patches {
235        match p {
236            FixPatch::ReplaceFile { path, content } => {
237                let abs = world.repo.root.join(path);
238                if let Some(parent) = abs.parent() {
239                    let _ = tokio::fs::create_dir_all(parent).await;
240                }
241                if tokio::fs::write(&abs, content).await.is_ok() {
242                    applied.push(format!("replaced {}", path.display()));
243                }
244            }
245            FixPatch::UnifiedDiff { diff } => {
246                if try_apply_diff(world, diff).await {
247                    applied.push("unified diff applied".into());
248                }
249            }
250            FixPatch::RunCommand { program, args, cwd } => {
251                let cwd_ref = cwd.as_deref().unwrap_or(world.repo.root.as_path());
252                let args_ref: Vec<&str> = args.iter().map(String::as_str).collect();
253                if let Ok(out) = world.runner.exec(program, &args_ref, Some(cwd_ref)).await
254                    && out.status == 0
255                {
256                    applied.push(format!("ran `{program} {}`", args.join(" ")));
257                }
258            }
259            // FixPatch is `#[non_exhaustive]`; unknown variants are skipped.
260            _ => tracing::warn!("apply_patches: unknown FixPatch variant — skipped"),
261        }
262    }
263    applied
264}
265
266/// Write `diff` to a unique temp file and try `patch -p1` first, then `-p0`.
267/// Returns whether either succeeded. The `-p1`-then-`-p0` order matches the
268/// reality that most agent-emitted diffs are git-style (need `-p1`) but some
269/// hand-rolled diffs use repo-relative paths (need `-p0`).
270async fn try_apply_diff(world: &mut World, diff: &str) -> bool {
271    use std::sync::atomic::Ordering;
272    use tokio::io::AsyncWriteExt;
273
274    let seq = PATCH_SEQ.fetch_add(1, Ordering::SeqCst);
275    let pid = std::process::id();
276    let now = world.clock.now_ms();
277    let tmp = world
278        .repo
279        .root
280        .join(format!(".harness-patch-{pid}-{now}-{seq}.diff"));
281
282    let mut f = match tokio::fs::File::create(&tmp).await {
283        Ok(f) => f,
284        Err(e) => {
285            tracing::warn!(error=%e, path=%tmp.display(), "could not create patch tempfile");
286            return false;
287        }
288    };
289    if let Err(e) = f.write_all(diff.as_bytes()).await {
290        tracing::warn!(error=%e, "could not write patch tempfile");
291        let _ = tokio::fs::remove_file(&tmp).await;
292        return false;
293    }
294    drop(f);
295
296    let tmp_str = tmp.to_string_lossy().to_string();
297    let mut applied = false;
298    for strip in ["-p1", "-p0"] {
299        match world
300            .runner
301            .exec(
302                "patch",
303                &[strip, "--silent", "-i", tmp_str.as_str()],
304                Some(world.repo.root.as_path()),
305            )
306            .await
307        {
308            Ok(out) if out.status == 0 => {
309                tracing::info!(strip, "patch applied");
310                applied = true;
311                break;
312            }
313            Ok(out) => {
314                tracing::debug!(strip, stderr=%out.stderr, "patch failed; trying next strip level");
315            }
316            Err(e) => {
317                tracing::warn!(error=%e, "patch command not available");
318                break; // patch tool missing — no point trying other strip
319            }
320        }
321    }
322    let _ = tokio::fs::remove_file(&tmp).await;
323    applied
324}