use std::collections::{HashMap, VecDeque};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering};
use std::time::Duration;
use serde_json::{Value, json};
use tokio::sync::{Mutex, mpsc, watch};
use tokio::task::JoinHandle;
use tokio::time::{Instant, timeout_at};
use crate::browser::actions::Actions;
use crate::browser::console::{Console, ConsoleShared};
use crate::browser::download::{DownloadShared, Downloads};
use crate::browser::element::Element;
use crate::browser::frame::Frame;
use crate::browser::handles::{Intercept, Listen, Scroll, SetTab, Wait};
use crate::browser::interceptor::{Decision, InterceptedRequest, InterceptorState};
use crate::browser::listener::{
DRAIN_JS, DataPacket, ListenBuffer, ListenFilter, UNINSTALL_JS, hook_script, parse_packets,
};
use crate::browser::screencast::{Screencast, ScreencastShared};
use crate::browser::static_element::StaticElement;
use crate::browser::websocket::{WsListener, WsShared};
use crate::launcher::{BrowserOptions, Geolocation, OsType, Proxy};
use crate::locator::{self, Query};
use crate::protocol::{Connection, Event};
use crate::util::{base64_decode, base64_encode};
use crate::{Error, Result};
/// Which page lifecycle event a navigation waits for before it counts as finished.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LoadMode {
    /// Wait for the `load` event (default).
    #[default]
    Normal,
    /// Wait for `DOMContentLoaded`.
    Eager,
    /// Do not wait for any lifecycle event.
    None,
}

impl LoadMode {
    /// Name of the lifecycle event to wait for, or `None` for [`LoadMode::None`].
    fn event_name(self) -> Option<&'static str> {
        match self {
            LoadMode::Normal => Some("load"),
            LoadMode::Eager => Some("DOMContentLoaded"),
            LoadMode::None => Option::None,
        }
    }

    /// Compact encoding used to keep the mode in an `AtomicU8`.
    fn as_u8(self) -> u8 {
        match self {
            LoadMode::Normal => 0,
            LoadMode::Eager => 1,
            LoadMode::None => 2,
        }
    }

    /// Inverse of [`LoadMode::as_u8`]; unknown values fall back to [`LoadMode::Normal`].
    fn from_u8(v: u8) -> Self {
        match v {
            1 => LoadMode::Eager,
            2 => LoadMode::None,
            _ => LoadMode::Normal,
        }
    }
}

/// Options for a single navigation (`Tab::get_with`).
#[derive(Debug, Clone)]
pub struct GetOptions {
    /// Extra attempts after the first one fails.
    pub retry: u32,
    /// Pause between attempts.
    pub interval: Duration,
    /// Per-attempt wait limit; `None` uses the tab's default timeout.
    pub timeout: Option<Duration>,
    /// Lifecycle event to wait for; `None` uses the tab's default load mode.
    pub load_mode: Option<LoadMode>,
    /// Optional referer sent with the navigation.
    pub referer: Option<String>,
}

impl Default for GetOptions {
    /// No retries, a 1 s interval, and tab defaults for timeout and load mode.
    fn default() -> Self {
        Self {
            retry: 0,
            interval: Duration::from_secs(1),
            timeout: None,
            load_mode: None,
            referer: None,
        }
    }
}

impl GetOptions {
    /// Same as [`GetOptions::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets how many extra attempts are made after the first one fails.
    pub fn retry(mut self, n: u32) -> Self {
        self.retry = n;
        self
    }

    /// Sets the pause between attempts, in (fractional) seconds.
    ///
    /// Negative, zero and NaN inputs give a zero interval. Values too large to represent
    /// (including `+inf`) saturate to `Duration::MAX`; `Duration::from_secs_f64` would panic
    /// on them.
    pub fn interval(mut self, secs: f64) -> Self {
        // `!(secs > 0.0)` is deliberately also true for NaN and -0.0.
        self.interval = if !(secs > 0.0) {
            Duration::ZERO
        } else {
            Duration::try_from_secs_f64(secs).unwrap_or(Duration::MAX)
        };
        self
    }

    /// Overrides the per-attempt wait limit.
    pub fn timeout(mut self, d: Duration) -> Self {
        self.timeout = Some(d);
        self
    }

    /// Overrides the lifecycle event to wait for.
    pub fn load_mode(mut self, m: LoadMode) -> Self {
        self.load_mode = Some(m);
        self
    }

    /// Sets the referer sent with the navigation.
    pub fn referer(mut self, r: impl Into<String>) -> Self {
        self.referer = Some(r.into());
        self
    }
}
/// Per-tab overrides layered on top of the browser-wide [`BrowserOptions`].
///
/// Every field is optional; `None` keeps the value from the base options
/// (see `merge_into`).
#[derive(Debug, Clone, Default)]
pub struct ContextOverride {
    /// Replaces `BrowserOptions::proxy`.
    pub proxy: Option<Proxy>,
    /// Replaces the fingerprint user agent.
    pub user_agent: Option<String>,
    /// Replaces the fingerprint locale.
    pub locale: Option<String>,
    /// Replaces the fingerprint timezone id.
    pub timezone_id: Option<String>,
    /// Replaces the fingerprint platform string.
    pub platform: Option<String>,
    /// Replaces the fingerprint OS type.
    pub os: Option<OsType>,
    /// Replaces the fingerprint geolocation.
    pub geolocation: Option<Geolocation>,
    /// Replaces `BrowserOptions::window_size` as `(width, height)`.
    pub window_size: Option<(u32, u32)>,
}
impl ContextOverride {
    /// Returns an override that changes nothing.
    pub fn new() -> Self {
        Self::default()
    }

    /// Overrides the proxy.
    pub fn proxy(self, p: Proxy) -> Self {
        Self { proxy: Some(p), ..self }
    }

    /// Overrides the user agent string.
    pub fn user_agent(self, ua: impl Into<String>) -> Self {
        Self { user_agent: Some(ua.into()), ..self }
    }

    /// Overrides the locale.
    pub fn locale(self, locale: impl Into<String>) -> Self {
        Self { locale: Some(locale.into()), ..self }
    }

    /// Overrides the timezone id.
    pub fn timezone(self, tz: impl Into<String>) -> Self {
        Self { timezone_id: Some(tz.into()), ..self }
    }

    /// Overrides the platform string.
    pub fn platform(self, platform: impl Into<String>) -> Self {
        Self { platform: Some(platform.into()), ..self }
    }

    /// Overrides the OS type.
    pub fn os(self, os: OsType) -> Self {
        Self { os: Some(os), ..self }
    }

    /// Overrides the geolocation; accuracy is left unset.
    pub fn geolocation(self, latitude: f64, longitude: f64) -> Self {
        let position = Geolocation {
            latitude,
            longitude,
            accuracy: None,
        };
        Self { geolocation: Some(position), ..self }
    }

    /// Overrides the window size as `(width, height)`.
    pub fn window_size(self, width: u32, height: u32) -> Self {
        Self { window_size: Some((width, height)), ..self }
    }

    /// `true` when no field is set, i.e. merging would change nothing.
    pub fn is_empty(&self) -> bool {
        let any_set = self.proxy.is_some()
            || self.user_agent.is_some()
            || self.locale.is_some()
            || self.timezone_id.is_some()
            || self.platform.is_some()
            || self.os.is_some()
            || self.geolocation.is_some()
            || self.window_size.is_some();
        !any_set
    }

    /// Returns `base` with every field that is set here copied over it.
    pub(crate) fn merge_into(&self, mut base: BrowserOptions) -> BrowserOptions {
        let fp = &mut base.fingerprint;
        if self.user_agent.is_some() {
            fp.user_agent = self.user_agent.clone();
        }
        if self.locale.is_some() {
            fp.locale = self.locale.clone();
        }
        if self.timezone_id.is_some() {
            fp.timezone_id = self.timezone_id.clone();
        }
        if self.platform.is_some() {
            fp.platform = self.platform.clone();
        }
        if self.os.is_some() {
            fp.os = self.os;
        }
        if self.geolocation.is_some() {
            fp.geolocation = self.geolocation;
        }
        if self.proxy.is_some() {
            base.proxy = self.proxy.clone();
        }
        if self.window_size.is_some() {
            base.window_size = self.window_size;
        }
        base
    }
}
/// A JavaScript dialog (alert/confirm/prompt…) as reported by `Page.dialogOpened`.
#[derive(Debug, Clone)]
pub struct DialogInfo {
    /// The event's `type` field (empty if absent).
    pub dialog_type: String,
    /// Text shown by the dialog (empty if absent).
    pub message: String,
    /// Pre-filled value, when the event carried `defaultValue`.
    pub default_value: Option<String>,
}
/// Viewport, document size and scroll position of a page.
///
/// NOTE(review): values are presumably CSS pixels as reported by the page — confirm at the
/// producer (not visible here).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct PageRect {
    /// Visible viewport width.
    pub window_width: f64,
    /// Visible viewport height.
    pub window_height: f64,
    /// Full scrollable document width.
    pub page_width: f64,
    /// Full scrollable document height.
    pub page_height: f64,
    /// Horizontal scroll offset.
    pub scroll_x: f64,
    /// Vertical scroll offset.
    pub scroll_y: f64,
    /// `window.devicePixelRatio` (presumably).
    pub device_pixel_ratio: f64,
}
/// Encoding used for screenshots.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ImageFormat {
    /// Lossless PNG (default).
    #[default]
    Png,
    /// Lossy JPEG.
    Jpeg,
}

impl ImageFormat {
    /// MIME type passed to the screenshot command.
    pub(crate) fn mime(self) -> &'static str {
        match self {
            Self::Jpeg => "image/jpeg",
            Self::Png => "image/png",
        }
    }

    /// Picks a format from the file extension: `.jpg`/`.jpeg` (any case) → JPEG,
    /// anything else (including no or non-UTF-8 extension) → PNG.
    pub fn from_path(path: &Path) -> Self {
        let is_jpeg = path
            .extension()
            .and_then(|ext| ext.to_str())
            .is_some_and(|ext| ext.eq_ignore_ascii_case("jpg") || ext.eq_ignore_ascii_case("jpeg"));
        if is_jpeg {
            Self::Jpeg
        } else {
            Self::Png
        }
    }
}
/// Screenshot options (builder-style; see the `impl` below).
#[derive(Debug, Clone, Default)]
pub struct ShotOpts {
    /// Capture the whole scrollable page rather than the viewport.
    pub full_page: bool,
    /// Optional rectangle given as two corner points `((x1, y1), (x2, y2))`.
    pub region: Option<((f64, f64), (f64, f64))>,
    /// Output encoding.
    pub format: ImageFormat,
    /// JPEG quality; `TabCore::capture` only applies it for JPEG and clamps it to 100.
    pub quality: Option<u8>,
}
impl ShotOpts {
    /// Default options: viewport-only PNG.
    pub fn new() -> Self {
        Self::default()
    }

    /// Capture the whole scrollable page when `yes` is true.
    pub fn full_page(mut self, yes: bool) -> Self {
        self.full_page = yes;
        self
    }

    /// Restricts the capture to the rectangle spanned by two corner points.
    ///
    /// The corners may be given in any order; see [`ShotOpts::region_clip`].
    pub fn region(mut self, left_top: (f64, f64), right_bottom: (f64, f64)) -> Self {
        self.region = Some((left_top, right_bottom));
        self
    }

    /// Sets the output encoding.
    pub fn format(mut self, format: ImageFormat) -> Self {
        self.format = format;
        self
    }

    /// Sets the JPEG quality (ignored for PNG).
    pub fn quality(mut self, q: u8) -> Self {
        self.quality = Some(q);
        self
    }

