1use std::time::Duration;
7
8use rustvello_proto::identifiers::{InvocationId, RunnerId, TaskId};
9
/// Verbosity tier attached to every observability event.
///
/// The explicit discriminants define the ordering used by the derived
/// `PartialOrd`/`Ord` implementations. [`CompositeEmitter`] treats the tier a
/// sink was registered with as a ceiling: the sink is handed every event whose
/// tier is less than or equal to that ceiling.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum EventLevel {
    /// Lowest tier; used in this file for worker start and shutdown events.
    WorkerHealth = 0,
    /// Used in this file for task submitted/started/succeeded/failed/retried events.
    TaskLifecycle = 1,
    /// Used in this file for queue-depth and `cc` slot events.
    QueueConcurrency = 2,
    /// Highest tier; no hook in this file is dispatched at this tier, so
    /// registering a sink here simply lets every event through.
    DistributedTracing = 3,
}
31
32pub trait EventEmitter: Send + Sync {
50 fn on_worker_started(&self, _runner_id: &RunnerId) {}
54
55 fn on_worker_shutdown(&self, _runner_id: &RunnerId) {}
57
58 fn on_task_submitted(&self, _task_id: &TaskId, _inv_id: &InvocationId) {}
62
63 fn on_task_started(&self, _task_id: &TaskId, _inv_id: &InvocationId) {}
65
66 fn on_task_succeeded(&self, _task_id: &TaskId, _inv_id: &InvocationId, _duration: Duration) {}
68
69 fn on_task_failed(
71 &self,
72 _task_id: &TaskId,
73 _inv_id: &InvocationId,
74 _error: &str,
75 _duration: Duration,
76 ) {
77 }
78
79 fn on_task_retried(&self, _task_id: &TaskId, _inv_id: &InvocationId, _attempt: u32) {}
81
82 fn on_queue_depth(&self, _queue: &str, _depth: usize) {}
86
87 fn on_cc_rejected(&self, _task_id: &TaskId) {}
89
90 fn on_cc_slot_acquired(&self, _task_id: &TaskId) {}
92
93 fn on_cc_slot_released(&self, _task_id: &TaskId) {}
95}
96
/// Emitter that ignores every event.
///
/// It is a zero-sized unit struct, so it derives the cheap standard traits
/// (`Debug`, `Clone`, `Copy`, `Default`): it can then be embedded in
/// `#[derive(Debug)]` / `#[derive(Default)]` containers and copied freely.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopEmitter;
105
106impl EventEmitter for NoopEmitter {}
107
108pub struct CompositeEmitter {
115 sinks: Vec<(EventLevel, Box<dyn EventEmitter>)>,
116}
117
118impl CompositeEmitter {
119 pub fn new() -> Self {
121 Self { sinks: Vec::new() }
122 }
123
124 pub fn add_sink(&mut self, level: EventLevel, sink: impl EventEmitter + 'static) {
126 self.sinks.push((level, Box::new(sink)));
127 }
128
129 fn for_level(&self, event_level: EventLevel, f: impl Fn(&dyn EventEmitter)) {
131 for (max_level, sink) in &self.sinks {
132 if *max_level >= event_level {
133 f(sink.as_ref());
134 }
135 }
136 }
137}
138
139impl Default for CompositeEmitter {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
145impl EventEmitter for CompositeEmitter {
146 fn on_worker_started(&self, runner_id: &RunnerId) {
147 self.for_level(EventLevel::WorkerHealth, |s| s.on_worker_started(runner_id));
148 }
149
150 fn on_worker_shutdown(&self, runner_id: &RunnerId) {
151 self.for_level(EventLevel::WorkerHealth, |s| {
152 s.on_worker_shutdown(runner_id)
153 });
154 }
155
156 fn on_task_submitted(&self, task_id: &TaskId, inv_id: &InvocationId) {
157 self.for_level(EventLevel::TaskLifecycle, |s| {
158 s.on_task_submitted(task_id, inv_id);
159 });
160 }
161
162 fn on_task_started(&self, task_id: &TaskId, inv_id: &InvocationId) {
163 self.for_level(EventLevel::TaskLifecycle, |s| {
164 s.on_task_started(task_id, inv_id);
165 });
166 }
167
168 fn on_task_succeeded(&self, task_id: &TaskId, inv_id: &InvocationId, duration: Duration) {
169 self.for_level(EventLevel::TaskLifecycle, |s| {
170 s.on_task_succeeded(task_id, inv_id, duration);
171 });
172 }
173
174 fn on_task_failed(
175 &self,
176 task_id: &TaskId,
177 inv_id: &InvocationId,
178 error: &str,
179 duration: Duration,
180 ) {
181 self.for_level(EventLevel::TaskLifecycle, |s| {
182 s.on_task_failed(task_id, inv_id, error, duration);
183 });
184 }
185
186 fn on_task_retried(&self, task_id: &TaskId, inv_id: &InvocationId, attempt: u32) {
187 self.for_level(EventLevel::TaskLifecycle, |s| {
188 s.on_task_retried(task_id, inv_id, attempt);
189 });
190 }
191
192 fn on_queue_depth(&self, queue: &str, depth: usize) {
193 self.for_level(EventLevel::QueueConcurrency, |s| {
194 s.on_queue_depth(queue, depth);
195 });
196 }
197
198 fn on_cc_rejected(&self, task_id: &TaskId) {
199 self.for_level(EventLevel::QueueConcurrency, |s| {
200 s.on_cc_rejected(task_id);
201 });
202 }
203
204 fn on_cc_slot_acquired(&self, task_id: &TaskId) {
205 self.for_level(EventLevel::QueueConcurrency, |s| {
206 s.on_cc_slot_acquired(task_id);
207 });
208 }
209
210 fn on_cc_slot_released(&self, task_id: &TaskId) {
211 self.for_level(EventLevel::QueueConcurrency, |s| {
212 s.on_cc_slot_released(task_id);
213 });
214 }
215}
216
217#[derive(Debug, Clone)]
223pub struct WorkerState {
224 pub runner_id: RunnerId,
225 pub current_invocation: Option<InvocationId>,
226 pub current_task: Option<TaskId>,
227 pub started_at: Option<std::time::Instant>,
228 pub last_result: Option<LastResult>,
229 pub invocations_completed: u64,
230}
231
232#[derive(Debug, Clone)]
234#[non_exhaustive]
235pub enum LastResult {
236 Success { task_id: TaskId, duration: Duration },
237 Failed { task_id: TaskId, error: String },
238}
239
240impl WorkerState {
241 pub fn new(runner_id: RunnerId) -> Self {
243 Self {
244 runner_id,
245 current_invocation: None,
246 current_task: None,
247 started_at: None,
248 last_result: None,
249 invocations_completed: 0,
250 }
251 }
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Sink that counts how often the two hooks it overrides are invoked.
    /// The counters are shared through `Arc` so a test can keep reading them
    /// after the sink itself has been moved into a `CompositeEmitter`.
    struct CountingSink {
        started: Arc<AtomicU32>,
        submitted: Arc<AtomicU32>,
    }

    impl CountingSink {
        fn new() -> Self {
            let zeroed = || Arc::new(AtomicU32::new(0));
            Self {
                started: zeroed(),
                submitted: zeroed(),
            }
        }
    }

    impl EventEmitter for CountingSink {
        fn on_worker_started(&self, _: &RunnerId) {
            self.started.fetch_add(1, Ordering::Relaxed);
        }

        fn on_task_submitted(&self, _: &TaskId, _: &InvocationId) {
            self.submitted.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Reads the current value of a counter.
    fn count(counter: &AtomicU32) -> u32 {
        counter.load(Ordering::Relaxed)
    }

    #[test]
    fn noop_emitter_compiles() {
        let runner = RunnerId::new();
        let noop = NoopEmitter;
        noop.on_worker_started(&runner);
        noop.on_worker_shutdown(&runner);
    }

    #[test]
    fn composite_filters_by_level() {
        let mut composite = CompositeEmitter::new();

        // Sink capped at WorkerHealth: must not see task lifecycle events.
        let health_sink = CountingSink::new();
        let health_started = Arc::clone(&health_sink.started);
        let health_submitted = Arc::clone(&health_sink.submitted);
        composite.add_sink(EventLevel::WorkerHealth, health_sink);

        // Sink capped at TaskLifecycle: sees worker health and task events.
        let task_sink = CountingSink::new();
        let task_started = Arc::clone(&task_sink.started);
        let task_submitted = Arc::clone(&task_sink.submitted);
        composite.add_sink(EventLevel::TaskLifecycle, task_sink);

        let runner = RunnerId::new();
        let task = TaskId::new("mod", "task");
        let invocation = InvocationId::new();

        composite.on_worker_started(&runner);
        composite.on_task_submitted(&task, &invocation);

        // The worker-health event reaches both sinks.
        assert_eq!(count(&health_started), 1);
        assert_eq!(count(&task_started), 1);

        // The task event reaches only the sink registered at TaskLifecycle.
        assert_eq!(count(&health_submitted), 0);
        assert_eq!(count(&task_submitted), 1);
    }

    #[test]
    fn worker_state_new() {
        let runner = RunnerId::new();
        let state = WorkerState::new(runner.clone());
        assert_eq!(state.runner_id, runner);
        assert!(state.current_invocation.is_none());
        assert_eq!(state.invocations_completed, 0);
    }
}