use crate::app::state::automation::rate_limit::RateLimitManager;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Point-in-time snapshot of scheduler counters, as built by `AutomationScheduler::metrics`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AutomationSchedulerMetrics {
    /// Number of runs currently holding a concurrency slot.
    pub active_runs: usize,
    /// Queued runs counted per queue-reason label (`QueueReason::as_str`).
    pub queued_runs_by_reason: HashMap<String, usize>,
    /// Total number of admissions performed.
    pub admitted_total: u64,
    /// Total number of active runs that have been released.
    pub completed_total: u64,
    /// Mean admission wait over the retained wait samples, in milliseconds.
    pub avg_wait_ms: u64,
    /// 95th-percentile admission wait over the retained wait samples, in milliseconds.
    pub p95_wait_ms: u64,
}
/// Reason a run is held in the queue instead of being admitted.
///
/// Serialized in snake_case (`capacity`, `workspace_lock`, `rate_limit`).
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum QueueReason {
    /// Every concurrency slot is already taken.
    Capacity,
    /// The requested workspace root is locked.
    WorkspaceLock,
    /// A provider the run requires is currently throttled.
    RateLimit,
}
impl QueueReason {
pub fn as_str(&self) -> &'static str {
match self {
Self::Capacity => "capacity",
Self::WorkspaceLock => "workspace_lock",
Self::RateLimit => "rate_limit",
}
}
}
/// Why a run is waiting for admission, and since when.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct SchedulerMetadata {
    /// The admission check that blocked the run.
    pub queue_reason: Option<QueueReason>,
    /// Contended resource (the workspace root for `WorkspaceLock`); omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub resource_key: Option<String>,
    /// Throttled provider for `RateLimit`; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rate_limited_provider: Option<String>,
    /// Millisecond timestamp at which the run was first queued; deserializes to 0 if missing.
    #[serde(default)]
    pub queued_at_ms: u64,
}
/// An artifact recorded as already validated for a run node.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ValidatedArtifact {
    /// Location of the artifact.
    pub path: String,
    /// Digest of the artifact contents (the algorithm is not fixed here — confirm with producer).
    pub content_digest: String,
}
/// Registry of pre-validated artifacts, indexed by run id and then by node id.
#[derive(Default, Debug)]
pub struct PreexistingArtifactRegistry {
    /// run id -> (node id -> validated artifact)
    pub artifacts: HashMap<String, HashMap<String, ValidatedArtifact>>,
}
impl PreexistingArtifactRegistry {
pub fn new() -> Self {
Self {
artifacts: HashMap::new(),
}
}
pub fn register_validated(&mut self, run_id: &str, node_id: &str, artifact: ValidatedArtifact) {
self.artifacts
.entry(run_id.to_string())
.or_default()
.insert(node_id.to_string(), artifact);
}
pub fn is_artifact_prevalidated(&self, run_id: &str, node_id: &str) -> bool {
self.artifacts
.get(run_id)
.and_then(|nodes| nodes.get(node_id))
.is_some()
}
pub fn get_prevalidated_path(&self, run_id: &str, node_id: &str) -> Option<&str> {
self.artifacts
.get(run_id)
.and_then(|nodes| nodes.get(node_id))
.map(|a| a.path.as_str())
}
pub fn clear_run(&mut self, run_id: &str) {
self.artifacts.remove(run_id);
}
}
/// Admission controller for automation runs.
///
/// It bounds concurrency, keeps workspace roots exclusive, consults provider rate
/// limits, and keeps queue and wait statistics.
pub struct AutomationScheduler {
    /// Maximum number of runs that may be active at once.
    pub max_concurrent_runs: usize,
    /// Active run id -> workspace root it was admitted with ("" when it has none).
    pub active_runs: HashMap<String, String>,
    /// Locked workspace root -> id of the run holding the lock.
    pub locked_workspaces: HashMap<String, String>,
    /// Provider throttling state consulted before admission.
    pub rate_limits: RateLimitManager,
    /// Artifacts already validated, per run.
    pub preexisting_registry: PreexistingArtifactRegistry,
    /// Number of admissions performed so far.
    pub admitted_total: u64,
    /// Number of active runs released so far.
    pub completed_total: u64,
    /// Queued run id -> why it is waiting and since when.
    pub queue_state: HashMap<String, SchedulerMetadata>,
    /// Recent admission wait samples in milliseconds, oldest first.
    pub wait_times: std::collections::VecDeque<u64>,
}
impl AutomationScheduler {
    /// Maximum number of admission wait samples retained for `metrics`; the oldest
    /// sample is discarded first once the limit is reached.
    const MAX_WAIT_SAMPLES: usize = 1000;