    /// Converts `region` into a screenshot clip object `{x, y, width, height}`.
    ///
    /// The rectangle is normalised, so reversed corners still describe the same area. The
    /// previous `r - l` form collapsed such a region to a 1-px clip at the wrong origin.
    /// Width and height are never below 1.
    fn region_clip(&self) -> Option<Value> {
        self.region.map(|((x1, y1), (x2, y2))| {
            let left = x1.min(x2);
            let top = y1.min(y2);
            let width = (x2 - x1).abs().max(1.0);
            let height = (y2 - y1).abs().max(1.0);
            json!({ "x": left, "y": top, "width": width, "height": height })
        })
    }
}
/// A cookie as returned by `Tab::cookies` (from `Browser.getCookies`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Cookie {
    pub name: String,
    pub value: String,
    pub domain: String,
    pub path: String,
    /// Expiry as reported by the browser; `Tab::cookies` fills `-1.0` when it is absent.
    /// NOTE(review): presumably seconds since the Unix epoch — confirm with the protocol.
    pub expires: f64,
    pub http_only: bool,
    pub secure: bool,
}
/// A cookie to set via `Tab::set_cookies`; only `Some` fields are sent.
///
/// With `#[serde(default)]` every field may be missing when deserialising. Field names are
/// serialised in snake_case (no rename attribute).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
#[serde(default)]
pub struct CookieParam {
    pub name: String,
    pub value: String,
    pub url: Option<String>,
    pub domain: Option<String>,
    pub path: Option<String>,
    pub secure: Option<bool>,
    pub http_only: Option<bool>,
    pub expires: Option<f64>,
}
/// Result of waiting for a download (`Tab::wait_download`).
#[derive(Debug, Clone)]
pub struct DownloadInfo {
    /// Source URL; `wait_download` leaves it empty because it only watches the directory.
    pub url: String,
    /// File name reported for the finished file.
    pub suggested_filename: String,
    /// Location of the file on disk.
    pub path: std::path::PathBuf,
    /// Whether the download completed.
    pub success: bool,
    /// Failure description, if any.
    pub error: Option<String>,
}
impl CookieParam {
    /// Creates a cookie with just a name and a value; all optional fields are unset.
    pub fn new(name: impl Into<String>, value: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
            ..Default::default()
        }
    }

    /// Sets the URL the cookie is scoped to.
    pub fn url(mut self, url: impl Into<String>) -> Self {
        self.url = Some(url.into());
        self
    }

    /// Sets the cookie domain.
    pub fn domain(mut self, domain: impl Into<String>) -> Self {
        self.domain = Some(domain.into());
        self
    }

    /// Sets the cookie path.
    pub fn path(mut self, path: impl Into<String>) -> Self {
        self.path = Some(path.into());
        self
    }

    /// Sets the `Secure` flag.
    pub fn secure(mut self, secure: bool) -> Self {
        self.secure = Some(secure);
        self
    }

    /// Sets the `HttpOnly` flag.
    pub fn http_only(mut self, http_only: bool) -> Self {
        self.http_only = Some(http_only);
        self
    }

    /// Sets the expiry.
    ///
    /// NOTE(review): presumably seconds since the Unix epoch, as in the browser's cookie
    /// objects — confirm.
    pub fn expires(mut self, expires: f64) -> Self {
        self.expires = Some(expires);
        self
    }
}
/// Shared state behind a [`Tab`]; clones of a `Tab` share one `TabCore` through `Arc`.
pub(crate) struct TabCore {
    /// Connection used for every command sent by this tab.
    pub conn: Connection,
    /// Session id that page-level commands are sent on (`send_page`).
    pub session_id: String,
    /// Id of the page target.
    pub target_id: String,
    /// Browser context the page lives in (cookies are read and written per context).
    pub browser_context_id: String,
    /// Id of the main frame, used as `frameId` for navigation commands.
    pub main_frame_id: String,
    /// Latest main-frame execution context id, published by the event pump.
    /// NOTE(review): presumably `None` while no context exists (e.g. mid-navigation).
    pub exec_ctx: watch::Receiver<Option<String>>,
    /// Frame id → execution context id, seeded with the main frame at construction.
    pub frame_ctxs: Arc<Mutex<HashMap<String, String>>>,
    /// Set once `listen_start`/`listen_xhr` has been called.
    pub listen_active: Mutex<bool>,
    /// Captured network packets not yet returned to the caller.
    pub listen_buf: Mutex<ListenBuffer>,
    /// While set, the background drain task owns draining; manual draining is skipped.
    pub bg_active: AtomicBool,
    /// Handle of the background drain task started by `listen_forever`.
    pub listen_bg: Mutex<Option<JoinHandle<()>>>,
    /// Network-listen hook script, registered first among the init scripts.
    pub listen_script: Mutex<Option<String>>,
    /// Additional init scripts, re-registered together with `listen_script`.
    pub init_scripts: Mutex<Vec<String>>,
    /// Request-interception state, shared with the event pump.
    pub interceptor: Arc<Mutex<Option<InterceptorState>>>,
    /// Receiver of intercepted requests (presumably filled when interception is enabled).
    pub intercept_rx: Mutex<Option<mpsc::UnboundedReceiver<InterceptedRequest>>>,
    /// Console-capture state and its collector task.
    pub console: Arc<ConsoleShared>,
    pub console_task: Mutex<Option<JoinHandle<()>>>,
    /// WebSocket-capture state and its collector task.
    pub ws: Arc<WsShared>,
    pub ws_task: Mutex<Option<JoinHandle<()>>>,
    /// Files armed for the next file-input upload.
    pub upload: Arc<UploadShared>,
    /// Default timeout in milliseconds (30 000 at construction).
    pub timeout_ms: AtomicU64,
    /// Default [`LoadMode`], encoded with `LoadMode::as_u8`.
    pub load_mode: AtomicU8,
    /// Whether the last `get`/`get_with` reached its target load state.
    pub last_load_ok: AtomicBool,
    /// Screencast shared state.
    pub screencast: Arc<ScreencastShared>,
    /// Download tracking shared state.
    pub downloads: Arc<DownloadShared>,
    /// Directory scanned by `wait_download`; `None` disables it.
    pub download_path: Option<std::path::PathBuf>,
    /// Abort handle of the event-pump task; aborted when the core is dropped.
    pub pump_abort: tokio::task::AbortHandle,
}
impl Drop for TabCore {
    /// Stops all background work owned by the tab.
    ///
    /// Dropping a Tokio `JoinHandle` only detaches its task. The console, WebSocket and
    /// listen-drain tasks are therefore aborted explicitly; before this change they kept
    /// running after the tab was gone. `get_mut` needs no locking because we hold
    /// `&mut self`.
    fn drop(&mut self) {
        self.pump_abort.abort();
        for slot in [&mut self.console_task, &mut self.ws_task, &mut self.listen_bg] {
            if let Some(handle) = slot.get_mut().take() {
                handle.abort();
            }
        }
    }
}
/// File paths armed by `set_upload_files`, consumed by `TabCore::wait_upload`.
#[derive(Default)]
pub(crate) struct UploadShared {
    /// `None` until files have been armed.
    pub files: Mutex<Option<Vec<String>>>,
}
impl TabCore {
    /// Default timeout for waits issued through this tab.
    pub(crate) fn timeout(&self) -> Duration {
        Duration::from_millis(self.timeout_ms.load(Ordering::Relaxed))
    }

    /// Replaces the default timeout (stored with millisecond precision).
    ///
    /// A millisecond count that does not fit in `u64` saturates to `u64::MAX`. An `as` cast
    /// would wrap it and could turn a huge timeout into a tiny one.
    pub(crate) fn set_timeout(&self, d: Duration) {
        let millis = u64::try_from(d.as_millis()).unwrap_or(u64::MAX);
        self.timeout_ms.store(millis, Ordering::Relaxed);
    }

    /// Load mode used by navigations that do not specify one.
    pub(crate) fn current_load_mode(&self) -> LoadMode {
        LoadMode::from_u8(self.load_mode.load(Ordering::Relaxed))
    }

    /// Sets the load mode used by navigations that do not specify one.
    pub(crate) fn set_load_mode(&self, m: LoadMode) {
        self.load_mode.store(m.as_u8(), Ordering::Relaxed);
    }

