1use crate::loop_trait::{Loop, LoopContext};
9use async_trait::async_trait;
10use oharness_core::event::{
11 EventKind, MetaPayload, RunFinishedPayload, RunStartedPayload, TurnFinishedPayload,
12 TurnPayload, TurnRevisedPayload,
13};
14use oharness_core::{
15 AgentError, AssistantTurn, BudgetRequest, CompletionRequest, CompletionResponse, Content,
16 ConversationView, Message, MetadataMap, ResourceUsage, RunError, RunErrorCategory, RunOutcome,
17 StopReason, Task, Termination, TrajectoryHandle, TrajectoryView, TruncationLimit,
18};
19use oharness_critic::{AssessmentContext, Critic, CriticTrigger, CriticVerdict};
20use oharness_llm::complete_from_stream;
21use oharness_memory::policy::MemoryContext;
22use oharness_tools::context::ToolContext;
23use oharness_tools::toolset::ToolOutcome;
24use oharness_trace::TOOL_USE_ID_KEY;
25use serde_json::json;
26use time::OffsetDateTime;
27
/// ReAct-style agent loop: repeatedly asks the model for a completion, runs any
/// tool calls it requests, and feeds the results back until the model stops.
///
/// The only configuration carried by the loop itself is the system prompt;
/// everything else (LLM, tools, memory, budget, critics) comes from the
/// `LoopContext` passed to `run`.
#[derive(Debug, Clone)]
pub struct ReactLoop {
    /// System prompt attached to every completion request; `None` sends no
    /// system prompt at all.
    system_prompt: Option<String>,
}
31
32impl Default for ReactLoop {
33 fn default() -> Self {
34 Self {
35 system_prompt: Some(DEFAULT_SYSTEM_PROMPT.to_string()),
36 }
37 }
38}
39
40impl ReactLoop {
41 pub fn new() -> Self {
42 Self::default()
43 }
44
45 pub fn with_system_prompt(mut self, prompt: impl Into<String>) -> Self {
46 self.system_prompt = Some(prompt.into());
47 self
48 }
49
50 pub fn without_system_prompt(mut self) -> Self {
51 self.system_prompt = None;
52 self
53 }
54}
55
56#[async_trait]
57impl Loop for ReactLoop {
58 async fn run(&self, task: Task, ctx: &LoopContext) -> Result<RunOutcome, AgentError> {
59 let started_at = OffsetDateTime::now_utc();
60 let start_instant = std::time::Instant::now();
61
62 let capabilities = ctx.llm.capabilities();
64 ctx.events.emit(
65 "run-0",
66 EventKind::Meta(MetaPayload {
67 schema_version: oharness_core::event::SchemaVersion::CURRENT,
68 harness_version: env!("CARGO_PKG_VERSION").to_string(),
69 task_snapshot: task.clone(),
70 llm_capabilities: capabilities.clone(),
71 }),
72 None,
73 );
74
75 let run_open_seq = ctx.events.emit(
76 "run-0",
77 EventKind::RunStarted(RunStartedPayload {
78 extra: MetadataMap::new(),
79 }),
80 None,
81 );
82
83 let mut messages: Vec<Message> = Vec::new();
85 let user_text = build_user_text(&task);
86 messages.push(Message::user_text(user_text));
87
88 let tools_specs = ctx.tools.specs().to_vec();
89 let mut usage_totals = ResourceUsage::default();
90 let mut per_model: std::collections::HashMap<oharness_core::ModelId, ResourceUsage> =
91 std::collections::HashMap::new();
92
93 let mut termination: Option<Termination> = None;
94 let mut turn_index: u32 = 0;
95 while termination.is_none() {
96 if turn_index >= ctx.max_turns {
97 termination = Some(Termination::Truncated {
98 limit: TruncationLimit::MaxTurns(ctx.max_turns),
99 });
100 break;
101 }
102 if ctx.cancellation.is_cancelled() {
103 termination = Some(Termination::Interrupted {
104 reason: oharness_core::InterruptionReason::Cancellation,
105 });
106 break;
107 }
108
109 let turn_span = format!("turn-{turn_index}");
110 let turn_open_seq = ctx.events.emit(
111 &turn_span,
112 EventKind::TurnStarted(TurnPayload { turn_index }),
113 Some(run_open_seq),
114 );
115
116 let mem_ctx = MemoryContext {
118 events: ctx.events.clone(),
119 token_budget: capabilities.max_context_tokens,
120 };
121 let transformed = match ctx
122 .memory
123 .transform(ConversationView::new(&messages), &mem_ctx)
124 .await
125 {
126 Ok(m) => m,
127 Err(e) => {
128 termination = Some(Termination::Failed {
129 error: RunError {
130 category: RunErrorCategory::Memory,
131 message: e.to_string(),
132 },
133 at_turn: turn_index,
134 });
135 break;
136 }
137 };
138
139 let pre_budget = ctx
141 .budget
142 .check(BudgetRequest {
143 estimated_input_tokens: Some(
144 ConversationView::new(&transformed).token_estimate() as u64,
145 ),
146 ..Default::default()
147 })
148 .await;
149 if let oharness_core::BudgetDecision::Deny { reason } = pre_budget {
150 ctx.events.emit(
151 &turn_span,
152 EventKind::BudgetExceeded(json!({"reason": reason})),
153 Some(turn_open_seq),
154 );
155 termination = Some(Termination::Truncated {
156 limit: TruncationLimit::Budget(reason),
157 });
158 break;
159 }
160
161 let mut req = CompletionRequest::new(transformed);
164 req.tools = tools_specs.clone();
165 req.system = self.system_prompt.clone();
166
167 let response =
168 match complete_with_optional_streaming(ctx, req, capabilities.streaming).await {
169 Ok(r) => r,
170 Err(e) => {
171 termination = Some(Termination::Failed {
172 error: RunError {
173 category: RunErrorCategory::Llm,
174 message: e.to_string(),
175 },
176 at_turn: turn_index,
177 });
178 break;
179 }
180 };
181
182 usage_totals.add_usage(&response.usage);
184 per_model
185 .entry(response.model.clone())
186 .or_default()
187 .add_usage(&response.usage);
188
189 ctx.budget
190 .consume(oharness_core::BudgetAmount {
191 tokens_input: response.usage.tokens_input,
192 tokens_output: response.usage.tokens_output,
193 cost_usd: 0.0,
194 wall_clock: std::time::Duration::ZERO,
195 steps: 1,
196 })
197 .await;
198
199 let mut assistant_msg = Message::Assistant {
201 content: response.content.clone(),
202 stop_reason: Some(response.stop_reason.clone()),
203 meta: MetadataMap::new(),
204 };
205 let mut effective_stop = response.stop_reason.clone();
206 let mut effective_usage = response.usage.clone();
207 messages.push(assistant_msg.clone());
208
209 if matches!(ctx.critic_trigger, CriticTrigger::AfterAssistant) {
213 if let Some(critic) = &ctx.critics {
214 match run_critic_after_assistant(
215 critic.as_ref(),
216 &task,
217 &messages,
218 turn_index,
219 &assistant_msg,
220 &effective_usage,
221 &effective_stop,
222 &turn_span,
223 turn_open_seq,
224 ctx,
225 )
226 .await
227 {
228 CriticOutcome::Continue {
229 effective_message,
230 effective_usage: new_usage,
231 effective_stop: new_stop,
232 } => {
233 if let Some(last) = messages.last_mut() {
236 *last = effective_message.clone();
237 }
238 assistant_msg = effective_message;
239 effective_usage = new_usage;
240 effective_stop = new_stop;
241 }
242 CriticOutcome::Terminate { error } => {
243 termination = Some(Termination::Failed {
244 error,
245 at_turn: turn_index,
246 });
247 break;
248 }
249 }
250 }
251 }
252 let _ = assistant_msg; let effective_response = CompletionResponse {
258 id: response.id.clone(),
259 model: response.model.clone(),
260 content: extract_content_from_message(messages.last()),
261 stop_reason: effective_stop.clone(),
262 usage: effective_usage.clone(),
263 };
264 let tool_calls_in_turn =
265 execute_tool_calls(&effective_response, ctx, &mut messages).await;
266
267 ctx.events.emit(
269 &turn_span,
270 EventKind::TurnFinished(TurnFinishedPayload {
271 turn_index,
272 stop_reason: effective_stop.clone(),
273 usage: effective_usage.clone(),
274 tool_calls: tool_calls_in_turn,
275 }),
276 Some(turn_open_seq),
277 );
278
279 usage_totals.turns += 1;
280 usage_totals.tool_calls += tool_calls_in_turn;
281
282 match effective_stop {
284 StopReason::EndTurn => {
285 termination = Some(Termination::Completed {
286 reason: oharness_core::CompletionReason::EndTurn,
287 });
288 }
289 StopReason::StopSequence(s) => {
290 termination = Some(Termination::Completed {
291 reason: oharness_core::CompletionReason::StopSequence(s),
292 });
293 }
294 StopReason::MaxTokens => {
295 termination = Some(Termination::Truncated {
296 limit: TruncationLimit::MaxTokens,
297 });
298 }
299 StopReason::Refusal => {
300 termination = Some(Termination::Completed {
301 reason: oharness_core::CompletionReason::EndTurn,
302 });
303 }
304 StopReason::ToolUse => {
305 turn_index += 1;
307 continue;
308 }
309 StopReason::Error(e) => {
310 termination = Some(Termination::Failed {
311 error: RunError {
312 category: RunErrorCategory::Llm,
313 message: e,
314 },
315 at_turn: turn_index,
316 });
317 }
318 }
319 }
320
321 let termination = termination.unwrap_or(Termination::Completed {
322 reason: oharness_core::CompletionReason::EndTurn,
323 });
324
325 let finished_at = OffsetDateTime::now_utc();
326 usage_totals.wall_clock = start_instant.elapsed();
327
328 ctx.events.emit(
329 "run-0",
330 EventKind::RunFinished(RunFinishedPayload {
331 termination: format!("{termination:?}"),
332 turns: usage_totals.turns,
333 tool_calls: usage_totals.tool_calls,
334 extra: MetadataMap::new(),
335 }),
336 Some(run_open_seq),
337 );
338
339 Ok(RunOutcome {
340 run_id: ctx.events.run_id(),
341 task_id: task.id.clone(),
342 termination,
343 final_messages: messages,
344 trajectory: TrajectoryHandle::in_memory(Vec::new()),
348 usage: usage_totals,
349 per_model_usage: per_model,
350 started_at,
351 finished_at,
352 agent_state: MetadataMap::new(),
353 })
354 }
355}
356
357async fn complete_with_optional_streaming(
358 ctx: &LoopContext,
359 req: CompletionRequest,
360 streaming: bool,
361) -> Result<CompletionResponse, oharness_llm::LlmError> {
362 if streaming {
363 let stream = ctx.llm.stream(req).await?;
364 complete_from_stream(stream).await
365 } else {
366 ctx.llm.complete(req).await
367 }
368}
369
/// What `run_critic_after_assistant` decided about an assistant turn.
enum CriticOutcome {
    /// The turn may proceed. The fields carry the message, usage, and stop
    /// reason the loop should use from here on — either the original values or
    /// the replacement supplied by the critic's last revision.
    Continue {
        effective_message: Message,
        effective_usage: oharness_core::Usage,
        effective_stop: StopReason,
    },
    /// The run must stop; `error` describes why (critic rejection, abort, or an
    /// exhausted revision depth cap).
    Terminate {
        error: RunError,
    },
}
388
389#[allow(clippy::too_many_arguments)]
394async fn run_critic_after_assistant(
395 critic: &oharness_critic::CompositeCritic,
396 task: &Task,
397 messages: &[Message],
398 turn_index: u32,
399 initial_message: &Message,
400 initial_usage: &oharness_core::Usage,
401 initial_stop: &StopReason,
402 turn_span: &str,
403 turn_open_seq: u64,
404 ctx: &LoopContext,
405) -> CriticOutcome {
406 let mut current_message = initial_message.clone();
407 let mut current_usage = initial_usage.clone();
408 let mut current_stop = initial_stop.clone();
409
410 for depth in 0..=ctx.revision_depth_cap {
411 let original_seq = turn_open_seq;
412 let trajectory_tail: Vec<oharness_core::Event> = Vec::new(); let turn = AssistantTurn::new(
414 turn_index,
415 turn_span,
416 current_message.clone(),
417 current_usage.clone(),
418 current_stop.clone(),
419 );
420 let assess_ctx = AssessmentContext::new(
421 task,
422 ConversationView::new(messages),
423 &turn,
424 TrajectoryView::new(&trajectory_tail),
425 );
426
427 let verdict = critic.assess(&assess_ctx).await;
428 match verdict {
429 CriticVerdict::Accept => {
430 ctx.events.emit(
431 turn_span,
432 EventKind::CriticAssessed(json!({
433 "critic": critic.name(),
434 "verdict": "accept",
435 "revision_depth": depth,
436 })),
437 Some(turn_open_seq),
438 );
439 return CriticOutcome::Continue {
440 effective_message: current_message,
441 effective_usage: current_usage,
442 effective_stop: current_stop,
443 };
444 }
445 CriticVerdict::AcceptWithNote(note) => {
446 ctx.events.emit(
447 turn_span,
448 EventKind::CriticAssessed(json!({
449 "critic": critic.name(),
450 "verdict": "accept_with_note",
451 "note": note,
452 "revision_depth": depth,
453 })),
454 Some(turn_open_seq),
455 );
456 return CriticOutcome::Continue {
457 effective_message: current_message,
458 effective_usage: current_usage,
459 effective_stop: current_stop,
460 };
461 }
462 CriticVerdict::Reject { reason } => {
463 ctx.events.emit(
464 turn_span,
465 EventKind::CriticRejected(json!({
466 "critic": critic.name(),
467 "reason": reason,
468 "revision_depth": depth,
469 })),
470 Some(turn_open_seq),
471 );
472 return CriticOutcome::Terminate {
473 error: RunError {
474 category: RunErrorCategory::Critic,
475 message: format!("critic `{}` rejected turn: {reason}", critic.name()),
476 },
477 };
478 }
479 CriticVerdict::Abort { reason } => {
480 ctx.events.emit(
481 turn_span,
482 EventKind::CriticRejected(json!({
483 "critic": critic.name(),
484 "reason": reason,
485 "abort": true,
486 "revision_depth": depth,
487 })),
488 Some(turn_open_seq),
489 );
490 return CriticOutcome::Terminate {
491 error: RunError {
492 category: RunErrorCategory::Critic,
493 message: format!("critic `{}` aborted run: {reason}", critic.name()),
494 },
495 };
496 }
497 CriticVerdict::Revise {
498 replacement,
499 reason,
500 } => {
501 if depth >= ctx.revision_depth_cap {
502 ctx.events.emit(
504 turn_span,
505 EventKind::CriticRejected(json!({
506 "critic": critic.name(),
507 "reason": format!(
508 "revision depth cap ({}) exceeded: {reason}",
509 ctx.revision_depth_cap
510 ),
511 "revision_depth": depth,
512 })),
513 Some(turn_open_seq),
514 );
515 return CriticOutcome::Terminate {
516 error: RunError {
517 category: RunErrorCategory::Critic,
518 message: format!(
519 "critic `{}`: revision depth cap ({}) exceeded",
520 critic.name(),
521 ctx.revision_depth_cap
522 ),
523 },
524 };
525 }
526 let critic_revised_seq = ctx.events.emit(
529 turn_span,
530 EventKind::CriticRevised(json!({
531 "critic": critic.name(),
532 "reason": reason,
533 "revision_depth": depth,
534 })),
535 Some(turn_open_seq),
536 );
537 ctx.events.emit(
538 turn_span,
539 EventKind::TurnRevised(TurnRevisedPayload {
540 original_seq,
541 replacement_seq: critic_revised_seq,
542 reason,
543 }),
544 Some(turn_open_seq),
545 );
546 current_message = replacement.message;
547 current_usage = replacement.usage;
548 current_stop = replacement.stop_reason;
549 continue;
551 }
552 }
553 }
554
555 CriticOutcome::Continue {
558 effective_message: current_message,
559 effective_usage: current_usage,
560 effective_stop: current_stop,
561 }
562}
563
564fn extract_content_from_message(msg: Option<&Message>) -> Vec<Content> {
569 match msg {
570 Some(Message::Assistant { content, .. }) => content.clone(),
571 _ => Vec::new(),
572 }
573}
574
575async fn execute_tool_calls(
576 response: &CompletionResponse,
577 ctx: &LoopContext,
578 messages: &mut Vec<Message>,
579) -> u32 {
580 let mut results: Vec<Content> = Vec::new();
581 let mut count = 0u32;
582
583 for block in &response.content {
584 if let Content::ToolUse { id, name, input } = block {
585 count += 1;
586
587 let mut extensions = MetadataMap::new();
591 extensions.insert(TOOL_USE_ID_KEY.to_string(), json!(id));
592 let tool_ctx = ToolContext {
593 events: ctx.events.sink().clone(),
594 budget: ctx.budget.clone(),
595 cancellation: ctx.cancellation.clone(),
596 approval: ctx.approval.clone(),
597 workspace: ctx.workspace.clone(),
602 extensions,
603 };
604
605 let outcome = ctx.tools.execute(name, input.clone(), &tool_ctx).await;
606 match outcome {
607 ToolOutcome::Success(output) => {
608 results.push(Content::ToolResult {
609 tool_use_id: id.clone(),
610 output,
611 is_error: false,
612 });
613 }
614 ToolOutcome::ExecutionError {
615 message,
616 recoverable: _,
617 } => {
618 results.push(Content::ToolResult {
619 tool_use_id: id.clone(),
620 output: oharness_core::message::ToolOutput::text(format!(
621 "error: {message}"
622 )),
623 is_error: true,
624 });
625 }
626 ToolOutcome::Denied { reason } => {
627 results.push(Content::ToolResult {
628 tool_use_id: id.clone(),
629 output: oharness_core::message::ToolOutput::text(format!(
630 "denied: {reason}"
631 )),
632 is_error: true,
633 });
634 }
635 ToolOutcome::Cancelled => {
636 results.push(Content::ToolResult {
637 tool_use_id: id.clone(),
638 output: oharness_core::message::ToolOutput::text("cancelled"),
639 is_error: true,
640 });
641 }
642 }
643 }
644 }
645
646 if !results.is_empty() {
647 messages.push(Message::User {
648 content: results,
649 meta: MetadataMap::new(),
650 });
651 }
652 count
653}
654
655fn build_user_text(task: &Task) -> String {
656 let mut s = task.instruction.clone();
657 for att in &task.attachments {
658 s.push_str("\n\n");
659 match att {
660 oharness_core::Attachment::Text { name, content } => {
661 s.push_str(&format!("# attachment: {name}\n{content}"));
662 }
663 oharness_core::Attachment::File { name, path } => {
664 s.push_str(&format!("# attachment: {name} (file: {})", path.display()));
665 }
666 oharness_core::Attachment::Inline { name, mime, bytes } => {
667 s.push_str(&format!(
668 "# attachment: {name} ({mime}, {} bytes)",
669 bytes.len()
670 ));
671 }
672 oharness_core::Attachment::Url { url, .. } => {
673 s.push_str(&format!("# attachment: {url}"));
674 }
675 }
676 }
677 s
678}
679
/// System prompt used by `ReactLoop::default()` when the caller does not
/// supply one.
const DEFAULT_SYSTEM_PROMPT: &str = concat!(
    "You are an agent running inside the open-harness research framework. ",
    "You have access to the tools listed in the `tools` field. ",
    "Think step by step, call tools to gather evidence and make changes, ",
    "and respond with plain text when you've completed the task. ",
    "Stop calling tools once the task is done.",
);