    /// Creates an empty scheduler that admits at most `max_concurrent_runs` runs at a time.
    pub fn new(max_concurrent_runs: usize) -> Self {
        Self {
            max_concurrent_runs,
            active_runs: HashMap::new(),
            locked_workspaces: HashMap::new(),
            rate_limits: RateLimitManager::new(),
            preexisting_registry: PreexistingArtifactRegistry::new(),
            admitted_total: 0,
            completed_total: 0,
            queue_state: HashMap::new(),
            wait_times: std::collections::VecDeque::with_capacity(Self::MAX_WAIT_SAMPLES),
        }
    }

    /// Decides whether `run_id` can be admitted right now.
    ///
    /// Checks run in a fixed order: provider rate limits, then the workspace lock, then
    /// global capacity. The first failing check determines the reported reason.
    ///
    /// # Errors
    ///
    /// Returns a `SchedulerMetadata` describing why the run must stay queued. Its
    /// `queued_at_ms` is the timestamp already tracked for `run_id` (see
    /// `track_queue_state`), or the current time when the run is not tracked yet.
    pub fn can_admit(
        &self,
        run_id: &str,
        workspace_root: Option<&str>,
        required_providers: &[String],
    ) -> Result<(), SchedulerMetadata> {
        for provider in required_providers {
            if self.rate_limits.is_provider_throttled(provider) {
                return Err(SchedulerMetadata {
                    queue_reason: Some(QueueReason::RateLimit),
                    resource_key: None,
                    rate_limited_provider: Some(provider.clone()),
                    queued_at_ms: self.get_queued_at(run_id),
                });
            }
        }
        if let Some(root) = workspace_root {
            // Only a lock held by a *different* run blocks admission. A run must never be
            // queued behind a lock it already holds itself (e.g. one taken via
            // `reserve_workspace`), otherwise it could never be admitted.
            let locked_by_other = matches!(
                self.locked_workspaces.get(root),
                Some(holder) if holder != run_id
            );
            if locked_by_other {
                return Err(SchedulerMetadata {
                    queue_reason: Some(QueueReason::WorkspaceLock),
                    resource_key: Some(root.to_string()),
                    rate_limited_provider: None,
                    queued_at_ms: self.get_queued_at(run_id),
                });
            }
        }
        if self.is_at_capacity() {
            return Err(SchedulerMetadata {
                queue_reason: Some(QueueReason::Capacity),
                resource_key: None,
                rate_limited_provider: None,
                queued_at_ms: self.get_queued_at(run_id),
            });
        }
        Ok(())
    }

    /// Returns the tracked queue timestamp for `run_id`, or the current time if untracked.
    fn get_queued_at(&self, run_id: &str) -> u64 {
        self.queue_state
            .get(run_id)
            .map(|m| m.queued_at_ms)
            .unwrap_or_else(crate::util::time::now_ms)
    }

    /// Stores (or replaces) the queue metadata for `run_id`.
    pub fn track_queue_state(&mut self, run_id: &str, metadata: SchedulerMetadata) {
        self.queue_state.insert(run_id.to_string(), metadata);
    }

