use crate::backend::FrameInfo;
use crate::console_message::ConsoleMessage;
use crate::dialog::Dialog;
use crate::download::Download;
use crate::file_chooser::FileChooser;
use crate::network::{Request, Response, WebSocket};
use crate::web_error::WebError;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast;
#[derive(Debug, Clone)]
pub enum PageEvent {
Console(ConsoleMessage),
Request(Request),
Response(Response),
RequestFinished(Request),
RequestFailed(Request),
WebSocket(WebSocket),
Dialog(Dialog),
FileChooser(FileChooser),
FrameAttached(FrameInfo),
FrameDetached { frame_id: String },
FrameNavigated(FrameInfo),
Load,
DomContentLoaded,
Close,
PageError(WebError),
Download(Download),
}
pub type ExposedFnFuture = Pin<Box<dyn Future<Output = serde_json::Value> + Send>>;
pub type ExposedFn = Arc<dyn Fn(Vec<serde_json::Value>) -> ExposedFnFuture + Send + Sync>;
pub type EventCallback = Arc<dyn Fn(PageEvent) + Send + Sync>;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ListenerId(pub u64);
pub async fn drain_until<F>(
rx: &mut tokio::sync::broadcast::Receiver<PageEvent>,
predicate: F,
timeout_ms: u64,
) -> crate::error::Result<PageEvent>
where
F: Fn(&PageEvent) -> bool,
{
let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err(crate::error::FerriError::timeout("waiting for event", timeout_ms));
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(event)) if predicate(&event) => return Ok(event),
Ok(Ok(_)) => {},
Ok(Err(_)) => {
return Err(crate::error::FerriError::target_closed(Some(
"event channel closed".into(),
)));
},
Err(_) => return Err(crate::error::FerriError::timeout("waiting for event", timeout_ms)),
}
}
}
#[must_use]
pub fn event_name_matches(name: &str, event: &PageEvent) -> bool {
matches!(
(name, event),
("console", PageEvent::Console(_))
| ("request", PageEvent::Request(_))
| ("response", PageEvent::Response(_))
| ("requestfinished", PageEvent::RequestFinished(_))
| ("requestfailed", PageEvent::RequestFailed(_))
| ("websocket", PageEvent::WebSocket(_))
| ("dialog", PageEvent::Dialog(_))
| ("filechooser", PageEvent::FileChooser(_))
| ("frameattached", PageEvent::FrameAttached(_))
| ("framedetached", PageEvent::FrameDetached { .. })
| ("framenavigated", PageEvent::FrameNavigated(_))
| ("load", PageEvent::Load)
| ("domcontentloaded", PageEvent::DomContentLoaded)
| ("close", PageEvent::Close)
| ("pageerror", PageEvent::PageError(_))
| ("download", PageEvent::Download(_))
)
}
#[derive(Clone)]
pub struct EventEmitter {
tx: broadcast::Sender<PageEvent>,
listeners: Arc<std::sync::Mutex<rustc_hash::FxHashMap<u64, ListenerEntry>>>,
next_listener_id: Arc<std::sync::atomic::AtomicU64>,
runtime_handle: Arc<std::sync::Mutex<Option<tokio::runtime::Handle>>>,
}
struct ListenerEntry {
abort: tokio::task::AbortHandle,
event_name: String,
}
impl EventEmitter {
#[must_use]
pub fn new() -> Self {
let (tx, _) = broadcast::channel(512);
let handle = tokio::runtime::Handle::try_current().ok();
Self {
tx,
listeners: Arc::new(std::sync::Mutex::new(rustc_hash::FxHashMap::default())),
next_listener_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
runtime_handle: Arc::new(std::sync::Mutex::new(handle)),
}
}
#[must_use]
pub fn has_listener(&self, event_name: &str) -> bool {
let Ok(listeners) = self.listeners.lock() else {
return false;
};
listeners.values().any(|entry| entry.event_name == event_name)
}
fn spawn_listener(&self, future: impl std::future::Future<Output = ()> + Send + 'static) -> tokio::task::AbortHandle {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
return handle.spawn(future).abort_handle();
}
if let Ok(guard) = self.runtime_handle.lock() {
if let Some(handle) = guard.as_ref() {
return handle.spawn(future).abort_handle();
}
}
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let Ok(rt) = tokio::runtime::Builder::new_current_thread().enable_all().build() else {
return;
};
let handle = rt.spawn(future);
let _ = tx.send(handle.abort_handle());
rt.block_on(handle).ok();
});
rx.recv().unwrap_or_else(|_| {
tokio::runtime::Handle::current().spawn(async {}).abort_handle()
})
}
pub fn emit(&self, event: PageEvent) {
let _ = self.tx.send(event);
}
#[must_use]
pub fn receiver_count(&self) -> usize {
self.tx.receiver_count()
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<PageEvent> {
self.tx.subscribe()
}
pub async fn wait_for<F>(&self, predicate: F, timeout_ms: u64) -> crate::error::Result<PageEvent>
where
F: Fn(&PageEvent) -> bool,
{
let mut rx = self.tx.subscribe();
drain_until(&mut rx, predicate, timeout_ms).await
}
pub async fn wait_for_event(&self, event_name: &str, timeout_ms: u64) -> crate::error::Result<PageEvent> {
let name = event_name.to_string();
self.wait_for(move |e| event_name_matches(&name, e), timeout_ms).await
}
pub fn on(&self, event_name: &str, callback: EventCallback) -> ListenerId {
let id = self.next_listener_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut rx = self.tx.subscribe();
let name = event_name.to_string();
let filter_name = name.clone();
let abort_handle = self.spawn_listener(async move {
while let Ok(event) = rx.recv().await {
if event_name_matches(&filter_name, &event) {
callback(event);
}
}
});
if let Ok(mut guard) = self.listeners.lock() {
guard.insert(
id,
ListenerEntry {
abort: abort_handle,
event_name: name,
},
);
}
ListenerId(id)
}
pub fn once(&self, event_name: &str, callback: EventCallback) -> ListenerId {
let listeners = self.listeners.clone();
let id = self.next_listener_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut rx = self.tx.subscribe();
let name = event_name.to_string();
let filter_name = name.clone();
let abort_handle = self.spawn_listener(async move {
while let Ok(event) = rx.recv().await {
if event_name_matches(&filter_name, &event) {
callback(event);
if let Ok(mut guard) = listeners.lock() {
guard.remove(&id);
}
break;
}
}
});
if let Ok(mut guard) = self.listeners.lock() {
guard.insert(
id,
ListenerEntry {
abort: abort_handle,
event_name: name,
},
);
}
ListenerId(id)
}
pub fn off(&self, id: ListenerId) {
if let Ok(mut guard) = self.listeners.lock() {
if let Some(entry) = guard.remove(&id.0) {
entry.abort.abort();
}
}
}
pub fn remove_all_listeners(&self) {
if let Ok(mut listeners) = self.listeners.lock() {
for (_, entry) in listeners.drain() {
entry.abort.abort();
}
}
}
}
impl Default for EventEmitter {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub enum ContextEvent {
WebError(crate::web_error::WebError),
}
pub type ContextEventCallback = Arc<dyn Fn(ContextEvent) + Send + Sync>;
fn context_event_name_matches(name: &str, event: &ContextEvent) -> bool {
matches!((name, event), ("weberror", ContextEvent::WebError(_)))
}
#[derive(Clone)]
pub struct ContextEventEmitter {
tx: broadcast::Sender<ContextEvent>,
listeners: Arc<std::sync::Mutex<rustc_hash::FxHashMap<u64, ContextListenerEntry>>>,
next_listener_id: Arc<std::sync::atomic::AtomicU64>,
runtime_handle: Arc<std::sync::Mutex<Option<tokio::runtime::Handle>>>,
}
struct ContextListenerEntry {
abort: tokio::task::AbortHandle,
}
impl ContextEventEmitter {
#[must_use]
pub fn new() -> Self {
let (tx, _) = broadcast::channel(512);
let handle = tokio::runtime::Handle::try_current().ok();
Self {
tx,
listeners: Arc::new(std::sync::Mutex::new(rustc_hash::FxHashMap::default())),
next_listener_id: Arc::new(std::sync::atomic::AtomicU64::new(1)),
runtime_handle: Arc::new(std::sync::Mutex::new(handle)),
}
}
fn spawn_listener(&self, future: impl std::future::Future<Output = ()> + Send + 'static) -> tokio::task::AbortHandle {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
return handle.spawn(future).abort_handle();
}
if let Ok(guard) = self.runtime_handle.lock() {
if let Some(handle) = guard.as_ref() {
return handle.spawn(future).abort_handle();
}
}
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let Ok(rt) = tokio::runtime::Builder::new_current_thread().enable_all().build() else {
return;
};
let handle = rt.spawn(future);
let _ = tx.send(handle.abort_handle());
rt.block_on(handle).ok();
});
rx.recv()
.unwrap_or_else(|_| tokio::runtime::Handle::current().spawn(async {}).abort_handle())
}
pub fn emit(&self, event: ContextEvent) {
let _ = self.tx.send(event);
}
#[must_use]
pub fn subscribe(&self) -> broadcast::Receiver<ContextEvent> {
self.tx.subscribe()
}
pub async fn wait_for_event(&self, event_name: &str, timeout_ms: u64) -> crate::error::Result<ContextEvent> {
let mut rx = self.tx.subscribe();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
let name = event_name.to_string();
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Err(crate::error::FerriError::timeout(
"waiting for context event",
timeout_ms,
));
}
match tokio::time::timeout(remaining, rx.recv()).await {
Ok(Ok(event)) if context_event_name_matches(&name, &event) => return Ok(event),
Ok(Ok(_)) => {},
Ok(Err(_)) => {
return Err(crate::error::FerriError::target_closed(Some(
"context event channel closed".into(),
)));
},
Err(_) => {
return Err(crate::error::FerriError::timeout(
"waiting for context event",
timeout_ms,
));
},
}
}
}
pub fn on(&self, event_name: &str, callback: ContextEventCallback) -> ListenerId {
let id = self.next_listener_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut rx = self.tx.subscribe();
let filter_name = event_name.to_string();
let abort_handle = self.spawn_listener(async move {
while let Ok(event) = rx.recv().await {
if context_event_name_matches(&filter_name, &event) {
callback(event);
}
}
});
if let Ok(mut guard) = self.listeners.lock() {
guard.insert(id, ContextListenerEntry { abort: abort_handle });
}
ListenerId(id)
}
pub fn once(&self, event_name: &str, callback: ContextEventCallback) -> ListenerId {
let listeners = self.listeners.clone();
let id = self.next_listener_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut rx = self.tx.subscribe();
let filter_name = event_name.to_string();
let abort_handle = self.spawn_listener(async move {
while let Ok(event) = rx.recv().await {
if context_event_name_matches(&filter_name, &event) {
callback(event);
if let Ok(mut guard) = listeners.lock() {
guard.remove(&id);
}
break;
}
}
});
if let Ok(mut guard) = self.listeners.lock() {
guard.insert(id, ContextListenerEntry { abort: abort_handle });
}
ListenerId(id)
}
pub fn off(&self, id: ListenerId) {
if let Ok(mut guard) = self.listeners.lock() {
if let Some(entry) = guard.remove(&id.0) {
entry.abort.abort();
}
}
}
pub fn remove_all_listeners(&self) {
if let Ok(mut listeners) = self.listeners.lock() {
for (_, entry) in listeners.drain() {
entry.abort.abort();
}
}
}
}
impl Default for ContextEventEmitter {
fn default() -> Self {
Self::new()
}
}