graphile_worker_lifecycle_hooks/
events.rs1use futures::future::BoxFuture;
2
3use crate::context::{
4 AfterJobRunContext, BeforeJobRunContext, BeforeJobScheduleContext, CronJobScheduledContext,
5 CronTickContext, JobCompleteContext, JobFailContext, JobFetchContext,
6 JobPermanentlyFailContext, JobStartContext, LocalQueueGetJobsCompleteContext,
7 LocalQueueInitContext, LocalQueueRefetchDelayAbortContext,
8 LocalQueueRefetchDelayExpiredContext, LocalQueueRefetchDelayStartContext,
9 LocalQueueReturnJobsContext, LocalQueueSetModeContext, WorkerInitContext,
10 WorkerShutdownContext, WorkerStartContext,
11};
12use crate::event::{Event, HookOutput, Interceptable};
13use crate::result::{HookResult, JobScheduleResult};
14use crate::TypeErasedHooks;
15
16#[doc(hidden)]
17pub trait Emittable: Clone + Send + 'static {
18 #[doc(hidden)]
19 fn emit_to(self, hooks: &TypeErasedHooks) -> BoxFuture<'_, ()>;
20}
21
// Declares an observer event.
//
// `define_observer_event!(Name, Context)` expands to:
//   * a public unit struct `Name`,
//   * an `Event` impl for `Name` whose handlers take `Context` and return `()`,
//   * an `Emittable` impl for `Context` that calls every handler registered
//     under `Name` with its own clone of the context.
macro_rules! define_observer_event {
    ($event:ident, $context:ty) => {
        pub struct $event;

        impl Event for $event {
            type Context = $context;
            type Output = ();

            // Appends the boxed handler to the list kept for this event type.
            fn register_boxed(
                hooks: &mut TypeErasedHooks,
                handler: Box<
                    dyn Fn(Self::Context) -> BoxFuture<'static, Self::Output> + Send + Sync,
                >,
            ) {
                hooks.get_handlers_mut::<Self>().push(handler);
            }
        }

        impl Emittable for $context {
            fn emit_to(self, hooks: &TypeErasedHooks) -> BoxFuture<'_, ()> {
                Box::pin(async move {
                    // Nothing is stored for this event: resolve immediately.
                    let Some(registered) = hooks.get_handlers::<$event>() else {
                        return;
                    };

                    // Call every handler up front (each with its own clone of the
                    // context) and only then await the resulting futures together.
                    let pending: Vec<_> = registered
                        .iter()
                        .map(|callback| callback(self.clone()))
                        .collect();
                    futures::future::join_all(pending).await;
                })
            }
        }
    };
}
52
// Declares an interceptor event. Unlike observer events, handlers are awaited
// one after another and a handler can stop the chain by returning a terminal
// output.
//
// Two forms are accepted:
//
//   * `define_interceptor_event!(Name, Context)`
//     Handlers return `HookResult`. The first terminal result is returned;
//     otherwise `HookResult::default()` is returned.
//
//   * `define_interceptor_event!(Name, Context, Output, field)`
//     Handlers return `Output` (which must implement `HookOutput`). The value
//     of `Context.field` is threaded through the handlers: each handler sees
//     the value produced by the previous one.
//
// Both forms generate a public unit struct `Name`, an `Event` impl for it and
// an `Interceptable` impl for `Context`.
macro_rules! define_interceptor_event {
    ($event:ident, $context:ty) => {
        pub struct $event;

        impl Event for $event {
            type Context = $context;
            type Output = HookResult;

            // Appends the boxed handler to the list kept for this event type.
            fn register_boxed(
                hooks: &mut TypeErasedHooks,
                handler: Box<
                    dyn Fn(Self::Context) -> BoxFuture<'static, Self::Output> + Send + Sync,
                >,
            ) {
                hooks.get_handlers_mut::<Self>().push(handler);
            }
        }

        impl Interceptable for $context {
            type Output = HookResult;

            fn intercept_with(self, hooks: &TypeErasedHooks) -> BoxFuture<'_, Self::Output> {
                Box::pin(async move {
                    // No handler list for this event: behave as if every handler
                    // had returned the default result.
                    let Some(handlers) = hooks.get_handlers::<$event>() else {
                        return HookResult::default();
                    };

                    // Sequential on purpose: a terminal result short-circuits and
                    // later handlers are never invoked.
                    for handler in handlers {
                        let result = handler(self.clone()).await;
                        if result.is_terminal() {
                            return result;
                        }
                    }
                    HookResult::default()
                })
            }
        }
    };

    ($event:ident, $context:ty, $output:ty, $chain_field:ident) => {
        pub struct $event;

        impl Event for $event {
            type Context = $context;
            type Output = $output;

            // Appends the boxed handler to the list kept for this event type.
            fn register_boxed(
                hooks: &mut TypeErasedHooks,
                handler: Box<
                    dyn Fn(Self::Context) -> BoxFuture<'static, Self::Output> + Send + Sync,
                >,
            ) {
                hooks.get_handlers_mut::<Self>().push(handler);
            }
        }

        impl Interceptable for $context {
            type Output = $output;

            fn intercept_with(self, hooks: &TypeErasedHooks) -> BoxFuture<'_, Self::Output> {
                Box::pin(async move {
                    // No handler list: wrap the untouched field value in the
                    // default output.
                    let Some(handlers) = hooks.get_handlers::<$event>() else {
                        return <$output as HookOutput>::default_with_value(self.$chain_field);
                    };

                    // `current_value` carries the chained field from one handler
                    // to the next, starting from the value in the original context.
                    let mut current_value = self.$chain_field.clone();
                    for handler in handlers {
                        // Each handler gets a fresh copy of the context with only
                        // the chained field replaced by the latest value.
                        let mut ctx = self.clone();
                        ctx.$chain_field = current_value;
                        let result = handler(ctx).await;
                        if result.is_terminal() {
                            return result;
                        }
                        // A non-terminal output is required to hand back a value
                        // for the next handler; a missing value is a bug in the
                        // `HookOutput` implementation, so fail with a clear message.
                        current_value = result
                            .chain_value()
                            .expect("non-terminal hook output must provide a chain value");
                    }
                    <$output as HookOutput>::default_with_value(current_value)
                })
            }
        }
    };
}
133
134define_observer_event!(WorkerInit, WorkerInitContext);
135define_observer_event!(WorkerStart, WorkerStartContext);
136define_observer_event!(WorkerShutdown, WorkerShutdownContext);
137define_observer_event!(JobFetch, JobFetchContext);
138define_observer_event!(JobStart, JobStartContext);
139define_observer_event!(JobComplete, JobCompleteContext);
140define_observer_event!(JobFail, JobFailContext);
141define_observer_event!(JobPermanentlyFail, JobPermanentlyFailContext);
142define_observer_event!(CronTick, CronTickContext);
143define_observer_event!(CronJobScheduled, CronJobScheduledContext);
144define_observer_event!(LocalQueueInit, LocalQueueInitContext);
145define_observer_event!(LocalQueueSetMode, LocalQueueSetModeContext);
146define_observer_event!(LocalQueueGetJobsComplete, LocalQueueGetJobsCompleteContext);
147define_observer_event!(LocalQueueReturnJobs, LocalQueueReturnJobsContext);
148define_observer_event!(
149 LocalQueueRefetchDelayStart,
150 LocalQueueRefetchDelayStartContext
151);
152define_observer_event!(
153 LocalQueueRefetchDelayAbort,
154 LocalQueueRefetchDelayAbortContext
155);
156define_observer_event!(
157 LocalQueueRefetchDelayExpired,
158 LocalQueueRefetchDelayExpiredContext
159);
160
161define_interceptor_event!(BeforeJobRun, BeforeJobRunContext);
162define_interceptor_event!(AfterJobRun, AfterJobRunContext);
163define_interceptor_event!(
164 BeforeJobSchedule,
165 BeforeJobScheduleContext,
166 JobScheduleResult,
167 payload
168);