use std::time::{Duration, Instant};
use serde::Serialize;
use agentchrome::cdp::CdpEvent;
use agentchrome::connection::ManagedSession;
use agentchrome::error::{AppError, ExitCode};
use crate::cli::{
GlobalOpts, NavigateArgs, NavigateCommand, NavigateReloadArgs, NavigateUrlArgs, WaitUntil,
};
use crate::output::{print_output, setup_session_with_interceptors as setup_session};
use crate::page::wait::check_selector_condition;
/// Fallback navigation timeout, in milliseconds, used when no explicit
/// timeout is supplied by the caller.
pub const DEFAULT_NAVIGATE_TIMEOUT_MS: u64 = 30_000;
/// Length of the quiet window, in milliseconds, that `wait_for_network_idle`
/// requires with zero in-flight requests before it reports the network as idle.
pub const NETWORK_IDLE_MS: u64 = 500;
/// Value returned by `navigate_and_wait` describing the page after navigation.
pub(crate) struct NavigateResult {
/// Value of `location.href` read from the page after the wait completed.
#[allow(dead_code)]
pub url: String,
/// Value of `document.title` read from the page after the wait completed.
#[allow(dead_code)]
pub title: String,
/// HTTP status taken from the captured `Network.responseReceived` events
/// for the navigated frame; `None` when no matching response was captured.
pub status: Option<u16>,
}
/// Serializable payload printed by the URL navigation command.
#[derive(Serialize)]
struct NavigateOutput {
/// Value of `location.href` after navigation.
url: String,
/// Value of `document.title` after navigation.
title: String,
/// HTTP status of the document response; the field is omitted from the
/// serialized output entirely when it is `None`.
#[serde(skip_serializing_if = "Option::is_none")]
status: Option<u16>,
}
/// Serializable payload printed by the back, forward and reload commands.
#[derive(Serialize)]
struct HistoryResult {
/// Value of `location.href` after the operation.
url: String,
/// Value of `document.title` after the operation.
title: String,
}
/// Entry point for the `navigate` command.
///
/// Dispatches to the back / forward / reload handler when a subcommand is
/// present, and otherwise treats the invocation as a plain URL navigation.
///
/// # Errors
///
/// Propagates whatever error the selected handler returns.
pub async fn execute_navigate(global: &GlobalOpts, args: &NavigateArgs) -> Result<(), AppError> {
    // No subcommand means "navigate to the URL given in `url_args`".
    let Some(subcommand) = &args.command else {
        return execute_url(global, &args.url_args).await;
    };

    match subcommand {
        NavigateCommand::Back => execute_back(global).await,
        NavigateCommand::Forward => execute_forward(global).await,
        NavigateCommand::Reload(reload_args) => execute_reload(global, reload_args).await,
    }
}
/// Navigates the session to `args.url`, waits according to `args.wait_until`
/// (and optionally for a selector), then prints the resulting URL, title and
/// HTTP status.
///
/// All event subscriptions are registered before `Page.navigate` is sent
/// (presumably so that events emitted during the navigation are not missed).
///
/// # Errors
///
/// Returns an error when no URL was supplied, when session setup or any CDP
/// command fails, when Chrome reports a non-empty `errorText` for the
/// navigation, or when the wait strategy / selector wait exceeds the timeout.
async fn execute_url(global: &GlobalOpts, args: &NavigateUrlArgs) -> Result<(), AppError> {
    let Some(target_url) = args.url.as_deref() else {
        return Err(AppError {
            message: "URL is required. Usage: agentchrome navigate <URL>".into(),
            code: ExitCode::GeneralError,
            custom_json: None,
        });
    };
    let budget_ms = args.timeout.unwrap_or(DEFAULT_NAVIGATE_TIMEOUT_MS);
    // The selector wait below is measured against this instant.
    let started_at = Instant::now();

    let (_client, mut managed) = setup_session(global).await?;
    if global.auto_dismiss_dialogs {
        let _dismiss = managed.spawn_auto_dismiss().await?;
    }
    managed.ensure_domain("Page").await?;
    managed.ensure_domain("Network").await?;

    // Subscriptions: responses (for the status code) plus whatever the chosen
    // wait strategy needs.
    let responses = managed.subscribe("Network.responseReceived").await?;
    let lifecycle_rx = match args.wait_until {
        WaitUntil::Load => Some(managed.subscribe("Page.loadEventFired").await?),
        WaitUntil::Domcontentloaded => Some(managed.subscribe("Page.domContentEventFired").await?),
        WaitUntil::Networkidle | WaitUntil::None => None,
    };
    let idle_rxs = if matches!(args.wait_until, WaitUntil::Networkidle) {
        let requests = managed.subscribe("Network.requestWillBeSent").await?;
        let finished = managed.subscribe("Network.loadingFinished").await?;
        let failed = managed.subscribe("Network.loadingFailed").await?;
        Some((requests, finished, failed))
    } else {
        None
    };

    if args.ignore_cache {
        let cache_params = serde_json::json!({ "cacheDisabled": true });
        managed
            .send_command("Network.setCacheDisabled", Some(cache_params))
            .await?;
    }

    let nav_reply = managed
        .send_command("Page.navigate", Some(serde_json::json!({ "url": target_url })))
        .await?;
    // Chrome reports navigation failures in-band through `errorText`.
    if let Some(reason) = nav_reply["errorText"]
        .as_str()
        .filter(|text| !text.is_empty())
    {
        return Err(AppError::navigation_failed(reason));
    }
    let frame_id = nav_reply["frameId"]
        .as_str()
        .map(str::to_owned)
        .unwrap_or_default();

    // At most one of these receivers exists, matching the chosen strategy;
    // `WaitUntil::None` leaves both empty and therefore does not wait.
    if let Some(rx) = lifecycle_rx {
        wait_for_event(rx, budget_ms, &format!("{:?}", args.wait_until)).await?;
    }
    if let Some((requests, finished, failed)) = idle_rxs {
        wait_for_network_idle(requests, finished, failed, budget_ms).await?;
    }

    if let Some(selector) = &args.wait_for_selector {
        managed.ensure_domain("Runtime").await?;
        #[allow(clippy::cast_possible_truncation)]
        let spent_ms = started_at.elapsed().as_millis() as u64;
        if spent_ms >= budget_ms {
            return Err(AppError::navigation_timeout(
                budget_ms,
                &format!("selector \"{selector}\" (no time remaining after page load)"),
            ));
        }
        let poll_deadline = Instant::now() + Duration::from_millis(budget_ms - spent_ms);
        let poll_every = Duration::from_millis(100);
        // Check immediately, then re-check after each pause until the deadline.
        while !check_selector_condition(&managed, selector, 1).await {
            tokio::time::sleep(poll_every).await;
            if Instant::now() > poll_deadline {
                return Err(AppError::navigation_timeout(
                    budget_ms,
                    &format!("selector \"{selector}\" not found"),
                ));
            }
        }
    }

    let status = extract_http_status(responses, &frame_id);
    let (url, title) = get_page_info(&managed).await?;
    print_output(&NavigateOutput { url, title, status }, &global.output)
}
/// Navigates an existing session to `url` and waits according to `wait_until`.
///
/// Enables the `Page` and `Network` domains, registers the event
/// subscriptions the strategy needs, sends `Page.navigate`, waits, and finally
/// reads back the page URL, title and document HTTP status.
///
/// # Errors
///
/// Returns an error when a CDP command fails, when Chrome reports a non-empty
/// `errorText` for the navigation, or when the wait exceeds `timeout_ms`.
pub(crate) async fn navigate_and_wait(
    managed: &mut ManagedSession,
    url: &str,
    wait_until: WaitUntil,
    timeout_ms: u64,
) -> Result<NavigateResult, AppError> {
    managed.ensure_domain("Page").await?;
    managed.ensure_domain("Network").await?;

    // Register every subscription before the navigation command is sent.
    let responses = managed.subscribe("Network.responseReceived").await?;
    let lifecycle_rx = match wait_until {
        WaitUntil::Load => Some(managed.subscribe("Page.loadEventFired").await?),
        WaitUntil::Domcontentloaded => Some(managed.subscribe("Page.domContentEventFired").await?),
        WaitUntil::Networkidle | WaitUntil::None => None,
    };
    let idle_rxs = if matches!(wait_until, WaitUntil::Networkidle) {
        let requests = managed.subscribe("Network.requestWillBeSent").await?;
        let finished = managed.subscribe("Network.loadingFinished").await?;
        let failed = managed.subscribe("Network.loadingFailed").await?;
        Some((requests, finished, failed))
    } else {
        None
    };

    let nav_reply = managed
        .send_command("Page.navigate", Some(serde_json::json!({ "url": url })))
        .await?;
    // Chrome reports navigation failures in-band through `errorText`.
    if let Some(reason) = nav_reply["errorText"]
        .as_str()
        .filter(|text| !text.is_empty())
    {
        return Err(AppError::navigation_failed(reason));
    }
    let frame_id = nav_reply["frameId"]
        .as_str()
        .map(str::to_owned)
        .unwrap_or_default();

    // At most one of these receivers exists, matching the chosen strategy;
    // `WaitUntil::None` leaves both empty and therefore does not wait.
    if let Some(rx) = lifecycle_rx {
        wait_for_event(rx, timeout_ms, &format!("{wait_until:?}")).await?;
    }
    if let Some((requests, finished, failed)) = idle_rxs {
        wait_for_network_idle(requests, finished, failed, timeout_ms).await?;
    }

    let status = extract_http_status(responses, &frame_id);
    let (final_url, title) = get_page_info(managed).await?;
    Ok(NavigateResult {
        url: final_url,
        title,
        status,
    })
}
/// Navigates one step back in the session history and prints the resulting
/// URL and title.
///
/// The target entry is looked up in the `Page.getNavigationHistory` response
/// and activated with `Page.navigateToHistoryEntry`; completion is detected
/// through `Page.frameNavigated` or `Page.navigatedWithinDocument`.
///
/// # Errors
///
/// Returns an error when the current entry is already the first one, when the
/// history response is malformed (no `entries` array, an index that points
/// outside it, or a target entry without an integer `id`), when a CDP command
/// fails, or when no navigation event arrives within the timeout.
async fn execute_back(global: &GlobalOpts) -> Result<(), AppError> {
    // Single error used for every shape problem in the history response.
    let invalid_history = || AppError {
        message: "Invalid navigation history response".into(),
        code: ExitCode::GeneralError,
        custom_json: None,
    };

    let (_client, mut managed) = setup_session(global).await?;
    if global.auto_dismiss_dialogs {
        let _dismiss = managed.spawn_auto_dismiss().await?;
    }
    managed.ensure_domain("Page").await?;

    let history = managed
        .send_command("Page.getNavigationHistory", None)
        .await?;
    #[allow(clippy::cast_possible_truncation)]
    let current_index = history["currentIndex"].as_u64().unwrap_or(0) as usize;
    if current_index == 0 {
        return Err(AppError {
            message: "Cannot go back: already at the beginning of history.".into(),
            code: ExitCode::GeneralError,
            custom_json: None,
        });
    }

    let entries = history["entries"].as_array().ok_or_else(invalid_history)?;
    // Checked lookup: an index beyond the entry list is reported as an error
    // instead of panicking, and a missing id is not replaced by a made-up 0.
    let entry_id = entries
        .get(current_index - 1)
        .and_then(|entry| entry["id"].as_i64())
        .ok_or_else(invalid_history)?;

    // Subscribe before issuing the command so the navigation event is caught.
    let nav_rx = managed.subscribe("Page.frameNavigated").await?;
    let within_doc_rx = managed.subscribe("Page.navigatedWithinDocument").await?;
    managed
        .send_command(
            "Page.navigateToHistoryEntry",
            Some(serde_json::json!({ "entryId": entry_id })),
        )
        .await?;
    wait_for_history_navigation(
        nav_rx,
        within_doc_rx,
        global.timeout.unwrap_or(DEFAULT_NAVIGATE_TIMEOUT_MS),
    )
    .await?;

    let (page_url, page_title) = get_page_info(&managed).await?;
    let output = HistoryResult {
        url: page_url,
        title: page_title,
    };
    print_output(&output, &global.output)
}
/// Navigates one step forward in the session history and prints the resulting
/// URL and title.
///
/// The target entry is looked up in the `Page.getNavigationHistory` response
/// and activated with `Page.navigateToHistoryEntry`; completion is detected
/// through `Page.frameNavigated` or `Page.navigatedWithinDocument`.
///
/// # Errors
///
/// Returns an error when there is no entry after the current one, when the
/// history response is malformed (no `entries` array or a target entry without
/// an integer `id`), when a CDP command fails, or when no navigation event
/// arrives within the timeout.
async fn execute_forward(global: &GlobalOpts) -> Result<(), AppError> {
    // Single error used for every shape problem in the history response.
    let invalid_history = || AppError {
        message: "Invalid navigation history response".into(),
        code: ExitCode::GeneralError,
        custom_json: None,
    };

    let (_client, mut managed) = setup_session(global).await?;
    if global.auto_dismiss_dialogs {
        let _dismiss = managed.spawn_auto_dismiss().await?;
    }
    managed.ensure_domain("Page").await?;

    let history = managed
        .send_command("Page.getNavigationHistory", None)
        .await?;
    #[allow(clippy::cast_possible_truncation)]
    let current_index = history["currentIndex"].as_u64().unwrap_or(0) as usize;
    let entries = history["entries"].as_array().ok_or_else(invalid_history)?;

    // `checked_add` keeps an absurdly large index from overflowing; either an
    // overflow or an out-of-range index means there is nothing to go forward to.
    let Some(target_entry) = current_index
        .checked_add(1)
        .and_then(|next_index| entries.get(next_index))
    else {
        return Err(AppError {
            message: "Cannot go forward: already at the end of history.".into(),
            code: ExitCode::GeneralError,
            custom_json: None,
        });
    };
    // A missing id is reported instead of being replaced by a made-up 0.
    let entry_id = target_entry["id"].as_i64().ok_or_else(invalid_history)?;

    // Subscribe before issuing the command so the navigation event is caught.
    let nav_rx = managed.subscribe("Page.frameNavigated").await?;
    let within_doc_rx = managed.subscribe("Page.navigatedWithinDocument").await?;
    managed
        .send_command(
            "Page.navigateToHistoryEntry",
            Some(serde_json::json!({ "entryId": entry_id })),
        )
        .await?;
    wait_for_history_navigation(
        nav_rx,
        within_doc_rx,
        global.timeout.unwrap_or(DEFAULT_NAVIGATE_TIMEOUT_MS),
    )
    .await?;

    let (page_url, page_title) = get_page_info(&managed).await?;
    let output = HistoryResult {
        url: page_url,
        title: page_title,
    };
    print_output(&output, &global.output)
}
/// Reloads the current page, waits for `Page.loadEventFired`, and prints the
/// resulting URL and title.
///
/// `args.ignore_cache` is forwarded as the `ignoreCache` parameter of
/// `Page.reload`.
///
/// # Errors
///
/// Returns an error when session setup or a CDP command fails, or when the
/// load event does not arrive within the timeout.
async fn execute_reload(global: &GlobalOpts, args: &NavigateReloadArgs) -> Result<(), AppError> {
    let (_client, mut managed) = setup_session(global).await?;
    if global.auto_dismiss_dialogs {
        let _dismiss = managed.spawn_auto_dismiss().await?;
    }
    managed.ensure_domain("Page").await?;

    // Subscribe first so the load event triggered by the reload is observed.
    let load_rx = managed.subscribe("Page.loadEventFired").await?;
    let timeout_ms = global.timeout.unwrap_or(DEFAULT_NAVIGATE_TIMEOUT_MS);
    let reload_params = serde_json::json!({ "ignoreCache": args.ignore_cache });
    managed
        .send_command("Page.reload", Some(reload_params))
        .await?;
    wait_for_event(load_rx, timeout_ms, "load").await?;

    let (url, title) = get_page_info(&managed).await?;
    print_output(&HistoryResult { url, title }, &global.output)
}
/// Waits for the next event on `rx`, giving up after `timeout_ms` milliseconds.
///
/// `strategy` is only used to label the error messages.
///
/// # Errors
///
/// Returns a general error when the channel closes before an event arrives,
/// and a navigation-timeout error when the timeout elapses first.
pub async fn wait_for_event(
    mut rx: tokio::sync::mpsc::Receiver<CdpEvent>,
    timeout_ms: u64,
    strategy: &str,
) -> Result<(), AppError> {
    let limit = Duration::from_millis(timeout_ms);
    tokio::select! {
        // `None` from `recv` means every sender is gone.
        received = rx.recv() => received.map(|_event| ()).ok_or_else(|| AppError {
            message: format!("Event channel closed while waiting for {strategy}"),
            code: ExitCode::GeneralError,
            custom_json: None,
        }),
        () = tokio::time::sleep(limit) => {
            Err(AppError::navigation_timeout(timeout_ms, strategy))
        }
    }
}
async fn wait_for_history_navigation(
mut frame_rx: tokio::sync::mpsc::Receiver<CdpEvent>,
mut within_doc_rx: tokio::sync::mpsc::Receiver<CdpEvent>,
timeout_ms: u64,
) -> Result<(), AppError> {
let timeout = Duration::from_millis(timeout_ms);
tokio::select! {
event = frame_rx.recv() => {
match event {
Some(_) => Ok(()),
None => Err(AppError {
message: "Event channel closed while waiting for navigation".into(),
code: ExitCode::GeneralError,
custom_json: None,
}),
}
}
event = within_doc_rx.recv() => {
match event {
Some(_) => Ok(()),
None => Err(AppError {
message: "Event channel closed while waiting for navigation".into(),
code: ExitCode::GeneralError,
custom_json: None,
}),
}
}
() = tokio::time::sleep(timeout) => {
Err(AppError::navigation_timeout(timeout_ms, "navigation"))
}
}
}
/// Waits until no request has been in flight for `NETWORK_IDLE_MS`
/// milliseconds, giving up after `timeout_ms` milliseconds.
///
/// Events on `req_rx` increment the in-flight counter; events on `fin_rx` and
/// `fail_rx` decrement it (never below zero). The idle timer is restarted
/// whenever a request starts or the counter drops back to zero, and the
/// function returns once that timer fires while the counter is zero.
///
/// A receiver whose channel has closed is no longer polled. Without that, its
/// `recv()` would resolve to `None` immediately on every iteration and the
/// loop would spin at full speed until one of the timers fired.
///
/// # Errors
///
/// Returns a navigation-timeout error labelled `networkidle` when the overall
/// deadline passes before the network becomes idle.
pub async fn wait_for_network_idle(
    mut req_rx: tokio::sync::mpsc::Receiver<CdpEvent>,
    mut fin_rx: tokio::sync::mpsc::Receiver<CdpEvent>,
    mut fail_rx: tokio::sync::mpsc::Receiver<CdpEvent>,
    timeout_ms: u64,
) -> Result<(), AppError> {
    let idle_duration = Duration::from_millis(NETWORK_IDLE_MS);
    let deadline = tokio::time::Instant::now() + Duration::from_millis(timeout_ms);
    let mut in_flight: u64 = 0;

    // Track which channels are still open so closed ones can be skipped.
    let mut req_open = true;
    let mut fin_open = true;
    let mut fail_open = true;

    let idle_timer = tokio::time::sleep(idle_duration);
    tokio::pin!(idle_timer);
    // Created once: the deadline never moves, so there is no need to build a
    // fresh timer on every loop iteration.
    let deadline_timer = tokio::time::sleep_until(deadline);
    tokio::pin!(deadline_timer);

    loop {
        tokio::select! {
            event = req_rx.recv(), if req_open => {
                if event.is_some() {
                    in_flight += 1;
                    idle_timer.as_mut().reset(tokio::time::Instant::now() + idle_duration);
                } else {
                    req_open = false;
                }
            }
            event = fin_rx.recv(), if fin_open => {
                if event.is_some() {
                    in_flight = in_flight.saturating_sub(1);
                    if in_flight == 0 {
                        idle_timer.as_mut().reset(tokio::time::Instant::now() + idle_duration);
                    }
                } else {
                    fin_open = false;
                }
            }
            event = fail_rx.recv(), if fail_open => {
                if event.is_some() {
                    in_flight = in_flight.saturating_sub(1);
                    if in_flight == 0 {
                        idle_timer.as_mut().reset(tokio::time::Instant::now() + idle_duration);
                    }
                } else {
                    fail_open = false;
                }
            }
            () = &mut idle_timer => {
                if in_flight == 0 {
                    return Ok(());
                }
                // Requests are still pending: start another quiet window.
                idle_timer.as_mut().reset(tokio::time::Instant::now() + idle_duration);
            }
            () = &mut deadline_timer => {
                return Err(AppError::navigation_timeout(timeout_ms, "networkidle"));
            }
        }
    }
}
/// Reads the current page URL and title by evaluating `location.href` and
/// `document.title` through `Runtime.evaluate`.
///
/// Returns `(url, title)`. A result whose value is not a string yields an
/// empty string for that element.
///
/// # Errors
///
/// Propagates the failure of either `Runtime.evaluate` command.
async fn get_page_info(managed: &ManagedSession) -> Result<(String, String), AppError> {
    let href_query = serde_json::json!({ "expression": "location.href" });
    let href_reply = managed
        .send_command("Runtime.evaluate", Some(href_query))
        .await?;

    let title_query = serde_json::json!({ "expression": "document.title" });
    let title_reply = managed
        .send_command("Runtime.evaluate", Some(title_query))
        .await?;

    let page_url = href_reply["result"]["value"]
        .as_str()
        .map(str::to_owned)
        .unwrap_or_default();
    let page_title = title_reply["result"]["value"]
        .as_str()
        .map(str::to_owned)
        .unwrap_or_default();
    Ok((page_url, page_title))
}
fn extract_http_status(
mut rx: tokio::sync::mpsc::Receiver<CdpEvent>,
frame_id: &str,
) -> Option<u16> {
let mut status = None;
while let Ok(event) = rx.try_recv() {
let event_frame = event.params["frameId"].as_str().unwrap_or_default();
let resource_type = event.params["type"].as_str().unwrap_or_default();
if event_frame == frame_id && resource_type == "Document" {
if let Some(s) = event.params["response"]["status"].as_u64() {
#[allow(clippy::cast_possible_truncation)]
{
status = Some(s as u16);
}
}
}
}
status
}
// Unit tests for output serialization and for `wait_for_history_navigation`.
#[cfg(test)]
mod tests {
use super::*;
// A present status is serialized alongside url and title.
#[test]
fn navigate_result_serialization() {
let result = NavigateOutput {
url: "https://example.com".to_string(),
title: "Example".to_string(),
status: Some(200),
};
let json: serde_json::Value = serde_json::to_value(&result).unwrap();
assert_eq!(json["url"], "https://example.com");
assert_eq!(json["title"], "Example");
assert_eq!(json["status"], 200);
}
// A `None` status is left out of the serialized object entirely.
#[test]
fn navigate_result_without_status() {
let result = NavigateOutput {
url: "https://example.com".to_string(),
title: "Example".to_string(),
status: None,
};
let json: serde_json::Value = serde_json::to_value(&result).unwrap();
assert_eq!(json["url"], "https://example.com");
assert!(json.get("status").is_none());
}
// `HistoryResult` serializes its url and title fields.
#[test]
fn history_result_serialization() {
let result = HistoryResult {
url: "https://example.com".to_string(),
title: "Example".to_string(),
};
let json: serde_json::Value = serde_json::to_value(&result).unwrap();
assert_eq!(json["url"], "https://example.com");
assert_eq!(json["title"], "Example");
}
// The default wait strategy is `WaitUntil::Load`.
#[test]
fn wait_until_default_is_load() {
let default = WaitUntil::default();
assert_eq!(default, WaitUntil::Load);
}
// Builds a `CdpEvent` with the given method name, null params and no session id.
fn mock_cdp_event(method: &str) -> CdpEvent {
CdpEvent {
method: method.to_string(),
params: serde_json::Value::Null,
session_id: None,
}
}
// An event on the frame-navigated channel completes the wait successfully.
#[tokio::test]
async fn history_navigation_resolves_on_frame_navigated() {
let (frame_tx, frame_rx) = tokio::sync::mpsc::channel(1);
// The unused sender stays bound so its channel remains open.
let (_within_tx, within_rx) = tokio::sync::mpsc::channel(1);
frame_tx
.send(mock_cdp_event("Page.frameNavigated"))
.await
.unwrap();
let result = wait_for_history_navigation(frame_rx, within_rx, 1000).await;
assert!(result.is_ok());
}
// An event on the within-document channel completes the wait successfully.
#[tokio::test]
async fn history_navigation_resolves_on_within_document() {
// The unused sender stays bound so its channel remains open.
let (_frame_tx, frame_rx) = tokio::sync::mpsc::channel(1);
let (within_tx, within_rx) = tokio::sync::mpsc::channel(1);
within_tx
.send(mock_cdp_event("Page.navigatedWithinDocument"))
.await
.unwrap();
let result = wait_for_history_navigation(frame_rx, within_rx, 1000).await;
assert!(result.is_ok());
}
// With both channels open but silent, the wait ends with a timeout error.
#[tokio::test]
async fn history_navigation_times_out_when_no_event() {
let (_frame_tx, frame_rx) = tokio::sync::mpsc::channel(1);
let (_within_tx, within_rx) = tokio::sync::mpsc::channel(1);
let result = wait_for_history_navigation(frame_rx, within_rx, 50).await;
assert!(result.is_err());
assert!(result.unwrap_err().message.contains("timed out"));
}
// Dropping the frame-navigated sender surfaces a "channel closed" error.
#[tokio::test]
async fn history_navigation_errors_on_closed_frame_channel() {
let (frame_tx, frame_rx) = tokio::sync::mpsc::channel(1);
let (_within_tx, within_rx) = tokio::sync::mpsc::channel(1);
drop(frame_tx);
let result = wait_for_history_navigation(frame_rx, within_rx, 1000).await;
assert!(result.is_err());
assert!(result.unwrap_err().message.contains("channel closed"));
}
// Dropping the within-document sender surfaces a "channel closed" error.
#[tokio::test]
async fn history_navigation_errors_on_closed_within_doc_channel() {
let (_frame_tx, frame_rx) = tokio::sync::mpsc::channel(1);
let (within_tx, within_rx) = tokio::sync::mpsc::channel(1);
drop(within_tx);
let result = wait_for_history_navigation(frame_rx, within_rx, 1000).await;
assert!(result.is_err());
assert!(result.unwrap_err().message.contains("channel closed"));
}
}