// codex-mobile-bridge 0.3.2
//
// Remote bridge and service manager for codex-mobile.
// Documentation
mod directories;
mod events;
mod helpers;
mod management;
mod render;
mod runtime;
mod threads;
mod timeline;

#[cfg(test)]
mod tests;

use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration as StdDuration, Instant};

use anyhow::{Result, bail};
use serde_json::{Value, json};
use tokio::sync::{RwLock, broadcast, mpsc};
use tokio::time::Duration;
use tracing::warn;

use self::directories::seed_directory_bookmarks;
use self::events::run_app_server_event_loop;
use self::runtime::ManagedRuntime;
use crate::app_server::AppServerInbound;
use crate::bridge_protocol::{
    DirectoryBookmarkRecord, DirectoryHistoryRecord, PendingServerRequestRecord, PersistedEvent,
    RuntimeStatusSnapshot, RuntimeSummary, ThreadRenderSnapshot, require_payload,
};
use crate::config::Config;
use crate::storage::Storage;

/// Shared state of the bridge process, created once by `BridgeState::bootstrap`
/// and handed out behind an `Arc`.
pub struct BridgeState {
    /// Token taken from `Config::token`; exposed through `config_token`.
    token: String,
    /// Persistent store opened from `Config::db_path`.
    storage: Storage,
    /// Managed runtimes keyed by their `runtime_id`.
    runtimes: RwLock<HashMap<String, Arc<ManagedRuntime>>>,
    /// Id of the runtime record returned by `Storage::ensure_primary_runtime`.
    primary_runtime_id: String,
    /// `Config::runtime_limit`, clamped to at least 1 during bootstrap.
    runtime_limit: usize,
    /// Directory for staged turn inputs (`staged-inputs` next to the database file);
    /// its contents are wiped during bootstrap.
    staging_root: PathBuf,
    /// Sender half of the app-server inbound channel; a clone is handed to every
    /// runtime built during bootstrap, and the receiver feeds the event loop.
    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
    /// Broadcast channel (capacity 512) on which persisted events are published.
    events_tx: broadcast::Sender<PersistedEvent>,
    /// Staged input files registered per turn id, removed again on cleanup.
    staged_turn_inputs: Mutex<HashMap<String, Vec<PathBuf>>>,
    /// Cached render snapshots; not touched in this part of the file —
    /// presumably keyed by thread id (TODO confirm against the render module).
    thread_render_snapshots: Mutex<HashMap<String, ThreadRenderSnapshot>>,
    /// Time of the last emitted notice per rate-limit key.
    timeout_warning_tracker: Mutex<HashMap<String, Instant>>,
    /// Sequence number of the most recent event known to this process: seeded from
    /// `Storage::latest_event_seq` and overwritten by `emit_event`.
    /// NOTE(review): presumably read by the external event relay — confirm.
    external_event_cursor: Mutex<i64>,
}

impl BridgeState {
    pub async fn bootstrap(config: Config) -> Result<Arc<Self>> {
        let storage = Storage::open(config.db_path.clone())?;
        seed_directory_bookmarks(&storage, &config.directory_bookmarks)?;
        let staging_root = staging_root_from_db_path(&config.db_path);
        prepare_staging_root(&staging_root)?;

        let primary_runtime = storage.ensure_primary_runtime(
            config
                .codex_home
                .as_ref()
                .map(|path| path.to_string_lossy().to_string()),
            config.codex_binary.clone(),
        )?;

        let (events_tx, _) = broadcast::channel(512);
        let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();

        let mut runtime_map = HashMap::new();
        for record in storage.list_runtimes()? {
            let runtime = Arc::new(Self::build_runtime(record, inbound_tx.clone()));
            runtime_map.insert(runtime.record.runtime_id.clone(), runtime);
        }

        let state = Arc::new(Self {
            token: config.token,
            storage,
            runtimes: RwLock::new(runtime_map),
            primary_runtime_id: primary_runtime.runtime_id,
            runtime_limit: config.runtime_limit.max(1),
            staging_root,
            inbound_tx: inbound_tx.clone(),
            events_tx,
            staged_turn_inputs: Mutex::new(HashMap::new()),
            thread_render_snapshots: Mutex::new(HashMap::new()),
            timeout_warning_tracker: Mutex::new(HashMap::new()),
            external_event_cursor: Mutex::new(0),
        });

        *state
            .external_event_cursor
            .lock()
            .expect("external event cursor poisoned") = state.storage.latest_event_seq()?;

        tokio::spawn(run_app_server_event_loop(Arc::clone(&state), inbound_rx));
        tokio::spawn(management::run_external_event_relay(Arc::clone(&state)));

        for summary in state.runtime_summaries().await {
            if summary.auto_start {
                let runtime_id = summary.runtime_id.clone();
                let state_ref = Arc::clone(&state);
                tokio::spawn(async move {
                    if let Err(error) = state_ref.start_existing_runtime(&runtime_id).await {
                        let _ = state_ref
                            .emit_runtime_degraded(
                                &runtime_id,
                                format!("自动启动 runtime 失败: {error}"),
                            )
                            .await;
                    }
                });
            }
        }

        Ok(state)
    }

