Skip to main content

codex_mobile_bridge/state/
runtime.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, bail};
5use serde_json::{Value, json};
6use tokio::sync::{RwLock, mpsc};
7use tokio::time::timeout;
8use uuid::Uuid;
9
10use super::{BridgeState, CLIENT_STATUS_TIMEOUT};
11use crate::app_server::{AppServerInbound, AppServerLaunchConfig, AppServerManager};
12use crate::bridge_protocol::{
13    AppServerHandshakeSummary, GetRuntimeStatusRequest, PruneRuntimeRequest, RestartRuntimeRequest,
14    RuntimeRecord, RuntimeStatusSnapshot, RuntimeSummary, StartRuntimeRequest, StopRuntimeRequest,
15    now_millis, runtime_list_payload, status_payload,
16};
17use crate::config::expand_path;
18
/// A runtime registered with the bridge: its stored configuration record, the
/// `AppServerManager` that `BridgeState` starts/stops/restarts for it, and the most
/// recently published status snapshot.
// NOTE(review): field order is also drop order; keep it unless that is confirmed irrelevant
// for `AppServerManager`.
19pub(super) struct ManagedRuntime {
    /// Configuration for this runtime (the same record type `start_runtime` persists via storage).
20    pub(super) record: RuntimeRecord,
    /// Controls the runtime's app server; built from `record` in `BridgeState::build_runtime`.
21    pub(super) app_server: AppServerManager,
    /// Latest status snapshot; replaced by `emit_runtime_status` and patched by
    /// `emit_runtime_process_changed`.
22    pub(super) status: RwLock<RuntimeStatusSnapshot>,
23}
24
25impl ManagedRuntime {
26    pub(super) async fn summary(&self) -> RuntimeSummary {
27        RuntimeSummary::from_parts(&self.record, self.status.read().await.clone())
28    }
29}
30
31impl BridgeState {
32    pub async fn runtime_snapshot(&self) -> RuntimeStatusSnapshot {
33        match self.get_runtime(&self.primary_runtime_id).await {
34            Some(runtime) => runtime.status.read().await.clone(),
35            None => RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()),
36        }
37    }
38
39    pub async fn runtime_summaries(&self) -> Vec<RuntimeSummary> {
40        let runtimes = self.runtimes.read().await;
41        let mut summaries = Vec::with_capacity(runtimes.len());
42        for runtime in runtimes.values() {
43            summaries.push(runtime.summary().await);
44        }
45        summaries.sort_by(|left, right| {
46            right
47                .is_primary
48                .cmp(&left.is_primary)
49                .then_with(|| left.display_name.cmp(&right.display_name))
50        });
51        summaries
52    }
53
54    pub async fn runtime_snapshot_for_client(&self) -> RuntimeStatusSnapshot {
55        match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_snapshot()).await {
56            Ok(snapshot) => snapshot,
57            Err(_) => {
58                self.log_timeout_warning(
59                    "runtime_snapshot:primary",
60                    "读取 primary runtime 状态超时,回退到存储快照",
61                );
62                self.fallback_runtime_snapshot()
63            }
64        }
65    }
66
67    pub async fn runtime_summaries_for_client(&self) -> Vec<RuntimeSummary> {
68        match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_summaries()).await {
69            Ok(summaries) => summaries,
70            Err(_) => {
71                self.log_timeout_warning(
72                    "runtime_summaries:list",
73                    "读取 runtime 列表超时,回退到存储快照",
74                );
75                self.fallback_runtime_summaries()
76            }
77        }
78    }
79
80    pub(super) async fn get_runtime_status(
81        &self,
82        request: GetRuntimeStatusRequest,
83    ) -> Result<Value> {
84        let runtime_id = request
85            .runtime_id
86            .unwrap_or_else(|| self.primary_runtime_id.clone());
87        let runtime = match timeout(
88            CLIENT_STATUS_TIMEOUT,
89            self.require_runtime(Some(&runtime_id)),
90        )
91        .await
92        {
93            Ok(Ok(runtime)) => runtime.summary().await,
94            Ok(Err(error)) => return Err(error),
95            Err(_) => {
96                self.log_timeout_warning(
97                    &format!("runtime_status:{runtime_id}"),
98                    &format!("读取 runtime={runtime_id} 状态超时,回退到存储快照"),
99                );
100                self.fallback_runtime_summary(&runtime_id)?
101            }
102        };
103        Ok(json!({ "runtime": runtime }))
104    }
105
106    pub(super) async fn start_runtime(&self, request: StartRuntimeRequest) -> Result<Value> {
107        let runtime_id = match request.runtime_id.clone() {
108            Some(runtime_id) => runtime_id,
109            None => format!("runtime-{}", &Uuid::new_v4().simple().to_string()[..8]),
110        };
111
112        if self.get_runtime(&runtime_id).await.is_some() {
113            if request.display_name.is_some()
114                || request.codex_home.is_some()
115                || request.codex_binary.is_some()
116            {
117                bail!("已存在 runtime,启动现有 runtime 时不能覆盖配置");
118            }
119
120            self.start_existing_runtime(&runtime_id).await?;
121            let runtime = self
122                .require_runtime(Some(&runtime_id))
123                .await?
124                .summary()
125                .await;
126            return Ok(json!({ "runtime": runtime }));
127        }
128
129        {
130            let runtimes = self.runtimes.read().await;
131            if runtimes.len() >= self.runtime_limit {
132                bail!("已达到 runtime 上限 {}", self.runtime_limit);
133            }
134        }
135
136        let codex_home = request
137            .codex_home
138            .as_deref()
139            .map(|value| expand_path(Path::new(value)))
140            .transpose()?
141            .map(|value| value.to_string_lossy().to_string());
142        let now = now_millis();
143        let record = RuntimeRecord {
144            runtime_id: runtime_id.clone(),
145            display_name: request
146                .display_name
147                .filter(|value| !value.trim().is_empty())
148                .unwrap_or_else(|| runtime_id.clone()),
149            codex_home,
150            codex_binary: request
151                .codex_binary
152                .filter(|value| !value.trim().is_empty())
153                .unwrap_or_else(|| "codex".to_string()),
154            is_primary: false,
155            auto_start: request.auto_start.unwrap_or(false),
156            created_at_ms: now,
157            updated_at_ms: now,
158        };
159
160        self.storage.upsert_runtime(&record)?;
161        self.register_runtime(record).await?;
162        self.start_existing_runtime(&runtime_id).await?;
163
164        let runtime = self
165            .require_runtime(Some(&runtime_id))
166            .await?
167            .summary()
168            .await;
169        Ok(json!({ "runtime": runtime }))
170    }
171
172    pub(super) async fn stop_runtime(&self, request: StopRuntimeRequest) -> Result<Value> {
173        let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
174        let current_status = runtime.status.read().await.clone();
175        self.emit_runtime_status(
176            &request.runtime_id,
177            RuntimeStatusSnapshot {
178                runtime_id: request.runtime_id.clone(),
179                status: "stopping".to_string(),
180                codex_home: current_status.codex_home,
181                user_agent: current_status.user_agent,
182                platform_family: current_status.platform_family,
183                platform_os: current_status.platform_os,
184                last_error: None,
185                pid: current_status.pid,
186                app_server_handshake: AppServerHandshakeSummary::inactive(),
187                updated_at_ms: now_millis(),
188            },
189        )
190        .await?;
191        runtime.app_server.stop().await?;
192
193        let runtime = self
194            .require_runtime(Some(&request.runtime_id))
195            .await?
196            .summary()
197            .await;
198        Ok(json!({ "runtime": runtime }))
199    }
200
201    pub(super) async fn restart_runtime(&self, request: RestartRuntimeRequest) -> Result<Value> {
202        let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
203        let current_status = runtime.status.read().await.clone();
204        self.emit_runtime_status(
205            &request.runtime_id,
206            RuntimeStatusSnapshot {
207                runtime_id: request.runtime_id.clone(),
208                status: "stopping".to_string(),
209                codex_home: current_status.codex_home,
210                user_agent: current_status.user_agent,
211                platform_family: current_status.platform_family,
212                platform_os: current_status.platform_os,
213                last_error: None,
214                pid: current_status.pid,
215                app_server_handshake: AppServerHandshakeSummary::inactive(),
216                updated_at_ms: now_millis(),
217            },
218        )
219        .await?;
220        runtime.app_server.restart().await?;
221
222        let runtime = self
223            .require_runtime(Some(&request.runtime_id))
224            .await?
225            .summary()
226            .await;
227        Ok(json!({ "runtime": runtime }))
228    }
229
230    pub(super) async fn prune_runtime(&self, request: PruneRuntimeRequest) -> Result<Value> {
231        if request.runtime_id == self.primary_runtime_id {
232            bail!("primary runtime 不能删除");
233        }
234
235        let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
236        runtime.app_server.stop().await?;
237
238        {
239            let mut runtimes = self.runtimes.write().await;
240            runtimes.remove(&request.runtime_id);
241        }
242        self.storage.remove_runtime(&request.runtime_id)?;
243        self.emit_runtime_list().await?;
244
245        Ok(json!({ "pruned": true, "runtimeId": request.runtime_id }))
246    }
247
248    pub(super) async fn start_existing_runtime(&self, runtime_id: &str) -> Result<()> {
249        let runtime = self.require_runtime(Some(runtime_id)).await?;
250        runtime.app_server.start().await
251    }
252
253    pub(super) async fn register_runtime(&self, record: RuntimeRecord) -> Result<()> {
254        let managed = Arc::new(Self::build_runtime(record, self.inbound_tx.clone()));
255        {
256            let mut runtimes = self.runtimes.write().await;
257            runtimes.insert(managed.record.runtime_id.clone(), managed);
258        }
259        self.emit_runtime_list().await?;
260        Ok(())
261    }
262
263    pub(super) fn build_runtime(
264        record: RuntimeRecord,
265        inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
266    ) -> ManagedRuntime {
267        ManagedRuntime {
268            status: RwLock::new(RuntimeStatusSnapshot::stopped(record.runtime_id.clone())),
269            app_server: AppServerManager::new(
270                AppServerLaunchConfig {
271                    runtime_id: record.runtime_id.clone(),
272                    codex_binary: record.codex_binary.clone(),
273                    codex_home: record.codex_home.as_ref().map(std::path::PathBuf::from),
274                },
275                inbound_tx,
276            ),
277            record,
278        }
279    }
280
281    pub(super) async fn get_runtime(&self, runtime_id: &str) -> Option<Arc<ManagedRuntime>> {
282        self.runtimes.read().await.get(runtime_id).cloned()
283    }
284
285    pub(super) async fn require_runtime(
286        &self,
287        runtime_id: Option<&str>,
288    ) -> Result<Arc<ManagedRuntime>> {
289        let target = runtime_id.unwrap_or(&self.primary_runtime_id);
290        self.get_runtime(target)
291            .await
292            .with_context(|| format!("未找到 runtime: {target}"))
293    }
294
295    pub(super) async fn emit_runtime_status(
296        &self,
297        runtime_id: &str,
298        status: RuntimeStatusSnapshot,
299    ) -> Result<()> {
300        let runtime = self.require_runtime(Some(runtime_id)).await?;
301        {
302            let mut guard = runtime.status.write().await;
303            *guard = status;
304        }
305        let summary = runtime.summary().await;
306        self.emit_event(
307            "runtime_status_changed",
308            Some(runtime_id),
309            None,
310            status_payload(&summary),
311        )?;
312        Ok(())
313    }
314
315    pub(super) async fn emit_runtime_process_changed(
316        &self,
317        runtime_id: &str,
318        pid: Option<u32>,
319        running: bool,
320    ) -> Result<()> {
321        let runtime = self.require_runtime(Some(runtime_id)).await?;
322        {
323            let mut guard = runtime.status.write().await;
324            guard.pid = pid;
325            guard.updated_at_ms = now_millis();
326            if !running && guard.status == "starting" {
327                guard.status = "stopped".to_string();
328                guard.last_error = None;
329                guard.app_server_handshake = AppServerHandshakeSummary::inactive();
330            }
331        }
332        self.emit_event(
333            "runtime_process_changed",
334            Some(runtime_id),
335            None,
336            json!({
337                "runtimeId": runtime_id,
338                "pid": pid,
339                "running": running,
340                "updatedAtMs": now_millis(),
341            }),
342        )?;
343        Ok(())
344    }
345
346    pub(super) async fn emit_runtime_degraded(
347        &self,
348        runtime_id: &str,
349        message: String,
350    ) -> Result<()> {
351        self.emit_event(
352            "runtime_degraded",
353            Some(runtime_id),
354            None,
355            json!({
356                "runtimeId": runtime_id,
357                "message": message,
358                "updatedAtMs": now_millis(),
359            }),
360        )
361    }
362
363    pub(super) async fn emit_runtime_list(&self) -> Result<()> {
364        let runtimes = self.runtime_summaries().await;
365        self.emit_event("runtime_list", None, None, runtime_list_payload(&runtimes))
366    }
367
368    pub(super) fn fallback_runtime_snapshot(&self) -> RuntimeStatusSnapshot {
369        self.storage
370            .get_runtime(&self.primary_runtime_id)
371            .ok()
372            .flatten()
373            .map(|record| self.build_fallback_status(&record))
374            .unwrap_or_else(|| RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()))
375    }
376
377    pub(super) fn fallback_runtime_summaries(&self) -> Vec<RuntimeSummary> {
378        let mut summaries = self
379            .storage
380            .list_runtimes()
381            .unwrap_or_default()
382            .into_iter()
383            .map(|record| RuntimeSummary::from_parts(&record, self.build_fallback_status(&record)))
384            .collect::<Vec<_>>();
385        summaries.sort_by(|left, right| {
386            right
387                .is_primary
388                .cmp(&left.is_primary)
389                .then_with(|| left.display_name.cmp(&right.display_name))
390        });
391        summaries
392    }
393
394    pub(super) fn fallback_runtime_summary(&self, runtime_id: &str) -> Result<RuntimeSummary> {
395        let record = self
396            .storage
397            .get_runtime(runtime_id)?
398            .with_context(|| format!("未找到 runtime: {runtime_id}"))?;
399        Ok(RuntimeSummary::from_parts(
400            &record,
401            self.build_fallback_status(&record),
402        ))
403    }
404
405    fn build_fallback_status(&self, record: &RuntimeRecord) -> RuntimeStatusSnapshot {
406        RuntimeStatusSnapshot {
407            runtime_id: record.runtime_id.clone(),
408            status: if record.auto_start {
409                "unknown".to_string()
410            } else {
411                "stopped".to_string()
412            },
413            codex_home: record.codex_home.clone(),
414            user_agent: None,
415            platform_family: None,
416            platform_os: None,
417            last_error: None,
418            pid: None,
419            app_server_handshake: AppServerHandshakeSummary::inactive(),
420            updated_at_ms: now_millis(),
421        }
422    }
423
424    pub(super) async fn transition_runtime_status(
425        &self,
426        runtime_id: &str,
427        status_label: String,
428        last_error: Option<String>,
429        handshake: AppServerHandshakeSummary,
430    ) -> Result<()> {
431        let runtime = self.require_runtime(Some(runtime_id)).await?;
432        let current_status = runtime.status.read().await.clone();
433        self.emit_runtime_status(
434            runtime_id,
435            RuntimeStatusSnapshot {
436                runtime_id: runtime_id.to_string(),
437                status: status_label,
438                codex_home: current_status.codex_home,
439                user_agent: current_status.user_agent,
440                platform_family: current_status.platform_family,
441                platform_os: current_status.platform_os,
442                last_error,
443                pid: current_status.pid,
444                app_server_handshake: handshake,
445                updated_at_ms: now_millis(),
446            },
447        )
448        .await
449    }
450}