Skip to main content

ras_agent/application/
run_step.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use chrono::Utc;
5use ras_cdp::BrowserPort;
6use ras_dom::DomExtractor;
7use ras_errors::AppError;
8use ras_events::EventBus;
9use ras_llm::{ChatMessage, ChatResponse, InvokeOptions, LlmClient};
10use ras_tools::domain::registry::{ActionRegistry, ToolContext};
11use ras_types::{ActionResult, StepId, TargetId};
12
13use crate::application::clickable_map::build_current_page_message;
14use crate::application::compute_action_hash::compute_action_hash;
15use crate::application::detect_loop::{build_budget_warning, build_loop_nudge};
16use crate::application::fallback_llm::should_switch_to_fallback;
17use crate::application::parse_output::parse_agent_output;
18use crate::application::run_step_log::{log_action_err, log_action_ok, log_decision};
19use crate::application::salvage::salvage_into;
20use crate::domain::agent_history::StepRecord;
21use crate::domain::loop_detector::ActionLoopDetector;
22use crate::domain::step_metadata::StepMetadata;
23
/// Runs a single agent step: asks an LLM for a decision, executes the
/// resulting actions through the action registry, and packages the outcome
/// as a `StepRecord`.
pub struct RunStep {
    /// LLM client that is always tried first.
    primary_llm: Arc<dyn LlmClient>,
    /// Optional client invoked when the primary call fails with an error that
    /// `should_switch_to_fallback` accepts.
    fallback_llm: Option<Arc<dyn LlmClient>>,
    /// Registry in which each requested action is looked up by name to find
    /// its handler and metadata.
    registry: Arc<ActionRegistry>,
    /// Browser port; queried for the focused target when no target is bound,
    /// and handed to every action through `ToolContext`.
    browser: Arc<dyn BrowserPort>,
    /// Event bus handed to every action through `ToolContext`.
    events: Arc<dyn EventBus>,
    /// Optional DOM extractor used to snapshot the page before and after the
    /// actions run; when `None`, no snapshots are taken.
    dom_extractor: Option<Arc<dyn DomExtractor>>,
    /// When set, this target is used for the step instead of asking the
    /// browser for the currently focused one.
    bound_target: Option<TargetId>,
}
33
34impl RunStep {
35    #[must_use]
36    pub fn new(
37        primary: Arc<dyn LlmClient>,
38        fallback: Option<Arc<dyn LlmClient>>,
39        registry: Arc<ActionRegistry>,
40        browser: Arc<dyn BrowserPort>,
41        events: Arc<dyn EventBus>,
42        dom_extractor: Option<Arc<dyn DomExtractor>>,
43        bound_target: Option<TargetId>,
44    ) -> Self {
45        Self {
46            primary_llm: primary,
47            fallback_llm: fallback,
48            registry,
49            browser,
50            events,
51            dom_extractor,
52            bound_target,
53        }
54    }
55
56    pub async fn execute(
57        &self,
58        step: StepId,
59        max_steps: u32,
60        prompt: Vec<ChatMessage>,
61        detector: &mut ActionLoopDetector,
62    ) -> Result<StepRecord, AppError> {
63        let started = Instant::now();
64        let mut messages = prompt;
65        if let Some(nudge) = build_loop_nudge(detector) {
66            messages.push(nudge);
67        }
68        if let Some(warn) = build_budget_warning(step.0, max_steps) {
69            messages.push(warn);
70        }
71        let target = match &self.bound_target {
72            Some(t) => Some(t.clone()),
73            None => self.browser.focused_target().await.ok(),
74        };
75        let current = match (&self.dom_extractor, &target) {
76            (Some(extractor), Some(t)) => extractor.snapshot(t).await.ok(),
77            _ => None,
78        };
79        if let Some(page_msg) = current.as_ref().and_then(build_current_page_message) {
80            messages.push(page_msg);
81        }
82        let pre_clickables: Arc<Vec<ras_dom::ClickableElement>> = Arc::new(
83            current
84                .as_ref()
85                .map(|s| s.clickables.clone())
86                .unwrap_or_default(),
87        );
88        let page_url = current.as_ref().map(|s| s.url.clone());
89
90        let response = self.invoke_with_fallback(messages).await?;
91        let mut output = parse_agent_output(&response)?;
92        salvage_into(&mut output, &self.registry);
93        log_decision(step.0, &output);
94
95        let mut results = Vec::new();
96        for action in &output.action {
97            detector.record_action(compute_action_hash(action));
98            let Some(reg) = self.registry.get(&action.name) else {
99                results.push(ActionResult::err(format!(
100                    "unknown action: {}",
101                    action.name.0
102                )));
103                break;
104            };
105            let ctx = ToolContext {
106                target: target.clone(),
107                browser: self.browser.clone(),
108                events: self.events.clone(),
109                page_url: page_url.clone(),
110                available_files: Vec::new(),
111                clickables: pre_clickables.clone(),
112            };
113            match reg.handler.execute(action.parameters.clone(), ctx).await {
114                Ok(r) => {
115                    let terminates = reg.metadata.terminates_sequence;
116                    let is_done = r.is_done;
117                    let is_err = r.is_error();
118                    log_action_ok(step.0, action, &r);
119                    results.push(r);
120                    if terminates || is_done || is_err {
121                        break;
122                    }
123                }
124                Err(e) => {
125                    log_action_err(step.0, action, &e.to_string());
126                    results.push(ActionResult::err(e.to_string()));
127                    break;
128                }
129            }
130        }
131
132        let summary = match (&self.dom_extractor, &target) {
133            (Some(extractor), Some(t)) => match extractor.snapshot(t).await {
134                Ok(s) => Some(s),
135                Err(e) => {
136                    tracing::warn!(error = %e, "dom snapshot failed; continuing without grounding");
137                    None
138                }
139            },
140            _ => None,
141        };
142
143        let metadata = StepMetadata {
144            duration_ms: started.elapsed().as_millis() as u64,
145            step_interval_ms: None,
146            usage: response.usage,
147            model: Some(response.model.clone()),
148            fallback_used: false,
149        };
150        Ok(StepRecord {
151            step,
152            started_at: Utc::now(),
153            url: page_url,
154            output,
155            results,
156            metadata,
157            summary,
158        })
159    }
160
161    async fn invoke_with_fallback(
162        &self,
163        messages: Vec<ChatMessage>,
164    ) -> Result<ChatResponse, AppError> {
165        let opts = InvokeOptions::default();
166        match self
167            .primary_llm
168            .ainvoke(messages.clone(), opts.clone())
169            .await
170        {
171            Ok(r) => Ok(r),
172            Err(e) if should_switch_to_fallback(&e) => match &self.fallback_llm {
173                Some(fb) => fb.ainvoke(messages, opts).await,
174                None => Err(e),
175            },
176            Err(e) => Err(e),
177        }
178    }
179}
180
181#[must_use]
182pub fn done_result(text: impl Into<String>) -> ActionResult {
183    ActionResult::done(text)
184}