use alloy_json_rpc::PubSubItem;
use serde_json::value::RawValue;
use tokio::{
sync::{
mpsc,
oneshot::{self, error::TryRecvError},
},
time::Duration,
};
/// One half of a channel-connected pair created by [`ConnectionHandle::new`];
/// the other half is a [`ConnectionInterface`].
///
/// The handle holds the sending end for raw requests, the receiving end for
/// [`PubSubItem`]s, and two one-shot signals: an incoming error notification
/// and an outgoing shutdown request.
#[derive(Debug)]
pub struct ConnectionHandle {
/// Sends raw JSON values; paired in `new` with the interface's
/// `from_frontend` receiver.
pub(crate) to_socket: mpsc::UnboundedSender<Box<RawValue>>,
/// Receives [`PubSubItem`]s; paired in `new` with the interface's
/// `to_frontend` sender.
pub(crate) from_socket: mpsc::UnboundedReceiver<PubSubItem>,
/// Receiving end of the one-shot that
/// [`ConnectionInterface::close_with_error`] fires.
pub(crate) error: oneshot::Receiver<()>,
/// Sending end of the one-shot fired by [`ConnectionHandle::shutdown`].
pub(crate) shutdown: oneshot::Sender<()>,
/// Retry count, `10` by default. Only stored in this file — presumably read
/// by reconnection logic elsewhere (TODO confirm).
pub(crate) max_retries: u32,
/// Retry interval, 3 seconds by default. Only stored in this file —
/// presumably the delay between reconnection attempts (TODO confirm).
pub(crate) retry_interval: Duration,
}
impl ConnectionHandle {
    /// Builds a connected handle/interface pair.
    ///
    /// Two unbounded channels carry traffic in opposite directions and two
    /// one-shot channels carry the error and shutdown signals. The handle
    /// starts with `max_retries = 10` and a `retry_interval` of 3 seconds.
    pub fn new() -> (Self, ConnectionInterface) {
        // Raw requests flow from the handle to the interface.
        let (request_tx, request_rx) = mpsc::unbounded_channel();
        // Pubsub items flow from the interface back to the handle.
        let (item_tx, item_rx) = mpsc::unbounded_channel();
        // One-shot signals: error goes interface -> handle, shutdown goes
        // handle -> interface.
        let (error_tx, error_rx) = oneshot::channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        (
            Self {
                to_socket: request_tx,
                from_socket: item_rx,
                error: error_rx,
                shutdown: shutdown_tx,
                max_retries: 10,
                retry_interval: Duration::from_secs(3),
            },
            ConnectionInterface {
                from_frontend: request_rx,
                to_frontend: item_tx,
                error: error_tx,
                shutdown: shutdown_rx,
            },
        )
    }

    /// Builder-style setter: stores `max_retries` on the handle and returns
    /// the handle.
    pub const fn with_max_retries(mut self, max_retries: u32) -> Self {
        self.max_retries = max_retries;
        self
    }

    /// Builder-style setter: stores `retry_interval` on the handle and
    /// returns the handle.
    pub const fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
        self.retry_interval = retry_interval;
        self
    }

    /// Consumes the handle and fires the shutdown one-shot.
    ///
    /// The send result is deliberately discarded: if the receiving side is
    /// already gone there is nothing left to notify.
    pub fn shutdown(self) {
        self.shutdown.send(()).ok();
    }
}
/// The counterpart of [`ConnectionHandle`], created alongside it by
/// [`ConnectionHandle::new`].
///
/// It holds the receiving end for raw requests, the sending end for
/// [`PubSubItem`]s, and two one-shot signals: an outgoing error notification
/// and an incoming shutdown request.
#[derive(Debug)]
pub struct ConnectionInterface {
/// Receives raw JSON values; paired in `ConnectionHandle::new` with the
/// handle's `to_socket` sender.
pub(crate) from_frontend: mpsc::UnboundedReceiver<Box<RawValue>>,
/// Sends [`PubSubItem`]s; paired in `ConnectionHandle::new` with the
/// handle's `from_socket` receiver.
pub(crate) to_frontend: mpsc::UnboundedSender<PubSubItem>,
/// Sending end of the one-shot fired by
/// [`ConnectionInterface::close_with_error`].
pub(crate) error: oneshot::Sender<()>,
/// Receiving end of the one-shot fired by [`ConnectionHandle::shutdown`];
/// polled in `recv_from_frontend`.
pub(crate) shutdown: oneshot::Receiver<()>,
}
impl ConnectionInterface {
    /// Forwards a [`PubSubItem`] over the `to_frontend` channel.
    ///
    /// # Errors
    ///
    /// Returns the channel's `SendError`, carrying the item back, when the
    /// send fails.
    pub fn send_to_frontend(
        &self,
        item: PubSubItem,
    ) -> Result<(), mpsc::error::SendError<PubSubItem>> {
        self.to_frontend.send(item)
    }

    /// Waits for the next raw request from the `from_frontend` channel.
    ///
    /// The shutdown one-shot is polled once, before waiting. If a shutdown
    /// value has arrived or the one-shot is closed, this returns `None`
    /// immediately, without reading any queued requests. Otherwise the result
    /// of `from_frontend.recv()` is returned as-is.
    pub async fn recv_from_frontend(&mut self) -> Option<Box<RawValue>> {
        // `Empty` is the only outcome that means "keep going"; a received
        // value or a closed channel both end the stream.
        let keep_going = matches!(self.shutdown.try_recv(), Err(TryRecvError::Empty));
        if !keep_going {
            return None;
        }

        self.from_frontend.recv().await
    }

    /// Consumes the interface and fires the error one-shot.
    ///
    /// The send result is deliberately discarded: if the receiving side is
    /// already gone there is nothing left to notify.
    pub fn close_with_error(self) {
        self.error.send(()).ok();
    }
}