Skip to main content

soli_proxy/app/
deployment.rs

use anyhow::{Context, Result};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::sleep;

use super::AppInfo;
9
/// Coarse lifecycle states for a deployment.
///
/// NOTE(review): nothing in this file constructs or matches on this type; the manager
/// below tracks "deploying or not" with a single `AtomicBool`. Presumably consumed
/// elsewhere — confirm before extending.
#[derive(Clone, Debug, PartialEq)]
pub enum DeploymentStatus {
    /// Nothing in progress.
    Idle,
    /// A deployment is in progress.
    Deploying,
    /// A rollback is in progress.
    RollingBack,
    /// A deployment failed; the string carries the reason.
    Failed(String),
}
17
/// Starts, health-checks and stops app instances in "blue"/"green" slots.
///
/// `status` flags whether a deployment is in progress; `deploy` refuses to start while
/// it is set.
pub struct DeploymentManager {
    /// When true, `--dev` is appended to every app's start script.
    dev_mode: bool,
    /// `true` while a deployment is running.
    status: Arc<AtomicBool>,
}
22
23impl Default for DeploymentManager {
24    fn default() -> Self {
25        Self::new(false)
26    }
27}
28
29impl DeploymentManager {
30    pub fn new(dev_mode: bool) -> Self {
31        Self {
32            status: Arc::new(AtomicBool::new(false)),
33            dev_mode,
34        }
35    }
36
37    pub async fn is_deploying(&self) -> bool {
38        self.status.load(Ordering::SeqCst)
39    }
40
41    /// Deploy an app to a slot. Returns the PID of the started process.
42    pub async fn deploy(&self, app: &AppInfo, slot: &str) -> Result<u32> {
43        if self.status.load(Ordering::SeqCst) {
44            anyhow::bail!("Another deployment is in progress");
45        }
46
47        self.status.store(true, Ordering::SeqCst);
48        let _guard = scopeguard::guard((), |_| {
49            self.status.store(false, Ordering::SeqCst);
50        });
51
52        tracing::info!(
53            "Starting deployment of {} to slot {}",
54            app.config.name,
55            slot
56        );
57
58        let pid = self.start_instance(app, slot).await?;
59
60        let healthy = self.wait_for_health(app, slot).await?;
61
62        if !healthy {
63            self.stop_instance(app, slot).await?;
64            anyhow::bail!("Health check failed for {} slot", slot);
65        }
66
67        tracing::info!("Health check passed for {} slot {}", app.config.name, slot);
68        Ok(pid)
69    }
70
71    async fn start_instance(&self, app: &AppInfo, slot: &str) -> Result<u32> {
72        let port = if slot == "blue" {
73            app.blue.port
74        } else {
75            app.green.port
76        };
77
78        let base_script =
79            app.config.start_script.as_ref().ok_or_else(|| {
80                anyhow::anyhow!("No start script configured for {}", app.config.name)
81            })?;
82
83        let script = if self.dev_mode {
84            format!("{} --dev", base_script)
85        } else {
86            base_script.clone()
87        };
88
89        let output_file = PathBuf::from(format!("run/logs/{}/{}.log", app.config.name, slot));
90        std::fs::create_dir_all(output_file.parent().unwrap())?;
91
92        let output = std::fs::File::create(&output_file)?;
93
94        let cmd = unsafe {
95            tokio::process::Command::new("sh")
96                .arg("-c")
97                .arg(&script)
98                .current_dir(&app.path)
99                .env("PATH", std::env::var("PATH").unwrap_or_default())
100                .env("PORT", port.to_string())
101                .env("WORKERS", app.config.workers.to_string())
102                .stdout(std::process::Stdio::from(output.try_clone()?))
103                .stderr(std::process::Stdio::from(output))
104                .pre_exec(|| {
105                    // Create a new process group so we can kill the entire group later
106                    libc::setsid();
107                    Ok(())
108                })
109                .spawn()?
110        };
111
112        let pid = cmd.id().unwrap_or(0);
113        tracing::info!("Started {} slot {} with PID {}", app.config.name, slot, pid);
114
115        Ok(pid)
116    }
117
118    pub async fn stop_instance(&self, app: &AppInfo, slot: &str) -> Result<()> {
119        let pid = if slot == "blue" {
120            app.blue.pid
121        } else {
122            app.green.pid
123        };
124
125        if let Some(pid) = pid {
126            tracing::info!("Stopping {} slot {} (PID: {})", app.config.name, slot, pid);
127
128            #[cfg(unix)]
129            {
130                // Kill the entire process group (negative PID) so child processes are included
131                let pgid = format!("-{}", pid);
132
133                tokio::process::Command::new("kill")
134                    .arg("-TERM")
135                    .arg("--")
136                    .arg(&pgid)
137                    .output()
138                    .await?;
139
140                let timeout = app.config.graceful_timeout as u64;
141                for _ in 0..timeout {
142                    let output = tokio::process::Command::new("kill")
143                        .arg("-0")
144                        .arg(pid.to_string())
145                        .output()
146                        .await?;
147
148                    if !output.status.success() {
149                        tracing::info!("Process {} terminated gracefully", pid);
150                        return Ok(());
151                    }
152                    sleep(Duration::from_secs(1)).await;
153                }
154
155                tracing::warn!("Force killing process group {}", pid);
156                tokio::process::Command::new("kill")
157                    .arg("-9")
158                    .arg("--")
159                    .arg(&pgid)
160                    .output()
161                    .await?;
162            }
163        }
164
165        Ok(())
166    }
167
168    async fn wait_for_health(&self, app: &AppInfo, slot: &str) -> Result<bool> {
169        let port = if slot == "blue" {
170            app.blue.port
171        } else {
172            app.green.port
173        };
174        let health_path = app.config.health_check.as_deref().unwrap_or("/health");
175
176        let url = format!("http://localhost:{}{}", port, health_path);
177        let timeout_secs = 30;
178
179        for i in 0..timeout_secs {
180            sleep(Duration::from_secs(1)).await;
181
182            match reqwest::Client::new().get(&url).send().await {
183                Ok(resp) if resp.status().is_success() => {
184                    tracing::info!(
185                        "Health check passed for {} slot {} after {}s",
186                        app.config.name,
187                        slot,
188                        i + 1
189                    );
190                    return Ok(true);
191                }
192                Ok(_) => {
193                    tracing::debug!(
194                        "Health check response for {} slot {}: {}",
195                        app.config.name,
196                        slot,
197                        i + 1
198                    );
199                }
200                Err(e) => {
201                    tracing::debug!(
202                        "Health check failed for {} slot {}: {} ({})",
203                        app.config.name,
204                        slot,
205                        e,
206                        i + 1
207                    );
208                }
209            }
210        }
211
212        Ok(false)
213    }
214
215    pub async fn switch_traffic(&self, app: &AppInfo, new_slot: &str) -> Result<()> {
216        tracing::info!(
217            "Switching traffic for {} to slot {}",
218            app.config.name,
219            new_slot
220        );
221
222        let old_slot = if new_slot == "blue" { "green" } else { "blue" };
223        self.stop_instance(app, old_slot).await?;
224
225        Ok(())
226    }
227
228    pub async fn rollback(&self, app: &AppInfo) -> Result<()> {
229        let target_slot = if app.current_slot == "blue" {
230            "green"
231        } else {
232            "blue"
233        };
234        self.deploy(app, target_slot).await?;
235        Ok(())
236    }
237
238    pub async fn get_deployment_log(&self, app_name: &str, slot: &str) -> Result<String> {
239        let log_path = PathBuf::from(format!("run/logs/{}/{}.log", app_name, slot));
240        if log_path.exists() {
241            Ok(std::fs::read_to_string(&log_path)?)
242        } else {
243            Ok(String::new())
244        }
245    }
246}