use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use serde_json::{Value, json};
use tokio::sync::Mutex;
use tokio::time::Instant;
use crate::browser::tab::{Tab, extract_runtime_result};
use crate::protocol::{Connection, Event};
use crate::{Error, Result};
const MAX_BUFFERED: usize = 1000;
const CONSOLE_JOIN_FN: &str = r#"function(){
function s(x){
if (typeof x === 'string') return x;
if (x === null) return 'null';
var t = typeof x;
if (t === 'undefined') return 'undefined';
if (t === 'number' || t === 'boolean' || t === 'bigint') return String(x);
if (t === 'symbol') { try { return x.toString(); } catch(e){ return 'Symbol()'; } }
if (t === 'function') { return 'function ' + (x.name || '') + '()'; }
try { if (x instanceof Error) return (x.stack || (x.name + ': ' + x.message)); } catch(e){}
try { if (typeof Node !== 'undefined' && x instanceof Node) return (x.outerHTML || x.nodeName || '[node]'); } catch(e){}
try { return JSON.stringify(x); } catch(e){}
try { return String(x); } catch(e){ return '[object]'; }
}
return Array.prototype.slice.call(arguments).map(s).join(' ');
}"#;
#[derive(Debug, Clone)]
pub struct ConsoleData {
pub source: String,
pub level: String,
pub text: String,
pub args: Vec<Value>,
pub url: String,
pub line: i64,
pub column: i64,
}
impl ConsoleData {
pub fn body(&self) -> Option<Value> {
serde_json::from_str(&self.text).ok()
}
}
#[derive(Debug, Clone, Default)]
pub struct ConsoleFilter {
pub levels: Vec<String>,
pub contains: Vec<String>,
}
impl ConsoleFilter {
pub fn new() -> Self {
Self::default()
}
pub fn level(mut self, level: &str) -> Self {
self.levels.push(level.to_string());
self
}
pub fn contains(mut self, needle: &str) -> Self {
self.contains.push(needle.to_string());
self
}
fn matches(&self, d: &ConsoleData) -> bool {
let level_ok =
self.levels.is_empty() || self.levels.iter().any(|l| l.eq_ignore_ascii_case(&d.level));
let text_ok = self.contains.is_empty() || self.contains.iter().any(|s| d.text.contains(s));
level_ok && text_ok
}
}
pub(crate) struct ConsoleShared {
pub buf: Mutex<VecDeque<ConsoleData>>,
pub active: AtomicBool,
}
impl ConsoleShared {
pub(crate) fn new() -> Self {
Self {
buf: Mutex::new(VecDeque::new()),
active: AtomicBool::new(false),
}
}
}
pub struct Console {
tab: Tab,
}
impl Console {
pub(crate) fn new(tab: Tab) -> Self {
Self { tab }
}
pub async fn start(&self) -> Result<()> {
self.start_with(ConsoleFilter::default()).await
}
pub async fn start_with(&self, filter: ConsoleFilter) -> Result<()> {
let shared = self.tab.core.console.clone();
if shared.active.swap(true, Ordering::SeqCst) {
return Ok(());
}
shared.buf.lock().await.clear();
let events = self.tab.core.conn.subscribe();
let conn = self.tab.core.conn.clone();
let session = self.tab.core.session_id.clone();
let task = tokio::spawn(console_loop(events, conn, session, shared, filter));
*self.tab.core.console_task.lock().await = Some(task);
Ok(())
}
pub fn listening(&self) -> bool {
self.tab.core.console.active.load(Ordering::SeqCst)
}
pub async fn wait(&self, timeout: Option<Duration>) -> Result<Option<ConsoleData>> {
let shared = &self.tab.core.console;
if !shared.active.load(Ordering::SeqCst) {
return Err(Error::Other("尚未调用 console.start()".into()));
}
let deadline = timeout.map(|d| Instant::now() + d);
loop {
if let Some(m) = shared.buf.lock().await.pop_front() {
return Ok(Some(m));
}
if !shared.active.load(Ordering::SeqCst) {
return Ok(None); }
if let Some(dl) = deadline
&& Instant::now() >= dl
{
return Ok(None);
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
pub async fn messages(&self) -> Vec<ConsoleData> {
self.tab.core.console.buf.lock().await.drain(..).collect()
}
pub async fn clear(&self) {
self.tab.core.console.buf.lock().await.clear();
}
pub fn steps(&self) -> ConsoleSteps {
ConsoleSteps {
tab: self.tab.clone(),
}
}
pub async fn stop(&self) -> Result<()> {
self.tab.core.console.active.store(false, Ordering::SeqCst);
if let Some(h) = self.tab.core.console_task.lock().await.take() {
h.abort();
}
self.tab.core.console.buf.lock().await.clear();
Ok(())
}
}
pub struct ConsoleSteps {
tab: Tab,
}
impl ConsoleSteps {
pub async fn next(&self, timeout: Option<Duration>) -> Result<Option<ConsoleData>> {
Console::new(self.tab.clone()).wait(timeout).await
}
}
async fn console_loop(
mut events: tokio::sync::broadcast::Receiver<Event>,
conn: Connection,
session: String,
shared: Arc<ConsoleShared>,
filter: ConsoleFilter,
) {
loop {
let ev = match events.recv().await {
Ok(ev) => ev,
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(skipped = n, "控制台监听落后,跳过部分事件");
continue;
}
Err(_) => break,
};
if !shared.active.load(Ordering::SeqCst) {
break;
}
if ev.session_id.as_deref() != Some(&session) || ev.method != "Runtime.console" {
continue;
}
let data = build_console_data(&conn, &session, &ev.params).await;
if !filter.matches(&data) {
continue;
}
let mut buf = shared.buf.lock().await;
if buf.len() >= MAX_BUFFERED {
buf.pop_front();
}
buf.push_back(data);
}
tracing::debug!(%session, "控制台监听任务结束");
}
async fn build_console_data(conn: &Connection, session: &str, params: &Value) -> ConsoleData {
let level = params
.get("type")
.and_then(Value::as_str)
.unwrap_or("log")
.to_string();
let loc = params.get("location");
let url = loc
.and_then(|l| l.get("url"))
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
let line = loc
.and_then(|l| l.get("lineNumber"))
.and_then(Value::as_i64)
.unwrap_or(0);
let column = loc
.and_then(|l| l.get("columnNumber"))
.and_then(Value::as_i64)
.unwrap_or(0);
let args: Vec<Value> = params
.get("args")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
let arg_values: Vec<Value> = args
.iter()
.map(|a| a.get("value").cloned().unwrap_or(Value::Null))
.collect();
let need_resolve = args.iter().any(|a| a.get("objectId").is_some());
let text = if need_resolve {
resolve_text(conn, session, params)
.await
.unwrap_or_else(|| fallback_text(&args))
} else {
args.iter()
.map(primitive_text)
.collect::<Vec<_>>()
.join(" ")
};
ConsoleData {
source: "console-api".to_string(),
level,
text,
args: arg_values,
url,
line,
column,
}
}
fn primitive_text(arg: &Value) -> String {
if let Some(u) = arg.get("unserializableValue").and_then(Value::as_str) {
return u.to_string();
}
match arg.get("value") {
Some(Value::String(s)) => s.clone(),
Some(Value::Null) => "null".to_string(),
Some(v) => v.to_string(),
None => "undefined".to_string(),
}
}
fn fallback_text(args: &[Value]) -> String {
args.iter()
.map(|a| {
if a.get("objectId").is_some() {
let label = a
.get("subtype")
.and_then(Value::as_str)
.or_else(|| a.get("type").and_then(Value::as_str))
.unwrap_or("object");
format!("[{label}]")
} else {
primitive_text(a)
}
})
.collect::<Vec<_>>()
.join(" ")
}
async fn resolve_text(conn: &Connection, session: &str, params: &Value) -> Option<String> {
let ctx = params.get("executionContextId").and_then(Value::as_str)?;
let args: Vec<Value> = params
.get("args")
.and_then(Value::as_array)?
.iter()
.map(clean_arg)
.collect();
let r = conn
.send(
"Runtime.callFunction",
json!({
"executionContextId": ctx,
"functionDeclaration": CONSOLE_JOIN_FN,
"returnByValue": true,
"args": args,
}),
Some(session),
)
.await
.ok()?;
extract_runtime_result(r).ok()?.as_str().map(str::to_string)
}
fn clean_arg(arg: &Value) -> Value {
if let Some(o) = arg.get("objectId") {
json!({ "objectId": o })
} else if let Some(u) = arg.get("unserializableValue") {
json!({ "unserializableValue": u })
} else if let Some(v) = arg.get("value") {
json!({ "value": v })
} else {
json!({}) }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn primitive_text_variants() {
assert_eq!(
primitive_text(&json!({ "value": "hello", "type": "string" })),
"hello"
);
assert_eq!(
primitive_text(&json!({ "value": 42, "type": "number" })),
"42"
);
assert_eq!(primitive_text(&json!({ "value": true })), "true");
assert_eq!(
primitive_text(&json!({ "value": null, "subtype": "null" })),
"null"
);
assert_eq!(
primitive_text(&json!({ "unserializableValue": "NaN" })),
"NaN"
);
assert_eq!(primitive_text(&json!({})), "undefined");
}
#[test]
fn fallback_uses_subtype_label() {
let args = vec![
json!({ "value": "x", "type": "string" }),
json!({ "objectId": "id-1", "type": "object", "subtype": "array" }),
json!({ "objectId": "id-2", "type": "object" }),
];
assert_eq!(fallback_text(&args), "x [array] [object]");
}
#[test]
fn clean_arg_keeps_only_call_fields() {
assert_eq!(
clean_arg(&json!({ "objectId": "id-9", "type": "object", "subtype": "array" })),
json!({ "objectId": "id-9" })
);
assert_eq!(
clean_arg(&json!({ "value": "s", "type": "string" })),
json!({ "value": "s" })
);
assert_eq!(
clean_arg(&json!({ "unserializableValue": "Infinity" })),
json!({ "unserializableValue": "Infinity" })
);
assert_eq!(clean_arg(&json!({ "type": "undefined" })), json!({}));
}
#[test]
fn filter_matches_level_and_text() {
let d = ConsoleData {
source: "console-api".into(),
level: "error".into(),
text: "boom at line".into(),
args: vec![],
url: String::new(),
line: 0,
column: 0,
};
assert!(ConsoleFilter::default().matches(&d));
assert!(ConsoleFilter::new().level("ERROR").matches(&d));
assert!(!ConsoleFilter::new().level("log").matches(&d));
assert!(ConsoleFilter::new().contains("boom").matches(&d));
assert!(!ConsoleFilter::new().contains("nope").matches(&d));
assert!(
ConsoleFilter::new()
.level("error")
.contains("boom")
.matches(&d)
);
}
#[test]
fn console_data_body_parses_json() {
let d = ConsoleData {
source: "console-api".into(),
level: "log".into(),
text: r#"{"a":1,"b":[2,3]}"#.into(),
args: vec![],
url: String::new(),
line: 0,
column: 0,
};
let body = d.body().unwrap();
assert_eq!(body["a"], 1);
assert_eq!(body["b"][1], 3);
let plain = ConsoleData {
text: "not json".into(),
..d
};
assert!(plain.body().is_none());
}
}