previa-runner 1.0.0-alpha.40

API for remote execution of integration and load tests via HTTP streaming (SSE).
Documentation
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::server::load_dispatch::DispatchClock;
use crate::server::models::LoadProfile;

#[derive(Debug, Clone, Copy, PartialEq)]
pub struct WaveDispatchSlot {
    pub elapsed_ms: u64,
    pub expires_at_elapsed_ms: u64,
    pub planned_starts: usize,
    pub target_rps_limit: f64,
    pub scheduled_total: usize,
    pub scheduler_lag_ms: u64,
    pub missed_due_to_scheduler_lag: usize,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WaveSchedulerMetric {
    DispatchScheduled {
        elapsed_ms: u64,
        count: usize,
    },
    SlotEnqueued {
        elapsed_ms: u64,
        count: usize,
    },
    SchedulerLag {
        elapsed_ms: u64,
        lag_ms: u64,
        missed_starts: usize,
    },
    SlotBackpressure {
        elapsed_ms: u64,
        dropped_starts: usize,
    },
}

pub fn build_dispatch_slot(
    tick: crate::server::load_dispatch::DispatchTick,
    tick_ms: u64,
) -> WaveDispatchSlot {
    WaveDispatchSlot {
        elapsed_ms: tick.elapsed_ms,
        expires_at_elapsed_ms: tick.elapsed_ms.saturating_add(tick_ms),
        planned_starts: tick.scheduled_starts,
        target_rps_limit: tick.target_rps,
        scheduled_total: tick.scheduled_total,
        scheduler_lag_ms: tick.scheduler_lag_ms,
        missed_due_to_scheduler_lag: tick.missed_due_to_scheduler_lag,
    }
}

pub fn slot_is_expired(slot: &WaveDispatchSlot, actual_elapsed_ms: u64) -> bool {
    actual_elapsed_ms > slot.expires_at_elapsed_ms
}

pub fn try_send_slot_or_metric(
    slot_tx: &mpsc::Sender<WaveDispatchSlot>,
    metric_tx: &mpsc::UnboundedSender<WaveSchedulerMetric>,
    slot: WaveDispatchSlot,
) -> bool {
    if slot.planned_starts == 0 {
        return true;
    }

    match slot_tx.try_send(slot) {
        Ok(()) => {
            let _ = metric_tx.send(WaveSchedulerMetric::SlotEnqueued {
                elapsed_ms: slot.elapsed_ms,
                count: slot.planned_starts,
            });
            true
        }
        Err(mpsc::error::TrySendError::Full(slot)) => {
            let _ = metric_tx.send(WaveSchedulerMetric::SlotBackpressure {
                elapsed_ms: slot.elapsed_ms,
                dropped_starts: slot.planned_starts,
            });
            false
        }
        Err(mpsc::error::TrySendError::Closed(_)) => false,
    }
}

pub fn spawn_wave_scheduler_thread(
    load: LoadProfile,
    tick_ms: u64,
    slot_tx: mpsc::Sender<WaveDispatchSlot>,
    metric_tx: mpsc::UnboundedSender<WaveSchedulerMetric>,
    token: CancellationToken,
) -> std::thread::JoinHandle<()> {
    std::thread::Builder::new()
        .name("previa-wave-clock".to_owned())
        .spawn(move || run_wave_scheduler_loop(load, tick_ms, slot_tx, metric_tx, token))
        .expect("failed to spawn previa wave scheduler thread")
}

pub fn run_wave_scheduler_loop(
    load: LoadProfile,
    tick_ms: u64,
    slot_tx: mpsc::Sender<WaveDispatchSlot>,
    metric_tx: mpsc::UnboundedSender<WaveSchedulerMetric>,
    token: CancellationToken,
) {
    let started = std::time::Instant::now();
    let end_ms = crate::server::load_wave::timeline_end_ms(&load);
    let mut clock = DispatchClock::new(tick_ms);
    let mut next_wake = started;

    loop {
        if token.is_cancelled() {
            break;
        }

        let elapsed_ms = started.elapsed().as_millis() as u64;
        if elapsed_ms >= end_ms {
            break;
        }

        let target_rps_limit = crate::server::load_wave::local_rps_limit(&load, elapsed_ms);
        let tick = clock.plan_tick(elapsed_ms, target_rps_limit);
        let _ = metric_tx.send(WaveSchedulerMetric::DispatchScheduled {
            elapsed_ms: tick.elapsed_ms,
            count: tick.scheduled_starts,
        });
        if tick.scheduler_lag_ms > 0 || tick.missed_due_to_scheduler_lag > 0 {
            let _ = metric_tx.send(WaveSchedulerMetric::SchedulerLag {
                elapsed_ms: tick.elapsed_ms,
                lag_ms: tick.scheduler_lag_ms,
                missed_starts: tick.missed_due_to_scheduler_lag,
            });
        }
        let _ = try_send_slot_or_metric(&slot_tx, &metric_tx, build_dispatch_slot(tick, tick_ms));

        next_wake += std::time::Duration::from_millis(tick_ms);
        let now = std::time::Instant::now();
        if next_wake > now {
            std::thread::sleep(next_wake - now);
        } else {
            next_wake = now;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn build_slot_from_clock_tick_uses_tick_window_only() {
        let mut clock = DispatchClock::new(100);
        let tick = clock.plan_tick(500, 100.0);
        let slot = build_dispatch_slot(tick, 100);

        assert_eq!(slot.planned_starts, 10);
        assert_eq!(slot.elapsed_ms, 500);
        assert_eq!(slot.expires_at_elapsed_ms, 600);
        assert_eq!(slot.target_rps_limit, 100.0);
    }

    #[test]
    fn dispatch_slot_is_fresh_until_expiration_elapsed_ms() {
        let slot = WaveDispatchSlot {
            elapsed_ms: 500,
            expires_at_elapsed_ms: 600,
            planned_starts: 10,
            target_rps_limit: 100.0,
            scheduled_total: 20,
            scheduler_lag_ms: 0,
            missed_due_to_scheduler_lag: 0,
        };

        assert!(!slot_is_expired(&slot, 599));
        assert!(!slot_is_expired(&slot, 600));
        assert!(slot_is_expired(&slot, 601));
    }

    #[test]
    fn build_slot_from_clock_tick_sets_expiration_to_next_tick_boundary() {
        let mut clock = DispatchClock::new(100);
        let tick = clock.plan_tick(500, 100.0);
        let slot = build_dispatch_slot(tick, 100);

        assert_eq!(slot.elapsed_ms, 500);
        assert_eq!(slot.expires_at_elapsed_ms, 600);
        assert_eq!(slot.planned_starts, 10);
        assert_eq!(slot.target_rps_limit, 100.0);
    }

    fn short_flat_load() -> LoadProfile {
        LoadProfile {
            points: vec![
                crate::server::models::LoadPoint {
                    at_ms: 0,
                    intensity: 50.0,
                },
                crate::server::models::LoadPoint {
                    at_ms: 250,
                    intensity: 50.0,
                },
            ],
            interpolation: crate::server::models::LoadInterpolation::Linear,
            runner_max_rps: 1000.0,
            grace_period_ms: 0,
        }
    }

    #[tokio::test]
    async fn scheduler_thread_emits_slots_without_tokio_spawn() {
        let (slot_tx, mut slot_rx) = mpsc::channel(8);
        let (metric_tx, _metric_rx) = mpsc::unbounded_channel();
        let token = CancellationToken::new();

        let handle = spawn_wave_scheduler_thread(short_flat_load(), 100, slot_tx, metric_tx, token);

        let first = tokio::time::timeout(std::time::Duration::from_millis(300), slot_rx.recv())
            .await
            .expect("scheduler thread should emit a slot")
            .expect("slot channel should stay open while scheduler runs");

        assert!(first.planned_starts > 0);
        assert_eq!(first.expires_at_elapsed_ms, first.elapsed_ms + 100);

        handle.join().expect("scheduler thread should exit cleanly");
    }

    #[tokio::test]
    async fn scheduler_thread_emits_dispatch_scheduled_metric() {
        let (slot_tx, mut slot_rx) = mpsc::channel(8);
        let (metric_tx, mut metric_rx) = mpsc::unbounded_channel();
        let token = CancellationToken::new();

        let handle = spawn_wave_scheduler_thread(short_flat_load(), 100, slot_tx, metric_tx, token);

        let metric = tokio::time::timeout(std::time::Duration::from_millis(300), metric_rx.recv())
            .await
            .expect("scheduler should emit a metric")
            .expect("metric channel should stay open");

        assert!(matches!(
            metric,
            WaveSchedulerMetric::DispatchScheduled { count, .. } if count > 0
        ));
        assert!(slot_rx.recv().await.is_some());

        handle.join().expect("scheduler thread should exit cleanly");
    }

    #[tokio::test]
    async fn scheduler_emits_metric_when_slot_channel_is_full() {
        let (slot_tx, mut slot_rx) = mpsc::channel(1);
        let (metric_tx, mut metric_rx) = mpsc::unbounded_channel();

        slot_tx
            .send(WaveDispatchSlot {
                elapsed_ms: 0,
                expires_at_elapsed_ms: 100,
                planned_starts: 1,
                target_rps_limit: 10.0,
                scheduled_total: 1,
                scheduler_lag_ms: 0,
                missed_due_to_scheduler_lag: 0,
            })
            .await
            .unwrap();

        let sent = try_send_slot_or_metric(
            &slot_tx,
            &metric_tx,
            WaveDispatchSlot {
                elapsed_ms: 100,
                expires_at_elapsed_ms: 200,
                planned_starts: 7,
                target_rps_limit: 70.0,
                scheduled_total: 8,
                scheduler_lag_ms: 0,
                missed_due_to_scheduler_lag: 0,
            },
        );

        assert!(!sent);
        assert!(matches!(
            metric_rx.recv().await,
            Some(WaveSchedulerMetric::SlotBackpressure {
                elapsed_ms: 100,
                dropped_starts: 7,
            })
        ));

        assert!(slot_rx.recv().await.is_some());
    }
}