ora_server/
events.rs

//! Various events within the server; these can be used
//! for auditing purposes or for triggering operations.
3
4use paste::paste;
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8/// An event that occurred within the server.
9///
10/// These events encompass a wide range of actions
11/// and can be used for many purposes, such as
12/// auditing, triggering operations, or monitoring.
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub struct AuditEvent {
15    /// The timestamp of the event.
16    pub timestamp: std::time::SystemTime,
17    /// The event that occurred.
18    pub kind: AuditEventKind,
19}
20
21/// The kind of event that occurred.
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23pub enum AuditEventKind {
24    /// A job was created.
25    JobAdded {
26        /// The ID of the job that was added.
27        job_id: Uuid,
28        /// The type of the job that was added.
29        job_type_id: String,
30    },
31    /// A job was cancelled.
32    JobCancelled {
33        /// The ID of the job that was added.
34        job_id: Uuid,
35    },
36    /// A job was deleted.
37    JobDeleted {
38        /// The ID of the job that was deleted.
39        job_id: Uuid,
40    },
41    /// A new execution was scheduled for a job.
42    ExecutionAdded {
43        /// The ID of the execution.
44        execution_id: Uuid,
45        /// The ID of the job the execution
46        /// is associated with.
47        job_id: Uuid,
48        /// The target execution time.
49        target_execution_time: std::time::SystemTime,
50    },
51    /// An execution is ready to be
52    /// executed.
53    ExecutionReady {
54        /// The ID of the execution.
55        execution_id: Uuid,
56    },
57    /// An execution was assigned to an executor.
58    ExecutionAssigned {
59        /// The ID of the execution.
60        execution_id: Uuid,
61        /// The ID of the executor.
62        executor_id: Uuid,
63    },
64    /// An execution was started by an executor.
65    ExecutionStarted {
66        /// The ID of the execution.
67        execution_id: Uuid,
68    },
69    /// An execution has succeeded.
70    ExecutionSucceeded {
71        /// The ID of the execution.
72        execution_id: Uuid,
73    },
74    /// An execution has failed.
75    ExecutionFailed {
76        /// The ID of the execution.
77        execution_id: Uuid,
78        /// Whether there will be no more
79        /// attempts to execute the job.
80        terminal: bool,
81    },
82    /// An executor has connected.
83    ExecutorConnected {
84        /// The ID of the executor.
85        executor_id: Uuid,
86    },
87    /// An executor has disconnected.
88    ExecutorDisconnected {
89        /// The ID of the executor.
90        executor_id: Uuid,
91    },
92    /// A schedule was created.
93    ScheduleAdded {
94        /// The ID of the schedule.
95        schedule_id: Uuid,
96    },
97    /// A schedule was cancelled.
98    ScheduleCancelled {
99        /// The ID of the schedule.
100        schedule_id: Uuid,
101    },
102    /// A schedule was marked as unschedulable and no more
103    /// jobs will be created from it.
104    ScheduleUnschedulable {
105        /// The ID of the schedule.
106        schedule_id: Uuid,
107    },
108    /// Schedule was deleted.
109    ScheduleDeleted {
110        /// The ID of the schedule.
111        schedule_id: Uuid,
112    },
113    /// A snapshot was exported.
114    SnapshotExported,
115    /// A snapshot was imported.
116    SnapshotImported,
117}
118
/// Events associated with executions.
///
/// These are payload-free notifications; they carry
/// no identifiers of the executions involved.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ExecutionEvent {
    /// Timed executions are ready.
    TimedExecutionsReady,
    /// Executions are ready to run.
    ExecutionsReadyToRun,
    /// Executions were added.
    ExecutionsAdded,
    /// Executions have finished.
    ExecutionsFinished,
}
131
/// Notifications concerning jobs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum JobEvent {
    /// Jobs have been created.
    JobsCreated,
}
138
/// Notifications concerning schedules.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ScheduleEvent {
    /// Schedules have been added.
    SchedulesAdded,
}
145
/// Notifications concerning executors.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ExecutorEvent {
    /// A new executor has been added.
    ExecutorReady,
}
152
153macro_rules! create_bus {
154    ($($event_name:ident),*$(,)?) => {
155        paste! {
156            /// A bus for various events
157            #[derive(Debug, Clone)]
158            pub(crate) struct EventBus {
159                $(
160                    [<$event_name:snake s>]: tokio::sync::broadcast::Sender<[<$event_name>]>,
161                )*
162                audit_events: tokio::sync::broadcast::Sender<AuditEvent>,
163
164            }
165
166            impl EventBus {
167                /// Create a new event bus with the given capacity.
168                pub fn new(capacity: usize) -> Self {
169                    Self {
170                        $(
171                            [<$event_name:snake s>]: tokio::sync::broadcast::Sender::new(capacity),
172                        )*
173                        audit_events: tokio::sync::broadcast::Sender::new(capacity),
174                    }
175                }
176
177                $(
178                    /// Emit the given event.
179                    pub(crate) fn [<emit_ $event_name:snake>](&self, event: [<$event_name>]) {
180                        _ = self.[<$event_name:snake s>].send(event);
181                    }
182
183                    /// Subscribe to the given event.
184                    pub(crate) fn [<subscribe_ $event_name:snake s>](&self) -> impl futures::Stream<Item = [<$event_name>]> {
185                        use futures::StreamExt;
186
187                        tokio_stream::wrappers::BroadcastStream::new(self.[<$event_name:snake s>].subscribe())
188                            .filter_map(|event| {
189                                if event.is_err() {
190                                    tracing::warn!("subscription has lagged behind, some events have been dropped");
191                                }
192
193                                futures::future::ready(event.ok())
194                            })
195                    }
196                )*
197
198                pub(crate) fn emit_audit_event(&self, create_event: impl FnOnce() -> AuditEventKind) {
199                    _ = self.audit_events.send(AuditEvent {
200                        timestamp: std::time::SystemTime::now(),
201                        kind: create_event(),
202                    });
203                }
204
205                pub(crate) fn audit_events_enabled(&self) -> bool {
206                    self.audit_events.receiver_count() > 0
207                }
208
209                pub(crate) fn subscribe_audit_events(&self) -> impl futures::Stream<Item = AuditEvent> + Unpin + Send + Sync + 'static {
210                    use futures::StreamExt;
211
212                    tokio_stream::wrappers::BroadcastStream::new(self.audit_events.subscribe())
213                        .filter_map(|event| {
214                            if event.is_err() {
215                                tracing::warn!("subscription has lagged behind, some events have been dropped");
216                            }
217
218                            futures::future::ready(event.ok())
219                        })
220                }
221            }
222        }
223    };
224}
225
226create_bus!(ExecutionEvent, JobEvent, ExecutorEvent, ScheduleEvent);