    /// Marks `run_id` as active and locks its workspace root (if non-empty).
    ///
    /// Also increments `admitted_total`. If the run was tracked as queued, its queue
    /// entry is removed and the time it waited is recorded as a wait sample.
    pub fn admit_run(&mut self, run_id: &str, workspace_root: Option<&str>) {
        let root = workspace_root.unwrap_or("").to_string();
        if !root.is_empty() {
            self.locked_workspaces
                .insert(root.clone(), run_id.to_string());
        }
        self.active_runs.insert(run_id.to_string(), root);
        self.admitted_total += 1;
        if let Some(meta) = self.queue_state.remove(run_id) {
            // saturating_sub guards against clock skew or a future `queued_at_ms`.
            let wait_ms = crate::util::time::now_ms().saturating_sub(meta.queued_at_ms);
            self.record_wait(wait_ms);
        }
    }

    /// Appends a wait sample, evicting the oldest samples to stay within the cap.
    fn record_wait(&mut self, wait_ms: u64) {
        while self.wait_times.len() >= Self::MAX_WAIT_SAMPLES {
            self.wait_times.pop_front();
        }
        self.wait_times.push_back(wait_ms);
    }

    /// Locks `workspace_root` for `run_id` without admitting it; `None` or "" is a no-op.
    pub fn reserve_workspace(&mut self, run_id: &str, workspace_root: Option<&str>) {
        if let Some(root) = workspace_root.filter(|r| !r.is_empty()) {
            self.locked_workspaces
                .insert(root.to_string(), run_id.to_string());
        }
    }

    /// Frees the concurrency slot of `run_id`; counts a completion only if it was active.
    pub fn release_capacity(&mut self, run_id: &str) {
        if self.active_runs.remove(run_id).is_some() {
            self.completed_total += 1;
        }
    }

    /// Drops every workspace lock held by `run_id`, clears its pre-validated artifacts,
    /// and forgets its queue metadata.
    pub fn release_workspace(&mut self, run_id: &str) {
        self.locked_workspaces.retain(|_, holder| holder != run_id);
        self.preexisting_registry.clear_run(run_id);
        self.queue_state.remove(run_id);
    }

    /// Fully releases `run_id`: its capacity slot and its workspace/queue state.
    pub fn release_run(&mut self, run_id: &str) {
        self.release_capacity(run_id);
        self.release_workspace(run_id);
    }

    /// Builds a metrics snapshot.
    ///
    /// The snapshot has queued runs per reason, admission/completion counters, and the
    /// mean and nearest-rank 95th-percentile wait over the retained samples (0 when
    /// there are none).
    pub fn metrics(&self) -> AutomationSchedulerMetrics {
        let mut reasons: HashMap<String, usize> = HashMap::new();
        for reason in self.queue_state.values().filter_map(|meta| meta.queue_reason) {
            *reasons.entry(reason.as_str().to_string()).or_default() += 1;
        }

        let mut sorted: Vec<u64> = self.wait_times.iter().copied().collect();
        sorted.sort_unstable();

        let avg_wait = if sorted.is_empty() {
            0
        } else {
            sorted.iter().sum::<u64>() / sorted.len() as u64
        };

        // Nearest-rank 95th percentile: the sample at 1-based rank ceil(0.95 * n), computed
        // with exact integer arithmetic. For n == 0 the rank is 0, the lookup misses, and
        // the result is 0. (The previous round(0.95 * n) used as a 0-based index selected
        // one rank too high, e.g. the maximum for n = 20.)
        let p95_rank = (sorted.len() * 95 + 99) / 100;
        let p95_wait = sorted
            .get(p95_rank.saturating_sub(1))
            .copied()
            .unwrap_or(0);

        AutomationSchedulerMetrics {
            active_runs: self.active_runs.len(),
            queued_runs_by_reason: reasons,
            admitted_total: self.admitted_total,
            completed_total: self.completed_total,
            avg_wait_ms: avg_wait,
            p95_wait_ms: p95_wait,
        }
    }

    /// Number of runs currently holding a concurrency slot.
    pub fn active_count(&self) -> usize {
        self.active_runs.len()
    }

    /// Returns `true` when no further run can be admitted for capacity reasons.
    pub fn is_at_capacity(&self) -> bool {
        self.active_runs.len() >= self.max_concurrent_runs
    }
}