Skip to main content

entrenar/ecosystem/batuta/
queue.rs

//! Queue state types and ETA adjustment.
2
3use serde::{Deserialize, Serialize};
4use std::time::Duration;
5
6/// Queue state information.
7#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
8pub struct QueueState {
9    /// Current queue depth (number of waiting jobs)
10    pub queue_depth: u32,
11    /// Average wait time in seconds
12    pub avg_wait_seconds: u64,
13    /// Number of available GPUs
14    pub available_gpus: u32,
15    /// Total GPUs in the pool
16    pub total_gpus: u32,
17    /// Estimated time until next available slot (seconds)
18    pub eta_seconds: Option<u64>,
19}
20
21impl QueueState {
22    /// Create a new queue state.
23    pub fn new(queue_depth: u32, available_gpus: u32, total_gpus: u32) -> Self {
24        Self { queue_depth, avg_wait_seconds: 0, available_gpus, total_gpus, eta_seconds: None }
25    }
26
27    /// Set average wait time.
28    pub fn with_avg_wait(mut self, seconds: u64) -> Self {
29        self.avg_wait_seconds = seconds;
30        self
31    }
32
33    /// Set ETA to next available slot.
34    pub fn with_eta(mut self, seconds: u64) -> Self {
35        self.eta_seconds = Some(seconds);
36        self
37    }
38
39    /// Check if GPUs are immediately available.
40    pub fn is_available(&self) -> bool {
41        self.available_gpus > 0
42    }
43
44    /// Calculate queue utilization (0.0 to 1.0).
45    pub fn utilization(&self) -> f64 {
46        if self.total_gpus == 0 {
47            return 1.0;
48        }
49        1.0 - (f64::from(self.available_gpus) / f64::from(self.total_gpus))
50    }
51}
52
53/// Adjust estimated completion time based on queue state.
54///
55/// Takes a base ETA and adjusts it based on:
56/// - Current queue depth
57/// - Average wait time
58/// - Queue utilization
59pub fn adjust_eta(base_eta_seconds: u64, queue_state: &QueueState) -> Duration {
60    let mut adjusted = base_eta_seconds;
61
62    // Add queue wait time if not immediately available
63    if !queue_state.is_available() {
64        // Estimate wait based on queue depth and average wait time
65        let queue_wait = if queue_state.avg_wait_seconds > 0 {
66            u64::from(queue_state.queue_depth) * queue_state.avg_wait_seconds
67        } else {
68            // Default: 5 minutes per queued job
69            u64::from(queue_state.queue_depth) * 300
70        };
71        adjusted += queue_wait;
72    }
73
74    // Add ETA from queue state if available
75    if let Some(eta) = queue_state.eta_seconds {
76        adjusted = adjusted.max(eta);
77    }
78
79    // Apply utilization multiplier (high utilization = longer times)
80    let utilization = queue_state.utilization();
81    if utilization > 0.8 {
82        let multiplier = 1.0 + (utilization - 0.8) * 2.0; // Up to 40% increase at 100% util
83        adjusted = (adjusted as f64 * multiplier) as u64;
84    }
85
86    Duration::from_secs(adjusted)
87}