    /// Polls `document.readyState` every 100 ms until it reads `"complete"`.
    ///
    /// Returns `Ok(true)` when complete and `Ok(false)` once `timeout` elapses. Evaluation
    /// errors count as "not complete yet" instead of being propagated.
    pub(crate) async fn poll_ready_complete(&self, timeout: Duration) -> Result<bool> {
        let deadline = Instant::now() + timeout;
        loop {
            let rs = self
                .evaluate("document.readyState", true)
                .await
                .ok()
                .and_then(|v| v.as_str().map(str::to_string))
                .unwrap_or_default();
            if rs == "complete" {
                return Ok(true);
            }
            if Instant::now() >= deadline {
                return Ok(false);
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

    /// Stores `files` for the next upload and injects `UPLOAD_HOOK_JS` into the page.
    ///
    /// NOTE(review): the hook presumably records the chosen file input in
    /// `window.__drission_upload_el`, which `wait_upload` polls — confirm against the script.
    pub(crate) async fn arm_upload(&self, files: Vec<String>) -> Result<()> {
        *self.upload.files.lock().await = Some(files);
        self.evaluate(UPLOAD_HOOK_JS, true).await?;
        Ok(())
    }

    /// Polls every 50 ms for `window.__drission_upload_el`. Once it appears, the armed files
    /// are assigned to it with `Page.setFileInputFiles` and the global is cleared.
    ///
    /// Returns `Ok(false)` if nothing was captured before `timeout`.
    pub(crate) async fn wait_upload(&self, timeout: Duration) -> Result<bool> {
        let deadline = Instant::now() + timeout;
        loop {
            let captured = self
                .evaluate("window.__drission_upload_el || null", false)
                .await?;
            if let Some(oid) = captured.get("objectId").and_then(|v| v.as_str()) {
                let files = self.upload.files.lock().await.clone().unwrap_or_default();
                self.send_page(
                    "Page.setFileInputFiles",
                    json!({ "frameId": self.main_frame_id, "objectId": oid, "files": files }),
                )
                .await?;
                // Best-effort reset; the files are already set.
                let _ = self
                    .evaluate("window.__drission_upload_el = null", true)
                    .await;
                return Ok(true);
            }
            if Instant::now() >= deadline {
                return Ok(false);
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }

    /// Default `clickCount` for an event type: 1 for press/release, 0 otherwise.
    fn default_click_count(ty: &str) -> i64 {
        i64::from(ty == "mousedown" || ty == "mouseup")
    }

    /// Parameters for `Page.dispatchMouseEvent`; shared by the awaited and the
    /// fire-and-forget dispatch paths so they cannot drift apart.
    fn mouse_event_params(
        ty: &str,
        x: f64,
        y: f64,
        button: i64,
        buttons: i64,
        click_count: i64,
    ) -> Value {
        json!({
            "type": ty,
            "button": button,
            "buttons": buttons,
            "x": x,
            "y": y,
            "modifiers": 0,
            "clickCount": click_count,
        })
    }

    /// Dispatches a primary-button mouse event and waits for the reply.
    pub(crate) async fn dispatch_mouse(
        &self,
        ty: &str,
        x: f64,
        y: f64,
        buttons: i64,
    ) -> Result<()> {
        self.dispatch_mouse_ex(ty, x, y, 0, buttons, Self::default_click_count(ty))
            .await
    }

    /// Dispatches a mouse event with explicit button and click count, awaiting the reply.
    pub(crate) async fn dispatch_mouse_ex(
        &self,
        ty: &str,
        x: f64,
        y: f64,
        button: i64,
        buttons: i64,
        click_count: i64,
    ) -> Result<()> {
        self.send_page(
            "Page.dispatchMouseEvent",
            Self::mouse_event_params(ty, x, y, button, buttons, click_count),
        )
        .await?;
        Ok(())
    }

    /// Sends a primary-button mouse event without waiting for the reply.
    pub(crate) fn dispatch_mouse_fire(&self, ty: &str, x: f64, y: f64, buttons: i64) -> Result<()> {
        self.conn.fire_session(
            "Page.dispatchMouseEvent",
            Self::mouse_event_params(ty, x, y, 0, buttons, Self::default_click_count(ty)),
            Some(&self.session_id),
        )
    }

    /// Execution context id for `frame_id`.
    ///
    /// The main frame goes through `exec_ctx_id`; other frames poll `frame_ctxs` every
    /// 50 ms until an entry appears.
    ///
    /// # Errors
    /// `Error::Timeout` after the tab's default timeout.
    pub(crate) async fn exec_ctx_for_frame(&self, frame_id: &str) -> Result<String> {
        if frame_id == self.main_frame_id {
            return self.exec_ctx_id().await;
        }
        let timeout = self.timeout();
        let deadline = Instant::now() + timeout;
        loop {
            if let Some(ctx) = self.frame_ctxs.lock().await.get(frame_id).cloned() {
                return Ok(ctx);
            }
            if Instant::now() >= deadline {
                return Err(Error::Timeout(timeout));
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }

    /// Object ids of the array-index properties of a remote array-like object.
    ///
    /// Non-index properties and entries without an `objectId` (primitives) are skipped.
    pub(crate) async fn node_array_object_ids(
        &self,
        frame_id: &str,
        array_object_id: &str,
    ) -> Result<Vec<String>> {
        let ctx = self.exec_ctx_for_frame(frame_id).await?;
        let props = self
            .send_page(
                "Runtime.getObjectProperties",
                json!({ "executionContextId": ctx, "objectId": array_object_id }),
            )
            .await?;
        let mut out = Vec::new();
        if let Some(list) = props["properties"].as_array() {
            for p in list {
                if p["name"].as_str().map(is_index).unwrap_or(false)
                    && let Some(oid) = p["value"]["objectId"].as_str()
                {
                    out.push(oid.to_string());
                }
            }
        }
        Ok(out)
    }

    /// Evaluates `expression` in the execution context of `frame_id`.
    pub(crate) async fn evaluate_in(
        &self,
        frame_id: &str,
        expression: &str,
        return_by_value: bool,
    ) -> Result<Value> {
        let mut base = serde_json::Map::new();
        base.insert("expression".into(), json!(expression));
        base.insert("returnByValue".into(), json!(return_by_value));
        self.runtime_call_in(frame_id, "Runtime.evaluate", base)
            .await
    }

    /// Calls the function `declaration` with `args` in the context of `frame_id`.
    pub(crate) async fn call_function_in(
        &self,
        frame_id: &str,
        declaration: &str,
        args: Vec<Value>,
        return_by_value: bool,
    ) -> Result<Value> {
        let mut base = serde_json::Map::new();
        base.insert("functionDeclaration".into(), json!(declaration));
        base.insert("args".into(), json!(args));
        base.insert("returnByValue".into(), json!(return_by_value));
        self.runtime_call_in(frame_id, "Runtime.callFunction", base)
            .await
    }

    /// Sends a `Runtime.*` call with the current execution context of `frame_id` added.
    ///
    /// If the protocol reports a stale context, waits for a replacement and retries, up to
    /// 4 attempts in total.
    async fn runtime_call_in(
        &self,
        frame_id: &str,
        method: &str,
        base: serde_json::Map<String, Value>,
    ) -> Result<Value> {
        for _ in 0..4 {
            let ctx = self.exec_ctx_for_frame(frame_id).await?;
            let mut params = base.clone();
            params.insert("executionContextId".into(), json!(ctx));
            match self.send_page(method, Value::Object(params)).await {
                Ok(r) => return extract_runtime_result(r),
                Err(Error::Protocol(msg)) if is_stale_context(&msg) => {
                    self.await_replacement_ctx(frame_id, &ctx).await;
                }
                Err(e) => return Err(e),
            }
        }
        Err(Error::Other("多次重试后仍无有效执行上下文".into()))
    }

    /// Bounded wait after `stale` (the context used for `frame_id`) was rejected.
    ///
    /// - Main frame: waits up to 3 s until `exec_ctx` holds a different, non-empty id.
    ///   `wait_for` checks the current value first, so a replacement that arrived earlier is
    ///   seen immediately. `changed()` on a clone of `self.exec_ctx` is unreliable here: the
    ///   clone keeps the channel-creation version and returns at once after the first change.
    /// - Other frames: drops the cached entry only if it still holds `stale`, so a
    ///   replacement already stored by the event pump is not discarded. Then pauses 100 ms.
    async fn await_replacement_ctx(&self, frame_id: &str, stale: &str) {
        if frame_id == self.main_frame_id {
            tracing::debug!("执行上下文失效,等待新上下文后重试");
            let mut rx = self.exec_ctx.clone();
            let _ = tokio::time::timeout(
                Duration::from_secs(3),
                rx.wait_for(|cur| cur.as_deref().is_some_and(|id| id != stale)),
            )
            .await;
        } else {
            {
                let mut ctxs = self.frame_ctxs.lock().await;
                if ctxs.get(frame_id).is_some_and(|id| id == stale) {
                    ctxs.remove(frame_id);
                }
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

    /// Current main-frame execution context id, waiting for one if none is published yet.
    ///
    /// # Errors
    /// `Error::Timeout` after the tab's default timeout.
    pub(crate) async fn exec_ctx_id(&self) -> Result<String> {
        if let Some(id) = self.exec_ctx.borrow().clone() {
            return Ok(id);
        }
        let mut rx = self.exec_ctx.clone();
        let timeout = self.timeout();
        let deadline = Instant::now() + timeout;
        loop {
            if timeout_at(deadline, rx.changed()).await.is_err() {
                return Err(Error::Timeout(timeout));
            }
            if let Some(id) = rx.borrow().clone() {
                return Ok(id);
            }
        }
    }

    /// Sends `method` on this tab's session.
    pub(crate) async fn send_page(&self, method: &str, params: Value) -> Result<Value> {
        self.conn.send(method, params, Some(&self.session_id)).await
    }

    /// Takes a screenshot of `clip` and returns the decoded image bytes.
    ///
    /// `quality` is only sent for JPEG and is clamped to 100.
    pub(crate) async fn capture(
        &self,
        clip: Value,
        format: ImageFormat,
        quality: Option<u8>,
    ) -> Result<Vec<u8>> {
        let mut params = json!({ "mimeType": format.mime(), "clip": clip });
        if let (ImageFormat::Jpeg, Some(q)) = (format, quality) {
            params["quality"] = json!(q.min(100));
        }
        let r = self.send_page("Page.screenshot", params).await?;
        let data = r["data"]
            .as_str()
            .ok_or_else(|| Error::Protocol("screenshot 未返回 data".into()))?;
        base64_decode(data).ok_or_else(|| Error::Protocol("screenshot data 非法 base64".into()))
    }

    /// Clip rectangle for either the full document or the current viewport.
    ///
    /// Width and height are at least 1.
    pub(crate) async fn page_clip(&self, full_page: bool) -> Result<Value> {
        let js = if full_page {
            "(() => { const d = document.documentElement, b = document.body; \
             const w = Math.max(d.scrollWidth, b ? b.scrollWidth : 0, d.clientWidth); \
             const h = Math.max(d.scrollHeight, b ? b.scrollHeight : 0, d.clientHeight); \
             return [0, 0, w, h]; })()"
        } else {
            "[window.scrollX, window.scrollY, window.innerWidth, window.innerHeight]"
        };
        let v = self.evaluate(js, true).await?;
        let x = v.get(0).and_then(Value::as_f64).unwrap_or(0.0);
        let y = v.get(1).and_then(Value::as_f64).unwrap_or(0.0);
        let w = v.get(2).and_then(Value::as_f64).unwrap_or(0.0).max(1.0);
        let h = v.get(3).and_then(Value::as_f64).unwrap_or(0.0).max(1.0);
        Ok(json!({ "x": x, "y": y, "width": w, "height": h }))
    }

    /// Resizes the page viewport; zero dimensions are raised to 1.
    pub(crate) async fn set_viewport_size(&self, width: u32, height: u32) -> Result<()> {
        self.send_page(
            "Page.setViewportSize",
            json!({ "viewportSize": { "width": width.max(1), "height": height.max(1) } }),
        )
        .await?;
        Ok(())
    }

    /// Sends a keydown followed by a keyup for `key`; only the keydown carries `text`.
    pub(crate) async fn press_key(&self, key: &str) -> Result<()> {
        let (norm, code, key_code, text) = key_descriptor(key);
        self.send_page(
            "Page.dispatchKeyEvent",
            json!({
                "type": "keydown", "key": norm, "code": code,
                "keyCode": key_code, "location": 0, "repeat": false, "text": text,
            }),
        )
        .await?;
        self.send_page(
            "Page.dispatchKeyEvent",
            json!({
                "type": "keyup", "key": norm, "code": code,
                "keyCode": key_code, "location": 0, "repeat": false,
            }),
        )
        .await?;
        Ok(())
    }

    /// Re-registers all init scripts: the listen hook (if any) first, then user scripts.
    pub(crate) async fn rebuild_init_scripts(&self) -> Result<()> {
        let mut scripts: Vec<Value> = Vec::new();
        if let Some(s) = self.listen_script.lock().await.as_ref() {
            scripts.push(json!({ "script": s }));
        }
        for s in self.init_scripts.lock().await.iter() {
            scripts.push(json!({ "script": s }));
        }
        self.send_page("Page.setInitScripts", json!({ "scripts": scripts }))
            .await?;
        Ok(())
    }

    /// Evaluates `expression` in the main frame.
    pub(crate) async fn evaluate(&self, expression: &str, return_by_value: bool) -> Result<Value> {
        let mut base = serde_json::Map::new();
        base.insert("expression".into(), json!(expression));
        base.insert("returnByValue".into(), json!(return_by_value));
        self.runtime_call("Runtime.evaluate", base).await
    }

    /// Calls the function `declaration` with `args` in the main frame.
    pub(crate) async fn call_function(
        &self,
        declaration: &str,
        args: Vec<Value>,
        return_by_value: bool,
    ) -> Result<Value> {
        let mut base = serde_json::Map::new();
        base.insert("functionDeclaration".into(), json!(declaration));
        base.insert("args".into(), json!(args));
        base.insert("returnByValue".into(), json!(return_by_value));
        self.runtime_call("Runtime.callFunction", base).await
    }

    /// Main-frame variant of [`Self::runtime_call_in`], sharing its retry and
    /// stale-context handling.
    async fn runtime_call(
        &self,
        method: &str,
        base: serde_json::Map<String, Value>,
    ) -> Result<Value> {
        self.runtime_call_in(&self.main_frame_id, method, base)
            .await
    }
}
/// Whether a protocol error message means the execution context used for a call is gone.
///
/// The match is case-sensitive.
fn is_stale_context(msg: &str) -> bool {
    const MARKERS: [&str; 2] = ["execution context", "findExecutionContext"];
    MARKERS.into_iter().any(|marker| msg.contains(marker))
}
/// Writes `bytes` to `path`, creating any missing parent directories first.
///
/// # Errors
/// I/O errors from directory creation or the write, converted into the crate error.
pub(crate) async fn write_file(path: &Path, bytes: &[u8]) -> Result<()> {
    // A bare file name has an empty parent; there is nothing to create then.
    let dir = path.parent().filter(|p| !p.as_os_str().is_empty());
    if let Some(dir) = dir {
        tokio::fs::create_dir_all(dir).await?;
    }
    tokio::fs::write(path, bytes).await?;
    Ok(())
}
/// Handle to one browser page. Cloning is cheap: clones share the same [`TabCore`].
#[derive(Clone)]
pub struct Tab {
    pub(crate) core: Arc<TabCore>,
}
impl Tab {
pub(crate) async fn open(conn: Connection, opts: &BrowserOptions) -> Result<Self> {
let timeout = opts.launch_timeout.min(Duration::from_secs(60));
let mut events = conn.subscribe();
let r = conn
.send(
"Browser.createBrowserContext",
json!({ "removeOnDetach": true }),
None,
)
.await?;
let browser_context_id = r["browserContextId"]
.as_str()
.ok_or_else(|| Error::Protocol("createBrowserContext 未返回 browserContextId".into()))?
.to_string();
apply_context_overrides(&conn, &browser_context_id, opts).await;
let r = conn
.send(
"Browser.newPage",
json!({ "browserContextId": browser_context_id }),
None,
)
.await?;
let target_id = r["targetId"]
.as_str()
.ok_or_else(|| Error::Protocol("newPage 未返回 targetId".into()))?
.to_string();
let session_id = wait_attached(&mut events, &target_id, timeout).await?;
let (main_frame_id, exec_ctx_id) =
wait_frame_and_ctx(&mut events, &session_id, timeout).await?;
let frame_ctxs: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));
frame_ctxs
.lock()
.await
.insert(main_frame_id.clone(), exec_ctx_id.clone());
let (ctx_tx, ctx_rx) = watch::channel(Some(exec_ctx_id));
let interceptor: Arc<Mutex<Option<InterceptorState>>> = Arc::new(Mutex::new(None));
let pump_handle = tokio::spawn(pump(
events,
conn.clone(),
session_id.clone(),
main_frame_id.clone(),
ctx_tx,
frame_ctxs.clone(),
interceptor.clone(),
));
let pump_abort = pump_handle.abort_handle();
let core = Arc::new(TabCore {
conn,
session_id,
target_id,
browser_context_id,
main_frame_id,
exec_ctx: ctx_rx,
frame_ctxs,
listen_active: Mutex::new(false),
listen_buf: Mutex::new(VecDeque::new()),
bg_active: AtomicBool::new(false),
listen_bg: Mutex::new(None),
listen_script: Mutex::new(None),
init_scripts: Mutex::new(Vec::new()),
interceptor,
intercept_rx: Mutex::new(None),
console: Arc::new(ConsoleShared::new()),
console_task: Mutex::new(None),
ws: Arc::new(WsShared::new()),
ws_task: Mutex::new(None),
upload: Arc::new(UploadShared::default()),
timeout_ms: AtomicU64::new(30_000),
load_mode: AtomicU8::new(LoadMode::Normal.as_u8()),
last_load_ok: AtomicBool::new(true),
screencast: Arc::new(ScreencastShared::new()),
downloads: Arc::new(DownloadShared::new()),
download_path: opts.download_path.clone(),
pump_abort,
});
Ok(Tab { core })
}
#[allow(dead_code)]
pub(crate) async fn from_attached(
conn: Connection,
target_id: String,
browser_context_id: String,
session_id: String,
mut events: tokio::sync::broadcast::Receiver<Event>,
opts: &BrowserOptions,
timeout: Duration,
) -> Result<Self> {
let (main_frame_id, exec_ctx_id) =
wait_frame_and_ctx(&mut events, &session_id, timeout).await?;
let frame_ctxs: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));
frame_ctxs
.lock()
.await
.insert(main_frame_id.clone(), exec_ctx_id.clone());
let (ctx_tx, ctx_rx) = watch::channel(Some(exec_ctx_id));
let interceptor: Arc<Mutex<Option<InterceptorState>>> = Arc::new(Mutex::new(None));
let pump_handle = tokio::spawn(pump(
events,
conn.clone(),
session_id.clone(),
main_frame_id.clone(),
ctx_tx,
frame_ctxs.clone(),
interceptor.clone(),
));
let pump_abort = pump_handle.abort_handle();
let core = Arc::new(TabCore {
conn,
session_id,
target_id,
browser_context_id,
main_frame_id,
exec_ctx: ctx_rx,
frame_ctxs,
listen_active: Mutex::new(false),
listen_buf: Mutex::new(VecDeque::new()),
bg_active: AtomicBool::new(false),
listen_bg: Mutex::new(None),
listen_script: Mutex::new(None),
init_scripts: Mutex::new(Vec::new()),
interceptor,
intercept_rx: Mutex::new(None),
console: Arc::new(ConsoleShared::new()),
console_task: Mutex::new(None),
ws: Arc::new(WsShared::new()),
ws_task: Mutex::new(None),
upload: Arc::new(UploadShared::default()),
timeout_ms: AtomicU64::new(30_000),
load_mode: AtomicU8::new(LoadMode::Normal.as_u8()),
last_load_ok: AtomicBool::new(true),
screencast: Arc::new(ScreencastShared::new()),
downloads: Arc::new(DownloadShared::new()),
download_path: opts.download_path.clone(),
pump_abort,
});
Ok(Tab { core })
}
/// Protocol session id used for this page's commands.
pub fn session_id(&self) -> &str {
    self.core.session_id.as_str()
}
/// Id of the browser context this page belongs to.
pub fn browser_context_id(&self) -> &str {
    self.core.browser_context_id.as_str()
}
/// Id of the page target.
pub fn target_id(&self) -> &str {
    self.core.target_id.as_str()
}
/// Navigates to `url` with default [`GetOptions`]; see [`Tab::get_with`].
pub async fn get(&self, url: &str) -> Result<bool> {
    let defaults = GetOptions::new();
    self.get_with(url, &defaults).await
}
/// Navigates to `url`, retrying up to `opts.retry` extra times with `opts.interval`
/// between attempts.
///
/// Returns whether some attempt reached the target load state. Per-attempt errors are
/// logged, not returned. The outcome is also recorded for [`Tab::url_available`].
pub async fn get_with(&self, url: &str, opts: &GetOptions) -> Result<bool> {
    let core = &self.core;
    let timeout = match opts.timeout {
        Some(t) => t,
        None => core.timeout(),
    };
    let mode = match opts.load_mode {
        Some(m) => m,
        None => core.current_load_mode(),
    };
    let attempts = opts.retry.saturating_add(1);
    let referer = opts.referer.as_deref();

    let mut attempt: u32 = 0;
    let ok = loop {
        match self.navigate_once(url, mode, timeout, referer).await {
            Ok(true) => break true,
            Ok(false) => {
                tracing::warn!(attempt, %url, "导航未达到目标加载状态");
            }
            Err(e) => {
                tracing::warn!(attempt, %url, error = %e, "导航出错");
            }
        }
        attempt += 1;
        if attempt >= attempts {
            break false;
        }
        tokio::time::sleep(opts.interval).await;
    };
    core.last_load_ok.store(ok, Ordering::Relaxed);
    Ok(ok)
}
async fn navigate_once(
&self,
url: &str,
mode: LoadMode,
timeout: Duration,
referer: Option<&str>,
) -> Result<bool> {
let mut events = self.core.conn.subscribe();
let mut params = json!({ "frameId": self.core.main_frame_id, "url": url });
if let Some(r) = referer {
params["referer"] = json!(r);
}
self.core.send_page("Page.navigate", params).await?;
let Some(target) = mode.event_name() else {
return Ok(true); };
let deadline = Instant::now() + timeout;
loop {
match timeout_at(deadline, events.recv()).await {
Ok(Ok(ev)) => {
if ev.session_id.as_deref() == Some(&self.core.session_id)
&& ev.method == "Page.eventFired"
&& ev.params["frameId"].as_str() == Some(&self.core.main_frame_id)
&& ev.params["name"].as_str() == Some(target)
{
return Ok(true);
}
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(_)) => return Err(Error::Transport("连接已关闭".into())),
Err(_) => {
let rs = self.ready_state().await.unwrap_or_default();
let reached = match mode {
LoadMode::Normal => rs == "complete",
LoadMode::Eager => rs == "interactive" || rs == "complete",
LoadMode::None => true,
};
if !reached {
tracing::warn!(%url, ready_state = %rs, "等待加载超时");
}
return Ok(reached);
}
}
}
}
/// Current `location.href` (empty if the result is not a string).
pub async fn url(&self) -> Result<String> {
    let v = self.core.evaluate("location.href", true).await?;
    Ok(v.as_str().map(str::to_owned).unwrap_or_default())
}
/// Current `document.title` (empty if the result is not a string).
pub async fn title(&self) -> Result<String> {
    let v = self.core.evaluate("document.title", true).await?;
    Ok(v.as_str().map(str::to_owned).unwrap_or_default())
}
/// Serialized `outerHTML` of the document element (empty if not a string).
pub async fn html(&self) -> Result<String> {
    let v = self
        .core
        .evaluate("document.documentElement.outerHTML", true)
        .await?;
    Ok(v.as_str().map(str::to_owned).unwrap_or_default())
}
/// Evaluates `script` in the main frame and returns its value by value.
pub async fn run_js(&self, script: &str) -> Result<Value> {
    let core = &self.core;
    core.evaluate(script, true).await
}
/// The page's `navigator.userAgent` (empty if not a string).
pub async fn user_agent(&self) -> Result<String> {
    let v = self.core.evaluate("navigator.userAgent", true).await?;
    Ok(v.as_str().map(str::to_owned).unwrap_or_default())
}
/// Current `document.readyState` (empty if not a string).
pub async fn ready_state(&self) -> Result<String> {
    let v = self.core.evaluate("document.readyState", true).await?;
    Ok(v.as_str().map(str::to_owned).unwrap_or_default())
}
/// Whether the most recent `get`/`get_with` reached its target load state
/// (`true` before any navigation).
pub fn url_available(&self) -> bool {
    let flag = &self.core.last_load_ok;
    flag.load(Ordering::Relaxed)
}
/// Calls `window.stop()` in the page. Best-effort: failures are deliberately ignored.
pub async fn stop_loading(&self) -> Result<()> {
    match self.core.evaluate("window.stop()", true).await {
        Ok(_) | Err(_) => Ok(()),
    }
}
/// Waits up to the tab's default timeout for `document.readyState == "complete"`.
pub async fn wait_loaded(&self) -> Result<bool> {
    let limit = self.core.timeout();
    self.core.poll_ready_complete(limit).await
}
pub async fn handle_next_dialog(
&self,
accept: bool,
prompt_text: Option<&str>,
) -> Result<DialogInfo> {
let mut events = self.core.conn.subscribe();
let timeout = self.core.timeout();
let deadline = Instant::now() + timeout;
loop {
match timeout_at(deadline, events.recv()).await {
Ok(Ok(ev)) => {
if ev.session_id.as_deref() == Some(&self.core.session_id)
&& ev.method == "Page.dialogOpened"
{
let dialog_id = ev.params["dialogId"]
.as_str()
.unwrap_or_default()
.to_string();
let info = DialogInfo {
dialog_type: ev.params["type"].as_str().unwrap_or_default().to_string(),
message: ev.params["message"]
.as_str()
.unwrap_or_default()
.to_string(),
default_value: ev.params["defaultValue"].as_str().map(str::to_string),
};
let mut p = json!({ "dialogId": dialog_id, "accept": accept });
if let Some(t) = prompt_text {
p["promptText"] = json!(t);
}
self.core.send_page("Page.handleDialog", p).await?;
return Ok(info);
}
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(_)) => return Err(Error::Transport("连接已关闭".into())),
Err(_) => return Err(Error::Timeout(timeout)),
}
}
}
pub async fn set_upload_files(&self, paths: &[&str]) -> Result<()> {
if paths.is_empty() {
return Err(Error::Other("set_upload_files: 文件列表为空".into()));
}
let files = paths.iter().map(|p| p.to_string()).collect();
self.core.arm_upload(files).await
}
/// Waits for the armed files to be assigned to a file input; `None` uses the default timeout.
pub async fn wait_upload_paths_inputted(&self, timeout: Option<Duration>) -> Result<bool> {
    let limit = match timeout {
        Some(d) => d,
        None => self.core.timeout(),
    };
    self.core.wait_upload(limit).await
}
/// Reloads the page (does not wait for it to load).
pub async fn reload(&self) -> Result<()> {
    self.core
        .send_page("Page.reload", json!({}))
        .await
        .map(|_reply| ())
}
/// Goes one step back in the main frame's history.
pub async fn back(&self) -> Result<()> {
    let params = json!({ "frameId": self.core.main_frame_id });
    self.core
        .send_page("Page.goBack", params)
        .await
        .map(|_reply| ())
}
/// Goes one step forward in the main frame's history.
pub async fn forward(&self) -> Result<()> {
    let params = json!({ "frameId": self.core.main_frame_id });
    self.core
        .send_page("Page.goForward", params)
        .await
        .map(|_reply| ())
}
/// Finds the first element matching `selector`, polling every 100 ms.
///
/// # Errors
/// `Error::ElementNotFound` after the default timeout; lookup errors are returned at once.
pub async fn ele(&self, selector: &str) -> Result<Element> {
    let deadline = Instant::now() + self.core.timeout();
    loop {
        match self.find_once(selector).await? {
            Some(found) => return Ok(found),
            None if Instant::now() >= deadline => {
                return Err(Error::ElementNotFound(selector.to_string()));
            }
            None => tokio::time::sleep(Duration::from_millis(100)).await,
        }
    }
}
pub(crate) async fn find_once(&self, selector: &str) -> Result<Option<Element>> {
let query = locator::parse(selector);
let expr = single_query_expr(&query);
let result = self.core.evaluate(&expr, false).await?;
Ok(result
.get("objectId")
.and_then(|v| v.as_str())
.map(|oid| Element::new(self.core.clone(), oid.to_string())))
}
/// Parses a snapshot of the current page HTML into a static (offline) element tree.
pub async fn s_root(&self) -> Result<StaticElement> {
    StaticElement::parse(&self.html().await?)
}
/// First match of `selector` in a static snapshot of the page HTML.
pub async fn s_ele(&self, selector: &str) -> Result<StaticElement> {
    let snapshot = StaticElement::parse(&self.html().await?)?;
    snapshot.ele(selector)
}
/// All matches of `selector` in a static snapshot of the page HTML.
pub async fn s_eles(&self, selector: &str) -> Result<Vec<StaticElement>> {
    let snapshot = StaticElement::parse(&self.html().await?)?;
    snapshot.eles(selector)
}
/// Waits for the element matching `selector` and returns its content frame.
pub async fn get_frame(&self, selector: &str) -> Result<Frame> {
    let host = self.ele(selector).await?;
    host.content_frame().await
}
pub async fn eles(&self, selector: &str) -> Result<Vec<Element>> {
let query = locator::parse(selector);
let expr = multi_query_expr(&query);
let result = self.core.evaluate(&expr, false).await?;
let Some(array_object_id) = result.get("objectId").and_then(|v| v.as_str()) else {
return Ok(Vec::new());
};
let oids = self
.core
.node_array_object_ids(&self.core.main_frame_id, array_object_id)
.await?;
Ok(oids
.into_iter()
.map(|oid| Element::new(self.core.clone(), oid))
.collect())
}
/// Collects one result per page, following a "next" link between pages.
///
/// `f(page)` is called for pages `0..max(max_pages, 1)`; at least one page is always
/// collected. Between pages, the element matching `next_selector` is clicked, followed by
/// an 800 ms pause. Iteration stops early when that element is missing or not clickable.
/// The link is no longer clicked after the final collected page. Previously the loop
/// navigated past the last page it was asked for and paid an extra 800 ms sleep.
///
/// # Errors
/// The first error from `f`, the lookup, or the click.
pub async fn paginate<F, Fut, T>(
    &self,
    next_selector: &str,
    max_pages: usize,
    mut f: F,
) -> Result<Vec<T>>
where
    F: FnMut(usize) -> Fut,
    Fut: std::future::Future<Output = Result<T>>,
{
    let limit = max_pages.max(1);
    let mut out = Vec::new();
    for page in 0..limit {
        out.push(f(page).await?);
        if page + 1 >= limit {
            break;
        }
        let Some(next) = self.find_once(next_selector).await? else {
            break;
        };
        if !next.is_clickable().await.unwrap_or(false) {
            break;
        }
        next.click().await?;
        tokio::time::sleep(Duration::from_millis(800)).await;
    }
    Ok(out)
}
pub async fn cookies(&self) -> Result<Vec<Cookie>> {
let r = self
.core
.conn
.send(
"Browser.getCookies",
json!({ "browserContextId": self.core.browser_context_id }),
None,
)
.await?;
let mut out = Vec::new();
if let Some(arr) = r["cookies"].as_array() {
for c in arr {
out.push(Cookie {
name: c["name"].as_str().unwrap_or_default().to_string(),
value: c["value"].as_str().unwrap_or_default().to_string(),
domain: c["domain"].as_str().unwrap_or_default().to_string(),
path: c["path"].as_str().unwrap_or_default().to_string(),
expires: c["expires"].as_f64().unwrap_or(-1.0),
http_only: c["httpOnly"].as_bool().unwrap_or(false),
secure: c["secure"].as_bool().unwrap_or(false),
});
}
}
Ok(out)
}
/// Sets `cookies` in this tab's browser context (`Browser.setCookies`).
///
/// Only the optional fields that are `Some` are sent.
pub async fn set_cookies(&self, cookies: Vec<CookieParam>) -> Result<()> {
    let mut payload = Vec::with_capacity(cookies.len());
    for c in cookies {
        let mut obj = json!({ "name": c.name, "value": c.value });
        if let Some(url) = c.url {
            obj["url"] = json!(url);
        }
        if let Some(domain) = c.domain {
            obj["domain"] = json!(domain);
        }
        if let Some(path) = c.path {
            obj["path"] = json!(path);
        }
        if let Some(secure) = c.secure {
            obj["secure"] = json!(secure);
        }
        if let Some(http_only) = c.http_only {
            obj["httpOnly"] = json!(http_only);
        }
        if let Some(expires) = c.expires {
            obj["expires"] = json!(expires);
        }
        payload.push(obj);
    }
    let core = &self.core;
    core.conn
        .send(
            "Browser.setCookies",
            json!({ "browserContextId": core.browser_context_id, "cookies": payload }),
            None,
        )
        .await?;
    Ok(())
}
pub async fn wait_download(&self, timeout: Duration) -> Result<DownloadInfo> {
use crate::browser::download::{
DownloadState, list_dir_files, scan_new_files, size_stable,
};
let dir = self.core.download_path.clone().ok_or_else(|| {
Error::Other("wait_download 需先用 BrowserOptions::download_path 设置下载目录".into())
})?;
let baseline = list_dir_files(&dir).await;
let deadline = Instant::now() + timeout;
loop {
for m in scan_new_files(&dir, &baseline).await {
if m.state == DownloadState::Finished && size_stable(&m.path).await {
return Ok(DownloadInfo {
url: String::new(),
suggested_filename: m.suggested_filename,
path: m.path,
success: true,
error: None,
});
}
}
if Instant::now() >= deadline {
return Err(Error::Timeout(timeout));
}
tokio::time::sleep(Duration::from_millis(80)).await;
}
}
/// Starts capturing network packets whose URL matches any of `keywords`.
pub async fn listen_start(&self, keywords: &[&str]) -> Result<()> {
    let url_keywords = keywords.iter().copied().map(String::from).collect();
    let filter = ListenFilter {
        url_keywords,
        xhr_only: false,
    };
    self.listen_start_filter(filter).await
}
/// Starts capturing network packets whose URL matches any of `keywords`.
///
/// NOTE(review): this is currently an exact alias of `listen_start`. It does not set
/// `ListenFilter::xhr_only`, despite the name. Confirm whether XHR-only filtering is
/// intended before changing it, since callers may rely on fetch requests being captured.
pub async fn listen_xhr(&self, keywords: &[&str]) -> Result<()> {
    let started = self.listen_start(keywords);
    started.await
}
/// Installs the listen hook built from `filter` and resets the packet buffer.
///
/// Steps:
/// 1. Store the hook as the tab's listen script.
/// 2. Rebuild the init scripts (errors propagate).
/// 3. Evaluate the hook in the current page. Failure here is ignored.
/// 4. Mark listening active and clear any buffered packets.
async fn listen_start_filter(&self, filter: ListenFilter) -> Result<()> {
    let hook = hook_script(&filter);
    {
        let mut slot = self.core.listen_script.lock().await;
        *slot = Some(hook.clone());
    }
    self.core.rebuild_init_scripts().await?;
    // Best effort for the already-loaded document. Presumably the init script covers
    // later navigations (TODO confirm in rebuild_init_scripts).
    let _ = self.core.evaluate(&hook, true).await;
    *self.core.listen_active.lock().await = true;
    self.core.listen_buf.lock().await.clear();
    Ok(())
}
/// Pulls queued packets out of the page (`DRAIN_JS`) into the tab's packet buffer.
///
/// Does nothing when the background drainer (`bg_active`) is running, since that task
/// evaluates `DRAIN_JS` itself.
///
/// # Errors
/// Propagates the evaluation error if `DRAIN_JS` fails.
async fn drain_into_buffer(&self) -> Result<()> {
    if !self.core.bg_active.load(Ordering::SeqCst) {
        let raw = self.core.evaluate(DRAIN_JS, true).await?;
        self.core.listen_buf.lock().await.extend(parse_packets(&raw));
    }
    Ok(())
}
/// Waits for the next captured packet using the tab's default timeout.
///
/// See [`Tab::listen_wait_timeout`] for the error cases.
pub async fn listen_wait(&self) -> Result<DataPacket> {
    let limit = self.core.timeout();
    self.listen_wait_timeout(limit).await
}
pub async fn listen_wait_timeout(&self, timeout: Duration) -> Result<DataPacket> {
if !*self.core.listen_active.lock().await {
return Err(Error::Other("尚未调用 listen_start".into()));
}
let deadline = Instant::now() + timeout;
loop {
if let Some(p) = self.core.listen_buf.lock().await.pop_front() {
return Ok(p);
}
let _ = self.drain_into_buffer().await;
if let Some(p) = self.core.listen_buf.lock().await.pop_front() {
return Ok(p);
}
if Instant::now() >= deadline {
return Err(Error::Timeout(timeout));
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
/// Like [`Tab::listen_wait`], but maps a timeout to `Ok(None)` instead of an error.
pub async fn listen_next(&self) -> Result<Option<DataPacket>> {
    match self.listen_wait().await {
        Err(Error::Timeout(_)) => Ok(None),
        other => other.map(Some),
    }
}
/// Starts the background drain task (`bg_drain_loop`) for long-running listening.
///
/// The call is idempotent while a drain task is still running. If the previous task has
/// already exited (for example after a transport error), a new one is spawned instead
/// of treating the stale handle as "running".
///
/// # Errors
/// `Error::Other` if listening has not been started with `listen_start`/`listen_xhr`.
pub async fn listen_forever(&self) -> Result<()> {
    if !*self.core.listen_active.lock().await {
        return Err(Error::Other(
            "请先调用 listen_start/listen_xhr 再开启长监听".into(),
        ));
    }
    let mut bg = self.core.listen_bg.lock().await;
    let still_running = matches!(&*bg, Some(handle) if !handle.is_finished());
    if still_running {
        return Ok(());
    }
    self.core.bg_active.store(true, Ordering::SeqCst);
    // Replacing a finished handle just drops it; the task has already completed.
    *bg = Some(tokio::spawn(bg_drain_loop(self.core.clone())));
    Ok(())
}
/// Ensures the background drainer is running and returns a [`ListenStream`] over this tab.
///
/// # Errors
/// Propagates the error from [`Tab::listen_forever`].
pub async fn listen_stream(&self) -> Result<ListenStream> {
    self.listen_forever()
        .await
        .map(|()| ListenStream { tab: self.clone() })
}
/// Stops packet capture and clears all listen state.
///
/// Steps:
/// 1. Clear `listen_active` and `bg_active`.
/// 2. Abort the background drain task, if any.
/// 3. Drop the listen script and rebuild the init scripts.
/// 4. Run `UNINSTALL_JS` in the current page.
/// 5. Empty the packet buffer.
///
/// Cleanup failures are ignored; this always returns `Ok(())`.
pub async fn listen_stop(&self) -> Result<()> {
    let core = &self.core;
    *core.listen_active.lock().await = false;
    core.bg_active.store(false, Ordering::SeqCst);
    let handle = core.listen_bg.lock().await.take();
    if let Some(task) = handle {
        task.abort();
    }
    *core.listen_script.lock().await = None;
    let _ = core.rebuild_init_scripts().await;
    let _ = core.evaluate(UNINSTALL_JS, true).await;
    core.listen_buf.lock().await.clear();
    Ok(())
}
/// Appends `script` to the tab's init scripts, rebuilds them, and also evaluates it once
/// in the current page.
///
/// The immediate evaluation is best effort and its result is ignored.
///
/// # Errors
/// Propagates any error from rebuilding the init scripts.
pub async fn add_init_script(&self, script: &str) -> Result<()> {
    {
        // Scope the guard so the lock is released before the rebuild.
        let mut scripts = self.core.init_scripts.lock().await;
        scripts.push(script.to_owned());
    }
    self.core.rebuild_init_scripts().await?;
    let _ = self.core.evaluate(script, true).await;
    Ok(())
}
/// Registers a page script that patches the `PointerEvent.prototype.pointerType` getter so
/// an empty (`''`) or null/undefined value reads as `'mouse'`; other values pass through.
///
/// The script is added through [`Tab::add_init_script`], so it is stored with the other
/// init scripts and also evaluated once in the current page. Presumably the intent is to
/// make synthesized pointer events look like real mouse input (TODO confirm). The script
/// returns early if `PointerEvent` or the getter is missing, and its own `try`/`catch`
/// swallows any error.
///
/// # Errors
/// Propagates any error from `add_init_script`.
pub async fn apply_pointer_stealth(&self) -> Result<()> {
    // Runtime payload: injected verbatim into the page.
    const POINTER_STEALTH_JS: &str = r#"(function(){
try {
var proto = window.PointerEvent && window.PointerEvent.prototype;
if (!proto) return;
var d = Object.getOwnPropertyDescriptor(proto, 'pointerType');
if (!d || !d.get) return;
var orig = d.get;
Object.defineProperty(proto, 'pointerType', {
configurable: true, enumerable: d.enumerable,
get: function(){ var v = orig.call(this); return (v === '' || v == null) ? 'mouse' : v; }
});
} catch(e){}
})()"#;
    self.add_init_script(POINTER_STEALTH_JS).await
}
pub fn dump_env(&self) -> crate::browser::dump_env::EnvDumper {
crate::browser::dump_env::EnvDumper::new(self.clone())
}
/// Dispatches the key named `key` through the tab core's `press_key`.
pub async fn press_key(&self, key: &str) -> Result<()> {
    let core = &self.core;
    core.press_key(key).await
}
/// Sends a wheel event at the centre of the viewport.
///
/// The centre is read from the page. If that evaluation fails or yields non-numeric
/// values, `(200, 200)` is used instead.
pub async fn wheel(&self, delta_x: f64, delta_y: f64) -> Result<()> {
    const CENTER_JS: &str = "[Math.floor(innerWidth/2), Math.floor(innerHeight/2)]";
    let center = match self.core.evaluate(CENTER_JS, true).await {
        Ok(v) => v,
        Err(_) => Value::Null,
    };
    let coord = |i: usize| center.get(i).and_then(Value::as_f64).unwrap_or(200.0);
    self.wheel_at(coord(0), coord(1), delta_x, delta_y).await
}
/// Sends `Page.dispatchWheelEvent` at viewport position `(x, y)`.
///
/// Coordinates are floored. `deltaZ` and `modifiers` are always 0.
pub async fn wheel_at(&self, x: f64, y: f64, delta_x: f64, delta_y: f64) -> Result<()> {
    let params = json!({
        "x": x.floor(),
        "y": y.floor(),
        "deltaX": delta_x,
        "deltaY": delta_y,
        "deltaZ": 0,
        "modifiers": 0,
    });
    self.core.send_page("Page.dispatchWheelEvent", params).await?;
    Ok(())
}
/// Scrolls the window by `(x, y)` pixels using `window.scrollBy` in the page.
pub async fn scroll_by(&self, x: f64, y: f64) -> Result<()> {
    let js = format!("window.scrollBy({x},{y})");
    self.core.evaluate(&js, true).await?;
    Ok(())
}
/// Dispatches a `mousemove` to `(x, y)` with button argument `0`.
///
/// The last argument is presumably the pressed-buttons state; TODO confirm in
/// `dispatch_mouse`.
pub async fn mouse_move(&self, x: f64, y: f64) -> Result<()> {
    let core = &self.core;
    core.dispatch_mouse("mousemove", x, y, 0).await
}
/// Dispatches a `mousedown` at `(x, y)` with button argument `1`.
pub async fn mouse_down(&self, x: f64, y: f64) -> Result<()> {
    let core = &self.core;
    core.dispatch_mouse("mousedown", x, y, 1).await
}
/// Dispatches a `mousemove` to `(x, y)` with button argument `1` (a move while pressed).
pub async fn mouse_drag(&self, x: f64, y: f64) -> Result<()> {
    let core = &self.core;
    core.dispatch_mouse("mousemove", x, y, 1).await
}
/// Dispatches a `mouseup` at `(x, y)` with button argument `0`.
pub async fn mouse_up(&self, x: f64, y: f64) -> Result<()> {
    let core = &self.core;
    core.dispatch_mouse("mouseup", x, y, 0).await
}
/// Non-async variant of [`Tab::mouse_move`] using the core's `dispatch_mouse_fire`.
///
/// Presumably fire-and-forget; TODO confirm in `dispatch_mouse_fire`.
pub fn mouse_move_fast(&self, x: f64, y: f64) -> Result<()> {
    let core = &self.core;
    core.dispatch_mouse_fire("mousemove", x, y, 0)
}
/// Non-async variant of [`Tab::mouse_drag`] using the core's `dispatch_mouse_fire`.
pub fn mouse_drag_fast(&self, x: f64, y: f64) -> Result<()> {
    let core = &self.core;
    core.dispatch_mouse_fire("mousemove", x, y, 1)
}
pub fn wait(&self) -> Wait {
Wait::new(self.clone())
}
pub fn scroll(&self) -> Scroll {
Scroll::new(self.clone())
}
pub fn set(&self) -> SetTab {
SetTab::new(self.clone())
}
pub fn listen(&self) -> Listen {
Listen::new(self.clone())
}
pub fn console(&self) -> Console {
Console::new(self.clone())
}
pub fn websocket(&self) -> WsListener {
WsListener::new(self.clone())
}
pub fn actions(&self) -> Actions {
Actions::new(self.clone())
}
pub fn intercept(&self) -> Intercept {
Intercept::new(self.clone())
}
pub fn downloads(&self) -> Downloads {
Downloads::new(self.clone())
}
pub fn screencast(&self) -> Screencast {
Screencast::new(self.clone())
}
/// Captures a PNG of the viewport, or of the whole page when `full_page` is true.
pub async fn screenshot_bytes(&self, full_page: bool) -> Result<Vec<u8>> {
    let core = &self.core;
    let region = core.page_clip(full_page).await?;
    core.capture(region, ImageFormat::Png, None).await
}
/// Same as [`Tab::screenshot_bytes`], but returns the PNG base64-encoded.
pub async fn screenshot_base64(&self, full_page: bool) -> Result<String> {
    let png = self.screenshot_bytes(full_page).await?;
    Ok(base64_encode(&png))
}
/// Captures a screenshot as configured by `opts`.
///
/// An explicit region (`opts.region_clip()`) takes precedence. Otherwise the viewport is
/// captured, or the full page when `opts.full_page` is set. `opts.format` and
/// `opts.quality` are passed through to the capture.
pub async fn screenshot(&self, opts: &ShotOpts) -> Result<Vec<u8>> {
    let clip = if let Some(region) = opts.region_clip() {
        region
    } else {
        self.core.page_clip(opts.full_page).await?
    };
    self.core.capture(clip, opts.format, opts.quality).await
}
/// Captures a screenshot and writes it to `path`, returning the path written.
///
/// The image format is chosen from the path's extension (`ImageFormat::from_path`).
pub async fn get_screenshot(&self, path: impl AsRef<Path>, full_page: bool) -> Result<PathBuf> {
    let target: PathBuf = path.as_ref().to_path_buf();
    let fmt = ImageFormat::from_path(&target);
    let region = self.core.page_clip(full_page).await?;
    let image = self.core.capture(region, fmt, None).await?;
    write_file(&target, &image).await?;
    Ok(target)
}
/// Returns the viewport size `(innerWidth, innerHeight)`.
///
/// A missing or non-numeric value is reported as `0.0`.
pub async fn size(&self) -> Result<(f64, f64)> {
    let dims = self
        .run_js("[window.innerWidth, window.innerHeight]")
        .await?;
    let pick = |i: usize| dims.get(i).and_then(Value::as_f64).unwrap_or(0.0);
    Ok((pick(0), pick(1)))
}
/// Returns the document's scrollable size `(width, height)`.
///
/// Each dimension is the larger of `documentElement` and `body` (body may be absent).
/// A missing or non-numeric value is reported as `0.0`.
pub async fn page_size(&self) -> Result<(f64, f64)> {
    const PAGE_SIZE_JS: &str = "(() => { const d = document.documentElement, b = document.body; \
        return [Math.max(d.scrollWidth, b ? b.scrollWidth : 0), \
        Math.max(d.scrollHeight, b ? b.scrollHeight : 0)]; })()";
    let dims = self.run_js(PAGE_SIZE_JS).await?;
    let pick = |i: usize| dims.get(i).and_then(Value::as_f64).unwrap_or(0.0);
    Ok((pick(0), pick(1)))
}
/// Reads viewport size, page size, scroll offset and device pixel ratio in one call.
///
/// Any missing or non-numeric metric is reported as `0.0`.
pub async fn rect(&self) -> Result<PageRect> {
    const METRICS_JS: &str =
        "(() => { const d = document.documentElement, b = document.body; return { \
        ww: window.innerWidth, wh: window.innerHeight, \
        pw: Math.max(d.scrollWidth, b ? b.scrollWidth : 0), \
        ph: Math.max(d.scrollHeight, b ? b.scrollHeight : 0), \
        sx: window.scrollX, sy: window.scrollY, dpr: window.devicePixelRatio }; })()";
    let metrics = self.run_js(METRICS_JS).await?;
    let num = |key: &str| metrics.get(key).and_then(Value::as_f64).unwrap_or(0.0);
    Ok(PageRect {
        window_width: num("ww"),
        window_height: num("wh"),
        page_width: num("pw"),
        page_height: num("ph"),
        scroll_x: num("sx"),
        scroll_y: num("sy"),
        device_pixel_ratio: num("dpr"),
    })
}
/// Enables request interception for URLs matching any of `keywords`, for all request types.
pub async fn intercept_start(&self, keywords: &[&str]) -> Result<()> {
    let url_keywords = keywords.iter().map(|kw| (*kw).to_owned()).collect();
    let filter = ListenFilter {
        url_keywords,
        xhr_only: false,
    };
    self.intercept_start_filter(filter).await
}
/// Like [`Tab::intercept_start`], but sets `xhr_only: true` on the filter.
pub async fn intercept_xhr(&self, keywords: &[&str]) -> Result<()> {
    let url_keywords = keywords.iter().map(|kw| (*kw).to_owned()).collect();
    let filter = ListenFilter {
        url_keywords,
        xhr_only: true,
    };
    self.intercept_start_filter(filter).await
}
/// Turns on `Network.setRequestInterception` and installs a fresh `InterceptorState`
/// plus its receiver.
///
/// Any previous interceptor and receiver are replaced.
///
/// # Errors
/// Propagates the error if enabling interception fails. State is untouched in that case.
async fn intercept_start_filter(&self, filter: ListenFilter) -> Result<()> {
    let core = &self.core;
    core.send_page("Network.setRequestInterception", json!({ "enabled": true }))
        .await?;
    let (state, receiver) =
        InterceptorState::new(filter, core.conn.clone(), core.session_id.clone());
    *core.interceptor.lock().await = Some(state);
    *core.intercept_rx.lock().await = Some(receiver);
    Ok(())
}
pub async fn intercept_next(&self) -> Result<InterceptedRequest> {
let mut guard = self.core.intercept_rx.lock().await;
let rx = guard
.as_mut()
.ok_or_else(|| Error::Other("尚未调用 intercept_start".into()))?;
let timeout = self.core.timeout();
match tokio::time::timeout(timeout, rx.recv()).await {
Ok(Some(r)) => Ok(r),
Ok(None) => Err(Error::Transport("拦截通道已关闭".into())),
Err(_) => Err(Error::Timeout(timeout)),
}
}
pub async fn intercept_next_timeout(
&self,
timeout: Duration,
) -> Result<Option<InterceptedRequest>> {
let mut guard = self.core.intercept_rx.lock().await;
let rx = guard
.as_mut()
.ok_or_else(|| Error::Other("尚未调用 intercept_start".into()))?;
match tokio::time::timeout(timeout, rx.recv()).await {
Ok(Some(r)) => Ok(Some(r)),
Ok(None) => Err(Error::Transport("拦截通道已关闭".into())),
Err(_) => Ok(None),
}
}
/// Disables request interception and drops the interceptor state and receiver.
///
/// Disabling on the page is best effort, so local state is cleared even if the page is
/// gone. Always returns `Ok(())`.
pub async fn intercept_stop(&self) -> Result<()> {
    let core = &self.core;
    let disable = json!({ "enabled": false });
    let _ = core
        .send_page("Network.setRequestInterception", disable)
        .await;
    *core.interceptor.lock().await = None;
    *core.intercept_rx.lock().await = None;
    Ok(())
}
/// Closes the page, then removes this tab's browser context.
///
/// Both steps are best effort and their errors are ignored. Always returns `Ok(())`.
pub async fn close(&self) -> Result<()> {
    let core = &self.core;
    let _ = core.send_page("Page.close", json!({})).await;
    let remove_params = json!({ "browserContextId": core.browser_context_id });
    let _ = core
        .conn
        .send("Browser.removeBrowserContext", remove_params, None)
        .await;
    Ok(())
}
}
/// Page script that arms a one-shot, capture-phase `click` listener on `document`.
///
/// Setup when the script runs:
/// - Removes any hook a previous run installed.
/// - Resets `window.__drission_upload_el` to `null`.
///
/// When a click targets an `<input type="file">`, the listener:
/// - calls `preventDefault()` and `stopImmediatePropagation()`;
/// - stores the input in `window.__drission_upload_el`;
/// - removes itself.
///
/// The module tests pin the `window.__drission_upload_el` contract polled by
/// `wait_upload`.
const UPLOAD_HOOK_JS: &str = r#"(() => {
if (window.__drission_upload_hook) {
document.removeEventListener('click', window.__drission_upload_hook, true);
}
window.__drission_upload_el = null;
const hook = (e) => {
const t = e.target;
if (t && t.tagName === 'INPUT' && String(t.type).toLowerCase() === 'file') {
e.preventDefault();
e.stopImmediatePropagation();
window.__drission_upload_el = t;
document.removeEventListener('click', hook, true);
window.__drission_upload_hook = null;
}
};
window.__drission_upload_hook = hook;
document.addEventListener('click', hook, true);
})();"#;
/// Per-tab event pump. Reads the shared protocol event broadcast and handles only the
/// events whose `session_id` equals `session_id`.
///
/// - `Runtime.executionContextCreated` with an empty `auxData.name` (presumably the
///   page's default world rather than an isolated one): records
///   `frameId -> executionContextId` in `frame_ctxs`, and publishes the id on `ctx_tx`
///   when the frame is `main_frame_id`.
/// - `Runtime.executionContextDestroyed`: removes every `frame_ctxs` entry mapping to the
///   destroyed id, and sends `None` on `ctx_tx` if its current value equals that id.
/// - `Network.requestWillBeSent`: asks the installed `InterceptorState` (if any) for a
///   `Decision`. On `Decision::AutoResume`, a detached task sends
///   `Network.resumeInterceptedRequest` for that request and ignores the result.
///
/// A lagged receive logs a warning and continues. Any other receive error ends the pump.
async fn pump(
    mut events: tokio::sync::broadcast::Receiver<Event>,
    conn: Connection,
    session_id: String,
    main_frame_id: String,
    ctx_tx: watch::Sender<Option<String>>,
    frame_ctxs: Arc<Mutex<HashMap<String, String>>>,
    interceptor: Arc<Mutex<Option<InterceptorState>>>,
) {
    loop {
        let ev = match events.recv().await {
            Ok(ev) => ev,
            // Missed events are skipped rather than treated as fatal.
            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                tracing::warn!(skipped = n, "事件泵落后,跳过部分事件");
                continue;
            }
            Err(_) => break,
        };
        // The broadcast carries events for every session; ignore other tabs.
        if ev.session_id.as_deref() != Some(&session_id) {
            continue;
        }
        match ev.method.as_str() {
            "Runtime.executionContextCreated" => {
                let aux = &ev.params["auxData"];
                let name = aux["name"].as_str().unwrap_or("");
                let frame = aux["frameId"].as_str();
                // Only unnamed contexts are tracked.
                if name.is_empty() {
                    if let (Some(fid), Some(id)) = (frame, ev.params["executionContextId"].as_str())
                    {
                        frame_ctxs
                            .lock()
                            .await
                            .insert(fid.to_string(), id.to_string());
                        if fid == main_frame_id {
                            let _ = ctx_tx.send(Some(id.to_string()));
                        }
                    }
                }
            }
            "Runtime.executionContextDestroyed" => {
                let gone = ev.params["executionContextId"].as_str();
                if let Some(gone) = gone {
                    frame_ctxs.lock().await.retain(|_, v| v != gone);
                }
                // The `borrow()` guard is released at the end of the condition, before
                // `send` runs.
                if ctx_tx.borrow().as_deref() == gone {
                    let _ = ctx_tx.send(None);
                }
            }
            "Network.requestWillBeSent" => {
                // The interceptor lock is only held while computing the decision.
                let decision = match interceptor.lock().await.as_ref() {
                    Some(s) => s.on_request_will_be_sent(&ev.params),
                    None => Decision::Ignore,
                };
                if let Decision::AutoResume(rid) = decision {
                    // Spawned so that a slow protocol round-trip never stalls the pump.
                    let conn2 = conn.clone();
                    let session2 = session_id.clone();
                    tokio::spawn(async move {
                        let _ = conn2
                            .send(
                                "Network.resumeInterceptedRequest",
                                json!({ "requestId": rid }),
                                Some(&session2),
                            )
                            .await;
                    });
                }
            }
            _ => {}
        }
    }
    tracing::debug!(%session_id, "事件泵结束");
}
#[derive(Clone)]
pub struct ListenStream {
tab: Tab,
}
impl ListenStream {
pub async fn next(&self) -> Result<DataPacket> {
self.tab.listen_wait().await
}
pub async fn next_timeout(&self, timeout: Duration) -> Result<Option<DataPacket>> {
match self.tab.listen_wait_timeout(timeout).await {
Ok(p) => Ok(Some(p)),
Err(Error::Timeout(_)) => Ok(None),
Err(e) => Err(e),
}
}
pub async fn drain_ready(&self) -> Vec<DataPacket> {
self.tab.core.listen_buf.lock().await.drain(..).collect()
}
}
async fn bg_drain_loop(core: Arc<TabCore>) {
loop {
if !*core.listen_active.lock().await {
break;
}
match core.evaluate(DRAIN_JS, true).await {
Ok(v) => {
let pkts = parse_packets(&v);
if !pkts.is_empty() {
let mut buf = core.listen_buf.lock().await;
for p in pkts {
buf.push_back(p);
}
}
}
Err(Error::Transport(_)) => break,
Err(_) => {}
}
tokio::time::sleep(Duration::from_millis(120)).await;
}
tracing::debug!("长监听后台抽取结束");
}
/// Maps a key name or alias to `(key, code, keyCode, text)` for keyboard dispatch.
///
/// Named keys and aliases (e.g. `"Down"`, `"Esc"`, `"Ctrl"`, `"Cmd"`) resolve to a
/// canonical `key`, a `code` (modifiers use the left-hand variant) and a legacy
/// `keyCode`. Among named keys, only the space bar carries `text`.
///
/// Any other input is passed through as `key`, with an empty `code` and `keyCode` 0.
/// Its `text` is the input itself when it is exactly one Unicode scalar value, and
/// empty otherwise.
fn key_descriptor(key: &str) -> (&str, &str, i64, &str) {
    let named: Option<(&str, &str, i64, &str)> = match key {
        "ArrowDown" | "Down" => Some(("ArrowDown", "ArrowDown", 40, "")),
        "ArrowUp" | "Up" => Some(("ArrowUp", "ArrowUp", 38, "")),
        "ArrowLeft" | "Left" => Some(("ArrowLeft", "ArrowLeft", 37, "")),
        "ArrowRight" | "Right" => Some(("ArrowRight", "ArrowRight", 39, "")),
        "Enter" => Some(("Enter", "Enter", 13, "")),
        "Escape" | "Esc" => Some(("Escape", "Escape", 27, "")),
        "Tab" => Some(("Tab", "Tab", 9, "")),
        "Backspace" => Some(("Backspace", "Backspace", 8, "")),
        "Delete" | "Del" => Some(("Delete", "Delete", 46, "")),
        "Insert" => Some(("Insert", "Insert", 45, "")),
        "PageDown" => Some(("PageDown", "PageDown", 34, "")),
        "PageUp" => Some(("PageUp", "PageUp", 33, "")),
        "Home" => Some(("Home", "Home", 36, "")),
        "End" => Some(("End", "End", 35, "")),
        " " | "Space" => Some((" ", "Space", 32, " ")),
        "Control" | "Ctrl" => Some(("Control", "ControlLeft", 17, "")),
        "Shift" => Some(("Shift", "ShiftLeft", 16, "")),
        "Alt" => Some(("Alt", "AltLeft", 18, "")),
        "Meta" | "Command" | "Cmd" => Some(("Meta", "MetaLeft", 91, "")),
        _ => None,
    };
    named.unwrap_or_else(|| {
        let mut scalars = key.chars();
        let single_char = scalars.next().is_some() && scalars.next().is_none();
        (key, "", 0, if single_char { key } else { "" })
    })
}
/// Applies the fingerprint and network overrides in `opts` to browser context `bctx`.
///
/// Each override is sent only when configured. Failures are logged and ignored.
///
/// Overrides sent, in order:
/// - user agent, locale, timezone;
/// - platform (explicit, or derived from `fingerprint.os`);
/// - geolocation;
/// - default viewport (from `window_size`);
/// - CSP bypass, HTTPS-error ignoring;
/// - context proxy.
///
/// A proxy whose server string cannot be parsed is reported with a warning instead of
/// being skipped silently. The server string is deliberately not logged because it may
/// contain credentials.
async fn apply_context_overrides(conn: &Connection, bctx: &str, opts: &BrowserOptions) {
    let fp = &opts.fingerprint;
    // Sends one override; a failure is logged and does not abort the remaining ones.
    macro_rules! best_effort {
        ($method:expr, $params:expr) => {
            if let Err(e) = conn.send($method, $params, None).await {
                tracing::warn!(method = $method, error = %e, "上下文覆盖失败(已忽略)");
            }
        };
    }
    if let Some(ua) = &fp.user_agent {
        best_effort!(
            "Browser.setUserAgentOverride",
            json!({ "browserContextId": bctx, "userAgent": ua })
        );
    }
    if let Some(locale) = &fp.locale {
        best_effort!(
            "Browser.setLocaleOverride",
            json!({ "browserContextId": bctx, "locale": locale })
        );
    }
    if let Some(tz) = &fp.timezone_id {
        best_effort!(
            "Browser.setTimezoneOverride",
            json!({ "browserContextId": bctx, "timezoneId": tz })
        );
    }
    // An explicit platform string wins over the one derived from the OS type.
    let platform = fp.platform.clone().or_else(|| fp.os.map(platform_for_os));
    if let Some(platform) = platform {
        best_effort!(
            "Browser.setPlatformOverride",
            json!({ "browserContextId": bctx, "platform": platform })
        );
    }
    if let Some(geo) = &fp.geolocation {
        let mut g = json!({ "latitude": geo.latitude, "longitude": geo.longitude });
        if let Some(acc) = geo.accuracy {
            g["accuracy"] = json!(acc);
        }
        best_effort!(
            "Browser.setGeolocationOverride",
            json!({ "browserContextId": bctx, "geolocation": g })
        );
    }
    if let Some((w, h)) = opts.window_size {
        best_effort!(
            "Browser.setDefaultViewport",
            json!({ "browserContextId": bctx, "viewport": { "viewportSize": { "width": w, "height": h } } })
        );
    }
    if opts.bypass_csp {
        best_effort!(
            "Browser.setBypassCSP",
            json!({ "browserContextId": bctx, "bypassCSP": true })
        );
    }
    if opts.ignore_https_errors {
        best_effort!(
            "Browser.setIgnoreHTTPSErrors",
            json!({ "browserContextId": bctx, "ignoreHTTPSErrors": true })
        );
    }
    if let Some(proxy) = &opts.proxy {
        match proxy_to_params(bctx, proxy) {
            Some(params) => {
                best_effort!("Browser.setContextProxy", params);
            }
            None => {
                // Previously a silent no-op, which left the context unproxied unnoticed.
                tracing::warn!("代理地址无法解析(已忽略)");
            }
        }
    }
}
/// Returns the `navigator.platform`-style string used for the given OS type.
fn platform_for_os(os: OsType) -> String {
    let platform = match os {
        OsType::MacOS => "MacIntel",
        OsType::Windows => "Win32",
        OsType::Linux => "Linux x86_64",
    };
    String::from(platform)
}
/// Builds `Browser.setContextProxy` parameters for context `bctx` from `proxy`.
///
/// `proxy.server` is parsed as `[scheme://][user[:pass]@]host:port[/]`:
/// - `scheme` maps to the proxy `type`. A missing or unknown scheme means `"http"`;
///   `socks`/`socks5`/`socks5h` mean `"socks"`.
/// - A trailing `/` is tolerated.
/// - Credentials embedded in the URL are used verbatim (no percent-decoding), and only
///   when `proxy.username` / `proxy.password` are not set explicitly.
///
/// Returns `None` when no non-empty host and valid TCP port (0–65535) can be extracted.
fn proxy_to_params(bctx: &str, proxy: &crate::launcher::Proxy) -> Option<Value> {
    let s = proxy.server.trim();
    let (scheme, rest) = s.split_once("://").unwrap_or(("http", s));
    let ty = match scheme.to_ascii_lowercase().as_str() {
        "http" => "http",
        "https" => "https",
        "socks5" | "socks" | "socks5h" => "socks",
        "socks4" => "socks4",
        _ => "http",
    };
    // `http://host:8080/` previously failed port parsing because of the slash.
    let rest = rest.trim_end_matches('/');
    // Split off optional userinfo. Using the last `@` lets a password contain `@`.
    // Without this, `user:pass@host` became the "host" and leaked the credentials into it.
    let (userinfo, host_port) = match rest.rsplit_once('@') {
        Some((info, hp)) => (Some(info), hp),
        None => (None, rest),
    };
    let (host, port_str) = host_port.rsplit_once(':')?;
    if host.is_empty() {
        return None;
    }
    let port: u16 = port_str.parse().ok()?;
    let (url_user, url_pass) = match userinfo {
        Some(info) => info.split_once(':').unwrap_or((info, "")),
        None => ("", ""),
    };
    let mut o = json!({
        "browserContextId": bctx,
        "type": ty,
        "host": host,
        "port": port,
        "bypass": proxy.bypass,
    });
    // Explicit credentials take precedence over ones embedded in the server URL.
    if let Some(u) = &proxy.username {
        o["username"] = json!(u);
    } else if !url_user.is_empty() {
        o["username"] = json!(url_user);
    }
    if let Some(p) = &proxy.password {
        o["password"] = json!(p);
    } else if !url_pass.is_empty() {
        o["password"] = json!(url_pass);
    }
    Some(o)
}
async fn wait_attached(
events: &mut tokio::sync::broadcast::Receiver<Event>,
target_id: &str,
timeout: Duration,
) -> Result<String> {
let deadline = Instant::now() + timeout;
loop {
match timeout_at(deadline, events.recv()).await {
Ok(Ok(ev)) => {
if ev.method == "Browser.attachedToTarget"
&& ev.params["targetInfo"]["targetId"].as_str() == Some(target_id)
{
if let Some(sid) = ev.params["sessionId"].as_str() {
return Ok(sid.to_string());
}
}
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(_)) => return Err(Error::Transport("连接已关闭".into())),
Err(_) => return Err(Error::Timeout(timeout)),
}
}
}
async fn wait_frame_and_ctx(
events: &mut tokio::sync::broadcast::Receiver<Event>,
session_id: &str,
timeout: Duration,
) -> Result<(String, String)> {
let deadline = Instant::now() + timeout;
let mut main_frame: Option<String> = None;
let mut exec_ctx: Option<String> = None;
loop {
match timeout_at(deadline, events.recv()).await {
Ok(Ok(ev)) => {
if ev.session_id.as_deref() != Some(session_id) {
continue;
}
match ev.method.as_str() {
"Page.frameAttached" => {
if ev.params["parentFrameId"].as_str().is_none() {
main_frame = ev.params["frameId"].as_str().map(str::to_string);
}
}
"Runtime.executionContextCreated" => {
let aux = &ev.params["auxData"];
if aux["name"].as_str().unwrap_or("").is_empty() {
exec_ctx = ev.params["executionContextId"].as_str().map(str::to_string);
if main_frame.is_none() {
main_frame = aux["frameId"].as_str().map(str::to_string);
}
}
}
_ => {}
}
if let (Some(f), Some(c)) = (&main_frame, &exec_ctx) {
return Ok((f.clone(), c.clone()));
}
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue,
Ok(Err(_)) => return Err(Error::Transport("连接已关闭".into())),
Err(_) => return Err(Error::Timeout(timeout)),
}
}
}
pub(crate) fn extract_runtime_result(r: Value) -> Result<Value> {
if let Some(ex) = r.get("exceptionDetails") {
if !ex.is_null() {
let text = ex["text"]
.as_str()
.or_else(|| ex["value"].as_str())
.unwrap_or("JS 执行异常");
return Err(Error::Protocol(text.to_string()));
}
}
let result = r.get("result").cloned().unwrap_or(Value::Null);
if let Some(v) = result.get("value") {
return Ok(v.clone());
}
Ok(result)
}
/// Builds a JS expression that evaluates to the first node matching `query`, or `null`.
///
/// CSS uses `querySelector`. XPath uses `document.evaluate` with result type 9
/// (`FIRST_ORDERED_NODE_TYPE`).
pub(crate) fn single_query_expr(query: &Query) -> String {
    match query {
        Query::Xpath(xp) => {
            let lit = js_string(xp);
            format!("document.evaluate({lit}, document, null, 9, null).singleNodeValue")
        }
        Query::Css(sel) => {
            let lit = js_string(sel);
            format!("document.querySelector({lit})")
        }
    }
}
/// Builds a JS expression that evaluates to an array of all nodes matching `query`.
///
/// CSS uses `querySelectorAll`. XPath uses `document.evaluate` with result type 7
/// (`ORDERED_NODE_SNAPSHOT_TYPE`) and copies the snapshot into an array.
pub(crate) fn multi_query_expr(query: &Query) -> String {
    match query {
        Query::Xpath(xp) => {
            let lit = js_string(xp);
            format!(
                "(() => {{ const r = document.evaluate({lit}, document, null, 7, null); const a = []; \
                 for (let i = 0; i < r.snapshotLength; i++) a.push(r.snapshotItem(i)); return a; }})()"
            )
        }
        Query::Css(sel) => {
            let lit = js_string(sel);
            format!("Array.from(document.querySelectorAll({lit}))")
        }
    }
}
/// Encodes `s` as a quoted, escaped JS string literal (via JSON serialization).
///
/// Falls back to `""` if serialization fails.
fn js_string(s: &str) -> String {
    match serde_json::to_string(s) {
        Ok(literal) => literal,
        Err(_) => "\"\"".to_owned(),
    }
}
/// Returns `true` if `name` is non-empty and consists only of ASCII digits `0`–`9`.
pub(crate) fn is_index(name: &str) -> bool {
    if name.is_empty() {
        return false;
    }
    name.chars().all(|c| c.is_ascii_digit())
}
#[cfg(test)]
mod tests {
    use super::{ImageFormat, ShotOpts, UPLOAD_HOOK_JS};
    use std::path::Path;

    /// Extension detection recognises JPEG case-insensitively and falls back to PNG.
    #[test]
    fn image_format_from_path_and_mime() {
        let expectations = [
            ("a.png", ImageFormat::Png),
            ("a.jpg", ImageFormat::Jpeg),
            ("a.JPEG", ImageFormat::Jpeg),
            ("a.webp", ImageFormat::Png),
            ("noext", ImageFormat::Png),
        ];
        for (file, want) in expectations {
            assert_eq!(ImageFormat::from_path(Path::new(file)), want);
        }
        assert_eq!(ImageFormat::Png.mime(), "image/png");
        assert_eq!(ImageFormat::Jpeg.mime(), "image/jpeg");
    }

    /// A region from corner (10, 20) to (110, 220) becomes a 100x200 clip at (10, 20);
    /// no region means no clip.
    #[test]
    fn shot_opts_region_clip() {
        let clip = ShotOpts::new()
            .region((10.0, 20.0), (110.0, 220.0))
            .region_clip()
            .expect("region 应产出 clip");
        for (key, want) in [("x", 10.0), ("y", 20.0), ("width", 100.0), ("height", 200.0)] {
            assert_eq!(clip[key], want);
        }
        assert!(ShotOpts::new().region_clip().is_none());
    }

    /// Pins the parts of the upload hook script that the upload-waiting code relies on.
    #[test]
    fn upload_hook_keeps_wait_contract() {
        let js = UPLOAD_HOOK_JS;
        assert!(
            js.contains("window.__drission_upload_el"),
            "hook 必须写入 wait_upload 轮询的 window.__drission_upload_el"
        );
        let capture_phase_click = js.contains("addEventListener('click'") && js.contains(", true)");
        assert!(capture_phase_click, "hook 必须在捕获阶段监听 click");
        assert!(js.contains("preventDefault"));
        assert!(js.contains("'file'") && js.contains("INPUT"));
    }
}