use std::sync::Arc;
use anyhow::{Result, bail};
use serde_json::json;
use tokio::time::{Duration, sleep};
use uuid::Uuid;
use super::BridgeState;
use crate::bridge_protocol::{
BridgeManagementPhase, BridgeManagementStatus, BridgeManagementTask,
BridgeManagementTaskPayload, BridgeManagementTaskResultPayload, ReadBridgeManagementRequest,
StartBridgeManagementRequest, now_millis,
};
use crate::manage::spawn_bridge_management_task;
impl BridgeState {
pub(super) async fn start_bridge_management(
&self,
request: StartBridgeManagementRequest,
) -> Result<serde_json::Value> {
if let Some(active_task) = self.storage.active_bridge_management_task()? {
bail!(
"已有远端管理任务正在执行: {} {}",
active_task.operation.as_str(),
bridge_management_status_text(active_task.status)
);
}
let initial_snapshot = self.current_bridge_management_snapshot().await;
let current_version = initial_snapshot
.as_ref()
.and_then(|snapshot| snapshot.install_record.as_ref())
.and_then(|record| record.current_version.clone());
let task = BridgeManagementTask {
task_id: Uuid::new_v4().to_string(),
operation: request.operation,
status: BridgeManagementStatus::Queued,
phase: BridgeManagementPhase::ResolveTarget,
summary: format!("已受理远端 bridge {} 请求", request.operation.as_str()),
detail: Some("等待 bridge 后台任务启动".to_string()),
failure_code: None,
target_version: None,
current_version,
started_at_ms: now_millis(),
updated_at_ms: now_millis(),
snapshot: initial_snapshot,
};
self.persist_bridge_management_task(&task)?;
if let Err(error) = spawn_bridge_management_task(&task.task_id, self.storage.db_path()) {
let failed = BridgeManagementTask {
status: BridgeManagementStatus::Failed,
phase: BridgeManagementPhase::Done,
summary: format!("远端 bridge {} 任务启动失败", task.operation.as_str()),
detail: Some(error.to_string()),
failure_code: Some("spawn_failed".to_string()),
updated_at_ms: now_millis(),
..task
};
self.persist_bridge_management_task(&failed)?;
return Ok(json!(BridgeManagementTaskPayload { task: failed }));
}
Ok(json!(BridgeManagementTaskPayload { task }))
}
pub(super) async fn read_bridge_management(
&self,
request: ReadBridgeManagementRequest,
) -> Result<serde_json::Value> {
let task = match request.task_id.as_deref() {
Some(task_id) => self.storage.get_bridge_management_task(task_id)?,
None => self.storage.latest_bridge_management_task()?,
};
Ok(json!(BridgeManagementTaskResultPayload { task }))
}
pub(super) async fn inspect_remote_state(&self) -> Result<serde_json::Value> {
Ok(json!(crate::manage::inspect_remote_state(
self.storage.db_path()
)?))
}
pub(super) async fn current_bridge_management_snapshot(
&self,
) -> Option<crate::bridge_protocol::ManagedBridgeSnapshot> {
crate::manage::current_bridge_management_snapshot(self.storage.db_path()).ok()
}
fn persist_bridge_management_task(&self, task: &BridgeManagementTask) -> Result<()> {
self.storage.upsert_bridge_management_task(task)?;
self.emit_event(
"bridge_management_updated",
None,
None,
json!(BridgeManagementTaskPayload { task: task.clone() }),
)?;
Ok(())
}
}
fn bridge_management_status_text(status: BridgeManagementStatus) -> &'static str {
match status {
BridgeManagementStatus::Queued => "queued",
BridgeManagementStatus::Running => "running",
BridgeManagementStatus::Reconnecting => "reconnecting",
BridgeManagementStatus::Succeeded => "succeeded",
BridgeManagementStatus::Failed => "failed",
}
}
pub(super) async fn run_external_event_relay(state: Arc<BridgeState>) {
loop {
if let Err(error) = relay_external_events(&state).await {
tracing::warn!("桥接外部事件转发失败: {error}");
}
sleep(Duration::from_millis(500)).await;
}
}
async fn relay_external_events(state: &BridgeState) -> Result<()> {
let last_seq = *state
.external_event_cursor
.lock()
.expect("external event cursor poisoned");
let events = state.storage.replay_events_after(last_seq)?;
if events.is_empty() {
return Ok(());
}
let mut cursor = last_seq;
for event in events {
cursor = event.seq;
let _ = state.events_tx.send(event);
}
*state
.external_event_cursor
.lock()
.expect("external event cursor poisoned") = cursor;
Ok(())
}