Skip to main content

oharness_loop/
react.rs

1//! The default ReAct-style loop: thought → action → observation.
2//!
3//! Emits only lifecycle events (`meta`, `run.*`, `turn.*`, `budget.exceeded`).
4//! `llm.*` and `tool.*` events are produced by `RequestTracer` and
5//! `ToolTracer` in `oharness-trace`, wired in by `Agent::run`
6//! (see docs/remaining-work.md §2.4 — M1b-δ refactor).
7
8use crate::loop_trait::{Loop, LoopContext};
9use async_trait::async_trait;
10use oharness_core::event::{
11    EventKind, MetaPayload, RunFinishedPayload, RunStartedPayload, TurnFinishedPayload,
12    TurnPayload, TurnRevisedPayload,
13};
14use oharness_core::{
15    AgentError, AssistantTurn, BudgetRequest, CompletionRequest, CompletionResponse, Content,
16    ConversationView, Message, MetadataMap, ResourceUsage, RunError, RunErrorCategory, RunOutcome,
17    StopReason, Task, Termination, TrajectoryHandle, TrajectoryView, TruncationLimit,
18};
19use oharness_critic::{AssessmentContext, Critic, CriticTrigger, CriticVerdict};
20use oharness_llm::complete_from_stream;
21use oharness_memory::policy::MemoryContext;
22use oharness_tools::context::ToolContext;
23use oharness_tools::toolset::ToolOutcome;
24use oharness_trace::TOOL_USE_ID_KEY;
25use serde_json::json;
26use time::OffsetDateTime;
27
/// The default ReAct-style [`Loop`]: alternates LLM calls with tool
/// execution until the model stops requesting tools or a limit is hit.
///
/// Construct with [`ReactLoop::new`] (built-in system prompt), then
/// optionally override the prompt with [`ReactLoop::with_system_prompt`]
/// or drop it with [`ReactLoop::without_system_prompt`].
#[derive(Debug, Clone)]
pub struct ReactLoop {
    /// Copied into `CompletionRequest::system` on every turn.
    system_prompt: Option<String>,
}
31
32impl Default for ReactLoop {
33    fn default() -> Self {
34        Self {
35            system_prompt: Some(DEFAULT_SYSTEM_PROMPT.to_string()),
36        }
37    }
38}
39
40impl ReactLoop {
41    pub fn new() -> Self {
42        Self::default()
43    }
44
45    pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
46        self.system_prompt = Some(prompt.into());
47        self
48    }
49
50    pub fn without_system_prompt(mut self) -> Self {
51        self.system_prompt = None;
52        self
53    }
54}
55
56#[async_trait]
57impl Loop for ReactLoop {
58    async fn run(&self, task: Task, ctx: &LoopContext) -> Result<RunOutcome, AgentError> {
59        let started_at = OffsetDateTime::now_utc();
60        let start_instant = std::time::Instant::now();
61
62        // ---- meta event (always first) ----
63        let capabilities = ctx.llm.capabilities();
64        ctx.events.emit(
65            "run-0",
66            EventKind::Meta(MetaPayload {
67                schema_version: oharness_core::event::SchemaVersion::CURRENT,
68                harness_version: env!("CARGO_PKG_VERSION").to_string(),
69                task_snapshot: task.clone(),
70                llm_capabilities: capabilities.clone(),
71            }),
72            None,
73        );
74
75        let run_open_seq = ctx.events.emit(
76            "run-0",
77            EventKind::RunStarted(RunStartedPayload {
78                extra: MetadataMap::new(),
79            }),
80            None,
81        );
82
83        // ---- initial conversation ----
84        let mut messages: Vec<Message> = Vec::new();
85        let user_text = build_user_text(&task);
86        messages.push(Message::user_text(user_text));
87
88        let tools_specs = ctx.tools.specs().to_vec();
89        let mut usage_totals = ResourceUsage::default();
90        let mut per_model: std::collections::HashMap<oharness_core::ModelId, ResourceUsage> =
91            std::collections::HashMap::new();
92
93        let mut termination: Option<Termination> = None;
94        let mut turn_index: u32 = 0;
95        while termination.is_none() {
96            if turn_index >= ctx.max_turns {
97                termination = Some(Termination::Truncated {
98                    limit: TruncationLimit::MaxTurns(ctx.max_turns),
99                });
100                break;
101            }
102            if ctx.cancellation.is_cancelled() {
103                termination = Some(Termination::Interrupted {
104                    reason: oharness_core::InterruptionReason::Cancellation,
105                });
106                break;
107            }
108
109            let turn_span = format!("turn-{turn_index}");
110            let turn_open_seq = ctx.events.emit(
111                &turn_span,
112                EventKind::TurnStarted(TurnPayload { turn_index }),
113                Some(run_open_seq),
114            );
115
116            // ---- memory policy transform ----
117            let mem_ctx = MemoryContext {
118                events: ctx.events.clone(),
119                token_budget: capabilities.max_context_tokens,
120            };
121            let transformed = match ctx
122                .memory
123                .transform(ConversationView::new(&messages), &mem_ctx)
124                .await
125            {
126                Ok(m) => m,
127                Err(e) => {
128                    termination = Some(Termination::Failed {
129                        error: RunError {
130                            category: RunErrorCategory::Memory,
131                            message: e.to_string(),
132                        },
133                        at_turn: turn_index,
134                    });
135                    break;
136                }
137            };
138
139            // ---- budget pre-check ----
140            let pre_budget = ctx
141                .budget
142                .check(BudgetRequest {
143                    estimated_input_tokens: Some(
144                        ConversationView::new(&transformed).token_estimate() as u64,
145                    ),
146                    ..Default::default()
147                })
148                .await;
149            if let oharness_core::BudgetDecision::Deny { reason } = pre_budget {
150                ctx.events.emit(
151                    &turn_span,
152                    EventKind::BudgetExceeded(json!({"reason": reason})),
153                    Some(turn_open_seq),
154                );
155                termination = Some(Termination::Truncated {
156                    limit: TruncationLimit::Budget(reason),
157                });
158                break;
159            }
160
161            // ---- build + issue LLM call (llm.* events are emitted by
162            //      RequestTracer wrapping ctx.llm) ----
163            let mut req = CompletionRequest::new(transformed);
164            req.tools = tools_specs.clone();
165            req.system = self.system_prompt.clone();
166
167            let response =
168                match complete_with_optional_streaming(ctx, req, capabilities.streaming).await {
169                    Ok(r) => r,
170                    Err(e) => {
171                        termination = Some(Termination::Failed {
172                            error: RunError {
173                                category: RunErrorCategory::Llm,
174                                message: e.to_string(),
175                            },
176                            at_turn: turn_index,
177                        });
178                        break;
179                    }
180                };
181
182            // ---- consume budget + update totals ----
183            usage_totals.add_usage(&response.usage);
184            per_model
185                .entry(response.model.clone())
186                .or_default()
187                .add_usage(&response.usage);
188
189            ctx.budget
190                .consume(oharness_core::BudgetAmount {
191                    tokens_input: response.usage.tokens_input,
192                    tokens_output: response.usage.tokens_output,
193                    cost_usd: 0.0,
194                    wall_clock: std::time::Duration::ZERO,
195                    steps: 1,
196                })
197                .await;
198
199            // ---- append assistant message ----
200            let mut assistant_msg = Message::Assistant {
201                content: response.content.clone(),
202                stop_reason: Some(response.stop_reason.clone()),
203                meta: MetadataMap::new(),
204            };
205            let mut effective_stop = response.stop_reason.clone();
206            let mut effective_usage = response.usage.clone();
207            messages.push(assistant_msg.clone());
208
209            // ---- critic invocation (plan §11) ----
210            // Only AfterAssistant is wired for M2 part 2; other triggers
211            // (AfterToolResult, AfterEveryNTurns, OnDemand) are deferred.
212            if matches!(ctx.critic_trigger, CriticTrigger::AfterAssistant) {
213                if let Some(critic) = &ctx.critics {
214                    match run_critic_after_assistant(
215                        critic.as_ref(),
216                        &task,
217                        &messages,
218                        turn_index,
219                        &assistant_msg,
220                        &effective_usage,
221                        &effective_stop,
222                        &turn_span,
223                        turn_open_seq,
224                        ctx,
225                    )
226                    .await
227                    {
228                        CriticOutcome::Continue {
229                            effective_message,
230                            effective_usage: new_usage,
231                            effective_stop: new_stop,
232                        } => {
233                            // Swap the in-history assistant message with the
234                            // (possibly revised) final version.
235                            if let Some(last) = messages.last_mut() {
236                                *last = effective_message.clone();
237                            }
238                            assistant_msg = effective_message;
239                            effective_usage = new_usage;
240                            effective_stop = new_stop;
241                        }
242                        CriticOutcome::Terminate { error } => {
243                            termination = Some(Termination::Failed {
244                                error,
245                                at_turn: turn_index,
246                            });
247                            break;
248                        }
249                    }
250                }
251            }
252            let _ = assistant_msg; // silence unused-assignment warning when no revision
253
254            // ---- tool execution if any ----
255            // Use the *effective* response — a revise verdict may have
256            // swapped the tool_use blocks the agent actually intended.
257            let effective_response = CompletionResponse {
258                id: response.id.clone(),
259                model: response.model.clone(),
260                content: extract_content_from_message(messages.last()),
261                stop_reason: effective_stop.clone(),
262                usage: effective_usage.clone(),
263            };
264            let tool_calls_in_turn =
265                execute_tool_calls(&effective_response, ctx, &mut messages).await;
266
267            // ---- turn.finished ----
268            ctx.events.emit(
269                &turn_span,
270                EventKind::TurnFinished(TurnFinishedPayload {
271                    turn_index,
272                    stop_reason: effective_stop.clone(),
273                    usage: effective_usage.clone(),
274                    tool_calls: tool_calls_in_turn,
275                }),
276                Some(turn_open_seq),
277            );
278
279            usage_totals.turns += 1;
280            usage_totals.tool_calls += tool_calls_in_turn;
281
282            // ---- termination decision ----
283            match effective_stop {
284                StopReason::EndTurn => {
285                    termination = Some(Termination::Completed {
286                        reason: oharness_core::CompletionReason::EndTurn,
287                    });
288                }
289                StopReason::StopSequence(s) => {
290                    termination = Some(Termination::Completed {
291                        reason: oharness_core::CompletionReason::StopSequence(s),
292                    });
293                }
294                StopReason::MaxTokens => {
295                    termination = Some(Termination::Truncated {
296                        limit: TruncationLimit::MaxTokens,
297                    });
298                }
299                StopReason::Refusal => {
300                    termination = Some(Termination::Completed {
301                        reason: oharness_core::CompletionReason::EndTurn,
302                    });
303                }
304                StopReason::ToolUse => {
305                    // Continue to next turn; tool_results already appended.
306                    turn_index += 1;
307                    continue;
308                }
309                StopReason::Error(e) => {
310                    termination = Some(Termination::Failed {
311                        error: RunError {
312                            category: RunErrorCategory::Llm,
313                            message: e,
314                        },
315                        at_turn: turn_index,
316                    });
317                }
318            }
319        }
320
321        let termination = termination.unwrap_or(Termination::Completed {
322            reason: oharness_core::CompletionReason::EndTurn,
323        });
324
325        let finished_at = OffsetDateTime::now_utc();
326        usage_totals.wall_clock = start_instant.elapsed();
327
328        ctx.events.emit(
329            "run-0",
330            EventKind::RunFinished(RunFinishedPayload {
331                termination: format!("{termination:?}"),
332                turns: usage_totals.turns,
333                tool_calls: usage_totals.tool_calls,
334                extra: MetadataMap::new(),
335            }),
336            Some(run_open_seq),
337        );
338
339        Ok(RunOutcome {
340            run_id: ctx.events.run_id(),
341            task_id: task.id.clone(),
342            termination,
343            final_messages: messages,
344            // Trajectory reference is supplied by the Agent wrapper; the loop itself
345            // doesn't know where events were routed. Agent overwrites this with the
346            // correct handle (file path or in-memory snapshot) before returning.
347            trajectory: TrajectoryHandle::in_memory(Vec::new()),
348            usage: usage_totals,
349            per_model_usage: per_model,
350            started_at,
351            finished_at,
352            agent_state: MetadataMap::new(),
353        })
354    }
355}
356
357async fn complete_with_optional_streaming(
358    ctx: &LoopContext,
359    req: CompletionRequest,
360    streaming: bool,
361) -> Result<CompletionResponse, oharness_llm::LlmError> {
362    if streaming {
363        let stream = ctx.llm.stream(req).await?;
364        complete_from_stream(stream).await
365    } else {
366        ctx.llm.complete(req).await
367    }
368}
369
370/// Result of a single critic cycle for one assistant turn. Produced by
371/// [`run_critic_after_assistant`], consumed by the loop body.
372///
373/// `Continue` always carries the effective assistant state — the same
374/// values that went in if the critic accepted first-pass, or the final
375/// revision if the critic revised-then-accepted. This lets the loop
376/// consume one canonical `(message, usage, stop)` triple regardless of
377/// whether a revision happened.
378enum CriticOutcome {
379    Continue {
380        effective_message: Message,
381        effective_usage: oharness_core::Usage,
382        effective_stop: StopReason,
383    },
384    Terminate {
385        error: RunError,
386    },
387}
388
389/// Invoke the critic on the freshly-appended assistant turn and resolve
390/// the verdict. Re-invokes the critic on each revision up to
391/// [`LoopContext::revision_depth_cap`] — once exceeded, the most recent
392/// `Revise` verdict is converted to `Reject` per plan §11.1.
393#[allow(clippy::too_many_arguments)]
394async fn run_critic_after_assistant(
395    critic: &oharness_critic::CompositeCritic,
396    task: &Task,
397    messages: &[Message],
398    turn_index: u32,
399    initial_message: &Message,
400    initial_usage: &oharness_core::Usage,
401    initial_stop: &StopReason,
402    turn_span: &str,
403    turn_open_seq: u64,
404    ctx: &LoopContext,
405) -> CriticOutcome {
406    let mut current_message = initial_message.clone();
407    let mut current_usage = initial_usage.clone();
408    let mut current_stop = initial_stop.clone();
409
410    for depth in 0..=ctx.revision_depth_cap {
411        let original_seq = turn_open_seq;
412        let trajectory_tail: Vec<oharness_core::Event> = Vec::new(); // loop does not re-read events
413        let turn = AssistantTurn::new(
414            turn_index,
415            turn_span,
416            current_message.clone(),
417            current_usage.clone(),
418            current_stop.clone(),
419        );
420        let assess_ctx = AssessmentContext::new(
421            task,
422            ConversationView::new(messages),
423            &turn,
424            TrajectoryView::new(&trajectory_tail),
425        );
426
427        let verdict = critic.assess(&assess_ctx).await;
428        match verdict {
429            CriticVerdict::Accept => {
430                ctx.events.emit(
431                    turn_span,
432                    EventKind::CriticAssessed(json!({
433                        "critic": critic.name(),
434                        "verdict": "accept",
435                        "revision_depth": depth,
436                    })),
437                    Some(turn_open_seq),
438                );
439                return CriticOutcome::Continue {
440                    effective_message: current_message,
441                    effective_usage: current_usage,
442                    effective_stop: current_stop,
443                };
444            }
445            CriticVerdict::AcceptWithNote(note) => {
446                ctx.events.emit(
447                    turn_span,
448                    EventKind::CriticAssessed(json!({
449                        "critic": critic.name(),
450                        "verdict": "accept_with_note",
451                        "note": note,
452                        "revision_depth": depth,
453                    })),
454                    Some(turn_open_seq),
455                );
456                return CriticOutcome::Continue {
457                    effective_message: current_message,
458                    effective_usage: current_usage,
459                    effective_stop: current_stop,
460                };
461            }
462            CriticVerdict::Reject { reason } => {
463                ctx.events.emit(
464                    turn_span,
465                    EventKind::CriticRejected(json!({
466                        "critic": critic.name(),
467                        "reason": reason,
468                        "revision_depth": depth,
469                    })),
470                    Some(turn_open_seq),
471                );
472                return CriticOutcome::Terminate {
473                    error: RunError {
474                        category: RunErrorCategory::Critic,
475                        message: format!("critic `{}` rejected turn: {reason}", critic.name()),
476                    },
477                };
478            }
479            CriticVerdict::Abort { reason } => {
480                ctx.events.emit(
481                    turn_span,
482                    EventKind::CriticRejected(json!({
483                        "critic": critic.name(),
484                        "reason": reason,
485                        "abort": true,
486                        "revision_depth": depth,
487                    })),
488                    Some(turn_open_seq),
489                );
490                return CriticOutcome::Terminate {
491                    error: RunError {
492                        category: RunErrorCategory::Critic,
493                        message: format!("critic `{}` aborted run: {reason}", critic.name()),
494                    },
495                };
496            }
497            CriticVerdict::Revise {
498                replacement,
499                reason,
500            } => {
501                if depth >= ctx.revision_depth_cap {
502                    // Cap exceeded — convert to Reject per plan §11.1.
503                    ctx.events.emit(
504                        turn_span,
505                        EventKind::CriticRejected(json!({
506                            "critic": critic.name(),
507                            "reason": format!(
508                                "revision depth cap ({}) exceeded: {reason}",
509                                ctx.revision_depth_cap
510                            ),
511                            "revision_depth": depth,
512                        })),
513                        Some(turn_open_seq),
514                    );
515                    return CriticOutcome::Terminate {
516                        error: RunError {
517                            category: RunErrorCategory::Critic,
518                            message: format!(
519                                "critic `{}`: revision depth cap ({}) exceeded",
520                                critic.name(),
521                                ctx.revision_depth_cap
522                            ),
523                        },
524                    };
525                }
526                // Emit critic.revised (critic's declaration) + turn.revised
527                // (loop's view of the swap), per plan §11.1.
528                let critic_revised_seq = ctx.events.emit(
529                    turn_span,
530                    EventKind::CriticRevised(json!({
531                        "critic": critic.name(),
532                        "reason": reason,
533                        "revision_depth": depth,
534                    })),
535                    Some(turn_open_seq),
536                );
537                ctx.events.emit(
538                    turn_span,
539                    EventKind::TurnRevised(TurnRevisedPayload {
540                        original_seq,
541                        replacement_seq: critic_revised_seq,
542                        reason,
543                    }),
544                    Some(turn_open_seq),
545                );
546                current_message = replacement.message;
547                current_usage = replacement.usage;
548                current_stop = replacement.stop_reason;
549                // Loop again — the revised turn is itself assessed.
550                continue;
551            }
552        }
553    }
554
555    // Unreachable: loop returns in every branch except Revise, and Revise
556    // checks `depth >= cap` to bail out. Defensive fallback:
557    CriticOutcome::Continue {
558        effective_message: current_message,
559        effective_usage: current_usage,
560        effective_stop: current_stop,
561    }
562}
563
564/// Pull the `Content` vec out of an `Assistant` message. Used to build
565/// the post-revision `CompletionResponse` fed into
566/// `execute_tool_calls` — the critic may have changed which tool-use
567/// blocks the agent issues.
568fn extract_content_from_message(msg: Option<&Message>) -> Vec<Content> {
569    match msg {
570        Some(Message::Assistant { content, .. }) => content.clone(),
571        _ => Vec::new(),
572    }
573}
574
575async fn execute_tool_calls(
576    response: &CompletionResponse,
577    ctx: &LoopContext,
578    messages: &mut Vec<Message>,
579) -> u32 {
580    let mut results: Vec<Content> = Vec::new();
581    let mut count = 0u32;
582
583    for block in &response.content {
584        if let Content::ToolUse { id, name, input } = block {
585            count += 1;
586
587            // tool.call.* events are emitted by the ToolTracer wrapping
588            // ctx.tools. Pass the tool_use_id via ctx.extensions so the
589            // tracer can populate its payloads (see tracer::TOOL_USE_ID_KEY).
590            let mut extensions = MetadataMap::new();
591            extensions.insert(TOOL_USE_ID_KEY.to_string(), json!(id));
592            let tool_ctx = ToolContext {
593                events: ctx.events.sink().clone(),
594                budget: ctx.budget.clone(),
595                cancellation: ctx.cancellation.clone(),
596                approval: ctx.approval.clone(),
597                // Thread the loop's workspace into every tool call so
598                // the shipped `fs` / `bash` tools scope to it instead
599                // of cwd. Benchmark adapters populate this by building
600                // the agent with `.with_workspace(loaded.workspace)`.
601                workspace: ctx.workspace.clone(),
602                extensions,
603            };
604
605            let outcome = ctx.tools.execute(name, input.clone(), &tool_ctx).await;
606            match outcome {
607                ToolOutcome::Success(output) => {
608                    results.push(Content::ToolResult {
609                        tool_use_id: id.clone(),
610                        output,
611                        is_error: false,
612                    });
613                }
614                ToolOutcome::ExecutionError {
615                    message,
616                    recoverable: _,
617                } => {
618                    results.push(Content::ToolResult {
619                        tool_use_id: id.clone(),
620                        output: oharness_core::message::ToolOutput::text(format!(
621                            "error: {message}"
622                        )),
623                        is_error: true,
624                    });
625                }
626                ToolOutcome::Denied { reason } => {
627                    results.push(Content::ToolResult {
628                        tool_use_id: id.clone(),
629                        output: oharness_core::message::ToolOutput::text(format!(
630                            "denied: {reason}"
631                        )),
632                        is_error: true,
633                    });
634                }
635                ToolOutcome::Cancelled => {
636                    results.push(Content::ToolResult {
637                        tool_use_id: id.clone(),
638                        output: oharness_core::message::ToolOutput::text("cancelled"),
639                        is_error: true,
640                    });
641                }
642            }
643        }
644    }
645
646    if !results.is_empty() {
647        messages.push(Message::User {
648            content: results,
649            meta: MetadataMap::new(),
650        });
651    }
652    count
653}
654
655fn build_user_text(task: &Task) -> String {
656    let mut s = task.instruction.clone();
657    for att in &task.attachments {
658        s.push_str("\n\n");
659        match att {
660            oharness_core::Attachment::Text { name, content } => {
661                s.push_str(&format!("# attachment: {name}\n{content}"));
662            }
663            oharness_core::Attachment::File { name, path } => {
664                s.push_str(&format!("# attachment: {name} (file: {})", path.display()));
665            }
666            oharness_core::Attachment::Inline { name, mime, bytes } => {
667                s.push_str(&format!(
668                    "# attachment: {name} ({mime}, {} bytes)",
669                    bytes.len()
670                ));
671            }
672            oharness_core::Attachment::Url { url, .. } => {
673                s.push_str(&format!("# attachment: {url}"));
674            }
675        }
676    }
677    s
678}
679
/// System prompt installed by `ReactLoop::default` / `ReactLoop::new`.
///
/// Built with `concat!` so each source line is one literal piece; note the
/// trailing space on every piece but the last.
const DEFAULT_SYSTEM_PROMPT: &str = concat!(
    "You are an agent running inside the open-harness research framework. You have ",
    "access to the tools listed in the `tools` field. Think step by step, call tools ",
    "to gather evidence and make changes, and respond with plain text when you've ",
    "completed the task. Stop calling tools once the task is done.",
);