use super::super::events::{EventBus, EventEnvelope, SchedulerEvent};
use super::*;
use std::sync::Arc;
use tracing::Instrument;
/// Spawn every event-driven scheduler task and wait for all of them.
///
/// Each task is wrapped in its own `tracing` span so log output can be
/// attributed to the component that produced it. The function returns only
/// after every spawned task has finished (normally when the event bus is
/// closed); a task that panics is logged and does not abort the others.
pub async fn run_event_driven(
    shared_state: SharedState,
    event_bus: Arc<EventBus>,
    gpu_poll_interval: Duration,
) {
    let mut tasks = Vec::new();

    // Debounced scheduler trigger: reacts to job/GPU/memory events.
    tasks.push(tokio::spawn(
        scheduler_trigger_handler_with_debounce(
            event_bus.subscribe(),
            Arc::clone(&shared_state),
            Arc::clone(&event_bus),
        )
        .instrument(tracing::info_span!("scheduler_trigger_task")),
    ));

    // Periodic GPU availability polling.
    tasks.push(tokio::spawn(
        super::monitors::gpu_monitor_task(
            Arc::clone(&shared_state),
            Arc::clone(&event_bus),
            gpu_poll_interval,
        )
        .instrument(tracing::info_span!("gpu_monitor_task")),
    ));

    // Zombie detection (monitor) and reaction (handler) run as separate tasks.
    tasks.push(tokio::spawn(
        super::monitors::zombie_monitor_task(Arc::clone(&shared_state), Arc::clone(&event_bus))
            .instrument(tracing::info_span!("zombie_monitor_task")),
    ));
    tasks.push(tokio::spawn(
        super::monitors::zombie_handler_task(
            event_bus.subscribe(),
            Arc::clone(&shared_state),
            Arc::clone(&event_bus),
        )
        .instrument(tracing::info_span!("zombie_handler_task")),
    ));

    // Timeout detection and handling, same monitor/handler split.
    tasks.push(tokio::spawn(
        super::monitors::timeout_monitor_task(
            Arc::clone(&shared_state),
            Arc::clone(&event_bus),
        )
        .instrument(tracing::info_span!("timeout_monitor_task")),
    ));
    tasks.push(tokio::spawn(
        super::monitors::timeout_handler_task(
            event_bus.subscribe(),
            Arc::clone(&shared_state),
            Arc::clone(&event_bus),
        )
        .instrument(tracing::info_span!("timeout_handler_task")),
    ));

    // Reservation monitoring (also consumes events from the bus).
    tasks.push(tokio::spawn(
        super::monitors::reservation_monitor_task(
            Arc::clone(&shared_state),
            Arc::clone(&event_bus),
            event_bus.subscribe(),
        )
        .instrument(tracing::info_span!("reservation_monitor_task")),
    ));

    // Metrics export is compiled in only with the "metrics" feature.
    #[cfg(feature = "metrics")]
    tasks.push(tokio::spawn(
        super::monitors::metrics_updater_task(Arc::clone(&shared_state))
            .instrument(tracing::info_span!("metrics_updater_task")),
    ));

    // Await tasks one by one; a panicked task surfaces as JoinError here.
    for task in tasks {
        if let Err(e) = task.await {
            tracing::error!(error = ?e, "Event handler task panicked");
        }
    }
}
async fn scheduler_trigger_handler_with_debounce(
mut events: tokio::sync::broadcast::Receiver<EventEnvelope>,
state: SharedState,
event_bus: Arc<EventBus>,
) {
let mut debounce = tokio::time::interval(Duration::from_millis(100));
let mut pending_schedule = false;
loop {
tokio::select! {
result = events.recv() => {
match result {
Ok(event) => {
let handling_span = event.handling_span("scheduler_trigger_handler");
let _entered = handling_span.enter();
match event.event {
SchedulerEvent::JobSubmitted { .. }
| SchedulerEvent::JobUpdated { .. }
| SchedulerEvent::JobCompleted { .. }
| SchedulerEvent::JobTimedOut { .. }
| SchedulerEvent::GpuAvailabilityChanged { .. }
| SchedulerEvent::ManualGpuOverrideChanged { .. }
| SchedulerEvent::MemoryAvailabilityChanged { .. } => {
pending_schedule = true;
}
_ => {}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
tracing::warn!(skipped, "Scheduler trigger handler lagged");
pending_schedule = true; }
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Event bus closed, scheduler trigger handler exiting");
break;
}
}
}
_ = debounce.tick() => {
if pending_schedule {
trigger_scheduling(&state, &event_bus).await;
pending_schedule = false;
}
}
}
}
}
// Run one scheduling pass: ask the scheduler for jobs ready to run, execute
// them, and clean up after any execution failures (possibly re-publishing
// retried jobs back onto the event bus).
//
// Lock discipline (order matters — do not restructure casually):
//   1. write lock: prepare jobs, refresh GPU slots, mark state dirty
//   2. read lock (per job): re-check the job is still Running before exec
//   3. write lock: failure cleanup; guard is dropped BEFORE publishing
//      retry events so event handlers can take the lock themselves.
//
// NOTE(review): the span guard from `enter()` is held across `.await`
// points below; tracing documentation recommends `.instrument()` for async
// code instead — confirm whether span attribution is correct here.
async fn trigger_scheduling(state: &SharedState, event_bus: &Arc<EventBus>) {
let scheduling_span = tracing::info_span!("trigger_scheduling");
let _entered = scheduling_span.enter();
// Latency measurement only exists when the "metrics" feature is enabled.
#[cfg(feature = "metrics")]
let started_at = std::time::Instant::now();
// Phase 1: under the write lock, pull the batch of jobs to execute and
// update bookkeeping only if there is actually something to run.
let jobs_to_execute = {
let mut state_guard = state.write().await;
let jobs = state_guard.scheduler.prepare_jobs_for_execution();
if !jobs.is_empty() {
state_guard.refresh_gpu_slots();
state_guard.mark_dirty();
}
jobs
};
if jobs_to_execute.is_empty() {
// Nothing to do; still record the (short) pass latency.
#[cfg(feature = "metrics")]
gflow::metrics::observe_scheduler_latency("trigger_scheduling", started_at.elapsed());
return;
}
tracing::info!(
job_count = jobs_to_execute.len(),
"Prepared jobs for execution"
);
// Clone the executor handle so jobs run without holding any state lock.
let executor = {
let state_guard = state.read().await;
state_guard.executor.clone()
};
// Phase 2: execute each job, re-validating its state first. The lock was
// released between preparation and execution, so a job may have been
// cancelled/changed in the meantime.
let mut execution_results = Vec::new();
for job in &jobs_to_execute {
let should_execute = {
let state_guard = state.read().await;
state_guard
.scheduler
.get_job_runtime(job.id)
.map(|rt| rt.state == JobState::Running)
.unwrap_or(false)
};
if !should_execute {
// Recorded as an Err so phase 3 releases any resources it held.
tracing::info!(
job_id = job.id,
"Skipping execution because state changed before execution"
);
execution_results.push((
job.id,
Err("Job state changed before execution".to_string()),
));
continue;
}
// `execute` is synchronous here (not awaited); it presumably launches
// the job rather than waiting for it — TODO confirm against Executor.
match executor.execute(job) {
Ok(_) => {
tracing::info!(job_id = job.id, "Executed job");
execution_results.push((job.id, Ok(())));
}
Err(e) => {
tracing::error!(job_id = job.id, error = ?e, "Failed to execute job");
execution_results.push((job.id, Err(e.to_string())));
}
}
}
// Phase 3: under one write lock, clean up every failed execution.
if !execution_results.is_empty() {
let mut retried_jobs = Vec::new();
let mut state_guard = state.write().await;
for (job_id, result) in &execution_results {
if result.is_ok() {
continue;
}
// Immediately-invoked closure lets `?` bail out when the runtime for
// this job no longer exists. `gpu_ids.take()` both tests and clears
// the GPU assignment in one step.
// NOTE(review): gpu_ids are taken even for jobs that were not
// Running (the "state changed" skip path) — confirm intended.
let Some((had_gpus, was_running)) = (|| {
let rt = state_guard.scheduler.get_job_runtime_mut(*job_id)?;
Some((rt.gpu_ids.take().is_some(), rt.state == JobState::Running))
})() else {
continue;
};
if was_running {
// fail_job returns nested Options: outer = job was found/handled,
// inner = id of a newly created retry job — presumably; confirm
// against SharedState::fail_job.
if let Some(Some(new_job_id)) = state_guard.fail_job(*job_id).await {
retried_jobs.push(new_job_id);
}
}
if had_gpus {
// GPUs were released above; recompute what memory is available.
state_guard.scheduler.refresh_available_memory();
}
}
// Release the write lock BEFORE publishing, so event handlers that
// need the state lock are not blocked/deadlocked.
drop(state_guard);
for job_id in retried_jobs {
event_bus.publish(SchedulerEvent::JobSubmitted { job_id });
}
}
#[cfg(feature = "metrics")]
gflow::metrics::observe_scheduler_latency("trigger_scheduling", started_at.elapsed());
}