1use std::path::Path;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, bail};
5use serde_json::{Value, json};
6use tokio::sync::{RwLock, mpsc};
7use tokio::time::timeout;
8use uuid::Uuid;
9
10use super::{BridgeState, CLIENT_STATUS_TIMEOUT};
11use crate::app_server::{AppServerInbound, AppServerLaunchConfig, AppServerManager};
12use crate::bridge_protocol::{
13 AppServerHandshakeSummary, GetRuntimeStatusRequest, PruneRuntimeRequest, RestartRuntimeRequest,
14 RuntimeRecord, RuntimeStatusSnapshot, RuntimeSummary, StartRuntimeRequest, StopRuntimeRequest,
15 now_millis, runtime_list_payload, status_payload,
16};
17use crate::config::expand_path;
18
19pub(super) struct ManagedRuntime {
20 pub(super) record: RuntimeRecord,
21 pub(super) app_server: AppServerManager,
22 pub(super) status: RwLock<RuntimeStatusSnapshot>,
23}
24
25impl ManagedRuntime {
26 pub(super) async fn summary(&self) -> RuntimeSummary {
27 RuntimeSummary::from_parts(&self.record, self.status.read().await.clone())
28 }
29}
30
31impl BridgeState {
32 pub async fn runtime_snapshot(&self) -> RuntimeStatusSnapshot {
33 match self.get_runtime(&self.primary_runtime_id).await {
34 Some(runtime) => runtime.status.read().await.clone(),
35 None => RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()),
36 }
37 }
38
39 pub async fn runtime_summaries(&self) -> Vec<RuntimeSummary> {
40 let runtimes = self.runtimes.read().await;
41 let mut summaries = Vec::with_capacity(runtimes.len());
42 for runtime in runtimes.values() {
43 summaries.push(runtime.summary().await);
44 }
45 summaries.sort_by(|left, right| {
46 right
47 .is_primary
48 .cmp(&left.is_primary)
49 .then_with(|| left.display_name.cmp(&right.display_name))
50 });
51 summaries
52 }
53
54 pub async fn runtime_snapshot_for_client(&self) -> RuntimeStatusSnapshot {
55 match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_snapshot()).await {
56 Ok(snapshot) => snapshot,
57 Err(_) => {
58 self.log_timeout_warning(
59 "runtime_snapshot:primary",
60 "读取 primary runtime 状态超时,回退到存储快照",
61 );
62 self.fallback_runtime_snapshot()
63 }
64 }
65 }
66
67 pub async fn runtime_summaries_for_client(&self) -> Vec<RuntimeSummary> {
68 match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_summaries()).await {
69 Ok(summaries) => summaries,
70 Err(_) => {
71 self.log_timeout_warning(
72 "runtime_summaries:list",
73 "读取 runtime 列表超时,回退到存储快照",
74 );
75 self.fallback_runtime_summaries()
76 }
77 }
78 }
79
80 pub(super) async fn get_runtime_status(
81 &self,
82 request: GetRuntimeStatusRequest,
83 ) -> Result<Value> {
84 let runtime_id = request
85 .runtime_id
86 .unwrap_or_else(|| self.primary_runtime_id.clone());
87 let runtime = match timeout(
88 CLIENT_STATUS_TIMEOUT,
89 self.require_runtime(Some(&runtime_id)),
90 )
91 .await
92 {
93 Ok(Ok(runtime)) => runtime.summary().await,
94 Ok(Err(error)) => return Err(error),
95 Err(_) => {
96 self.log_timeout_warning(
97 &format!("runtime_status:{runtime_id}"),
98 &format!("读取 runtime={runtime_id} 状态超时,回退到存储快照"),
99 );
100 self.fallback_runtime_summary(&runtime_id)?
101 }
102 };
103 Ok(json!({ "runtime": runtime }))
104 }
105
106 pub(super) async fn start_runtime(&self, request: StartRuntimeRequest) -> Result<Value> {
107 let runtime_id = match request.runtime_id.clone() {
108 Some(runtime_id) => runtime_id,
109 None => format!("runtime-{}", &Uuid::new_v4().simple().to_string()[..8]),
110 };
111
112 if self.get_runtime(&runtime_id).await.is_some() {
113 if request.display_name.is_some()
114 || request.codex_home.is_some()
115 || request.codex_binary.is_some()
116 {
117 bail!("已存在 runtime,启动现有 runtime 时不能覆盖配置");
118 }
119
120 self.start_existing_runtime(&runtime_id).await?;
121 let runtime = self
122 .require_runtime(Some(&runtime_id))
123 .await?
124 .summary()
125 .await;
126 return Ok(json!({ "runtime": runtime }));
127 }
128
129 {
130 let runtimes = self.runtimes.read().await;
131 if runtimes.len() >= self.runtime_limit {
132 bail!("已达到 runtime 上限 {}", self.runtime_limit);
133 }
134 }
135
136 let codex_home = request
137 .codex_home
138 .as_deref()
139 .map(|value| expand_path(Path::new(value)))
140 .transpose()?
141 .map(|value| value.to_string_lossy().to_string());
142 let now = now_millis();
143 let record = RuntimeRecord {
144 runtime_id: runtime_id.clone(),
145 display_name: request
146 .display_name
147 .filter(|value| !value.trim().is_empty())
148 .unwrap_or_else(|| runtime_id.clone()),
149 codex_home,
150 codex_binary: request
151 .codex_binary
152 .filter(|value| !value.trim().is_empty())
153 .unwrap_or_else(|| "codex".to_string()),
154 is_primary: false,
155 auto_start: request.auto_start.unwrap_or(false),
156 created_at_ms: now,
157 updated_at_ms: now,
158 };
159
160 self.storage.upsert_runtime(&record)?;
161 self.register_runtime(record).await?;
162 self.start_existing_runtime(&runtime_id).await?;
163
164 let runtime = self
165 .require_runtime(Some(&runtime_id))
166 .await?
167 .summary()
168 .await;
169 Ok(json!({ "runtime": runtime }))
170 }
171
172 pub(super) async fn stop_runtime(&self, request: StopRuntimeRequest) -> Result<Value> {
173 let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
174 let current_status = runtime.status.read().await.clone();
175 self.emit_runtime_status(
176 &request.runtime_id,
177 RuntimeStatusSnapshot {
178 runtime_id: request.runtime_id.clone(),
179 status: "stopping".to_string(),
180 codex_home: current_status.codex_home,
181 user_agent: current_status.user_agent,
182 platform_family: current_status.platform_family,
183 platform_os: current_status.platform_os,
184 last_error: None,
185 pid: current_status.pid,
186 app_server_handshake: AppServerHandshakeSummary::inactive(),
187 updated_at_ms: now_millis(),
188 },
189 )
190 .await?;
191 runtime.app_server.stop().await?;
192
193 let runtime = self
194 .require_runtime(Some(&request.runtime_id))
195 .await?
196 .summary()
197 .await;
198 Ok(json!({ "runtime": runtime }))
199 }
200
201 pub(super) async fn restart_runtime(&self, request: RestartRuntimeRequest) -> Result<Value> {
202 let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
203 let current_status = runtime.status.read().await.clone();
204 self.emit_runtime_status(
205 &request.runtime_id,
206 RuntimeStatusSnapshot {
207 runtime_id: request.runtime_id.clone(),
208 status: "stopping".to_string(),
209 codex_home: current_status.codex_home,
210 user_agent: current_status.user_agent,
211 platform_family: current_status.platform_family,
212 platform_os: current_status.platform_os,
213 last_error: None,
214 pid: current_status.pid,
215 app_server_handshake: AppServerHandshakeSummary::inactive(),
216 updated_at_ms: now_millis(),
217 },
218 )
219 .await?;
220 runtime.app_server.restart().await?;
221
222 let runtime = self
223 .require_runtime(Some(&request.runtime_id))
224 .await?
225 .summary()
226 .await;
227 Ok(json!({ "runtime": runtime }))
228 }
229
230 pub(super) async fn prune_runtime(&self, request: PruneRuntimeRequest) -> Result<Value> {
231 if request.runtime_id == self.primary_runtime_id {
232 bail!("primary runtime 不能删除");
233 }
234
235 let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
236 runtime.app_server.stop().await?;
237
238 {
239 let mut runtimes = self.runtimes.write().await;
240 runtimes.remove(&request.runtime_id);
241 }
242 self.storage.remove_runtime(&request.runtime_id)?;
243 self.emit_runtime_list().await?;
244
245 Ok(json!({ "pruned": true, "runtimeId": request.runtime_id }))
246 }
247
248 pub(super) async fn start_existing_runtime(&self, runtime_id: &str) -> Result<()> {
249 let runtime = self.require_runtime(Some(runtime_id)).await?;
250 runtime.app_server.start().await
251 }
252
253 pub(super) async fn register_runtime(&self, record: RuntimeRecord) -> Result<()> {
254 let managed = Arc::new(Self::build_runtime(record, self.inbound_tx.clone()));
255 {
256 let mut runtimes = self.runtimes.write().await;
257 runtimes.insert(managed.record.runtime_id.clone(), managed);
258 }
259 self.emit_runtime_list().await?;
260 Ok(())
261 }
262
263 pub(super) fn build_runtime(
264 record: RuntimeRecord,
265 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
266 ) -> ManagedRuntime {
267 ManagedRuntime {
268 status: RwLock::new(RuntimeStatusSnapshot::stopped(record.runtime_id.clone())),
269 app_server: AppServerManager::new(
270 AppServerLaunchConfig {
271 runtime_id: record.runtime_id.clone(),
272 codex_binary: record.codex_binary.clone(),
273 codex_home: record.codex_home.as_ref().map(std::path::PathBuf::from),
274 },
275 inbound_tx,
276 ),
277 record,
278 }
279 }
280
281 pub(super) async fn get_runtime(&self, runtime_id: &str) -> Option<Arc<ManagedRuntime>> {
282 self.runtimes.read().await.get(runtime_id).cloned()
283 }
284
285 pub(super) async fn require_runtime(
286 &self,
287 runtime_id: Option<&str>,
288 ) -> Result<Arc<ManagedRuntime>> {
289 let target = runtime_id.unwrap_or(&self.primary_runtime_id);
290 self.get_runtime(target)
291 .await
292 .with_context(|| format!("未找到 runtime: {target}"))
293 }
294
295 pub(super) async fn emit_runtime_status(
296 &self,
297 runtime_id: &str,
298 status: RuntimeStatusSnapshot,
299 ) -> Result<()> {
300 let runtime = self.require_runtime(Some(runtime_id)).await?;
301 {
302 let mut guard = runtime.status.write().await;
303 *guard = status;
304 }
305 let summary = runtime.summary().await;
306 self.emit_event(
307 "runtime_status_changed",
308 Some(runtime_id),
309 None,
310 status_payload(&summary),
311 )?;
312 Ok(())
313 }
314
315 pub(super) async fn emit_runtime_process_changed(
316 &self,
317 runtime_id: &str,
318 pid: Option<u32>,
319 running: bool,
320 ) -> Result<()> {
321 let runtime = self.require_runtime(Some(runtime_id)).await?;
322 {
323 let mut guard = runtime.status.write().await;
324 guard.pid = pid;
325 guard.updated_at_ms = now_millis();
326 if !running && guard.status == "starting" {
327 guard.status = "stopped".to_string();
328 guard.last_error = None;
329 guard.app_server_handshake = AppServerHandshakeSummary::inactive();
330 }
331 }
332 self.emit_event(
333 "runtime_process_changed",
334 Some(runtime_id),
335 None,
336 json!({
337 "runtimeId": runtime_id,
338 "pid": pid,
339 "running": running,
340 "updatedAtMs": now_millis(),
341 }),
342 )?;
343 Ok(())
344 }
345
346 pub(super) async fn emit_runtime_degraded(
347 &self,
348 runtime_id: &str,
349 message: String,
350 ) -> Result<()> {
351 self.emit_event(
352 "runtime_degraded",
353 Some(runtime_id),
354 None,
355 json!({
356 "runtimeId": runtime_id,
357 "message": message,
358 "updatedAtMs": now_millis(),
359 }),
360 )
361 }
362
363 pub(super) async fn emit_runtime_list(&self) -> Result<()> {
364 let runtimes = self.runtime_summaries().await;
365 self.emit_event("runtime_list", None, None, runtime_list_payload(&runtimes))
366 }
367
368 pub(super) fn fallback_runtime_snapshot(&self) -> RuntimeStatusSnapshot {
369 self.storage
370 .get_runtime(&self.primary_runtime_id)
371 .ok()
372 .flatten()
373 .map(|record| self.build_fallback_status(&record))
374 .unwrap_or_else(|| RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()))
375 }
376
377 pub(super) fn fallback_runtime_summaries(&self) -> Vec<RuntimeSummary> {
378 let mut summaries = self
379 .storage
380 .list_runtimes()
381 .unwrap_or_default()
382 .into_iter()
383 .map(|record| RuntimeSummary::from_parts(&record, self.build_fallback_status(&record)))
384 .collect::<Vec<_>>();
385 summaries.sort_by(|left, right| {
386 right
387 .is_primary
388 .cmp(&left.is_primary)
389 .then_with(|| left.display_name.cmp(&right.display_name))
390 });
391 summaries
392 }
393
394 pub(super) fn fallback_runtime_summary(&self, runtime_id: &str) -> Result<RuntimeSummary> {
395 let record = self
396 .storage
397 .get_runtime(runtime_id)?
398 .with_context(|| format!("未找到 runtime: {runtime_id}"))?;
399 Ok(RuntimeSummary::from_parts(
400 &record,
401 self.build_fallback_status(&record),
402 ))
403 }
404
405 fn build_fallback_status(&self, record: &RuntimeRecord) -> RuntimeStatusSnapshot {
406 RuntimeStatusSnapshot {
407 runtime_id: record.runtime_id.clone(),
408 status: if record.auto_start {
409 "unknown".to_string()
410 } else {
411 "stopped".to_string()
412 },
413 codex_home: record.codex_home.clone(),
414 user_agent: None,
415 platform_family: None,
416 platform_os: None,
417 last_error: None,
418 pid: None,
419 app_server_handshake: AppServerHandshakeSummary::inactive(),
420 updated_at_ms: now_millis(),
421 }
422 }
423
424 pub(super) async fn transition_runtime_status(
425 &self,
426 runtime_id: &str,
427 status_label: String,
428 last_error: Option<String>,
429 handshake: AppServerHandshakeSummary,
430 ) -> Result<()> {
431 let runtime = self.require_runtime(Some(runtime_id)).await?;
432 let current_status = runtime.status.read().await.clone();
433 self.emit_runtime_status(
434 runtime_id,
435 RuntimeStatusSnapshot {
436 runtime_id: runtime_id.to_string(),
437 status: status_label,
438 codex_home: current_status.codex_home,
439 user_agent: current_status.user_agent,
440 platform_family: current_status.platform_family,
441 platform_os: current_status.platform_os,
442 last_error,
443 pid: current_status.pid,
444 app_server_handshake: handshake,
445 updated_at_ms: now_millis(),
446 },
447 )
448 .await
449 }
450}