1pub use inventory::submit;
2
3use crate::consumer::Consumers;
4use crate::extractor::FromPubSubMsg;
5use google_cloud_pubsub::subscriber::handler::Handler;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::{Arc, Mutex};
9use summer::app::App;
10
11#[doc(hidden)]
13pub struct PubSubEnvelope {
14 pub(crate) ack: Arc<Mutex<Option<Handler>>>,
15}
16
17impl PubSubEnvelope {
18 pub(crate) fn new(handler: Handler) -> Self {
19 Self {
20 ack: Arc::new(Mutex::new(Some(handler))),
21 }
22 }
23}
24
25impl Drop for PubSubEnvelope {
26 fn drop(&mut self) {
27 if let Ok(mut slot) = self.ack.lock() {
28 if let Some(h) = slot.take() {
29 h.ack();
30 }
31 }
32 }
33}
34
35pub trait HandlerArgs<T>: Clone + Send + Sized + 'static {
36 type Future: Future<Output = ()> + Send + 'static;
37
38 fn call(
39 self,
40 grpc: google_cloud_pubsub::model::Message,
41 env: PubSubEnvelope,
42 app: Arc<App>,
43 ) -> Self::Future;
44}
45
46impl<F, Fut> HandlerArgs<()> for F
47where
48 F: FnOnce() -> Fut + Clone + Send + 'static,
49 Fut: Future<Output = ()> + Send + 'static,
50{
51 type Future = Pin<Box<dyn Future<Output = ()> + Send>>;
52
53 fn call(
54 self,
55 _grpc: google_cloud_pubsub::model::Message,
56 _env: PubSubEnvelope,
57 _app: Arc<App>,
58 ) -> Self::Future {
59 Box::pin(self())
60 }
61}
62
63macro_rules! all_the_tuples {
64 ($name:ident) => {
65 $name!([T1]);
66 $name!([T1, T2]);
67 $name!([T1, T2, T3]);
68 $name!([T1, T2, T3, T4]);
69 $name!([T1, T2, T3, T4, T5]);
70 $name!([T1, T2, T3, T4, T5, T6]);
71 $name!([T1, T2, T3, T4, T5, T6, T7]);
72 $name!([T1, T2, T3, T4, T5, T6, T7, T8]);
73 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9]);
74 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]);
75 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11]);
76 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12]);
77 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13]);
78 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14]);
79 $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15]);
80 };
81}
82
83macro_rules! impl_handler_args {
84 (
85 [$($ty:ident),*]
86 ) => {
87 #[allow(non_snake_case, unused_mut)]
88 impl<F, Fut, $($ty,)*> HandlerArgs<($($ty,)*)> for F
89 where
90 F: FnOnce($($ty,)*) -> Fut + Clone + Send + 'static,
91 Fut: Future<Output = ()> + Send + 'static,
92 $( $ty: FromPubSubMsg + Send, )*
93 {
94 type Future = Pin<Box<dyn Future<Output = ()> + Send>>;
95
96 fn call(self, grpc: google_cloud_pubsub::model::Message, env: PubSubEnvelope, app: Arc<App>) -> Self::Future {
97 $(
98 let $ty = $ty::from_pubsub(&grpc, &env, &app);
99 )*
100 Box::pin(self($($ty,)*))
101 }
102 }
103 };
104}
105
106all_the_tuples!(impl_handler_args);
107
108pub(crate) struct BoxedHandler(std::sync::Mutex<Box<dyn ErasedHandler>>);
109
110impl Clone for BoxedHandler {
111 fn clone(&self) -> Self {
112 Self(std::sync::Mutex::new(self.0.lock().unwrap().clone_box()))
113 }
114}
115
116impl BoxedHandler {
117 pub(crate) fn from_handler<H, T>(handler: H) -> Self
118 where
119 H: HandlerArgs<T> + Sync,
120 T: 'static,
121 {
122 Self(std::sync::Mutex::new(Box::new(MakeErasedHandler {
123 handler,
124 caller: |handler, grpc, env, app| Box::pin(H::call(handler, grpc, env, app)),
125 })))
126 }
127
128 pub(crate) fn call(
129 self,
130 grpc: google_cloud_pubsub::model::Message,
131 env: PubSubEnvelope,
132 app: Arc<App>,
133 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
134 self.0.into_inner().unwrap().call(grpc, env, app)
135 }
136}
137
138pub(crate) trait ErasedHandler: Send {
139 fn clone_box(&self) -> Box<dyn ErasedHandler>;
140
141 fn call(
142 self: Box<Self>,
143 grpc: google_cloud_pubsub::model::Message,
144 env: PubSubEnvelope,
145 app: Arc<App>,
146 ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
147}
148
149type HandlerCaller<H> = fn(
150 H,
151 google_cloud_pubsub::model::Message,
152 PubSubEnvelope,
153 Arc<App>,
154) -> Pin<Box<dyn Future<Output = ()> + Send>>;
155
156pub(crate) struct MakeErasedHandler<H> {
157 pub(crate) handler: H,
158 pub(crate) caller: HandlerCaller<H>,
159}
160
161impl<H> Clone for MakeErasedHandler<H>
162where
163 H: Clone,
164{
165 fn clone(&self) -> Self {
166 Self {
167 handler: self.handler.clone(),
168 caller: self.caller,
169 }
170 }
171}
172
173impl<H> ErasedHandler for MakeErasedHandler<H>
174where
175 H: Clone + Send + Sync + 'static,
176{
177 fn clone_box(&self) -> Box<dyn ErasedHandler> {
178 Box::new(self.clone())
179 }
180
181 fn call(
182 self: Box<Self>,
183 grpc: google_cloud_pubsub::model::Message,
184 env: PubSubEnvelope,
185 app: Arc<App>,
186 ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
187 (self.caller)(self.handler, grpc, env, app)
188 }
189}
190
191pub trait TypedHandlerRegistrar: Send + Sync + 'static {
192 fn install_consumer(&self, jobs: Consumers) -> Consumers;
193}
194
195pub trait TypedConsumer {
196 fn typed_consumer<F: TypedHandlerRegistrar>(self, factory: F) -> Self;
197}
198
199impl TypedConsumer for Consumers {
200 fn typed_consumer<F: TypedHandlerRegistrar>(self, factory: F) -> Self {
201 factory.install_consumer(self)
202 }
203}
204
205inventory::collect!(&'static dyn TypedHandlerRegistrar);
206
207#[macro_export]
208macro_rules! submit_typed_handler {
209 ($ty:ident) => {
210 ::summer_pubsub::handler::submit! {
211 &$ty as &dyn ::summer_pubsub::handler::TypedHandlerRegistrar
212 }
213 };
214}
215
216pub fn auto_consumers() -> Consumers {
217 let mut consumers = Consumers::new();
218 for factory in inventory::iter::<&dyn TypedHandlerRegistrar> {
219 consumers = factory.install_consumer(consumers);
220 }
221 consumers
222}