soli_proxy/app/
deployment.rs1use anyhow::Result;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::time::sleep;
7
8use super::AppInfo;
9
/// Coarse lifecycle state of a deployment.
///
/// NOTE(review): nothing in this file constructs or reads this enum; the
/// variant descriptions below are inferred from the names only — confirm
/// against the callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeploymentStatus {
    /// Presumably: no deployment is running.
    Idle,
    /// Presumably: a deployment is running.
    Deploying,
    /// Presumably: a rollback is running.
    RollingBack,
    /// Carries a `String` payload; presumably the failure reason.
    Failed(String),
}
17
/// Starts, health-checks and stops application instances for slot-based
/// deployments.
#[derive(Debug)]
pub struct DeploymentManager {
    /// `true` while a deployment is in progress; `false` otherwise.
    status: Arc<AtomicBool>,
    /// When `true`, ` --dev` is appended to the configured start script.
    dev_mode: bool,
}
22
23impl Default for DeploymentManager {
24 fn default() -> Self {
25 Self::new(false)
26 }
27}
28
29impl DeploymentManager {
30 pub fn new(dev_mode: bool) -> Self {
31 Self {
32 status: Arc::new(AtomicBool::new(false)),
33 dev_mode,
34 }
35 }
36
37 pub async fn is_deploying(&self) -> bool {
38 self.status.load(Ordering::SeqCst)
39 }
40
41 pub async fn deploy(&self, app: &AppInfo, slot: &str) -> Result<u32> {
43 if self.status.load(Ordering::SeqCst) {
44 anyhow::bail!("Another deployment is in progress");
45 }
46
47 self.status.store(true, Ordering::SeqCst);
48 let _guard = scopeguard::guard((), |_| {
49 self.status.store(false, Ordering::SeqCst);
50 });
51
52 tracing::info!(
53 "Starting deployment of {} to slot {}",
54 app.config.name,
55 slot
56 );
57
58 let pid = self.start_instance(app, slot).await?;
59
60 let healthy = self.wait_for_health(app, slot).await?;
61
62 if !healthy {
63 self.stop_instance(app, slot).await?;
64 anyhow::bail!("Health check failed for {} slot", slot);
65 }
66
67 tracing::info!("Health check passed for {} slot {}", app.config.name, slot);
68 Ok(pid)
69 }
70
71 async fn start_instance(&self, app: &AppInfo, slot: &str) -> Result<u32> {
72 let port = if slot == "blue" {
73 app.blue.port
74 } else {
75 app.green.port
76 };
77
78 let base_script =
79 app.config.start_script.as_ref().ok_or_else(|| {
80 anyhow::anyhow!("No start script configured for {}", app.config.name)
81 })?;
82
83 let script = if self.dev_mode {
84 format!("{} --dev", base_script)
85 } else {
86 base_script.clone()
87 };
88
89 let output_file = PathBuf::from(format!("run/logs/{}/{}.log", app.config.name, slot));
90 std::fs::create_dir_all(output_file.parent().unwrap())?;
91
92 let output = std::fs::File::create(&output_file)?;
93
94 let cmd = unsafe {
95 tokio::process::Command::new("sh")
96 .arg("-c")
97 .arg(&script)
98 .current_dir(&app.path)
99 .env("PATH", std::env::var("PATH").unwrap_or_default())
100 .env("PORT", port.to_string())
101 .env("WORKERS", app.config.workers.to_string())
102 .stdout(std::process::Stdio::from(output.try_clone()?))
103 .stderr(std::process::Stdio::from(output))
104 .pre_exec(|| {
105 libc::setsid();
107 Ok(())
108 })
109 .spawn()?
110 };
111
112 let pid = cmd.id().unwrap_or(0);
113 tracing::info!("Started {} slot {} with PID {}", app.config.name, slot, pid);
114
115 Ok(pid)
116 }
117
118 pub async fn stop_instance(&self, app: &AppInfo, slot: &str) -> Result<()> {
119 let pid = if slot == "blue" {
120 app.blue.pid
121 } else {
122 app.green.pid
123 };
124
125 if let Some(pid) = pid {
126 tracing::info!("Stopping {} slot {} (PID: {})", app.config.name, slot, pid);
127
128 #[cfg(unix)]
129 {
130 let pgid = format!("-{}", pid);
132
133 tokio::process::Command::new("kill")
134 .arg("-TERM")
135 .arg("--")
136 .arg(&pgid)
137 .output()
138 .await?;
139
140 let timeout = app.config.graceful_timeout as u64;
141 for _ in 0..timeout {
142 let output = tokio::process::Command::new("kill")
143 .arg("-0")
144 .arg(pid.to_string())
145 .output()
146 .await?;
147
148 if !output.status.success() {
149 tracing::info!("Process {} terminated gracefully", pid);
150 return Ok(());
151 }
152 sleep(Duration::from_secs(1)).await;
153 }
154
155 tracing::warn!("Force killing process group {}", pid);
156 tokio::process::Command::new("kill")
157 .arg("-9")
158 .arg("--")
159 .arg(&pgid)
160 .output()
161 .await?;
162 }
163 }
164
165 Ok(())
166 }
167
168 async fn wait_for_health(&self, app: &AppInfo, slot: &str) -> Result<bool> {
169 let port = if slot == "blue" {
170 app.blue.port
171 } else {
172 app.green.port
173 };
174 let health_path = app.config.health_check.as_deref().unwrap_or("/health");
175
176 let url = format!("http://localhost:{}{}", port, health_path);
177 let timeout_secs = 30;
178
179 for i in 0..timeout_secs {
180 sleep(Duration::from_secs(1)).await;
181
182 match reqwest::Client::new().get(&url).send().await {
183 Ok(resp) if resp.status().is_success() => {
184 tracing::info!(
185 "Health check passed for {} slot {} after {}s",
186 app.config.name,
187 slot,
188 i + 1
189 );
190 return Ok(true);
191 }
192 Ok(_) => {
193 tracing::debug!(
194 "Health check response for {} slot {}: {}",
195 app.config.name,
196 slot,
197 i + 1
198 );
199 }
200 Err(e) => {
201 tracing::debug!(
202 "Health check failed for {} slot {}: {} ({})",
203 app.config.name,
204 slot,
205 e,
206 i + 1
207 );
208 }
209 }
210 }
211
212 Ok(false)
213 }
214
215 pub async fn switch_traffic(&self, app: &AppInfo, new_slot: &str) -> Result<()> {
216 tracing::info!(
217 "Switching traffic for {} to slot {}",
218 app.config.name,
219 new_slot
220 );
221
222 let old_slot = if new_slot == "blue" { "green" } else { "blue" };
223 self.stop_instance(app, old_slot).await?;
224
225 Ok(())
226 }
227
228 pub async fn rollback(&self, app: &AppInfo) -> Result<()> {
229 let target_slot = if app.current_slot == "blue" {
230 "green"
231 } else {
232 "blue"
233 };
234 self.deploy(app, target_slot).await?;
235 Ok(())
236 }
237
238 pub async fn get_deployment_log(&self, app_name: &str, slot: &str) -> Result<String> {
239 let log_path = PathBuf::from(format!("run/logs/{}/{}.log", app_name, slot));
240 if log_path.exists() {
241 Ok(std::fs::read_to_string(&log_path)?)
242 } else {
243 Ok(String::new())
244 }
245 }
246}