Skip to main content

summer_pubsub/
handler.rs

1pub use inventory::submit;
2
3use crate::consumer::Consumers;
4use crate::extractor::FromPubSubMsg;
5use google_cloud_pubsub::subscriber::handler::Handler;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::{Arc, Mutex};
9use summer::app::App;
10
11/// Internal type surfaced only to the `FromPubSubMsg` machinery; do not rely on it in application code.
12#[doc(hidden)]
13pub struct PubSubEnvelope {
14    pub(crate) ack: Arc<Mutex<Option<Handler>>>,
15}
16
17impl PubSubEnvelope {
18    pub(crate) fn new(handler: Handler) -> Self {
19        Self {
20            ack: Arc::new(Mutex::new(Some(handler))),
21        }
22    }
23}
24
25impl Drop for PubSubEnvelope {
26    fn drop(&mut self) {
27        if let Ok(mut slot) = self.ack.lock() {
28            if let Some(h) = slot.take() {
29                h.ack();
30            }
31        }
32    }
33}
34
35pub trait HandlerArgs<T>: Clone + Send + Sized + 'static {
36    type Future: Future<Output = ()> + Send + 'static;
37
38    fn call(
39        self,
40        grpc: google_cloud_pubsub::model::Message,
41        env: PubSubEnvelope,
42        app: Arc<App>,
43    ) -> Self::Future;
44}
45
46impl<F, Fut> HandlerArgs<()> for F
47where
48    F: FnOnce() -> Fut + Clone + Send + 'static,
49    Fut: Future<Output = ()> + Send + 'static,
50{
51    type Future = Pin<Box<dyn Future<Output = ()> + Send>>;
52
53    fn call(
54        self,
55        _grpc: google_cloud_pubsub::model::Message,
56        _env: PubSubEnvelope,
57        _app: Arc<App>,
58    ) -> Self::Future {
59        Box::pin(self())
60    }
61}
62
63macro_rules! all_the_tuples {
64    ($name:ident) => {
65        $name!([T1]);
66        $name!([T1, T2]);
67        $name!([T1, T2, T3]);
68        $name!([T1, T2, T3, T4]);
69        $name!([T1, T2, T3, T4, T5]);
70        $name!([T1, T2, T3, T4, T5, T6]);
71        $name!([T1, T2, T3, T4, T5, T6, T7]);
72        $name!([T1, T2, T3, T4, T5, T6, T7, T8]);
73        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9]);
74        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10]);
75        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11]);
76        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12]);
77        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13]);
78        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14]);
79        $name!([T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15]);
80    };
81}
82
83macro_rules! impl_handler_args {
84    (
85        [$($ty:ident),*]
86    ) => {
87        #[allow(non_snake_case, unused_mut)]
88        impl<F, Fut, $($ty,)*> HandlerArgs<($($ty,)*)> for F
89        where
90            F: FnOnce($($ty,)*) -> Fut + Clone + Send + 'static,
91            Fut: Future<Output = ()> + Send + 'static,
92            $( $ty: FromPubSubMsg + Send, )*
93        {
94            type Future = Pin<Box<dyn Future<Output = ()> + Send>>;
95
96            fn call(self, grpc: google_cloud_pubsub::model::Message, env: PubSubEnvelope, app: Arc<App>) -> Self::Future {
97                $(
98                    let $ty = $ty::from_pubsub(&grpc, &env, &app);
99                )*
100                Box::pin(self($($ty,)*))
101            }
102        }
103    };
104}
105
106all_the_tuples!(impl_handler_args);
107
108pub(crate) struct BoxedHandler(std::sync::Mutex<Box<dyn ErasedHandler>>);
109
110impl Clone for BoxedHandler {
111    fn clone(&self) -> Self {
112        Self(std::sync::Mutex::new(self.0.lock().unwrap().clone_box()))
113    }
114}
115
116impl BoxedHandler {
117    pub(crate) fn from_handler<H, T>(handler: H) -> Self
118    where
119        H: HandlerArgs<T> + Sync,
120        T: 'static,
121    {
122        Self(std::sync::Mutex::new(Box::new(MakeErasedHandler {
123            handler,
124            caller: |handler, grpc, env, app| Box::pin(H::call(handler, grpc, env, app)),
125        })))
126    }
127
128    pub(crate) fn call(
129        self,
130        grpc: google_cloud_pubsub::model::Message,
131        env: PubSubEnvelope,
132        app: Arc<App>,
133    ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
134        self.0.into_inner().unwrap().call(grpc, env, app)
135    }
136}
137
138pub(crate) trait ErasedHandler: Send {
139    fn clone_box(&self) -> Box<dyn ErasedHandler>;
140
141    fn call(
142        self: Box<Self>,
143        grpc: google_cloud_pubsub::model::Message,
144        env: PubSubEnvelope,
145        app: Arc<App>,
146    ) -> Pin<Box<dyn Future<Output = ()> + Send>>;
147}
148
149type HandlerCaller<H> = fn(
150    H,
151    google_cloud_pubsub::model::Message,
152    PubSubEnvelope,
153    Arc<App>,
154) -> Pin<Box<dyn Future<Output = ()> + Send>>;
155
156pub(crate) struct MakeErasedHandler<H> {
157    pub(crate) handler: H,
158    pub(crate) caller: HandlerCaller<H>,
159}
160
161impl<H> Clone for MakeErasedHandler<H>
162where
163    H: Clone,
164{
165    fn clone(&self) -> Self {
166        Self {
167            handler: self.handler.clone(),
168            caller: self.caller,
169        }
170    }
171}
172
173impl<H> ErasedHandler for MakeErasedHandler<H>
174where
175    H: Clone + Send + Sync + 'static,
176{
177    fn clone_box(&self) -> Box<dyn ErasedHandler> {
178        Box::new(self.clone())
179    }
180
181    fn call(
182        self: Box<Self>,
183        grpc: google_cloud_pubsub::model::Message,
184        env: PubSubEnvelope,
185        app: Arc<App>,
186    ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
187        (self.caller)(self.handler, grpc, env, app)
188    }
189}
190
191pub trait TypedHandlerRegistrar: Send + Sync + 'static {
192    fn install_consumer(&self, jobs: Consumers) -> Consumers;
193}
194
195pub trait TypedConsumer {
196    fn typed_consumer<F: TypedHandlerRegistrar>(self, factory: F) -> Self;
197}
198
199impl TypedConsumer for Consumers {
200    fn typed_consumer<F: TypedHandlerRegistrar>(self, factory: F) -> Self {
201        factory.install_consumer(self)
202    }
203}
204
205inventory::collect!(&'static dyn TypedHandlerRegistrar);
206
207#[macro_export]
208macro_rules! submit_typed_handler {
209    ($ty:ident) => {
210        ::summer_pubsub::handler::submit! {
211            &$ty as &dyn ::summer_pubsub::handler::TypedHandlerRegistrar
212        }
213    };
214}
215
216pub fn auto_consumers() -> Consumers {
217    let mut consumers = Consumers::new();
218    for factory in inventory::iter::<&dyn TypedHandlerRegistrar> {
219        consumers = factory.install_consumer(consumers);
220    }
221    consumers
222}