Skip to main content

sandbox_agent_opencode_server_manager/
lib.rs

1use std::fs::{self, OpenOptions};
2use std::net::TcpListener;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, ExitStatus, Stdio};
5use std::sync::{Arc, Mutex as StdMutex};
6use std::time::Duration;
7
8use reqwest::Client;
9use sandbox_agent_agent_management::agents::{AgentId, AgentManager};
10use tokio::sync::Mutex;
11use tokio::time::sleep;
12use tracing::warn;
13
/// Relative paths probed, in order, when checking whether a freshly spawned
/// server is up; the first successful (2xx) response counts as healthy.
const HEALTH_ENDPOINTS: [&str; 4] = ["health", "healthz", "app/agents", "agents"];
/// Number of probe rounds (each round tries every endpoint) before giving up.
const HEALTH_ATTEMPTS: usize = 20;
/// Pause between two probe rounds, in milliseconds.
const HEALTH_DELAY_MS: u64 = 150;
/// Polling interval of the process monitor, in milliseconds; also used as the
/// delay before an automatic restart is attempted.
const MONITOR_DELAY_MS: u64 = 500;
18
/// Settings that control how [`OpenCodeServerManager`] runs the OpenCode server.
#[derive(Clone, Debug)]
pub struct OpenCodeServerManagerConfig {
    /// Root directory for log files; the server's stderr is appended to
    /// `opencode/opencode-compat.log` beneath it.
    pub log_dir: PathBuf,
    /// When `true`, a server that exits without `shutdown` having been
    /// requested is started again automatically.
    pub auto_restart: bool,
}
24
25impl Default for OpenCodeServerManagerConfig {
26    fn default() -> Self {
27        Self {
28            log_dir: default_log_dir(),
29            auto_restart: true,
30        }
31    }
32}
33
34#[derive(Debug, Clone)]
35pub struct OpenCodeServerManager {
36    inner: Arc<Inner>,
37}
38
39#[derive(Debug)]
40struct Inner {
41    agent_manager: Arc<AgentManager>,
42    http_client: Client,
43    config: OpenCodeServerManagerConfig,
44    ensure_lock: Mutex<()>,
45    state: Mutex<ManagerState>,
46}
47
48#[derive(Debug, Default)]
49struct ManagerState {
50    server: Option<RunningServer>,
51    restart_count: u64,
52    shutdown_requested: bool,
53    last_error: Option<String>,
54}
55
/// Description of one spawned server instance.
#[derive(Clone, Debug)]
struct RunningServer {
    /// Base URL (`http://127.0.0.1:<port>`) the server was asked to listen on.
    base_url: String,
    /// Shared handle to the OS process; the slot becomes `None` once the
    /// process has been killed or observed to have exited.
    child: Arc<StdMutex<Option<Child>>>,
    /// Identifier distinguishing this instance from earlier/later ones, so a
    /// stale monitor cannot clear state that belongs to a newer server.
    instance_id: u64,
}
62
63impl OpenCodeServerManager {
64    pub fn new(agent_manager: Arc<AgentManager>, config: OpenCodeServerManagerConfig) -> Self {
65        Self {
66            inner: Arc::new(Inner {
67                agent_manager,
68                http_client: Client::new(),
69                config,
70                ensure_lock: Mutex::new(()),
71                state: Mutex::new(ManagerState::default()),
72            }),
73        }
74    }
75
76    pub async fn ensure_server(&self) -> Result<String, String> {
77        let _guard = self.inner.ensure_lock.lock().await;
78
79        if let Some(base_url) = self.running_base_url().await {
80            return Ok(base_url);
81        }
82
83        let (base_url, child) = self.spawn_http_server().await?;
84
85        if let Err(err) = self.wait_for_http_server(&base_url).await {
86            kill_child(&child);
87            let mut state = self.inner.state.lock().await;
88            state.last_error = Some(err.clone());
89            return Err(err);
90        }
91
92        let instance_id = {
93            let mut state = self.inner.state.lock().await;
94            state.shutdown_requested = false;
95            state.restart_count += 1;
96            let instance_id = state.restart_count;
97            state.server = Some(RunningServer {
98                base_url: base_url.clone(),
99                child: child.clone(),
100                instance_id,
101            });
102            state.last_error = None;
103            instance_id
104        };
105
106        self.spawn_monitor_task(instance_id, child);
107
108        Ok(base_url)
109    }
110
111    pub async fn shutdown(&self) {
112        let _guard = self.inner.ensure_lock.lock().await;
113
114        let child = {
115            let mut state = self.inner.state.lock().await;
116            state.shutdown_requested = true;
117            state.server.take().map(|server| server.child)
118        };
119
120        if let Some(child) = child {
121            kill_child(&child);
122        }
123    }
124
125    async fn running_base_url(&self) -> Option<String> {
126        let running = {
127            let state = self.inner.state.lock().await;
128            state.server.clone()
129        }?;
130
131        if child_is_alive(&running.child) {
132            return Some(running.base_url);
133        }
134
135        let mut state = self.inner.state.lock().await;
136        if state
137            .server
138            .as_ref()
139            .map(|server| server.instance_id == running.instance_id)
140            .unwrap_or(false)
141        {
142            state.server = None;
143        }
144
145        None
146    }
147
148    async fn wait_for_http_server(&self, base_url: &str) -> Result<(), String> {
149        for _ in 0..HEALTH_ATTEMPTS {
150            for endpoint in HEALTH_ENDPOINTS {
151                let url = format!("{base_url}/{endpoint}");
152                match self.inner.http_client.get(&url).send().await {
153                    Ok(response) if response.status().is_success() => return Ok(()),
154                    Ok(_) | Err(_) => {}
155                }
156            }
157            sleep(Duration::from_millis(HEALTH_DELAY_MS)).await;
158        }
159
160        Err("OpenCode server health check failed".to_string())
161    }
162
163    async fn spawn_http_server(&self) -> Result<(String, Arc<StdMutex<Option<Child>>>), String> {
164        let agent_manager = self.inner.agent_manager.clone();
165        let log_dir = self.inner.config.log_dir.clone();
166
167        let (base_url, child) = tokio::task::spawn_blocking(move || {
168            let path = agent_manager
169                .resolve_binary(AgentId::Opencode)
170                .map_err(|err| err.to_string())?;
171            let port = find_available_port()?;
172            let mut command = Command::new(path);
173            let stderr = open_opencode_log(&log_dir).unwrap_or_else(|_| Stdio::null());
174            command
175                .arg("serve")
176                .arg("--port")
177                .arg(port.to_string())
178                .stdout(Stdio::null())
179                .stderr(stderr);
180
181            let child = command.spawn().map_err(|err| err.to_string())?;
182            Ok::<(String, Child), String>((format!("http://127.0.0.1:{port}"), child))
183        })
184        .await
185        .map_err(|err| err.to_string())??;
186
187        Ok((base_url, Arc::new(StdMutex::new(Some(child)))))
188    }
189
190    fn spawn_monitor_task(&self, instance_id: u64, child: Arc<StdMutex<Option<Child>>>) {
191        let manager = self.clone();
192        tokio::spawn(async move {
193            loop {
194                let status = {
195                    let mut guard = match child.lock() {
196                        Ok(guard) => guard,
197                        Err(_) => return,
198                    };
199                    match guard.as_mut() {
200                        Some(child) => match child.try_wait() {
201                            Ok(status) => status,
202                            Err(_) => None,
203                        },
204                        None => return,
205                    }
206                };
207
208                if let Some(status) = status {
209                    manager.handle_process_exit(instance_id, status).await;
210                    return;
211                }
212
213                sleep(Duration::from_millis(MONITOR_DELAY_MS)).await;
214            }
215        });
216    }
217
218    async fn handle_process_exit(&self, instance_id: u64, status: ExitStatus) {
219        let (should_restart, error_message) = {
220            let mut state = self.inner.state.lock().await;
221            let Some(server) = state.server.as_ref() else {
222                return;
223            };
224            if server.instance_id != instance_id {
225                return;
226            }
227
228            let message = format!("OpenCode server exited with status {:?}", status);
229            let shutdown_requested = state.shutdown_requested;
230            if !shutdown_requested {
231                state.last_error = Some(message.clone());
232            }
233            state.server = None;
234
235            (
236                !shutdown_requested && self.inner.config.auto_restart,
237                message,
238            )
239        };
240
241        if !should_restart {
242            return;
243        }
244
245        let manager = self.clone();
246        tokio::spawn(async move {
247            sleep(Duration::from_millis(MONITOR_DELAY_MS)).await;
248            if let Err(err) = manager.ensure_server().await {
249                warn!(
250                    error = ?err,
251                    prior_exit = %error_message,
252                    "failed to restart OpenCode compat sidecar"
253                );
254            }
255        });
256    }
257}
258
259fn default_log_dir() -> PathBuf {
260    let mut base = dirs::data_local_dir().unwrap_or_else(|| std::env::temp_dir());
261    base.push("sandbox-agent");
262    base.push("agent-logs");
263    base
264}
265
/// Opens `<log_dir>/opencode/opencode-compat.log` for appending, creating the
/// directory and the file when they do not exist, and returns it as a
/// [`Stdio`] suitable for a child process stream.
///
/// # Errors
///
/// Returns the I/O error text when the directory cannot be created or the
/// file cannot be opened.
fn open_opencode_log(log_dir: &Path) -> Result<Stdio, String> {
    let directory = log_dir.join("opencode");
    fs::create_dir_all(&directory)
        .and_then(|()| {
            OpenOptions::new()
                .append(true)
                .create(true)
                .open(directory.join("opencode-compat.log"))
        })
        .map(Stdio::from)
        .map_err(|err| err.to_string())
}
277
/// Asks the OS for a currently unused TCP port on the loopback interface.
///
/// The probing listener is closed before returning, so the port is only
/// known to have been free at the time of the call.
///
/// # Errors
///
/// Returns the I/O error text when binding or reading the local address fails.
fn find_available_port() -> Result<u16, String> {
    TcpListener::bind("127.0.0.1:0")
        .and_then(|listener| listener.local_addr())
        .map(|address| address.port())
        .map_err(|err| err.to_string())
}
284
/// Reports whether the process in `child` is still running.
///
/// Returns `false` when the lock is poisoned, the slot is empty, polling the
/// process fails, or the process has exited. In the last case the slot is
/// also cleared.
fn child_is_alive(child: &Arc<StdMutex<Option<Child>>>) -> bool {
    let Ok(mut slot) = child.lock() else {
        return false;
    };

    let polled = match slot.as_mut() {
        Some(process) => process.try_wait(),
        None => return false,
    };

    match polled {
        Ok(None) => true,
        Ok(Some(_)) => {
            // The exit has been observed; drop the handle so later callers see an empty slot.
            *slot = None;
            false
        }
        Err(_) => false,
    }
}
302
/// Terminates the process stored in `child` (if any) and leaves the slot empty.
///
/// The process is removed from the slot first and the lock released, so other
/// users of the slot are not blocked while the process is being stopped. After
/// a successful kill the process is waited on so the OS can release its
/// resources; without that a killed child stays around as a zombie on Unix.
///
/// Does nothing when the lock is poisoned or the slot is already empty.
fn kill_child(child: &Arc<StdMutex<Option<Child>>>) {
    let taken = match child.lock() {
        Ok(mut slot) => slot.take(),
        Err(_) => return,
    };
    let Some(mut process) = taken else {
        return;
    };

    match process.kill() {
        // The process is (being) terminated, so this wait returns promptly and reaps it.
        Ok(()) => {
            let _ = process.wait();
        }
        // Could not signal it; reap it if it has already exited, but never block.
        Err(_) => {
            let _ = process.try_wait();
        }
    }
}