mod events;
mod helpers;
mod runtime;
mod threads;
mod timeline;
#[cfg(test)]
mod tests;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration as StdDuration, Instant};
use anyhow::{Result, bail};
use serde_json::{Value, json};
use tokio::sync::{RwLock, broadcast, mpsc};
use tokio::time::Duration;
use tracing::warn;
use self::events::run_app_server_event_loop;
use self::runtime::ManagedRuntime;
use self::threads::seed_workspaces;
use crate::app_server::AppServerInbound;
use crate::bridge_protocol::{
PendingServerRequestRecord, PersistedEvent, RuntimeStatusSnapshot, RuntimeSummary,
WorkspaceRecord, require_payload,
};
use crate::config::Config;
use crate::storage::Storage;
/// Shared state for the bridge: persistent storage, the set of managed
/// runtimes, and the channels that move app-server events to clients.
pub struct BridgeState {
    /// Shared auth token clients must present (see `config_token`).
    token: String,
    /// Persistent store: events, workspaces, runtime records, pending
    /// server requests, and per-device ack positions.
    storage: Storage,
    /// Managed runtimes keyed by runtime id; seeded from storage at
    /// bootstrap and guarded for concurrent read/write access.
    runtimes: RwLock<HashMap<String, Arc<ManagedRuntime>>>,
    /// Id of the primary runtime ensured to exist during bootstrap.
    primary_runtime_id: String,
    /// Upper bound on runtimes; clamped to at least 1 at bootstrap.
    runtime_limit: usize,
    /// Sender feeding the app-server event loop; a clone is handed to
    /// each runtime built in `bootstrap`.
    inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
    /// Broadcast channel fanning persisted events out to subscribers.
    events_tx: broadcast::Sender<PersistedEvent>,
    /// Last warn instant per key, used to rate-limit timeout warnings
    /// in `log_timeout_warning`.
    timeout_warning_tracker: Mutex<HashMap<String, Instant>>,
}
impl BridgeState {
    /// Bootstraps the bridge: opens storage, seeds workspaces, rebuilds the
    /// runtime map from persisted records, wires up the app-server event
    /// loop, and auto-starts every runtime flagged `auto_start`.
    ///
    /// # Errors
    /// Fails when storage cannot be opened, workspace seeding fails, or
    /// persisted runtime records cannot be read.
    pub async fn bootstrap(config: Config) -> Result<Arc<Self>> {
        let storage = Storage::open(config.db_path.clone())?;
        seed_workspaces(&storage, &config.workspace_roots)?;
        let primary_runtime = storage.ensure_primary_runtime(
            config
                .codex_home
                .as_ref()
                .map(|path| path.to_string_lossy().to_string()),
            config.codex_binary.clone(),
        )?;
        // Capacity 512: slow subscribers past that lag and drop events;
        // clients can recover via the ack/replay path in `hello_payload`.
        let (events_tx, _) = broadcast::channel(512);
        let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
        let mut runtime_map = HashMap::new();
        for record in storage.list_runtimes()? {
            let runtime = Arc::new(Self::build_runtime(record, inbound_tx.clone()));
            runtime_map.insert(runtime.record.runtime_id.clone(), runtime);
        }
        let state = Arc::new(Self {
            token: config.token,
            storage,
            runtimes: RwLock::new(runtime_map),
            primary_runtime_id: primary_runtime.runtime_id,
            // Clamp so the bridge always allows at least one runtime.
            runtime_limit: config.runtime_limit.max(1),
            // Last use of `inbound_tx`: move it instead of cloning.
            inbound_tx,
            events_tx,
            timeout_warning_tracker: Mutex::new(HashMap::new()),
        });
        // Start draining inbound events before auto-starting runtimes so
        // nothing they emit during startup is lost.
        tokio::spawn(run_app_server_event_loop(Arc::clone(&state), inbound_rx));
        for summary in state.runtime_summaries().await {
            if summary.auto_start {
                let runtime_id = summary.runtime_id.clone();
                let state_ref = Arc::clone(&state);
                // Auto-start failures degrade the runtime via an event
                // instead of failing bootstrap; emit errors are best-effort.
                tokio::spawn(async move {
                    if let Err(error) = state_ref.start_existing_runtime(&runtime_id).await {
                        let _ = state_ref
                            .emit_runtime_degraded(
                                &runtime_id,
                                format!("自动启动 runtime 失败: {error}"),
                            )
                            .await;
                    }
                });
            }
        }
        Ok(state)
    }
    /// Returns a fresh receiver on the persisted-event broadcast channel.
    pub fn subscribe_events(&self) -> broadcast::Receiver<PersistedEvent> {
        self.events_tx.subscribe()
    }
    /// The shared auth token clients must present when connecting.
    pub fn config_token(&self) -> &str {
        &self.token
    }
    /// Builds the initial payload for a connecting client: current runtime
    /// snapshot and summaries, workspaces, pending server requests, and the
    /// events to replay after the client's last acknowledged sequence.
    ///
    /// If the client supplies `provided_ack_seq` it becomes the new stored
    /// ack for `device_id`; otherwise the stored ack (default 0) is used.
    ///
    /// # Errors
    /// Propagates any storage read/write failure.
    pub async fn hello_payload(
        &self,
        device_id: &str,
        provided_ack_seq: Option<i64>,
    ) -> Result<(
        RuntimeStatusSnapshot,
        Vec<RuntimeSummary>,
        Vec<WorkspaceRecord>,
        Vec<PendingServerRequestRecord>,
        Vec<PersistedEvent>,
    )> {
        let fallback_ack = self.storage.get_mobile_session_ack(device_id)?.unwrap_or(0);
        let last_ack_seq = provided_ack_seq.unwrap_or(fallback_ack);
        self.storage
            .save_mobile_session_ack(device_id, last_ack_seq)?;
        let runtime = self.runtime_snapshot_for_client().await;
        let runtimes = self.runtime_summaries_for_client().await;
        let workspaces = self.storage.list_workspaces()?;
        let pending_requests = self.storage.list_pending_requests()?;
        let replay_events = self.storage.replay_events_after(last_ack_seq)?;
        Ok((
            runtime,
            runtimes,
            workspaces,
            pending_requests,
            replay_events,
        ))
    }
    /// Persists `last_seq` as the latest event acknowledged by `device_id`.
    pub fn ack_events(&self, device_id: &str, last_seq: i64) -> Result<()> {
        self.storage.save_mobile_session_ack(device_id, last_seq)
    }
    /// Dispatches a client request by `action` name to the matching handler.
    ///
    /// Handlers that need arguments unwrap them via `require_payload`.
    ///
    /// # Errors
    /// Fails for an unknown action, a missing required payload, or any error
    /// from the dispatched handler.
    pub async fn handle_request(&self, action: &str, payload: Value) -> Result<Value> {
        match action {
            "get_runtime_status" => self.get_runtime_status(require_payload(payload)?).await,
            "list_runtimes" => Ok(json!({ "runtimes": self.runtime_summaries_for_client().await })),
            "start_runtime" => self.start_runtime(require_payload(payload)?).await,
            "stop_runtime" => self.stop_runtime(require_payload(payload)?).await,
            "restart_runtime" => self.restart_runtime(require_payload(payload)?).await,
            "prune_runtime" => self.prune_runtime(require_payload(payload)?).await,
            "list_workspaces" => Ok(json!({ "workspaces": self.storage.list_workspaces()? })),
            "upsert_workspace" => self.upsert_workspace(require_payload(payload)?).await,
            "list_threads" => self.list_threads(require_payload(payload)?).await,
            "start_thread" => self.start_thread(require_payload(payload)?).await,
            "read_thread" => self.read_thread(require_payload(payload)?).await,
            "resume_thread" => self.resume_thread(require_payload(payload)?).await,
            "update_thread" => self.update_thread(require_payload(payload)?).await,
            "archive_thread" => self.archive_thread(require_payload(payload)?).await,
            "unarchive_thread" => self.unarchive_thread(require_payload(payload)?).await,
            "send_turn" => self.send_turn(require_payload(payload)?).await,
            "interrupt_turn" => self.interrupt_turn(require_payload(payload)?).await,
            "respond_pending_request" => {
                self.respond_pending_request(require_payload(payload)?)
                    .await
            }
            _ => bail!("未知 action: {action}"),
        }
    }
    /// Logs `message` at WARN level, rate-limited per `key`: each key logs at
    /// most once per `CLIENT_TIMEOUT_WARN_COOLDOWN`.
    fn log_timeout_warning(&self, key: &str, message: &str) {
        let now = Instant::now();
        let mut tracker = self
            .timeout_warning_tracker
            .lock()
            .expect("timeout warning tracker poisoned");
        // Evict entries whose cooldown has elapsed: they would log again
        // anyway, and pruning keeps the map from growing without bound.
        tracker.retain(|_, last| now.duration_since(*last) < CLIENT_TIMEOUT_WARN_COOLDOWN);
        // After the sweep, any key still present is inside its cooldown.
        if !tracker.contains_key(key) {
            tracker.insert(key.to_string(), now);
            warn!("{message}");
        }
    }
    /// Appends an event to storage and fans the persisted copy out to all
    /// broadcast subscribers. A send error (no active subscribers) is
    /// deliberately ignored; persistence alone is considered success.
    ///
    /// # Errors
    /// Fails only if the event cannot be appended to storage.
    fn emit_event(
        &self,
        event_type: &str,
        runtime_id: Option<&str>,
        thread_id: Option<&str>,
        payload: Value,
    ) -> Result<()> {
        let event = self
            .storage
            .append_event(event_type, runtime_id, thread_id, &payload)?;
        let _ = self.events_tx.send(event);
        Ok(())
    }
}
/// Timeout budget for client-facing runtime status queries.
/// NOTE(review): consumers are not visible in this file — presumably the
/// submodules apply it with tokio timers; confirm at the call sites.
pub(super) const CLIENT_STATUS_TIMEOUT: Duration = Duration::from_millis(400);
/// Minimum interval between repeated WARN logs for the same key in
/// `BridgeState::log_timeout_warning`.
pub(super) const CLIENT_TIMEOUT_WARN_COOLDOWN: StdDuration = StdDuration::from_secs(30);