spring_stream/
handler.rs

1pub use inventory::submit;
2
3use crate::{consumer::Consumers, extractor::FromMsg};
4use sea_streamer::SeaMessage;
5use spring::app::App;
6use std::{
7    future::Future,
8    pin::Pin,
9    sync::{Arc, Mutex},
10};
11
/// A callable that processes one stream message.
///
/// The type parameter `T` only describes the handler's argument list (`()` for
/// no arguments, a tuple of argument types otherwise). It lets the blanket
/// implementations in this module for callables of different arity coexist.
pub trait Handler<T>: Clone + Send + Sized + 'static {
    /// The type of future calling this handler returns.
    type Future: Future<Output = ()> + Send + 'static;

    /// Call the handler with the given message and the shared [`App`].
    ///
    /// The handler is consumed; clone it first if it has to be invoked again.
    fn call(self, msg: SeaMessage, app: Arc<App>) -> Self::Future;
}
19
20/// no args handler impl
21impl<F, Fut> Handler<()> for F
22where
23    F: FnOnce() -> Fut + Clone + Send + 'static,
24    Fut: Future<Output = ()> + Send + 'static,
25{
26    type Future = Pin<Box<dyn Future<Output = ()> + Send>>;
27
28    fn call(self, _msg: SeaMessage, _app: Arc<App>) -> Self::Future {
29        Box::pin(self())
30    }
31}
32
/// Invokes the macro named by `$name` once for each type-parameter list from
/// `[T1]` up to `[T1, ..., T15]`, i.e. for every arity from 1 to 15.
///
/// Used below to stamp out the `Handler` implementations for callables that
/// take between 1 and 15 arguments.
#[rustfmt::skip]
macro_rules! all_the_tuples {
    ($name:ident) => {
        $name!([T1]);
        $name!([T1, T2]);
        $name!([T1, T2, T3]);
        $name!([T1, T2, T3, T4]);
        $name!([T1, T2, T3, T4, T5]);
        $name!([T1, T2, T3, T4, T5, T6]);
        $name!([T1, T2, T3, T4, T5, T6, T7]);
        $name!([T1, T2, T3, T4, T5, T6, T7, T8]);
        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9]);
        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]);
        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11]);
        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12]);
        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13]);
        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14]);
        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15]);
    };
}
54
55macro_rules! impl_handler {
56    (
57        [$($ty:ident),*]
58    ) => {
59        #[allow(non_snake_case, unused_mut)]
60        impl<F, Fut, $($ty,)*> Handler<($($ty,)*)> for F
61        where
62            F: FnOnce($($ty,)*) -> Fut + Clone + Send + 'static,
63            Fut: Future<Output = ()> + Send + 'static,
64            $( $ty: FromMsg + Send, )*
65        {
66            type Future = Pin<Box<dyn Future<Output = ()> + Send>>;
67
68            fn call(self, msg: SeaMessage, app: Arc<App>) -> Self::Future {
69                $(
70                    let $ty = $ty::from_msg(&msg, &app);
71                )*
72                Box::pin(self($($ty,)*))
73            }
74        }
75    };
76}
77
78all_the_tuples!(impl_handler);
79
/// A type-erased handler stored as a boxed [`ErasedHandler`] trait object.
///
/// The trait object is only required to be `Send`; it is kept behind a `Mutex`.
// NOTE(review): the mutex presumably exists to make this wrapper `Sync`
// rather than to guard real shared mutation — confirm against the callers.
pub(crate) struct BoxedHandler(Mutex<Box<dyn ErasedHandler>>);
81
82impl Clone for BoxedHandler {
83    fn clone(&self) -> Self {
84        Self(Mutex::new(self.0.lock().unwrap().clone_box()))
85    }
86}
87
88impl BoxedHandler {
89    pub(crate) fn from_handler<H, T>(handler: H) -> Self
90    where
91        H: Handler<T> + Sync,
92        T: 'static,
93    {
94        Self(Mutex::new(Box::new(MakeErasedHandler {
95            handler,
96            caller: |handler, msg, app| Box::pin(H::call(handler, msg, app)),
97        })))
98    }
99
100    pub(crate) fn call(
101        self,
102        msg: SeaMessage,
103        app: Arc<App>,
104    ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
105        self.0.into_inner().unwrap().call(msg, app)
106    }
107}
108
/// Handler interface usable as a trait object; `BoxedHandler` stores its
/// handler as a `Box<dyn ErasedHandler>`.
pub(crate) trait ErasedHandler: Send {
    /// Returns a boxed copy of this handler.
    fn clone_box(&self) -> Box<dyn ErasedHandler>;

