use crate::backend::FrameInfo;
use crate::console_message::ConsoleMessage;
use crate::dialog::Dialog;
use crate::download::Download;
use crate::file_chooser::FileChooser;
use crate::network::{Request, Response, WebSocket};
use crate::web_error::WebError;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast;
/// Event payloads carried by [`EventEmitter`]'s broadcast channel.
///
/// Each variant is paired with exactly one lowercase event name in
/// [`event_name_matches`]; that name is quoted on every variant below.
#[derive(Debug, Clone)]
pub enum PageEvent {
/// Carries a [`ConsoleMessage`]; event name `"console"`.
Console(ConsoleMessage),
/// Carries a [`Request`]; event name `"request"`.
Request(Request),
/// Carries a [`Response`]; event name `"response"`.
Response(Response),
/// Carries the [`Request`] concerned; event name `"requestfinished"`.
RequestFinished(Request),
/// Carries the [`Request`] concerned; event name `"requestfailed"`.
RequestFailed(Request),
/// Carries a [`WebSocket`]; event name `"websocket"`.
WebSocket(WebSocket),
/// Carries a [`Dialog`]; event name `"dialog"`.
Dialog(Dialog),
/// Carries a [`FileChooser`]; event name `"filechooser"`.
FileChooser(FileChooser),
/// Carries the [`FrameInfo`] of the frame; event name `"frameattached"`.
FrameAttached(FrameInfo),
/// Carries only the id of the frame; event name `"framedetached"`.
FrameDetached { frame_id: String },
/// Carries the [`FrameInfo`] of the frame; event name `"framenavigated"`.
FrameNavigated(FrameInfo),
/// No payload; event name `"load"`.
Load,
/// No payload; event name `"domcontentloaded"`.
DomContentLoaded,
/// No payload; event name `"close"`.
Close,
/// Carries a [`WebError`]; event name `"pageerror"`.
PageError(WebError),
/// Carries a [`Download`]; event name `"download"`.
Download(Download),
}
impl PageEvent {
#[must_use]
pub fn to_snapshot(&self) -> serde_json::Value {
match self {
PageEvent::Console(msg) => {
let loc = msg.location();
serde_json::json!({
"type": msg.type_str(),
"text": msg.text(),
"location": {
"url": loc.url,
"lineNumber": loc.line_number,
"columnNumber": loc.column_number,
},
"timestamp": msg.timestamp(),
"argsCount": msg.args().len(),
})
},
PageEvent::Response(r) => serde_json::json!({
"url": r.url(),
"status": r.status(),
"statusText": r.status_text(),
"ok": r.ok(),
"fromServiceWorker": r.is_from_service_worker(),
"headers": r.headers(),
}),
PageEvent::Request(r) | PageEvent::RequestFinished(r) | PageEvent::RequestFailed(r) => serde_json::json!({
"url": r.url(),
"method": r.method(),
"resourceType": r.resource_type(),
"isNavigationRequest": r.is_navigation_request(),
"headers": r.headers(),
"postData": r.post_data(),
}),
PageEvent::WebSocket(ws) => serde_json::json!({ "url": ws.url(), "isClosed": ws.is_closed() }),
PageEvent::Dialog(d) => serde_json::json!({
"type": d.dialog_type().as_str(),
"message": d.message(),
"defaultValue": d.default_value(),
}),
PageEvent::FileChooser(fc) => serde_json::json!({ "isMultiple": fc.is_multiple() }),
PageEvent::FrameAttached(f) | PageEvent::FrameNavigated(f) => serde_json::to_value(f).unwrap_or_default(),
PageEvent::FrameDetached { frame_id } => serde_json::json!({ "frameId": frame_id }),
PageEvent::Download(d) => serde_json::json!({
"url": d.url(),
"suggestedFilename": d.suggested_filename(),
}),
PageEvent::Load => serde_json::json!({ "type": "load" }),
PageEvent::DomContentLoaded => serde_json::json!({ "type": "domcontentloaded" }),
PageEvent::Close => serde_json::json!({ "type": "close" }),
PageEvent::PageError(err) => {
let d = err.error();
serde_json::json!({
"name": d.name,
"message": d.message,
"stack": d.stack,
})
},
}
}
}
/// Boxed, pinned, `Send` future that resolves to a `serde_json::Value`.
///
/// This is the return type of both [`ExposedFn`] and [`ExposedBinding`].
pub type ExposedFnFuture = Pin<Box<dyn Future<Output = serde_json::Value> + Send>>;
/// Shared, thread-safe callable that receives a list of JSON values and
/// returns an [`ExposedFnFuture`] producing the JSON result.
pub type ExposedFn = Arc<dyn Fn(Vec<serde_json::Value>) -> ExposedFnFuture + Send + Sync>;
/// First argument handed to an [`ExposedBinding`] callback.
///
/// NOTE(review): the three fields are plain strings; they presumably identify
/// the context, page and frame the call came from — confirm against the code
/// that constructs this struct.
#[derive(Debug, Clone, Default)]
pub struct BindingSource {
/// Context component of the source (format not visible here).
pub context: String,
/// Page component of the source (format not visible here).
pub page: String,
/// Frame component of the source (format not visible here).
pub frame: String,
}
/// Shared, thread-safe callable like [`ExposedFn`], except that it also
/// receives a [`BindingSource`] ahead of the JSON argument list.
pub type ExposedBinding = Arc<dyn Fn(BindingSource, Vec<serde_json::Value>) -> ExposedFnFuture + Send + Sync>;
/// Shared, thread-safe callback invoked with each matching [`PageEvent`];
/// this is the callback type accepted by [`EventEmitter::on`] and
/// [`EventEmitter::once`].
pub type EventCallback = Arc<dyn Fn(PageEvent) + Send + Sync>;
/// Opaque identifier of a registered listener.
///
/// Returned by the emitters' `on` / `once` methods and accepted by their
/// `off` methods. The wrapped `u64` is the key under which the listener is
/// stored in the emitter's listener map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ListenerId(pub u64);
pub async fn drain_until<F>(
rx: &mut tokio::sync::broadcast::Receiver<PageEvent>,
predicate: F,
timeout_ms: u64,
) -> crate::error::Result<PageEvent>
where
F: Fn(&PageEvent) -> bool,
{
let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err(crate::error::FerriError::timeout("waiting for event", timeout_ms));
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(event)) if predicate(&event) => return Ok(event),
Ok(Ok(_)) => {},
Ok(Err(_)) => {
return Err(crate::error::FerriError::target_closed(Some(
"event channel closed".into(),
)));
},
Err(_) => return Err(crate::error::FerriError::timeout("waiting for event", timeout_ms)),
}
}
}
/// Reports whether `name` is the event name associated with `event`'s variant.
///
/// The comparison is exact and case-sensitive; every variant has exactly one
/// lowercase name, and any other string yields `false`.
#[must_use]
pub fn event_name_matches(name: &str, event: &PageEvent) -> bool {
    let expected = match event {
        PageEvent::Console(_) => "console",
        PageEvent::Request(_) => "request",
        PageEvent::Response(_) => "response",
        PageEvent::RequestFinished(_) => "requestfinished",
        PageEvent::RequestFailed(_) => "requestfailed",
        PageEvent::WebSocket(_) => "websocket",
        PageEvent::Dialog(_) => "dialog",
        PageEvent::FileChooser(_) => "filechooser",
        PageEvent::FrameAttached(_) => "frameattached",
        PageEvent::FrameDetached { .. } => "framedetached",
        PageEvent::FrameNavigated(_) => "framenavigated",
        PageEvent::Load => "load",
        PageEvent::DomContentLoaded => "domcontentloaded",
        PageEvent::Close => "close",
        PageEvent::PageError(_) => "pageerror",
        PageEvent::Download(_) => "download",
    };
    name == expected
}
pub async fn recv_tolerant<T: Clone>(rx: &mut broadcast::Receiver<T>) -> Option<T> {
loop {
match rx.recv().await {
Ok(v) => return Some(v),
Err(broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(dropped = n, "broadcast listener lagged; dropped {n} event(s)");
},
Err(broadcast::error::RecvError::Closed) => return None,
}
}
}
/// Broadcast hub for [`PageEvent`]s with callback-style listeners.
///
/// All fields are reference-counted or internally shared, so clones of an
/// emitter share the same channel, listener registry and id counter.
#[derive(Clone)]
pub struct EventEmitter {
/// Sending side of the broadcast channel every subscriber reads from.
tx: broadcast::Sender<PageEvent>,
/// Registered listeners, keyed by the numeric value of their `ListenerId`.
listeners: Arc<std::sync::Mutex<rustc_hash::FxHashMap<u64, ListenerEntry>>>,
/// Source of listener ids; starts at 1 and is incremented per registration.
next_listener_id: Arc<std::sync::atomic::AtomicU64>,
/// Tokio runtime handle captured in `new` (if one was current), used to
/// spawn listeners when the registering thread has no runtime of its own.
runtime_handle: Arc<std::sync::Mutex<Option<tokio::runtime::Handle>>>,
}
/// Bookkeeping for one listener registered on an [`EventEmitter`].
struct ListenerEntry {
/// Handle used to cancel the task that runs the listener.
abort: tokio::task::AbortHandle,
/// Event name the listener was registered under; consulted by
/// `has_listener` and `remove_listeners_named`.
event_name: String,
}
impl EventEmitter {
#[must_use]
pub fn new() -> Self {
let (tx, _) = broadcast::channel(512);
let handle = tokio::runtime::Handle::try_current().ok();
Self {
tx,
listeners: Arc::new(std::sync::Mutex::new(rustc_hash::FxHashMap::default())),
next_listener_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
runtime_handle: Arc::new(std::sync::Mutex::new(handle)),
}
}
#[must_use]
pub fn has_listener(&self, event_name: &str) -> bool {
let Ok(listeners) = self.listeners.lock() else {
return false;
};
listeners.values().any(|entry| entry.event_name == event_name)
}
fn spawn_listener(&self, future: impl std::future::Future<Output = ()> + Send + 'static) -> tokio::task::AbortHandle {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
return handle.spawn(future).abort_handle();
}
if let Ok(guard) = self.runtime_handle.lock() {
if let Some(handle) = guard.as_ref() {
return handle.spawn(future).abort_handle();
}
}
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let Ok(rt) = tokio::runtime::Builder::new_current_thread().enable_all().build() else {
return;
};
let handle = rt.spawn(future);
let _ = tx.send(handle.abort_handle());
rt.block_on(handle).ok();
});
rx.recv().unwrap_or_else(|_| {
tokio::runtime::Handle::current().spawn(async {}).abort_handle()
})
}
pub fn emit(&self, event: PageEvent) {
let _ = self.tx.send(event);
}
#[must_use]
pub fn receiver_count(&self) -> usize {
self.tx.receiver_count()
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<PageEvent> {
self.tx.subscribe()
}
pub async fn wait_for<F>(&self, predicate: F, timeout_ms: u64) -> crate::error::Result<PageEvent>
where
F: Fn(&PageEvent) -> bool,
{
let mut rx = self.tx.subscribe();
drain_until(&mut rx, predicate, timeout_ms).await
}
pub async fn wait_for_event(&self, event_name: &str, timeout_ms: u64) -> crate::error::Result<PageEvent> {
let name = event_name.to_string();
self.wait_for(move |e| event_name_matches(&name, e), timeout_ms).await
}
pub fn on(&self, event_name: &str, callback: EventCallback) -> ListenerId {
let id = self.next_listener_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut rx = self.tx.subscribe();
let name = event_name.to_string();
let filter_name = name.clone();
let abort_handle = self.spawn_listener(async move {
while let Some(event) = crate::events::recv_tolerant(&mut rx).await {
if event_name_matches(&filter_name, &event) {
callback(event);
}
}
});
if let Ok(mut guard) = self.listeners.lock() {
guard.insert(
id,
ListenerEntry {
abort: abort_handle,
event_name: name,
},
);
}
ListenerId(id)
}
pub fn once(&self, event_name: &str, callback: EventCallback) -> ListenerId {
let listeners = self.listeners.clone();
let id = self.next_listener_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut rx = self.tx.subscribe();
let name = event_name.to_string();
let filter_name = name.clone();
let abort_handle = self.spawn_listener(async move {
while let Some(event) = crate::events::recv_tolerant(&mut rx).await {
if event_name_matches(&filter_name, &event) {
callback(event);
if let Ok(mut guard) = listeners.lock() {
guard.remove(&id);
}
break;
}
}
});
if let Ok(mut guard) = self.listeners.lock() {
guard.insert(
id,
ListenerEntry {
abort: abort_handle,
event_name: name,
},
);
}
ListenerId(id)
}
pub fn off(&self, id: ListenerId) {
if let Ok(mut guard) = self.listeners.lock() {
if let Some(entry) = guard.remove(&id.0) {
entry.abort.abort();
}
}
}
pub fn remove_all_listeners(&self) {
if let Ok(mut listeners) = self.listeners.lock() {
for (_, entry) in listeners.drain() {
entry.abort.abort();
}
}
}
pub fn remove_listeners_named(&self, event_name: &str) {
if let Ok(mut listeners) = self.listeners.lock() {
listeners.retain(|_, entry| {
if entry.event_name == event_name {
entry.abort.abort();
false
} else {
true
}
});
}
}
}
impl Default for EventEmitter {
fn default() -> Self {
Self::new()
}
}
/// Event payloads carried by [`ContextEventEmitter`]'s broadcast channel.
///
/// The event name quoted on each variant is the string
/// `context_event_name_matches` pairs it with.
#[derive(Debug, Clone)]
pub enum ContextEvent {
/// Carries a `WebError`; event name `"weberror"`.
WebError(crate::web_error::WebError),
/// Carries a [`Download`]; event name `"download"`.
Download(Download),
/// Carries a page and a frame id; event name `"frameattached"`.
FrameAttached {
page: Arc<crate::page::Page>,
frame_id: String,
},
/// Carries a page and a frame id; event name `"framedetached"`.
FrameDetached {
page: Arc<crate::page::Page>,
frame_id: String,
},
/// Carries a page and a frame id; event name `"framenavigated"`.
FrameNavigated {
page: Arc<crate::page::Page>,
frame_id: String,
},
/// Carries the page concerned; event name `"pageclose"`.
PageClose(Arc<crate::page::Page>),
/// Carries the page concerned; event name `"pageload"`.
PageLoad(Arc<crate::page::Page>),
}
/// Shared, thread-safe callback invoked with each matching [`ContextEvent`];
/// the callback type accepted by [`ContextEventEmitter::on`] and
/// [`ContextEventEmitter::once`].
pub type ContextEventCallback = Arc<dyn Fn(ContextEvent) + Send + Sync>;
/// Reports whether `name` is the event name associated with `event`'s variant.
///
/// The comparison is exact and case-sensitive; unknown names yield `false`.
fn context_event_name_matches(name: &str, event: &ContextEvent) -> bool {
    let expected = match event {
        ContextEvent::WebError(_) => "weberror",
        ContextEvent::Download(_) => "download",
        ContextEvent::FrameAttached { .. } => "frameattached",
        ContextEvent::FrameDetached { .. } => "framedetached",
        ContextEvent::FrameNavigated { .. } => "framenavigated",
        ContextEvent::PageClose(_) => "pageclose",
        ContextEvent::PageLoad(_) => "pageload",
    };
    name == expected
}
/// Broadcast hub for [`ContextEvent`]s with callback-style listeners.
///
/// All fields are reference-counted or internally shared, so clones of an
/// emitter share the same channel, listener registry and id counter.
#[derive(Clone)]
pub struct ContextEventEmitter {
/// Sending side of the broadcast channel every subscriber reads from.
tx: broadcast::Sender<ContextEvent>,
/// Registered listeners, keyed by the numeric value of their `ListenerId`.
listeners: Arc<std::sync::Mutex<rustc_hash::FxHashMap<u64, ContextListenerEntry>>>,
/// Source of listener ids; starts at 1 and is incremented per registration.
next_listener_id: Arc<std::sync::atomic::AtomicU64>,
/// Tokio runtime handle captured in `new` (if one was current), used to
/// spawn listeners when the registering thread has no runtime of its own.
runtime_handle: Arc<std::sync::Mutex<Option<tokio::runtime::Handle>>>,
}
/// Bookkeeping for one registered listener; used by the listener maps of
/// both `ContextEventEmitter` and `BrowserEventEmitter`.
struct ContextListenerEntry {
/// Handle used to cancel the task that runs the listener.
abort: tokio::task::AbortHandle,
}
impl ContextEventEmitter {
#[must_use]
pub fn new() -> Self {
let (tx, _) = broadcast::channel(512);
let handle = tokio::runtime::Handle::try_current().ok();
Self {
tx,
listeners: Arc::new(std::sync::Mutex::new(rustc_hash::FxHashMap::default())),
next_listener_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
runtime_handle: Arc::new(std::sync::Mutex::new(handle)),
}
}
fn spawn_listener(&self, future: impl std::future::Future<Output = ()> + Send + 'static) -> tokio::task::AbortHandle {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
return handle.spawn(future).abort_handle();
}
if let Ok(guard) = self.runtime_handle.lock() {
if let Some(handle) = guard.as_ref() {
return handle.spawn(future).abort_handle();
}
}
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let Ok(rt) = tokio::runtime::Builder::new_current_thread().enable_all().build() else {
return;
};
let handle = rt.spawn(future);
let _ = tx.send(handle.abort_handle());
rt.block_on(handle).ok();
});
rx.recv()
.unwrap_or_else(|_| tokio::runtime::Handle::current().spawn(async {}).abort_handle())
}
pub fn emit(&self, event: ContextEvent) {
let _ = self.tx.send(event);
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<ContextEvent> {
self.tx.subscribe()
}
pub async fn wait_for_event(&self, event_name: &str, timeout_ms: u64) -> crate::error::Result<ContextEvent> {
let mut rx = self.tx.subscribe();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
let name = event_name.to_string();
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err(crate::error::FerriError::timeout(
"waiting for context event",
timeout_ms,
));
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(event)) if context_event_name_matches(&name, &event) => return Ok(event),
Ok(Ok(_)) => {},
Ok(Err(_)) => {
return Err(crate::error::FerriError::target_closed(Some(
"context event channel closed".into(),
)));
},
Err(_) => {
return Err(crate::error::FerriError::timeout(
"waiting for context event",
timeout_ms,
));
},
}
}
}
pub fn on(&self, event_name: &str, callback: ContextEventCallback) -> ListenerId {
let id = self.next_listener_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut rx = self.tx.subscribe();
let filter_name = event_name.to_string();
let abort_handle = self.spawn_listener(async move {
while let Some(event) = crate::events::recv_tolerant(&mut rx).await {
if context_event_name_matches(&filter_name, &event) {
callback(event);
}
}
});
if let Ok(mut guard) = self.listeners.lock() {
guard.insert(id, ContextListenerEntry { abort: abort_handle });
}
ListenerId(id)
}
pub fn once(&self, event_name: &str, callback: ContextEventCallback) -> ListenerId {
let listeners = self.listeners.clone();
let id = self.next_listener_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut rx = self.tx.subscribe();
let filter_name = event_name.to_string();
let abort_handle = self.spawn_listener(async move {
while let Some(event) = crate::events::recv_tolerant(&mut rx).await {
if context_event_name_matches(&filter_name, &event) {
callback(event);
if let Ok(mut guard) = listeners.lock() {
guard.remove(&id);
}
break;
}
}
});
if let Ok(mut guard) = self.listeners.lock() {
guard.insert(id, ContextListenerEntry { abort: abort_handle });
}
ListenerId(id)
}
pub fn off(&self, id: ListenerId) {
if let Ok(mut guard) = self.listeners.lock() {
if let Some(entry) = guard.remove(&id.0) {
entry.abort.abort();
}
}
}
pub fn remove_all_listeners(&self) {
if let Ok(mut listeners) = self.listeners.lock() {
for (_, entry) in listeners.drain() {
entry.abort.abort();
}
}
}
}
impl Default for ContextEventEmitter {
fn default() -> Self {
Self::new()
}
}
/// Event payloads carried by [`BrowserEventEmitter`]'s broadcast channel.
#[derive(Clone)]
pub enum BrowserEvent {
/// Carries a `ContextRef`; paired with the event name `"context"` by
/// `browser_event_name_matches`.
Context(crate::context::ContextRef),
}
/// Shared, thread-safe callback invoked with each matching [`BrowserEvent`];
/// the callback type accepted by [`BrowserEventEmitter::on`] and
/// [`BrowserEventEmitter::once`].
pub type BrowserEventCallback = Arc<dyn Fn(BrowserEvent) + Send + Sync>;
/// Reports whether `name` is the event name associated with `event`'s variant.
///
/// The comparison is exact and case-sensitive; unknown names yield `false`.
fn browser_event_name_matches(name: &str, event: &BrowserEvent) -> bool {
    match event {
        BrowserEvent::Context(_) => name == "context",
    }
}
/// Broadcast hub for [`BrowserEvent`]s with callback-style listeners.
///
/// All fields are reference-counted or internally shared, so clones of an
/// emitter share the same channel, listener registry and id counter.
#[derive(Clone)]
pub struct BrowserEventEmitter {
/// Sending side of the broadcast channel every subscriber reads from.
tx: broadcast::Sender<BrowserEvent>,
/// Registered listeners, keyed by the numeric value of their `ListenerId`.
listeners: Arc<std::sync::Mutex<rustc_hash::FxHashMap<u64, ContextListenerEntry>>>,
/// Source of listener ids; starts at 1 and is incremented per registration.
next_listener_id: Arc<std::sync::atomic::AtomicU64>,
/// Tokio runtime handle captured in `new` (if one was current), used to
/// spawn listeners when the registering thread has no runtime of its own.
runtime_handle: Arc<std::sync::Mutex<Option<tokio::runtime::Handle>>>,
}
impl BrowserEventEmitter {
#[must_use]
pub fn new() -> Self {
let (tx, _) = broadcast::channel(256);
let handle = tokio::runtime::Handle::try_current().ok();
Self {
tx,
listeners: Arc::new(std::sync::Mutex::new(rustc_hash::FxHashMap::default())),
next_listener_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
runtime_handle: Arc::new(std::sync::Mutex::new(handle)),
}
}
fn spawn_listener(&self, future: impl std::future::Future<Output = ()> + Send + 'static) -> tokio::task::AbortHandle {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
return handle.spawn(future).abort_handle();
}
if let Ok(guard) = self.runtime_handle.lock() {
if let Some(handle) = guard.as_ref() {
return handle.spawn(future).abort_handle();
}
}
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let Ok(rt) = tokio::runtime::Builder::new_current_thread().enable_all().build() else {
return;
};
let handle = rt.spawn(future);
let _ = tx.send(handle.abort_handle());
rt.block_on(handle).ok();
});
rx.recv()
.unwrap_or_else(|_| tokio::runtime::Handle::current().spawn(async {}).abort_handle())
}
pub fn emit(&self, event: BrowserEvent) {
let _ = self.tx.send(event);
}
pub async fn wait_for_event(&self, event_name: &str, timeout_ms: u64) -> crate::error::Result<BrowserEvent> {
let mut rx = self.tx.subscribe();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
let name = event_name.to_string();
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err(crate::error::FerriError::timeout(
"waiting for browser event",
timeout_ms,
));
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(event)) if browser_event_name_matches(&name, &event) => return Ok(event),
Ok(Ok(_)) => {},
Ok(Err(_)) => {
return Err(crate::error::FerriError::target_closed(Some(
"browser event channel closed".into(),
)));
},
Err(_) => {
return Err(crate::error::FerriError::timeout(
"waiting for browser event",
timeout_ms,
));
},
}
}
}
pub fn on(&self, event_name: &str, callback: BrowserEventCallback) -> ListenerId {
let id = self.next_listener_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut rx = self.tx.subscribe();
let filter_name = event_name.to_string();
let abort_handle = self.spawn_listener(async move {
while let Some(event) = crate::events::recv_tolerant(&mut rx).await {
if browser_event_name_matches(&filter_name, &event) {
callback(event);
}
}
});
if let Ok(mut guard) = self.listeners.lock() {
guard.insert(id, ContextListenerEntry { abort: abort_handle });
}
ListenerId(id)
}
pub fn once(&self, event_name: &str, callback: BrowserEventCallback) -> ListenerId {
let listeners = self.listeners.clone();
let id = self.next_listener_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut rx = self.tx.subscribe();
let filter_name = event_name.to_string();
let abort_handle = self.spawn_listener(async move {
while let Some(event) = crate::events::recv_tolerant(&mut rx).await {
if browser_event_name_matches(&filter_name, &event) {
callback(event);
if let Ok(mut guard) = listeners.lock() {
guard.remove(&id);
}
break;
}
}
});
if let Ok(mut guard) = self.listeners.lock() {
guard.insert(id, ContextListenerEntry { abort: abort_handle });
}
ListenerId(id)
}
pub fn off(&self, id: ListenerId) {
if let Ok(mut guard) = self.listeners.lock() {
if let Some(entry) = guard.remove(&id.0) {
entry.abort.abort();
}
}
}
}
impl Default for BrowserEventEmitter {
fn default() -> Self {
Self::new()
}
}