use super::protocol::{Envelope, ErrorPayload};
use super::transport::{ReaderHandle, Transport, TransportError, WriterHandle};
use rustc_hash::FxHashMap;
use serde_json::{Value, json};
use std::collections::hash_map::Entry;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex, PoisonError};
use thiserror::Error;
use tokio::sync::{broadcast, oneshot};
/// Envelope id that `dispatch` discards without looking up a waiting callback.
/// NOTE(review): presumably the id attached to a browser-close request whose reply nobody awaits — confirm
/// against the code that sends it.
const BROWSER_CLOSE_ID: i64 = -9999;
/// Capacity passed to every `broadcast::channel` created for an event route in `Connection::subscribe`.
const EVENT_CHANNEL_CAP: usize = 8192;
/// Errors produced while sending commands over a [`Connection`] or awaiting their replies.
#[derive(Debug, Error)]
pub enum ConnectionError {
/// The underlying transport reported a failure (for example while writing a message).
#[error("transport: {0}")]
Transport(#[from] TransportError),
/// The reply to a command carried an error payload; holds that payload's `message`.
#[error("protocol error: {0}")]
Protocol(String),
/// The reply channel for `method` was dropped before any reply was delivered.
#[error("connection closed before reply for {method:?}")]
Closed { method: String },
/// A JSON (de)serialization step failed.
#[error("json: {0}")]
Json(#[from] serde_json::Error),
}
/// Lifts a connection-level failure into the crate-wide error type as a backend error.
impl From<ConnectionError> for crate::error::FerriError {
    /// Renders the error with its `Display` text, prefixed with `webkit: `.
    fn from(e: ConnectionError) -> Self {
        Self::backend(format!("webkit: {e}"))
    }
}
/// Sending half of the one-shot channel on which a pending command's outcome (result or error payload)
/// is delivered to its waiter.
type ResponseSlot = oneshot::Sender<Result<Value, ErrorPayload>>;
/// Identifies the scope that an event route or a pending command belongs to.
#[derive(Clone, PartialEq, Eq, Hash)]
enum RouteKey {
/// Messages that carry no page-proxy id.
Browser,
/// Messages tagged with the given page-proxy id.
PageProxy(String),
/// Messages unwrapped from `Target.dispatchMessageFromTarget` for the given target id.
Target(String),
}
/// Delivery state of one event route.
enum Route {
/// No subscriber yet: events are accumulated until the first `subscribe` call for this route.
Buffering(Vec<Envelope>),
/// At least one `subscribe` call has happened: events are sent on this broadcast channel.
Live(broadcast::Sender<Envelope>),
}
/// Shared connection state: the outgoing writer, the table of commands awaiting replies, and the
/// per-scope event routes fed by the reader task.
pub struct Connection {
// Write half of the transport; every outgoing message goes through it.
writer: Arc<WriterHandle>,
// Source of command ids; initialised to 1 in `spawn` and incremented per allocated callback.
next_id: AtomicI64,
// Commands awaiting a reply, keyed by id, together with the route they were issued on.
callbacks: Mutex<FxHashMap<i64, (RouteKey, ResponseSlot)>>,
// Event routes, either still buffering or already live.
routes: Mutex<FxHashMap<RouteKey, Route>>,
}
impl Connection {
pub fn spawn(transport: Transport) -> Arc<Self> {
let Transport { reader, writer } = transport;
let conn = Arc::new(Connection {
writer: Arc::new(writer),
next_id: AtomicI64::new(1),
callbacks: Mutex::new(FxHashMap::default()),
routes: Mutex::new(FxHashMap::default()),
});
tokio::spawn(reader_loop(Arc::clone(&conn), reader));
conn
}
/// Returns a session whose commands and events are scoped to the browser itself.
#[must_use]
pub fn browser_session(self: &Arc<Self>) -> Session {
    let conn = Arc::clone(self);
    Session {
        conn,
        kind: SessionKind::Browser,
    }
}
/// Returns a session scoped to the page proxy identified by `page_proxy_id`.
#[must_use]
pub fn page_proxy_session(self: &Arc<Self>, page_proxy_id: impl Into<String>) -> Session {
    let kind = SessionKind::PageProxy {
        page_proxy_id: page_proxy_id.into(),
    };
    Session {
        conn: Arc::clone(self),
        kind,
    }
}
/// Returns another shared handle to this connection.
#[must_use]
pub fn arc(self: &Arc<Self>) -> Arc<Self> {
    let shared: &Arc<Self> = self;
    Arc::clone(shared)
}
/// Returns a session scoped to target `target_id`, reached through page proxy `page_proxy_id`.
#[must_use]
pub fn target_session(self: &Arc<Self>, page_proxy_id: impl Into<String>, target_id: impl Into<String>) -> Session {
    let kind = SessionKind::Target {
        page_proxy_id: page_proxy_id.into(),
        target_id: target_id.into(),
    };
    Session {
        conn: Arc::clone(self),
        kind,
    }
}
/// Tears down one route: fails every command still pending on it and forgets its event state.
///
/// The route is chosen from the arguments: a target id wins over a page-proxy id, and when both are
/// `None` the browser route is closed. Each pending command on that route is completed with the
/// `closed_error()` payload. Waiters are notified only after the callback lock has been released.
pub fn close_route(&self, page_proxy_id: Option<&str>, target_id: Option<&str>) {
    let key = if let Some(target) = target_id {
        RouteKey::Target(target.to_string())
    } else if let Some(proxy) = page_proxy_id {
        RouteKey::PageProxy(proxy.to_string())
    } else {
        RouteKey::Browser
    };

    let orphaned: Vec<ResponseSlot> = {
        let mut pending = self.callbacks.lock().unwrap_or_else(PoisonError::into_inner);
        // Collect the ids first: entries cannot be removed while the map is being iterated.
        let doomed: Vec<i64> = pending
            .iter()
            .filter_map(|(id, (owner, _))| (*owner == key).then_some(*id))
            .collect();
        let mut slots = Vec::with_capacity(doomed.len());
        for id in doomed {
            if let Some((_, slot)) = pending.remove(&id) {
                slots.push(slot);
            }
        }
        slots
    };

    for slot in orphaned {
        let _ = slot.send(Err(closed_error()));
    }
    self.routes.lock().unwrap_or_else(PoisonError::into_inner).remove(&key);
}
pub fn send_raw(&self, envelope: &Value) -> Result<(), ConnectionError> {
self.writer.send(envelope).map_err(ConnectionError::from)
}
/// Reserves a fresh command id on route `key` and registers a one-shot slot for its reply.
///
/// Returns the id together with the receiving half on which the reply will arrive.
fn alloc_callback(&self, key: RouteKey) -> (i64, oneshot::Receiver<Result<Value, ErrorPayload>>) {
    let (slot, receiver) = oneshot::channel();
    let id = self.next_id.fetch_add(1, Ordering::Relaxed);
    let mut pending = self.callbacks.lock().unwrap_or_else(PoisonError::into_inner);
    pending.insert(id, (key, slot));
    (id, receiver)
}
/// Delivers `result` to the waiter registered under `id`, if there still is one.
///
/// Unknown ids are ignored, and so is a waiter that has already dropped its receiver.
fn complete(&self, id: i64, result: Result<Value, ErrorPayload>) {
    let removed = {
        let mut pending = self.callbacks.lock().unwrap_or_else(PoisonError::into_inner);
        pending.remove(&id)
    };
    // The lock is released before the waiter is notified.
    if let Some((_, slot)) = removed {
        let _ = slot.send(result);
    }
}
fn route_event(&self, key: RouteKey, env: Envelope) {
match self
.routes
.lock()
.unwrap_or_else(PoisonError::into_inner)
.entry(key)
.or_insert_with(|| Route::Buffering(Vec::new()))
{
Route::Buffering(buf) => buf.push(env),
Route::Live(tx) => {
let _ = tx.send(env);
},
}
}
/// Returns a receiver for the events of route `key`, making the route live if it is not already.
///
/// * Live route: a new receiver is attached to the existing channel.
/// * Buffering route: a channel is created, the buffered events are sent into it in arrival order,
///   and the route becomes live.
/// * Unknown route: a live route with an empty channel is created.
fn subscribe(&self, key: RouteKey) -> broadcast::Receiver<Envelope> {
    let mut routes = self.routes.lock().unwrap_or_else(PoisonError::into_inner);
    let mut occupied = match routes.entry(key) {
        Entry::Vacant(vacant) => {
            let (sender, receiver) = broadcast::channel(EVENT_CHANNEL_CAP);
            vacant.insert(Route::Live(sender));
            return receiver;
        },
        Entry::Occupied(occupied) => occupied,
    };
    let backlog = match occupied.get_mut() {
        Route::Live(sender) => return sender.subscribe(),
        Route::Buffering(pending) => std::mem::take(pending),
    };
    // The receiver exists before the replay, so every replayed send has a listener.
    let (sender, receiver) = broadcast::channel(EVENT_CHANNEL_CAP);
    for env in backlog {
        let _ = sender.send(env);
    }
    occupied.insert(Route::Live(sender));
    receiver
}
/// Fails every pending command with the `closed_error()` payload.
///
/// The callback table is emptied under the lock; waiters are notified after it has been released.
fn drain_all(&self) {
    let mut pending = self.callbacks.lock().unwrap_or_else(PoisonError::into_inner);
    let slots: Vec<ResponseSlot> = pending.drain().map(|(_, (_, slot))| slot).collect();
    drop(pending);
    slots.into_iter().for_each(|slot| {
        let _ = slot.send(Err(closed_error()));
    });
}
}
/// Error payload delivered to waiters whose command can no longer be answered.
fn closed_error() -> ErrorPayload {
    let message = String::from("transport closed");
    ErrorPayload {
        message,
        code: None,
        data: None,
    }
}
/// Reads frames from `reader` until it ends or yields an error, dispatching each parsed envelope.
///
/// Frames that do not parse as an [`Envelope`] are logged and skipped. Once the loop stops, every
/// command still pending on the connection is failed through `drain_all`.
async fn reader_loop(conn: Arc<Connection>, mut reader: ReaderHandle) {
    loop {
        let raw = match reader.recv().await {
            Some(Ok(value)) => value,
            Some(Err(e)) => {
                tracing::error!(target: "ferridriver::webkit", "reader: {e}");
                break;
            },
            None => break,
        };
        tracing::debug!(target: "ferridriver::webkit", "recv: {raw}");
        let parsed = serde_json::from_value::<Envelope>(raw);
        match parsed {
            Err(e) => tracing::warn!(target: "ferridriver::webkit", "skip un-parseable frame: {e}"),
            Ok(env) => dispatch(&conn, env),
        }
    }
    conn.drain_all();
}
/// Routes one top-level envelope.
///
/// * An envelope with id `BROWSER_CLOSE_ID` is dropped.
/// * Any other envelope with an id is treated as a reply and completes the matching callback.
/// * An envelope with neither id nor method is dropped.
/// * `Target.dispatchMessageFromTarget` is unwrapped and its inner message routed per target.
/// * Every other event goes to its page-proxy route, or to the browser route when it has no
///   page-proxy id.
fn dispatch(conn: &Connection, env: Envelope) {
    match env.id {
        Some(BROWSER_CLOSE_ID) => return,
        Some(id) => {
            conn.complete(id, response_of(&env));
            return;
        },
        None => {},
    }
    let Some(method) = env.method.as_deref() else {
        return;
    };
    if method != "Target.dispatchMessageFromTarget" {
        let key = env
            .page_proxy_id
            .clone()
            .map_or(RouteKey::Browser, RouteKey::PageProxy);
        conn.route_event(key, env);
        return;
    }
    if let Some((target_id, inner)) = unwrap_target_message(&env) {
        route_target_inner(conn, &target_id, inner);
    }
}
fn unwrap_target_message(env: &Envelope) -> Option<(String, Envelope)> {
let target_id = env.params.get("targetId").and_then(Value::as_str)?.to_string();
let message = env.params.get("message").and_then(Value::as_str)?;
let inner = serde_json::from_str::<Envelope>(message).ok()?;
Some((target_id, inner))
}
/// Routes an envelope that was unwrapped from a target message.
///
/// An envelope with an id completes the matching callback; one with only a method is delivered to
/// the event route of `target_id`; anything else is dropped.
fn route_target_inner(conn: &Connection, target_id: &str, env: Envelope) {
    match (env.id, env.method.is_some()) {
        (Some(id), _) => conn.complete(id, response_of(&env)),
        (None, true) => conn.route_event(RouteKey::Target(target_id.to_string()), env),
        (None, false) => {},
    }
}
/// Converts a reply envelope into the outcome handed to the waiting caller.
///
/// An `error` field takes precedence; otherwise the `result` field is returned, with `Value::Null`
/// standing in for an absent result.
fn response_of(env: &Envelope) -> Result<Value, ErrorPayload> {
    if let Some(err) = &env.error {
        return Err(err.clone());
    }
    Ok(env.result.clone().unwrap_or(Value::Null))
}
/// Scope that a [`Session`] addresses; decides how outgoing commands are framed and which event
/// route the session listens to.
#[derive(Clone)]
enum SessionKind {
/// Commands are sent without a page-proxy id.
Browser,
/// Commands are tagged with `pageProxyId`.
PageProxy { page_proxy_id: String },
/// Commands are wrapped in `Target.sendMessageToTarget` and sent via the page proxy.
Target { page_proxy_id: String, target_id: String },
}
/// Cloneable handle for sending commands to, and receiving events from, one scope of a shared
/// [`Connection`].
#[derive(Clone)]
pub struct Session {
// Connection shared by all sessions created from it.
conn: Arc<Connection>,
// Scope this session addresses.
kind: SessionKind,
}
impl Session {
#[must_use]
pub fn connection_handle(&self) -> Arc<Connection> {
Arc::clone(&self.conn)
}
/// Returns the page-proxy id of this session, or `None` for a browser session.
#[must_use]
pub fn page_proxy_id(&self) -> Option<&str> {
    let id = match &self.kind {
        SessionKind::Browser => return None,
        SessionKind::PageProxy { page_proxy_id } | SessionKind::Target { page_proxy_id, .. } => page_proxy_id,
    };
    Some(id.as_str())
}
/// Returns the target id of this session, or `None` when it is not a target session.
#[must_use]
pub fn target_id(&self) -> Option<&str> {
    if let SessionKind::Target { target_id, .. } = &self.kind {
        Some(target_id.as_str())
    } else {
        None
    }
}
pub async fn send(&self, method: &str, params: Value) -> Result<Value, ConnectionError> {
match &self.kind {
SessionKind::Browser => {
let (id, rx) = self.conn.alloc_callback(RouteKey::Browser);
self
.conn
.writer
.send(&json!({ "id": id, "method": method, "params": params }))?;
wait_for(rx, method).await
},
SessionKind::PageProxy { page_proxy_id } => {
let (id, rx) = self.conn.alloc_callback(RouteKey::PageProxy(page_proxy_id.clone()));
self.conn.writer.send(&json!({
"id": id, "method": method, "params": params, "pageProxyId": page_proxy_id,
}))?;
wait_for(rx, method).await
},
SessionKind::Target {
page_proxy_id,
target_id,
} => {
let (id, rx) = self.conn.alloc_callback(RouteKey::Target(target_id.clone()));
let inner = serde_json::to_string(&json!({ "id": id, "method": method, "params": params }))?;
let (wrap_id, wrap_rx) = self.conn.alloc_callback(RouteKey::PageProxy(page_proxy_id.clone()));
self.conn.writer.send(&json!({
"id": wrap_id,
"method": "Target.sendMessageToTarget",
"params": { "message": inner, "targetId": target_id },
"pageProxyId": page_proxy_id,
}))?;
wait_for(wrap_rx, "Target.sendMessageToTarget").await?;
wait_for(rx, method).await
},
}
}
/// Subscribes to the events of this session's scope and returns the receiving end.
#[must_use]
pub fn events(&self) -> broadcast::Receiver<Envelope> {
    let key = self.route_key();
    self.conn.subscribe(key)
}
/// Maps this session's scope to the key of its event route.
///
/// Target sessions are keyed by target id alone, page-proxy sessions by page-proxy id, and browser
/// sessions use the browser route.
fn route_key(&self) -> RouteKey {
    match (self.target_id(), self.page_proxy_id()) {
        (Some(target), _) => RouteKey::Target(target.to_owned()),
        (None, Some(proxy)) => RouteKey::PageProxy(proxy.to_owned()),
        (None, None) => RouteKey::Browser,
    }
}
}
/// Awaits the reply slot of a command and converts its outcome into a `Result`.
///
/// An error payload becomes [`ConnectionError::Protocol`] carrying its message. A slot dropped
/// without a reply becomes [`ConnectionError::Closed`] naming `method`.
async fn wait_for(rx: oneshot::Receiver<Result<Value, ErrorPayload>>, method: &str) -> Result<Value, ConnectionError> {
    let Ok(reply) = rx.await else {
        return Err(ConnectionError::Closed {
            method: method.to_owned(),
        });
    };
    reply.map_err(|payload| ConnectionError::Protocol(payload.message))
}