Skip to main content

codex_mobile_bridge/state/
runtime.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, bail};
5use serde_json::{Value, json};
6use tokio::sync::{RwLock, mpsc};
7use tokio::time::timeout;
8use uuid::Uuid;
9
10use super::{BridgeState, CLIENT_STATUS_TIMEOUT};
11use crate::app_server::{AppServerInbound, AppServerLaunchConfig, AppServerManager};
12use crate::bridge_protocol::{
13    GetRuntimeStatusRequest, PruneRuntimeRequest, RestartRuntimeRequest, RuntimeRecord,
14    RuntimeStatusSnapshot, RuntimeSummary, StartRuntimeRequest, StopRuntimeRequest, now_millis,
15    runtime_list_payload, status_payload,
16};
17use crate::config::expand_path;
18
/// In-memory handle for a single runtime tracked by the bridge.
pub(super) struct ManagedRuntime {
    /// Configuration record for this runtime (the same record type that is written to storage).
    pub(super) record: RuntimeRecord,
    /// Manager used to start, stop and restart this runtime's app server.
    pub(super) app_server: AppServerManager,
    /// Latest status snapshot, guarded by an async read/write lock.
    pub(super) status: RwLock<RuntimeStatusSnapshot>,
}
24
25impl ManagedRuntime {
26    pub(super) async fn summary(&self) -> RuntimeSummary {
27        RuntimeSummary::from_parts(&self.record, self.status.read().await.clone())
28    }
29}
30
31impl BridgeState {
32    pub async fn runtime_snapshot(&self) -> RuntimeStatusSnapshot {
33        match self.get_runtime(&self.primary_runtime_id).await {
34            Some(runtime) => runtime.status.read().await.clone(),
35            None => RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()),
36        }
37    }
38
39    pub async fn runtime_summaries(&self) -> Vec<RuntimeSummary> {
40        let runtimes = self.runtimes.read().await;
41        let mut summaries = Vec::with_capacity(runtimes.len());
42        for runtime in runtimes.values() {
43            summaries.push(runtime.summary().await);
44        }
45        summaries.sort_by(|left, right| {
46            right
47                .is_primary
48                .cmp(&left.is_primary)
49                .then_with(|| left.display_name.cmp(&right.display_name))
50        });
51        summaries
52    }
53
54    pub async fn runtime_snapshot_for_client(&self) -> RuntimeStatusSnapshot {
55        match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_snapshot()).await {
56            Ok(snapshot) => snapshot,
57            Err(_) => {
58                self.log_timeout_warning(
59                    "runtime_snapshot:primary",
60                    "读取 primary runtime 状态超时,回退到存储快照",
61                );
62                self.fallback_runtime_snapshot()
63            }
64        }
65    }
66
67    pub async fn runtime_summaries_for_client(&self) -> Vec<RuntimeSummary> {
68        match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_summaries()).await {
69            Ok(summaries) => summaries,
70            Err(_) => {
71                self.log_timeout_warning(
72                    "runtime_summaries:list",
73                    "读取 runtime 列表超时,回退到存储快照",
74                );
75                self.fallback_runtime_summaries()
76            }
77        }
78    }
79
80    pub(super) async fn get_runtime_status(
81        &self,
82        request: GetRuntimeStatusRequest,
83    ) -> Result<Value> {
84        let runtime_id = request
85            .runtime_id
86            .unwrap_or_else(|| self.primary_runtime_id.clone());
87        let runtime = match timeout(
88            CLIENT_STATUS_TIMEOUT,
89            self.require_runtime(Some(&runtime_id)),
90        )
91        .await
92        {
93            Ok(Ok(runtime)) => runtime.summary().await,
94            Ok(Err(error)) => return Err(error),
95            Err(_) => {
96                self.log_timeout_warning(
97                    &format!("runtime_status:{runtime_id}"),
98                    &format!("读取 runtime={runtime_id} 状态超时,回退到存储快照"),
99                );
100                self.fallback_runtime_summary(&runtime_id)?
101            }
102        };
103        Ok(json!({ "runtime": runtime }))
104    }
105
106    pub(super) async fn start_runtime(&self, request: StartRuntimeRequest) -> Result<Value> {
107        let runtime_id = match request.runtime_id.clone() {
108            Some(runtime_id) => runtime_id,
109            None => format!("runtime-{}", &Uuid::new_v4().simple().to_string()[..8]),
110        };
111
112        if self.get_runtime(&runtime_id).await.is_some() {
113            if request.display_name.is_some()
114                || request.codex_home.is_some()
115                || request.codex_binary.is_some()
116            {
117                bail!("已存在 runtime,启动现有 runtime 时不能覆盖配置");
118            }
119
120            self.start_existing_runtime(&runtime_id).await?;
121            let runtime = self
122                .require_runtime(Some(&runtime_id))
123                .await?
124                .summary()
125                .await;
126            return Ok(json!({ "runtime": runtime }));
127        }
128
129        {
130            let runtimes = self.runtimes.read().await;
131            if runtimes.len() >= self.runtime_limit {
132                bail!("已达到 runtime 上限 {}", self.runtime_limit);
133            }
134        }
135
136        let codex_home = request
137            .codex_home
138            .as_deref()
139            .map(|value| expand_path(Path::new(value)))
140            .transpose()?
141            .map(|value| value.to_string_lossy().to_string());
142        let now = now_millis();
143        let record = RuntimeRecord {
144            runtime_id: runtime_id.clone(),
145            display_name: request
146                .display_name
147                .filter(|value| !value.trim().is_empty())
148                .unwrap_or_else(|| runtime_id.clone()),
149            codex_home,
150            codex_binary: request
151                .codex_binary
152                .filter(|value| !value.trim().is_empty())
153                .unwrap_or_else(|| "codex".to_string()),
154            is_primary: false,
155            auto_start: request.auto_start.unwrap_or(false),
156            created_at_ms: now,
157            updated_at_ms: now,
158        };
159
160        self.storage.upsert_runtime(&record)?;
161        self.register_runtime(record).await?;
162        self.start_existing_runtime(&runtime_id).await?;
163
164        let runtime = self
165            .require_runtime(Some(&runtime_id))
166            .await?
167            .summary()
168            .await;
169        Ok(json!({ "runtime": runtime }))
170    }
171
172    pub(super) async fn stop_runtime(&self, request: StopRuntimeRequest) -> Result<Value> {
173        let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
174        let current_status = runtime.status.read().await.clone();
175        self.emit_runtime_status(
176            &request.runtime_id,
177            RuntimeStatusSnapshot {
178                runtime_id: request.runtime_id.clone(),
179                status: "stopping".to_string(),
180                codex_home: current_status.codex_home,
181                user_agent: current_status.user_agent,
182                platform_family: current_status.platform_family,
183                platform_os: current_status.platform_os,
184                last_error: None,
185                pid: current_status.pid,
186                updated_at_ms: now_millis(),
187            },
188        )
189        .await?;
190        runtime.app_server.stop().await?;
191
192        let runtime = self
193            .require_runtime(Some(&request.runtime_id))
194            .await?
195            .summary()
196            .await;
197        Ok(json!({ "runtime": runtime }))
198    }
199
200    pub(super) async fn restart_runtime(&self, request: RestartRuntimeRequest) -> Result<Value> {
201        let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
202        let current_status = runtime.status.read().await.clone();
203        self.emit_runtime_status(
204            &request.runtime_id,
205            RuntimeStatusSnapshot {
206                runtime_id: request.runtime_id.clone(),
207                status: "stopping".to_string(),
208                codex_home: current_status.codex_home,
209                user_agent: current_status.user_agent,
210                platform_family: current_status.platform_family,
211                platform_os: current_status.platform_os,
212                last_error: None,
213                pid: current_status.pid,
214                updated_at_ms: now_millis(),
215            },
216        )
217        .await?;
218        runtime.app_server.restart().await?;
219
220        let runtime = self
221            .require_runtime(Some(&request.runtime_id))
222            .await?
223            .summary()
224            .await;
225        Ok(json!({ "runtime": runtime }))
226    }
227
228    pub(super) async fn prune_runtime(&self, request: PruneRuntimeRequest) -> Result<Value> {
229        if request.runtime_id == self.primary_runtime_id {
230            bail!("primary runtime 不能删除");
231        }
232
233        let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
234        runtime.app_server.stop().await?;
235
236        {
237            let mut runtimes = self.runtimes.write().await;
238            runtimes.remove(&request.runtime_id);
239        }
240        self.storage.remove_runtime(&request.runtime_id)?;
241        self.emit_runtime_list().await?;
242
243        Ok(json!({ "pruned": true, "runtimeId": request.runtime_id }))
244    }
245
246    pub(super) async fn start_existing_runtime(&self, runtime_id: &str) -> Result<()> {
247        let runtime = self.require_runtime(Some(runtime_id)).await?;
248        runtime.app_server.start().await
249    }
250
251    pub(super) async fn register_runtime(&self, record: RuntimeRecord) -> Result<()> {
252        let managed = Arc::new(Self::build_runtime(record, self.inbound_tx.clone()));
253        {
254            let mut runtimes = self.runtimes.write().await;
255            runtimes.insert(managed.record.runtime_id.clone(), managed);
256        }
257        self.emit_runtime_list().await?;
258        Ok(())
259    }
260
261    pub(super) fn build_runtime(
262        record: RuntimeRecord,
263        inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
264    ) -> ManagedRuntime {
265        ManagedRuntime {
266            status: RwLock::new(RuntimeStatusSnapshot::stopped(record.runtime_id.clone())),
267            app_server: AppServerManager::new(
268                AppServerLaunchConfig {
269                    runtime_id: record.runtime_id.clone(),
270                    codex_binary: record.codex_binary.clone(),
271                    codex_home: record.codex_home.as_ref().map(std::path::PathBuf::from),
272                },
273                inbound_tx,
274            ),
275            record,
276        }
277    }
278
279    pub(super) async fn get_runtime(&self, runtime_id: &str) -> Option<Arc<ManagedRuntime>> {
280        self.runtimes.read().await.get(runtime_id).cloned()
281    }
282
283    pub(super) async fn require_runtime(
284        &self,
285        runtime_id: Option<&str>,
286    ) -> Result<Arc<ManagedRuntime>> {
287        let target = runtime_id.unwrap_or(&self.primary_runtime_id);
288        self.get_runtime(target)
289            .await
290            .with_context(|| format!("未找到 runtime: {target}"))
291    }
292
293    pub(super) async fn emit_runtime_status(
294        &self,
295        runtime_id: &str,
296        status: RuntimeStatusSnapshot,
297    ) -> Result<()> {
298        let runtime = self.require_runtime(Some(runtime_id)).await?;
299        {
300            let mut guard = runtime.status.write().await;
301            *guard = status;
302        }
303        let summary = runtime.summary().await;
304        self.emit_event(
305            "runtime_status_changed",
306            Some(runtime_id),
307            None,
308            status_payload(&summary),
309        )?;
310        Ok(())
311    }
312
313    pub(super) async fn emit_runtime_process_changed(
314        &self,
315        runtime_id: &str,
316        pid: Option<u32>,
317        running: bool,
318    ) -> Result<()> {
319        let runtime = self.require_runtime(Some(runtime_id)).await?;
320        {
321            let mut guard = runtime.status.write().await;
322            guard.pid = pid;
323            guard.updated_at_ms = now_millis();
324            if !running && guard.status == "starting" {
325                guard.status = "stopped".to_string();
326                guard.last_error = None;
327            }
328        }
329        self.emit_event(
330            "runtime_process_changed",
331            Some(runtime_id),
332            None,
333            json!({
334                "runtimeId": runtime_id,
335                "pid": pid,
336                "running": running,
337                "updatedAtMs": now_millis(),
338            }),
339        )?;
340        Ok(())
341    }
342
343    pub(super) async fn emit_runtime_degraded(
344        &self,
345        runtime_id: &str,
346        message: String,
347    ) -> Result<()> {
348        self.emit_event(
349            "runtime_degraded",
350            Some(runtime_id),
351            None,
352            json!({
353                "runtimeId": runtime_id,
354                "message": message,
355                "updatedAtMs": now_millis(),
356            }),
357        )
358    }
359
360    pub(super) async fn emit_runtime_list(&self) -> Result<()> {
361        let runtimes = self.runtime_summaries().await;
362        self.emit_event("runtime_list", None, None, runtime_list_payload(&runtimes))
363    }
364
365    pub(super) fn fallback_runtime_snapshot(&self) -> RuntimeStatusSnapshot {
366        self.storage
367            .get_runtime(&self.primary_runtime_id)
368            .ok()
369            .flatten()
370            .map(|record| self.build_fallback_status(&record))
371            .unwrap_or_else(|| RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()))
372    }
373
374    pub(super) fn fallback_runtime_summaries(&self) -> Vec<RuntimeSummary> {
375        let mut summaries = self
376            .storage
377            .list_runtimes()
378            .unwrap_or_default()
379            .into_iter()
380            .map(|record| RuntimeSummary::from_parts(&record, self.build_fallback_status(&record)))
381            .collect::<Vec<_>>();
382        summaries.sort_by(|left, right| {
383            right
384                .is_primary
385                .cmp(&left.is_primary)
386                .then_with(|| left.display_name.cmp(&right.display_name))
387        });
388        summaries
389    }
390
391    pub(super) fn fallback_runtime_summary(&self, runtime_id: &str) -> Result<RuntimeSummary> {
392        let record = self
393            .storage
394            .get_runtime(runtime_id)?
395            .with_context(|| format!("未找到 runtime: {runtime_id}"))?;
396        Ok(RuntimeSummary::from_parts(
397            &record,
398            self.build_fallback_status(&record),
399        ))
400    }
401
402    fn build_fallback_status(&self, record: &RuntimeRecord) -> RuntimeStatusSnapshot {
403        RuntimeStatusSnapshot {
404            runtime_id: record.runtime_id.clone(),
405            status: if record.auto_start {
406                "unknown".to_string()
407            } else {
408                "stopped".to_string()
409            },
410            codex_home: record.codex_home.clone(),
411            user_agent: None,
412            platform_family: None,
413            platform_os: None,
414            last_error: None,
415            pid: None,
416            updated_at_ms: now_millis(),
417        }
418    }
419}