use std::path::Path;
use anyhow::{Context, Result, bail};
use serde_json::{Value, json};
use super::BridgeState;
use super::helpers::{
OptionalTextUpdate, normalize_name, normalize_note, optional_string, required_string,
resolve_optional_text_update,
};
use super::timeline::{
normalize_thread, timeline_entries_from_events, timeline_entries_from_thread,
};
use crate::bridge_protocol::{
ArchiveThreadRequest, InterruptTurnRequest, ListThreadsRequest, ReadThreadRequest,
RespondPendingRequestRequest, ResumeThreadRequest, SendTurnRequest, StartThreadRequest,
UnarchiveThreadRequest, UpdateThreadRequest, UpsertWorkspaceRequest, WorkspaceRecord,
};
use crate::config::expand_path;
use crate::storage::Storage;
use crate::workspace::{
canonicalize_directory, default_display_name, resolve_workspace_path, workspace_matches,
};
/// Registers every configured workspace root with `storage`.
///
/// Each root is canonicalized with `canonicalize_directory`, named with
/// `default_display_name`, and upserted with the last argument set to `true`
/// (the same position where `BridgeState::upsert_workspace` passes the
/// request's `trusted` flag).
///
/// # Errors
///
/// Returns the first canonicalization or storage error. Processing stops at
/// that root; nothing in this function undoes the roots handled before it.
pub(super) fn seed_workspaces(
    storage: &Storage,
    workspace_roots: &[std::path::PathBuf],
) -> Result<()> {
    workspace_roots.iter().try_for_each(|root_dir| -> Result<()> {
        let resolved = canonicalize_directory(root_dir)?;
        let label = default_display_name(&resolved);
        storage.upsert_workspace(&label, &resolved, true)?;
        Ok(())
    })
}
impl BridgeState {
/// Creates or updates a workspace record and broadcasts the refreshed list.
///
/// The requested root path is expanded and canonicalized first. A display
/// name that is empty after trimming falls back to `default_display_name`;
/// otherwise the trimmed name is stored. `trusted` defaults to `true` when
/// the request omits it.
///
/// After the upsert, a `workspace_list` event carrying all stored workspaces
/// is emitted, and the upserted record is returned under `workspace`.
///
/// # Errors
///
/// Fails if the path cannot be expanded or canonicalized, if storage fails,
/// or if the event cannot be emitted.
pub(super) async fn upsert_workspace(&self, request: UpsertWorkspaceRequest) -> Result<Value> {
    let expanded = expand_path(Path::new(&request.root_path))?;
    let canonical_root = canonicalize_directory(&expanded)?;

    let label = match request.display_name.trim() {
        "" => default_display_name(&canonical_root),
        trimmed => trimmed.to_string(),
    };
    let trusted = request.trusted.unwrap_or(true);

    let workspace = self
        .storage
        .upsert_workspace(&label, &canonical_root, trusted)?;

    // Publish the full list so listeners can replace their copy wholesale.
    let all_workspaces = self.storage.list_workspaces()?;
    let payload = json!({ "workspaces": all_workspaces });
    self.emit_event("workspace_list", None, None, payload)?;

    Ok(json!({ "workspace": workspace }))
}
/// Lists threads of a runtime by paging through the app server's
/// `thread/list`, optionally restricted to one workspace.
///
/// * `limit` defaults to 50 and is capped at 200.
/// * `archived` defaults to `false`.
/// * A `workspace_id` that is not in storage is an error.
/// * A search term that is blank after trimming is treated as absent.
///
/// Every thread that passes the workspace filter is written to the local
/// thread index, and the stored row (when one can be read back) is what gets
/// returned.
///
/// If a page request fails, the error is not propagated. The response is
/// instead flagged `stale: true` with the error text in `warning`:
/// when nothing has been collected yet it carries the locally indexed threads
/// (no `nextCursor`), otherwise it carries the threads collected so far plus
/// the cursor of the page that failed.
///
/// # Errors
///
/// Fails if the runtime or workspace cannot be resolved, if a `thread/list`
/// response has no `data` array, if a thread cannot be normalized, or if
/// storage fails.
pub(super) async fn list_threads(&self, request: ListThreadsRequest) -> Result<Value> {
    let runtime = self.require_runtime(request.runtime_id.as_deref()).await?;
    let runtime_id = runtime.record.runtime_id.clone();
    let limit = request.limit.unwrap_or(50).min(200);
    let archived = request.archived.unwrap_or(false);
    let workspace_filter = match request.workspace_id.as_deref() {
        Some(workspace_id) => Some(self.require_workspace(workspace_id)?),
        None => None,
    };
    let search_term = request
        .search_term
        .as_deref()
        .map(str::trim)
        .filter(|term| !term.is_empty());

    // Ask for at least 20 and at most 100 threads per page; the value does
    // not change between iterations.
    let page_size = limit.clamp(20, 100);

    let mut threads = Vec::new();
    let mut cursor = request.cursor.clone();
    let mut next_cursor = None;

    loop {
        let params = json!({
            "cursor": cursor,
            "limit": page_size,
            "archived": archived,
            "searchTerm": search_term,
        });
        let response = runtime.app_server.request("thread/list", params).await;

        let page = match response {
            Ok(page) => page,
            Err(error) => {
                // Degrade to a stale answer rather than failing the call.
                let payload = if threads.is_empty() {
                    let cached = self.storage.list_thread_index(
                        workspace_filter.as_ref().map(|item| item.id.as_str()),
                        Some(&runtime_id),
                        Some(archived),
                        search_term,
                    )?;
                    json!({
                        "runtimeId": runtime_id,
                        "workspaceId": request.workspace_id,
                        "threads": cached,
                        "stale": true,
                        "warning": error.to_string(),
                    })
                } else {
                    json!({
                        "runtimeId": runtime_id,
                        "workspaceId": request.workspace_id,
                        "threads": threads,
                        "nextCursor": next_cursor,
                        "stale": true,
                        "warning": error.to_string(),
                    })
                };
                return Ok(payload);
            }
        };

        let data = page
            .get("data")
            .and_then(Value::as_array)
            .context("thread/list 返回格式不正确")?;
        next_cursor = optional_string(&page, "nextCursor");

        // NOTE(review): when `limit` is reached part-way through a page, the
        // rest of that page is dropped while `next_cursor` comes from the
        // server response for the whole page. Presumably it points past the
        // page, so those threads would be skipped by a follow-up call —
        // confirm against the `thread/list` cursor semantics.
        for raw_thread in data {
            let owner = self.find_workspace_for_thread(raw_thread)?;
            let thread = normalize_thread(&runtime_id, raw_thread, owner.as_ref(), archived)?;

            let filtered_out = matches!(
                workspace_filter.as_ref(),
                Some(target) if !workspace_matches(target, &thread.cwd)
            );
            if filtered_out {
                continue;
            }

            self.storage.upsert_thread_index(&thread)?;
            let stored = self.storage.get_thread_index(&thread.id)?.unwrap_or(thread);
            threads.push(stored);
            if threads.len() >= limit {
                break;
            }
        }

        if threads.len() >= limit || next_cursor.is_none() {
            break;
        }
        cursor = next_cursor.clone();
    }

    Ok(json!({
        "runtimeId": runtime_id,
        "workspaceId": request.workspace_id,
        "threads": threads,
        "nextCursor": next_cursor,
    }))
}
/// Starts a new thread in a workspace and returns its full payload.
///
/// Steps, in order:
/// 1. Resolve the workspace and runtime, and derive the working directory
///    from the workspace root plus the optional relative path.
/// 2. Call `thread/start` and read `thread.id` from the response.
/// 3. If `normalize_name` yields a name, send it with `thread/name/set`.
/// 4. If the request carried a note, store its normalized form (which may be
///    `None`) for the thread.
/// 5. Re-read the thread with turns included and build the response.
///
/// # Errors
///
/// Any failing step aborts the call. Steps 3-5 run after the thread has
/// already been started, and nothing here undoes it on failure.
pub(super) async fn start_thread(&self, request: StartThreadRequest) -> Result<Value> {
    let workspace = self.require_workspace(&request.workspace_id)?;
    let runtime = self.require_runtime(request.runtime_id.as_deref()).await?;
    let runtime_id = runtime.record.runtime_id.clone();
    let cwd = resolve_workspace_path(
        Path::new(&workspace.root_path),
        request.relative_path.as_deref(),
    )?;

    let start_params = json!({
        "cwd": cwd.to_string_lossy(),
        "model": request.model,
        "experimentalRawEvents": false,
        "persistExtendedHistory": true,
        "ephemeral": false,
    });
    let started = runtime
        .app_server
        .request("thread/start", start_params)
        .await?;
    // Indexing a `Value` yields `Null` for missing keys, so a malformed
    // response ends up in the `context` error below.
    let thread_id = started["thread"]["id"]
        .as_str()
        .context("thread/start 缺少 thread.id")?;

    if let Some(name) = normalize_name(request.name.clone()) {
        let name_params = json!({
            "threadId": thread_id,
            "name": name,
        });
        runtime
            .app_server
            .request("thread/name/set", name_params)
            .await?;
    }

    // Only touch the stored note when the caller supplied one.
    if let Some(raw_note) = request.note {
        let note = normalize_note(Some(raw_note));
        self.storage.save_thread_note(thread_id, note.as_deref())?;
    }

    let read_params = json!({
        "threadId": thread_id,
        "includeTurns": true,
    });
    let snapshot = runtime
        .app_server
        .request("thread/read", read_params)
        .await?;
    self.thread_response_payload(&runtime_id, &snapshot, Some(&workspace), false)
}
/// Reads a thread, including its turns, from the runtime that owns it.
///
/// The owning runtime and the archived flag are taken from the local thread
/// index, so the thread must already be indexed. The workspace is looked up
/// from the thread's `cwd` in the response.
///
/// # Errors
///
/// Fails if the thread is not indexed, the runtime is unavailable, the
/// `thread/read` call fails, or the response has no `thread` field.
pub(super) async fn read_thread(&self, request: ReadThreadRequest) -> Result<Value> {
    let cached = self
        .storage
        .get_thread_index(&request.thread_id)?
        .context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&cached.runtime_id)).await?;

    let params = json!({
        "threadId": request.thread_id,
        "includeTurns": true,
    });
    let response = runtime.app_server.request("thread/read", params).await?;

    let raw_thread = response.get("thread").context("thread/read 缺少 thread")?;
    let owner = self.find_workspace_for_thread(raw_thread)?;
    let archived = cached.archived;
    self.thread_response_payload(&cached.runtime_id, &response, owner.as_ref(), archived)
}
/// Resumes an indexed thread on its owning runtime via `thread/resume`.
///
/// The response payload is built with `archived` set to `false`, regardless
/// of the flag currently stored in the index.
///
/// # Errors
///
/// Fails if the thread is not indexed, the runtime is unavailable, the
/// `thread/resume` call fails, or the response has no `thread` field.
pub(super) async fn resume_thread(&self, request: ResumeThreadRequest) -> Result<Value> {
    let cached = self
        .storage
        .get_thread_index(&request.thread_id)?
        .context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&cached.runtime_id)).await?;

    let params = json!({
        "threadId": request.thread_id,
        "persistExtendedHistory": true,
    });
    let response = runtime.app_server.request("thread/resume", params).await?;

    let raw_thread = response
        .get("thread")
        .context("thread/resume 缺少 thread")?;
    let owner = self.find_workspace_for_thread(raw_thread)?;
    self.thread_response_payload(&cached.runtime_id, &response, owner.as_ref(), false)
}
/// Updates the name and/or note of an indexed thread.
///
/// Name changes are sent to the owning runtime with `thread/name/set`
/// (an empty string clears the name) and then written to the thread index.
/// Note changes are stored locally only. Whether a field is set, cleared, or
/// left alone is decided by `resolve_optional_text_update` from the current
/// and requested values.
///
/// Returns the thread as re-read from the index (falling back to the
/// in-memory copy) with an empty `entries` list.
///
/// # Errors
///
/// Fails if the thread is not indexed, the runtime is unavailable, the
/// rename call fails, or storage fails. A name change that has already been
/// applied is not undone if the note update fails afterwards.
pub(super) async fn update_thread(&self, request: UpdateThreadRequest) -> Result<Value> {
    let mut thread = self
        .storage
        .get_thread_index(&request.thread_id)?
        .context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&thread.runtime_id)).await?;
    let mut changed = false;
    match resolve_optional_text_update(thread.name.as_deref(), request.name.as_deref()) {
        OptionalTextUpdate::Set(name) => {
            runtime
                .app_server
                .request(
                    "thread/name/set",
                    json!({
                        "threadId": request.thread_id,
                        "name": name,
                    }),
                )
                .await?;
            thread.name = Some(name);
            changed = true;
        }
        OptionalTextUpdate::Clear => {
            // The runtime is told to clear the name by sending an empty string.
            runtime
                .app_server
                .request(
                    "thread/name/set",
                    json!({
                        "threadId": request.thread_id,
                        "name": "",
                    }),
                )
                .await?;
            thread.name = None;
            changed = true;
        }
        OptionalTextUpdate::Unchanged => {}
    }
    // Persist the new name only after the runtime accepted it.
    if changed {
        self.storage.upsert_thread_index(&thread)?;
    }
    match resolve_optional_text_update(thread.note.as_deref(), request.note.as_deref()) {
        OptionalTextUpdate::Set(note) => {
            self.storage
                .save_thread_note(&request.thread_id, Some(&note))?;
        }
        OptionalTextUpdate::Clear => {
            self.storage.save_thread_note(&request.thread_id, None)?;
        }
        OptionalTextUpdate::Unchanged => {}
    }
    // Re-read so the response reflects what storage now holds, including
    // the note saved above.
    let thread = self
        .storage
        .get_thread_index(&request.thread_id)?
        .unwrap_or(thread);
    Ok(json!({
        "runtimeId": thread.runtime_id,
        "workspaceId": thread.workspace_id,
        "thread": thread,
        "entries": [],
    }))
}
/// Archives an indexed thread on its runtime and marks it archived locally.
///
/// The local flag is only updated after `thread/archive` succeeds. The
/// returned thread is re-read from the index (falling back to the copy read
/// at the start) with `archived` forced to `true`, and `entries` is empty.
///
/// # Errors
///
/// Fails if the thread is not indexed, the runtime is unavailable, the
/// `thread/archive` call fails, or storage fails.
pub(super) async fn archive_thread(&self, request: ArchiveThreadRequest) -> Result<Value> {
    let indexed = self
        .storage
        .get_thread_index(&request.thread_id)?
        .context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&indexed.runtime_id)).await?;

    let params = json!({
        "threadId": request.thread_id,
    });
    runtime.app_server.request("thread/archive", params).await?;

    self.storage.set_thread_archived(&request.thread_id, true)?;
    let refreshed = self.storage.get_thread_index(&request.thread_id)?;
    let mut thread = refreshed.unwrap_or(indexed);
    thread.archived = true;

    Ok(json!({
        "runtimeId": thread.runtime_id,
        "workspaceId": thread.workspace_id,
        "thread": thread,
        "entries": [],
    }))
}
/// Restores an archived thread on its runtime and clears the local flag.
///
/// The local archived flag is cleared as soon as `thread/unarchive`
/// succeeds, before the response is checked for a `thread` field. The
/// payload is then built from that thread with `archived` set to `false`.
///
/// # Errors
///
/// Fails if the thread is not indexed, the runtime is unavailable, the
/// `thread/unarchive` call fails, storage fails, or the response has no
/// `thread` field.
pub(super) async fn unarchive_thread(&self, request: UnarchiveThreadRequest) -> Result<Value> {
    let cached = self
        .storage
        .get_thread_index(&request.thread_id)?
        .context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&cached.runtime_id)).await?;

    let params = json!({
        "threadId": request.thread_id,
    });
    let response = runtime
        .app_server
        .request("thread/unarchive", params)
        .await?;

    self.storage
        .set_thread_archived(&request.thread_id, false)?;

    let raw_thread = response
        .get("thread")
        .context("thread/unarchive 缺少 thread")?;
    let owner = self.find_workspace_for_thread(raw_thread)?;
    self.thread_response_payload(&cached.runtime_id, &response, owner.as_ref(), false)
}
pub(super) async fn send_turn(&self, request: SendTurnRequest) -> Result<Value> {
if request.text.trim().is_empty() {
bail!("输入内容不能为空");
}
let thread = self
.storage
.get_thread_index(&request.thread_id)?
.context("线程不存在或尚未缓存")?;
let runtime = self.require_runtime(Some(&thread.runtime_id)).await?;
let workspace = self.find_workspace_by_thread_id(&request.thread_id)?;
let cwd = match (workspace, request.relative_path.as_deref()) {
(_, None) => None,
(Some(workspace), Some(relative_path)) => Some(resolve_workspace_path(
Path::new(&workspace.root_path),
Some(relative_path),
)?),
(None, Some(_)) => bail!("线程未绑定已知工作区,无法覆盖 cwd"),
};
let result = runtime
.app_server
.request(
"turn/start",
json!({
"threadId": request.thread_id,
"input": [{
"type": "text",
"text": request.text,
"text_elements": [],
}],
"cwd": cwd.map(|value| value.to_string_lossy().to_string()),
}),
)
.await?;
Ok(json!({
"runtimeId": thread.runtime_id,
"threadId": request.thread_id,
"turn": result.get("turn").cloned().unwrap_or(Value::Null),
}))
}
/// Asks the owning runtime to interrupt a turn via `turn/interrupt`.
///
/// Returns `{ "interrupted": true }` once the call succeeds; the runtime's
/// response body is not inspected.
///
/// # Errors
///
/// Fails if the thread is not indexed, the runtime is unavailable, or the
/// `turn/interrupt` call fails.
pub(super) async fn interrupt_turn(&self, request: InterruptTurnRequest) -> Result<Value> {
    let cached = self
        .storage
        .get_thread_index(&request.thread_id)?
        .context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&cached.runtime_id)).await?;

    let params = json!({
        "threadId": request.thread_id,
        "turnId": request.turn_id,
    });
    runtime.app_server.request("turn/interrupt", params).await?;

    Ok(json!({ "interrupted": true }))
}
/// Forwards the caller's answer for a stored pending request to its runtime.
///
/// The pending request is looked up by `request_id`; its stored RPC request
/// id and the supplied response are passed to the runtime's `respond`.
/// Returns `{ "submitted": true }` on success. This method does not itself
/// remove or update the stored pending request.
///
/// # Errors
///
/// Fails if no such pending request is stored, the runtime is unavailable,
/// or the `respond` call fails.
pub(super) async fn respond_pending_request(
    &self,
    request: RespondPendingRequestRequest,
) -> Result<Value> {
    let pending = self
        .storage
        .get_pending_request(&request.request_id)?
        .context("待处理请求不存在或已处理")?;
    let runtime = self.require_runtime(Some(&pending.runtime_id)).await?;

    let rpc_id = pending.rpc_request_id.clone();
    runtime.app_server.respond(rpc_id, request.response).await?;

    Ok(json!({ "submitted": true }))
}
/// Looks up a workspace by id, treating a missing record as an error.
///
/// # Errors
///
/// Fails if storage fails, or with a message naming `workspace_id` when no
/// such workspace is stored.
pub(super) fn require_workspace(&self, workspace_id: &str) -> Result<WorkspaceRecord> {
    let found = self.storage.get_workspace(workspace_id)?;
    found.with_context(|| format!("未找到工作区: {workspace_id}"))
}
/// Finds the first stored workspace that matches the `cwd` of a raw thread
/// value, as decided by `workspace_matches`.
///
/// Returns `Ok(None)` when no workspace matches.
///
/// # Errors
///
/// Fails if `thread_value` has no usable `cwd` string or if storage fails.
pub(super) fn find_workspace_for_thread(
    &self,
    thread_value: &Value,
) -> Result<Option<WorkspaceRecord>> {
    let thread_cwd = required_string(thread_value, "cwd")?;
    for candidate in self.storage.list_workspaces()? {
        if workspace_matches(&candidate, thread_cwd) {
            return Ok(Some(candidate));
        }
    }
    Ok(None)
}
/// Finds the first stored workspace that matches the `cwd` recorded for an
/// indexed thread.
///
/// Returns `Ok(None)` when the thread is not in the index or when no
/// workspace matches its `cwd`.
///
/// # Errors
///
/// Fails only if storage fails.
pub(super) fn find_workspace_by_thread_id(
    &self,
    thread_id: &str,
) -> Result<Option<WorkspaceRecord>> {
    match self.storage.get_thread_index(thread_id)? {
        None => Ok(None),
        Some(thread) => {
            let matched = self
                .storage
                .list_workspaces()?
                .into_iter()
                .find(|candidate| workspace_matches(candidate, &thread.cwd));
            Ok(matched)
        }
    }
}
/// Builds the standard thread response from a runtime result that carries a
/// `thread` object.
///
/// The thread is normalized, written to the thread index, and re-read so the
/// response reflects the stored row (falling back to the normalized value).
/// Timeline entries come from the thread value itself; when that yields
/// none, they are rebuilt from the thread's stored events.
///
/// # Errors
///
/// Fails if `result` has no `thread` field, if normalization or timeline
/// extraction fails, or if storage fails.
fn thread_response_payload(
    &self,
    runtime_id: &str,
    result: &Value,
    workspace: Option<&WorkspaceRecord>,
    archived: bool,
) -> Result<Value> {
    let raw_thread = result.get("thread").context("返回缺少 thread 字段")?;

    let normalized = normalize_thread(runtime_id, raw_thread, workspace, archived)?;
    self.storage.upsert_thread_index(&normalized)?;
    let thread = self
        .storage
        .get_thread_index(&normalized.id)?
        .unwrap_or(normalized);

    let from_thread = timeline_entries_from_thread(runtime_id, raw_thread)?;
    let entries = if from_thread.is_empty() {
        // Nothing in the thread value: fall back to locally stored events.
        let events = self.storage.load_thread_events(&thread.id)?;
        timeline_entries_from_events(&events)
    } else {
        from_thread
    };

    let workspace_id = workspace.map(|item| item.id.clone());
    Ok(json!({
        "runtimeId": runtime_id,
        "workspaceId": workspace_id,
        "thread": thread,
        "entries": entries,
    }))
}
}