Skip to main content

graphile_worker_lifecycle_hooks/events/
observer.rs

1use futures::future::BoxFuture;
2
3use crate::context::{
4    CronJobScheduledContext, CronTickContext, JobCompleteContext, JobFailContext, JobFetchContext,
5    JobInterruptedContext, JobPermanentlyFailContext, JobStartContext,
6    LocalQueueGetJobsCompleteContext, LocalQueueInitContext, LocalQueueRefetchDelayAbortContext,
7    LocalQueueRefetchDelayExpiredContext, LocalQueueRefetchDelayStartContext,
8    LocalQueueReturnJobsContext, LocalQueueSetModeContext, WorkerInitContext,
9    WorkerRecoveredContext, WorkerShutdownContext, WorkerStartContext,
10};
11use crate::event::Event;
12use crate::events::Emittable;
13use crate::TypeErasedHooks;
14
15macro_rules! define_observer_event {
16    ($event:ident, $context:ty) => {
17        pub struct $event;
18
19        impl Event for $event {
20            type Context = $context;
21            type Output = ();
22
23            fn register_boxed(
24                hooks: &mut TypeErasedHooks,
25                handler: Box<
26                    dyn Fn(Self::Context) -> BoxFuture<'static, Self::Output> + Send + Sync,
27                >,
28            ) {
29                hooks.get_handlers_mut::<Self>().push(handler);
30            }
31        }
32
33        impl Emittable for $context {
34            fn emit_to(self, hooks: &TypeErasedHooks) -> BoxFuture<'_, ()> {
35                Box::pin(async move {
36                    if let Some(handlers) = hooks.get_handlers::<$event>() {
37                        let futures: Vec<_> = handlers.iter().map(|h| h(self.clone())).collect();
38                        futures::future::join_all(futures).await;
39                    }
40                })
41            }
42        }
43    };
44}
45
46define_observer_event!(WorkerInit, WorkerInitContext);
47define_observer_event!(WorkerStart, WorkerStartContext);
48define_observer_event!(WorkerShutdown, WorkerShutdownContext);
49define_observer_event!(JobFetch, JobFetchContext);
50define_observer_event!(JobStart, JobStartContext);
51define_observer_event!(JobComplete, JobCompleteContext);
52define_observer_event!(JobFail, JobFailContext);
53define_observer_event!(JobPermanentlyFail, JobPermanentlyFailContext);
54define_observer_event!(JobInterrupted, JobInterruptedContext);
55define_observer_event!(WorkerRecovered, WorkerRecoveredContext);
56define_observer_event!(CronTick, CronTickContext);
57define_observer_event!(CronJobScheduled, CronJobScheduledContext);
58define_observer_event!(LocalQueueInit, LocalQueueInitContext);
59define_observer_event!(LocalQueueSetMode, LocalQueueSetModeContext);
60define_observer_event!(LocalQueueGetJobsComplete, LocalQueueGetJobsCompleteContext);
61define_observer_event!(LocalQueueReturnJobs, LocalQueueReturnJobsContext);
62define_observer_event!(
63    LocalQueueRefetchDelayStart,
64    LocalQueueRefetchDelayStartContext
65);
66define_observer_event!(
67    LocalQueueRefetchDelayAbort,
68    LocalQueueRefetchDelayAbortContext
69);
70define_observer_event!(
71    LocalQueueRefetchDelayExpired,
72    LocalQueueRefetchDelayExpiredContext
73);