use std::{
fmt::{Debug, Display},
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures::{future::FusedFuture, ready, Future, FutureExt};
pub use atomic_counter;
pub use futures;
#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;
#[cfg(not(target_arch = "wasm32"))]
pub use tokio::spawn;
#[cfg(not(target_arch = "wasm32"))]
pub use tokio::task::JoinHandle;
#[cfg(not(target_arch = "wasm32"))]
use tokio::time::{sleep, Sleep};
#[cfg(target_arch = "wasm32")]
pub use js_utils::spawn::spawn;
#[cfg(target_arch = "wasm32")]
pub use js_utils::spawn::JoinHandle;
#[cfg(target_arch = "wasm32")]
use js_utils::{sleep, sleep::Sleep};
#[cfg(target_arch = "wasm32")]
use zduny_wasm_timer::Instant;
pub use crate::consumer::{Consume, StreamRequest, ValueRequest};
pub use zzrpc_derive::api;
pub use zzrpc_derive::Produce;
pub mod consumer;
pub mod producer;
/// Errors reported by this crate, generic over the error type of the underlying transport.
///
/// The `Closed`, `Aborted`, `Shutdown` and `Timeout` variants can also be obtained by
/// converting a [`ShutdownType`] of the same name.
#[derive(Debug, Clone)]
pub enum Error<TransportError> {
/// The transport was closed.
Closed,
/// The request was aborted.
Aborted,
/// The producer/consumer was shut down.
Shutdown,
/// The producer/consumer timed out.
Timeout,
/// The consumer was dropped.
Dropped,
/// An error reported by the transport itself; the payload is the transport's own error value.
Transport(TransportError),
}
/// Converts a [`ShutdownType`] into the [`Error`] variant carrying the same name.
///
/// The conversion is total: every `ShutdownType` has exactly one counterpart, and the
/// `Dropped` / `Transport` variants of `Error` are never produced here.
impl<TransportError> From<ShutdownType> for Error<TransportError> {
    fn from(reason: ShutdownType) -> Self {
        match reason {
            ShutdownType::Closed => Self::Closed,
            ShutdownType::Shutdown => Self::Shutdown,
            ShutdownType::Aborted => Self::Aborted,
            ShutdownType::Timeout => Self::Timeout,
        }
    }
}
impl<Transport> Display for Error<Transport>
where
Transport: Display,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Error::Closed => write!(f, "transport closed"),
Error::Aborted => write!(f, "request was aborted"),
Error::Shutdown => write!(f, "producer/consumer was shutdown"),
Error::Timeout => write!(f, "producer/consumer timed out"),
Error::Dropped => write!(f, "consumer was dropped"),
Error::Transport(error) => write!(f, "transport error: {error}"),
}
}
}
/// Makes [`Error`] usable as a standard error type whenever the transport error is both
/// `Debug` and `Display`. The body is empty, so all trait methods keep their defaults.
impl<Transport> std::error::Error for Error<Transport> where Transport: Debug + Display {}
/// Reason carried by [`HandlingStrategy::Stop`].
///
/// Each variant converts (via `From`) into the [`Error`] variant of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownType {
/// Converts to `Error::Closed` (displayed as "transport closed").
Closed,
/// Converts to `Error::Shutdown`.
Shutdown,
/// Converts to `Error::Aborted`.
Aborted,
/// Converts to `Error::Timeout`.
Timeout,
}
/// Decision returned by [`SendErrorCallback`] and [`ReceiveErrorCallback`] implementations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandlingStrategy {
/// Stop, giving the [`ShutdownType`] to report as the reason.
// NOTE(review): the code acting on this value is not in this file — confirm what exactly is stopped.
Stop(ShutdownType),
/// Ignore the error.
// NOTE(review): presumably processing simply continues — confirm against the producer/consumer.
Ignore,
}
/// Callback for send errors, generic over the transport's own error type.
///
/// Implemented automatically for any
/// `FnMut(usize, mezzenger::Error<Error>) -> HandlingStrategy`.
pub trait SendErrorCallback<Error> {
/// Receives the id of the affected request and the `mezzenger` error, and returns the
/// [`HandlingStrategy`] to apply.
// NOTE(review): the call site is not in this file — presumably invoked when sending a message fails.
fn on_send_error(
&mut self,
request_id: usize,
error: mezzenger::Error<Error>,
) -> HandlingStrategy;
}
impl<T, Error> SendErrorCallback<Error> for T
where
T: FnMut(usize, mezzenger::Error<Error>) -> HandlingStrategy,
{
fn on_send_error(
&mut self,
request_id: usize,
error: mezzenger::Error<Error>,
) -> HandlingStrategy {
(self)(request_id, error)
}
}
/// Callback for receive errors, generic over the error type being reported.
///
/// Implemented automatically for any `FnMut(Error) -> HandlingStrategy`.
pub trait ReceiveErrorCallback<Error> {
/// Receives the error and returns the [`HandlingStrategy`] to apply.
// NOTE(review): the call site is not in this file — presumably invoked when receiving a message fails.
fn on_receive_error(&mut self, error: Error) -> HandlingStrategy;
}
impl<T, Error> ReceiveErrorCallback<Error> for T
where
T: FnMut(Error) -> HandlingStrategy,
{
fn on_receive_error(&mut self, error: Error) -> HandlingStrategy {
(self)(error)
}
}
/// Stateless [`SendErrorCallback`]: stops with `ShutdownType::Closed` when the
/// `mezzenger` error is `Closed`, and ignores every other send error.
#[derive(Debug)]
pub struct DefaultSendErrorCallback {}
impl<Error> SendErrorCallback<Error> for DefaultSendErrorCallback {
fn on_send_error(
&mut self,
_request_id: usize,
error: mezzenger::Error<Error>,
) -> HandlingStrategy {
match error {
mezzenger::Error::Closed => HandlingStrategy::Stop(ShutdownType::Closed),
mezzenger::Error::Other(_) => HandlingStrategy::Ignore,
}
}
}
/// Stateless [`ReceiveErrorCallback`] that ignores every receive error.
#[derive(Debug)]
pub struct DefaultReceiveErrorCallback {}
/// Default policy for receive errors: the error value is discarded and
/// [`HandlingStrategy::Ignore`] is returned unconditionally.
impl<E> ReceiveErrorCallback<E> for DefaultReceiveErrorCallback {
    fn on_receive_error(&mut self, _error: E) -> HandlingStrategy {
        HandlingStrategy::Ignore
    }
}
/// A future that completes when a timer fires, or never completes at all.
#[derive(Debug)]
pub enum Timeout {
/// Completes when `sleep` completes.
Duration {
/// Length of the timeout; `Timeout::reset` uses it to compute the next deadline.
duration: Duration,
/// The underlying timer future, boxed and pinned so it can be polled and reset in place.
sleep: Pin<Box<Sleep>>,
},
/// Always pending when polled.
Never,
}
impl Timeout {
    /// Creates a timeout backed by a freshly created `sleep(duration)` timer.
    ///
    /// The duration is remembered so that [`Timeout::reset`] can re-arm the timer later.
    pub fn new(duration: Duration) -> Self {
        Timeout::Duration {
            duration,
            sleep: Box::pin(sleep(duration)),
        }
    }

