use super::super::events::{EventBus, EventEnvelope, SchedulerEvent};
use super::*;
use gflow::tmux::disable_pipe_pane_for_job;
use std::sync::Arc;
/// Time after a job's recorded start during which a missing tmux session is NOT
/// treated as evidence of a zombie job.
const ZOMBIE_STARTUP_GRACE_PERIOD: Duration = Duration::from_secs(30);

/// Decides whether a running job whose session is missing should be reported as a zombie.
///
/// Returns `false` only while the job is still inside [`ZOMBIE_STARTUP_GRACE_PERIOD`]
/// (measured from `started_at` to `now`, boundary inclusive). Returns `true` when the job
/// has no recorded start time, when `started_at` is later than `now` (the elapsed time
/// cannot be computed), or when the grace period has fully elapsed.
fn should_check_missing_session_as_zombie(
    started_at: Option<std::time::SystemTime>,
    now: std::time::SystemTime,
) -> bool {
    match started_at.map(|start| now.duration_since(start)) {
        Some(Ok(elapsed)) => elapsed >= ZOMBIE_STARTUP_GRACE_PERIOD,
        // No start time, or a start time in the future: nothing to wait for.
        Some(Err(_)) | None => true,
    }
}
/// Polls GPU slot state every `poll_interval` and publishes
/// `SchedulerEvent::GpuAvailabilityChanged` whenever a GPU's availability differs from the
/// value last published for that index. The first time an index is seen, an event is always
/// published.
pub(super) async fn gpu_monitor_task(
    state: SharedState,
    event_bus: Arc<EventBus>,
    poll_interval: Duration,
) {
    let mut ticker = tokio::time::interval(poll_interval);
    // Last published availability per GPU index.
    let mut last_published: HashMap<u32, bool> = HashMap::new();

    loop {
        ticker.tick().await;

        // Refresh slots under the write lock, then release it before publishing.
        let snapshot = {
            let mut guard = state.write().await;
            guard.refresh_gpu_slots();
            guard.info()
        };

        for gpu in &snapshot.gpus {
            // `insert` hands back the previous value, so one map operation both records the
            // new state and tells us whether it changed.
            let previous = last_published.insert(gpu.index, gpu.available);
            if previous != Some(gpu.available) {
                event_bus.publish(SchedulerEvent::GpuAvailabilityChanged {
                    gpu_index: gpu.index,
                    available: gpu.available,
                });
            }
        }
    }
}
/// Every 10 seconds, looks for running jobs whose tmux session no longer exists and
/// publishes `SchedulerEvent::ZombieJobDetected` for each one.
///
/// Jobs without a run name are skipped. Jobs still within the startup grace period
/// (see `should_check_missing_session_as_zombie`) are also skipped.
pub(super) async fn zombie_monitor_task(state: SharedState, event_bus: Arc<EventBus>) {
    let mut ticker = tokio::time::interval(Duration::from_secs(10));
    loop {
        ticker.tick().await;

        // Snapshot (id, run name, start time) of running jobs; the read lock is released
        // before the tmux session list is queried.
        let mut running = Vec::new();
        {
            let guard = state.read().await;
            for rt in guard.job_runtimes().iter() {
                if rt.state != JobState::Running {
                    continue;
                }
                let run_name = guard
                    .scheduler
                    .get_job_spec(rt.id)
                    .and_then(|spec| spec.run_name.clone());
                running.push((rt.id, run_name, rt.started_at));
            }
        }

        if running.is_empty() {
            continue;
        }

        let now = std::time::SystemTime::now();
        let existing_sessions = gflow::tmux::get_all_session_names();
        for (job_id, run_name, started_at) in running {
            let Some(rn) = run_name else {
                continue;
            };
            let past_grace = should_check_missing_session_as_zombie(started_at, now);
            if past_grace && !existing_sessions.contains(rn.as_str()) {
                tracing::warn!(job_id, run_name = %rn, "Found zombie job");
                event_bus.publish(SchedulerEvent::ZombieJobDetected { job_id });
            }
        }
    }
}
/// Handles `SchedulerEvent::ZombieJobDetected` events; all other events are ignored.
///
/// For each zombie job:
/// - The job is failed via `fail_job`.
/// - If that returns a new job id (presumably a resubmission — confirm against `fail_job`),
///   `SchedulerEvent::JobSubmitted` is published for it. Otherwise, if it returns `Some`,
///   the failure is logged.
/// - Finally, tmux pipe-pane logging is disabled for the job's run name, if it has one.
///
/// Lagged receives are logged and skipped. The task exits when the event bus is closed.
pub(super) async fn zombie_handler_task(
    mut events: tokio::sync::broadcast::Receiver<EventEnvelope>,
    state: SharedState,
    event_bus: Arc<EventBus>,
) {
    loop {
        match events.recv().await {
            Ok(event) => {
                let handling_span = event.handling_span("zombie_handler");
                let SchedulerEvent::ZombieJobDetected { job_id } = event.event else {
                    continue;
                };
                // Attach the span to the future rather than holding a `Span::enter` guard
                // across `.await`. An entered guard stays active on the worker thread while
                // this task is suspended, so other tasks' events get attributed to it. The
                // span is also exited on whichever thread resumes the task.
                let handle = async {
                    // Read the run name and fail the job under a single write lock so both
                    // observe the same state.
                    let (run_name, result) = {
                        let mut state_guard = state.write().await;
                        let run_name = state_guard
                            .scheduler
                            .get_job_spec(job_id)
                            .and_then(|spec| spec.run_name.clone());
                        let result = state_guard.fail_job(job_id).await;
                        (run_name, result)
                    };
                    if let Some(Some(new_job_id)) = result {
                        event_bus.publish(SchedulerEvent::JobSubmitted { job_id: new_job_id });
                    } else if result.is_some() {
                        tracing::info!(job_id, "Marked zombie job as failed");
                    }
                    if let Some(rn) = run_name {
                        disable_pipe_pane_for_job(job_id, &rn, true);
                    }
                };
                tracing::Instrument::instrument(handle, handling_span).await;
            }
            Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
                tracing::warn!(skipped, "Zombie handler lagged");
            }
            Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                tracing::info!("Event bus closed, zombie handler exiting");
                break;
            }
        }
    }
}
/// Every 10 seconds, finds running jobs whose elapsed run time exceeds their `time_limit`
/// and publishes `SchedulerEvent::JobTimedOut` for each, together with the job's run name
/// (if any).
///
/// Jobs without a time limit or start time are never considered timed out. Jobs whose start
/// time lies after the current time are also skipped.
pub(super) async fn timeout_monitor_task(state: SharedState, event_bus: Arc<EventBus>) {
    let mut ticker = tokio::time::interval(Duration::from_secs(10));
    loop {
        ticker.tick().await;

        let expired = {
            let guard = state.read().await;
            let now = std::time::SystemTime::now();
            let mut expired = Vec::new();
            for rt in guard.job_runtimes().iter() {
                if rt.state != JobState::Running {
                    continue;
                }
                let (Some(limit), Some(started)) = (rt.time_limit, rt.started_at) else {
                    continue;
                };
                let over_limit =
                    matches!(now.duration_since(started), Ok(elapsed) if elapsed > limit);
                if !over_limit {
                    continue;
                }
                let run_name = guard
                    .scheduler
                    .get_job_spec(rt.id)
                    .and_then(|spec| spec.run_name.as_ref().map(|s| s.to_string()));
                tracing::warn!(job_id = rt.id, "Job exceeded time limit");
                expired.push((rt.id, run_name));
            }
            expired
        };

        // Publish only after the read lock has been released.
        for (job_id, run_name) in expired {
            event_bus.publish(SchedulerEvent::JobTimedOut { job_id, run_name });
        }
    }
}
pub(super) async fn timeout_handler_task(
mut events: tokio::sync::broadcast::Receiver<EventEnvelope>,
state: SharedState,
event_bus: Arc<EventBus>,
) {
loop {
match events.recv().await {
Ok(event) => {
let handling_span = event.handling_span("timeout_handler");
let _entered = handling_span.enter();
let SchedulerEvent::JobTimedOut { job_id, run_name } = event.event else {
continue;
};
if let Some(rn) = &run_name {
if let Err(e) = gflow::tmux::send_ctrl_c(rn) {
tracing::error!(job_id, error = %e, "Failed to send Ctrl-C to timed-out job");
}
}
let result = {
let mut state_guard = state.write().await;
state_guard.timeout_job(job_id).await
};
if let Some(Some(new_job_id)) = result {
event_bus.publish(SchedulerEvent::JobSubmitted { job_id: new_job_id });
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
tracing::warn!(skipped, "Timeout handler lagged");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Event bus closed, timeout handler exiting");
break;
}
}
}
}
/// Every 5 seconds, exports job-state metrics and GPU/memory resource metrics, read from a
/// single read-locked view of the shared state.
#[cfg(feature = "metrics")]
pub(super) async fn metrics_updater_task(state: SharedState) {
    let mut ticker = tokio::time::interval(Duration::from_secs(5));
    loop {
        ticker.tick().await;
        {
            let guard = state.read().await;
            gflow::metrics::update_job_state_metrics_runtimes(guard.job_runtimes());

            let snapshot = guard.info();
            let total_gpus = snapshot.gpus.len();
            let available_gpus = snapshot.gpus.iter().filter(|gpu| gpu.available).count();
            gflow::metrics::update_resource_metrics(
                available_gpus,
                total_gpus,
                guard.available_memory_mb(),
                guard.total_memory_mb(),
            );
        } // read lock released before the next tick
    }
}
pub(super) async fn reservation_monitor_task(
state: SharedState,
event_bus: Arc<EventBus>,
mut events: tokio::sync::broadcast::Receiver<EventEnvelope>,
) {
{
let mut state_guard = state.write().await;
let before_count = state_guard.scheduler.reservations.len();
state_guard.scheduler.update_reservation_statuses();
let after_count = state_guard.scheduler.reservations.len();
if before_count != after_count {
tracing::info!(
"Startup: Updated reservation statuses ({} -> {} active reservations)",
before_count,
after_count
);
state_guard.mark_dirty();
}
drop(state_guard);
event_bus.publish(SchedulerEvent::PeriodicHealthCheck);
}
loop {
let next_transition = {
let state_guard = state.read().await;
calculate_next_reservation_transition(&state_guard.scheduler.reservations)
};
match next_transition {
Some(deadline) => {
let now = std::time::SystemTime::now();
let sleep_duration = deadline
.duration_since(now)
.unwrap_or(Duration::from_secs(0));
tokio::select! {
_ = tokio::time::sleep(sleep_duration) => {
let mut state_guard = state.write().await;
state_guard.scheduler.update_reservation_statuses();
drop(state_guard);
event_bus.publish(SchedulerEvent::PeriodicHealthCheck);
}
result = events.recv() => {
match result {
Ok(event) => {
let handling_span = event.handling_span("reservation_monitor");
let _entered = handling_span.enter();
match event.event {
SchedulerEvent::ReservationCreated { .. } | SchedulerEvent::ReservationCancelled { .. } => {
continue;
}
_ => {}
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Event bus closed, reservation monitor exiting");
break;
}
_ => {}
}
}
}
}
None => {
match events.recv().await {
Ok(event) => {
let handling_span = event.handling_span("reservation_monitor");
let _entered = handling_span.enter();
if matches!(event.event, SchedulerEvent::ReservationCreated { .. }) {
continue;
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Event bus closed, reservation monitor exiting");
break;
}
_ => {}
}
}
}
}
}
/// Returns the earliest time, as of now, at which any reservation reports its next status
/// transition (via `GpuReservation::next_transition_time`). Returns `None` when no
/// reservation has an upcoming transition.
fn calculate_next_reservation_transition(
    reservations: &[gflow::core::reservation::GpuReservation],
) -> Option<std::time::SystemTime> {
    let now = std::time::SystemTime::now();
    let mut earliest: Option<std::time::SystemTime> = None;
    for reservation in reservations {
        if let Some(candidate) = reservation.next_transition_time(now) {
            earliest = Some(match earliest {
                Some(current) if current <= candidate => current,
                _ => candidate,
            });
        }
    }
    earliest
}
#[cfg(test)]
mod tests {
    use super::*;
    use gflow::core::reservation::{GpuReservation, GpuSpec, ReservationStatus};
    use std::time::{Duration, SystemTime};

    #[test]
    fn zombie_check_allows_legacy_jobs_without_start_time() {
        let now = SystemTime::now();
        assert!(should_check_missing_session_as_zombie(None, now));
    }

    #[test]
    fn zombie_check_skips_recently_started_jobs() {
        let now = SystemTime::now();
        let started_at = now.checked_sub(Duration::from_secs(5));
        assert!(!should_check_missing_session_as_zombie(started_at, now));
    }

    #[test]
    fn zombie_check_allows_jobs_exactly_at_grace_period() {
        // The comparison is `>=`: a job that has run for exactly the grace period is checked.
        let now = SystemTime::now();
        let started_at = now.checked_sub(ZOMBIE_STARTUP_GRACE_PERIOD);
        assert!(should_check_missing_session_as_zombie(started_at, now));
    }

    #[test]
    fn zombie_check_allows_old_running_jobs() {
        let now = SystemTime::now();
        let started_at = now.checked_sub(Duration::from_secs(45));
        assert!(should_check_missing_session_as_zombie(started_at, now));
    }

    #[test]
    fn zombie_check_allows_future_started_at_jobs() {
        // A start time after `now` yields no elapsed duration, so the job is checked.
        let now = SystemTime::now();
        let started_at = now.checked_add(Duration::from_secs(45));
        assert!(should_check_missing_session_as_zombie(started_at, now));
    }

    #[test]
    fn test_calculate_next_transition_no_reservations() {
        let result = calculate_next_reservation_transition(&[]);
        assert!(result.is_none());
    }

    #[test]
    fn test_calculate_next_transition_pending_reservation() {
        let now = SystemTime::now();
        let start_time = now + Duration::from_secs(3600);
        let reservation = GpuReservation {
            id: 1,
            user: "alice".into(),
            gpu_spec: GpuSpec::Count(2),
            start_time,
            duration: Duration::from_secs(7200),
            status: ReservationStatus::Pending,
            created_at: now,
            cancelled_at: None,
        };
        let result = calculate_next_reservation_transition(&[reservation]);
        assert_eq!(result, Some(start_time));
    }

    #[test]
    fn test_calculate_next_transition_active_reservation() {
        let now = SystemTime::now();
        let start_time = now - Duration::from_secs(1800);
        let duration = Duration::from_secs(3600);
        let end_time = start_time + duration;
        let mut reservation = GpuReservation {
            id: 1,
            user: "alice".into(),
            gpu_spec: GpuSpec::Count(2),
            start_time,
            duration,
            status: ReservationStatus::Active,
            created_at: now - Duration::from_secs(2000),
            cancelled_at: None,
        };
        let result = calculate_next_reservation_transition(&[reservation.clone()]);
        assert_eq!(result, Some(end_time));

        reservation.status = ReservationStatus::Completed;
        let result = calculate_next_reservation_transition(&[reservation]);
        assert!(result.is_none());
    }

    #[test]
    fn test_calculate_next_transition_multiple_reservations() {
        let now = SystemTime::now();
        let start1 = now + Duration::from_secs(3600);
        let start2 = now + Duration::from_secs(1800);
        let start3 = now + Duration::from_secs(7200);
        let reservations = vec![
            GpuReservation {
                id: 1,
                user: "alice".into(),
                gpu_spec: GpuSpec::Count(2),
                start_time: start1,
                duration: Duration::from_secs(3600),
                status: ReservationStatus::Pending,
                created_at: now,
                cancelled_at: None,
            },
            GpuReservation {
                id: 2,
                user: "bob".into(),
                gpu_spec: GpuSpec::Count(1),
                start_time: start2,
                duration: Duration::from_secs(3600),
                status: ReservationStatus::Pending,
                created_at: now,
                cancelled_at: None,
            },
            GpuReservation {
                id: 3,
                user: "charlie".into(),
                gpu_spec: GpuSpec::Count(1),
                start_time: start3,
                duration: Duration::from_secs(3600),
                status: ReservationStatus::Pending,
                created_at: now,
                cancelled_at: None,
            },
        ];
        let result = calculate_next_reservation_transition(&reservations);
        assert_eq!(result, Some(start2));
    }

    #[test]
    fn test_calculate_next_transition_ignores_past_times() {
        let now = SystemTime::now();
        let past_time = now - Duration::from_secs(3600);
        let future_time = now + Duration::from_secs(3600);
        let reservations = vec![
            GpuReservation {
                id: 1,
                user: "alice".into(),
                gpu_spec: GpuSpec::Count(2),
                start_time: past_time,
                duration: Duration::from_secs(1800),
                status: ReservationStatus::Pending,
                created_at: now - Duration::from_secs(7200),
                cancelled_at: None,
            },
            GpuReservation {
                id: 2,
                user: "bob".into(),
                gpu_spec: GpuSpec::Count(1),
                start_time: future_time,
                duration: Duration::from_secs(3600),
                status: ReservationStatus::Pending,
                created_at: now,
                cancelled_at: None,
            },
        ];
        let result = calculate_next_reservation_transition(&reservations);
        assert_eq!(result, Some(future_time));
    }

    #[test]
    fn test_calculate_next_transition_cancelled_ignored() {
        let now = SystemTime::now();
        let start_time = now + Duration::from_secs(3600);
        let reservation = GpuReservation {
            id: 1,
            user: "alice".into(),
            gpu_spec: GpuSpec::Count(2),
            start_time,
            duration: Duration::from_secs(3600),
            status: ReservationStatus::Cancelled,
            created_at: now,
            cancelled_at: Some(now),
        };
        let result = calculate_next_reservation_transition(&[reservation]);
        assert!(result.is_none());
    }
}