// rustvello_core/observability.rs
//! Observability infrastructure for the Rustvello distributed task system.
//!
//! Defines the [`EventEmitter`] trait, [`CompositeEmitter`], and
//! [`EventLevel`] for configurable event propagation to monitoring backends.

use std::time::Duration;

use rustvello_proto::identifiers::{InvocationId, RunnerId, TaskId};
// ---------------------------------------------------------------------------
// EventLevel — granularity of observable events
// ---------------------------------------------------------------------------

/// Granularity level for observable events.
///
/// Events at a lower level are implicitly included when a higher level is
/// enabled. For example, a sink configured at `TaskLifecycle` receives both
/// Level 0 (Worker Health) and Level 1 (Task Lifecycle) events.
///
/// The derived `Ord` gives the "at or below this level" comparison used by
/// [`CompositeEmitter`] when filtering sinks; variant order must therefore
/// stay from coarsest (level 0) to finest (level 3).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive] // further levels may be added without a breaking change
pub enum EventLevel {
    /// Level 0: Worker start/stop/heartbeat events.
    WorkerHealth = 0,
    /// Level 1: Per-invocation lifecycle events (submitted, started, succeeded, failed).
    TaskLifecycle = 1,
    /// Level 2: Queue depth, enqueue/dequeue, concurrency control metrics.
    QueueConcurrency = 2,
    /// Level 3: Full distributed tracing with context propagation.
    DistributedTracing = 3,
}

// ---------------------------------------------------------------------------
// EventEmitter trait — the central abstraction
// ---------------------------------------------------------------------------

/// Observable events emitted during task lifecycle.
///
/// Each method corresponds to one event in the event taxonomy.
/// The default implementation is a no-op, allowing the compiler to
/// inline and eliminate calls when no monitoring feature is enabled.
///
/// Implementors are called from runner threads, hence the `Send + Sync`
/// bound; implementations should be cheap and non-blocking.
///
/// # Levels
///
/// | Method | Level |
/// |---|---|
/// | `on_worker_started`, `on_worker_shutdown` | 0 (WorkerHealth) |
/// | `on_task_submitted` .. `on_task_retried` | 1 (TaskLifecycle) |
/// | `on_queue_depth`, `on_cc_rejected`, `on_cc_slot_acquired`, `on_cc_slot_released` | 2 (QueueConcurrency) |
pub trait EventEmitter: Send + Sync {
    // -- Level 0: Worker Health --

    /// A runner process has started.
    fn on_worker_started(&self, _runner_id: &RunnerId) {}

    /// A runner process is shutting down.
    fn on_worker_shutdown(&self, _runner_id: &RunnerId) {}

    // -- Level 1: Task Lifecycle --

    /// A task invocation was submitted to the system.
    fn on_task_submitted(&self, _task_id: &TaskId, _inv_id: &InvocationId) {}

    /// A runner began executing a task invocation.
    fn on_task_started(&self, _task_id: &TaskId, _inv_id: &InvocationId) {}

    /// A task invocation completed successfully.
    ///
    /// `_duration` is the wall-clock execution time of the invocation.
    fn on_task_succeeded(&self, _task_id: &TaskId, _inv_id: &InvocationId, _duration: Duration) {}

    /// A task invocation failed (final failure, retries exhausted).
    ///
    /// `_error` is a human-readable description of the failure.
    fn on_task_failed(
        &self,
        _task_id: &TaskId,
        _inv_id: &InvocationId,
        _error: &str,
        _duration: Duration,
    ) {
    }

    /// A task invocation is being retried. `_attempt` is the upcoming
    /// attempt number.
    fn on_task_retried(&self, _task_id: &TaskId, _inv_id: &InvocationId, _attempt: u32) {}

    // -- Level 2: Queue & Concurrency --

    /// Current queue depth snapshot for the named queue.
    fn on_queue_depth(&self, _queue: &str, _depth: usize) {}

    /// Concurrency control rejected an invocation registration.
    fn on_cc_rejected(&self, _task_id: &TaskId) {}

    /// A concurrency slot was successfully acquired.
    fn on_cc_slot_acquired(&self, _task_id: &TaskId) {}

    /// A concurrency slot was released (invocation reached terminal state).
    fn on_cc_slot_released(&self, _task_id: &TaskId) {}
}