    /// Consumes the boxed handler, invoking it with `msg` and `app`, and
    /// returns the resulting future boxed and pinned.
    fn call(
        self: Box<Self>,
        msg: SeaMessage,
        app: Arc<App>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}
118
/// Plain function pointer that invokes a handler of type `H` with a message
/// and the application handle, returning a boxed, pinned future.
type HandlerCaller<H> = fn(H, SeaMessage, Arc<App>) -> Pin<Box<dyn Future<Output = ()> + Send>>;
120
/// Pairs a concrete handler with the function pointer that knows how to call
/// it; this is the type that implements [`ErasedHandler`].
pub(crate) struct MakeErasedHandler<H> {
    /// The concrete handler value.
    pub(crate) handler: H,
    /// Function pointer used to invoke `handler`.
    pub(crate) caller: HandlerCaller<H>,
}
125
126impl<H> Clone for MakeErasedHandler<H>
127where
128    H: Clone,
129{
130    fn clone(&self) -> Self {
131        Self {
132            handler: self.handler.clone(),
133            caller: self.caller,
134        }
135    }
136}
137
138impl<H> ErasedHandler for MakeErasedHandler<H>
139where
140    H: Clone + Send + Sync + 'static,
141{
142    fn clone_box(&self) -> Box<dyn ErasedHandler> {
143        Box::new(self.clone())
144    }
145
146    fn call(
147        self: Box<Self>,
148        msg: SeaMessage,
149        app: Arc<App>,
150    ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
151        (self.caller)(self.handler, msg, app)
152    }
153}
154
/// `TypedHandlerRegistrar` is used to configure handlers marked with the
/// spring-macro `stream_listener` attribute.
///
/// Registrars are collected through `inventory` and applied one after another
/// by [`auto_consumers`].
pub trait TypedHandlerRegistrar: Send + Sync + 'static {
    /// Takes the consumers collected so far and returns the collection to use
    /// from here on.
    // NOTE(review): implementations presumably add their own consumer to
    // `jobs` before returning it — confirm against the macro-generated code.
    fn install_consumer(&self, jobs: Consumers) -> Consumers;
}
160
/// Extension trait for applying a [`TypedHandlerRegistrar`] in a by-value,
/// chainable style.
pub trait TypedConsumer {
    /// Passes `self` through `factory` and returns the result.
    fn typed_consumer<F: TypedHandlerRegistrar>(self, factory: F) -> Self;
}
164
165impl TypedConsumer for Consumers {
166    fn typed_consumer<F: TypedHandlerRegistrar>(self, factory: F) -> Self {
167        factory.install_consumer(self)
168    }
169}
170
// Declares `&'static dyn TypedHandlerRegistrar` as a type collected by
// `inventory`; `auto_consumers` iterates over the collected values.
inventory::collect!(&'static dyn TypedHandlerRegistrar);
172
/// Submits the value named by `$ty` to the `inventory` registry of
/// `TypedHandlerRegistrar` trait objects.
///
/// `$ty` is used as an expression (`&$ty`), so it must name a value whose type
/// implements `TypedHandlerRegistrar`.
///
/// Paths are written with `$crate` rather than a hard-coded crate name, so the
/// expansion still resolves when the dependency is renamed in `Cargo.toml` or
/// when the macro is invoked from inside this crate.
#[macro_export]
macro_rules! submit_typed_handler {
    ($ty:ident) => {
        $crate::handler::submit! {
            &$ty as &dyn $crate::handler::TypedHandlerRegistrar
        }
    };
}
181
182pub fn auto_consumers() -> Consumers {
183    let mut consumers = Consumers::new();
184    for factory in inventory::iter::<&dyn TypedHandlerRegistrar> {
185        consumers = factory.install_consumer(consumers);
186    }
187    consumers
188}