use std::time::Duration;
use rustvello_proto::identifiers::{InvocationId, RunnerId, TaskId};
/// Verbosity tier attached to each event that [`CompositeEmitter`] dispatches.
///
/// The explicit discriminants give the derived ordering: `WorkerHealth` is the
/// lowest tier and `DistributedTracing` the highest. A sink registered at some
/// level receives events tagged with that level or any lower one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum EventLevel {
    /// Tier used for worker start and worker shutdown events.
    WorkerHealth = 0,
    /// Tier used for task submitted / started / succeeded / failed / retried events.
    TaskLifecycle = 1,
    /// Tier used for queue-depth and `on_cc_*` events.
    QueueConcurrency = 2,
    /// Highest tier. No `CompositeEmitter` hook dispatches at this level, so a sink
    /// registered here simply receives every event of the lower tiers.
    DistributedTracing = 3,
}
/// Receiver interface for runtime events.
///
/// Every hook has an empty default body, so an implementor overrides only the
/// events it is interested in. Implementors must be `Send + Sync`.
pub trait EventEmitter: Send + Sync {
    /// Hook for a worker-started event for the runner `_runner_id`.
    fn on_worker_started(&self, _runner_id: &RunnerId) {}

    /// Hook for a worker-shutdown event for the runner `_runner_id`.
    fn on_worker_shutdown(&self, _runner_id: &RunnerId) {}

    /// Hook for a task-submitted event for invocation `_inv_id` of task `_task_id`.
    fn on_task_submitted(&self, _task_id: &TaskId, _inv_id: &InvocationId) {}

    /// Hook for a task-started event for invocation `_inv_id` of task `_task_id`.
    fn on_task_started(&self, _task_id: &TaskId, _inv_id: &InvocationId) {}

    /// Hook for a task-succeeded event.
    ///
    /// `_duration` is presumably the time the invocation took — confirm with callers.
    fn on_task_succeeded(
        &self,
        _task_id: &TaskId,
        _inv_id: &InvocationId,
        _duration: Duration,
    ) {
    }

    /// Hook for a task-failed event carrying a textual `_error`.
    ///
    /// `_duration` is presumably the time spent before the failure — confirm with callers.
    fn on_task_failed(
        &self,
        _task_id: &TaskId,
        _inv_id: &InvocationId,
        _error: &str,
        _duration: Duration,
    ) {
    }

    /// Hook for a task-retried event; `_attempt` is the attempt counter supplied by the caller.
    fn on_task_retried(&self, _task_id: &TaskId, _inv_id: &InvocationId, _attempt: u32) {}

    /// Hook reporting `_depth` for the queue named `_queue`.
    fn on_queue_depth(&self, _queue: &str, _depth: usize) {}

    // NOTE(review): "cc" presumably abbreviates "concurrency control" — confirm.

    /// Hook for a "cc rejected" event for task `_task_id`.
    fn on_cc_rejected(&self, _task_id: &TaskId) {}

    /// Hook for a "cc slot acquired" event for task `_task_id`.
    fn on_cc_slot_acquired(&self, _task_id: &TaskId) {}

    /// Hook for a "cc slot released" event for task `_task_id`.
    fn on_cc_slot_released(&self, _task_id: &TaskId) {}
}
/// An [`EventEmitter`] that ignores every event.
///
/// It overrides none of the trait's hooks, so each call falls through to the
/// trait's empty default body.
///
/// The common traits are derived so the type can be debug-printed, copied, and
/// created through `Default` like any other unit struct.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopEmitter;

impl EventEmitter for NoopEmitter {}
/// An emitter that fans each event out to a list of registered sinks.
///
/// Each sink is stored together with the maximum [`EventLevel`] it wants to receive.
pub struct CompositeEmitter {
    /// `(maximum level, sink)` pairs, kept in registration order.
    sinks: Vec<(EventLevel, Box<dyn EventEmitter>)>,
}
impl CompositeEmitter {
pub fn new() -> Self {
Self { sinks: Vec::new() }
}
pub fn add_sink(&mut self, level: EventLevel, sink: impl EventEmitter + 'static) {
self.sinks.push((level, Box::new(sink)));
}
fn for_level(&self, event_level: EventLevel, f: impl Fn(&dyn EventEmitter)) {
for (max_level, sink) in &self.sinks {
if *max_level >= event_level {
f(sink.as_ref());
}
}
}
}
impl Default for CompositeEmitter {
fn default() -> Self {
Self::new()
}
}
/// Forwards every hook to the registered sinks, tagging each event with the
/// [`EventLevel`] used to decide which sinks receive it.
impl EventEmitter for CompositeEmitter {
    // --- Dispatched at `EventLevel::WorkerHealth` ---

    fn on_worker_started(&self, runner_id: &RunnerId) {
        self.for_level(EventLevel::WorkerHealth, |sink| {
            sink.on_worker_started(runner_id)
        });
    }

    fn on_worker_shutdown(&self, runner_id: &RunnerId) {
        self.for_level(EventLevel::WorkerHealth, |sink| {
            sink.on_worker_shutdown(runner_id)
        });
    }

    // --- Dispatched at `EventLevel::TaskLifecycle` ---

    fn on_task_submitted(&self, task_id: &TaskId, inv_id: &InvocationId) {
        self.for_level(EventLevel::TaskLifecycle, |sink| {
            sink.on_task_submitted(task_id, inv_id)
        });
    }

    fn on_task_started(&self, task_id: &TaskId, inv_id: &InvocationId) {
        self.for_level(EventLevel::TaskLifecycle, |sink| {
            sink.on_task_started(task_id, inv_id)
        });
    }

