pub use inventory::submit;
use crate::{consumer::Consumers, extractor::FromMsg};
use sea_streamer::SeaMessage;
use spring::app::App;
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
};
pub trait Handler<T>: Clone + Send + Sized + 'static {
type Future: Future<Output = ()> + Send + 'static;
fn call(self, msg: SeaMessage, app: Arc<App>) -> Self::Future;
}
impl<F, Fut> Handler<()> for F
where
F: FnOnce() -> Fut + Clone + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
type Future = Pin<Box<dyn Future<Output = ()> + Send>>;
fn call(self, _msg: SeaMessage, _app: Arc<App>) -> Self::Future {
Box::pin(self())
}
}
#[rustfmt::skip]
macro_rules! all_the_tuples {
($name:ident) => {
$name!([T1]);
$name!([T1, T2]);
$name!([T1, T2, T3]);
$name!([T1, T2, T3, T4]);
$name!([T1, T2, T3, T4, T5]);
$name!([T1, T2, T3, T4, T5, T6]);
$name!([T1, T2, T3, T4, T5, T6, T7]);
$name!([T1, T2, T3, T4, T5, T6, T7, T8]);
$name!([T1, T2, T3, T4, T5, T6, T7, T8, T9]);
$name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]);
$name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11]);
$name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12]);
$name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13]);
$name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14]);
$name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15]);
};
}
macro_rules! impl_handler {
(
[$($ty:ident),*]
) => {
#[allow(non_snake_case, unused_mut)]
impl<F, Fut, $($ty,)*> Handler<($($ty,)*)> for F
where
F: FnOnce($($ty,)*) -> Fut + Clone + Send + 'static,
Fut: Future<Output = ()> + Send + 'static,
$( $ty: FromMsg + Send, )*
{
type Future = Pin<Box<dyn Future<Output = ()> + Send>>;
fn call(self, msg: SeaMessage, app: Arc<App>) -> Self::Future {
$(
let $ty = $ty::from_msg(&msg, &app);
)*
Box::pin(self($($ty,)*))
}
}
};
}
all_the_tuples!(impl_handler);
pub(crate) struct BoxedHandler(Mutex<Box<dyn ErasedHandler>>);
impl Clone for BoxedHandler {
fn clone(&self) -> Self {
Self(Mutex::new(self.0.lock().unwrap().clone_box()))
}
}
impl BoxedHandler {
pub(crate) fn from_handler<H, T>(handler: H) -> Self
where
H: Handler<T> + Sync,
T: 'static,
{
Self(Mutex::new(Box::new(MakeErasedHandler {
handler,
caller: |handler, msg, app| Box::pin(H::call(handler, msg, app)),
})))
}
pub(crate) fn call(
self,
msg: SeaMessage,
app: Arc<App>,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
self.0.into_inner().unwrap().call(msg, app)
}
}
pub(crate) trait ErasedHandler: Send {
fn clone_box(&self) -> Box<dyn ErasedHandler>;
fn call(
self: Box<Self>,
msg: SeaMessage,
app: Arc<App>,
) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}
type HandlerCaller<H> = fn(H, SeaMessage, Arc<App>) -> Pin<Box<dyn Future<Output = ()> + Send>>;
pub(crate) struct MakeErasedHandler<H> {
pub(crate) handler: H,
pub(crate) caller: HandlerCaller<H>,
}
impl<H> Clone for MakeErasedHandler<H>
where
H: Clone,
{
fn clone(&self) -> Self {
Self {
handler: self.handler.clone(),
caller: self.caller,
}
}
}
impl<H> ErasedHandler for MakeErasedHandler<H>
where
H: Clone + Send + Sync + 'static,
{
fn clone_box(&self) -> Box<dyn ErasedHandler> {
Box::new(self.clone())
}
fn call(
self: Box<Self>,
msg: SeaMessage,
app: Arc<App>,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
(self.caller)(self.handler, msg, app)
}
}
pub trait TypedHandlerRegistrar: Send + Sync + 'static {
fn install_consumer(&self, jobs: Consumers) -> Consumers;
}
pub trait TypedConsumer {
fn typed_consumer<F: TypedHandlerRegistrar>(self, factory: F) -> Self;
}
impl TypedConsumer for Consumers {
fn typed_consumer<F: TypedHandlerRegistrar>(self, factory: F) -> Self {
factory.install_consumer(self)
}
}
inventory::collect!(&'static dyn TypedHandlerRegistrar);
#[macro_export]
macro_rules! submit_typed_handler {
($ty:ident) => {
::spring_stream::handler::submit! {
&$ty as &dyn ::spring_stream::handler::TypedHandlerRegistrar
}
};
}
pub fn auto_consumers() -> Consumers {
let mut consumers = Consumers::new();
for factory in inventory::iter::<&dyn TypedHandlerRegistrar> {
consumers = factory.install_consumer(consumers);
}
consumers
}