    /// Returns a new receiver on the persisted-event broadcast channel.
    ///
    /// Per tokio's broadcast semantics, the receiver only observes events sent
    /// after this call.
    pub fn subscribe_events(&self) -> broadcast::Receiver<PersistedEvent> {
        self.events_tx.subscribe()
    }

    /// Returns the token this bridge was configured with.
    pub fn config_token(&self) -> &str {
        self.token.as_str()
    }

    /// Assembles everything a client needs after connecting.
    ///
    /// The acknowledged sequence is `provided_ack_seq` when given, otherwise the
    /// value stored for `device_id` (0 when none is stored); the chosen value is
    /// written back to storage before anything else is collected.
    ///
    /// Returns, in order: the runtime status snapshot, the runtime summaries, the
    /// directory bookmarks, the 20 most recent directory-history entries, the
    /// pending server requests, and the events to replay after the acknowledged
    /// sequence.
    ///
    /// # Errors
    ///
    /// Propagates any storage error.
    pub async fn hello_payload(
        &self,
        device_id: &str,
        provided_ack_seq: Option<i64>,
    ) -> Result<(
        RuntimeStatusSnapshot,
        Vec<RuntimeSummary>,
        Vec<DirectoryBookmarkRecord>,
        Vec<DirectoryHistoryRecord>,
        Vec<PendingServerRequestRecord>,
        Vec<PersistedEvent>,
    )> {
        // The stored ack is read even when the client supplies one, so a storage
        // failure surfaces in both cases.
        let stored_ack = self.storage.get_mobile_session_ack(device_id)?;
        let last_ack_seq = match provided_ack_seq {
            Some(client_ack) => client_ack,
            None => stored_ack.unwrap_or(0),
        };
        self.storage
            .save_mobile_session_ack(device_id, last_ack_seq)?;

        // Tuple fields are evaluated left to right, which fixes the query order.
        Ok((
            self.runtime_snapshot_for_client().await,
            self.runtime_summaries_for_client().await,
            self.storage.list_directory_bookmarks()?,
            self.storage.list_directory_history(20)?,
            self.storage.list_pending_requests()?,
            self.storage.replay_events_after(last_ack_seq)?,
        ))
    }

    /// Stores `last_seq` as the acknowledged event sequence for `device_id` by
    /// delegating to `Storage::save_mobile_session_ack`.
    ///
    /// # Errors
    ///
    /// Propagates the storage error, if any.
    pub fn ack_events(&self, device_id: &str, last_seq: i64) -> Result<()> {
        self.storage.save_mobile_session_ack(device_id, last_seq)
    }

