1pub use inventory::submit;
2
3use crate::{consumer::Consumers, extractor::FromMsg};
4use sea_streamer::SeaMessage;
5use spring::app::App;
6use std::{
7 future::Future,
8 pin::Pin,
9 sync::{Arc, Mutex},
10};
11
12pub trait Handler<T>: Clone + Send + Sized + 'static {
13 type Future: Future<Output = ()> + Send + 'static;
15
16 fn call(self, msg: SeaMessage, app: Arc<App>) -> Self::Future;
18}
19
20impl<F, Fut> Handler<()> for F
22where
23 F: FnOnce() -> Fut + Clone + Send + 'static,
24 Fut: Future<Output = ()> + Send + 'static,
25{
26 type Future = Pin<Box<dyn Future<Output = ()> + Send>>;
27
28 fn call(self, _msg: SeaMessage, _app: Arc<App>) -> Self::Future {
29 Box::pin(self())
30 }
31}
32
33#[rustfmt::skip]
35macro_rules! all_the_tuples {
36 ($name:ident) => {
37 $name!([T1]);
38 $name!([T1, T2]);
39 $name!([T1, T2, T3]);
40 $name!([T1, T2, T3, T4]);
41 $name!([T1, T2, T3, T4, T5]);
42 $name!([T1, T2, T3, T4, T5, T6]);
43 $name!([T1, T2, T3, T4, T5, T6, T7]);
44 $name!([T1, T2, T3, T4, T5, T6, T7, T8]);
45 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9]);
46 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]);
47 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11]);
48 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12]);
49 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13]);
50 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14]);
51 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15]);
52 };
53}
54
55macro_rules! impl_handler {
56 (
57 [$($ty:ident),*]
58 ) => {
59 #[allow(non_snake_case, unused_mut)]
60 impl<F, Fut, $($ty,)*> Handler<($($ty,)*)> for F
61 where
62 F: FnOnce($($ty,)*) -> Fut + Clone + Send + 'static,
63 Fut: Future<Output = ()> + Send + 'static,
64 $( $ty: FromMsg + Send, )*
65 {
66 type Future = Pin<Box<dyn Future<Output = ()> + Send>>;
67
68 fn call(self, msg: SeaMessage, app: Arc<App>) -> Self::Future {
69 $(
70 let $ty = $ty::from_msg(&msg, &app);
71 )*
72 Box::pin(self($($ty,)*))
73 }
74 }
75 };
76}
77
78all_the_tuples!(impl_handler);
79
80pub(crate) struct BoxedHandler(Mutex<Box<dyn ErasedHandler>>);
81
82impl Clone for BoxedHandler {
83 fn clone(&self) -> Self {
84 Self(Mutex::new(self.0.lock().unwrap().clone_box()))
85 }
86}
87
88impl BoxedHandler {
89 pub(crate) fn from_handler<H, T>(handler: H) -> Self
90 where
91 H: Handler<T> + Sync,
92 T: 'static,
93 {
94 Self(Mutex::new(Box::new(MakeErasedHandler {
95 handler,
96 caller: |handler, msg, app| Box::pin(H::call(handler, msg, app)),
97 })))
98 }
99
100 pub(crate) fn call(
101 self,
102 msg: SeaMessage,
103 app: Arc<App>,
104 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
105 self.0.into_inner().unwrap().call(msg, app)
106 }
107}
108
109pub(crate) trait ErasedHandler: Send {
110 fn clone_box(&self) -> Box<dyn ErasedHandler>;
111
112 fn call(
113 self: Box<Self>,
114 msg: SeaMessage,
115 app: Arc<App>,
116 ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
117}
118
119type HandlerCaller<H> = fn(H, SeaMessage, Arc<App>) -> Pin<Box<dyn Future<Output = ()> + Send>>;
120
121pub(crate) struct MakeErasedHandler<H> {
122 pub(crate) handler: H,
123 pub(crate) caller: HandlerCaller<H>,
124}
125
126impl<H> Clone for MakeErasedHandler<H>
127where
128 H: Clone,
129{
130 fn clone(&self) -> Self {
131 Self {
132 handler: self.handler.clone(),
133 caller: self.caller,
134 }
135 }
136}
137
138impl<H> ErasedHandler for MakeErasedHandler<H>
139where
140 H: Clone + Send + Sync + 'static,
141{
142 fn clone_box(&self) -> Box<dyn ErasedHandler> {
143 Box::new(self.clone())
144 }
145
146 fn call(
147 self: Box<Self>,
148 msg: SeaMessage,
149 app: Arc<App>,
150 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
151 (self.caller)(self.handler, msg, app)
152 }
153}
154
155pub trait TypedHandlerRegistrar: Send + Sync + 'static {
158 fn install_consumer(&self, jobs: Consumers) -> Consumers;
159}
160
161pub trait TypedConsumer {
162 fn typed_consumer<F: TypedHandlerRegistrar>(self, factory: F) -> Self;
163}
164
165impl TypedConsumer for Consumers {
166 fn typed_consumer<F: TypedHandlerRegistrar>(self, factory: F) -> Self {
167 factory.install_consumer(self)
168 }
169}
170
171inventory::collect!(&'static dyn TypedHandlerRegistrar);
172
173#[macro_export]
174macro_rules! submit_typed_handler {
175 ($ty:ident) => {
176 ::spring_stream::handler::submit! {
177 &$ty as &dyn ::spring_stream::handler::TypedHandlerRegistrar
178 }
179 };
180}
181
182pub fn auto_consumers() -> Consumers {
183 let mut consumers = Consumers::new();
184 for factory in inventory::iter::<&dyn TypedHandlerRegistrar> {
185 consumers = factory.install_consumer(consumers);
186 }
187 consumers
188}