    fn on_task_succeeded(&self, task_id: &TaskId, inv_id: &InvocationId, duration: Duration) {
        self.for_level(EventLevel::TaskLifecycle, |sink| {
            sink.on_task_succeeded(task_id, inv_id, duration)
        });
    }

    fn on_task_failed(
        &self,
        task_id: &TaskId,
        inv_id: &InvocationId,
        error: &str,
        duration: Duration,
    ) {
        self.for_level(EventLevel::TaskLifecycle, |sink| {
            sink.on_task_failed(task_id, inv_id, error, duration)
        });
    }

    fn on_task_retried(&self, task_id: &TaskId, inv_id: &InvocationId, attempt: u32) {
        self.for_level(EventLevel::TaskLifecycle, |sink| {
            sink.on_task_retried(task_id, inv_id, attempt)
        });
    }

    // --- Dispatched at `EventLevel::QueueConcurrency` ---

    fn on_queue_depth(&self, queue: &str, depth: usize) {
        self.for_level(EventLevel::QueueConcurrency, |sink| {
            sink.on_queue_depth(queue, depth)
        });
    }

    fn on_cc_rejected(&self, task_id: &TaskId) {
        self.for_level(EventLevel::QueueConcurrency, |sink| {
            sink.on_cc_rejected(task_id)
        });
    }

    fn on_cc_slot_acquired(&self, task_id: &TaskId) {
        self.for_level(EventLevel::QueueConcurrency, |sink| {
            sink.on_cc_slot_acquired(task_id)
        });
    }

    fn on_cc_slot_released(&self, task_id: &TaskId) {
        self.for_level(EventLevel::QueueConcurrency, |sink| {
            sink.on_cc_slot_released(task_id)
        });
    }
}
/// Record of what a single worker is doing and has done.
///
/// NOTE(review): nothing in this file mutates these fields after construction;
/// the per-field meanings below are inferred from names and types — confirm
/// against the code that updates this state.
#[derive(Debug, Clone)]
pub struct WorkerState {
    /// Identifier of the runner this state belongs to.
    pub runner_id: RunnerId,
    /// Presumably the invocation currently being executed, if any.
    pub current_invocation: Option<InvocationId>,
    /// Presumably the task currently being executed, if any.
    pub current_task: Option<TaskId>,
    /// Presumably when the current invocation began — TODO confirm what this timestamps.
    pub started_at: Option<std::time::Instant>,
    /// Presumably the outcome of the most recently finished invocation, if any.
    pub last_result: Option<LastResult>,
    /// Counter of completed invocations; starts at zero in [`WorkerState::new`].
    pub invocations_completed: u64,
}
/// Outcome recorded in [`WorkerState::last_result`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum LastResult {
    /// A success for `task_id`, with an associated `duration`.
    Success {
        task_id: TaskId,
        duration: Duration,
    },
    /// A failure for `task_id`, with the error rendered as text.
    Failed {
        task_id: TaskId,
        error: String,
    },
}
impl WorkerState {
pub fn new(runner_id: RunnerId) -> Self {
Self {
runner_id,
current_invocation: None,
current_task: None,
started_at: None,
last_result: None,
invocations_completed: 0,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    /// Test sink that counts `on_worker_started` and `on_task_submitted` calls.
    ///
    /// The counters sit behind `Arc` so a test can keep handles to them after
    /// the sink itself has been moved into a `CompositeEmitter`.
    #[derive(Default)]
    struct CountingSink {
        started: Arc<AtomicU32>,
        submitted: Arc<AtomicU32>,
    }

    impl CountingSink {
        /// Creates a sink with both counters at zero.
        fn new() -> Self {
            Self::default()
        }
    }

    impl EventEmitter for CountingSink {
        fn on_worker_started(&self, _: &RunnerId) {
            self.started.fetch_add(1, Ordering::Relaxed);
        }

        fn on_task_submitted(&self, _: &TaskId, _: &InvocationId) {
            self.submitted.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Reads the current value of a test counter.
    fn count(counter: &AtomicU32) -> u32 {
        counter.load(Ordering::Relaxed)
    }

    #[test]
    fn noop_emitter_compiles() {
        let rid = RunnerId::new();
        let emitter = NoopEmitter;
        emitter.on_worker_started(&rid);
        emitter.on_worker_shutdown(&rid);
    }

    #[test]
    fn composite_filters_by_level() {
        // Sink capped at WorkerHealth: should only see worker events.
        let health_sink = CountingSink::new();
        let health_started = Arc::clone(&health_sink.started);
        let health_submitted = Arc::clone(&health_sink.submitted);

        // Sink capped at TaskLifecycle: should see worker and task events.
        let task_sink = CountingSink::new();
        let task_started = Arc::clone(&task_sink.started);
        let task_submitted = Arc::clone(&task_sink.submitted);

        let mut composite = CompositeEmitter::new();
        composite.add_sink(EventLevel::WorkerHealth, health_sink);
        composite.add_sink(EventLevel::TaskLifecycle, task_sink);

        let rid = RunnerId::new();
        let tid = TaskId::new("mod", "task");
        let iid = InvocationId::new();
        composite.on_worker_started(&rid);
        composite.on_task_submitted(&tid, &iid);

        assert_eq!(count(&health_started), 1);
        assert_eq!(count(&task_started), 1);
        assert_eq!(count(&health_submitted), 0);
        assert_eq!(count(&task_submitted), 1);
    }

    #[test]
    fn worker_state_new() {
        let rid = RunnerId::new();
        let state = WorkerState::new(rid.clone());
        assert_eq!(state.runner_id, rid);
        assert!(state.current_invocation.is_none());
        assert_eq!(state.invocations_completed, 0);
    }
}