use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Duration;
use serde_json::{Value, json};
use tokio::sync::Mutex;
use tokio::sync::broadcast::error::RecvError;
use tokio::time::{Instant, sleep};
use crate::Result;
use crate::cdp::core::{CdpCore, EventBuf};
use crate::protocol::Connection;
/// One captured console message, as assembled by `build_console` from the
/// parameters of a `Runtime.consoleAPICalled` event.
#[derive(Debug, Clone, Default)]
pub struct ConsoleData {
/// The event's `type` string, used verbatim; `"log"` when the field is absent.
pub level: String,
/// Human-readable rendering of every argument, joined with single spaces.
pub text: String,
/// The event's raw `args` array (empty when the field is missing).
pub args: Vec<Value>,
/// `url` of the first call frame in the event's stack trace, or `""`.
pub url: String,
/// `lineNumber` of the first call frame in the event's stack trace, or `0`.
pub line: u32,
}
impl ConsoleData {
    /// Interprets the message text as a JSON document.
    ///
    /// Returns the parsed value, or [`Value::Null`] when `text` is not valid
    /// JSON.
    pub fn body(&self) -> Value {
        match serde_json::from_str::<Value>(&self.text) {
            Ok(parsed) => parsed,
            Err(_) => Value::Null,
        }
    }
}
/// Criteria a console message must satisfy to be buffered.
///
/// A criterion left as `None` is not checked, so the default filter accepts
/// every message.
#[derive(Debug, Clone, Default)]
pub struct ConsoleFilter {
/// When set, the message level must equal this string exactly.
pub level: Option<String>,
/// When set, the message text must contain this substring.
pub text_contains: Option<String>,
}
impl ConsoleFilter {
    /// Creates a filter with no criteria; it accepts every message.
    pub fn new() -> Self {
        Self {
            level: None,
            text_contains: None,
        }
    }

    /// Returns the filter with the level criterion set to `level`.
    pub fn level(self, level: impl Into<String>) -> Self {
        Self {
            level: Some(level.into()),
            ..self
        }
    }

    /// Returns the filter with the text criterion set to the substring `sub`.
    pub fn text(self, sub: impl Into<String>) -> Self {
        Self {
            text_contains: Some(sub.into()),
            ..self
        }
    }

    /// Reports whether `d` satisfies every criterion that is set.
    fn matches(&self, d: &ConsoleData) -> bool {
        // An unset criterion yields an empty iterator, for which `all` is true.
        self.level.iter().all(|wanted| *wanted == d.level)
            && self
                .text_contains
                .iter()
                .all(|needle| d.text.contains(needle.as_str()))
    }
}
/// Maximum number of messages kept in the buffer; once it is reached the pump
/// drops the oldest message before appending a new one.
const BUFFER_CAP: usize = 500;
/// Handle for capturing console messages through a shared [`CdpCore`].
pub struct ChromiumConsole {
// Shared core: used to send CDP commands and to reach the console state
// (buffer, running flag, abort handle).
core: Arc<CdpCore>,
}
impl ChromiumConsole {
    /// Creates a console handle on top of the shared CDP core.
    pub(crate) fn new(core: Arc<CdpCore>) -> Self {
        Self { core }
    }

    /// Starts capturing every console message (no filtering).
    ///
    /// Equivalent to [`start_with`](Self::start_with) with a default filter.
    pub async fn start(&self) -> Result<()> {
        self.start_with(ConsoleFilter::default()).await
    }

    /// Starts capturing console messages that satisfy `filter`.
    ///
    /// Any previous capture is stopped first, which also empties the buffer.
    /// Then `Runtime.enable` is sent and a background pump task is spawned.
    ///
    /// # Errors
    ///
    /// Propagates the error of the `Runtime.enable` command; in that case no
    /// pump is running afterwards.
    pub async fn start_with(&self, filter: ConsoleFilter) -> Result<()> {
        self.stop().await?;
        self.core.send("Runtime.enable", json!({})).await?;

        // NOTE(review): the pump subscribes to the connection only once it is
        // first polled, i.e. after `Runtime.enable` has completed; events
        // delivered in between may not be seen — confirm whether that matters.
        //
        // Take the state lock once for both reading the buffer handle and
        // recording the new task (`tokio::spawn` does not await, so holding
        // the guard across it is fine).
        let mut state = self.core.console.lock().await;
        let task = tokio::spawn(console_pump(
            self.core.conn.clone(),
            self.core.session_id.clone(),
            filter,
            state.buf.clone(),
        ));
        state.running = true;
        // If a concurrent `start_with` registered a pump since our `stop()`,
        // abort it instead of leaking a second task that feeds the same buffer.
        if let Some(stale) = state.abort.replace(task.abort_handle()) {
            stale.abort();
        }
        Ok(())
    }

    /// Returns whether a capture is currently marked as running.
    pub async fn listening(&self) -> bool {
        self.core.console.lock().await.running
    }

