Skip to main content

graphile_worker_lifecycle_hooks/
events.rs

1use futures::future::BoxFuture;
2
3use crate::context::{
4    AfterJobRunContext, BeforeJobRunContext, BeforeJobScheduleContext, CronJobScheduledContext,
5    CronTickContext, JobCompleteContext, JobFailContext, JobFetchContext,
6    JobPermanentlyFailContext, JobStartContext, LocalQueueGetJobsCompleteContext,
7    LocalQueueInitContext, LocalQueueRefetchDelayAbortContext,
8    LocalQueueRefetchDelayExpiredContext, LocalQueueRefetchDelayStartContext,
9    LocalQueueReturnJobsContext, LocalQueueSetModeContext, WorkerInitContext,
10    WorkerShutdownContext, WorkerStartContext,
11};
12use crate::event::{Event, HookOutput, Interceptable};
13use crate::result::{HookResult, JobScheduleResult};
14use crate::TypeErasedHooks;
15
16#[doc(hidden)]
17pub trait Emittable: Clone + Send + 'static {
18    #[doc(hidden)]
19    fn emit_to(self, hooks: &TypeErasedHooks) -> BoxFuture<'_, ()>;
20}
21
22macro_rules! define_observer_event {
23    ($event:ident, $context:ty) => {
24        pub struct $event;
25
26        impl Event for $event {
27            type Context = $context;
28            type Output = ();
29
30            fn register_boxed(
31                hooks: &mut TypeErasedHooks,
32                handler: Box<
33                    dyn Fn(Self::Context) -> BoxFuture<'static, Self::Output> + Send + Sync,
34                >,
35            ) {
36                hooks.get_handlers_mut::<Self>().push(handler);
37            }
38        }
39
40        impl Emittable for $context {
41            fn emit_to(self, hooks: &TypeErasedHooks) -> BoxFuture<'_, ()> {
42                Box::pin(async move {
43                    if let Some(handlers) = hooks.get_handlers::<$event>() {
44                        let futures: Vec<_> = handlers.iter().map(|h| h(self.clone())).collect();
45                        futures::future::join_all(futures).await;
46                    }
47                })
48            }
49        }
50    };
51}
52
53macro_rules! define_interceptor_event {
54    ($event:ident, $context:ty) => {
55        pub struct $event;
56
57        impl Event for $event {
58            type Context = $context;
59            type Output = HookResult;
60
61            fn register_boxed(
62                hooks: &mut TypeErasedHooks,
63                handler: Box<
64                    dyn Fn(Self::Context) -> BoxFuture<'static, Self::Output> + Send + Sync,
65                >,
66            ) {
67                hooks.get_handlers_mut::<Self>().push(handler);
68            }
69        }
70
71        impl Interceptable for $context {
72            type Output = HookResult;
73
74            fn intercept_with(self, hooks: &TypeErasedHooks) -> BoxFuture<'_, Self::Output> {
75                Box::pin(async move {
76                    let Some(handlers) = hooks.get_handlers::<$event>() else {
77                        return HookResult::default();
78                    };
79                    for handler in handlers {
80                        let result = handler(self.clone()).await;
81                        if result.is_terminal() {
82                            return result;
83                        }
84                    }
85                    HookResult::default()
86                })
87            }
88        }
89    };
90
91    ($event:ident, $context:ty, $output:ty, $chain_field:ident) => {
92        pub struct $event;
93
94        impl Event for $event {
95            type Context = $context;
96            type Output = $output;
97
98            fn register_boxed(
99                hooks: &mut TypeErasedHooks,
100                handler: Box<
101                    dyn Fn(Self::Context) -> BoxFuture<'static, Self::Output> + Send + Sync,
102                >,
103            ) {
104                hooks.get_handlers_mut::<Self>().push(handler);
105            }
106        }
107
108        impl Interceptable for $context {
109            type Output = $output;
110
111            fn intercept_with(self, hooks: &TypeErasedHooks) -> BoxFuture<'_, Self::Output> {
112                Box::pin(async move {
113                    let Some(handlers) = hooks.get_handlers::<$event>() else {
114                        return <$output as HookOutput>::default_with_value(self.$chain_field);
115                    };
116
117                    let mut current_value = self.$chain_field.clone();
118                    for handler in handlers {
119                        let mut ctx = self.clone();
120                        ctx.$chain_field = current_value;
121                        let result = handler(ctx).await;
122                        if result.is_terminal() {
123                            return result;
124                        }
125                        current_value = result.chain_value().unwrap();
126                    }
127                    <$output as HookOutput>::default_with_value(current_value)
128                })
129            }
130        }
131    };
132}
133
134define_observer_event!(WorkerInit, WorkerInitContext);
135define_observer_event!(WorkerStart, WorkerStartContext);
136define_observer_event!(WorkerShutdown, WorkerShutdownContext);
137define_observer_event!(JobFetch, JobFetchContext);
138define_observer_event!(JobStart, JobStartContext);
139define_observer_event!(JobComplete, JobCompleteContext);
140define_observer_event!(JobFail, JobFailContext);
141define_observer_event!(JobPermanentlyFail, JobPermanentlyFailContext);
142define_observer_event!(CronTick, CronTickContext);
143define_observer_event!(CronJobScheduled, CronJobScheduledContext);
144define_observer_event!(LocalQueueInit, LocalQueueInitContext);
145define_observer_event!(LocalQueueSetMode, LocalQueueSetModeContext);
146define_observer_event!(LocalQueueGetJobsComplete, LocalQueueGetJobsCompleteContext);
147define_observer_event!(LocalQueueReturnJobs, LocalQueueReturnJobsContext);
148define_observer_event!(
149    LocalQueueRefetchDelayStart,
150    LocalQueueRefetchDelayStartContext
151);
152define_observer_event!(
153    LocalQueueRefetchDelayAbort,
154    LocalQueueRefetchDelayAbortContext
155);
156define_observer_event!(
157    LocalQueueRefetchDelayExpired,
158    LocalQueueRefetchDelayExpiredContext
159);
160
161define_interceptor_event!(BeforeJobRun, BeforeJobRunContext);
162define_interceptor_event!(AfterJobRun, AfterJobRunContext);
163define_interceptor_event!(
164    BeforeJobSchedule,
165    BeforeJobScheduleContext,
166    JobScheduleResult,
167    payload
168);