soli_proxy/app/
deployment.rs1use anyhow::Result;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::time::sleep;
7
8use super::AppInfo;
9
/// Coarse lifecycle states for a deployment.
///
/// Nothing in the visible part of this file constructs or reads this enum;
/// `DeploymentManager` tracks "in progress" with a plain boolean flag instead.
/// NOTE(review): presumably consumed by sibling modules — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeploymentStatus {
    /// No deployment activity.
    Idle,
    /// A deployment is running.
    Deploying,
    /// A rollback is running.
    RollingBack,
    /// The deployment failed; the payload presumably carries a description
    /// of the failure.
    Failed(String),
}
17
/// Starts, health-checks and stops application instances in the "blue" and
/// "green" slots.
#[derive(Debug)]
pub struct DeploymentManager {
    /// `true` while a deployment is in progress; used as a busy flag.
    status: Arc<AtomicBool>,
    /// When set, ` --dev` is appended to the app's start script.
    dev_mode: bool,
}
22
23impl Default for DeploymentManager {
24 fn default() -> Self {
25 Self::new(false)
26 }
27}
28
29impl DeploymentManager {
30 pub fn new(dev_mode: bool) -> Self {
31 Self {
32 status: Arc::new(AtomicBool::new(false)),
33 dev_mode,
34 }
35 }
36
37 pub async fn is_deploying(&self) -> bool {
38 self.status.load(Ordering::SeqCst)
39 }
40
41 pub async fn deploy(&self, app: &AppInfo, slot: &str) -> Result<u32> {
43 if self.status.load(Ordering::SeqCst) {
44 anyhow::bail!("Another deployment is in progress");
45 }
46
47 self.status.store(true, Ordering::SeqCst);
48 let _guard = scopeguard::guard((), |_| {
49 self.status.store(false, Ordering::SeqCst);
50 });
51
52 tracing::info!(
53 "Starting deployment of {} to slot {}",
54 app.config.name,
55 slot
56 );
57
58 let pid = self.start_instance(app, slot).await?;
59
60 let healthy = self.wait_for_health(app, slot).await?;
61
62 if !healthy {
63 self.stop_instance(app, slot).await?;
64 anyhow::bail!("Health check failed for {} slot", slot);
65 }
66
67 tracing::info!("Health check passed for {} slot {}", app.config.name, slot);
68 Ok(pid)
69 }
70
71 async fn start_instance(&self, app: &AppInfo, slot: &str) -> Result<u32> {
72 let port = if slot == "blue" {
73 app.blue.port
74 } else {
75 app.green.port
76 };
77
78 let base_script =
79 app.config.start_script.as_ref().ok_or_else(|| {
80 anyhow::anyhow!("No start script configured for {}", app.config.name)
81 })?;
82
83 let script = if self.dev_mode {
84 format!("{} --dev", base_script)
85 } else {
86 base_script.clone()
87 };
88
89 let output_file = PathBuf::from(format!("run/logs/{}/{}.log", app.config.name, slot));
90 std::fs::create_dir_all(output_file.parent().unwrap())?;
91
92 let output = std::fs::File::create(&output_file)?;
93
94 let mut cmd = tokio::process::Command::new("sh");
95 cmd.arg("-c")
96 .arg(&script)
97 .current_dir(&app.path)
98 .env("PATH", std::env::var("PATH").unwrap_or_default())
99 .env("PORT", port.to_string())
100 .env("WORKERS", app.config.workers.to_string())
101 .stdout(std::process::Stdio::from(output.try_clone()?))
102 .stderr(std::process::Stdio::from(output));
103
104 if let (Some(ref user), Some(ref group)) = (&app.config.user, &app.config.group) {
105 let uid = resolve_user(user)?;
106 let gid = resolve_group(group)?;
107 cmd.uid(uid).gid(gid);
108 tracing::info!(
109 "Running {} as user {} (uid: {}, gid: {})",
110 app.config.name,
111 user,
112 uid,
113 gid
114 );
115 } else if let Some(ref user) = &app.config.user {
116 let uid = resolve_user(user)?;
117 let gid = resolve_group(user)?;
118 cmd.uid(uid).gid(gid);
119 tracing::info!(
120 "Running {} as user {} (uid: {}, gid: {})",
121 app.config.name,
122 user,
123 uid,
124 gid
125 );
126 }
127
128 let cmd = unsafe {
129 cmd.pre_exec(|| {
130 libc::setsid();
131 Ok(())
132 })
133 .spawn()?
134 };
135
136 let pid = cmd.id().unwrap_or(0);
137 tracing::info!("Started {} slot {} with PID {}", app.config.name, slot, pid);
138
139 Ok(pid)
140 }
141
142 pub async fn stop_instance(&self, app: &AppInfo, slot: &str) -> Result<()> {
143 let pid = if slot == "blue" {
144 app.blue.pid
145 } else {
146 app.green.pid
147 };
148
149 if let Some(pid) = pid {
150 tracing::info!("Stopping {} slot {} (PID: {})", app.config.name, slot, pid);
151
152 #[cfg(unix)]
153 {
154 let pgid = format!("-{}", pid);
156
157 tokio::process::Command::new("kill")
158 .arg("-TERM")
159 .arg("--")
160 .arg(&pgid)
161 .output()
162 .await?;
163
164 let timeout = app.config.graceful_timeout as u64;
165 for _ in 0..timeout {
166 let output = tokio::process::Command::new("kill")
167 .arg("-0")
168 .arg(pid.to_string())
169 .output()
170 .await?;
171
172 if !output.status.success() {
173 tracing::info!("Process {} terminated gracefully", pid);
174 return Ok(());
175 }
176 sleep(Duration::from_secs(1)).await;
177 }
178
179 tracing::warn!("Force killing process group {}", pid);
180 tokio::process::Command::new("kill")
181 .arg("-9")
182 .arg("--")
183 .arg(&pgid)
184 .output()
185 .await?;
186 }
187 }
188
189 Ok(())
190 }
191
192 async fn wait_for_health(&self, app: &AppInfo, slot: &str) -> Result<bool> {
193 let port = if slot == "blue" {
194 app.blue.port
195 } else {
196 app.green.port
197 };
198 let health_path = app.config.health_check.as_deref().unwrap_or("/health");
199
200 let url = format!("http://localhost:{}{}", port, health_path);
201 let timeout_secs = 30;
202
203 for i in 0..timeout_secs {
204 sleep(Duration::from_secs(1)).await;
205
206 match reqwest::Client::new().get(&url).send().await {
207 Ok(resp) if resp.status().is_success() => {
208 tracing::info!(
209 "Health check passed for {} slot {} after {}s",
210 app.config.name,
211 slot,
212 i + 1
213 );
214 return Ok(true);
215 }
216 Ok(_) => {
217 tracing::debug!(
218 "Health check response for {} slot {}: {}",
219 app.config.name,
220 slot,
221 i + 1
222 );
223 }
224 Err(e) => {
225 tracing::debug!(
226 "Health check failed for {} slot {}: {} ({})",
227 app.config.name,
228 slot,
229 e,
230 i + 1
231 );
232 }
233 }
234 }
235
236 Ok(false)
237 }
238
239 pub async fn switch_traffic(&self, app: &AppInfo, new_slot: &str) -> Result<()> {
240 tracing::info!(
241 "Switching traffic for {} to slot {}",
242 app.config.name,
243 new_slot
244 );
245
246 let old_slot = if new_slot == "blue" { "green" } else { "blue" };
247 self.stop_instance(app, old_slot).await?;
248
249 Ok(())
250 }
251
252 pub async fn rollback(&self, app: &AppInfo) -> Result<()> {
253 let target_slot = if app.current_slot == "blue" {
254 "green"
255 } else {
256 "blue"
257 };
258 self.deploy(app, target_slot).await?;
259 Ok(())
260 }
261
262 pub async fn get_deployment_log(&self, app_name: &str, slot: &str) -> Result<String> {
263 let log_path = PathBuf::from(format!("run/logs/{}/{}.log", app_name, slot));
264 if log_path.exists() {
265 Ok(std::fs::read_to_string(&log_path)?)
266 } else {
267 Ok(String::new())
268 }
269 }
270}
271
272fn resolve_user(user: &str) -> Result<u32> {
273 use std::ffi::CString;
274 let c_user = CString::new(user)?;
275 let passwd = unsafe { libc::getpwnam(c_user.as_ptr()) };
276 if passwd.is_null() {
277 anyhow::bail!("User '{}' not found", user);
278 }
279 Ok(unsafe { (*passwd).pw_uid })
280}
281
282fn resolve_group(group: &str) -> Result<u32> {
283 use std::ffi::CString;
284 let c_group = CString::new(group)?;
285 let grp = unsafe { libc::getgrnam(c_group.as_ptr()) };
286 if grp.is_null() {
287 anyhow::bail!("Group '{}' not found", group);
288 }
289 Ok(unsafe { (*grp).gr_gid })
290}