use super::super::events::{EventBus, EventEnvelope, SchedulerEvent};
use super::*;
use std::sync::Arc;
use tracing::Instrument;
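/// Spawns all event handlers and monitor tasks, instruments each with its own
/// tracing span, and then waits on their join handles; this future resolves
/// only once every task has exited.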
pub async fn run_event_driven(shared_state: SharedState, event_bus: Arc<EventBus>) {
let handles = vec![
tokio::spawn(
cascade_handler(event_bus.subscribe(), Arc::clone(&shared_state))
.instrument(tracing::info_span!("cascade_handler_task")),
),
tokio::spawn(
scheduler_trigger_handler_with_debounce(
event_bus.subscribe(),
Arc::clone(&shared_state),
)
.instrument(tracing::info_span!("scheduler_trigger_task")),
),
tokio::spawn(
super::monitors::gpu_monitor_task(Arc::clone(&shared_state), Arc::clone(&event_bus))
.instrument(tracing::info_span!("gpu_monitor_task")),
),
tokio::spawn(
super::monitors::zombie_monitor_task(Arc::clone(&shared_state), Arc::clone(&event_bus))
.instrument(tracing::info_span!("zombie_monitor_task")),
),
tokio::spawn(
super::monitors::zombie_handler_task(event_bus.subscribe(), Arc::clone(&shared_state))
.instrument(tracing::info_span!("zombie_handler_task")),
),
tokio::spawn(
super::monitors::timeout_monitor_task(
Arc::clone(&shared_state),
Arc::clone(&event_bus),
)
.instrument(tracing::info_span!("timeout_monitor_task")),
),
tokio::spawn(
super::monitors::timeout_handler_task(event_bus.subscribe(), Arc::clone(&shared_state))
.instrument(tracing::info_span!("timeout_handler_task")),
),
tokio::spawn(
super::monitors::reservation_monitor_task(
Arc::clone(&shared_state),
Arc::clone(&event_bus),
event_bus.subscribe(),
)
.instrument(tracing::info_span!("reservation_monitor_task")),
),
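        // The metrics updater is only spawned when the `metrics` feature is
        // compiled in.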
#[cfg(feature = "metrics")]
tokio::spawn(
super::monitors::metrics_updater_task(Arc::clone(&shared_state))
.instrument(tracing::info_span!("metrics_updater_task")),
),
];
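    // These tasks normally run for the life of the process; `handle.await`
    // only returns an error if the task panicked or was cancelled.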
for handle in handles {
if let Err(e) = handle.await {
tracing::error!(error = ?e, "Event handler task panicked");
}
}
}
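/// Watches for jobs that finish in a terminal failure state (`Failed`,
/// `Cancelled`, or `Timeout`) and auto-cancels any jobs that depend on them.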
async fn cascade_handler(
mut events: tokio::sync::broadcast::Receiver<EventEnvelope>,
state: SharedState,
) {
loop {
match events.recv().await {
Ok(event) => {
                let handling_span = event.handling_span("cascade_handler");
                // Instrument the handling future rather than holding an
                // `enter()` guard across `.await`; a guard held over an await
                // point attaches the span to whichever polls happen to run.
                async {
                    if let SchedulerEvent::JobCompleted {
                        job_id,
                        final_state,
                        ..
                    } = event.event
                    {
                        if matches!(
                            final_state,
                            JobState::Failed | JobState::Cancelled | JobState::Timeout
                        ) {
                            let mut state_guard = state.write().await;
                            let cancelled =
                                state_guard.scheduler.auto_cancel_dependent_jobs(job_id);
                            if !cancelled.is_empty() {
                                tracing::info!(
                                    job_id,
                                    final_state = ?final_state,
                                    cancelled_count = cancelled.len(),
                                    cancelled_jobs = ?cancelled,
                                    "Auto-cancelled dependent jobs"
                                );
                                state_guard.mark_dirty();
                            }
                        }
                    }
                }
                .instrument(handling_span)
                .await;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
tracing::warn!(skipped, "Cascade handler lagged");
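                // The skipped events were dropped by the broadcast channel
                // and cannot be replayed, so any cascades they would have
                // triggered are lost.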
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Event bus closed, cascade handler exiting");
break;
}
}
}
}
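/// Turns scheduling-relevant events into debounced scheduling passes, so a
/// burst of submissions or completions triggers one pass instead of many.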
async fn scheduler_trigger_handler_with_debounce(
mut events: tokio::sync::broadcast::Receiver<EventEnvelope>,
state: SharedState,
) {
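    // Debounce: events only raise a flag, and the 100 ms interval tick
    // performs at most one scheduling pass per window.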
let mut debounce = tokio::time::interval(Duration::from_millis(100));
let mut pending_schedule = false;
loop {
tokio::select! {
result = events.recv() => {
match result {
Ok(event) => {
let handling_span = event.handling_span("scheduler_trigger_handler");
let _entered = handling_span.enter();
match event.event {
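                            // Any event that can change job readiness or
                            // free up capacity warrants a scheduling pass.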
SchedulerEvent::JobSubmitted { .. }
| SchedulerEvent::JobUpdated { .. }
| SchedulerEvent::JobCompleted { .. }
| SchedulerEvent::GpuAvailabilityChanged { .. }
| SchedulerEvent::MemoryAvailabilityChanged { .. } => {
pending_schedule = true;
}
_ => {}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
tracing::warn!(skipped, "Scheduler trigger handler lagged");
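                            // Dropped events may have included triggers, so
                            // schedule defensively.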
                            pending_schedule = true;
                        }
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Event bus closed, scheduler trigger handler exiting");
break;
}
}
}
_ = debounce.tick() => {
if pending_schedule {
trigger_scheduling(&state).await;
pending_schedule = false;
}
}
}
}
}
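/// Runs a single scheduling pass: collects the jobs the scheduler deems
/// ready, verifies each is still marked `Running` just before launch,
/// executes it, and reports the outcomes back to the scheduler.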
#[tracing::instrument(skip(state))]
async fn trigger_scheduling(state: &SharedState) {
#[cfg(feature = "metrics")]
let started_at = std::time::Instant::now();
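    // Take the write lock once: ask the scheduler for jobs ready to run and,
    // if any were selected, refresh GPU slot bookkeeping and mark the state
    // dirty.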
let jobs_to_execute = {
let mut state_guard = state.write().await;
let jobs = state_guard.scheduler.prepare_jobs_for_execution();
if !jobs.is_empty() {
state_guard.refresh_gpu_slots();
state_guard.mark_dirty();
}
jobs
};
if jobs_to_execute.is_empty() {
#[cfg(feature = "metrics")]
gflow::metrics::observe_scheduler_latency("trigger_scheduling", started_at.elapsed());
return;
}
tracing::info!(
job_count = jobs_to_execute.len(),
"Prepared jobs for execution"
);
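    // Clone the executor handle under a short read lock so jobs can be
    // launched without holding the state lock.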
let executor = {
let state_guard = state.read().await;
state_guard.executor.clone()
};
let mut execution_results = Vec::new();
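    // Launch each job, re-checking under a read lock that it is still marked
    // `Running`; its state may have changed (e.g. a cancel) since the jobs
    // were prepared.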
for job in &jobs_to_execute {
let should_execute = {
let state_guard = state.read().await;
state_guard
.scheduler
.get_job_runtime(job.id)
.map(|rt| rt.state == JobState::Running)
.unwrap_or(false)
};
if !should_execute {
tracing::info!(
job_id = job.id,
"Skipping execution because state changed before execution"
);
execution_results.push((
job.id,
Err("Job state changed before execution".to_string()),
));
continue;
}
match executor.execute(job) {
Ok(_) => {
tracing::info!(job_id = job.id, "Executed job");
execution_results.push((job.id, Ok(())));
}
Err(e) => {
tracing::error!(job_id = job.id, error = ?e, "Failed to execute job");
execution_results.push((job.id, Err(e.to_string())));
}
}
}
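    // Report every outcome; the scheduler uses the failures (including
    // pre-execution skips) to update job state.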
if !execution_results.is_empty() {
let mut state_guard = state.write().await;
state_guard
.scheduler
.handle_execution_failures(&execution_results);
state_guard.mark_dirty();
}
#[cfg(feature = "metrics")]
gflow::metrics::observe_scheduler_latency("trigger_scheduling", started_at.elapsed());
}