Skip to main content

codex_mobile_bridge/state/
mod.rs

1mod directories;
2mod events;
3mod helpers;
4mod management;
5mod render;
6mod runtime;
7mod threads;
8mod timeline;
9
10#[cfg(test)]
11mod tests;
12
13use std::collections::HashMap;
14use std::fs;
15use std::path::{Path, PathBuf};
16use std::sync::{Arc, Mutex};
17use std::time::{Duration as StdDuration, Instant};
18
19use anyhow::{Result, bail};
20use serde_json::{Value, json};
21use tokio::sync::{RwLock, broadcast, mpsc};
22use tokio::time::Duration;
23use tracing::warn;
24
25use self::directories::seed_directory_bookmarks;
26use self::events::run_app_server_event_loop;
27use self::runtime::ManagedRuntime;
28use crate::app_server::AppServerInbound;
29use crate::bridge_protocol::{
30    DirectoryBookmarkRecord, DirectoryHistoryRecord, PendingServerRequestRecord, PersistedEvent,
31    RuntimeStatusSnapshot, RuntimeSummary, ThreadRenderSnapshot, require_payload,
32};
33use crate::config::Config;
34use crate::storage::Storage;
35
/// Shared state of the bridge process, constructed by [`BridgeState::bootstrap`].
pub struct BridgeState {
    /// Token copied from `Config::token`; returned by `config_token`.
    token: String,
    /// Persistent store opened from `Config::db_path`.
    storage: Storage,
    /// Managed runtimes, keyed by their runtime id.
    runtimes: RwLock<HashMap<String, Arc<ManagedRuntime>>>,
    /// Id of the runtime returned by `Storage::ensure_primary_runtime` during bootstrap.
    primary_runtime_id: String,
    /// `Config::runtime_limit`, clamped to at least 1 during bootstrap.
    // NOTE(review): presumably the maximum number of runtimes — confirm in the runtime module.
    runtime_limit: usize,
    /// Directory that holds staged input files; cleanup refuses paths outside it.
    staging_root: PathBuf,
    /// Sender side of the channel whose receiver is handed to the app-server event loop;
    /// a clone is passed to every runtime built during bootstrap.
    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
    /// Broadcast channel on which persisted events are published to subscribers.
    events_tx: broadcast::Sender<PersistedEvent>,
    /// Staged file paths registered per turn id and removed again on cleanup.
    staged_turn_inputs: Mutex<HashMap<String, Vec<PathBuf>>>,
    /// Render snapshots keyed by a string.
    // NOTE(review): the key is presumably the thread id — confirm in the render module.
    thread_render_snapshots: Mutex<HashMap<String, ThreadRenderSnapshot>>,
    /// Time of the last emitted notice per rate-limit key
    /// (see `should_emit_rate_limited_notice`).
    timeout_warning_tracker: Mutex<HashMap<String, Instant>>,
    /// Event sequence cursor: seeded from `Storage::latest_event_seq` during bootstrap and
    /// updated by `emit_event` with the sequence number of each appended event.
    external_event_cursor: Mutex<i64>,
}
50
51impl BridgeState {
52    pub async fn bootstrap(config: Config) -> Result<Arc<Self>> {
53        let storage = Storage::open(config.db_path.clone())?;
54        seed_directory_bookmarks(&storage, &config.directory_bookmarks)?;
55        let staging_root = staging_root_from_db_path(&config.db_path);
56        prepare_staging_root(&staging_root)?;
57
58        let primary_runtime = storage.ensure_primary_runtime(
59            config
60                .codex_home
61                .as_ref()
62                .map(|path| path.to_string_lossy().to_string()),
63            config.codex_binary.clone(),
64        )?;
65
66        let (events_tx, _) = broadcast::channel(512);
67        let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
68
69        let mut runtime_map = HashMap::new();
70        for record in storage.list_runtimes()? {
71            let runtime = Arc::new(Self::build_runtime(record, inbound_tx.clone()));
72            runtime_map.insert(runtime.record.runtime_id.clone(), runtime);
73        }
74
75        let state = Arc::new(Self {
76            token: config.token,
77            storage,
78            runtimes: RwLock::new(runtime_map),
79            primary_runtime_id: primary_runtime.runtime_id,
80            runtime_limit: config.runtime_limit.max(1),
81            staging_root,
82            inbound_tx: inbound_tx.clone(),
83            events_tx,
84            staged_turn_inputs: Mutex::new(HashMap::new()),
85            thread_render_snapshots: Mutex::new(HashMap::new()),
86            timeout_warning_tracker: Mutex::new(HashMap::new()),
87            external_event_cursor: Mutex::new(0),
88        });
89
90        *state
91            .external_event_cursor
92            .lock()
93            .expect("external event cursor poisoned") = state.storage.latest_event_seq()?;
94
95        tokio::spawn(run_app_server_event_loop(Arc::clone(&state), inbound_rx));
96        tokio::spawn(management::run_external_event_relay(Arc::clone(&state)));
97
98        for summary in state.runtime_summaries().await {
99            if summary.auto_start {
100                let runtime_id = summary.runtime_id.clone();
101                let state_ref = Arc::clone(&state);
102                tokio::spawn(async move {
103                    if let Err(error) = state_ref.start_existing_runtime(&runtime_id).await {
104                        let _ = state_ref
105                            .emit_runtime_degraded(
106                                &runtime_id,
107                                format!("自动启动 runtime 失败: {error}"),
108                            )
109                            .await;
110                    }
111                });
112            }
113        }
114
115        Ok(state)
116    }
117
118    pub fn subscribe_events(&self) -> broadcast::Receiver<PersistedEvent> {
119        self.events_tx.subscribe()
120    }
121
122    pub fn config_token(&self) -> &str {
123        &self.token
124    }
125
126    pub async fn hello_payload(
127        &self,
128        device_id: &str,
129        provided_ack_seq: Option<i64>,
130    ) -> Result<(
131        RuntimeStatusSnapshot,
132        Vec<RuntimeSummary>,
133        Vec<DirectoryBookmarkRecord>,
134        Vec<DirectoryHistoryRecord>,
135        Vec<PendingServerRequestRecord>,
136        Vec<PersistedEvent>,
137    )> {
138        let fallback_ack = self.storage.get_mobile_session_ack(device_id)?.unwrap_or(0);
139        let last_ack_seq = provided_ack_seq.unwrap_or(fallback_ack);
140        self.storage
141            .save_mobile_session_ack(device_id, last_ack_seq)?;
142
143        let runtime = self.runtime_snapshot_for_client().await;
144        let runtimes = self.runtime_summaries_for_client().await;
145        let directory_bookmarks = self.storage.list_directory_bookmarks()?;
146        let directory_history = self.storage.list_directory_history(20)?;
147        let pending_requests = self.storage.list_pending_requests()?;
148        let replay_events = self.storage.replay_events_after(last_ack_seq)?;
149
150        Ok((
151            runtime,
152            runtimes,
153            directory_bookmarks,
154            directory_history,
155            pending_requests,
156            replay_events,
157        ))
158    }
159
160    pub fn ack_events(&self, device_id: &str, last_seq: i64) -> Result<()> {
161        self.storage.save_mobile_session_ack(device_id, last_seq)
162    }
163
    /// Dispatches a client request to the handler registered for `action`.
    ///
    /// Except for `list_runtimes` and `inspect_remote_state`, which ignore `payload`,
    /// every arm first converts `payload` with `require_payload` and passes the result
    /// to the handler of the same name.
    ///
    /// # Errors
    /// Returns an error for an unknown `action`, when `require_payload` rejects the
    /// payload, or when the selected handler fails.
    pub async fn handle_request(&self, action: &str, payload: Value) -> Result<Value> {
        match action {
            // Runtime lifecycle.
            "get_runtime_status" => self.get_runtime_status(require_payload(payload)?).await,
            "list_runtimes" => Ok(json!({ "runtimes": self.runtime_summaries_for_client().await })),
            "start_runtime" => self.start_runtime(require_payload(payload)?).await,
            "stop_runtime" => self.stop_runtime(require_payload(payload)?).await,
            "restart_runtime" => self.restart_runtime(require_payload(payload)?).await,
            "prune_runtime" => self.prune_runtime(require_payload(payload)?).await,
            // Directory browsing and bookmarks.
            "read_directory" => self.read_directory(require_payload(payload)?).await,
            "create_directory_bookmark" => {
                self.create_directory_bookmark(require_payload(payload)?)
                    .await
            }
            "remove_directory_bookmark" => {
                self.remove_directory_bookmark(require_payload(payload)?)
                    .await
            }
            // Threads.
            "list_threads" => self.list_threads(require_payload(payload)?).await,
            "start_thread" => self.start_thread(require_payload(payload)?).await,
            "read_thread" => self.read_thread(require_payload(payload)?).await,
            "resume_thread" => self.resume_thread(require_payload(payload)?).await,
            "update_thread" => self.update_thread(require_payload(payload)?).await,
            "archive_thread" => self.archive_thread(require_payload(payload)?).await,
            "unarchive_thread" => self.unarchive_thread(require_payload(payload)?).await,
            // Turns and pending server requests.
            "stage_input_image" => self.stage_input_image(require_payload(payload)?).await,
            "send_turn" => self.send_turn(require_payload(payload)?).await,
            "interrupt_turn" => self.interrupt_turn(require_payload(payload)?).await,
            "respond_pending_request" => {
                self.respond_pending_request(require_payload(payload)?)
                    .await
            }
            // Bridge management and diagnostics.
            "start_bridge_management" => {
                self.start_bridge_management(require_payload(payload)?)
                    .await
            }
            "read_bridge_management" => {
                self.read_bridge_management(require_payload(payload)?).await
            }
            "inspect_remote_state" => self.inspect_remote_state().await,
            _ => bail!("未知 action: {action}"),
        }
    }
206
207    fn log_timeout_warning(&self, key: &str, message: &str) {
208        if self.should_emit_rate_limited_notice(key) {
209            warn!("{message}");
210        }
211    }
212
213    pub(super) fn should_emit_rate_limited_notice(&self, key: &str) -> bool {
214        let now = Instant::now();
215        let mut tracker = self
216            .timeout_warning_tracker
217            .lock()
218            .expect("timeout warning tracker poisoned");
219        let should_emit = tracker
220            .get(key)
221            .map(|last| now.duration_since(*last) >= CLIENT_TIMEOUT_WARN_COOLDOWN)
222            .unwrap_or(true);
223        if should_emit {
224            tracker.insert(key.to_string(), now);
225        }
226        should_emit
227    }
228
229    fn emit_event(
230        &self,
231        event_type: &str,
232        runtime_id: Option<&str>,
233        thread_id: Option<&str>,
234        payload: Value,
235    ) -> Result<()> {
236        let event = self
237            .storage
238            .append_event(event_type, runtime_id, thread_id, &payload)?;
239        *self
240            .external_event_cursor
241            .lock()
242            .expect("external event cursor poisoned") = event.seq;
243        let _ = self.events_tx.send(event);
244        Ok(())
245    }
246
247    pub(super) fn staging_root(&self) -> &Path {
248        &self.staging_root
249    }
250
251    pub(super) fn register_staged_turn_inputs(&self, turn_id: &str, paths: Vec<PathBuf>) {
252        if paths.is_empty() {
253            return;
254        }
255        let mut staged_turn_inputs = self
256            .staged_turn_inputs
257            .lock()
258            .expect("staged turn inputs poisoned");
259        staged_turn_inputs.insert(turn_id.to_string(), paths);
260    }
261
262    pub(super) fn cleanup_staged_turn_inputs(&self, turn_id: &str) -> Result<()> {
263        let paths = self
264            .staged_turn_inputs
265            .lock()
266            .expect("staged turn inputs poisoned")
267            .remove(turn_id)
268            .unwrap_or_default();
269        self.cleanup_staged_paths(paths)
270    }
271
272    pub(super) fn cleanup_staged_paths<I>(&self, paths: I) -> Result<()>
273    where
274        I: IntoIterator<Item = PathBuf>,
275    {
276        for path in paths {
277            self.remove_staged_path(&path)?;
278        }
279        Ok(())
280    }
281
282    fn remove_staged_path(&self, path: &Path) -> Result<()> {
283        if !path.starts_with(&self.staging_root) {
284            bail!("拒绝清理 staging 目录之外的文件: {}", path.display());
285        }
286        if path.exists() {
287            fs::remove_file(path)?;
288        }
289        Ok(())
290    }
291}
292
/// Derives the staging directory from the database path.
///
/// The result is `staged-inputs` inside the directory containing `db_path`. When
/// `db_path` has no parent, `.` is used as the containing directory.
fn staging_root_from_db_path(db_path: &Path) -> PathBuf {
    let containing_dir = match db_path.parent() {
        Some(parent) => parent,
        None => Path::new("."),
    };
    containing_dir.join("staged-inputs")
}
299
300fn prepare_staging_root(staging_root: &Path) -> Result<()> {
301    fs::create_dir_all(staging_root)?;
302    for entry in fs::read_dir(staging_root)? {
303        let path = entry?.path();
304        if path.is_dir() {
305            fs::remove_dir_all(&path)?;
306        } else {
307            fs::remove_file(&path)?;
308        }
309    }
310    Ok(())
311}
312
/// 400 ms timeout.
// NOTE(review): presumably the deadline for client-facing runtime status queries — it is
// not used in this file, confirm at the call sites.
pub(super) const CLIENT_STATUS_TIMEOUT: Duration = Duration::from_millis(400);
/// Minimum interval (30 s) between two rate-limited notices for the same key, as enforced
/// by `BridgeState::should_emit_rate_limited_notice`.
pub(super) const CLIENT_TIMEOUT_WARN_COOLDOWN: StdDuration = StdDuration::from_secs(30);