use gflow::core::job::{GpuIds, JobState, JobStateReason};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::Span;
/// Notifications passed between scheduler components through the event bus.
///
/// The enum is `Clone` because it travels inside [`EventEnvelope`] over a tokio
/// broadcast channel, which hands each receiver its own clone of every value.
#[derive(Clone, Debug)]
// NOTE(review): presumably silences warnings for variants or fields that are not
// constructed/read in every build; confirm which ones before removing.
#[allow(dead_code)]
pub enum SchedulerEvent {
    /// Job `job_id` moved from `old_state` to `new_state`, with an optional `reason`.
    JobStateChanged {
        job_id: u32,
        old_state: JobState,
        new_state: JobState,
        reason: Option<JobStateReason>,
    },
    /// Job `job_id` was submitted.
    JobSubmitted {
        job_id: u32,
    },
    /// Job `job_id` was updated.
    JobUpdated {
        job_id: u32,
    },
    /// Job `job_id` finished in `final_state`; `gpu_ids` and `memory_mb` are
    /// optional resource details (presumably what the job was using — confirm).
    JobCompleted {
        job_id: u32,
        final_state: JobState,
        gpu_ids: Option<GpuIds>,
        memory_mb: Option<u64>,
    },
    /// GPU `gpu_index` changed availability to `available`.
    GpuAvailabilityChanged {
        gpu_index: u32,
        available: bool,
    },
    /// A manual override for GPU `gpu_index` was set to `available`.
    ManualGpuOverrideChanged {
        gpu_index: u32,
        available: bool,
    },
    /// `freed_mb` megabytes of memory were freed.
    MemoryAvailabilityChanged {
        freed_mb: u64,
    },
    /// Job `job_id` timed out; `run_name` is included when available.
    JobTimedOut {
        job_id: u32,
        run_name: Option<String>,
    },
    /// Job `job_id` was flagged as a zombie.
    ZombieJobDetected {
        job_id: u32,
    },
    /// Payload-less tick for periodic health checking.
    PeriodicHealthCheck,
    /// Reservation `reservation_id` was created.
    ReservationCreated {
        reservation_id: u32,
    },
    /// Reservation `reservation_id` was cancelled.
    ReservationCancelled {
        reservation_id: u32,
    },
    /// The daemon started.
    DaemonStarted,
}
impl SchedulerEvent {
pub fn name(&self) -> &'static str {
match self {
Self::JobStateChanged { .. } => "job_state_changed",
Self::JobSubmitted { .. } => "job_submitted",
Self::JobUpdated { .. } => "job_updated",
Self::JobCompleted { .. } => "job_completed",
Self::GpuAvailabilityChanged { .. } => "gpu_availability_changed",
Self::ManualGpuOverrideChanged { .. } => "manual_gpu_override_changed",
Self::MemoryAvailabilityChanged { .. } => "memory_availability_changed",
Self::JobTimedOut { .. } => "job_timed_out",
Self::ZombieJobDetected { .. } => "zombie_job_detected",
Self::PeriodicHealthCheck => "periodic_health_check",
Self::ReservationCreated { .. } => "reservation_created",
Self::ReservationCancelled { .. } => "reservation_cancelled",
Self::DaemonStarted => "daemon_started",
}
}
}
/// A [`SchedulerEvent`] paired with a tracing span, so receivers can attach their
/// handling work to the context the event was published from.
#[derive(Clone, Debug)]
pub struct EventEnvelope {
    /// The event payload.
    pub event: SchedulerEvent,
    /// Span current where the envelope was built (`EventBus::publish` captures
    /// `Span::current()`); used as the parent of handler spans.
    pub span: Span,
}

impl EventEnvelope {
    /// Creates an `info`-level `scheduler_event` span for `handler` processing this event.
    ///
    /// The new span is a child of [`EventEnvelope::span`] and records two fields:
    /// `handler` (the given name) and `event_type` (from [`SchedulerEvent::name`]).
    pub fn handling_span(&self, handler: &'static str) -> Span {
        let event_type = self.event.name();
        // Bare identifiers are tracing's shorthand for `name = name` fields.
        tracing::info_span!(parent: &self.span, "scheduler_event", handler, event_type)
    }
}
/// Cloneable handle to a tokio broadcast channel of [`EventEnvelope`]s.
///
/// All clones share one underlying sender (it lives behind an `Arc`), so an event
/// published through any clone reaches every receiver subscribed through any clone.
#[derive(Clone)]
pub struct EventBus {
    sender: Arc<broadcast::Sender<EventEnvelope>>,
}

impl EventBus {
    /// Creates a bus whose channel retains up to `capacity` undelivered events.
    ///
    /// Per tokio's broadcast semantics, a receiver that falls too far behind loses
    /// the oldest events and observes `RecvError::Lagged` on its next `recv`.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0 or greater than `usize::MAX / 2`, as
    /// `tokio::sync::broadcast::channel` does.
    pub fn new(capacity: usize) -> Self {
        // The initial receiver is discarded; receivers are created on demand via `subscribe`.
        let (sender, _) = broadcast::channel(capacity);
        Self {
            sender: Arc::new(sender),
        }
    }

