use super::*;
impl BridgeState {
pub(in super::super) async fn list_threads(
&self,
request: ListThreadsRequest,
) -> Result<Value> {
let runtime = self.require_runtime(request.runtime_id.as_deref()).await?;
let runtime_id = runtime.record.runtime_id.clone();
let limit = request.limit.unwrap_or(50).min(200);
let archived = request.archived.unwrap_or(false);
let directory_prefix = request
.directory_prefix
.as_deref()
.map(|path| expand_path(Path::new(path)))
.transpose()?
.map(|path| normalize_absolute_directory(&path))
.transpose()?;
let directory_prefix_string = directory_prefix
.as_ref()
.map(|path| path.to_string_lossy().to_string());
let search_term = request
.search_term
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty());
let mut collected = Vec::new();
let mut cursor = request.cursor.clone();
let mut next_cursor = None;
loop {
let page_size = limit.min(100).max(20);
let result = runtime
.app_server
.request(
"thread/list",
request_payloads::build_thread_list_request_payload(
cursor.as_deref(),
page_size,
archived,
search_term,
),
)
.await;
let value = match result {
Ok(value) => value,
Err(error) if collected.is_empty() => {
let threads = self.storage.list_thread_index(
directory_prefix_string.as_deref(),
Some(&runtime_id),
Some(archived),
search_term,
)?;
return Ok(json!({
"runtimeId": runtime_id,
"directoryPrefix": directory_prefix_string,
"threads": threads,
"stale": true,
"warning": error.to_string(),
}));
}
Err(error) => {
return Ok(json!({
"runtimeId": runtime_id,
"directoryPrefix": directory_prefix_string,
"threads": collected,
"nextCursor": next_cursor,
"stale": true,
"warning": error.to_string(),
}));
}
};
let data = value
.get("data")
.and_then(Value::as_array)
.context("thread/list 返回格式不正确")?;
next_cursor = optional_string(&value, "nextCursor");
for thread_value in data {
let thread = normalize_thread(&runtime_id, thread_value, archived)?;
if let Some(directory_prefix) = directory_prefix.as_ref() {
if !directory_contains(directory_prefix, Path::new(&thread.cwd)) {
continue;
}
}
self.storage.upsert_thread_index(&thread)?;
collected.push(self.storage.get_thread_index(&thread.id)?.unwrap_or(thread));
if collected.len() >= limit {
break;
}
}
if collected.len() >= limit || next_cursor.is_none() {
break;
}
cursor = next_cursor.clone();
}
Ok(json!({
"runtimeId": runtime_id,
"directoryPrefix": directory_prefix_string,
"threads": collected,
"nextCursor": next_cursor,
}))
}
pub(in super::super) async fn start_thread(
&self,
request: StartThreadRequest,
) -> Result<Value> {
let runtime = self.require_runtime(request.runtime_id.as_deref()).await?;
let runtime_id = runtime.record.runtime_id.clone();
let cwd = canonicalize_directory(&expand_path(Path::new(&request.cwd))?)?;
let result = runtime
.app_server
.request(
"thread/start",
request_payloads::build_thread_start_request_payload(
cwd.to_string_lossy().as_ref(),
request.model.as_deref(),
),
)
.await?;
let thread_id = result
.get("thread")
.and_then(|thread| thread.get("id"))
.and_then(Value::as_str)
.context("thread/start 缺少 thread.id")?;
if let Some(name) = normalize_name(request.name.clone()) {
runtime
.app_server
.request(
"thread/name/set",
json!({
"threadId": thread_id,
"name": name,
}),
)
.await?;
}
if request.note.is_some() {
let normalized_note = normalize_note(request.note);
self.storage
.save_thread_note(thread_id, normalized_note.as_deref())?;
}
let read_result = runtime
.app_server
.request(
"thread/read",
json!({
"threadId": thread_id,
"includeTurns": true,
}),
)
.await?;
self.thread_response_payload(&runtime_id, &read_result, false)
}
pub(in super::super) async fn read_thread(&self, request: ReadThreadRequest) -> Result<Value> {
let indexed = self
.storage
.get_thread_index(&request.thread_id)?
.context("线程不存在或尚未缓存")?;
let runtime = self.require_runtime(Some(&indexed.runtime_id)).await?;
let result = runtime
.app_server
.request(
"thread/read",
json!({
"threadId": request.thread_id,
"includeTurns": true,
}),
)
.await?;
self.thread_response_payload(&indexed.runtime_id, &result, indexed.archived)
}
pub(in super::super) async fn resume_thread(
&self,
request: ResumeThreadRequest,
) -> Result<Value> {
let indexed = self
.storage
.get_thread_index(&request.thread_id)?
.context("线程不存在或尚未缓存")?;
let runtime = self.require_runtime(Some(&indexed.runtime_id)).await?;
let result = runtime
.app_server
.request(
"thread/resume",
json!({
"threadId": request.thread_id,
"persistExtendedHistory": true,
}),
)
.await?;
self.thread_response_payload(&indexed.runtime_id, &result, false)
}
pub(in super::super) async fn update_thread(
&self,
request: UpdateThreadRequest,
) -> Result<Value> {
let mut thread = self
.storage
.get_thread_index(&request.thread_id)?
.context("线程不存在或尚未缓存")?;
let runtime = self.require_runtime(Some(&thread.runtime_id)).await?;
let mut changed = false;
match resolve_optional_text_update(thread.name.as_deref(), request.name.as_deref()) {
OptionalTextUpdate::Set(name) => {
runtime
.app_server
.request(
"thread/name/set",
json!({
"threadId": request.thread_id,
"name": name,
}),
)
.await?;
thread.name = Some(name);
changed = true;
}
OptionalTextUpdate::Clear => {
runtime
.app_server
.request(
"thread/name/set",
json!({
"threadId": request.thread_id,
"name": "",
}),
)
.await?;
thread.name = None;
changed = true;
}
OptionalTextUpdate::Unchanged => {}
}
if changed {
self.storage.upsert_thread_index(&thread)?;
}
match resolve_optional_text_update(thread.note.as_deref(), request.note.as_deref()) {
OptionalTextUpdate::Set(note) => {
self.storage
.save_thread_note(&request.thread_id, Some(¬e))?;
}
OptionalTextUpdate::Clear => {
self.storage.save_thread_note(&request.thread_id, None)?;
}
OptionalTextUpdate::Unchanged => {}
}
let thread = self
.storage
.get_thread_index(&request.thread_id)?
.unwrap_or(thread);
Ok(json!({
"runtimeId": thread.runtime_id,
"thread": thread,
"entries": [],
}))
}
pub(in super::super) async fn archive_thread(
&self,
request: ArchiveThreadRequest,
) -> Result<Value> {
let indexed = self
.storage
.get_thread_index(&request.thread_id)?
.context("线程不存在或尚未缓存")?;
let runtime = self.require_runtime(Some(&indexed.runtime_id)).await?;
runtime
.app_server
.request(
"thread/archive",
json!({
"threadId": request.thread_id,
}),
)
.await?;
self.storage.set_thread_archived(&request.thread_id, true)?;
let mut thread = self
.storage
.get_thread_index(&request.thread_id)?
.unwrap_or(indexed);
thread.archived = true;
Ok(json!({
"runtimeId": thread.runtime_id,
"thread": thread,
"entries": [],
}))
}
pub(in super::super) async fn unarchive_thread(
&self,
request: UnarchiveThreadRequest,
) -> Result<Value> {
let indexed = self
.storage
.get_thread_index(&request.thread_id)?
.context("线程不存在或尚未缓存")?;
let runtime = self.require_runtime(Some(&indexed.runtime_id)).await?;
let result = runtime
.app_server
.request(
"thread/unarchive",
json!({
"threadId": request.thread_id,
}),
)
.await?;
self.storage
.set_thread_archived(&request.thread_id, false)?;
self.thread_response_payload(&indexed.runtime_id, &result, false)
}
pub(in super::super) async fn stage_input_image(
&self,
request: StageInputImageRequest,
) -> Result<Value> {
let mime_type = turn_inputs::normalize_optional_trimmed(request.mime_type.as_deref());
if mime_type
.as_deref()
.is_some_and(|value| !value.starts_with("image/"))
{
bail!("仅支持图片附件");
}
let bytes = base64::engine::general_purpose::STANDARD
.decode(request.base64_data.trim())
.context("图片 base64 解码失败")?;
if bytes.is_empty() {
bail!("图片内容不能为空");
}
let extension =
turn_inputs::infer_image_extension(request.file_name.as_deref(), mime_type.as_deref())
.unwrap_or("bin");
let display_name = request
.file_name
.as_deref()
.and_then(turn_inputs::sanitize_file_name)
.or_else(|| Some(format!("image-{extension}")));
let staged_path = self.staging_root().join(format!(
"{}-{}.{}",
crate::bridge_protocol::now_millis(),
Uuid::new_v4(),
extension
));
fs::write(&staged_path, &bytes)
.with_context(|| format!("写入暂存图片失败: {}", staged_path.display()))?;
let image = StagedInputImage {
local_path: staged_path.to_string_lossy().to_string(),
display_name,
mime_type,
size_bytes: bytes.len() as i64,
};
Ok(json!({ "image": image }))
}
pub(in super::super) async fn send_turn(&self, request: SendTurnRequest) -> Result<Value> {
let thread = self
.storage
.get_thread_index(&request.thread_id)?
.context("线程不存在或尚未缓存")?;
let runtime = self.require_runtime(Some(&thread.runtime_id)).await?;
let (input_items, staged_paths) =
turn_inputs::build_turn_input_items(&request, self.staging_root())?;
let result = runtime
.app_server
.request(
"turn/start",
request_payloads::build_turn_start_request_payload(
&request.thread_id,
input_items,
None,
),
)
.await;
let result = match result {
Ok(result) => result,
Err(error) => {
let _ = self.cleanup_staged_paths(staged_paths);
return Err(error);
}
};
let turn_id = result
.get("turn")
.and_then(|turn| turn.get("id"))
.and_then(Value::as_str)
.context("turn/start 缺少 turn.id");
let turn_id = match turn_id {
Ok(turn_id) => turn_id.to_string(),
Err(error) => {
let _ = self.cleanup_staged_paths(staged_paths);
return Err(error);
}
};
self.register_staged_turn_inputs(&turn_id, staged_paths);
Ok(json!({
"runtimeId": thread.runtime_id,
"threadId": request.thread_id,
"turn": result.get("turn").cloned().unwrap_or(Value::Null),
}))
}
pub(in super::super) async fn interrupt_turn(
&self,
request: InterruptTurnRequest,
) -> Result<Value> {
let thread = self
.storage
.get_thread_index(&request.thread_id)?
.context("线程不存在或尚未缓存")?;
let runtime = self.require_runtime(Some(&thread.runtime_id)).await?;
runtime
.app_server
.request(
"turn/interrupt",
request_payloads::build_turn_interrupt_request_payload(
&request.thread_id,
Some(request.turn_id.as_str()),
),
)
.await?;
Ok(json!({ "interrupted": true }))
}
pub(in super::super) async fn respond_pending_request(
&self,
request: RespondPendingRequestRequest,
) -> Result<Value> {
let pending_request = self
.storage
.get_pending_request(&request.request_id)?
.context("待处理请求不存在或已处理")?;
let runtime = self
.require_runtime(Some(&pending_request.runtime_id))
.await?;
runtime
.app_server
.respond(pending_request.rpc_request_id.clone(), request.response)
.await?;
Ok(json!({ "submitted": true }))
}
fn thread_response_payload(
&self,
runtime_id: &str,
result: &Value,
archived: bool,
) -> Result<Value> {
let thread_value = result.get("thread").context("返回缺少 thread 字段")?;
let thread = normalize_thread(runtime_id, thread_value, archived)?;
self.storage.upsert_thread_index(&thread)?;
let _ = self.storage.record_directory_usage(Path::new(&thread.cwd));
let _ = self.emit_directory_state();
let thread = self.storage.get_thread_index(&thread.id)?.unwrap_or(thread);
let mut entries = timeline_entries_from_thread(runtime_id, thread_value)?;
if entries.is_empty() {
entries = timeline_entries_from_events(&self.storage.load_thread_events(&thread.id)?);
}
Ok(json!({
"runtimeId": runtime_id,
"thread": thread,
"entries": entries,
}))
}
}