use std::fs;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result, bail};
use base64::Engine as _;
use serde_json::{Map, Value, json};
use uuid::Uuid;
use super::BridgeState;
use super::helpers::{
OptionalTextUpdate, normalize_name, normalize_note, optional_string, required_string,
resolve_optional_text_update,
};
use super::timeline::{
normalize_thread, timeline_entries_from_events, timeline_entries_from_thread,
};
use crate::bridge_protocol::{
ArchiveThreadRequest, InterruptTurnRequest, ListThreadsRequest, ReadThreadRequest,
RespondPendingRequestRequest, ResumeThreadRequest, SendTurnInputItem, SendTurnRequest,
StageInputImageRequest, StagedInputImage, StartThreadRequest, UnarchiveThreadRequest,
UpdateThreadRequest, UpsertWorkspaceRequest, WorkspaceRecord,
};
use crate::config::expand_path;
use crate::storage::Storage;
use crate::workspace::{
canonicalize_directory, default_display_name, resolve_workspace_path, workspace_matches,
};
/// Registers every configured workspace root in storage.
///
/// Each root is canonicalized with `canonicalize_directory`, named with
/// `default_display_name`, and upserted with the trusted flag set to `true`.
///
/// # Errors
/// Stops at, and returns, the first canonicalization or storage failure.
pub(super) fn seed_workspaces(
    storage: &Storage,
    workspace_roots: &[std::path::PathBuf],
) -> Result<()> {
    workspace_roots.iter().try_for_each(|root| -> Result<()> {
        let resolved = canonicalize_directory(root)?;
        let label = default_display_name(&resolved);
        storage.upsert_workspace(&label, &resolved, true)?;
        Ok(())
    })
}
impl BridgeState {
/// Handles a workspace upsert request.
///
/// Expands and canonicalizes `request.root_path`, then stores the workspace. A blank
/// `display_name` falls back to `default_display_name`, and an omitted `trusted` flag
/// defaults to `true`. Afterwards a `workspace_list` event carrying the complete
/// workspace list is emitted.
///
/// Returns `{ "workspace": <stored record> }`.
pub(super) async fn upsert_workspace(&self, request: UpsertWorkspaceRequest) -> Result<Value> {
    let expanded_root = expand_path(Path::new(&request.root_path))?;
    let canonical_root = canonicalize_directory(&expanded_root)?;
    let display_name = match request.display_name.trim() {
        "" => default_display_name(&canonical_root),
        explicit => explicit.to_string(),
    };
    let trusted = request.trusted.unwrap_or(true);
    let workspace = self
        .storage
        .upsert_workspace(&display_name, &canonical_root, trusted)?;
    // Broadcast the refreshed list so listeners see the change.
    let all_workspaces = self.storage.list_workspaces()?;
    let event_payload = json!({ "workspaces": all_workspaces });
    self.emit_event("workspace_list", None, None, event_payload)?;
    Ok(json!({ "workspace": workspace }))
}
/// Lists threads of a runtime, optionally restricted to a single workspace.
///
/// Pages through the app server's `thread/list` until `limit` threads (default 50,
/// capped at 200) have been collected or the server reports no further cursor. Every
/// thread that is kept is also written to the local thread index, and the indexed
/// record is what gets returned when it can be read back.
///
/// If a `thread/list` call fails, the method still returns `Ok`, marked with
/// `"stale": true` and a `"warning"` string: when nothing has been collected yet the
/// locally indexed threads are served instead, otherwise the partial result is returned.
///
/// # Errors
/// Fails when the runtime or workspace cannot be resolved, when a `thread/list`
/// response lacks a `data` array, or when thread normalization or storage fails.
pub(super) async fn list_threads(&self, request: ListThreadsRequest) -> Result<Value> {
    let runtime = self.require_runtime(request.runtime_id.as_deref()).await?;
    let runtime_id = runtime.record.runtime_id.clone();
    let limit = request.limit.unwrap_or(50).min(200);
    // The upstream page size depends only on `limit`, so compute it once.
    let page_size = limit.clamp(20, 100);
    let archived = request.archived.unwrap_or(false);
    let workspace = request
        .workspace_id
        .as_deref()
        .map(|workspace_id| self.require_workspace(workspace_id))
        .transpose()?;
    let search_term = request
        .search_term
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty());
    let mut collected = Vec::new();
    let mut cursor = request.cursor.clone();
    let mut next_cursor = None;
    loop {
        let result = runtime
            .app_server
            .request(
                "thread/list",
                build_thread_list_request_payload(
                    cursor.as_deref(),
                    page_size,
                    archived,
                    search_term,
                ),
            )
            .await;
        let value = match result {
            Ok(value) => value,
            // Nothing fetched yet: fall back to the locally cached index.
            Err(error) if collected.is_empty() => {
                let threads = self.storage.list_thread_index(
                    workspace.as_ref().map(|item| item.id.as_str()),
                    Some(&runtime_id),
                    Some(archived),
                    search_term,
                )?;
                return Ok(json!({
                    "runtimeId": runtime_id,
                    "workspaceId": request.workspace_id,
                    "threads": threads,
                    "stale": true,
                    "warning": error.to_string(),
                }));
            }
            // A later page failed: hand back what was gathered. `next_cursor` still
            // holds the cursor of the page that failed.
            Err(error) => {
                return Ok(json!({
                    "runtimeId": runtime_id,
                    "workspaceId": request.workspace_id,
                    "threads": collected,
                    "nextCursor": next_cursor,
                    "stale": true,
                    "warning": error.to_string(),
                }));
            }
        };
        let data = value
            .get("data")
            .and_then(Value::as_array)
            .context("thread/list 返回格式不正确")?;
        next_cursor = optional_string(&value, "nextCursor");
        for thread_value in data {
            let workspace_for_thread = self.find_workspace_for_thread(thread_value)?;
            let thread = normalize_thread(
                &runtime_id,
                thread_value,
                workspace_for_thread.as_ref(),
                archived,
            )?;
            // Skip threads outside the requested workspace before indexing them.
            if let Some(target_workspace) = workspace.as_ref() {
                if !workspace_matches(target_workspace, &thread.cwd) {
                    continue;
                }
            }
            self.storage.upsert_thread_index(&thread)?;
            collected.push(self.storage.get_thread_index(&thread.id)?.unwrap_or(thread));
            // NOTE(review): stopping mid-page still returns the server's cursor for the
            // NEXT page, so the unread remainder of this page looks unreachable for the
            // caller — confirm whether that is intended.
            if collected.len() >= limit {
                break;
            }
        }
        if collected.len() >= limit || next_cursor.is_none() {
            break;
        }
        cursor = next_cursor.clone();
    }
    Ok(json!({
        "runtimeId": runtime_id,
        "workspaceId": request.workspace_id,
        "threads": collected,
        "nextCursor": next_cursor,
    }))
}
/// Starts a new thread inside a workspace and returns its full payload.
///
/// Steps, in order:
/// 1. resolve the workspace, the runtime and the working directory
///    (`workspace.root_path` joined with the optional `relative_path`);
/// 2. call `thread/start` on the app server;
/// 3. if a name survives `normalize_name`, set it through `thread/name/set`;
/// 4. if a note was supplied, persist the result of `normalize_note` locally;
/// 5. re-read the thread with `thread/read` (turns included) and build the response.
///
/// # Errors
/// Fails when the workspace or runtime is unknown, when any app-server call fails, or
/// when `thread/start` does not return `thread.id`.
pub(super) async fn start_thread(&self, request: StartThreadRequest) -> Result<Value> {
    let workspace = self.require_workspace(&request.workspace_id)?;
    let runtime = self.require_runtime(request.runtime_id.as_deref()).await?;
    let runtime_id = runtime.record.runtime_id.clone();
    let cwd = resolve_workspace_path(
        Path::new(&workspace.root_path),
        request.relative_path.as_deref(),
    )?;
    let result = runtime
        .app_server
        .request(
            "thread/start",
            build_thread_start_request_payload(
                cwd.to_string_lossy().as_ref(),
                request.model.as_deref(),
            ),
        )
        .await?;
    let thread_id = result
        .get("thread")
        .and_then(|thread| thread.get("id"))
        .and_then(Value::as_str)
        .context("thread/start 缺少 thread.id")?;
    // `request` is owned and `name` is not read again, so move it instead of cloning.
    if let Some(name) = normalize_name(request.name) {
        runtime
            .app_server
            .request(
                "thread/name/set",
                json!({
                    "threadId": thread_id,
                    "name": name,
                }),
            )
            .await?;
    }
    // Only touch the stored note when the caller actually sent one.
    if request.note.is_some() {
        let normalized_note = normalize_note(request.note);
        self.storage
            .save_thread_note(thread_id, normalized_note.as_deref())?;
    }
    let read_result = runtime
        .app_server
        .request(
            "thread/read",
            json!({
                "threadId": thread_id,
                "includeTurns": true,
            }),
        )
        .await?;
    self.thread_response_payload(&runtime_id, &read_result, Some(&workspace), false)
}
/// Reads a thread, turns included, from the runtime recorded in the local index.
///
/// The thread must already be indexed locally; its indexed `archived` flag is carried
/// over into the response payload.
///
/// # Errors
/// Fails when the thread is not indexed, the runtime is unavailable, the `thread/read`
/// call fails, or the response has no `thread` field.
pub(super) async fn read_thread(&self, request: ReadThreadRequest) -> Result<Value> {
    let lookup = self.storage.get_thread_index(&request.thread_id)?;
    let indexed = lookup.context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&indexed.runtime_id)).await?;
    let params = json!({
        "threadId": request.thread_id,
        "includeTurns": true,
    });
    let response = runtime.app_server.request("thread/read", params).await?;
    let thread_value = response.get("thread").context("thread/read 缺少 thread")?;
    let matched_workspace = self.find_workspace_for_thread(thread_value)?;
    self.thread_response_payload(
        &indexed.runtime_id,
        &response,
        matched_workspace.as_ref(),
        indexed.archived,
    )
}
/// Resumes an indexed thread on its runtime via `thread/resume`.
///
/// The request asks for `persistExtendedHistory`, and the resulting payload is built
/// with the archived flag set to `false`.
///
/// # Errors
/// Fails when the thread is not indexed, the runtime is unavailable, the app-server
/// call fails, or the response has no `thread` field.
pub(super) async fn resume_thread(&self, request: ResumeThreadRequest) -> Result<Value> {
    let lookup = self.storage.get_thread_index(&request.thread_id)?;
    let indexed = lookup.context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&indexed.runtime_id)).await?;
    let params = json!({
        "threadId": request.thread_id,
        "persistExtendedHistory": true,
    });
    let response = runtime.app_server.request("thread/resume", params).await?;
    let thread_value = response.get("thread").context("thread/resume 缺少 thread")?;
    let matched_workspace = self.find_workspace_for_thread(thread_value)?;
    self.thread_response_payload(
        &indexed.runtime_id,
        &response,
        matched_workspace.as_ref(),
        false,
    )
}
/// Updates the name and/or note of an indexed thread.
///
/// `resolve_optional_text_update` decides, for each field, whether the request sets,
/// clears or leaves the current value. Name changes go to the app server through
/// `thread/name/set` (an empty string clears the name) and are then written to the
/// local index; note changes are only persisted locally.
///
/// Returns the refreshed thread record with an empty `entries` array.
///
/// # Errors
/// Fails when the thread is not indexed, the runtime is unavailable, the app-server
/// call fails, or storage fails.
pub(super) async fn update_thread(&self, request: UpdateThreadRequest) -> Result<Value> {
    let mut thread = self
        .storage
        .get_thread_index(&request.thread_id)?
        .context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&thread.runtime_id)).await?;
    let mut changed = false;
    match resolve_optional_text_update(thread.name.as_deref(), request.name.as_deref()) {
        OptionalTextUpdate::Set(name) => {
            runtime
                .app_server
                .request(
                    "thread/name/set",
                    json!({
                        "threadId": request.thread_id,
                        "name": name,
                    }),
                )
                .await?;
            thread.name = Some(name);
            changed = true;
        }
        OptionalTextUpdate::Clear => {
            // The app server is told to clear the name by sending an empty string.
            runtime
                .app_server
                .request(
                    "thread/name/set",
                    json!({
                        "threadId": request.thread_id,
                        "name": "",
                    }),
                )
                .await?;
            thread.name = None;
            changed = true;
        }
        OptionalTextUpdate::Unchanged => {}
    }
    // Persist the renamed record only after the app server accepted the change.
    if changed {
        self.storage.upsert_thread_index(&thread)?;
    }
    match resolve_optional_text_update(thread.note.as_deref(), request.note.as_deref()) {
        OptionalTextUpdate::Set(note) => {
            self.storage
                .save_thread_note(&request.thread_id, Some(&note))?;
        }
        OptionalTextUpdate::Clear => {
            self.storage.save_thread_note(&request.thread_id, None)?;
        }
        OptionalTextUpdate::Unchanged => {}
    }
    // Re-read so the response reflects what storage now holds (including the note).
    let thread = self
        .storage
        .get_thread_index(&request.thread_id)?
        .unwrap_or(thread);
    Ok(json!({
        "runtimeId": thread.runtime_id,
        "workspaceId": thread.workspace_id,
        "thread": thread,
        "entries": [],
    }))
}
/// Archives an indexed thread on its runtime and in the local index.
///
/// After `thread/archive` succeeds the local archived flag is set, and the returned
/// record always has `archived = true`, even if the index row could not be re-read.
///
/// # Errors
/// Fails when the thread is not indexed, the runtime is unavailable, the app-server
/// call fails, or storage fails.
pub(super) async fn archive_thread(&self, request: ArchiveThreadRequest) -> Result<Value> {
    let lookup = self.storage.get_thread_index(&request.thread_id)?;
    let indexed = lookup.context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&indexed.runtime_id)).await?;
    let params = json!({
        "threadId": request.thread_id,
    });
    runtime.app_server.request("thread/archive", params).await?;
    self.storage.set_thread_archived(&request.thread_id, true)?;
    let refreshed = self.storage.get_thread_index(&request.thread_id)?;
    let mut thread = refreshed.unwrap_or(indexed);
    thread.archived = true;
    Ok(json!({
        "runtimeId": thread.runtime_id,
        "workspaceId": thread.workspace_id,
        "thread": thread,
        "entries": [],
    }))
}
/// Restores an archived thread on its runtime and clears the local archived flag.
///
/// The response payload is built from the thread returned by `thread/unarchive`.
///
/// # Errors
/// Fails when the thread is not indexed, the runtime is unavailable, the app-server
/// call fails, storage fails, or the response has no `thread` field.
pub(super) async fn unarchive_thread(&self, request: UnarchiveThreadRequest) -> Result<Value> {
    let lookup = self.storage.get_thread_index(&request.thread_id)?;
    let indexed = lookup.context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&indexed.runtime_id)).await?;
    let params = json!({
        "threadId": request.thread_id,
    });
    let response = runtime.app_server.request("thread/unarchive", params).await?;
    // The local flag is cleared before the response shape is validated.
    self.storage
        .set_thread_archived(&request.thread_id, false)?;
    let thread_value = response
        .get("thread")
        .context("thread/unarchive 缺少 thread")?;
    let matched_workspace = self.find_workspace_for_thread(thread_value)?;
    self.thread_response_payload(
        &indexed.runtime_id,
        &response,
        matched_workspace.as_ref(),
        false,
    )
}
/// Decodes a base64 image from the request and writes it into the staging directory.
///
/// The staged file is named `<now_millis>-<uuid>.<ext>`; the extension comes from
/// `infer_image_extension` and falls back to `bin`. The display name is the sanitized
/// client file name, or a generated one when none is usable.
///
/// Returns `{ "image": StagedInputImage }` describing the staged file.
///
/// # Errors
/// Fails when a MIME type is given that does not start with `image/`, when the base64
/// data is invalid or decodes to nothing, or when the staging directory or file
/// cannot be written.
pub(super) async fn stage_input_image(&self, request: StageInputImageRequest) -> Result<Value> {
    let mime_type = normalize_optional_trimmed(request.mime_type.as_deref());
    // MIME types are case-insensitive; `infer_image_extension` lowercases too, so
    // compare the prefix the same way instead of rejecting e.g. `Image/PNG`.
    if mime_type
        .as_deref()
        .is_some_and(|value| !value.to_ascii_lowercase().starts_with("image/"))
    {
        bail!("仅支持图片附件");
    }
    let bytes = base64::engine::general_purpose::STANDARD
        .decode(request.base64_data.trim())
        .context("图片 base64 解码失败")?;
    if bytes.is_empty() {
        bail!("图片内容不能为空");
    }
    // Checked conversion instead of an `as` cast that could silently wrap.
    let size_bytes = i64::try_from(bytes.len()).context("图片大小超出可表示范围")?;
    let extension = infer_image_extension(request.file_name.as_deref(), mime_type.as_deref())
        .unwrap_or("bin");
    // NOTE(review): the generated fallback reads like `image-png`; confirm whether
    // `image.png` was intended.
    let display_name = request
        .file_name
        .as_deref()
        .and_then(sanitize_file_name)
        .or_else(|| Some(format!("image-{extension}")));
    let staging_root = self.staging_root();
    // `fs::write` does not create parent directories; make sure the directory exists.
    fs::create_dir_all(&staging_root)
        .with_context(|| format!("创建暂存目录失败: {}", staging_root.display()))?;
    let staged_path = staging_root.join(format!(
        "{}-{}.{}",
        crate::bridge_protocol::now_millis(),
        Uuid::new_v4(),
        extension
    ));
    // NOTE(review): this is blocking file I/O inside an async fn; consider an async
    // or `spawn_blocking` write if large images stall the executor.
    fs::write(&staged_path, &bytes)
        .with_context(|| format!("写入暂存图片失败: {}", staged_path.display()))?;
    let image = StagedInputImage {
        local_path: staged_path.to_string_lossy().to_string(),
        display_name,
        mime_type,
        size_bytes,
    };
    Ok(json!({ "image": image }))
}
pub(super) async fn send_turn(&self, request: SendTurnRequest) -> Result<Value> {
let thread = self
.storage
.get_thread_index(&request.thread_id)?
.context("线程不存在或尚未缓存")?;
let runtime = self.require_runtime(Some(&thread.runtime_id)).await?;
let (input_items, staged_paths) = build_turn_input_items(&request, self.staging_root())?;
let workspace = self.find_workspace_by_thread_id(&request.thread_id)?;
let cwd = match (workspace, request.relative_path.as_deref()) {
(_, None) => None,
(Some(workspace), Some(relative_path)) => Some(resolve_workspace_path(
Path::new(&workspace.root_path),
Some(relative_path),
)?),
(None, Some(_)) => bail!("线程未绑定已知工作区,无法覆盖 cwd"),
};
let result = runtime
.app_server
.request(
"turn/start",
build_turn_start_request_payload(
&request.thread_id,
input_items,
cwd.as_ref().map(|value| value.to_string_lossy().to_string()),
),
)
.await;
let result = match result {
Ok(result) => result,
Err(error) => {
let _ = self.cleanup_staged_paths(staged_paths);
return Err(error);
}
};
let turn_id = result
.get("turn")
.and_then(|turn| turn.get("id"))
.and_then(Value::as_str)
.context("turn/start 缺少 turn.id");
let turn_id = match turn_id {
Ok(turn_id) => turn_id.to_string(),
Err(error) => {
let _ = self.cleanup_staged_paths(staged_paths);
return Err(error);
}
};
self.register_staged_turn_inputs(&turn_id, staged_paths);
Ok(json!({
"runtimeId": thread.runtime_id,
"threadId": request.thread_id,
"turn": result.get("turn").cloned().unwrap_or(Value::Null),
}))
}
/// Asks the thread's runtime to interrupt a running turn via `turn/interrupt`.
///
/// Returns `{ "interrupted": true }` once the app server accepted the request.
///
/// # Errors
/// Fails when the thread is not indexed, the runtime is unavailable, or the
/// app-server call fails.
pub(super) async fn interrupt_turn(&self, request: InterruptTurnRequest) -> Result<Value> {
    let lookup = self.storage.get_thread_index(&request.thread_id)?;
    let indexed = lookup.context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&indexed.runtime_id)).await?;
    let payload = build_turn_interrupt_request_payload(
        &request.thread_id,
        Some(request.turn_id.as_str()),
    );
    runtime.app_server.request("turn/interrupt", payload).await?;
    Ok(json!({ "interrupted": true }))
}
/// Forwards the client's answer for a stored pending request to its runtime.
///
/// The pending request is looked up by `request_id`, and the response is sent to the
/// app server under the stored RPC request id.
///
/// Returns `{ "submitted": true }` on success.
///
/// # Errors
/// Fails when the pending request is unknown, the runtime is unavailable, or the
/// app server rejects the response.
pub(super) async fn respond_pending_request(
    &self,
    request: RespondPendingRequestRequest,
) -> Result<Value> {
    let lookup = self.storage.get_pending_request(&request.request_id)?;
    let pending = lookup.context("待处理请求不存在或已处理")?;
    let runtime = self.require_runtime(Some(&pending.runtime_id)).await?;
    let rpc_request_id = pending.rpc_request_id.clone();
    runtime
        .app_server
        .respond(rpc_request_id, request.response)
        .await?;
    Ok(json!({ "submitted": true }))
}
/// Loads a workspace by id, turning "not found" into an error naming the id.
pub(super) fn require_workspace(&self, workspace_id: &str) -> Result<WorkspaceRecord> {
    let found = self.storage.get_workspace(workspace_id)?;
    found.with_context(|| format!("未找到工作区: {workspace_id}"))
}
/// Finds the first stored workspace that matches the thread's `cwd`.
///
/// `thread_value` is the raw thread JSON and must contain a `cwd` string. Returns
/// `Ok(None)` when no workspace matches.
pub(super) fn find_workspace_for_thread(
    &self,
    thread_value: &Value,
) -> Result<Option<WorkspaceRecord>> {
    let cwd = required_string(thread_value, "cwd")?;
    for candidate in self.storage.list_workspaces()? {
        if workspace_matches(&candidate, cwd) {
            return Ok(Some(candidate));
        }
    }
    Ok(None)
}
/// Finds the first stored workspace that matches the indexed thread's `cwd`.
///
/// Returns `Ok(None)` when the thread is not indexed or no workspace matches.
pub(super) fn find_workspace_by_thread_id(
    &self,
    thread_id: &str,
) -> Result<Option<WorkspaceRecord>> {
    let thread = match self.storage.get_thread_index(thread_id)? {
        Some(thread) => thread,
        None => return Ok(None),
    };
    let matched = self
        .storage
        .list_workspaces()?
        .into_iter()
        .find(|candidate| workspace_matches(candidate, &thread.cwd));
    Ok(matched)
}
/// Builds the common thread response from an app-server result.
///
/// Normalizes `result["thread"]`, upserts it into the local index, and returns the
/// indexed record when it can be read back. Timeline entries come from the thread
/// itself; when that yields none, they are rebuilt from the stored thread events.
///
/// # Errors
/// Fails when `result` has no `thread` field or when normalization, timeline
/// extraction or storage fails.
fn thread_response_payload(
    &self,
    runtime_id: &str,
    result: &Value,
    workspace: Option<&WorkspaceRecord>,
    archived: bool,
) -> Result<Value> {
    let thread_value = result.get("thread").context("返回缺少 thread 字段")?;
    let normalized = normalize_thread(runtime_id, thread_value, workspace, archived)?;
    self.storage.upsert_thread_index(&normalized)?;
    let thread = self
        .storage
        .get_thread_index(&normalized.id)?
        .unwrap_or(normalized);
    let from_thread = timeline_entries_from_thread(runtime_id, thread_value)?;
    let entries = if from_thread.is_empty() {
        // Fall back to the locally stored event log.
        timeline_entries_from_events(&self.storage.load_thread_events(&thread.id)?)
    } else {
        from_thread
    };
    let workspace_id = workspace.map(|item| item.id.clone());
    Ok(json!({
        "runtimeId": runtime_id,
        "workspaceId": workspace_id,
        "thread": thread,
        "entries": entries,
    }))
}
}
/// Builds the `thread/list` request body.
///
/// `limit` and `archived` are always present; `cursor` and `searchTerm` are only
/// included when a value is given.
fn build_thread_list_request_payload(
    cursor: Option<&str>,
    limit: usize,
    archived: bool,
    search_term: Option<&str>,
) -> Value {
    let mut fields = Map::new();
    fields.insert("limit".to_owned(), Value::from(limit));
    fields.insert("archived".to_owned(), Value::from(archived));
    // Optional strings are omitted entirely rather than sent as null.
    for (key, value) in [("cursor", cursor), ("searchTerm", search_term)] {
        if let Some(value) = value {
            fields.insert(key.to_owned(), Value::from(value));
        }
    }
    Value::Object(fields)
}
/// Builds the `thread/start` request body.
///
/// Always sends `cwd`, `experimentalRawEvents: false`, `persistExtendedHistory: true`
/// and `ephemeral: false`; `model` is added only when it is not blank.
fn build_thread_start_request_payload(cwd: &str, model: Option<&str>) -> Value {
    let mut fields = Map::new();
    fields.insert("cwd".to_owned(), Value::from(cwd));
    let fixed_flags = [
        ("experimentalRawEvents", false),
        ("persistExtendedHistory", true),
        ("ephemeral", false),
    ];
    for (key, enabled) in fixed_flags {
        fields.insert(key.to_owned(), Value::from(enabled));
    }
    match model {
        Some(name) if !name.trim().is_empty() => {
            fields.insert("model".to_owned(), Value::from(name));
        }
        _ => {}
    }
    Value::Object(fields)
}
/// Builds the `turn/start` request body from the thread id and input items.
///
/// `cwd` is included only when an override is given.
fn build_turn_start_request_payload(
    thread_id: &str,
    input_items: Vec<Value>,
    cwd: Option<String>,
) -> Value {
    let mut fields = Map::new();
    fields.insert("threadId".to_owned(), Value::from(thread_id));
    fields.insert("input".to_owned(), Value::Array(input_items));
    fields.extend(cwd.map(|dir| ("cwd".to_owned(), Value::String(dir))));
    Value::Object(fields)
}
/// Builds the `turn/interrupt` request body; `turnId` is included only when given.
fn build_turn_interrupt_request_payload(thread_id: &str, turn_id: Option<&str>) -> Value {
    let mut fields = Map::new();
    fields.insert("threadId".to_owned(), Value::from(thread_id));
    fields.extend(turn_id.map(|id| ("turnId".to_owned(), Value::from(id))));
    Value::Object(fields)
}
#[cfg(test)]
mod tests {
    use super::build_thread_list_request_payload;
    use serde_json::json;

