//! High-level Chrome automation for Playhard.
//!
//! This crate composes `playhard-launcher`, `playhard-transport`, and
//! `playhard-cdp` into a Rust-native browser automation API.
#![deny(missing_docs)]
use std::{
path::PathBuf,
sync::{Arc, Mutex, MutexGuard},
time::Duration,
};
use base64::Engine;
use playhard_cdp::{
CdpClient, CdpError, CdpResponse, CdpTransport, FetchContinueRequestParams, FetchEnableParams,
FetchFailRequestParams, FetchFulfillRequestParams, NetworkEnableParams,
PageCaptureScreenshotParams, PageCaptureScreenshotResult, PageEnableParams, PageNavigateParams,
PageNavigateResult, RuntimeEnableParams, RuntimeEvaluateParams, TargetAttachToTargetParams,
TargetCreateTargetParams,
};
use playhard_launcher::{
LaunchConnection, LaunchError, LaunchOptions, LaunchedChrome, LaunchedChromeParts, Launcher,
ProfileDir, TransportMode,
};
use playhard_transport::{
Connection, ConnectionError, PipeTransport, TransportEvent, WebSocketTransport,
};
use serde::Deserialize;
use serde_json::{json, Value};
use thiserror::Error;
use tokio::{
process::Child,
sync::broadcast,
time::{sleep, timeout, Instant},
};
/// Result alias for the automation crate.
///
/// Shorthand for [`std::result::Result`] with the error type fixed to [`AutomationError`].
pub type Result<T> = std::result::Result<T, AutomationError>;
/// Acquire `mutex`, handing back the guard even when the mutex is poisoned.
///
/// A poisoned lock only records that some thread panicked while holding it.
/// The protected value is recovered from the poison error and returned, so
/// callers never have to handle a `PoisonError` themselves.
fn lock_unpoisoned<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    mutex
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}
/// Errors produced by `playhard-automation`.
///
/// The `#[from]` variants wrap errors of the underlying crates unchanged, so `?`
/// converts them into this type automatically.
#[derive(Debug, Error)]
pub enum AutomationError {
/// Chrome launch failed.
#[error(transparent)]
Launch(#[from] LaunchError),
/// Transport connection failed.
#[error(transparent)]
Connection(#[from] ConnectionError),
/// CDP command failed.
#[error(transparent)]
Cdp(#[from] CdpError),
/// JSON serialization or deserialization failed.
#[error(transparent)]
Json(#[from] serde_json::Error),
/// Base64 decoding failed.
#[error(transparent)]
Base64(#[from] base64::DecodeError),
/// A required websocket endpoint was missing.
///
/// Also returned when a pipe-mode launch does not hand back a pipe connection.
#[error("launcher did not expose a websocket endpoint")]
MissingWebSocketEndpoint,
/// A wait timed out.
///
/// Also reported, with `what` set to `"event stream closed"`, when an
/// [`EventStream`] is closed while an event is being awaited.
#[error("timed out waiting for {what}")]
Timeout {
/// Human-readable description of the timed operation.
what: String,
},
/// A locator action could not find a matching element.
#[error("locator did not match any element")]
MissingElement,
/// A selector operation failed.
///
/// The payload is the message shown to the user. It is also used when
/// `Page::url` or `Page::title` evaluates to something other than a string.
#[error("{0}")]
Selector(String),
/// An expected protocol field was missing.
#[error("missing protocol field `{0}`")]
MissingField(&'static str),
}
/// The concrete transport a browser session speaks CDP over.
///
/// Each variant holds its [`Connection`] behind an `Arc`, so a clone of this
/// enum refers to the same underlying connection.
#[derive(Clone)]
enum AutomationTransport {
/// Connection over a DevTools websocket.
WebSocket(Arc<Connection<WebSocketTransport>>),
/// Connection over a pipe transport.
Pipe(Arc<Connection<PipeTransport>>),
}
impl AutomationTransport {
    /// Open a new receiver on the event broadcast of the wrapped connection.
    fn subscribe_events(&self) -> broadcast::Receiver<TransportEvent> {
        match self {
            Self::Pipe(pipe) => pipe.subscribe_events(),
            Self::WebSocket(websocket) => websocket.subscribe_events(),
        }
    }

    /// Close the wrapped connection, converting its error into [`AutomationError`].
    async fn close(&self) -> Result<()> {
        match self {
            Self::Pipe(pipe) => pipe.close().await?,
            Self::WebSocket(websocket) => websocket.close().await?,
        }
        Ok(())
    }
}
impl CdpTransport for AutomationTransport {
async fn send(
&self,
request: playhard_cdp::CdpRequest,
) -> std::result::Result<CdpResponse, CdpError> {
let response = match self {
Self::WebSocket(connection) => send_over_connection(connection, request).await,
Self::Pipe(connection) => send_over_connection(connection, request).await,
};
response.map_err(Into::into)
}
}
/// Send one CDP request over `connection` and translate the reply.
///
/// Requests carrying a session id are sent with `request_for_session`; all
/// others go through the plain `request` call. The transport message is then
/// mapped field by field onto a [`CdpResponse`].
///
/// # Errors
///
/// Returns the connection's error when the request fails, or
/// [`AutomationError::MissingField`] (`"id"`) when the reply has no id.
async fn send_over_connection<T>(
    connection: &Connection<T>,
    request: playhard_cdp::CdpRequest,
) -> Result<CdpResponse>
where
    T: playhard_transport::TransportHandle,
{
    let reply = match request.session_id.clone() {
        Some(session_id) => {
            connection
                .request_for_session(session_id, request.method, Some(request.params))
                .await?
        }
        None => {
            connection
                .request(request.method, Some(request.params))
                .await?
        }
    };

    // The id is validated first so a malformed reply is rejected before
    // anything else is built from it.
    let id = reply.id.ok_or(AutomationError::MissingField("id"))?;
    let error = reply
        .error
        .map(|failure| playhard_cdp::CdpResponseError {
            code: failure.code,
            message: failure.message,
        });

    Ok(CdpResponse {
        id,
        result: reply.result,
        error,
        session_id: reply.session_id,
    })
}
impl From<AutomationError> for CdpError {
    /// Convert an automation error back into a CDP error.
    ///
    /// A wrapped [`CdpError`] is unwrapped unchanged; every other variant is
    /// flattened into `CdpError::Transport` carrying its display text.
    fn from(error: AutomationError) -> Self {
        match error {
            AutomationError::Cdp(inner) => inner,
            not_cdp => {
                let description = not_cdp.to_string();
                CdpError::Transport(description)
            }
        }
    }
}
/// Keeps a browser launched by this crate alive and knows how to stop it.
enum LaunchGuard {
/// A websocket-mode launch; the launcher's own handle is kept intact.
WebSocket(LaunchedChrome),
/// A pipe-mode launch, decomposed into the parts that must stay owned.
Pipe(PipeGuard),
}
impl LaunchGuard {
    /// Stop the launched browser.
    ///
    /// A websocket launch delegates to the launcher's own `shutdown` and
    /// reports its error. A pipe launch is best-effort: the child is asked to
    /// die and given up to five seconds to exit, and any failure or timeout
    /// along the way is ignored.
    async fn shutdown(self) -> Result<()> {
        match self {
            Self::Pipe(mut pipe) => {
                let grace_period = Duration::from_secs(5);
                let _ = pipe.child.start_kill();
                let _ = timeout(grace_period, pipe.child.wait()).await;
                Ok(())
            }
            Self::WebSocket(chrome) => {
                chrome.shutdown().await?;
                Ok(())
            }
        }
    }
}
/// Owns the pieces of a Chrome instance that was launched in pipe mode.
struct PipeGuard {
// Never read in the visible code; presumably retained for diagnostics — TODO confirm.
#[allow(dead_code)]
executable_path: PathBuf,
// Never read in the visible code; presumably held so the profile directory
// outlives the browser process — TODO confirm against `ProfileDir`.
#[allow(dead_code)]
profile: ProfileDir,
/// Handle to the Chrome child process; killed on shutdown and on drop.
child: Child,
}
impl Drop for PipeGuard {
    /// Ask the child process to terminate when the guard goes away.
    fn drop(&mut self) {
        // Best-effort: a failure to signal the child cannot be reported from
        // `drop`, so the outcome is deliberately discarded.
        self.child.start_kill().ok();
    }
}
/// State shared by a [`Browser`] handle.
struct BrowserState {
/// CDP client used for every command issued by the browser and its pages.
client: Arc<CdpClient<AutomationTransport>>,
/// Transport used for event subscriptions and for closing the connection.
transport: AutomationTransport,
/// Guard for a browser launched by this crate; `None` when the browser was
/// attached to via `connect_websocket` or after shutdown has taken it.
launch_guard: Mutex<Option<LaunchGuard>>,
/// Patterns set by `Browser::enable_request_interception`; applied to every
/// page opened afterwards. `None` until interception is enabled.
browser_interception_patterns: Mutex<Option<Vec<RequestPattern>>>,
/// Session ids of the pages opened through `Browser::new_page`.
page_sessions: Mutex<Vec<String>>,
}
/// A browser automation session.
///
/// Created with [`Browser::launch`] or [`Browser::connect_websocket`]; pages are
/// opened with [`Browser::new_page`].
pub struct Browser {
/// Shared state: CDP client, transport, launch guard and page bookkeeping.
state: Arc<BrowserState>,
}
impl Browser {
/// Launch a new browser using the supplied options.
pub async fn launch(options: LaunchOptions) -> Result<Self> {
let launched = Launcher::new(options).launch().await?;
let transport = match launched.transport_mode() {
TransportMode::WebSocket => {
let endpoint = launched
.websocket_endpoint()
.ok_or(AutomationError::MissingWebSocketEndpoint)?;
let websocket = WebSocketTransport::connect(endpoint)
.await
.map_err(ConnectionError::from)?;
let connection = Connection::new(websocket)?;
let transport = AutomationTransport::WebSocket(Arc::new(connection));
(transport, LaunchGuard::WebSocket(launched))
}
TransportMode::Pipe => {
let parts = launched.into_parts();
let (pipe_transport, guard) = pipe_transport_from_parts(parts)?;
let connection = Connection::new(pipe_transport)?;
let transport = AutomationTransport::Pipe(Arc::new(connection));
(transport, guard)
}
};
Ok(Self::from_transport(transport.0, Some(transport.1)))
}
/// Connect to an already-running Chrome DevTools websocket endpoint.
pub async fn connect_websocket(url: impl AsRef<str>) -> Result<Self> {
let websocket = WebSocketTransport::connect(url.as_ref())
.await
.map_err(ConnectionError::from)?;
let connection = Connection::new(websocket)?;
Ok(Self::from_transport(
AutomationTransport::WebSocket(Arc::new(connection)),
None,
))
}
fn from_transport(transport: AutomationTransport, launch_guard: Option<LaunchGuard>) -> Self {
let client = Arc::new(CdpClient::new(transport.clone()));
let state = BrowserState {
client,
transport,
launch_guard: Mutex::new(launch_guard),
browser_interception_patterns: Mutex::new(None),
page_sessions: Mutex::new(Vec::new()),
};
Self {
state: Arc::new(state),
}
}
/// Returns a raw CDP escape hatch rooted at the browser session.
#[must_use]
pub fn cdp(&self) -> CdpSession {
CdpSession {
client: Arc::clone(&self.state.client),
session_id: None,
}
}
/// Open a new page and bootstrap the common CDP domains used by Playhard.
pub async fn new_page(&self) -> Result<Page> {
let target = self
.state
.client
.execute::<TargetCreateTargetParams>(&TargetCreateTargetParams {
url: "about:blank".to_owned(),
new_window: None,
})
.await?;
let attached = self
.state
.client
.execute::<TargetAttachToTargetParams>(&TargetAttachToTargetParams {
target_id: target.target_id,
flatten: Some(true),
})
.await?;
let session_id = attached.session_id;
self.state
.client
.execute_in_session::<PageEnableParams>(session_id.clone(), &PageEnableParams {})
.await?;
self.state
.client
.execute_in_session::<RuntimeEnableParams>(session_id.clone(), &RuntimeEnableParams {})
.await?;
self.state
.client
.execute_in_session::<NetworkEnableParams>(session_id.clone(), &NetworkEnableParams {})
.await?;
self.state
.client
.call_raw("DOM.enable", json!({}), Some(session_id.clone()))
.await?;
{
let mut sessions = lock_unpoisoned(&self.state.page_sessions);
sessions.push(session_id.clone());
}
let page = Page {
client: Arc::clone(&self.state.client),
transport: self.state.transport.clone(),
session_id,
default_timeout: Duration::from_secs(30),
};
let browser_patterns = {
let patterns = lock_unpoisoned(&self.state.browser_interception_patterns);
patterns.clone()
};
if let Some(patterns) = browser_patterns {
page.enable_request_interception(patterns).await?;
}
Ok(page)
}
/// Subscribe to all browser-wide network and fetch events.
pub fn network_events(&self) -> EventStream {
EventStream::new(
self.state.transport.subscribe_events(),
None,
Some("Network."),
)
.with_extra_prefix("Fetch.")
}
/// Enable request interception for all existing and future pages.
pub async fn enable_request_interception(&self, patterns: Vec<RequestPattern>) -> Result<()> {
{
let mut stored = lock_unpoisoned(&self.state.browser_interception_patterns);
*stored = Some(patterns.clone());
}
let session_ids = lock_unpoisoned(&self.state.page_sessions).clone();
for session_id in session_ids {
let page = Page {
client: Arc::clone(&self.state.client),
transport: self.state.transport.clone(),
session_id,
default_timeout: Duration::from_secs(30),
};
page.enable_request_interception(patterns.clone()).await?;
}
Ok(())
}
/// Wait for the next paused request across all pages.
pub async fn next_route(&self, timeout_duration: Duration) -> Result<Route> {
let mut events = EventStream::new(
self.state.transport.subscribe_events(),
None,
Some("Fetch."),
);
let event = events
.recv_with_timeout(timeout_duration)
.await?
.ok_or_else(|| AutomationError::Timeout {
what: "browser route".to_owned(),
})?;
Route::from_event(Arc::clone(&self.state.client), event)
}
/// Shut down the browser and close the underlying transport.
pub async fn shutdown(self) -> Result<()> {
self.state.transport.close().await?;
let launch_guard = {
let mut guard = lock_unpoisoned(&self.state.launch_guard);
guard.take()
};
if let Some(guard) = launch_guard {
guard.shutdown().await?;
}
Ok(())
}
}
/// Split the parts of a pipe-mode launch into a transport and its guard.
///
/// The child's stdin/stdout pair becomes the [`PipeTransport`]; the remaining
/// parts are moved into a [`PipeGuard`] so they stay owned by the session.
///
/// # Errors
///
/// Returns [`AutomationError::MissingWebSocketEndpoint`] when the launch
/// connection is not a pipe, and a connection error when the pipe transport
/// cannot be created.
fn pipe_transport_from_parts(parts: LaunchedChromeParts) -> Result<(PipeTransport, LaunchGuard)> {
    let LaunchedChromeParts {
        executable_path,
        profile,
        child,
        connection,
    } = parts;

    let (stdin, stdout) = match connection {
        LaunchConnection::Pipe { stdin, stdout } => (stdin, stdout),
        // NOTE(review): the variant name does not describe this failure; it is
        // kept because callers may already match on it.
        _ => return Err(AutomationError::MissingWebSocketEndpoint),
    };

    let transport = PipeTransport::new(stdin, stdout).map_err(ConnectionError::from)?;
    let pipe_guard = PipeGuard {
        executable_path,
        profile,
        child,
    };
    Ok((transport, LaunchGuard::Pipe(pipe_guard)))
}
/// A raw CDP session escape hatch.
///
/// Obtained from `Browser::cdp` (no session id) or `Page::cdp` (scoped to the
/// page's session).
#[derive(Clone)]
pub struct CdpSession {
/// Shared CDP client the commands are sent through.
client: Arc<CdpClient<AutomationTransport>>,
/// Target session the commands are addressed to; `None` for the browser session.
session_id: Option<String>,
}
impl CdpSession {
    /// Send an arbitrary CDP command and return its JSON result.
    ///
    /// The command is addressed to this session's id, if it has one.
    ///
    /// # Errors
    ///
    /// Returns the CDP client's error, wrapped in [`AutomationError`].
    pub async fn call_raw(&self, method: impl Into<String>, params: Value) -> Result<Value> {
        let target_session = self.session_id.clone();
        let outcome = self
            .client
            .call_raw(method.into(), params, target_session)
            .await?;
        Ok(outcome)
    }

    /// Return the target session id, if this session is page-scoped.
    #[must_use]
    pub fn session_id(&self) -> Option<&str> {
        match &self.session_id {
            Some(id) => Some(id.as_str()),
            None => None,
        }
    }
}
/// A page/tab automation handle.
///
/// A clone shares the same CDP client and transport connection and targets the
/// same session.
#[derive(Clone)]
pub struct Page {
/// Shared CDP client the page's commands are sent through.
client: Arc<CdpClient<AutomationTransport>>,
/// Transport used to subscribe to this page's events.
transport: AutomationTransport,
/// Session id of the attached target.
session_id: String,
/// Timeout used when waiting for the load event in `goto`.
default_timeout: Duration,
}
impl Page {
    /// Returns the attached target session id.
    #[must_use]
    pub fn session_id(&self) -> &str {
        &self.session_id
    }

    /// Returns a raw CDP escape hatch scoped to this page session.
    #[must_use]
    pub fn cdp(&self) -> CdpSession {
        CdpSession {
            client: Arc::clone(&self.client),
            session_id: Some(self.session_id.clone()),
        }
    }

    /// Navigate the page and wait for the load event.
    ///
    /// The `Page.loadEventFired` subscription is opened *before* the
    /// navigation command is sent. A broadcast receiver only sees events sent
    /// after it was created, so subscribing afterwards could miss the load
    /// event of a page that loads quickly and then time out.
    ///
    /// # Errors
    ///
    /// Fails when the navigate command fails, or with
    /// [`AutomationError::Timeout`] when the load event does not arrive within
    /// the page's default timeout.
    pub async fn goto(&self, url: impl AsRef<str>) -> Result<PageNavigateResult> {
        const LOAD_EVENT: &str = "Page.loadEventFired";
        let mut load_events = self.method_events(LOAD_EVENT);
        let result = self
            .client
            .execute_in_session::<PageNavigateParams>(
                self.session_id.clone(),
                &PageNavigateParams {
                    url: url.as_ref().to_owned(),
                },
            )
            .await?;
        Self::next_event(&mut load_events, LOAD_EVENT, self.default_timeout).await?;
        Ok(result)
    }

    /// Evaluate JavaScript in the page main world and return the JSON result.
    ///
    /// Promises are awaited and the result is returned by value; a missing
    /// value is reported as `Value::Null`.
    pub async fn evaluate(&self, expression: impl AsRef<str>) -> Result<Value> {
        let result = self
            .client
            .execute_in_session::<RuntimeEvaluateParams>(
                self.session_id.clone(),
                &RuntimeEvaluateParams {
                    expression: expression.as_ref().to_owned(),
                    await_promise: Some(true),
                    return_by_value: Some(true),
                },
            )
            .await?;
        Ok(result.result.value.unwrap_or(Value::Null))
    }

    /// Capture a page screenshot.
    ///
    /// Returns the decoded PNG bytes.
    pub async fn screenshot(&self) -> Result<Vec<u8>> {
        let result: PageCaptureScreenshotResult = self
            .client
            .execute_in_session::<PageCaptureScreenshotParams>(
                self.session_id.clone(),
                &PageCaptureScreenshotParams {
                    format: Some("png".to_owned()),
                },
            )
            .await?;
        Ok(base64::engine::general_purpose::STANDARD.decode(result.data)?)
    }

    /// Capture a screenshot of the current element matched by the locator.
    ///
    /// Returns the decoded PNG bytes of the element's bounding rectangle.
    ///
    /// # Errors
    ///
    /// Returns [`AutomationError::MissingElement`] when the locator matches
    /// nothing, and [`AutomationError::MissingField`] when the CDP reply has
    /// no `data` string.
    pub async fn element_screenshot(&self, locator: &Locator) -> Result<Vec<u8>> {
        // NOTE(review): the rectangle comes from `getBoundingClientRect`, i.e.
        // viewport coordinates; confirm that this is what the `clip` parameter
        // expects when the page is scrolled.
        let rect = locator.bounding_rect().await?;
        let clip = json!({
            "x": rect.x,
            "y": rect.y,
            "width": rect.width,
            "height": rect.height,
            "scale": 1.0,
        });
        let result = self
            .cdp()
            .call_raw(
                "Page.captureScreenshot",
                json!({
                    "format": "png",
                    "clip": clip,
                }),
            )
            .await?;
        let data = result
            .get("data")
            .and_then(Value::as_str)
            .ok_or(AutomationError::MissingField("data"))?;
        Ok(base64::engine::general_purpose::STANDARD.decode(data)?)
    }

    /// Build a CSS locator.
    #[must_use]
    pub fn locator(&self, css_selector: impl Into<String>) -> Locator {
        Locator::new(self.clone(), SelectorKind::Css(css_selector.into()))
    }

    /// Click the first element matching the CSS selector.
    pub async fn click(&self, css_selector: impl Into<String>) -> Result<()> {
        self.locator(css_selector).click().await
    }

    /// Click the first element matching the CSS selector with custom action options.
    pub async fn click_with_options(
        &self,
        css_selector: impl Into<String>,
        options: ActionOptions,
    ) -> Result<()> {
        self.locator(css_selector).click_with_options(options).await
    }

    /// Fill the first form control matching the CSS selector.
    pub async fn fill(
        &self,
        css_selector: impl Into<String>,
        value: impl AsRef<str>,
    ) -> Result<()> {
        self.locator(css_selector).fill(value).await
    }

    /// Fill the first form control matching the CSS selector with custom action options.
    pub async fn fill_with_options(
        &self,
        css_selector: impl Into<String>,
        value: impl AsRef<str>,
        options: ActionOptions,
    ) -> Result<()> {
        self.locator(css_selector)
            .fill_with_options(value, options)
            .await
    }

    /// Focus the first element matching the CSS selector.
    pub async fn focus(&self, css_selector: impl Into<String>) -> Result<()> {
        self.locator(css_selector).focus().await
    }

    /// Hover the first element matching the CSS selector.
    pub async fn hover(&self, css_selector: impl Into<String>) -> Result<()> {
        self.locator(css_selector).hover().await
    }

    /// Select an option by value on the first matching `<select>` element.
    pub async fn select(
        &self,
        css_selector: impl Into<String>,
        value: impl AsRef<str>,
    ) -> Result<()> {
        self.locator(css_selector).select(value).await
    }

    /// Wait until the CSS selector matches an element.
    pub async fn wait_for_selector(
        &self,
        css_selector: impl Into<String>,
        timeout_duration: Duration,
    ) -> Result<()> {
        self.locator(css_selector).wait(timeout_duration).await
    }

    /// Return whether the CSS selector currently matches an element.
    pub async fn exists(&self, css_selector: impl Into<String>) -> Result<bool> {
        self.locator(css_selector).exists().await
    }

    /// Read the text content of the first matching element.
    pub async fn text_content(&self, css_selector: impl Into<String>) -> Result<String> {
        self.locator(css_selector).text_content().await
    }

    /// Read the current page URL.
    ///
    /// # Errors
    ///
    /// Returns [`AutomationError::Selector`] when `window.location.href` does
    /// not evaluate to a string.
    pub async fn url(&self) -> Result<String> {
        let value = self.evaluate("window.location.href").await?;
        value
            .as_str()
            .map(str::to_owned)
            .ok_or_else(|| AutomationError::Selector("location.href was not a string".to_owned()))
    }

    /// Read the current page title.
    ///
    /// # Errors
    ///
    /// Returns [`AutomationError::Selector`] when `document.title` does not
    /// evaluate to a string.
    pub async fn title(&self) -> Result<String> {
        let value = self.evaluate("document.title").await?;
        value
            .as_str()
            .map(str::to_owned)
            .ok_or_else(|| AutomationError::Selector("document.title was not a string".to_owned()))
    }

    /// Click the first element whose text content contains the supplied text.
    pub async fn click_text(&self, text: impl Into<String>) -> Result<()> {
        self.locator_text(text).click().await
    }

    /// Click the first element whose text content contains the supplied text with custom action options.
    pub async fn click_text_with_options(
        &self,
        text: impl Into<String>,
        options: ActionOptions,
    ) -> Result<()> {
        self.locator_text(text).click_with_options(options).await
    }

    /// Wait until an element containing the supplied text appears.
    pub async fn wait_for_text(
        &self,
        text: impl Into<String>,
        timeout_duration: Duration,
    ) -> Result<()> {
        self.locator_text(text).wait(timeout_duration).await
    }

    /// Wait until the current page URL contains the supplied substring.
    ///
    /// The URL is polled every 200 ms and is always checked at least once.
    /// Returns the matching URL.
    ///
    /// # Errors
    ///
    /// Returns [`AutomationError::Timeout`] once `timeout_duration` has
    /// elapsed without a match.
    pub async fn wait_for_url_contains(
        &self,
        needle: impl AsRef<str>,
        timeout_duration: Duration,
    ) -> Result<String> {
        let deadline = Instant::now() + timeout_duration;
        let needle = needle.as_ref();
        loop {
            let current_url = self.url().await?;
            if current_url.contains(needle) {
                return Ok(current_url);
            }
            if Instant::now() >= deadline {
                return Err(AutomationError::Timeout {
                    what: format!("URL containing {needle}"),
                });
            }
            sleep(Duration::from_millis(200)).await;
        }
    }

    /// Build a text locator.
    #[must_use]
    pub fn locator_text(&self, text: impl Into<String>) -> Locator {
        Locator::new(self.clone(), SelectorKind::Text(text.into()))
    }

    /// Build a role locator.
    #[must_use]
    pub fn locator_role(&self, role: impl Into<String>) -> Locator {
        Locator::new(self.clone(), SelectorKind::Role(role.into()))
    }

    /// Build a test-id locator.
    #[must_use]
    pub fn locator_test_id(&self, test_id: impl Into<String>) -> Locator {
        Locator::new(self.clone(), SelectorKind::TestId(test_id.into()))
    }

    /// Press a key against the active element.
    ///
    /// Dispatches synthetic `keydown`, `keypress` and `keyup` DOM events from
    /// page JavaScript on `document.activeElement` (or `document.body`).
    pub async fn press(&self, key: impl AsRef<str>) -> Result<()> {
        // JSON-encode the key so it is embedded as a safe JavaScript string literal.
        let key = serde_json::to_string(key.as_ref())?;
        let _ = self
            .evaluate(format!(
                "(() => {{ const key = {key}; const el = document.activeElement ?? document.body; for (const type of ['keydown','keypress','keyup']) {{ el.dispatchEvent(new KeyboardEvent(type, {{ key, bubbles: true }})); }} return true; }})()"
            ))
            .await?;
        Ok(())
    }

    /// Move the mouse pointer to the given viewport coordinates.
    pub async fn move_mouse(&self, x: f64, y: f64) -> Result<()> {
        self.dispatch_mouse_event("mouseMoved", x, y, MouseButton::None, 0, 0)
            .await
    }

    /// Press a mouse button at the given viewport coordinates.
    ///
    /// The `buttons` bitmask sent with the event reflects the pressed button.
    pub async fn mouse_down(&self, x: f64, y: f64, button: MouseButton) -> Result<()> {
        let buttons = Self::pressed_buttons_mask(button);
        self.dispatch_mouse_event("mousePressed", x, y, button, buttons, 1)
            .await
    }

    /// Release a mouse button at the given viewport coordinates.
    pub async fn mouse_up(&self, x: f64, y: f64, button: MouseButton) -> Result<()> {
        self.dispatch_mouse_event("mouseReleased", x, y, button, 0, 1)
            .await
    }

    /// Click at the given viewport coordinates.
    ///
    /// Moves the pointer, presses the configured button, waits
    /// `options.down_up_delay`, then releases the button.
    pub async fn click_at(&self, x: f64, y: f64, options: ClickOptions) -> Result<()> {
        let buttons = Self::pressed_buttons_mask(options.button);
        self.move_mouse(x, y).await?;
        self.dispatch_mouse_event(
            "mousePressed",
            x,
            y,
            options.button,
            buttons,
            options.click_count,
        )
        .await?;
        sleep(options.down_up_delay).await;
        self.dispatch_mouse_event(
            "mouseReleased",
            x,
            y,
            options.button,
            0,
            options.click_count,
        )
        .await
    }

    /// Insert text into the currently focused element.
    pub async fn insert_text(&self, text: impl AsRef<str>) -> Result<()> {
        self.cdp()
            .call_raw("Input.insertText", json!({ "text": text.as_ref() }))
            .await?;
        Ok(())
    }

    /// Type text into the currently focused element, waiting between characters.
    ///
    /// Each character is inserted separately and followed by `delay`,
    /// including the last one.
    pub async fn type_text(&self, text: impl AsRef<str>, delay: Duration) -> Result<()> {
        for character in text.as_ref().chars() {
            self.insert_text(character.to_string()).await?;
            sleep(delay).await;
        }
        Ok(())
    }

    /// Subscribe to all events for this page session.
    pub fn events(&self) -> EventStream {
        EventStream::new(
            self.transport.subscribe_events(),
            Some(self.session_id.clone()),
            None,
        )
    }

    /// Subscribe to network and fetch events for this page session.
    pub fn network_events(&self) -> EventStream {
        self.method_events("Network.").with_extra_prefix("Fetch.")
    }

    /// Enable request interception for this page.
    ///
    /// # Errors
    ///
    /// Fails when a pattern cannot be converted into the CDP pattern type or
    /// when the `Fetch.enable` command fails.
    pub async fn enable_request_interception(&self, patterns: Vec<RequestPattern>) -> Result<()> {
        let cdp_patterns = patterns
            .iter()
            .map(RequestPattern::to_json)
            .map(serde_json::from_value)
            .collect::<std::result::Result<Vec<_>, _>>()?;
        self.client
            .execute_in_session::<FetchEnableParams>(
                self.session_id.clone(),
                &FetchEnableParams {
                    patterns: Some(cdp_patterns),
                },
            )
            .await?;
        Ok(())
    }

    /// Wait for the next intercepted route on this page.
    ///
    /// The subscription is opened when this method is called and accepts any
    /// event of this session whose method starts with `Fetch.`.
    ///
    /// # Errors
    ///
    /// Returns [`AutomationError::Timeout`] when nothing arrives within
    /// `timeout_duration`, or an error when the event cannot be turned into a
    /// [`Route`].
    pub async fn next_route(&self, timeout_duration: Duration) -> Result<Route> {
        let mut events = self.method_events("Fetch.");
        let event = Self::next_event(&mut events, "page route", timeout_duration).await?;
        Route::from_event(Arc::clone(&self.client), event)
    }

    /// Subscribe, then wait for the next event of this session whose method
    /// starts with `method`.
    ///
    /// Only suitable when the event cannot fire before this call; otherwise
    /// subscribe with `method_events` before triggering it.
    // No longer called by `goto`; kept for any caller outside this block.
    #[allow(dead_code)]
    async fn wait_for_event(
        &self,
        method: &str,
        timeout_duration: Duration,
    ) -> Result<NetworkEvent> {
        let mut events = self.method_events(method);
        Self::next_event(&mut events, method, timeout_duration).await
    }

    /// Open a subscription to this session's events whose method starts with `method`.
    fn method_events(&self, method: &str) -> EventStream {
        EventStream::new(
            self.transport.subscribe_events(),
            Some(self.session_id.clone()),
            Some(method),
        )
    }

    /// Wait for the next event on `events`, reporting a timeout as `what`.
    async fn next_event(
        events: &mut EventStream,
        what: &str,
        timeout_duration: Duration,
    ) -> Result<NetworkEvent> {
        events
            .recv_with_timeout(timeout_duration)
            .await?
            .ok_or_else(|| AutomationError::Timeout {
                what: what.to_owned(),
            })
    }

    /// CDP `buttons` bitmask for a single pressed button
    /// (left 1, right 2, middle 4, back 8, forward 16, none 0).
    fn pressed_buttons_mask(button: MouseButton) -> u8 {
        match button {
            MouseButton::None => 0,
            MouseButton::Left => 1,
            MouseButton::Right => 2,
            MouseButton::Middle => 4,
            MouseButton::Back => 8,
            MouseButton::Forward => 16,
        }
    }

    /// Send one `Input.dispatchMouseEvent` command to this page's session.
    async fn dispatch_mouse_event(
        &self,
        event_type: &str,
        x: f64,
        y: f64,
        button: MouseButton,
        buttons: u8,
        click_count: u8,
    ) -> Result<()> {
        self.cdp()
            .call_raw(
                "Input.dispatchMouseEvent",
                json!({
                    "type": event_type,
                    "x": x,
                    "y": y,
                    "button": button.as_cdp_value(),
                    "buttons": buttons,
                    "clickCount": click_count,
                }),
            )
            .await?;
        Ok(())
    }
}
/// A query that resolves a DOM element.
///
/// The selector is re-evaluated in the page on every operation; nothing is
/// cached between calls.
#[derive(Clone)]
pub struct Locator {
/// Page the selector is evaluated in.
page: Page,
/// How the element is looked up.
selector: SelectorKind,
}
impl Locator {
fn new(page: Page, selector: SelectorKind) -> Self {
Self { page, selector }
}
/// Click the matched element.
pub async fn click(&self) -> Result<()> {
self.click_with_options(ActionOptions::default()).await
}
/// Click the matched element with custom action options.
pub async fn click_with_options(&self, options: ActionOptions) -> Result<()> {
self.wait(options.timeout).await?;
if !options.force {
self.wait_for_actionable(options.timeout).await?;
}
self.scroll_into_view().await?;
let rect = self.bounding_rect().await?;
let x = rect.x + (rect.width / 2.0);
let y = rect.y + (rect.height / 2.0);
self.page.click_at(x, y, ClickOptions::default()).await
}
/// Fill the matched form field.
pub async fn fill(&self, value: impl AsRef<str>) -> Result<()> {
self.fill_with_options(value, ActionOptions::default())
.await
}
/// Fill the matched form field with custom action options.
pub async fn fill_with_options(
&self,
value: impl AsRef<str>,
options: ActionOptions,
) -> Result<()> {
self.wait(options.timeout).await?;
if !options.force {
self.wait_for_actionable(options.timeout).await?;
}
self.scroll_into_view().await?;
let rect = self.bounding_rect().await?;
let x = rect.x + (rect.width / 2.0);
let y = rect.y + (rect.height / 2.0);
self.page.click_at(x, y, ClickOptions::default()).await?;
self.run_selector_action(
"el.focus(); if ('value' in el) { el.value = ''; el.dispatchEvent(new Event('input', { bubbles: true })); el.dispatchEvent(new Event('change', { bubbles: true })); }",
)
.await?;
self.page
.type_text(value.as_ref(), Duration::from_millis(45))
.await
}
/// Focus the matched element.
pub async fn focus(&self) -> Result<()> {
self.wait(ActionOptions::default().timeout).await?;
self.run_selector_action("el.focus();").await
}
/// Hover the matched element.
pub async fn hover(&self) -> Result<()> {
self.wait(ActionOptions::default().timeout).await?;
self.run_selector_action(
"el.dispatchEvent(new MouseEvent('mouseover', { bubbles: true })); el.dispatchEvent(new MouseEvent('mouseenter', { bubbles: true }));",
)
.await
}
/// Select an option by value on the matched `<select>`.
pub async fn select(&self, value: impl AsRef<str>) -> Result<()> {
self.wait(ActionOptions::default().timeout).await?;
let value = serde_json::to_string(value.as_ref())?;
self.run_selector_action(&format!(
"el.value = {value}; el.dispatchEvent(new Event('input', {{ bubbles: true }})); el.dispatchEvent(new Event('change', {{ bubbles: true }}));"
))
.await
}
/// Wait until the locator matches an element.
pub async fn wait(&self, timeout_duration: Duration) -> Result<()> {
let deadline = Instant::now() + timeout_duration;
loop {
if self.exists().await? {
return Ok(());
}
if Instant::now() >= deadline {
return Err(AutomationError::Timeout {
what: "locator existence".to_owned(),
});
}
sleep(Duration::from_millis(100)).await;
}
}
/// Returns true when the locator currently matches an element.
pub async fn exists(&self) -> Result<bool> {
let status = self.element_status().await?;
Ok(status.exists)
}
/// Read the text content of the matched element.
pub async fn text_content(&self) -> Result<String> {
let script = format!(
"(() => {{ const el = {selector}; return el ? (el.textContent ?? '') : null; }})()",
selector = self.selector.javascript_expression()?,
);
let value = self.page.evaluate(script).await?;
value
.as_str()
.map(str::to_owned)
.ok_or(AutomationError::MissingElement)
}
async fn bounding_rect(&self) -> Result<BoundingRect> {
let script = format!(
"(() => {{ const el = {selector}; if (!el) return null; const rect = el.getBoundingClientRect(); return {{ x: rect.x, y: rect.y, width: rect.width, height: rect.height }}; }})()",
selector = self.selector.javascript_expression()?,
);
let value = self.page.evaluate(script).await?;
let rect = serde_json::from_value::<Option<BoundingRect>>(value)?
.ok_or(AutomationError::MissingElement)?;
Ok(rect)
}
async fn scroll_into_view(&self) -> Result<()> {
self.run_selector_action(
"el.scrollIntoView({ block: 'center', inline: 'center', behavior: 'instant' });",
)
.await
}
async fn wait_for_actionable(&self, timeout_duration: Duration) -> Result<()> {
let deadline = Instant::now() + timeout_duration;
loop {
let status = self.element_status().await?;
if status.exists && status.visible && status.enabled {
return Ok(());
}
if Instant::now() >= deadline {
return Err(AutomationError::Timeout {
what: "locator actionability".to_owned(),
});
}
sleep(Duration::from_millis(100)).await;
}
}
async fn element_status(&self) -> Result<ElementStatus> {
let script = format!(
"(() => {{ const el = {selector}; if (!el) return {{ exists: false, visible: false, enabled: false }}; const style = window.getComputedStyle(el); const rect = el.getBoundingClientRect(); const visible = style.display !== 'none' && style.visibility !== 'hidden' && Number(rect.width) > 0 && Number(rect.height) > 0; const enabled = !('disabled' in el) || !el.disabled; return {{ exists: true, visible, enabled }}; }})()",
selector = self.selector.javascript_expression()?,
);
let value = self.page.evaluate(script).await?;
Ok(serde_json::from_value(value)?)
}
async fn run_selector_action(&self, body: &str) -> Result<()> {
let script = format!(
"(() => {{ const el = {selector}; if (!el) return {{ ok: false, message: 'missing element' }}; {body} return {{ ok: true }}; }})()",
selector = self.selector.javascript_expression()?,
body = body,
);
let value = self.page.evaluate(script).await?;
let result = serde_json::from_value::<SelectorActionResult>(value)?;
if result.ok {
Ok(())
} else {
Err(AutomationError::Selector(
result
.message
.unwrap_or_else(|| "selector action failed".to_owned()),
))
}
}
}
/// Outcome object returned by the script built in `Locator::run_selector_action`.
#[derive(Debug, Deserialize)]
struct SelectorActionResult {
/// Whether the action ran; `false` when the element was missing.
ok: bool,
/// Failure description supplied by the script, if any.
message: Option<String>,
}
/// Snapshot returned by the script built in `Locator::element_status`.
#[derive(Debug, Deserialize)]
struct ElementStatus {
/// The selector matched an element.
exists: bool,
/// The element is not `display: none` / `visibility: hidden` and has a non-zero size.
visible: bool,
/// The element has no `disabled` property, or that property is falsy.
enabled: bool,
}
/// The element's `getBoundingClientRect()` as reported by the page.
#[derive(Debug, Deserialize)]
struct BoundingRect {
/// Left edge (`rect.x`).
x: f64,
/// Top edge (`rect.y`).
y: f64,
/// Width of the rectangle.
width: f64,
/// Height of the rectangle.
height: f64,
}
/// The ways a [`Locator`] can look up its element.
#[derive(Clone, Debug)]
enum SelectorKind {
/// A CSS selector passed to `document.querySelector`.
Css(String),
/// A substring searched for in elements' `textContent`.
Text(String),
/// A role name, matched against an explicit `role` attribute or a role
/// inferred from the tag name.
Role(String),
/// The value of a `data-testid` attribute.
TestId(String),
}
impl SelectorKind {
    /// Render this selector as a JavaScript expression that evaluates to the
    /// matched element, or `null` when nothing matches.
    ///
    /// User-supplied strings are JSON-encoded before being embedded, so they
    /// arrive in the page as plain JavaScript string literals.
    ///
    /// * `Css` and `TestId` resolve through `document.querySelector`.
    /// * `Text` resolves to the first element, in document order, whose text
    ///   contains the needle while none of its child elements contains it.
    ///   Without that second condition `<body>` would always win, because its
    ///   text content includes the text of every descendant.
    /// * `Role` resolves to the first candidate whose explicit or inferred
    ///   role equals the wanted role.
    ///
    /// # Errors
    ///
    /// Returns [`AutomationError::Json`] when a string cannot be JSON-encoded.
    fn javascript_expression(&self) -> Result<String> {
        let value = match self {
            Self::Css(selector) => {
                format!(
                    "document.querySelector({})",
                    serde_json::to_string(selector)?
                )
            }
            Self::Text(text) => {
                let text = serde_json::to_string(text)?;
                format!(
                    "(() => {{ const needle = {text}; const holds = (node) => (node.textContent ?? '').includes(needle); const nodes = Array.from(document.querySelectorAll('body, body *')); return nodes.find((node) => holds(node) && !Array.from(node.children).some((child) => holds(child))) ?? null; }})()"
                )
            }
            Self::Role(role) => {
                let role = serde_json::to_string(role)?;
                format!(
                    "(() => {{ const wanted = {role}; const inferRole = (el) => {{ if (el.hasAttribute('role')) return el.getAttribute('role'); const tag = el.tagName.toLowerCase(); if (tag === 'button') return 'button'; if (tag === 'a' && el.hasAttribute('href')) return 'link'; if (tag === 'select') return 'combobox'; if (tag === 'textarea') return 'textbox'; if (tag === 'img') return 'img'; if (['h1','h2','h3','h4','h5','h6'].includes(tag)) return 'heading'; if (tag === 'input') {{ const type = (el.getAttribute('type') ?? 'text').toLowerCase(); if (['button','submit','reset'].includes(type)) return 'button'; if (['checkbox'].includes(type)) return 'checkbox'; if (['radio'].includes(type)) return 'radio'; return 'textbox'; }} return null; }}; const nodes = Array.from(document.querySelectorAll('[role],button,a,input,select,textarea,img,h1,h2,h3,h4,h5,h6')); return nodes.find((node) => inferRole(node) === wanted) ?? null; }})()"
                )
            }
            Self::TestId(test_id) => {
                // The id sits inside a double-quoted CSS string, so it must be
                // CSS-escaped before the selector is JSON-encoded for JavaScript.
                let css_selector = format!(
                    r#"[data-testid="{}"]"#,
                    Self::escape_css_string(test_id)
                );
                format!(
                    "document.querySelector({})",
                    serde_json::to_string(&css_selector)?
                )
            }
        };
        Ok(value)
    }

    /// Escape `raw` for use inside a double-quoted CSS string.
    ///
    /// Quotes and backslashes are backslash-escaped; line breaks, which may
    /// not appear literally in a CSS string, become hexadecimal escapes.
    fn escape_css_string(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for ch in raw.chars() {
            match ch {
                '"' | '\\' => {
                    escaped.push('\\');
                    escaped.push(ch);
                }
                '\n' | '\r' | '\u{c}' => {
                    // The trailing space terminates the hex escape.
                    escaped.push_str(&format!("\\{:x} ", u32::from(ch)));
                }
                _ => escaped.push(ch),
            }
        }
        escaped
    }
}
/// Options for actions like click and fill.
#[derive(Debug, Clone, Copy)]
pub struct ActionOptions {
    /// Timeout for auto-wait behavior.
    pub timeout: Duration,
    /// Skip actionability checks while still resolving the locator.
    pub force: bool,
}

impl Default for ActionOptions {
    /// Thirty-second timeout with actionability checks enabled.
    fn default() -> Self {
        let timeout = Duration::from_secs(30);
        Self {
            force: false,
            timeout,
        }
    }
}
/// Mouse buttons recognized by Chrome input dispatch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MouseButton {
    /// No mouse button.
    None,
    /// The primary mouse button.
    Left,
    /// The auxiliary mouse button.
    Middle,
    /// The secondary mouse button.
    Right,
    /// The browser back button.
    Back,
    /// The browser forward button.
    Forward,
}

impl MouseButton {
    /// The lowercase name sent as the `button` field of a mouse event.
    fn as_cdp_value(self) -> &'static str {
        let name: &'static str = match self {
            MouseButton::Forward => "forward",
            MouseButton::Back => "back",
            MouseButton::Right => "right",
            MouseButton::Middle => "middle",
            MouseButton::Left => "left",
            MouseButton::None => "none",
        };
        name
    }
}
/// Options for synthetic mouse clicks.
///
/// The default is a single left click with 50 ms between press and release.
#[derive(Debug, Clone, Copy)]
pub struct ClickOptions {
/// Which mouse button to click with.
pub button: MouseButton,
/// Number of clicks to report to the page.
pub click_count: u8,
/// Delay between mouse down and mouse up.
pub down_up_delay: Duration,
}
impl Default for ClickOptions {
    /// A single left click with 50 ms between press and release.
    fn default() -> Self {
        let down_up_delay = Duration::from_millis(50);
        Self {
            down_up_delay,
            click_count: 1,
            button: MouseButton::Left,
        }
    }
}
/// Request interception pattern.
///
/// Serialized with the keys `urlPattern` and `requestStage` when interception
/// is enabled; a `None` field is serialized as `null`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestPattern {
/// Optional URL pattern.
pub url_pattern: Option<String>,
/// Optional request stage.
pub request_stage: Option<String>,
}
impl RequestPattern {
    /// Render the pattern as a JSON object with CDP's camelCase keys.
    ///
    /// Unset fields are emitted as `null`.
    fn to_json(&self) -> Value {
        let Self {
            url_pattern,
            request_stage,
        } = self;
        json!({
            "urlPattern": url_pattern,
            "requestStage": request_stage,
        })
    }
}
/// A normalized network or fetch event.
///
/// Built from a transport event; this is the item type yielded by [`EventStream`].
#[derive(Debug, Clone)]
pub struct NetworkEvent {
/// Event method name.
pub method: String,
/// Optional page session id.
pub session_id: Option<String>,
/// Raw JSON parameters; `Value::Null` when the event carried none.
pub params: Value,
}
impl From<TransportEvent> for NetworkEvent {
    /// Copy the event over, replacing absent parameters with `Value::Null`.
    fn from(event: TransportEvent) -> Self {
        let params = match event.params {
            Some(params) => params,
            None => Value::Null,
        };
        Self {
            params,
            session_id: event.session_id,
            method: event.method,
        }
    }
}
/// A filtered event stream.
///
/// Wraps a broadcast receiver and yields only the events that pass the session
/// and method-prefix filters.
pub struct EventStream {
/// Receiver on the connection's event broadcast.
receiver: broadcast::Receiver<TransportEvent>,
/// When set, only events of this session are yielded.
session_id: Option<String>,
/// Accepted method prefixes; an empty list accepts every method.
method_prefixes: Vec<String>,
}
impl EventStream {
fn new(
receiver: broadcast::Receiver<TransportEvent>,
session_id: Option<String>,
method_prefix: Option<&str>,
) -> Self {
let method_prefixes = method_prefix.into_iter().map(str::to_owned).collect();
Self {
receiver,
session_id,
method_prefixes,
}
}
fn with_extra_prefix(mut self, prefix: &str) -> Self {
self.method_prefixes.push(prefix.to_owned());
self
}
/// Receive the next matching event.
pub async fn recv(&mut self) -> Result<NetworkEvent> {
loop {
match self.receiver.recv().await {
Ok(event) => {
if self.matches(&event) {
return Ok(event.into());
}
}
Err(broadcast::error::RecvError::Closed) => {
return Err(AutomationError::Timeout {
what: "event stream closed".to_owned(),
});
}
Err(broadcast::error::RecvError::Lagged(_)) => {}
}
}
}
async fn recv_with_timeout(&mut self, duration: Duration) -> Result<Option<NetworkEvent>> {
match timeout(duration, self.recv()).await {
Ok(event) => event.map(Some),
Err(_) => Ok(None),
}
}
fn matches(&self, event: &TransportEvent) -> bool {
if let Some(session_id) = &self.session_id {
if event.session_id.as_deref() != Some(session_id.as_str()) {
return false;
}
}
if self.method_prefixes.is_empty() {
return true;
}
self.method_prefixes
.iter()
.any(|prefix| event.method.starts_with(prefix))
}
}
/// An intercepted request route.
pub struct Route {
client: Arc<CdpClient<AutomationTransport>>,
/// The intercepted page session.
pub session_id: String,
/// The CDP fetch request id.
pub request_id: String,
/// The intercepted request URL, when present.
pub url: Option<String>,
/// The intercepted request method, when present.
pub method: Option<String>,
}
impl Route {
fn from_event(
client: Arc<CdpClient<AutomationTransport>>,
event: NetworkEvent,
) -> Result<Self> {
let paused = serde_json::from_value::<RequestPausedEvent>(event.params)?;
Ok(Self {
client,
session_id: event
.session_id
.ok_or(AutomationError::MissingField("sessionId"))?,
request_id: paused.request_id,
url: paused.request.as_ref().map(|request| request.url.clone()),
method: paused.request.map(|request| request.method),
})
}
/// Continue the intercepted request.
pub async fn continue_request(&self) -> Result<()> {
self.client
.execute_in_session::<FetchContinueRequestParams>(
self.session_id.clone(),
&FetchContinueRequestParams {
request_id: self.request_id.clone(),
},
)
.await?;
Ok(())
}
/// Abort the intercepted request.
pub async fn abort(&self, error_reason: impl Into<String>) -> Result<()> {
self.client
.execute_in_session::<FetchFailRequestParams>(
self.session_id.clone(),
&FetchFailRequestParams {
request_id: self.request_id.clone(),
error_reason: error_reason.into(),
},
)
.await?;
Ok(())
}
/// Fulfill the intercepted request with a synthetic response.
pub async fn fulfill(&self, response_code: u16, body: Option<Vec<u8>>) -> Result<()> {
let body = body.map(|bytes| base64::engine::general_purpose::STANDARD.encode(bytes));
self.client
.execute_in_session::<FetchFulfillRequestParams>(
self.session_id.clone(),
&FetchFulfillRequestParams {
request_id: self.request_id.clone(),
response_code,
body,
},
)
.await?;
Ok(())
}
}
// The parts of a paused-request event payload that `Route::from_event` reads.
// Keys are camelCase on the wire: `requestId` and `request`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RequestPausedEvent {
    request_id: String,
    request: Option<RequestDetails>,
}
// Request details nested inside a paused-request event; `Route::from_event`
// copies both fields onto the resulting `Route`.
#[derive(Debug, Deserialize)]
struct RequestDetails {
// URL of the intercepted request.
url: String,
// Method of the intercepted request (presumably the HTTP verb — confirm against the CDP schema).
method: String,
}
#[cfg(test)]
mod tests {
    use super::SelectorKind;

    // A CSS selector is expected to resolve through `document.querySelector`.
    #[test]
    fn css_selector_should_compile_to_query_selector() {
        let css = SelectorKind::Css(String::from("button.primary"));
        let script = css.javascript_expression().unwrap();
        assert!(script.contains("document.querySelector"));
    }

    // A role selector is expected to embed the `inferRole` helper.
    #[test]
    fn role_selector_should_embed_common_role_inference() {
        let role = SelectorKind::Role(String::from("button"));
        let script = role.javascript_expression().unwrap();
        assert!(script.contains("inferRole"));
    }

    // A test-id selector is expected to look up the `data-testid` attribute.
    #[test]
    fn test_id_selector_should_target_data_testid() {
        let test_id = SelectorKind::TestId(String::from("checkout"));
        let script = test_id.javascript_expression().unwrap();
        assert!(script.contains("data-testid"));
    }
}