1use std::collections::HashMap;
14#[cfg(windows)]
15use std::os::windows::process::CommandExt;
16use std::sync::Arc;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19use serde::{Deserialize, Serialize};
20use tokio::process::Command;
21use tokio::sync::RwLock;
22
23use super::paths::AcpPaths;
24use super::registry_fetch::fetch_registry;
25use super::runtime_manager::{AcpRuntimeManager, RuntimeType};
26#[cfg(windows)]
27use super::CREATE_NO_WINDOW;
28
29const PREWARM_TIMEOUT_SECS: u64 = 5 * 60; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
36#[serde(rename_all = "lowercase")]
37pub enum WarmupState {
38 Idle,
39 Warming,
40 Warm,
41 Failed,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct WarmupStatus {
46 #[serde(rename = "agentId")]
47 pub agent_id: String,
48 pub state: WarmupState,
49 #[serde(rename = "startedAt", skip_serializing_if = "Option::is_none")]
50 pub started_at: Option<u64>,
51 #[serde(rename = "finishedAt", skip_serializing_if = "Option::is_none")]
52 pub finished_at: Option<u64>,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 pub error: Option<String>,
55}
56
57impl WarmupStatus {
58 fn idle(agent_id: &str) -> Self {
59 Self {
60 agent_id: agent_id.to_string(),
61 state: WarmupState::Idle,
62 started_at: None,
63 finished_at: None,
64 error: None,
65 }
66 }
67}
68
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
75
76pub struct AcpWarmupService {
80 paths: AcpPaths,
81 states: Arc<RwLock<HashMap<String, WarmupStatus>>>,
82}
83
84impl AcpWarmupService {
85 pub fn new(paths: AcpPaths) -> Self {
86 Self {
87 paths,
88 states: Arc::new(RwLock::new(HashMap::new())),
89 }
90 }
91
92 fn make_runtime_manager(&self) -> AcpRuntimeManager {
93 AcpRuntimeManager::new(self.paths.clone())
94 }
95
96 pub async fn is_warming_up(&self, agent_id: &str) -> bool {
99 self.states
100 .read()
101 .await
102 .get(agent_id)
103 .map(|s| s.state == WarmupState::Warming)
104 .unwrap_or(false)
105 }
106
107 pub async fn is_warmed_up(&self, agent_id: &str) -> bool {
108 self.states
109 .read()
110 .await
111 .get(agent_id)
112 .map(|s| s.state == WarmupState::Warm)
113 .unwrap_or(false)
114 }
115
116 pub async fn needs_warmup(&self, agent_id: &str) -> bool {
117 matches!(
118 self.states.read().await.get(agent_id).map(|s| &s.state),
119 None | Some(WarmupState::Idle) | Some(WarmupState::Failed)
120 )
121 }
122
123 pub async fn get_status(&self, agent_id: &str) -> WarmupStatus {
124 self.states
125 .read()
126 .await
127 .get(agent_id)
128 .cloned()
129 .unwrap_or_else(|| WarmupStatus::idle(agent_id))
130 }
131
132 pub async fn get_all_statuses(&self) -> Vec<WarmupStatus> {
133 self.states.read().await.values().cloned().collect()
134 }
135
136 pub async fn warmup_in_background(&self, agent_id: &str) {
141 if !self.needs_warmup(agent_id).await {
142 return;
143 }
144 let agent_id = agent_id.to_string();
145 let states = self.states.clone();
146 let paths = self.paths.clone();
147 tokio::spawn(async move {
148 let tmp = AcpWarmupService { paths, states };
149 let _ = tmp.warmup(&agent_id).await;
150 });
151 }
152
153 pub async fn warmup(&self, agent_id: &str) -> Result<bool, String> {
156 if !self.needs_warmup(agent_id).await {
157 return Ok(self.is_warmed_up(agent_id).await);
158 }
159
160 self.set_state(
161 agent_id,
162 WarmupStatus {
163 agent_id: agent_id.to_string(),
164 state: WarmupState::Warming,
165 started_at: Some(now_secs()),
166 finished_at: None,
167 error: None,
168 },
169 )
170 .await;
171
172 let result = self.run_warmup(agent_id).await;
173
174 let (state, error) = match &result {
175 Ok(true) => (WarmupState::Warm, None),
176 Ok(false) => (WarmupState::Failed, None),
177 Err(e) => (WarmupState::Failed, Some(e.clone())),
178 };
179
180 self.set_state(
181 agent_id,
182 WarmupStatus {
183 agent_id: agent_id.to_string(),
184 state,
185 started_at: None,
186 finished_at: Some(now_secs()),
187 error,
188 },
189 )
190 .await;
191
192 result.map_err(|_| "Warmup failed".to_string())
193 }
194
195 async fn run_warmup(&self, agent_id: &str) -> Result<bool, String> {
198 let registry = fetch_registry()
200 .await
201 .map_err(|e| format!("Registry fetch failed: {e}"))?;
202
203 let agent = registry
204 .agents
205 .iter()
206 .find(|a| a.id == agent_id)
207 .ok_or_else(|| format!("Agent '{agent_id}' not found in registry"))?
208 .clone();
209
210 let dist = &agent.distribution;
211
212 if let Some(npx_dist) = dist.npx.as_ref() {
214 let package = npx_dist.package.clone();
215
216 let runtime_info = self
217 .make_runtime_manager()
218 .ensure_runtime(&RuntimeType::Npx)
219 .await?;
220
221 return self
222 .execute_prewarm_command("npx", &runtime_info.path, &package)
223 .await;
224 }
225
226 if let Some(uvx_dist) = dist.uvx.as_ref() {
228 let package = uvx_dist.package.clone();
229
230 let runtime_info = self
231 .make_runtime_manager()
232 .ensure_runtime(&RuntimeType::Uvx)
233 .await?;
234
235 return self
236 .execute_prewarm_command("uvx", &runtime_info.path, &package)
237 .await;
238 }
239
240 tracing::info!("[AcpWarmup] Agent {} is binary, no warmup needed", agent_id);
242 Ok(true)
243 }
244
245 pub async fn execute_prewarm_command(
247 &self,
248 runner: &str,
249 runtime_path: &std::path::Path,
250 package_name: &str,
251 ) -> Result<bool, String> {
252 let args: Vec<&str> = if runner == "npx" {
253 vec!["-y", package_name]
254 } else {
255 vec![package_name, "--help"]
257 };
258
259 tracing::info!(
260 "[AcpWarmup] Pre-warming {} package: {} (via {:?})",
261 runner,
262 package_name,
263 runtime_path
264 );
265
266 let runtime_dir = runtime_path
267 .parent()
268 .map(|p| p.to_string_lossy().to_string())
269 .unwrap_or_default();
270
271 let mut cmd = Command::new(runtime_path);
272 cmd.args(&args)
273 .stdout(std::process::Stdio::null())
274 .stderr(std::process::Stdio::null());
275
276 #[cfg(windows)]
277 cmd.as_std_mut().creation_flags(CREATE_NO_WINDOW);
278
279 if let Ok(path_env) = std::env::var("PATH") {
281 let sep = if cfg!(windows) { ";" } else { ":" };
282 cmd.env("PATH", format!("{runtime_dir}{sep}{path_env}"));
283 }
284
285 let child = cmd.spawn().map_err(|e| format!("spawn failed: {e}"))?;
286
287 match tokio::time::timeout(
288 std::time::Duration::from_secs(PREWARM_TIMEOUT_SECS),
289 child.wait_with_output(),
290 )
291 .await
292 {
293 Ok(Ok(output)) => {
294 tracing::info!(
297 "[AcpWarmup] Prewarm done for {} (exit={})",
298 package_name,
299 output.status.code().unwrap_or(-1)
300 );
301 Ok(true)
302 }
303 Ok(Err(e)) => {
304 tracing::error!(
305 "[AcpWarmup] Prewarm command error for {}: {}",
306 package_name,
307 e
308 );
309 Err(e.to_string())
310 }
311 Err(_) => {
312 tracing::warn!(
313 "[AcpWarmup] Prewarm timed out after {}s for {}",
314 PREWARM_TIMEOUT_SECS,
315 package_name
316 );
317 Ok(false)
318 }
319 }
320 }
321
322 async fn set_state(&self, agent_id: &str, status: WarmupStatus) {
323 self.states
324 .write()
325 .await
326 .insert(agent_id.to_string(), status);
327 }
328}