mod value;
use std::{
marker::PhantomData,
sync::{Arc, Mutex},
time::Duration,
};
use futures::{
channel::{mpsc::UnboundedSender, oneshot},
future::{pending, Pending},
Future,
};
use serde::{Deserialize, Serialize};
pub use value::*;
mod stream;
pub use stream::*;
use super::{DefaultReceiveErrorCallback, ShutdownType};
use crate::producer;
/// Message made of an identifier and a [`Payload`].
///
/// [`Aborter::abort`] builds one with its own `id` and [`Payload::Abort`],
/// so the `id` is what ties an abort to the request it targets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message<Request> {
    /// Identifier of the request this message refers to.
    pub id: usize,
    /// Content carried under that identifier.
    pub payload: Payload<Request>,
}
/// Content of a [`Message`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Payload<Request> {
    /// Carries a request value.
    Request(Request),
    /// Carries no data; [`Aborter::abort`] sends it together with the
    /// identifier of the request being aborted.
    Abort,
}
/// Result alias for this module: the error side is always `super::Error`
/// parameterized by the caller-chosen `Error` type.
pub type Result<T, Error> = std::result::Result<T, super::Error<Error>>;
/// Settings handed to [`Consume::consume`] and [`Consume::consume_unreliable`].
///
/// A ready-made value is available through [`Default`] when the shutdown
/// future is `Pending<ShutdownType>` and the callback is
/// `DefaultReceiveErrorCallback`.
#[derive(Debug)]
pub struct Configuration<Shutdown, Error, ReceiveErrorCallback>
where
    Shutdown: Future<Output = ShutdownType>,
    ReceiveErrorCallback: crate::ReceiveErrorCallback<Error>,
{
    /// Future that resolves to a [`ShutdownType`].
    ///
    /// NOTE(review): presumably the consumer shuts down once this resolves;
    /// the code that polls it is not in this file - confirm.
    pub shutdown: Shutdown,
    /// Callback implementing `crate::ReceiveErrorCallback<Error>`.
    ///
    /// NOTE(review): presumably invoked when receiving from the transport
    /// fails - confirm against the consumer implementation.
    pub receive_error_callback: ReceiveErrorCallback,
    /// Optional duration; the `Default` impl leaves it as `None`.
    ///
    /// NOTE(review): presumably a per-request timeout, with `None` meaning
    /// "wait indefinitely" - confirm.
    pub timeout: Option<Duration>,
    /// Zero-sized marker so the struct can name `Error` without storing one.
    pub _error: PhantomData<Error>,
}
impl<Error> Default for Configuration<Pending<ShutdownType>, Error, DefaultReceiveErrorCallback> {
fn default() -> Self {
Self {
shutdown: pending(),
receive_error_callback: DefaultReceiveErrorCallback {},
timeout: Default::default(),
_error: Default::default(),
}
}
}
/// Sending half (or halves) through which the outcome of a [`Message`] is
/// reported; it is queued next to the message in the channel held by
/// [`Aborter`].
#[derive(Debug)]
pub enum ResultSender<T, Error> {
    /// One-shot channel for a single `Result<T, Error>`.
    Value(oneshot::Sender<Result<T, Error>>),
    /// A pair of channels for a request that produces many values.
    Stream {
        /// One-shot channel for a `Result<(), Error>`.
        ///
        /// NOTE(review): presumably reports how the stream finished -
        /// confirm against the code that drives it.
        result_sender: oneshot::Sender<Result<(), Error>>,
        /// Unbounded channel over which the individual `T` values are sent.
        values_sender: UnboundedSender<T>,
    },
    /// No result channel at all; [`Aborter::abort`] pairs this with
    /// [`Payload::Abort`].
    Abort,
}
/// Handle that can abort the request identified by `id`.
///
/// Every clone shares the same `abort_sender` slot, and `abort` empties that
/// slot, so only the first `abort` call across all clones has any effect.
#[derive(Debug)]
pub struct Aborter<T, Request, Error> {
    /// Identifier of the request; exposed by `request_id` and stamped on the
    /// abort message.
    id: usize,
    /// Shared sender of `(Message, ResultSender)` pairs; `abort` pushes the
    /// abort message through it.
    // The nested channel type is inherent to what is being sent, so the
    // complexity lint is silenced rather than hidden behind an alias.
    #[allow(clippy::type_complexity)]
    sender: Arc<UnboundedSender<(Message<Request>, ResultSender<T, Error>)>>,
    /// One-shot notifier shared between clones; `abort` takes it out and
    /// leaves `None` behind.
    abort_sender: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}
