use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result, bail};
use serde_json::{Value, json};
use tokio::sync::{RwLock, mpsc};
use tokio::time::timeout;
use uuid::Uuid;
use super::{BridgeState, CLIENT_STATUS_TIMEOUT};
use crate::app_server::{AppServerInbound, AppServerLaunchConfig, AppServerManager};
use crate::bridge_protocol::{
AppServerHandshakeSummary, GetRuntimeStatusRequest, PruneRuntimeRequest,
RestartRuntimeRequest, RuntimeRecord, RuntimeStatusSnapshot, RuntimeSummary,
StartRuntimeRequest, StopRuntimeRequest, now_millis, runtime_list_payload, status_payload,
};
use crate::config::expand_path;
/// A registered runtime: the record it was built from, the manager for its app-server
/// process, and its most recent status snapshot.
pub(super) struct ManagedRuntime {
    /// Configuration record this runtime was built from (id, display name, codex binary/home,
    /// primary/auto-start flags).
    pub(super) record: RuntimeRecord,
    /// Drives the app-server process (`start` / `stop` / `restart`).
    pub(super) app_server: AppServerManager,
    /// Latest known status; overwritten by the status-emitting helpers on `BridgeState` and
    /// read to build summaries.
    pub(super) status: RwLock<RuntimeStatusSnapshot>,
}
impl ManagedRuntime {
    /// Pairs this runtime's record with a copy of its current status snapshot.
    pub(super) async fn summary(&self) -> RuntimeSummary {
        let current = self.status.read().await.clone();
        RuntimeSummary::from_parts(&self.record, current)
    }
}
impl BridgeState {
pub async fn runtime_snapshot(&self) -> RuntimeStatusSnapshot {
match self.get_runtime(&self.primary_runtime_id).await {
Some(runtime) => runtime.status.read().await.clone(),
None => RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()),
}
}
pub async fn runtime_summaries(&self) -> Vec<RuntimeSummary> {
let runtimes = self.runtimes.read().await;
let mut summaries = Vec::with_capacity(runtimes.len());
for runtime in runtimes.values() {
summaries.push(runtime.summary().await);
}
summaries.sort_by(|left, right| {
right
.is_primary
.cmp(&left.is_primary)
.then_with(|| left.display_name.cmp(&right.display_name))
});
summaries
}
pub async fn runtime_snapshot_for_client(&self) -> RuntimeStatusSnapshot {
match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_snapshot()).await {
Ok(snapshot) => snapshot,
Err(_) => {
self.log_timeout_warning(
"runtime_snapshot:primary",
"读取 primary runtime 状态超时,回退到存储快照",
);
self.fallback_runtime_snapshot()
}
}
}
pub async fn runtime_summaries_for_client(&self) -> Vec<RuntimeSummary> {
match timeout(CLIENT_STATUS_TIMEOUT, self.runtime_summaries()).await {
Ok(summaries) => summaries,
Err(_) => {
self.log_timeout_warning(
"runtime_summaries:list",
"读取 runtime 列表超时,回退到存储快照",
);
self.fallback_runtime_summaries()
}
}
}
pub(super) async fn get_runtime_status(
&self,
request: GetRuntimeStatusRequest,
) -> Result<Value> {
let runtime_id = request
.runtime_id
.unwrap_or_else(|| self.primary_runtime_id.clone());
let runtime = match timeout(
CLIENT_STATUS_TIMEOUT,
self.require_runtime(Some(&runtime_id)),
)
.await
{
Ok(Ok(runtime)) => runtime.summary().await,
Ok(Err(error)) => return Err(error),
Err(_) => {
self.log_timeout_warning(
&format!("runtime_status:{runtime_id}"),
&format!("读取 runtime={runtime_id} 状态超时,回退到存储快照"),
);
self.fallback_runtime_summary(&runtime_id)?
}
};
Ok(json!({ "runtime": runtime }))
}
pub(super) async fn start_runtime(&self, request: StartRuntimeRequest) -> Result<Value> {
let runtime_id = match request.runtime_id.clone() {
Some(runtime_id) => runtime_id,
None => format!("runtime-{}", &Uuid::new_v4().simple().to_string()[..8]),
};
if self.get_runtime(&runtime_id).await.is_some() {
if request.display_name.is_some()
|| request.codex_home.is_some()
|| request.codex_binary.is_some()
{
bail!("已存在 runtime,启动现有 runtime 时不能覆盖配置");
}
self.start_existing_runtime(&runtime_id).await?;
let runtime = self
.require_runtime(Some(&runtime_id))
.await?
.summary()
.await;
return Ok(json!({ "runtime": runtime }));
}
{
let runtimes = self.runtimes.read().await;
if runtimes.len() >= self.runtime_limit {
bail!("已达到 runtime 上限 {}", self.runtime_limit);
}
}
let codex_home = request
.codex_home
.as_deref()
.map(|value| expand_path(Path::new(value)))
.transpose()?
.map(|value| value.to_string_lossy().to_string());
let now = now_millis();
let record = RuntimeRecord {
runtime_id: runtime_id.clone(),
display_name: request
.display_name
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| runtime_id.clone()),
codex_home,
codex_binary: request
.codex_binary
.filter(|value| !value.trim().is_empty())
.unwrap_or_else(|| "codex".to_string()),
is_primary: false,
auto_start: request.auto_start.unwrap_or(false),
created_at_ms: now,
updated_at_ms: now,
};
self.storage.upsert_runtime(&record)?;
self.register_runtime(record).await?;
self.start_existing_runtime(&runtime_id).await?;
let runtime = self
.require_runtime(Some(&runtime_id))
.await?
.summary()
.await;
Ok(json!({ "runtime": runtime }))
}
pub(super) async fn stop_runtime(&self, request: StopRuntimeRequest) -> Result<Value> {
let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
let current_status = runtime.status.read().await.clone();
self.emit_runtime_status(
&request.runtime_id,
RuntimeStatusSnapshot {
runtime_id: request.runtime_id.clone(),
status: "stopping".to_string(),
codex_home: current_status.codex_home,
user_agent: current_status.user_agent,
platform_family: current_status.platform_family,
platform_os: current_status.platform_os,
last_error: None,
pid: current_status.pid,
app_server_handshake: AppServerHandshakeSummary::inactive(),
updated_at_ms: now_millis(),
},
)
.await?;
runtime.app_server.stop().await?;
let runtime = self
.require_runtime(Some(&request.runtime_id))
.await?
.summary()
.await;
Ok(json!({ "runtime": runtime }))
}
pub(super) async fn restart_runtime(&self, request: RestartRuntimeRequest) -> Result<Value> {
let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
let current_status = runtime.status.read().await.clone();
self.emit_runtime_status(
&request.runtime_id,
RuntimeStatusSnapshot {
runtime_id: request.runtime_id.clone(),
status: "stopping".to_string(),
codex_home: current_status.codex_home,
user_agent: current_status.user_agent,
platform_family: current_status.platform_family,
platform_os: current_status.platform_os,
last_error: None,
pid: current_status.pid,
app_server_handshake: AppServerHandshakeSummary::inactive(),
updated_at_ms: now_millis(),
},
)
.await?;
runtime.app_server.restart().await?;
let runtime = self
.require_runtime(Some(&request.runtime_id))
.await?
.summary()
.await;
Ok(json!({ "runtime": runtime }))
}
pub(super) async fn prune_runtime(&self, request: PruneRuntimeRequest) -> Result<Value> {
if request.runtime_id == self.primary_runtime_id {
bail!("primary runtime 不能删除");
}
let runtime = self.require_runtime(Some(&request.runtime_id)).await?;
runtime.app_server.stop().await?;
{
let mut runtimes = self.runtimes.write().await;
runtimes.remove(&request.runtime_id);
}
self.storage.remove_runtime(&request.runtime_id)?;
self.emit_runtime_list().await?;
Ok(json!({ "pruned": true, "runtimeId": request.runtime_id }))
}
pub(super) async fn start_existing_runtime(&self, runtime_id: &str) -> Result<()> {
let runtime = self.require_runtime(Some(runtime_id)).await?;
runtime.app_server.start().await
}
pub(super) async fn register_runtime(&self, record: RuntimeRecord) -> Result<()> {
let managed = Arc::new(Self::build_runtime(record, self.inbound_tx.clone()));
{
let mut runtimes = self.runtimes.write().await;
runtimes.insert(managed.record.runtime_id.clone(), managed);
}
self.emit_runtime_list().await?;
Ok(())
}
pub(super) fn build_runtime(
record: RuntimeRecord,
inbound_tx: mpsc::UnboundedSender<AppServerInbound>,
) -> ManagedRuntime {
ManagedRuntime {
status: RwLock::new(RuntimeStatusSnapshot::stopped(record.runtime_id.clone())),
app_server: AppServerManager::new(
AppServerLaunchConfig {
runtime_id: record.runtime_id.clone(),
codex_binary: record.codex_binary.clone(),
codex_home: record.codex_home.as_ref().map(std::path::PathBuf::from),
},
inbound_tx,
),
record,
}
}
pub(super) async fn get_runtime(&self, runtime_id: &str) -> Option<Arc<ManagedRuntime>> {
self.runtimes.read().await.get(runtime_id).cloned()
}
pub(super) async fn require_runtime(
&self,
runtime_id: Option<&str>,
) -> Result<Arc<ManagedRuntime>> {
let target = runtime_id.unwrap_or(&self.primary_runtime_id);
self.get_runtime(target)
.await
.with_context(|| format!("未找到 runtime: {target}"))
}
pub(super) async fn emit_runtime_status(
&self,
runtime_id: &str,
status: RuntimeStatusSnapshot,
) -> Result<()> {
let runtime = self.require_runtime(Some(runtime_id)).await?;
{
let mut guard = runtime.status.write().await;
*guard = status;
}
let summary = runtime.summary().await;
self.emit_event(
"runtime_status_changed",
Some(runtime_id),
None,
status_payload(&summary),
)?;
Ok(())
}
pub(super) async fn emit_runtime_process_changed(
&self,
runtime_id: &str,
pid: Option<u32>,
running: bool,
) -> Result<()> {
let runtime = self.require_runtime(Some(runtime_id)).await?;
{
let mut guard = runtime.status.write().await;
guard.pid = pid;
guard.updated_at_ms = now_millis();
if !running && guard.status == "starting" {
guard.status = "stopped".to_string();
guard.last_error = None;
guard.app_server_handshake = AppServerHandshakeSummary::inactive();
}
}
self.emit_event(
"runtime_process_changed",
Some(runtime_id),
None,
json!({
"runtimeId": runtime_id,
"pid": pid,
"running": running,
"updatedAtMs": now_millis(),
}),
)?;
Ok(())
}
pub(super) async fn emit_runtime_degraded(
&self,
runtime_id: &str,
message: String,
) -> Result<()> {
self.emit_event(
"runtime_degraded",
Some(runtime_id),
None,
json!({
"runtimeId": runtime_id,
"message": message,
"updatedAtMs": now_millis(),
}),
)
}
pub(super) async fn emit_runtime_list(&self) -> Result<()> {
let runtimes = self.runtime_summaries().await;
self.emit_event("runtime_list", None, None, runtime_list_payload(&runtimes))
}
pub(super) fn fallback_runtime_snapshot(&self) -> RuntimeStatusSnapshot {
self.storage
.get_runtime(&self.primary_runtime_id)
.ok()
.flatten()
.map(|record| self.build_fallback_status(&record))
.unwrap_or_else(|| RuntimeStatusSnapshot::stopped(self.primary_runtime_id.clone()))
}
pub(super) fn fallback_runtime_summaries(&self) -> Vec<RuntimeSummary> {
let mut summaries = self
.storage
.list_runtimes()
.unwrap_or_default()
.into_iter()
.map(|record| RuntimeSummary::from_parts(&record, self.build_fallback_status(&record)))
.collect::<Vec<_>>();
summaries.sort_by(|left, right| {
right
.is_primary
.cmp(&left.is_primary)
.then_with(|| left.display_name.cmp(&right.display_name))
});
summaries
}
pub(super) fn fallback_runtime_summary(&self, runtime_id: &str) -> Result<RuntimeSummary> {
let record = self
.storage
.get_runtime(runtime_id)?
.with_context(|| format!("未找到 runtime: {runtime_id}"))?;
Ok(RuntimeSummary::from_parts(
&record,
self.build_fallback_status(&record),
))
}
fn build_fallback_status(&self, record: &RuntimeRecord) -> RuntimeStatusSnapshot {
RuntimeStatusSnapshot {
runtime_id: record.runtime_id.clone(),
status: if record.auto_start {
"unknown".to_string()
} else {
"stopped".to_string()
},
codex_home: record.codex_home.clone(),
user_agent: None,
platform_family: None,
platform_os: None,
last_error: None,
pid: None,
app_server_handshake: AppServerHandshakeSummary::inactive(),
updated_at_ms: now_millis(),
}
}
pub(super) async fn transition_runtime_status(
&self,
runtime_id: &str,
status_label: String,
last_error: Option<String>,
handshake: AppServerHandshakeSummary,
) -> Result<()> {
let runtime = self.require_runtime(Some(runtime_id)).await?;
let current_status = runtime.status.read().await.clone();
self.emit_runtime_status(
runtime_id,
RuntimeStatusSnapshot {
runtime_id: runtime_id.to_string(),
status: status_label,
codex_home: current_status.codex_home,
user_agent: current_status.user_agent,
platform_family: current_status.platform_family,
platform_os: current_status.platform_os,
last_error,
pid: current_status.pid,
app_server_handshake: handshake,
updated_at_ms: now_millis(),
},
)
.await
}
}