ora_server/events.rs
//! Various events within the server; these can be used
//! for auditing purposes or triggering operations.
3
4use paste::paste;
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8/// An event that occurred within the server.
9///
10/// These events encompass a wide range of actions
11/// and can be used for many purposes, such as
12/// auditing, triggering operations, or monitoring.
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
14pub struct AuditEvent {
15 /// The timestamp of the event.
16 pub timestamp: std::time::SystemTime,
17 /// The event that occurred.
18 pub kind: AuditEventKind,
19}
20
21/// The kind of event that occurred.
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23pub enum AuditEventKind {
24 /// A job was created.
25 JobAdded {
26 /// The ID of the job that was added.
27 job_id: Uuid,
28 /// The type of the job that was added.
29 job_type_id: String,
30 },
31 /// A job was cancelled.
32 JobCancelled {
33 /// The ID of the job that was added.
34 job_id: Uuid,
35 },
36 /// A job was deleted.
37 JobDeleted {
38 /// The ID of the job that was deleted.
39 job_id: Uuid,
40 },
41 /// A new execution was scheduled for a job.
42 ExecutionAdded {
43 /// The ID of the execution.
44 execution_id: Uuid,
45 /// The ID of the job the execution
46 /// is associated with.
47 job_id: Uuid,
48 /// The target execution time.
49 target_execution_time: std::time::SystemTime,
50 },
51 /// An execution is ready to be
52 /// executed.
53 ExecutionReady {
54 /// The ID of the execution.
55 execution_id: Uuid,
56 },
57 /// An execution was assigned to an executor.
58 ExecutionAssigned {
59 /// The ID of the execution.
60 execution_id: Uuid,
61 /// The ID of the executor.
62 executor_id: Uuid,
63 },
64 /// An execution was started by an executor.
65 ExecutionStarted {
66 /// The ID of the execution.
67 execution_id: Uuid,
68 },
69 /// An execution has succeeded.
70 ExecutionSucceeded {
71 /// The ID of the execution.
72 execution_id: Uuid,
73 },
74 /// An execution has failed.
75 ExecutionFailed {
76 /// The ID of the execution.
77 execution_id: Uuid,
78 /// Whether there will be no more
79 /// attempts to execute the job.
80 terminal: bool,
81 },
82 /// An executor has connected.
83 ExecutorConnected {
84 /// The ID of the executor.
85 executor_id: Uuid,
86 },
87 /// An executor has disconnected.
88 ExecutorDisconnected {
89 /// The ID of the executor.
90 executor_id: Uuid,
91 },
92 /// A schedule was created.
93 ScheduleAdded {
94 /// The ID of the schedule.
95 schedule_id: Uuid,
96 },
97 /// A schedule was cancelled.
98 ScheduleCancelled {
99 /// The ID of the schedule.
100 schedule_id: Uuid,
101 },
102 /// A schedule was marked as unschedulable and no more
103 /// jobs will be created from it.
104 ScheduleUnschedulable {
105 /// The ID of the schedule.
106 schedule_id: Uuid,
107 },
108 /// Schedule was deleted.
109 ScheduleDeleted {
110 /// The ID of the schedule.
111 schedule_id: Uuid,
112 },
113 /// A snapshot was exported.
114 SnapshotExported,
115 /// A snapshot was imported.
116 SnapshotImported,
117}
118
/// Events associated with executions.
///
/// These are payload-free notifications: each variant only signals
/// that something of that kind happened, not which executions
/// were affected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ExecutionEvent {
    /// New timed executions are ready.
    TimedExecutionsReady,
    /// New executions are ready to run.
    ExecutionsReadyToRun,
    /// Executions were added.
    ExecutionsAdded,
    /// Executions have finished.
    ExecutionsFinished,
}
131
/// Notifications concerning jobs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum JobEvent {
    /// One or more new jobs were added.
    JobsCreated,
}
138
/// Notifications concerning schedules.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ScheduleEvent {
    /// One or more new schedules were added.
    SchedulesAdded,
}
145
/// Notifications concerning executors.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ExecutorEvent {
    /// An executor is ready.
    ExecutorReady,
}
152
/// Generates the `EventBus` type.
///
/// For every event type `Foo` passed to the macro, the bus gets:
///
/// - a `foos` field holding a `tokio::sync::broadcast::Sender<Foo>`,
/// - an `emit_foo` method that sends an event on that channel,
/// - a `subscribe_foos` method returning a stream of received events.
///
/// In addition, the bus always carries a dedicated channel
/// for [`AuditEvent`]s with its own emit/subscribe methods.
macro_rules! create_bus {
    ($($event_name:ident),* $(,)?) => {
        paste! {
            /// A bus for various events.
            ///
            /// Cloning the bus clones the underlying broadcast senders.
            #[derive(Debug, Clone)]
            pub(crate) struct EventBus {
                $(
                    [<$event_name:snake s>]: tokio::sync::broadcast::Sender<[<$event_name>]>,
                )*
                audit_events: tokio::sync::broadcast::Sender<AuditEvent>,
            }

            impl EventBus {
                /// Create a new event bus with the given capacity.
                ///
                /// The same capacity is used for every channel of the bus.
                ///
                /// # Panics
                ///
                /// NOTE(review): tokio's broadcast channel is documented to
                /// panic for a capacity of `0` (or an excessively large one);
                /// callers are expected to pass a sensible non-zero value.
                pub fn new(capacity: usize) -> Self {
                    Self {
                        $(
                            [<$event_name:snake s>]: tokio::sync::broadcast::Sender::new(capacity),
                        )*
                        audit_events: tokio::sync::broadcast::Sender::new(capacity),
                    }
                }

                $(
                    /// Emit the given event.
                    ///
                    /// The send result is ignored on purpose: sending fails
                    /// only when there is no subscriber, in which case the
                    /// event is simply dropped.
                    pub(crate) fn [<emit_ $event_name:snake>](&self, event: [<$event_name>]) {
                        let _ = self.[<$event_name:snake s>].send(event);
                    }

                    /// Subscribe to the given event.
                    ///
                    /// Receive errors (a lagging subscription) are logged
                    /// as warnings and filtered out of the returned stream.
                    pub(crate) fn [<subscribe_ $event_name:snake s>](&self) -> impl futures::Stream<Item = [<$event_name>]> {
                        use futures::StreamExt;

                        tokio_stream::wrappers::BroadcastStream::new(self.[<$event_name:snake s>].subscribe())
                            .filter_map(|event| {
                                if event.is_err() {
                                    tracing::warn!("subscription has lagged behind, some events have been dropped");
                                }

                                futures::future::ready(event.ok())
                            })
                    }
                )*

                /// Emit an audit event, stamped with the current system time.
                ///
                /// The event is built lazily: `create_event` is only invoked
                /// (and the clock only read) when at least one audit
                /// subscriber exists, so callers pay nothing for
                /// constructing events nobody would receive.
                pub(crate) fn emit_audit_event(&self, create_event: impl FnOnce() -> AuditEventKind) {
                    if !self.audit_events_enabled() {
                        return;
                    }

                    // A subscriber may have gone away since the check above;
                    // the resulting send error is harmless and ignored.
                    let _ = self.audit_events.send(AuditEvent {
                        timestamp: std::time::SystemTime::now(),
                        kind: create_event(),
                    });
                }

                /// Returns `true` if there is at least one active
                /// subscriber for audit events.
                pub(crate) fn audit_events_enabled(&self) -> bool {
                    self.audit_events.receiver_count() > 0
                }

                /// Subscribe to audit events.
                ///
                /// Receive errors (a lagging subscription) are logged
                /// as warnings and filtered out of the returned stream.
                pub(crate) fn subscribe_audit_events(&self) -> impl futures::Stream<Item = AuditEvent> + Unpin + Send + Sync + 'static {
                    use futures::StreamExt;

                    tokio_stream::wrappers::BroadcastStream::new(self.audit_events.subscribe())
                        .filter_map(|event| {
                            if event.is_err() {
                                tracing::warn!("subscription has lagged behind, some events have been dropped");
                            }

                            futures::future::ready(event.ok())
                        })
                }
            }
        }
    };
}
225
226create_bus!(ExecutionEvent, JobEvent, ExecutorEvent, ScheduleEvent);