    /// Absent `cursor` / `search_term` must not appear in the payload at all.
    #[test]
    fn thread_list_request_payload_omits_absent_optional_strings() {
        let expected = json!({
            "limit": 50,
            "archived": false,
        });
        let actual = build_thread_list_request_payload(None, 50, false, None);
        assert_eq!(actual, expected);
    }

    /// Present `cursor` / `search_term` are passed through under their wire names.
    #[test]
    fn thread_list_request_payload_keeps_present_optional_strings() {
        let expected = json!({
            "cursor": "cursor-1",
            "limit": 20,
            "archived": true,
            "searchTerm": "abc",
        });
        let actual = build_thread_list_request_payload(Some("cursor-1"), 20, true, Some("abc"));
        assert_eq!(actual, expected);
    }
}
/// Converts a send-turn request into app-server input items.
///
/// Blank text items are skipped. Each local image path is checked with
/// `validate_staged_image_path` and recorded in the second tuple element. When the
/// structured items produce nothing, the plain `request.text` is used if it is not
/// blank.
///
/// Returns `(input_items, staged_paths)`.
///
/// # Errors
/// Fails when an image path is rejected or when no usable input remains.
pub(super) fn build_turn_input_items(
    request: &SendTurnRequest,
    staging_root: &Path,
) -> Result<(Vec<Value>, Vec<PathBuf>)> {
    let mut payload_items = Vec::new();
    let mut image_paths = Vec::new();
    for item in request.input_items.iter().flatten() {
        match item {
            // Whitespace-only text carries no content; drop it.
            SendTurnInputItem::Text { text } if text.trim().is_empty() => {}
            SendTurnInputItem::Text { text } => payload_items.push(text_input_item(text)),
            SendTurnInputItem::LocalImage { path } => {
                let staged = validate_staged_image_path(path, staging_root)?;
                payload_items.push(json!({
                    "type": "localImage",
                    "path": staged.to_string_lossy().to_string(),
                }));
                image_paths.push(staged);
            }
        }
    }
    if payload_items.is_empty() {
        // Fall back to the plain text field; with no text either, there is no input.
        if request.text.trim().is_empty() {
            bail!("输入内容不能为空");
        }
        payload_items.push(text_input_item(&request.text));
    }
    Ok((payload_items, image_paths))
}
fn text_input_item(text: &str) -> Value {
json!({
"type": "text",
"text": text,
"text_elements": [],
})
}
fn validate_staged_image_path(path: &str, staging_root: &Path) -> Result<PathBuf> {
let candidate = PathBuf::from(path);
if !candidate.is_absolute() {
bail!("图片路径必须为绝对路径");
}
if !candidate.starts_with(staging_root) {
bail!("图片路径不属于 bridge staging 目录");
}
let metadata = fs::metadata(&candidate)
.with_context(|| format!("图片暂存文件不存在: {}", candidate.display()))?;
if !metadata.is_file() {
bail!("图片暂存路径无效: {}", candidate.display());
}
Ok(candidate)
}
/// Picks a canonical image extension for a staged file.
///
/// The file name's extension wins when `normalize_extension` recognizes it; otherwise
/// the MIME type (trimmed, compared case-insensitively) is mapped to an extension.
/// Returns `None` when neither source is recognized.
fn infer_image_extension(file_name: Option<&str>, mime_type: Option<&str>) -> Option<&'static str> {
    let from_name = file_name
        .and_then(|name| Path::new(name).extension())
        .and_then(|ext| ext.to_str())
        .and_then(normalize_extension);
    if from_name.is_some() {
        return from_name;
    }
    let mime = mime_type?.trim().to_ascii_lowercase();
    match mime.as_str() {
        "image/png" => Some("png"),
        "image/jpeg" | "image/jpg" => Some("jpg"),
        "image/webp" => Some("webp"),
        "image/gif" => Some("gif"),
        "image/bmp" => Some("bmp"),
        "image/heic" => Some("heic"),
        "image/heif" => Some("heif"),
        _ => None,
    }
}
/// Maps a file extension to its canonical lowercase image extension.
///
/// Surrounding whitespace and leading dots are ignored and the comparison is ASCII
/// case-insensitive; `jpeg` is folded into `jpg`. Returns `None` for anything that is
/// not one of the supported image extensions.
fn normalize_extension(extension: &str) -> Option<&'static str> {
    // (accepted spelling, canonical extension)
    const KNOWN: [(&str, &str); 8] = [
        ("png", "png"),
        ("jpg", "jpg"),
        ("jpeg", "jpg"),
        ("webp", "webp"),
        ("gif", "gif"),
        ("bmp", "bmp"),
        ("heic", "heic"),
        ("heif", "heif"),
    ];
    let bare = extension.trim().trim_start_matches('.');
    KNOWN
        .iter()
        .find(|(alias, _)| bare.eq_ignore_ascii_case(alias))
        .map(|&(_, canonical)| canonical)
}
/// Reduces a client-supplied file name to its final path component, trimmed.
///
/// Returns `None` when the input has no final component (for example an empty string
/// or `..`) or when that component is blank after trimming.
fn sanitize_file_name(file_name: &str) -> Option<String> {
    let base = Path::new(file_name).file_name()?.to_string_lossy();
    let trimmed = base.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_owned())
    }
}
/// Trims an optional string and returns an owned copy, treating blank input as `None`.
fn normalize_optional_trimmed(value: Option<&str>) -> Option<String> {
    let trimmed = value?.trim();
    if trimmed.is_empty() {
        return None;
    }
    Some(trimmed.to_owned())
}