    /// Creates a timeout that never completes.
    pub fn never() -> Self {
        Timeout::Never
    }

    /// Re-arms the timer so that it fires `duration` after the moment of this call.
    ///
    /// Does nothing for [`Timeout::Never`]. Never panics, even for durations so large
    /// that "now + duration" cannot be represented as an `Instant`.
    pub fn reset(&mut self) {
        // The field is bound as `timer` so the `sleep` *function* stays nameable below.
        if let Timeout::Duration {
            duration,
            sleep: timer,
        } = self
        {
            // `std::time::Instant + Duration` panics on overflow, which made `reset`
            // panic for very large durations (e.g. `Duration::MAX`) that `new` had
            // accepted. Use the checked form on native targets.
            #[cfg(not(target_arch = "wasm32"))]
            let deadline = Instant::now().checked_add(*duration);
            // The wasm timer `Instant` keeps the original plain addition.
            // NOTE(review): assumes it has no overflow panic and no `checked_add` — confirm.
            #[cfg(target_arch = "wasm32")]
            let deadline = Some(Instant::now() + *duration);

            match deadline {
                Some(deadline) => timer.as_mut().reset(deadline.into()),
                // Deadline not representable: re-arm in place (no new allocation) with
                // the same constructor `new` uses, which copes with such durations.
                None => timer.set(sleep(*duration)),
            }
        }
    }
}
/// Polling a [`Timeout`] drives its inner timer; a [`Timeout::Never`] is pending forever.
impl Future for Timeout {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Timeout` is `Unpin` (the timer lives behind `Pin<Box<_>>`), so a plain
        // mutable reference can be taken out of the pin.
        if let Timeout::Duration { sleep, .. } = self.get_mut() {
            // Propagate `Pending` from the timer; its output value is discarded.
            ready!(sleep.poll_unpin(cx));
            return Poll::Ready(());
        }
        // `Never`: report pending without touching the waker in `cx`.
        Poll::Pending
    }
}
/// Termination state reported to `FusedFuture`-aware combinators.
///
/// Only [`Timeout::Never`] reports itself as terminated. A `Duration` timeout always
/// reports `false`, including after it has fired.
// NOTE(review): presumably so it can be polled again after `reset` — confirm.
impl FusedFuture for Timeout {
    fn is_terminated(&self) -> bool {
        matches!(self, Timeout::Never)
    }
}