// Source: codex_mobile_bridge/state/runtime.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, bail};
5use serde_json::{Value, json};
6use tokio::sync::{RwLock, mpsc};
7use tokio::time::timeout;
8use uuid::Uuid;
9
10use super::{BridgeState, CLIENT_STATUS_TIMEOUT};
11use crate::app_server::{AppServerInbound, AppServerLaunchConfig, AppServerManager};
12use crate::bridge_protocol::{
13    AppServerHandshakeSummary, GetRuntimeStatusRequest, PruneRuntimeRequest, RestartRuntimeRequest,
14    RuntimeRecord, RuntimeStatusSnapshot, RuntimeSummary, StartRuntimeRequest, StopRuntimeRequest,
15    now_millis, runtime_list_payload, status_payload,
16};
17use crate::config::expand_path;
18
/// In-memory entry for one runtime tracked by the bridge.
pub(super) struct ManagedRuntime {
    /// Runtime record this entry was built from.
    pub(super) record: RuntimeRecord,
    /// Manager used to start, stop and restart this runtime's app server.
    pub(super) app_server: AppServerManager,
    /// Most recent status snapshot, guarded by an async read/write lock.
    pub(super) status: RwLock<RuntimeStatusSnapshot>,
}
24
25impl ManagedRuntime {
26    pub(super) async fn summary(&self) -> RuntimeSummary {
27        RuntimeSummary::from_parts(&self.record, self.status.read().await.clone())
28    }
29}
30
31impl BridgeState {
32    pub async fn runtime_snapshot(&self) -> RuntimeStatusSnapshot {
33        match self.get_runtime(&self.primary_runtime_id).await {
34            Some(runtime) => runtime.status.read().await.clone(),
35            None => RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()),
36        }
37    }
38
39    pub async fn runtime_summaries(&self) -> Vec<RuntimeSummary> {
40        let runtimes = self.runtimes.read().await;
41        let mut summaries = Vec::with_capacity(runtimes.len());
42        for runtime in runtimes.values() {
43            summaries.push(runtime.summary().await);
44        }
45        summaries.sort_by(|left, right| {
46            right
47                .is_primary
48                .cmp(&left.is_primary)
49                .then_with(|| left.display_name.cmp(&right.display_name))
50        });
51        summaries
52    }
53
54    pub async fn runtime_snapshot_for_client(&self) -> RuntimeStatusSnapshot {
55        match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_snapshot()).await {
56            Ok(snapshot) => snapshot,
57            Err(_) => {
58                self.log_timeout_warning(
59                    "runtime_snapshot:primary",
60                    "读取 primary runtime 状态超时,回退到存储快照",
61                );
62                self.fallback_runtime_snapshot()
63            }
64        }
65    }
66
67    pub async fn runtime_summaries_for_client(&self) -> Vec<RuntimeSummary> {
68        match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_summaries()).await {
69            Ok(summaries) => summaries,
70            Err(_) => {
71                self.log_timeout_warning(
72                    "runtime_summaries:list",
73                    "读取 runtime 列表超时,回退到存储快照",
74                );
75                self.fallback_runtime_summaries()
76            }
77        }
78    }
79
80    pub(super) async fn get_runtime_status(
81        &self,
82        request: GetRuntimeStatusRequest,
83    ) -> Result<Value> {
84        let runtime_id = request
85            .runtime_id
86            .unwrap_or_else(|| self.primary_runtime_id.clone());
87        let runtime = match timeout(
88            CLIENT_STATUS_TIMEOUT,
89            self.require_runtime(Some(&runtime_id)),
90        )
91        .await
92        {
93            Ok(Ok(runtime)) => runtime.summary().await,
94            Ok(Err(error)) => return Err(error),
95            Err(_) => {
96                self.log_timeout_warning(
97                    &format!("runtime_status:{runtime_id}"),
98                    &format!("读取 runtime={runtime_id} 状态超时,回退到存储快照"),
99                );
100                self.fallback_runtime_summary(&runtime_id)?
101            }
102        };
103        Ok(json!({ "runtime": runtime }))
104    }
105
106    pub(super) async fn start_runtime(&self, request: StartRuntimeRequest) -> Result<Value> {
107        let runtime_id = match request.runtime_id.clone() {
108            Some(runtime_id) => runtime_id,
109            None => format!("runtime-{}", &Uuid::new_v4().simple().to_string()[..8]),
110        };
111
112        if self.get_runtime(&runtime_id).await.is_some() {
113            if request.display_name.is_some()
114                || request.codex_home.is_some()
115                || request.codex_binary.is_some()
116            {
117                bail!("已存在 runtime,启动现有 runtime 时不能覆盖配置");
118            }
119
120            self.start_existing_runtime(&runtime_id).await?;
121            let runtime = self
122                .require_runtime(Some(&runtime_id))
123                .await?
124                .summary()
125                .await;
126            return Ok(json!({ "runtime": runtime }));
127        }
128
129        {
130            let runtimes = self.runtimes.read().await;
131            if runtimes.len() >= self.runtime_limit {
132                bail!("已达到 runtime 上限 {}", self.runtime_limit);
133            }
134        }
135
136        let codex_home = request
137            .codex_home
138            .as_deref()
139            .map(|value| expand_path(Path::new(value)))
140            .transpose()?
141            .map(|value| value.to_string_lossy().to_string());
142        let now = now_millis();
143        let record = RuntimeRecord {
144            runtime_id: runtime_id.clone(),
145            display_name: request
146                .display_name
147                .filter(|value| !value.trim().is_empty())
148                .unwrap_or_else(|| runtime_id.clone()),
149            codex_home,
150            codex_binary: request
151                .codex_binary
152                .filter(|value| !value.trim().is_empty())
153                .unwrap_or_else(|| "codex".to_string()),
154            is_primary: false,
155            auto_start: request.auto_start.unwrap_or(false),
156            created_at_ms: now,
157            updated_at_ms: now,
158        };
159
160        self.storage.upsert_runtime(&record)?;
161        self.register_runtime(record).await?;
162        self.start_existing_runtime(&runtime_id).await?;
163
164        let runtime = self
165            .require_runtime(Some(&runtime_id))
166            .await?
167            .summary()
168            .await;
169        Ok(json!({ "runtime": runtime }))
170    }
171
172    pub(super) async fn stop_runtime(&self, request: StopRuntimeRequest) -> Result<Value> {
173        let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
174        let current_status = runtime.status.read().await.clone();
175        self.emit_runtime_status(
176            &request.runtime_id,
177            RuntimeStatusSnapshot {
178                runtime_id: request.runtime_id.clone(),
179                status: "stopping".to_string(),
180                codex_home: current_status.codex_home,
181                user_agent: current_status.user_agent,
182                platform_family: current_status.platform_family,
183                platform_os: current_status.platform_os,
184                last_error: None,
185                pid: current_status.pid,
186                app_server_handshake: AppServerHandshakeSummary::inactive(),
187                updated_at_ms: now_millis(),
188            },
189        )
190        .await?;
191        runtime.app_server.stop().await?;
192
193        let runtime = self
194            .require_runtime(Some(&request.runtime_id))
195            .await?
196            .summary()
197            .await;
198        Ok(json!({ "runtime": runtime }))
199    }
200
201    pub(super) async fn restart_runtime(&self, request: RestartRuntimeRequest) -> Result<Value> {
202        let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
203        let current_status = runtime.status.read().await.clone();
204        self.emit_runtime_status(
205            &request.runtime_id,
206            RuntimeStatusSnapshot {
207                runtime_id: request.runtime_id.clone(),
208                status: "stopping".to_string(),
209                codex_home: current_status.codex_home,
210                user_agent: current_status.user_agent,
211                platform_family: current_status.platform_family,
212                platform_os: current_status.platform_os,
213                last_error: None,
214                pid: current_status.pid,
215                app_server_handshake: AppServerHandshakeSummary::inactive(),
216                updated_at_ms: now_millis(),
217            },
218        )
219        .await?;
220        runtime.app_server.restart().await?;
221
222        let runtime = self
223            .require_runtime(Some(&request.runtime_id))
224            .await?
225            .summary()
226            .await;
227        Ok(json!({ "runtime": runtime }))
228    }
229
230    pub(super) async fn prune_runtime(&self, request: PruneRuntimeRequest) -> Result<Value> {
231        if request.runtime_id == self.primary_runtime_id {
232            bail!("primary runtime 不能删除");
233        }
234
235        let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
236        runtime.app_server.stop().await?;
237
238        {
239            let mut runtimes = self.runtimes.write().await;
240            runtimes.remove(&request.runtime_id);
241        }
242        self.storage.remove_runtime(&request.runtime_id)?;
243        self.emit_runtime_list().await?;
244
245        Ok(json!({ "pruned": true, "runtimeId": request.runtime_id }))
246    }
247
248    pub(super) async fn start_existing_runtime(&self, runtime_id: &str) -> Result<()> {
249        let runtime = self.require_runtime(Some(runtime_id)).await?;
250        runtime.app_server.start().await
251    }
252
253    pub(super) async fn register_runtime(&self, record: RuntimeRecord) -> Result<()> {
254        let managed = Arc::new(Self::build_runtime(record, self.inbound_tx.clone()));
255        {
256            let mut runtimes = self.runtimes.write().await;
257            runtimes.insert(managed.record.runtime_id.clone(), managed);
258        }
259        self.emit_runtime_list().await?;
260        Ok(())
261    }
262
263    pub(super) fn build_runtime(
264        record: RuntimeRecord,
265        inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
266    ) -> ManagedRuntime {
267        ManagedRuntime {
268            status: RwLock::new(RuntimeStatusSnapshot::stopped(record.runtime_id.clone())),
269            app_server: AppServerManager::new(
270                AppServerLaunchConfig {
271                    runtime_id: record.runtime_id.clone(),
272                    codex_binary: record.codex_binary.clone(),
273                    codex_home: record.codex_home.as_ref().map(std::path::PathBuf::from),
274                },
275                inbound_tx,
276            ),
277            record,
278        }
279    }
280
281    pub(super) async fn get_runtime(&self, runtime_id: &str) -> Option<Arc<ManagedRuntime>> {
282        self.runtimes.read().await.get(runtime_id).cloned()
283    }
284
285    pub(super) async fn require_runtime(
286        &self,
287        runtime_id: Option<&str>,
288    ) -> Result<Arc<ManagedRuntime>> {
289        let target = runtime_id.unwrap_or(&self.primary_runtime_id);
290        self.get_runtime(target)
291            .await
292            .with_context(|| format!("未找到 runtime: {target}"))
293    }
294
295    pub(super) async fn emit_runtime_status(
296        &self,
297        runtime_id: &str,
298        status: RuntimeStatusSnapshot,
299    ) -> Result<()> {
300        let status_label = status.status.clone();
301        let runtime = self.require_runtime(Some(runtime_id)).await?;
302        {
303            let mut guard = runtime.status.write().await;
304            *guard = status;
305        }
306        let summary = runtime.summary().await;
307        self.emit_event(
308            "runtime_status_changed",
309            Some(runtime_id),
310            None,
311            status_payload(&summary),
312        )?;
313        if matches!(status_label.as_str(), "stopping" | "stopped" | "error") {
314            let reason = match status_label.as_str() {
315                "stopping" => "runtime 正在停止,线程状态待重新加载",
316                "stopped" => "runtime 已停止,线程状态待重新加载",
317                _ => "runtime 异常退出,线程状态待重新加载",
318            };
319            self.downgrade_runtime_threads(runtime_id, reason)?;
320        }
321        Ok(())
322    }
323
324    pub(super) async fn emit_runtime_process_changed(
325        &self,
326        runtime_id: &str,
327        pid: Option<u32>,
328        running: bool,
329    ) -> Result<()> {
330        let runtime = self.require_runtime(Some(runtime_id)).await?;
331        {
332            let mut guard = runtime.status.write().await;
333            guard.pid = pid.map(|value| value as i32);
334            guard.updated_at_ms = now_millis();
335            if !running && guard.status == "starting" {
336                guard.status = "stopped".to_string();
337                guard.last_error = None;
338                guard.app_server_handshake = AppServerHandshakeSummary::inactive();
339            }
340        }
341        self.emit_event(
342            "runtime_process_changed",
343            Some(runtime_id),
344            None,
345            json!({
346                "runtimeId": runtime_id,
347                "pid": pid,
348                "running": running,
349                "updatedAtMs": now_millis(),
350            }),
351        )?;
352        Ok(())
353    }
354
355    pub(super) async fn emit_runtime_degraded(
356        &self,
357        runtime_id: &str,
358        message: String,
359    ) -> Result<()> {
360        self.emit_event(
361            "runtime_degraded",
362            Some(runtime_id),
363            None,
364            json!({
365                "runtimeId": runtime_id,
366                "message": message,
367                "updatedAtMs": now_millis(),
368            }),
369        )
370    }
371
372    pub(super) async fn emit_runtime_list(&self) -> Result<()> {
373        let runtimes = self.runtime_summaries().await;
374        self.emit_event("runtime_list", None, None, runtime_list_payload(&runtimes))
375    }
376
377    pub(super) fn fallback_runtime_snapshot(&self) -> RuntimeStatusSnapshot {
378        self.storage
379            .get_runtime(&self.primary_runtime_id)
380            .ok()
381            .flatten()
382            .map(|record| self.build_fallback_status(&record))
383            .unwrap_or_else(|| RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()))
384    }
385
386    pub(super) fn fallback_runtime_summaries(&self) -> Vec<RuntimeSummary> {
387        let mut summaries = self
388            .storage
389            .list_runtimes()
390            .unwrap_or_default()
391            .into_iter()
392            .map(|record| RuntimeSummary::from_parts(&record, self.build_fallback_status(&record)))
393            .collect::<Vec<_>>();
394        summaries.sort_by(|left, right| {
395            right
396                .is_primary
397                .cmp(&left.is_primary)
398                .then_with(|| left.display_name.cmp(&right.display_name))
399        });
400        summaries
401    }
402
403    pub(super) fn fallback_runtime_summary(&self, runtime_id: &str) -> Result<RuntimeSummary> {
404        let record = self
405            .storage
406            .get_runtime(runtime_id)?
407            .with_context(|| format!("未找到 runtime: {runtime_id}"))?;
408        Ok(RuntimeSummary::from_parts(
409            &record,
410            self.build_fallback_status(&record),
411        ))
412    }
413
414    fn build_fallback_status(&self, record: &RuntimeRecord) -> RuntimeStatusSnapshot {
415        RuntimeStatusSnapshot {
416            runtime_id: record.runtime_id.clone(),
417            status: if record.auto_start {
418                "unknown".to_string()
419            } else {
420                "stopped".to_string()
421            },
422            codex_home: record.codex_home.clone(),
423            user_agent: None,
424            platform_family: None,
425            platform_os: None,
426            last_error: None,
427            pid: None,
428            app_server_handshake: AppServerHandshakeSummary::inactive(),
429            updated_at_ms: now_millis(),
430        }
431    }
432
433    pub(super) async fn transition_runtime_status(
434        &self,
435        runtime_id: &str,
436        status_label: String,
437        last_error: Option<String>,
438        handshake: AppServerHandshakeSummary,
439    ) -> Result<()> {
440        let runtime = self.require_runtime(Some(runtime_id)).await?;
441        let current_status = runtime.status.read().await.clone();
442        self.emit_runtime_status(
443            runtime_id,
444            RuntimeStatusSnapshot {
445                runtime_id: runtime_id.to_string(),
446                status: status_label,
447                codex_home: current_status.codex_home,
448                user_agent: current_status.user_agent,
449                platform_family: current_status.platform_family,
450                platform_os: current_status.platform_os,
451                last_error,
452                pid: current_status.pid,
453                app_server_handshake: handshake,
454                updated_at_ms: now_millis(),
455            },
456        )
457        .await
458    }
459
460    fn downgrade_runtime_threads(&self, runtime_id: &str, reason: &str) -> Result<()> {
461        for thread in self
462            .storage
463            .downgrade_runtime_threads(runtime_id, Some(reason))?
464        {
465            self.emit_event(
466                "thread/status/changed",
467                Some(runtime_id),
468                Some(&thread.id),
469                json!({
470                    "runtimeId": runtime_id,
471                    "threadId": thread.id,
472                    "status": thread.status_info.raw,
473                    "statusInfo": thread.status_info,
474                }),
475            )?;
476        }
477        Ok(())
478    }
479}