impl<Request, Output, Error> Aborter<Request, Output, Error> {
pub fn abort(self) {
let mut abort_sender = self.abort_sender.lock().unwrap();
if let Some(abort_sender) = abort_sender.take() {
let payload = Payload::Abort;
let message = Message {
id: self.id,
payload,
};
let _ = self.sender.unbounded_send((message, ResultSender::Abort));
let _ = abort_sender.send(());
}
}
pub fn request_id(&self) -> usize {
self.id
}
}
impl<Request, Output, Error> Clone for Aborter<Request, Output, Error> {
fn clone(&self) -> Self {
Self {
id: self.id,
sender: self.sender.clone(),
abort_sender: self.abort_sender.clone(),
}
}
}
/// Builds a `Consumer` on top of a `mezzenger` transport.
///
/// Two entry points exist per target:
///
/// * [`Consume::consume`] requires the transport to carry the
///   `mezzenger::Reliable` and `mezzenger::Order` markers and simply forwards
///   to `consume_unreliable`;
/// * [`Consume::consume_unreliable`] is the method implementors provide and
///   places no reliability or ordering requirement on the transport.
///
/// The `wasm32` variants are identical except that they do not require
/// `Send`.
pub trait Consume<Consumer, Error> {
    /// Request type wrapped in the [`Message`] named in the transport bound.
    type Request;
    /// Response type wrapped in the `producer::Message` named in the
    /// transport bound.
    type Response;

    /// Creates a consumer over a transport marked both reliable and ordered.
    ///
    /// Forwards to [`Consume::consume_unreliable`]; the marker bounds are the
    /// only difference between the two.
    #[cfg(not(target_arch = "wasm32"))]
    fn consume<Transport, Shutdown, ReceiveErrorCallback>(
        transport: Transport,
        configuration: Configuration<Shutdown, Error, ReceiveErrorCallback>,
    ) -> Consumer
    where
        Transport: mezzenger::Transport<producer::Message<Self::Response>, Message<Self::Request>, Error>
            + mezzenger::Reliable
            + mezzenger::Order
            + Send
            + 'static,
        Shutdown: Future<Output = ShutdownType> + Send + 'static,
        ReceiveErrorCallback: crate::ReceiveErrorCallback<Error> + Send + 'static,
        Error: Send + 'static,
    {
        Self::consume_unreliable(transport, configuration)
    }

    /// Creates a consumer over any transport, including ones that are not
    /// marked `mezzenger::Reliable` or `mezzenger::Order`.
    ///
    /// Prefer [`Consume::consume`] unless the transport genuinely lacks those
    /// markers.
    // The transport bound deliberately omits `Reliable` and `Order`: with
    // them this method would accept exactly the same transports as `consume`.
    // Implementations must not add those bounds back, because an impl may not
    // be stricter than the trait.
    #[cfg(not(target_arch = "wasm32"))]
    fn consume_unreliable<Transport, Shutdown, ReceiveErrorCallback>(
        transport: Transport,
        configuration: Configuration<Shutdown, Error, ReceiveErrorCallback>,
    ) -> Consumer
    where
        Transport: mezzenger::Transport<producer::Message<Self::Response>, Message<Self::Request>, Error>
            + Send
            + 'static,
        Shutdown: Future<Output = ShutdownType> + Send + 'static,
        ReceiveErrorCallback: crate::ReceiveErrorCallback<Error> + Send + 'static,
        Error: Send + 'static;

    /// `wasm32` build of `consume`: same contract, without the `Send` bounds.
    ///
    /// Forwards to [`Consume::consume_unreliable`].
    #[cfg(target_arch = "wasm32")]
    fn consume<Transport, Shutdown, ReceiveErrorCallback>(
        transport: Transport,
        configuration: Configuration<Shutdown, Error, ReceiveErrorCallback>,
    ) -> Consumer
    where
        Transport: mezzenger::Transport<producer::Message<Self::Response>, Message<Self::Request>, Error>
            + mezzenger::Reliable
            + mezzenger::Order
            + 'static,
        Shutdown: Future<Output = ShutdownType> + 'static,
        ReceiveErrorCallback: crate::ReceiveErrorCallback<Error> + 'static,
        Error: 'static,
    {
        Self::consume_unreliable(transport, configuration)
    }

    /// `wasm32` build of `consume_unreliable`: same contract, without the
    /// `Send` bounds.
    // As on other targets, `Reliable` and `Order` are intentionally not
    // required here.
    #[cfg(target_arch = "wasm32")]
    fn consume_unreliable<Transport, Shutdown, ReceiveErrorCallback>(
        transport: Transport,
        configuration: Configuration<Shutdown, Error, ReceiveErrorCallback>,
    ) -> Consumer
    where
        Transport: mezzenger::Transport<producer::Message<Self::Response>, Message<Self::Request>, Error>
            + 'static,
        Shutdown: Future<Output = ShutdownType> + 'static,
        ReceiveErrorCallback: crate::ReceiveErrorCallback<Error> + 'static,
        Error: 'static;
}