1use std::sync::Arc;
2use std::time::Instant;
3
4use chrono::Utc;
5use ras_cdp::BrowserPort;
6use ras_dom::DomExtractor;
7use ras_errors::AppError;
8use ras_events::EventBus;
9use ras_llm::{ChatMessage, ChatResponse, InvokeOptions, LlmClient};
10use ras_tools::domain::registry::{ActionRegistry, ToolContext};
11use ras_types::{ActionResult, StepId, TargetId};
12
13use crate::application::clickable_map::build_current_page_message;
14use crate::application::compute_action_hash::compute_action_hash;
15use crate::application::detect_loop::{build_budget_warning, build_loop_nudge};
16use crate::application::fallback_llm::should_switch_to_fallback;
17use crate::application::parse_output::parse_agent_output;
18use crate::application::run_step_log::{log_action_err, log_action_ok, log_decision};
19use crate::application::salvage::salvage_into;
20use crate::domain::agent_history::StepRecord;
21use crate::domain::loop_detector::ActionLoopDetector;
22use crate::domain::step_metadata::StepMetadata;
23
/// Runs a single agent step: prompts the LLM, executes the actions it chose,
/// and captures DOM snapshots of the page around those actions.
pub struct RunStep {
    /// LLM consulted first on every step.
    primary_llm: Arc<dyn LlmClient>,
    /// Optional LLM retried when the primary fails with an error that
    /// `should_switch_to_fallback` accepts.
    fallback_llm: Option<Arc<dyn LlmClient>>,
    /// Actions the agent may invoke, looked up by action name.
    registry: Arc<ActionRegistry>,
    /// Browser handle passed to every action through its `ToolContext`.
    browser: Arc<dyn BrowserPort>,
    /// Event bus passed to every action through its `ToolContext`.
    events: Arc<dyn EventBus>,
    /// When present (and a target is known), used to snapshot the page DOM.
    dom_extractor: Option<Arc<dyn DomExtractor>>,
    /// When set, every step runs against this target instead of the browser's
    /// currently focused one.
    bound_target: Option<TargetId>,
}
33
34impl RunStep {
35 #[must_use]
36 pub fn new(
37 primary: Arc<dyn LlmClient>,
38 fallback: Option<Arc<dyn LlmClient>>,
39 registry: Arc<ActionRegistry>,
40 browser: Arc<dyn BrowserPort>,
41 events: Arc<dyn EventBus>,
42 dom_extractor: Option<Arc<dyn DomExtractor>>,
43 bound_target: Option<TargetId>,
44 ) -> Self {
45 Self {
46 primary_llm: primary,
47 fallback_llm: fallback,
48 registry,
49 browser,
50 events,
51 dom_extractor,
52 bound_target,
53 }
54 }
55
56 pub async fn execute(
57 &self,
58 step: StepId,
59 max_steps: u32,
60 prompt: Vec<ChatMessage>,
61 detector: &mut ActionLoopDetector,
62 ) -> Result<StepRecord, AppError> {
63 let started = Instant::now();
64 let mut messages = prompt;
65 if let Some(nudge) = build_loop_nudge(detector) {
66 messages.push(nudge);
67 }
68 if let Some(warn) = build_budget_warning(step.0, max_steps) {
69 messages.push(warn);
70 }
71 let target = match &self.bound_target {
72 Some(t) => Some(t.clone()),
73 None => self.browser.focused_target().await.ok(),
74 };
75 let current = match (&self.dom_extractor, &target) {
76 (Some(extractor), Some(t)) => extractor.snapshot(t).await.ok(),
77 _ => None,
78 };
79 if let Some(page_msg) = current.as_ref().and_then(build_current_page_message) {
80 messages.push(page_msg);
81 }
82 let pre_clickables: Arc<Vec<ras_dom::ClickableElement>> = Arc::new(
83 current
84 .as_ref()
85 .map(|s| s.clickables.clone())
86 .unwrap_or_default(),
87 );
88 let page_url = current.as_ref().map(|s| s.url.clone());
89
90 let response = self.invoke_with_fallback(messages).await?;
91 let mut output = parse_agent_output(&response)?;
92 salvage_into(&mut output, &self.registry);
93 log_decision(step.0, &output);
94
95 let mut results = Vec::new();
96 for action in &output.action {
97 detector.record_action(compute_action_hash(action));
98 let Some(reg) = self.registry.get(&action.name) else {
99 results.push(ActionResult::err(format!(
100 "unknown action: {}",
101 action.name.0
102 )));
103 break;
104 };
105 let ctx = ToolContext {
106 target: target.clone(),
107 browser: self.browser.clone(),
108 events: self.events.clone(),
109 page_url: page_url.clone(),
110 available_files: Vec::new(),
111 clickables: pre_clickables.clone(),
112 };
113 match reg.handler.execute(action.parameters.clone(), ctx).await {
114 Ok(r) => {
115 let terminates = reg.metadata.terminates_sequence;
116 let is_done = r.is_done;
117 let is_err = r.is_error();
118 log_action_ok(step.0, action, &r);
119 results.push(r);
120 if terminates || is_done || is_err {
121 break;
122 }
123 }
124 Err(e) => {
125 log_action_err(step.0, action, &e.to_string());
126 results.push(ActionResult::err(e.to_string()));
127 break;
128 }
129 }
130 }
131
132 let summary = match (&self.dom_extractor, &target) {
133 (Some(extractor), Some(t)) => match extractor.snapshot(t).await {
134 Ok(s) => Some(s),
135 Err(e) => {
136 tracing::warn!(error = %e, "dom snapshot failed; continuing without grounding");
137 None
138 }
139 },
140 _ => None,
141 };
142
143 let metadata = StepMetadata {
144 duration_ms: started.elapsed().as_millis() as u64,
145 step_interval_ms: None,
146 usage: response.usage,
147 model: Some(response.model.clone()),
148 fallback_used: false,
149 };
150 Ok(StepRecord {
151 step,
152 started_at: Utc::now(),
153 url: page_url,
154 output,
155 results,
156 metadata,
157 summary,
158 })
159 }
160
161 async fn invoke_with_fallback(
162 &self,
163 messages: Vec<ChatMessage>,
164 ) -> Result<ChatResponse, AppError> {
165 let opts = InvokeOptions::default();
166 match self
167 .primary_llm
168 .ainvoke(messages.clone(), opts.clone())
169 .await
170 {
171 Ok(r) => Ok(r),
172 Err(e) if should_switch_to_fallback(&e) => match &self.fallback_llm {
173 Some(fb) => fb.ainvoke(messages, opts).await,
174 None => Err(e),
175 },
176 Err(e) => Err(e),
177 }
178 }
179}
180
181#[must_use]
182pub fn done_result(text: impl Into<String>) -> ActionResult {
183 ActionResult::done(text)
184}