    /// Dispatches a client request to the handler registered for `action`.
    ///
    /// Every action except `list_runtimes` and `inspect_remote_state` first passes
    /// `payload` through `require_payload`, whose target type is inferred from the
    /// handler's parameter (presumably deserializing and validating the JSON —
    /// confirm in `bridge_protocol`).
    ///
    /// # Errors
    ///
    /// Returns the handler's or `require_payload`'s error, or an "unknown action"
    /// error when `action` matches no arm.
    pub async fn handle_request(&self, action: &str, payload: Value) -> Result<Value> {
        match action {
            // Runtime lifecycle and status.
            "get_runtime_status" => self.get_runtime_status(require_payload(payload)?).await,
            "list_runtimes" => Ok(json!({ "runtimes": self.runtime_summaries_for_client().await })),
            "start_runtime" => self.start_runtime(require_payload(payload)?).await,
            "stop_runtime" => self.stop_runtime(require_payload(payload)?).await,
            "restart_runtime" => self.restart_runtime(require_payload(payload)?).await,
            "prune_runtime" => self.prune_runtime(require_payload(payload)?).await,
            // Directory browsing and bookmarks.
            "read_directory" => self.read_directory(require_payload(payload)?).await,
            "create_directory_bookmark" => {
                self.create_directory_bookmark(require_payload(payload)?)
                    .await
            }
            "remove_directory_bookmark" => {
                self.remove_directory_bookmark(require_payload(payload)?)
                    .await
            }
            // Thread management.
            "list_threads" => self.list_threads(require_payload(payload)?).await,
            "start_thread" => self.start_thread(require_payload(payload)?).await,
            "read_thread" => self.read_thread(require_payload(payload)?).await,
            "resume_thread" => self.resume_thread(require_payload(payload)?).await,
            "update_thread" => self.update_thread(require_payload(payload)?).await,
            "archive_thread" => self.archive_thread(require_payload(payload)?).await,
            "unarchive_thread" => self.unarchive_thread(require_payload(payload)?).await,
            // Turn input, execution and pending server requests.
            "stage_input_image" => self.stage_input_image(require_payload(payload)?).await,
            "send_turn" => self.send_turn(require_payload(payload)?).await,
            "interrupt_turn" => self.interrupt_turn(require_payload(payload)?).await,
            "respond_pending_request" => {
                self.respond_pending_request(require_payload(payload)?)
                    .await
            }
            // Bridge self-management and diagnostics.
            "start_bridge_management" => {
                self.start_bridge_management(require_payload(payload)?)
                    .await
            }
            "read_bridge_management" => {
                self.read_bridge_management(require_payload(payload)?).await
            }
            "inspect_remote_state" => self.inspect_remote_state().await,
            // Anything else is rejected with the offending action name.
            _ => bail!("未知 action: {action}"),
        }
    }

    /// Logs `message` at warn level unless a notice for `key` was already emitted
    /// within the cooldown enforced by `should_emit_rate_limited_notice`.
    fn log_timeout_warning(&self, key: &str, message: &str) {
        if !self.should_emit_rate_limited_notice(key) {
            return;
        }
        warn!("{message}");
    }

    /// Decides whether a notice for `key` may be emitted now.
    ///
    /// Returns `true` when no notice has been recorded for `key`, or when the last
    /// one is at least `CLIENT_TIMEOUT_WARN_COOLDOWN` old; in that case the current
    /// time is recorded as the new last-emitted time. Returns `false` otherwise and
    /// leaves the record untouched.
    ///
    /// # Panics
    ///
    /// Panics if the tracker mutex is poisoned.
    pub(super) fn should_emit_rate_limited_notice(&self, key: &str) -> bool {
        // Timestamp is taken before acquiring the lock.
        let now = Instant::now();
        let mut last_emitted = self
            .timeout_warning_tracker
            .lock()
            .expect("timeout warning tracker poisoned");

        if let Some(previous) = last_emitted.get(key) {
            if now.duration_since(*previous) < CLIENT_TIMEOUT_WARN_COOLDOWN {
                return false;
            }
        }

        last_emitted.insert(key.to_string(), now);
        true
    }

    /// Persists an event, advances the external event cursor to its sequence
    /// number, and publishes it to broadcast subscribers.
    ///
    /// # Errors
    ///
    /// Returns the storage error if the event cannot be appended; nothing is
    /// broadcast in that case.
    ///
    /// # Panics
    ///
    /// Panics if the cursor mutex is poisoned.
    fn emit_event(
        &self,
        event_type: &str,
        runtime_id: Option<&str>,
        thread_id: Option<&str>,
        payload: Value,
    ) -> Result<()> {
        let persisted = self
            .storage
            .append_event(event_type, runtime_id, thread_id, &payload)?;

        {
            // NOTE(review): the cursor is overwritten unconditionally, so two
            // interleaved calls could move it backwards — confirm whether the
            // relay tolerates that.
            let mut cursor = self
                .external_event_cursor
                .lock()
                .expect("external event cursor poisoned");
            *cursor = persisted.seq;
        }

        // A send error is ignored here.
        let _ = self.events_tx.send(persisted);
        Ok(())
    }

    /// Returns the directory under which staged turn inputs are stored.
    pub(super) fn staging_root(&self) -> &Path {
        self.staging_root.as_path()
    }

    /// Records the staged input files belonging to `turn_id` so they can be removed
    /// later by `cleanup_staged_turn_inputs`.
    ///
    /// An empty `paths` list is ignored. An existing entry for the same turn id is
    /// replaced.
    ///
    /// # Panics
    ///
    /// Panics if the staged-inputs mutex is poisoned.
    pub(super) fn register_staged_turn_inputs(&self, turn_id: &str, paths: Vec<PathBuf>) {
        if !paths.is_empty() {
            self.staged_turn_inputs
                .lock()
                .expect("staged turn inputs poisoned")
                .insert(turn_id.to_string(), paths);
        }
    }

