entrenar/ecosystem/batuta/
queue.rs1use serde::{Deserialize, Serialize};
4use std::time::Duration;
5
6#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
8pub struct QueueState {
9 pub queue_depth: u32,
11 pub avg_wait_seconds: u64,
13 pub available_gpus: u32,
15 pub total_gpus: u32,
17 pub eta_seconds: Option<u64>,
19}
20
21impl QueueState {
22 pub fn new(queue_depth: u32, available_gpus: u32, total_gpus: u32) -> Self {
24 Self { queue_depth, avg_wait_seconds: 0, available_gpus, total_gpus, eta_seconds: None }
25 }
26
27 pub fn with_avg_wait(mut self, seconds: u64) -> Self {
29 self.avg_wait_seconds = seconds;
30 self
31 }
32
33 pub fn with_eta(mut self, seconds: u64) -> Self {
35 self.eta_seconds = Some(seconds);
36 self
37 }
38
39 pub fn is_available(&self) -> bool {
41 self.available_gpus > 0
42 }
43
44 pub fn utilization(&self) -> f64 {
46 if self.total_gpus == 0 {
47 return 1.0;
48 }
49 1.0 - (f64::from(self.available_gpus) / f64::from(self.total_gpus))
50 }
51}
52
53pub fn adjust_eta(base_eta_seconds: u64, queue_state: &QueueState) -> Duration {
60 let mut adjusted = base_eta_seconds;
61
62 if !queue_state.is_available() {
64 let queue_wait = if queue_state.avg_wait_seconds > 0 {
66 u64::from(queue_state.queue_depth) * queue_state.avg_wait_seconds
67 } else {
68 u64::from(queue_state.queue_depth) * 300
70 };
71 adjusted += queue_wait;
72 }
73
74 if let Some(eta) = queue_state.eta_seconds {
76 adjusted = adjusted.max(eta);
77 }
78
79 let utilization = queue_state.utilization();
81 if utilization > 0.8 {
82 let multiplier = 1.0 + (utilization - 0.8) * 2.0; adjusted = (adjusted as f64 * multiplier) as u64;
84 }
85
86 Duration::from_secs(adjusted)
87}