pub mod hash_map;
pub mod hash_set;
pub mod list;
pub mod vec;
use serde::{Deserialize, Serialize};
use std::{error::Error, fmt};
use tokio::sync::watch;
use crate::prelude::*;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum SendError {
RemoteSend(rch::base::SendErrorKind),
RemoteConnect(chmux::ConnectError),
RemoteListen(chmux::ListenerError),
RemoteForward,
}
impl fmt::Display for SendError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::RemoteSend(err) => write!(f, "send error: {err}"),
Self::RemoteConnect(err) => write!(f, "connect error: {err}"),
Self::RemoteListen(err) => write!(f, "listen error: {err}"),
Self::RemoteForward => write!(f, "forwarding error"),
}
}
}
impl Error for SendError {}
impl<T> TryFrom<rch::broadcast::SendError<T>> for SendError {
type Error = rch::broadcast::SendError<T>;
fn try_from(err: rch::broadcast::SendError<T>) -> Result<Self, Self::Error> {
match err {
rch::broadcast::SendError::RemoteSend(err) => Ok(Self::RemoteSend(err)),
rch::broadcast::SendError::RemoteConnect(err) => Ok(Self::RemoteConnect(err)),
rch::broadcast::SendError::RemoteListen(err) => Ok(Self::RemoteListen(err)),
rch::broadcast::SendError::RemoteForward => Ok(Self::RemoteForward),
other @ rch::broadcast::SendError::Closed(_) => Err(other),
}
}
}
impl<T> TryFrom<rch::mpsc::SendError<T>> for SendError {
type Error = rch::mpsc::SendError<T>;
fn try_from(err: rch::mpsc::SendError<T>) -> Result<Self, Self::Error> {
match err {
rch::mpsc::SendError::RemoteSend(err) => Ok(Self::RemoteSend(err)),
rch::mpsc::SendError::RemoteConnect(err) => Ok(Self::RemoteConnect(err)),
rch::mpsc::SendError::RemoteListen(err) => Ok(Self::RemoteListen(err)),
rch::mpsc::SendError::RemoteForward => Ok(Self::RemoteForward),
other @ rch::mpsc::SendError::Closed(_) => Err(other),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum RecvError {
Closed,
Lagged,
MaxSizeExceeded(usize),
RemoteReceive(rch::base::RecvError),
RemoteConnect(chmux::ConnectError),
RemoteListen(chmux::ListenerError),
InvalidIndex(usize),
}
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Closed => write!(f, "observed collection was dropped"),
Self::Lagged => write!(f, "observation lagged behind"),
Self::MaxSizeExceeded(size) => write!(f, "mirrored collection reached it maximum size of {size}"),
Self::RemoteReceive(err) => write!(f, "receive error: {err}"),
Self::RemoteConnect(err) => write!(f, "connect error: {err}"),
Self::RemoteListen(err) => write!(f, "listen error: {err}"),
Self::InvalidIndex(idx) => write!(f, "index {idx} is invalid"),
}
}
}
impl Error for RecvError {}
impl From<rch::broadcast::RecvError> for RecvError {
fn from(err: rch::broadcast::RecvError) -> Self {
match err {
rch::broadcast::RecvError::Closed => Self::Closed,
rch::broadcast::RecvError::Lagged => Self::Lagged,
rch::broadcast::RecvError::RemoteReceive(err) => Self::RemoteReceive(err),
rch::broadcast::RecvError::RemoteConnect(err) => Self::RemoteConnect(err),
rch::broadcast::RecvError::RemoteListen(err) => Self::RemoteListen(err),
}
}
}
impl From<rch::mpsc::RecvError> for RecvError {
fn from(err: rch::mpsc::RecvError) -> Self {
match err {
rch::mpsc::RecvError::RemoteReceive(err) => Self::RemoteReceive(err),
rch::mpsc::RecvError::RemoteConnect(err) => Self::RemoteConnect(err),
rch::mpsc::RecvError::RemoteListen(err) => Self::RemoteListen(err),
}
}
}
pub(crate) fn send_event<E, Codec>(tx: &rch::broadcast::Sender<E, Codec>, on_err: &dyn Fn(SendError), event: E)
where
Codec: crate::codec::Codec,
E: RemoteSend + Clone,
{
match tx.send(event) {
Ok(()) => (),
Err(err) if err.is_disconnected() => (),
Err(err) => match err.try_into() {
Ok(err) => (on_err)(err),
Err(_) => unreachable!(),
},
}
}
pub(crate) fn default_on_err(err: SendError) {
tracing::warn!("sending failed: {}", err);
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DroppedError;
impl fmt::Display for DroppedError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "dropped")
}
}
impl Error for DroppedError {}
pub(crate) struct ChangeSender {
tx: watch::Sender<()>,
rx: watch::Receiver<()>,
}
impl ChangeSender {
pub fn new() -> Self {
let (tx, rx) = watch::channel(());
Self { tx, rx }
}
pub fn subscribe(&self) -> ChangeNotifier {
ChangeNotifier(self.rx.clone())
}
pub fn notify(&self) {
self.tx.send_replace(());
}
}
#[derive(Clone)]
pub struct ChangeNotifier(watch::Receiver<()>);
impl fmt::Debug for ChangeNotifier {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("ChangeNotifier").finish()
}
}
impl ChangeNotifier {
pub async fn changed(&mut self) -> Result<(), DroppedError> {
self.0.changed().await.map_err(|_| DroppedError)
}
pub fn update(&mut self) {
self.0.borrow_and_update();
}
}