codex_mobile_bridge/state/
runtime.rs1use std::path::Path;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, bail};
5use serde_json::{Value, json};
6use tokio::sync::{RwLock, mpsc};
7use tokio::time::timeout;
8use uuid::Uuid;
9
10use super::{BridgeState, CLIENT_STATUS_TIMEOUT};
11use crate::app_server::{AppServerInbound, AppServerLaunchConfig, AppServerManager};
12use crate::bridge_protocol::{
13 GetRuntimeStatusRequest, PruneRuntimeRequest, RestartRuntimeRequest, RuntimeRecord,
14 RuntimeStatusSnapshot, RuntimeSummary, StartRuntimeRequest, StopRuntimeRequest, now_millis,
15 runtime_list_payload, status_payload,
16};
17use crate::config::expand_path;
18
19pub(super) struct ManagedRuntime {
20 pub(super) record: RuntimeRecord,
21 pub(super) app_server: AppServerManager,
22 pub(super) status: RwLock<RuntimeStatusSnapshot>,
23}
24
25impl ManagedRuntime {
26 pub(super) async fn summary(&self) -> RuntimeSummary {
27 RuntimeSummary::from_parts(&self.record, self.status.read().await.clone())
28 }
29}
30
31impl BridgeState {
32 pub async fn runtime_snapshot(&self) -> RuntimeStatusSnapshot {
33 match self.get_runtime(&self.primary_runtime_id).await {
34 Some(runtime) => runtime.status.read().await.clone(),
35 None => RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()),
36 }
37 }
38
39 pub async fn runtime_summaries(&self) -> Vec<RuntimeSummary> {
40 let runtimes = self.runtimes.read().await;
41 let mut summaries = Vec::with_capacity(runtimes.len());
42 for runtime in runtimes.values() {
43 summaries.push(runtime.summary().await);
44 }
45 summaries.sort_by(|left, right| {
46 right
47 .is_primary
48 .cmp(&left.is_primary)
49 .then_with(|| left.display_name.cmp(&right.display_name))
50 });
51 summaries
52 }
53
54 pub async fn runtime_snapshot_for_client(&self) -> RuntimeStatusSnapshot {
55 match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_snapshot()).await {
56 Ok(snapshot) => snapshot,
57 Err(_) => {
58 self.log_timeout_warning(
59 "runtime_snapshot:primary",
60 "读取 primary runtime 状态超时,回退到存储快照",
61 );
62 self.fallback_runtime_snapshot()
63 }
64 }
65 }
66
67 pub async fn runtime_summaries_for_client(&self) -> Vec<RuntimeSummary> {
68 match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_summaries()).await {
69 Ok(summaries) => summaries,
70 Err(_) => {
71 self.log_timeout_warning(
72 "runtime_summaries:list",
73 "读取 runtime 列表超时,回退到存储快照",
74 );
75 self.fallback_runtime_summaries()
76 }
77 }
78 }
79
80 pub(super) async fn get_runtime_status(
81 &self,
82 request: GetRuntimeStatusRequest,
83 ) -> Result<Value> {
84 let runtime_id = request
85 .runtime_id
86 .unwrap_or_else(|| self.primary_runtime_id.clone());
87 let runtime = match timeout(
88 CLIENT_STATUS_TIMEOUT,
89 self.require_runtime(Some(&runtime_id)),
90 )
91 .await
92 {
93 Ok(Ok(runtime)) => runtime.summary().await,
94 Ok(Err(error)) => return Err(error),
95 Err(_) => {
96 self.log_timeout_warning(
97 &format!("runtime_status:{runtime_id}"),
98 &format!("读取 runtime={runtime_id} 状态超时,回退到存储快照"),
99 );
100 self.fallback_runtime_summary(&runtime_id)?
101 }
102 };
103 Ok(json!({ "runtime": runtime }))
104 }
105
106 pub(super) async fn start_runtime(&self, request: StartRuntimeRequest) -> Result<Value> {
107 let runtime_id = match request.runtime_id.clone() {
108 Some(runtime_id) => runtime_id,
109 None => format!("runtime-{}", &Uuid::new_v4().simple().to_string()[..8]),
110 };
111
112 if self.get_runtime(&runtime_id).await.is_some() {
113 if request.display_name.is_some()
114 || request.codex_home.is_some()
115 || request.codex_binary.is_some()
116 {
117 bail!("已存在 runtime,启动现有 runtime 时不能覆盖配置");
118 }
119
120 self.start_existing_runtime(&runtime_id).await?;
121 let runtime = self
122 .require_runtime(Some(&runtime_id))
123 .await?
124 .summary()
125 .await;
126 return Ok(json!({ "runtime": runtime }));
127 }
128
129 {
130 let runtimes = self.runtimes.read().await;
131 if runtimes.len() >= self.runtime_limit {
132 bail!("已达到 runtime 上限 {}", self.runtime_limit);
133 }
134 }
135
136 let codex_home = request
137 .codex_home
138 .as_deref()
139 .map(|value| expand_path(Path::new(value)))
140 .transpose()?
141 .map(|value| value.to_string_lossy().to_string());
142 let now = now_millis();
143 let record = RuntimeRecord {
144 runtime_id: runtime_id.clone(),
145 display_name: request
146 .display_name
147 .filter(|value| !value.trim().is_empty())
148 .unwrap_or_else(|| runtime_id.clone()),
149 codex_home,
150 codex_binary: request
151 .codex_binary
152 .filter(|value| !value.trim().is_empty())
153 .unwrap_or_else(|| "codex".to_string()),
154 is_primary: false,
155 auto_start: request.auto_start.unwrap_or(false),
156 created_at_ms: now,
157 updated_at_ms: now,
158 };
159
160 self.storage.upsert_runtime(&record)?;
161 self.register_runtime(record).await?;
162 self.start_existing_runtime(&runtime_id).await?;
163
164 let runtime = self
165 .require_runtime(Some(&runtime_id))
166 .await?
167 .summary()
168 .await;
169 Ok(json!({ "runtime": runtime }))
170 }
171
172 pub(super) async fn stop_runtime(&self, request: StopRuntimeRequest) -> Result<Value> {
173 let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
174 let current_status = runtime.status.read().await.clone();
175 self.emit_runtime_status(
176 &request.runtime_id,
177 RuntimeStatusSnapshot {
178 runtime_id: request.runtime_id.clone(),
179 status: "stopping".to_string(),
180 codex_home: current_status.codex_home,
181 user_agent: current_status.user_agent,
182 platform_family: current_status.platform_family,
183 platform_os: current_status.platform_os,
184 last_error: None,
185 pid: current_status.pid,
186 updated_at_ms: now_millis(),
187 },
188 )
189 .await?;
190 runtime.app_server.stop().await?;
191
192 let runtime = self
193 .require_runtime(Some(&request.runtime_id))
194 .await?
195 .summary()
196 .await;
197 Ok(json!({ "runtime": runtime }))
198 }
199
200 pub(super) async fn restart_runtime(&self, request: RestartRuntimeRequest) -> Result<Value> {
201 let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
202 let current_status = runtime.status.read().await.clone();
203 self.emit_runtime_status(
204 &request.runtime_id,
205 RuntimeStatusSnapshot {
206 runtime_id: request.runtime_id.clone(),
207 status: "stopping".to_string(),
208 codex_home: current_status.codex_home,
209 user_agent: current_status.user_agent,
210 platform_family: current_status.platform_family,
211 platform_os: current_status.platform_os,
212 last_error: None,
213 pid: current_status.pid,
214 updated_at_ms: now_millis(),
215 },
216 )
217 .await?;
218 runtime.app_server.restart().await?;
219
220 let runtime = self
221 .require_runtime(Some(&request.runtime_id))
222 .await?
223 .summary()
224 .await;
225 Ok(json!({ "runtime": runtime }))
226 }
227
228 pub(super) async fn prune_runtime(&self, request: PruneRuntimeRequest) -> Result<Value> {
229 if request.runtime_id == self.primary_runtime_id {
230 bail!("primary runtime 不能删除");
231 }
232
233 let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
234 runtime.app_server.stop().await?;
235
236 {
237 let mut runtimes = self.runtimes.write().await;
238 runtimes.remove(&request.runtime_id);
239 }
240 self.storage.remove_runtime(&request.runtime_id)?;
241 self.emit_runtime_list().await?;
242
243 Ok(json!({ "pruned": true, "runtimeId": request.runtime_id }))
244 }
245
246 pub(super) async fn start_existing_runtime(&self, runtime_id: &str) -> Result<()> {
247 let runtime = self.require_runtime(Some(runtime_id)).await?;
248 runtime.app_server.start().await
249 }
250
251 pub(super) async fn register_runtime(&self, record: RuntimeRecord) -> Result<()> {
252 let managed = Arc::new(Self::build_runtime(record, self.inbound_tx.clone()));
253 {
254 let mut runtimes = self.runtimes.write().await;
255 runtimes.insert(managed.record.runtime_id.clone(), managed);
256 }
257 self.emit_runtime_list().await?;
258 Ok(())
259 }
260
261 pub(super) fn build_runtime(
262 record: RuntimeRecord,
263 inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
264 ) -> ManagedRuntime {
265 ManagedRuntime {
266 status: RwLock::new(RuntimeStatusSnapshot::stopped(record.runtime_id.clone())),
267 app_server: AppServerManager::new(
268 AppServerLaunchConfig {
269 runtime_id: record.runtime_id.clone(),
270 codex_binary: record.codex_binary.clone(),
271 codex_home: record.codex_home.as_ref().map(std::path::PathBuf::from),
272 },
273 inbound_tx,
274 ),
275 record,
276 }
277 }
278
279 pub(super) async fn get_runtime(&self, runtime_id: &str) -> Option<Arc<ManagedRuntime>> {
280 self.runtimes.read().await.get(runtime_id).cloned()
281 }
282
283 pub(super) async fn require_runtime(
284 &self,
285 runtime_id: Option<&str>,
286 ) -> Result<Arc<ManagedRuntime>> {
287 let target = runtime_id.unwrap_or(&self.primary_runtime_id);
288 self.get_runtime(target)
289 .await
290 .with_context(|| format!("未找到 runtime: {target}"))
291 }
292
293 pub(super) async fn emit_runtime_status(
294 &self,
295 runtime_id: &str,
296 status: RuntimeStatusSnapshot,
297 ) -> Result<()> {
298 let runtime = self.require_runtime(Some(runtime_id)).await?;
299 {
300 let mut guard = runtime.status.write().await;
301 *guard = status;
302 }
303 let summary = runtime.summary().await;
304 self.emit_event(
305 "runtime_status_changed",
306 Some(runtime_id),
307 None,
308 status_payload(&summary),
309 )?;
310 Ok(())
311 }
312
313 pub(super) async fn emit_runtime_process_changed(
314 &self,
315 runtime_id: &str,
316 pid: Option<u32>,
317 running: bool,
318 ) -> Result<()> {
319 let runtime = self.require_runtime(Some(runtime_id)).await?;
320 {
321 let mut guard = runtime.status.write().await;
322 guard.pid = pid;
323 guard.updated_at_ms = now_millis();
324 if !running && guard.status == "starting" {
325 guard.status = "stopped".to_string();
326 guard.last_error = None;
327 }
328 }
329 self.emit_event(
330 "runtime_process_changed",
331 Some(runtime_id),
332 None,
333 json!({
334 "runtimeId": runtime_id,
335 "pid": pid,
336 "running": running,
337 "updatedAtMs": now_millis(),
338 }),
339 )?;
340 Ok(())
341 }
342
343 pub(super) async fn emit_runtime_degraded(
344 &self,
345 runtime_id: &str,
346 message: String,
347 ) -> Result<()> {
348 self.emit_event(
349 "runtime_degraded",
350 Some(runtime_id),
351 None,
352 json!({
353 "runtimeId": runtime_id,
354 "message": message,
355 "updatedAtMs": now_millis(),
356 }),
357 )
358 }
359
360 pub(super) async fn emit_runtime_list(&self) -> Result<()> {
361 let runtimes = self.runtime_summaries().await;
362 self.emit_event("runtime_list", None, None, runtime_list_payload(&runtimes))
363 }
364
365 pub(super) fn fallback_runtime_snapshot(&self) -> RuntimeStatusSnapshot {
366 self.storage
367 .get_runtime(&self.primary_runtime_id)
368 .ok()
369 .flatten()
370 .map(|record| self.build_fallback_status(&record))
371 .unwrap_or_else(|| RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()))
372 }
373
374 pub(super) fn fallback_runtime_summaries(&self) -> Vec<RuntimeSummary> {
375 let mut summaries = self
376 .storage
377 .list_runtimes()
378 .unwrap_or_default()
379 .into_iter()
380 .map(|record| RuntimeSummary::from_parts(&record, self.build_fallback_status(&record)))
381 .collect::<Vec<_>>();
382 summaries.sort_by(|left, right| {
383 right
384 .is_primary
385 .cmp(&left.is_primary)
386 .then_with(|| left.display_name.cmp(&right.display_name))
387 });
388 summaries
389 }
390
391 pub(super) fn fallback_runtime_summary(&self, runtime_id: &str) -> Result<RuntimeSummary> {
392 let record = self
393 .storage
394 .get_runtime(runtime_id)?
395 .with_context(|| format!("未找到 runtime: {runtime_id}"))?;
396 Ok(RuntimeSummary::from_parts(
397 &record,
398 self.build_fallback_status(&record),
399 ))
400 }
401
402 fn build_fallback_status(&self, record: &RuntimeRecord) -> RuntimeStatusSnapshot {
403 RuntimeStatusSnapshot {
404 runtime_id: record.runtime_id.clone(),
405 status: if record.auto_start {
406 "unknown".to_string()
407 } else {
408 "stopped".to_string()
409 },
410 codex_home: record.codex_home.clone(),
411 user_agent: None,
412 platform_family: None,
413 platform_os: None,
414 last_error: None,
415 pid: None,
416 updated_at_ms: now_millis(),
417 }
418 }
419}