    /// Waits for the next buffered message and removes it from the buffer.
    ///
    /// The buffer is polled until a message is available or `timeout` elapses
    /// (`None` uses the core's default timeout). Returns `Ok(None)` on
    /// timeout. A timeout too large to be represented as a deadline is treated
    /// as "wait indefinitely" instead of panicking.
    pub async fn wait(&self, timeout: Option<Duration>) -> Result<Option<ConsoleData>> {
        const POLL_INTERVAL: Duration = Duration::from_millis(50);

        let buf = self.core.console.lock().await.buf.clone();
        let timeout = timeout.unwrap_or_else(|| self.core.timeout());
        // `None` means the deadline overflowed `Instant`, i.e. it never expires.
        let deadline = Instant::now().checked_add(timeout);
        loop {
            if let Some(d) = buf.lock().await.pop_front() {
                return Ok(Some(d));
            }
            let nap = match deadline {
                Some(at) => {
                    let remaining = at.saturating_duration_since(Instant::now());
                    if remaining.is_zero() {
                        return Ok(None);
                    }
                    // Never sleep past the deadline.
                    remaining.min(POLL_INTERVAL)
                }
                None => POLL_INTERVAL,
            };
            sleep(nap).await;
        }
    }

    /// Removes and returns all buffered messages, oldest first.
    pub async fn messages(&self) -> Vec<ConsoleData> {
        let buf = self.core.console.lock().await.buf.clone();
        let mut queue = buf.lock().await;
        queue.drain(..).collect()
    }

    /// Discards all buffered messages without stopping the capture.
    pub async fn clear(&self) {
        let buf = self.core.console.lock().await.buf.clone();
        buf.lock().await.clear();
    }

    /// Stops the capture: marks it as not running, aborts the pump task if
    /// there is one, and empties the buffer.
    ///
    /// Calling this while nothing is running is a no-op apart from clearing
    /// the buffer. The current implementation always returns `Ok(())`.
    pub async fn stop(&self) -> Result<()> {
        let (abort, buf) = {
            let mut state = self.core.console.lock().await;
            state.running = false;
            (state.abort.take(), state.buf.clone())
        };
        // Request the abort before clearing so the pump has less opportunity
        // to push into the buffer we are about to empty.
        if let Some(handle) = abort {
            handle.abort();
        }
        buf.lock().await.clear();
        Ok(())
    }
}
/// Background task that copies matching console events into `buf`.
///
/// Subscribes to `conn` and runs until the event channel is closed. Only
/// `Runtime.consoleAPICalled` events whose session id equals `session_id` are
/// considered, and of those only the ones accepted by `filter` are stored.
/// The buffer holds at most `BUFFER_CAP` entries; the oldest one is dropped
/// to make room.
async fn console_pump(
    conn: Connection,
    session_id: String,
    filter: ConsoleFilter,
    buf: Arc<Mutex<VecDeque<ConsoleData>>>,
) {
    let mut receiver = conn.subscribe();
    loop {
        let event = match receiver.recv().await {
            Err(RecvError::Closed) => return,
            // Some events were missed because we fell behind; keep going.
            Err(RecvError::Lagged(_)) => continue,
            Ok(event) => event,
        };

        let same_session = event.session_id.as_deref() == Some(session_id.as_str());
        if !(same_session && event.method == "Runtime.consoleAPICalled") {
            continue;
        }

        let entry = build_console(&event.params);
        if !filter.matches(&entry) {
            continue;
        }

        let mut queue = buf.lock().await;
        if queue.len() >= BUFFER_CAP {
            queue.pop_front();
        }
        queue.push_back(entry);
    }
}
/// Builds a [`ConsoleData`] from the parameters of a
/// `Runtime.consoleAPICalled` event.
///
/// * `level` is the event's `type` string, used verbatim (`"log"` if absent).
/// * `text` is every argument rendered as a string and joined with spaces.
/// * `url` / `line` come from the first call frame of `stackTrace`, falling
///   back to `""` / `0` when it is missing.
///
/// Missing or mistyped fields never panic: indexing a `Value` with an absent
/// key or index yields `Value::Null`.
fn build_console(p: &Value) -> ConsoleData {
    let level = p["type"].as_str().unwrap_or("log").to_string();
    let args = p["args"].as_array().cloned().unwrap_or_default();

    let text = args
        .iter()
        .map(|arg| match arg.get("value") {
            // Plain strings are shown without the surrounding JSON quotes.
            Some(Value::String(s)) => s.clone(),
            // Any other value is shown as its JSON text.
            Some(other) => other.to_string(),
            // No `value`: prefer the description, then the bare type name.
            None => arg["description"]
                .as_str()
                .or_else(|| arg["type"].as_str())
                .unwrap_or("")
                .to_string(),
        })
        .collect::<Vec<_>>()
        .join(" ");

    let frame = &p["stackTrace"]["callFrames"][0];
    // Clamp before narrowing so an oversized line number saturates at
    // `u32::MAX` instead of silently wrapping around.
    let line = frame["lineNumber"]
        .as_u64()
        .unwrap_or(0)
        .min(u64::from(u32::MAX)) as u32;

    ConsoleData {
        level,
        text,
        args,
        url: frame["url"].as_str().unwrap_or_default().to_string(),
        line,
    }
}
/// [`EventBuf`] specialised to console messages.
// NOTE(review): presumably the type behind `CdpCore`'s `console` state — confirm.
pub(crate) type ConsoleShared = EventBuf<ConsoleData>;