Skip to main content

codex_mobile_bridge/state/
mod.rs

1mod directories;
2mod events;
3mod helpers;
4mod management;
5mod render;
6mod runtime;
7mod threads;
8
9#[cfg(test)]
10mod tests;
11
12use std::collections::HashMap;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::sync::{Arc, Mutex};
16use std::time::{Duration as StdDuration, Instant};
17
18use anyhow::{Context, Result, bail};
19use serde_json::{Value, json};
20use tokio::sync::{RwLock, broadcast, mpsc};
21use tokio::time::Duration;
22use tracing::warn;
23
24use self::directories::seed_directory_bookmarks;
25use self::events::run_app_server_event_loop;
26use self::runtime::ManagedRuntime;
27use crate::app_server::AppServerInbound;
28use crate::bridge_protocol::{
29    DirectoryBookmarkRecord, DirectoryHistoryRecord, PendingServerRequestRecord, PersistedEvent,
30    RuntimeStatusSnapshot, RuntimeSummary, ThreadRenderSnapshot, require_payload,
31};
32use crate::config::Config;
33use crate::storage::Storage;
34
35pub struct BridgeState {
36    token: String,
37    storage: Storage,
38    runtimes: RwLock<HashMap<String, Arc<ManagedRuntime>>>,
39    primary_runtime_id: String,
40    runtime_limit: usize,
41    staging_root: PathBuf,
42    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
43    events_tx: broadcast::Sender<PersistedEvent>,
44    staged_turn_inputs: Mutex<HashMap<String, Vec<PathBuf>>>,
45    thread_render_snapshots: Mutex<HashMap<String, ThreadRenderSnapshot>>,
46    timeout_warning_tracker: Mutex<HashMap<String, Instant>>,
47    external_event_cursor: Mutex<i64>,
48}
49
50impl BridgeState {
51    pub async fn bootstrap(config: Config) -> Result<Arc<Self>> {
52        let storage = Storage::open(config.db_path.clone())?;
53        seed_directory_bookmarks(&storage, &config.directory_bookmarks)?;
54        let staging_root = staging_root_from_db_path(&config.db_path);
55        prepare_staging_root(&staging_root)?;
56
57        let primary_runtime = storage.ensure_primary_runtime(
58            config
59                .codex_home
60                .as_ref()
61                .map(|path| path.to_string_lossy().to_string()),
62            config.codex_binary.clone(),
63        )?;
64
65        let (events_tx, _) = broadcast::channel(512);
66        let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
67
68        let mut runtime_map = HashMap::new();
69        for record in storage.list_runtimes()? {
70            let runtime = Arc::new(Self::build_runtime(record, inbound_tx.clone()));
71            runtime_map.insert(runtime.record.runtime_id.clone(), runtime);
72        }
73
74        let state = Arc::new(Self {
75            token: config.token,
76            storage,
77            runtimes: RwLock::new(runtime_map),
78            primary_runtime_id: primary_runtime.runtime_id,
79            runtime_limit: config.runtime_limit.max(1),
80            staging_root,
81            inbound_tx: inbound_tx.clone(),
82            events_tx,
83            staged_turn_inputs: Mutex::new(HashMap::new()),
84            thread_render_snapshots: Mutex::new(HashMap::new()),
85            timeout_warning_tracker: Mutex::new(HashMap::new()),
86            external_event_cursor: Mutex::new(0),
87        });
88
89        *state
90            .external_event_cursor
91            .lock()
92            .expect("external event cursor poisoned") = state.storage.latest_event_seq()?;
93
94        tokio::spawn(run_app_server_event_loop(Arc::clone(&state), inbound_rx));
95        tokio::spawn(management::run_external_event_relay(Arc::clone(&state)));
96
97        for summary in state.runtime_summaries().await {
98            if summary.auto_start {
99                let runtime_id = summary.runtime_id.clone();
100                let state_ref = Arc::clone(&state);
101                tokio::spawn(async move {
102                    if let Err(error) = state_ref.start_existing_runtime(&runtime_id).await {
103                        let _ = state_ref
104                            .emit_runtime_degraded(
105                                &runtime_id,
106                                format!("自动启动 runtime 失败: {error}"),
107                            )
108                            .await;
109                    }
110                });
111            }
112        }
113
114        Ok(state)
115    }
116
117    pub fn subscribe_events(&self) -> broadcast::Receiver<PersistedEvent> {
118        self.events_tx.subscribe()
119    }
120
121    pub fn config_token(&self) -> &str {
122        &self.token
123    }
124
125    pub async fn hello_payload(
126        &self,
127        device_id: &str,
128        provided_ack_seq: Option<i64>,
129    ) -> Result<(
130        RuntimeStatusSnapshot,
131        Vec<RuntimeSummary>,
132        Vec<DirectoryBookmarkRecord>,
133        Vec<DirectoryHistoryRecord>,
134        Vec<PendingServerRequestRecord>,
135        Vec<PersistedEvent>,
136    )> {
137        let fallback_ack = self.storage.get_mobile_session_ack(device_id)?.unwrap_or(0);
138        let last_ack_seq = provided_ack_seq.unwrap_or(fallback_ack);
139        self.storage
140            .save_mobile_session_ack(device_id, last_ack_seq)?;
141
142        let runtime = self.runtime_snapshot_for_client().await;
143        let runtimes = self.runtime_summaries_for_client().await;
144        let directory_bookmarks = self.storage.list_directory_bookmarks()?;
145        let directory_history = self.storage.list_directory_history(20)?;
146        let pending_requests = self.storage.list_pending_requests()?;
147        let replay_events = self.storage.replay_events_after(last_ack_seq)?;
148
149        Ok((
150            runtime,
151            runtimes,
152            directory_bookmarks,
153            directory_history,
154            pending_requests,
155            replay_events,
156        ))
157    }
158
159    pub fn ack_events(&self, device_id: &str, last_seq: i64) -> Result<()> {
160        self.storage.save_mobile_session_ack(device_id, last_seq)
161    }
162
163    pub async fn handle_request(&self, action: &str, payload: Value) -> Result<Value> {
164        self.handle_request_from_device(action, payload, None).await
165    }
166
167    pub async fn handle_request_from_device(
168        &self,
169        action: &str,
170        payload: Value,
171        device_id: Option<&str>,
172    ) -> Result<Value> {
173        match action {
174            "get_runtime_status" => self.get_runtime_status(require_payload(payload)?).await,
175            "list_runtimes" => Ok(json!({ "runtimes": self.runtime_summaries_for_client().await })),
176            "start_runtime" => self.start_runtime(require_payload(payload)?).await,
177            "stop_runtime" => self.stop_runtime(require_payload(payload)?).await,
178            "restart_runtime" => self.restart_runtime(require_payload(payload)?).await,
179            "prune_runtime" => self.prune_runtime(require_payload(payload)?).await,
180            "read_directory" => self.read_directory(require_payload(payload)?).await,
181            "create_directory_bookmark" => {
182                self.create_directory_bookmark(require_payload(payload)?)
183                    .await
184            }
185            "remove_directory_bookmark" => {
186                self.remove_directory_bookmark(require_payload(payload)?)
187                    .await
188            }
189            "list_threads" => self.list_threads(require_payload(payload)?).await,
190            "start_thread" => self.start_thread(require_payload(payload)?).await,
191            "read_thread" => self.read_thread(require_payload(payload)?).await,
192            "resume_thread" => self.resume_thread(require_payload(payload)?).await,
193            "update_thread" => self.update_thread(require_payload(payload)?).await,
194            "archive_thread" => self.archive_thread(require_payload(payload)?).await,
195            "unarchive_thread" => self.unarchive_thread(require_payload(payload)?).await,
196            "stage_input_image" => self.stage_input_image(require_payload(payload)?).await,
197            "send_turn" => self.send_turn(require_payload(payload)?).await,
198            "interrupt_turn" => self.interrupt_turn(require_payload(payload)?).await,
199            "enqueue_thread_message" => self.enqueue_thread_message(require_payload(payload)?).await,
200            "update_queued_thread_message" => {
201                self.update_queued_thread_message(require_payload(payload)?).await
202            }
203            "delete_queued_thread_message" => {
204                self.delete_queued_thread_message(require_payload(payload)?).await
205            }
206            "clear_thread_queue" => self.clear_thread_queue(require_payload(payload)?).await,
207            "reserve_queued_thread_message_for_edit" => self
208                .reserve_queued_thread_message_for_edit(
209                    require_payload(payload)?,
210                    device_id.context("编辑队列消息需要 device_id")?,
211                )
212                .await,
213            "cancel_queued_thread_message_edit" => {
214                self.cancel_queued_thread_message_edit(require_payload(payload)?)
215                    .await
216            }
217            "respond_pending_request" => {
218                self.respond_pending_request(require_payload(payload)?)
219                    .await
220            }
221            "start_bridge_management" => {
222                self.start_bridge_management(require_payload(payload)?)
223                    .await
224            }
225            "read_bridge_management" => {
226                self.read_bridge_management(require_payload(payload)?).await
227            }
228            "inspect_remote_state" => self.inspect_remote_state().await,
229            _ => bail!("未知 action: {action}"),
230        }
231    }
232
233    fn log_timeout_warning(&self, key: &str, message: &str) {
234        if self.should_emit_rate_limited_notice(key) {
235            warn!("{message}");
236        }
237    }
238
239    pub(super) fn should_emit_rate_limited_notice(&self, key: &str) -> bool {
240        let now = Instant::now();
241        let mut tracker = self
242            .timeout_warning_tracker
243            .lock()
244            .expect("timeout warning tracker poisoned");
245        let should_emit = tracker
246            .get(key)
247            .map(|last| now.duration_since(*last) >= CLIENT_TIMEOUT_WARN_COOLDOWN)
248            .unwrap_or(true);
249        if should_emit {
250            tracker.insert(key.to_string(), now);
251        }
252        should_emit
253    }
254
255    fn emit_event(
256        &self,
257        event_type: &str,
258        runtime_id: Option<&str>,
259        thread_id: Option<&str>,
260        payload: Value,
261    ) -> Result<()> {
262        let event = self
263            .storage
264            .append_event(event_type, runtime_id, thread_id, &payload)?;
265        *self
266            .external_event_cursor
267            .lock()
268            .expect("external event cursor poisoned") = event.seq;
269        let _ = self.events_tx.send(event);
270        Ok(())
271    }
272
273    pub(super) fn staging_root(&self) -> &Path {
274        &self.staging_root
275    }
276
277    pub(super) fn register_staged_turn_inputs(&self, turn_id: &str, paths: Vec<PathBuf>) {
278        if paths.is_empty() {
279            return;
280        }
281        let mut staged_turn_inputs = self
282            .staged_turn_inputs
283            .lock()
284            .expect("staged turn inputs poisoned");
285        staged_turn_inputs.insert(turn_id.to_string(), paths);
286    }
287
288    pub(super) fn cleanup_staged_turn_inputs(&self, turn_id: &str) -> Result<()> {
289        let paths = self
290            .staged_turn_inputs
291            .lock()
292            .expect("staged turn inputs poisoned")
293            .remove(turn_id)
294            .unwrap_or_default();
295        self.cleanup_staged_paths(paths)
296    }
297
298    pub(super) fn cleanup_staged_paths<I>(&self, paths: I) -> Result<()>
299    where
300        I: IntoIterator<Item = PathBuf>,
301    {
302        for path in paths {
303            self.remove_staged_path(&path)?;
304        }
305        Ok(())
306    }
307
308    fn remove_staged_path(&self, path: &Path) -> Result<()> {
309        if !path.starts_with(&self.staging_root) {
310            bail!("拒绝清理 staging 目录之外的文件: {}", path.display());
311        }
312        if path.exists() {
313            fs::remove_file(path)?;
314        }
315        Ok(())
316    }
317}
318
319fn staging_root_from_db_path(db_path: &Path) -> PathBuf {
320    db_path
321        .parent()
322        .unwrap_or_else(|| Path::new("."))
323        .join("staged-inputs")
324}
325
326fn prepare_staging_root(staging_root: &Path) -> Result<()> {
327    fs::create_dir_all(staging_root)?;
328    for entry in fs::read_dir(staging_root)? {
329        let path = entry?.path();
330        if path.is_dir() {
331            fs::remove_dir_all(&path)?;
332        } else {
333            fs::remove_file(&path)?;
334        }
335    }
336    Ok(())
337}
338
339pub(super) const CLIENT_STATUS_TIMEOUT: Duration = Duration::from_millis(400);
340pub(super) const CLIENT_TIMEOUT_WARN_COOLDOWN: StdDuration = StdDuration::from_secs(30);