// soli_proxy/app/deployment.rs

use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use tokio::time::sleep;

use super::AppInfo;

/// Coarse state of a deployment.
///
/// NOTE(review): nothing in this file constructs or matches on this enum
/// (`DeploymentManager` tracks only a busy flag), so the variant descriptions
/// below are inferred from their names — confirm against the callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeploymentStatus {
    /// No deployment activity.
    Idle,
    /// A deployment is underway.
    Deploying,
    /// A rollback is underway.
    RollingBack,
    /// A deployment failed; the string describes why.
    Failed(String),
}

/// Starts, health-checks and stops blue/green app instances, allowing only one
/// deployment at a time.
#[derive(Debug)]
pub struct DeploymentManager {
    /// Busy flag: `true` while a deployment holds the manager.
    status: Arc<AtomicBool>,
    /// When set, `--dev` is appended to each app's start script.
    dev_mode: bool,
}

23impl Default for DeploymentManager {
24    fn default() -> Self {
25        Self::new(false)
26    }
27}
28
29impl DeploymentManager {
30    pub fn new(dev_mode: bool) -> Self {
31        Self {
32            status: Arc::new(AtomicBool::new(false)),
33            dev_mode,
34        }
35    }
36
37    pub async fn is_deploying(&self) -> bool {
38        self.status.load(Ordering::SeqCst)
39    }
40
41    /// Deploy an app to a slot. Returns the PID of the started process.
42    pub async fn deploy(&self, app: &AppInfo, slot: &str) -> Result<u32> {
43        if self.status.load(Ordering::SeqCst) {
44            anyhow::bail!("Another deployment is in progress");
45        }
46
47        self.status.store(true, Ordering::SeqCst);
48        let _guard = scopeguard::guard((), |_| {
49            self.status.store(false, Ordering::SeqCst);
50        });
51
52        tracing::info!(
53            "Starting deployment of {} to slot {}",
54            app.config.name,
55            slot
56        );
57
58        let pid = self.start_instance(app, slot).await?;
59
60        let healthy = self.wait_for_health(app, slot).await?;
61
62        if !healthy {
63            self.stop_instance(app, slot).await?;
64            anyhow::bail!("Health check failed for {} slot", slot);
65        }
66
67        tracing::info!("Health check passed for {} slot {}", app.config.name, slot);
68        Ok(pid)
69    }
70
71    async fn start_instance(&self, app: &AppInfo, slot: &str) -> Result<u32> {
72        let port = if slot == "blue" {
73            app.blue.port
74        } else {
75            app.green.port
76        };
77
78        let base_script =
79            app.config.start_script.as_ref().ok_or_else(|| {
80                anyhow::anyhow!("No start script configured for {}", app.config.name)
81            })?;
82
83        let script = if self.dev_mode {
84            format!("{} --dev", base_script)
85        } else {
86            base_script.clone()
87        };
88
89        let output_file = PathBuf::from(format!("run/logs/{}/{}.log", app.config.name, slot));
90        std::fs::create_dir_all(output_file.parent().unwrap())?;
91
92        let output = std::fs::File::create(&output_file)?;
93
94        let mut cmd = tokio::process::Command::new("sh");
95        cmd.arg("-c")
96            .arg(&script)
97            .current_dir(&app.path)
98            .env("PATH", std::env::var("PATH").unwrap_or_default())
99            .env("PORT", port.to_string())
100            .env("WORKERS", app.config.workers.to_string())
101            .stdout(std::process::Stdio::from(output.try_clone()?))
102            .stderr(std::process::Stdio::from(output));
103
104        if let (Some(ref user), Some(ref group)) = (&app.config.user, &app.config.group) {
105            let uid = resolve_user(user)?;
106            let gid = resolve_group(group)?;
107            cmd.uid(uid).gid(gid);
108            tracing::info!(
109                "Running {} as user {} (uid: {}, gid: {})",
110                app.config.name,
111                user,
112                uid,
113                gid
114            );
115        } else if let Some(ref user) = &app.config.user {
116            let uid = resolve_user(user)?;
117            let gid = resolve_group(user)?;
118            cmd.uid(uid).gid(gid);
119            tracing::info!(
120                "Running {} as user {} (uid: {}, gid: {})",
121                app.config.name,
122                user,
123                uid,
124                gid
125            );
126        }
127
128        let cmd = unsafe {
129            cmd.pre_exec(|| {
130                libc::setsid();
131                Ok(())
132            })
133            .spawn()?
134        };
135
136        let pid = cmd.id().unwrap_or(0);
137        tracing::info!("Started {} slot {} with PID {}", app.config.name, slot, pid);
138
139        Ok(pid)
140    }
141
142    pub async fn stop_instance(&self, app: &AppInfo, slot: &str) -> Result<()> {
143        let pid = if slot == "blue" {
144            app.blue.pid
145        } else {
146            app.green.pid
147        };
148
149        if let Some(pid) = pid {
150            tracing::info!("Stopping {} slot {} (PID: {})", app.config.name, slot, pid);
151
152            #[cfg(unix)]
153            {
154                // Kill the entire process group (negative PID) so child processes are included
155                let pgid = format!("-{}", pid);
156
157                tokio::process::Command::new("kill")
158                    .arg("-TERM")
159                    .arg("--")
160                    .arg(&pgid)
161                    .output()
162                    .await?;
163
164                let timeout = app.config.graceful_timeout as u64;
165                for _ in 0..timeout {
166                    let output = tokio::process::Command::new("kill")
167                        .arg("-0")
168                        .arg(pid.to_string())
169                        .output()
170                        .await?;
171
172                    if !output.status.success() {
173                        tracing::info!("Process {} terminated gracefully", pid);
174                        return Ok(());
175                    }
176                    sleep(Duration::from_secs(1)).await;
177                }
178
179                tracing::warn!("Force killing process group {}", pid);
180                tokio::process::Command::new("kill")
181                    .arg("-9")
182                    .arg("--")
183                    .arg(&pgid)
184                    .output()
185                    .await?;
186            }
187        }
188
189        Ok(())
190    }
191
192    async fn wait_for_health(&self, app: &AppInfo, slot: &str) -> Result<bool> {
193        let port = if slot == "blue" {
194            app.blue.port
195        } else {
196            app.green.port
197        };
198        let health_path = app.config.health_check.as_deref().unwrap_or("/health");
199
200        let url = format!("http://localhost:{}{}", port, health_path);
201        let timeout_secs = 30;
202
203        for i in 0..timeout_secs {
204            sleep(Duration::from_secs(1)).await;
205
206            match reqwest::Client::new().get(&url).send().await {
207                Ok(resp) if resp.status().is_success() => {
208                    tracing::info!(
209                        "Health check passed for {} slot {} after {}s",
210                        app.config.name,
211                        slot,
212                        i + 1
213                    );
214                    return Ok(true);
215                }
216                Ok(_) => {
217                    tracing::debug!(
218                        "Health check response for {} slot {}: {}",
219                        app.config.name,
220                        slot,
221                        i + 1
222                    );
223                }
224                Err(e) => {
225                    tracing::debug!(
226                        "Health check failed for {} slot {}: {} ({})",
227                        app.config.name,
228                        slot,
229                        e,
230                        i + 1
231                    );
232                }
233            }
234        }
235
236        Ok(false)
237    }
238
239    pub async fn switch_traffic(&self, app: &AppInfo, new_slot: &str) -> Result<()> {
240        tracing::info!(
241            "Switching traffic for {} to slot {}",
242            app.config.name,
243            new_slot
244        );
245
246        let old_slot = if new_slot == "blue" { "green" } else { "blue" };
247        self.stop_instance(app, old_slot).await?;
248
249        Ok(())
250    }
251
252    pub async fn rollback(&self, app: &AppInfo) -> Result<()> {
253        let target_slot = if app.current_slot == "blue" {
254            "green"
255        } else {
256            "blue"
257        };
258        self.deploy(app, target_slot).await?;
259        Ok(())
260    }
261
262    pub async fn get_deployment_log(&self, app_name: &str, slot: &str) -> Result<String> {
263        let log_path = PathBuf::from(format!("run/logs/{}/{}.log", app_name, slot));
264        if log_path.exists() {
265            Ok(std::fs::read_to_string(&log_path)?)
266        } else {
267            Ok(String::new())
268        }
269    }
270}
271
272fn resolve_user(user: &str) -> Result<u32> {
273    use std::ffi::CString;
274    let c_user = CString::new(user)?;
275    let passwd = unsafe { libc::getpwnam(c_user.as_ptr()) };
276    if passwd.is_null() {
277        anyhow::bail!("User '{}' not found", user);
278    }
279    Ok(unsafe { (*passwd).pw_uid })
280}
281
282fn resolve_group(group: &str) -> Result<u32> {
283    use std::ffi::CString;
284    let c_group = CString::new(group)?;
285    let grp = unsafe { libc::getgrnam(c_group.as_ptr()) };
286    if grp.is_null() {
287        anyhow::bail!("Group '{}' not found", group);
288    }
289    Ok(unsafe { (*grp).gr_gid })
290}