// ---------------------------------------------------------------------------
// NoopEmitter — zero-cost default
// ---------------------------------------------------------------------------

101/// A no-op event emitter that discards all events.
102///
103/// Used as the default when no monitoring feature is enabled.
104pub struct NoopEmitter;
105
106impl EventEmitter for NoopEmitter {}
107
// ---------------------------------------------------------------------------
// CompositeEmitter — fan-out to multiple sinks
// ---------------------------------------------------------------------------

/// Fans out events to multiple sinks, each with an associated [`EventLevel`]
/// filter. A sink only receives events at or below its configured level.
pub struct CompositeEmitter {
    /// `(max_level, sink)` pairs; a sink sees events whose level is <= its max.
    sinks: Vec<(EventLevel, Box<dyn EventEmitter>)>,
}

118impl CompositeEmitter {
119    /// Create a new composite emitter with no sinks.
120    pub fn new() -> Self {
121        Self { sinks: Vec::new() }
122    }
123
124    /// Add a sink that receives events up to (and including) the given level.
125    pub fn add_sink(&mut self, level: EventLevel, sink: impl EventEmitter + 'static) {
126        self.sinks.push((level, Box::new(sink)));
127    }
128
129    /// Helper: iterate sinks that accept the given event level.
130    fn for_level(&self, event_level: EventLevel, f: impl Fn(&dyn EventEmitter)) {
131        for (max_level, sink) in &self.sinks {
132            if *max_level >= event_level {
133                f(sink.as_ref());
134            }
135        }
136    }
137}
138
139impl Default for CompositeEmitter {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145impl EventEmitter for CompositeEmitter {
146    fn on_worker_started(&self, runner_id: &RunnerId) {
147        self.for_level(EventLevel::WorkerHealth, |s| s.on_worker_started(runner_id));
148    }
149
150    fn on_worker_shutdown(&self, runner_id: &RunnerId) {
151        self.for_level(EventLevel::WorkerHealth, |s| {
152            s.on_worker_shutdown(runner_id)
153        });
154    }
155
156    fn on_task_submitted(&self, task_id: &TaskId, inv_id: &InvocationId) {
157        self.for_level(EventLevel::TaskLifecycle, |s| {
158            s.on_task_submitted(task_id, inv_id);
159        });
160    }
161
162    fn on_task_started(&self, task_id: &TaskId, inv_id: &InvocationId) {
163        self.for_level(EventLevel::TaskLifecycle, |s| {
164            s.on_task_started(task_id, inv_id);
165        });
166    }
167
168    fn on_task_succeeded(&self, task_id: &TaskId, inv_id: &InvocationId, duration: Duration) {
169        self.for_level(EventLevel::TaskLifecycle, |s| {
170            s.on_task_succeeded(task_id, inv_id, duration);
171        });
172    }
173
174    fn on_task_failed(
175        &self,
176        task_id: &TaskId,
177        inv_id: &InvocationId,
178        error: &str,
179        duration: Duration,
180    ) {
181        self.for_level(EventLevel::TaskLifecycle, |s| {
182            s.on_task_failed(task_id, inv_id, error, duration);
183        });
184    }
185
186    fn on_task_retried(&self, task_id: &TaskId, inv_id: &InvocationId, attempt: u32) {
187        self.for_level(EventLevel::TaskLifecycle, |s| {
188            s.on_task_retried(task_id, inv_id, attempt);
189        });
190    }
191
192    fn on_queue_depth(&self, queue: &str, depth: usize) {
193        self.for_level(EventLevel::QueueConcurrency, |s| {
194            s.on_queue_depth(queue, depth);
195        });
196    }
197
198    fn on_cc_rejected(&self, task_id: &TaskId) {
199        self.for_level(EventLevel::QueueConcurrency, |s| {
200            s.on_cc_rejected(task_id);
201        });
202    }
203
204    fn on_cc_slot_acquired(&self, task_id: &TaskId) {
205        self.for_level(EventLevel::QueueConcurrency, |s| {
206            s.on_cc_slot_acquired(task_id);
207        });
208    }
209
210    fn on_cc_slot_released(&self, task_id: &TaskId) {
211        self.for_level(EventLevel::QueueConcurrency, |s| {
212            s.on_cc_slot_released(task_id);
213        });
214    }
215}
216
// ---------------------------------------------------------------------------
// WorkerState — lightweight in-memory registry
// ---------------------------------------------------------------------------

/// Tracks what a worker is currently doing (Phase 3C).
#[derive(Debug, Clone)]
pub struct WorkerState {
    /// Identity of the runner this state describes.
    pub runner_id: RunnerId,
    /// Invocation currently executing, if the worker is busy.
    pub current_invocation: Option<InvocationId>,
    /// Task owning the current invocation, if the worker is busy.
    pub current_task: Option<TaskId>,
    /// When the current invocation began executing.
    // NOTE(review): presumably set together with `current_invocation`
    // by the runner loop — confirm against the caller.
    pub started_at: Option<std::time::Instant>,
    /// Outcome of the most recently completed invocation, if any.
    pub last_result: Option<LastResult>,
    /// Running count of invocations this worker has completed.
    pub invocations_completed: u64,
}

/// The outcome of the most recently completed invocation.
#[derive(Debug, Clone)]
#[non_exhaustive] // further outcome kinds may be added without a breaking change
pub enum LastResult {
    /// The invocation succeeded after running for `duration`.
    Success { task_id: TaskId, duration: Duration },
    /// The invocation failed with a human-readable `error`.
    Failed { task_id: TaskId, error: String },
}

240impl WorkerState {
241    /// Create a new idle worker state.
242    pub fn new(runner_id: RunnerId) -> Self {
243        Self {
244            runner_id,
245            current_invocation: None,
246            current_task: None,
247            started_at: None,
248            last_result: None,
249            invocations_completed: 0,
250        }
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// A sink that tallies `on_worker_started` / `on_task_submitted` calls
    /// through shared atomic counters, so the tallies remain readable after
    /// the sink is moved into a [`CompositeEmitter`].
    struct CountingSink {
        started: Arc<AtomicU32>,
        submitted: Arc<AtomicU32>,
    }

    impl CountingSink {
        fn new() -> Self {
            CountingSink {
                started: Arc::default(),
                submitted: Arc::default(),
            }
        }
    }

    impl EventEmitter for CountingSink {
        fn on_worker_started(&self, _runner: &RunnerId) {
            self.started.fetch_add(1, Ordering::Relaxed);
        }

        fn on_task_submitted(&self, _task: &TaskId, _inv: &InvocationId) {
            self.submitted.fetch_add(1, Ordering::Relaxed);
        }
    }

    #[test]
    fn noop_emitter_compiles() {
        let emitter = NoopEmitter;
        let runner = RunnerId::new();
        emitter.on_worker_started(&runner);
        emitter.on_worker_shutdown(&runner);
    }

    #[test]
    fn composite_filters_by_level() {
        let mut composite = CompositeEmitter::new();

        // Sink capped at level 0: sees worker-health events only.
        let health = CountingSink::new();
        let (health_started, health_submitted) =
            (Arc::clone(&health.started), Arc::clone(&health.submitted));
        composite.add_sink(EventLevel::WorkerHealth, health);

        // Sink capped at level 1: sees levels 0 and 1.
        let lifecycle = CountingSink::new();
        let (task_started, task_submitted) =
            (Arc::clone(&lifecycle.started), Arc::clone(&lifecycle.submitted));
        composite.add_sink(EventLevel::TaskLifecycle, lifecycle);

        let runner = RunnerId::new();
        let task = TaskId::new("mod", "task");
        let invocation = InvocationId::new();

        composite.on_worker_started(&runner);
        composite.on_task_submitted(&task, &invocation);

        // The level-0 event reaches both sinks.
        assert_eq!(health_started.load(Ordering::Relaxed), 1);
        assert_eq!(task_started.load(Ordering::Relaxed), 1);

        // The level-1 event reaches only the lifecycle sink.
        assert_eq!(health_submitted.load(Ordering::Relaxed), 0);
        assert_eq!(task_submitted.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn worker_state_new() {
        let runner = RunnerId::new();
        let state = WorkerState::new(runner.clone());
        assert_eq!(state.runner_id, runner);
        assert!(state.current_invocation.is_none());
        assert_eq!(state.invocations_completed, 0);
    }
}