    /// Publishes `event` to every currently subscribed receiver.
    ///
    /// The event is wrapped in an [`EventEnvelope`] together with the caller's current
    /// tracing span. Publishing is best-effort: if nobody is subscribed, the event is
    /// dropped and only a debug log records it. This method never fails or panics.
    pub fn publish(&self, event: SchedulerEvent) {
        let event_name = event.name();
        let envelope = EventEnvelope {
            event,
            span: Span::current(),
        };
        // `send` returns the number of receivers the event was queued for. Unlike a
        // count sampled before sending, it cannot race with receivers being dropped.
        match self.sender.send(envelope) {
            Ok(subscriber_count) => tracing::debug!(
                event_type = event_name,
                subscriber_count,
                "Publishing scheduler event"
            ),
            // No receivers: tokio hands the value back as an error. Dropping it is
            // intentional (best-effort delivery), but it is logged rather than ignored.
            Err(_) => tracing::debug!(
                event_type = event_name,
                subscriber_count = 0usize,
                "No subscribers for scheduler event; event dropped"
            ),
        }
    }

    /// Returns a new receiver that observes every event published after this call.
    ///
    /// Events published before subscribing are not delivered to this receiver.
    pub fn subscribe(&self) -> broadcast::Receiver<EventEnvelope> {
        self.sender.subscribe()
    }

    /// Returns the number of live receivers currently attached to the bus.
    // NOTE(review): this may have no non-test callers in some builds; the allow keeps
    // those builds warning-free. Remove it once a production caller is confirmed.
    #[allow(dead_code)]
    pub fn subscriber_count(&self) -> usize {
        self.sender.receiver_count()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A single subscriber receives the event published after it subscribed.
    #[tokio::test]
    async fn test_event_bus_publish_subscribe() {
        let bus = EventBus::new(100);
        let mut rx = bus.subscribe();
        bus.publish(SchedulerEvent::JobSubmitted { job_id: 1 });
        let event = rx.recv().await.unwrap();
        match event.event {
            SchedulerEvent::JobSubmitted { job_id } => assert_eq!(job_id, 1),
            other => panic!("Unexpected event type: {}", other.name()),
        }
    }

    /// Every subscriber gets its own copy of each published event.
    #[tokio::test]
    async fn test_multiple_subscribers() {
        let bus = EventBus::new(100);
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();
        bus.publish(SchedulerEvent::JobSubmitted { job_id: 42 });
        let event1 = rx1.recv().await.unwrap();
        let event2 = rx2.recv().await.unwrap();
        match (event1.event, event2.event) {
            (
                SchedulerEvent::JobSubmitted { job_id: id1 },
                SchedulerEvent::JobSubmitted { job_id: id2 },
            ) => {
                assert_eq!(id1, 42);
                assert_eq!(id2, 42);
            }
            _ => panic!("Unexpected event types"),
        }
    }

    /// The subscriber count tracks both new and dropped receivers.
    #[tokio::test]
    async fn test_subscriber_count() {
        let bus = EventBus::new(100);
        assert_eq!(bus.subscriber_count(), 0);
        let rx1 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 1);
        let _rx2 = bus.subscribe();
        assert_eq!(bus.subscriber_count(), 2);
        // Dropping a receiver must unsubscribe it.
        drop(rx1);
        assert_eq!(bus.subscriber_count(), 1);
    }

    /// Publishing with nobody listening is best-effort and must not panic.
    #[tokio::test]
    async fn test_no_subscribers_ok() {
        let bus = EventBus::new(100);
        bus.publish(SchedulerEvent::JobSubmitted { job_id: 1 });
        assert_eq!(bus.subscriber_count(), 0);
    }

    /// Events published before a receiver subscribed are not replayed to it.
    #[tokio::test]
    async fn test_subscriber_only_sees_later_events() {
        let bus = EventBus::new(100);
        bus.publish(SchedulerEvent::JobSubmitted { job_id: 1 });
        let mut rx = bus.subscribe();
        bus.publish(SchedulerEvent::JobSubmitted { job_id: 2 });
        match rx.recv().await.unwrap().event {
            SchedulerEvent::JobSubmitted { job_id } => assert_eq!(job_id, 2),
            other => panic!("Unexpected event type: {}", other.name()),
        }
    }

    /// Clones of the bus share one channel.
    #[tokio::test]
    async fn test_clones_share_channel() {
        let bus = EventBus::new(100);
        let publisher = bus.clone();
        let mut rx = bus.subscribe();
        publisher.publish(SchedulerEvent::ZombieJobDetected { job_id: 7 });
        match rx.recv().await.unwrap().event {
            SchedulerEvent::ZombieJobDetected { job_id } => assert_eq!(job_id, 7),
            other => panic!("Unexpected event type: {}", other.name()),
        }
    }

    /// Event labels are stable strings used in tracing output.
    #[test]
    fn test_event_names() {
        assert_eq!(
            SchedulerEvent::JobSubmitted { job_id: 1 }.name(),
            "job_submitted"
        );
        assert_eq!(
            SchedulerEvent::JobTimedOut {
                job_id: 1,
                run_name: None,
            }
            .name(),
            "job_timed_out"
        );
        assert_eq!(
            SchedulerEvent::PeriodicHealthCheck.name(),
            "periodic_health_check"
        );
        assert_eq!(SchedulerEvent::DaemonStarted.name(), "daemon_started");
    }
}