codex-mobile-bridge 0.3.3

Remote bridge and service manager for codex-mobile.
Documentation
use std::sync::Arc;

use anyhow::{Result, bail};
use serde_json::json;
use tokio::time::{Duration, sleep};
use uuid::Uuid;

use super::BridgeState;
use crate::bridge_protocol::{
    BridgeManagementPhase, BridgeManagementStatus, BridgeManagementTask,
    BridgeManagementTaskPayload, BridgeManagementTaskResultPayload, ReadBridgeManagementRequest,
    StartBridgeManagementRequest, now_millis,
};
use crate::manage::spawn_bridge_management_task;

impl BridgeState {
    pub(super) async fn start_bridge_management(
        &self,
        request: StartBridgeManagementRequest,
    ) -> Result<serde_json::Value> {
        if let Some(active_task) = self.storage.active_bridge_management_task()? {
            bail!(
                "已有远端管理任务正在执行: {} {}",
                active_task.operation.as_str(),
                bridge_management_status_text(active_task.status)
            );
        }

        let initial_snapshot = self.current_bridge_management_snapshot().await;
        let current_version = initial_snapshot
            .as_ref()
            .and_then(|snapshot| snapshot.install_record.as_ref())
            .and_then(|record| record.current_version.clone());
        let task = BridgeManagementTask {
            task_id: Uuid::new_v4().to_string(),
            operation: request.operation,
            status: BridgeManagementStatus::Queued,
            phase: BridgeManagementPhase::ResolveTarget,
            summary: format!("已受理远端 bridge {} 请求", request.operation.as_str()),
            detail: Some("等待 bridge 后台任务启动".to_string()),
            failure_code: None,
            target_version: None,
            current_version,
            started_at_ms: now_millis(),
            updated_at_ms: now_millis(),
            snapshot: initial_snapshot,
        };
        self.persist_bridge_management_task(&task)?;

        if let Err(error) = spawn_bridge_management_task(&task.task_id, self.storage.db_path()) {
            let failed = BridgeManagementTask {
                status: BridgeManagementStatus::Failed,
                phase: BridgeManagementPhase::Done,
                summary: format!("远端 bridge {} 任务启动失败", task.operation.as_str()),
                detail: Some(error.to_string()),
                failure_code: Some("spawn_failed".to_string()),
                updated_at_ms: now_millis(),
                ..task
            };
            self.persist_bridge_management_task(&failed)?;
            return Ok(json!(BridgeManagementTaskPayload { task: failed }));
        }

        Ok(json!(BridgeManagementTaskPayload { task }))
    }

    pub(super) async fn read_bridge_management(
        &self,
        request: ReadBridgeManagementRequest,
    ) -> Result<serde_json::Value> {
        let task = match request.task_id.as_deref() {
            Some(task_id) => self.storage.get_bridge_management_task(task_id)?,
            None => self.storage.latest_bridge_management_task()?,
        };
        Ok(json!(BridgeManagementTaskResultPayload { task }))
    }

    pub(super) async fn inspect_remote_state(&self) -> Result<serde_json::Value> {
        Ok(json!(crate::manage::inspect_remote_state(
            self.storage.db_path()
        )?))
    }

    pub(super) async fn current_bridge_management_snapshot(
        &self,
    ) -> Option<crate::bridge_protocol::ManagedBridgeSnapshot> {
        crate::manage::current_bridge_management_snapshot(self.storage.db_path()).ok()
    }

    fn persist_bridge_management_task(&self, task: &BridgeManagementTask) -> Result<()> {
        self.storage.upsert_bridge_management_task(task)?;
        self.emit_event(
            "bridge_management_updated",
            None,
            None,
            json!(BridgeManagementTaskPayload { task: task.clone() }),
        )?;
        Ok(())
    }
}

fn bridge_management_status_text(status: BridgeManagementStatus) -> &'static str {
    match status {
        BridgeManagementStatus::Queued => "queued",
        BridgeManagementStatus::Running => "running",
        BridgeManagementStatus::Reconnecting => "reconnecting",
        BridgeManagementStatus::Succeeded => "succeeded",
        BridgeManagementStatus::Failed => "failed",
    }
}

pub(super) async fn run_external_event_relay(state: Arc<BridgeState>) {
    loop {
        if let Err(error) = relay_external_events(&state).await {
            tracing::warn!("桥接外部事件转发失败: {error}");
        }
        sleep(Duration::from_millis(500)).await;
    }
}

async fn relay_external_events(state: &BridgeState) -> Result<()> {
    let last_seq = *state
        .external_event_cursor
        .lock()
        .expect("external event cursor poisoned");
    let events = state.storage.replay_events_after(last_seq)?;
    if events.is_empty() {
        return Ok(());
    }

    let mut cursor = last_seq;
    for event in events {
        cursor = event.seq;
        let _ = state.events_tx.send(event);
    }
    *state
        .external_event_cursor
        .lock()
        .expect("external event cursor poisoned") = cursor;
    Ok(())
}