    /// Forgets the staged inputs registered for `turn_id` and deletes their files.
    ///
    /// Does nothing when no inputs are registered for the turn.
    ///
    /// # Errors
    ///
    /// Propagates the error from `cleanup_staged_paths`.
    ///
    /// # Panics
    ///
    /// Panics if the staged-inputs mutex is poisoned.
    pub(super) fn cleanup_staged_turn_inputs(&self, turn_id: &str) -> Result<()> {
        // Take the entry inside a short scope so the lock is released before any
        // filesystem work happens.
        let registered = {
            let mut registry = self
                .staged_turn_inputs
                .lock()
                .expect("staged turn inputs poisoned");
            registry.remove(turn_id)
        };

        match registered {
            Some(paths) => self.cleanup_staged_paths(paths),
            None => Ok(()),
        }
    }

    /// Deletes every staged file in `paths`.
    ///
    /// All paths are attempted even if some fail: callers such as
    /// `cleanup_staged_turn_inputs` have already dropped their record of these
    /// files, so stopping at the first failure would leave the remaining files
    /// behind with nothing left to clean them up.
    ///
    /// # Errors
    ///
    /// Returns the first error encountered, after all paths have been attempted.
    pub(super) fn cleanup_staged_paths<I>(&self, paths: I) -> Result<()>
    where
        I: IntoIterator<Item = PathBuf>,
    {
        let mut first_error = None;
        for path in paths {
            if let Err(error) = self.remove_staged_path(&path) {
                // Keep only the earliest failure; later ones are dropped.
                if first_error.is_none() {
                    first_error = Some(error);
                }
            }
        }

        match first_error {
            Some(error) => Err(error),
            None => Ok(()),
        }
    }

    /// Deletes a single staged file, refusing anything outside the staging root.
    ///
    /// The containment check is lexical: `path` must start with the staging root
    /// and the remainder must not contain a `..` component, because
    /// `Path::starts_with` alone would accept `<root>/../elsewhere`. Only the part
    /// after the root is inspected, so a staging root that itself contains `..`
    /// keeps working.
    ///
    /// A file that is already gone counts as success.
    ///
    /// # Errors
    ///
    /// Returns an error when `path` is outside the staging root, or when removal
    /// fails for any reason other than the file not existing.
    fn remove_staged_path(&self, path: &Path) -> Result<()> {
        let stays_inside = match path.strip_prefix(&self.staging_root) {
            Ok(relative) => !relative
                .components()
                .any(|part| matches!(part, std::path::Component::ParentDir)),
            Err(_) => false,
        };
        if !stays_inside {
            bail!("拒绝清理 staging 目录之外的文件: {}", path.display());
        }

        // Remove directly instead of checking `exists()` first: that avoids the
        // check-then-delete race and no longer hides metadata errors.
        match fs::remove_file(path) {
            Ok(()) => Ok(()),
            Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(error) => Err(error.into()),
        }
    }
}

/// Derives the staging directory from the database location: a `staged-inputs`
/// directory next to the database file, or under `.` when `db_path` has no parent.
fn staging_root_from_db_path(db_path: &Path) -> PathBuf {
    let base_dir = match db_path.parent() {
        Some(parent) => parent,
        None => Path::new("."),
    };
    base_dir.join("staged-inputs")
}

/// Ensures the staging directory exists and is empty.
///
/// Creates `staging_root` (including missing parents) and then deletes every
/// entry inside it, directories recursively.
///
/// # Errors
///
/// Propagates the first filesystem error; entries after it are left in place.
fn prepare_staging_root(staging_root: &Path) -> Result<()> {
    fs::create_dir_all(staging_root)?;

    let leftovers = fs::read_dir(staging_root)?;
    for leftover in leftovers {
        let stale = leftover?.path();
        let removal = if stale.is_dir() {
            fs::remove_dir_all(&stale)
        } else {
            fs::remove_file(&stale)
        };
        removal?;
    }

    Ok(())
}

/// 400 ms timeout. Not referenced in this part of the file — presumably bounds
/// client-facing runtime status queries (TODO confirm against the callers).
pub(super) const CLIENT_STATUS_TIMEOUT: Duration = Duration::from_millis(400);
/// Minimum interval (30 s) between two notices for the same key, as enforced by
/// `should_emit_rate_limited_notice`.
pub(super) const CLIENT_TIMEOUT_WARN_COOLDOWN: StdDuration = StdDuration::from_secs(30);