use std::sync::Arc;
use anyhow::{Context, Result, bail};
use serde_json::{Value, json};
use uuid::Uuid;
use super::request_payloads;
use super::*;
use crate::bridge_protocol::{
QueueDispatchMode, QueueMessageStatus, QueuedThreadMessageRecord, now_millis,
};
use crate::state::BridgeState;
/// Delay, in milliseconds, that `schedule_thread_queue_recheck` sleeps on its
/// spawned task before it re-runs `process_thread_queue`.
const QUEUE_SETTLE_DELAY_MS: u64 = 350;
impl BridgeState {
/// Appends a new message to the send queue of a thread and kicks off dispatch.
///
/// The thread is looked up in storage, the request input is normalized, and a
/// record with status `Queued` is inserted, followed by a
/// `thread/queueUpdated` event.
///
/// Dispatch then depends on the request:
/// * `InterruptAndSend` with an armed turn id: the armed turn is interrupted.
///   If the interrupt fails, the record is marked failed, a
///   `thread/queueSendFailed` notice is emitted, and the current queue payload
///   is still returned as `Ok`.
/// * every other case: `process_thread_queue` is invoked immediately.
///
/// # Errors
/// Fails when the thread is not found in storage, when the input normalizes to
/// nothing, or when a storage, event, or queue-processing call fails.
pub(in super::super) async fn enqueue_thread_message(
    &self,
    request: EnqueueThreadMessageRequest,
) -> Result<Value> {
    let thread = self
        .storage
        .get_thread_index(&request.thread_id)?
        .context("线程不存在或尚未缓存")?;
    let input_items = normalize_queue_input_items(&request.text, request.input_items)?;
    // Read the clock once so a freshly created record always starts with
    // identical creation and update timestamps.
    let enqueued_at_ms = now_millis();
    let record = QueuedThreadMessageRecord {
        queue_id: Uuid::new_v4().to_string(),
        runtime_id: thread.runtime_id.clone(),
        thread_id: thread.id.clone(),
        position: self.storage.next_thread_queue_position(&thread.id)?,
        dispatch_mode: request.dispatch_mode,
        status: QueueMessageStatus::Queued,
        draft_text: request.text,
        draft_images: request.draft_images,
        image_send_mode: request.image_send_mode,
        cwd: request.cwd,
        // Moved rather than cloned; it is read back from the record below.
        armed_turn_id: request.armed_turn_id,
        failure_message: None,
        reserved_by_device_id: None,
        created_at_ms: enqueued_at_ms,
        updated_at_ms: enqueued_at_ms,
    };
    self.storage
        .insert_queued_thread_message(&record, &input_items)?;
    self.emit_thread_queue_event("thread/queueUpdated", &thread.id)?;

    // Only an interrupt-and-send message that names a turn triggers an
    // interrupt; everything else goes straight to queue processing.
    let turn_to_interrupt = if matches!(record.dispatch_mode, QueueDispatchMode::InterruptAndSend) {
        record.armed_turn_id.as_deref()
    } else {
        None
    };
    match turn_to_interrupt {
        Some(turn_id) => {
            if let Err(error) = self
                .interrupt_thread_turn(&thread.runtime_id, &thread.id, turn_id)
                .await
            {
                let failure = format!("打断当前 turn 失败: {error}");
                self.storage
                    .mark_queued_thread_message_failed(&record.queue_id, &failure)?;
                self.emit_thread_queue_notice(
                    "thread/queueSendFailed",
                    &thread.runtime_id,
                    &thread.id,
                    &record.queue_id,
                    Some(failure.as_str()),
                )?;
                return self.thread_queue_payload_json(&thread.runtime_id, &thread.id);
            }
        }
        None => self.process_thread_queue(&thread.id, None).await?,
    }
    self.thread_queue_payload_json(&thread.runtime_id, &thread.id)
}
pub(in super::super) async fn update_queued_thread_message(
&self,
request: UpdateQueuedThreadMessageRequest,
) -> Result<Value> {
let existing = self
.storage
.get_stored_queued_thread_message(&request.queue_id)?
.context("队列消息不存在")?;
let mut record = existing.record;
let input_items = normalize_queue_input_items(&request.text, request.input_items)?;
record.draft_text = request.text;
record.draft_images = request.draft_images;
record.image_send_mode = request.image_send_mode;
record.cwd = request.cwd;
record.status = QueueMessageStatus::Queued;
record.failure_message = None;
record.reserved_by_device_id = None;
record.updated_at_ms = now_millis();
self.storage
.update_queued_thread_message(&record, &input_items)?;
self.emit_thread_queue_event("thread/queueUpdated", &record.thread_id)?;
self.process_thread_queue(&record.thread_id, None).await?;
self.thread_queue_payload_json(&record.runtime_id, &record.thread_id)
}
/// Removes a single queued message and returns the resulting queue payload.
///
/// After the deletion a `thread/queueUpdated` event is emitted and the queue
/// of the affected thread is processed again.
///
/// # Errors
/// Fails when the queue entry does not exist or when a storage, event, or
/// queue-processing call fails.
pub(in super::super) async fn delete_queued_thread_message(
    &self,
    request: DeleteQueuedThreadMessageRequest,
) -> Result<Value> {
    let removed = self
        .storage
        .delete_queued_thread_message(&request.queue_id)?
        .context("队列消息不存在")?;
    let (runtime_id, thread_id) = (&removed.runtime_id, &removed.thread_id);
    self.emit_thread_queue_event("thread/queueUpdated", thread_id)?;
    self.process_thread_queue(thread_id, None).await?;
    self.thread_queue_payload_json(runtime_id, thread_id)
}
/// Clears the queue of one thread and returns the resulting queue payload.
///
/// A `thread/queueUpdated` event is emitted after the queue is cleared. No
/// queue processing is triggered here.
///
/// # Errors
/// Fails when the thread is not found in storage or when a storage or event
/// call fails.
pub(in super::super) async fn clear_thread_queue(
    &self,
    request: ClearThreadQueueRequest,
) -> Result<Value> {
    let lookup = self.storage.get_thread_index(&request.thread_id)?;
    let thread = lookup.context("线程不存在或尚未缓存")?;
    let thread_id = &thread.id;
    self.storage.clear_thread_queue(thread_id)?;
    self.emit_thread_queue_event("thread/queueUpdated", thread_id)?;
    self.thread_queue_payload_json(&thread.runtime_id, thread_id)
}
/// Reserves a queued message for editing on behalf of `device_id`.
///
/// On success a `thread/queueEditReserved` event carrying the reserved record
/// is emitted. The returned JSON contains the runtime id, the thread id, the
/// queue listing read before the event was emitted, and the reserved record
/// under `message`.
///
/// # Errors
/// Fails when storage yields no reservable record for the queue id or when a
/// storage or event call fails.
pub(in super::super) async fn reserve_queued_thread_message_for_edit(
    &self,
    request: ReserveQueuedThreadMessageForEditRequest,
    device_id: &str,
) -> Result<Value> {
    let reserved = self
        .storage
        .reserve_queued_thread_message(&request.queue_id, device_id)?
        .context("队列消息不存在或不可编辑")?;
    let queued_messages = self
        .storage
        .list_queued_thread_messages(&reserved.thread_id)?;
    self.emit_thread_queue_edit_event(
        "thread/queueEditReserved",
        &reserved.runtime_id,
        &reserved.thread_id,
        Some(reserved.clone()),
    )?;
    let payload = json!({
        "runtimeId": reserved.runtime_id,
        "threadId": reserved.thread_id,
        "queuedMessages": queued_messages,
        "message": reserved,
    });
    Ok(payload)
}
/// Cancels an in-progress edit of a queued message.
///
/// A `thread/queueEditReleased` event (without a message body) is emitted,
/// the queue of the thread is processed, and the queue payload is returned.
///
/// # Errors
/// Fails when storage yields no record for the cancellation or when a
/// storage, event, or queue-processing call fails.
pub(in super::super) async fn cancel_queued_thread_message_edit(
    &self,
    request: CancelQueuedThreadMessageEditRequest,
) -> Result<Value> {
    let released = self
        .storage
        .cancel_queued_thread_message_edit(&request.queue_id)?
        .context("队列消息不存在或未处于编辑中")?;
    let (runtime_id, thread_id) = (&released.runtime_id, &released.thread_id);
    self.emit_thread_queue_edit_event("thread/queueEditReleased", runtime_id, thread_id, None)?;
    self.process_thread_queue(thread_id, None).await?;
    self.thread_queue_payload_json(runtime_id, thread_id)
}
/// Attempts to send the message at the head of a thread's queue.
///
/// Nothing happens (and `Ok(())` is returned) when the queue is empty, when
/// the head is not in `Queued` status, when `queue_head_ready` reports that
/// the head may not be sent yet, or when storage declines to mark the head as
/// sending.
///
/// Otherwise the head is sent. On success it is deleted from the queue and a
/// `thread/queueAutoSent` notice is emitted; on failure it is marked failed
/// and a `thread/queueSendFailed` notice is emitted. Either way a
/// `thread/queueUpdated` event follows.
///
/// `completed_turn_id` is forwarded to `queue_head_ready`.
///
/// # Errors
/// Propagates storage and event failures. A failed send itself is recorded
/// on the message rather than returned.
pub(super) async fn process_thread_queue(
    &self,
    thread_id: &str,
    completed_turn_id: Option<&str>,
) -> Result<()> {
    let pending = self.storage.list_queued_thread_messages(thread_id)?;
    let head = match pending.into_iter().next() {
        Some(head) => head,
        None => return Ok(()),
    };
    let is_queued = matches!(head.status, QueueMessageStatus::Queued);
    if !is_queued || !self.queue_head_ready(&head, completed_turn_id)? {
        return Ok(());
    }
    let claimed = match self
        .storage
        .try_mark_queued_thread_message_sending(&head.queue_id)?
    {
        Some(claimed) => claimed,
        None => return Ok(()),
    };

    let message = &claimed.record;
    let send_outcome = self
        .send_prepared_thread_message(message, claimed.input_items)
        .await;
    match send_outcome {
        Err(error) => {
            let failure = format!("自动发送失败: {error}");
            self.storage
                .mark_queued_thread_message_failed(&message.queue_id, &failure)?;
            self.emit_thread_queue_notice(
                "thread/queueSendFailed",
                &message.runtime_id,
                &message.thread_id,
                &message.queue_id,
                Some(failure.as_str()),
            )?;
        }
        Ok(_) => {
            let _ = self.storage.delete_queued_thread_message(&message.queue_id)?;
            self.emit_thread_queue_notice(
                "thread/queueAutoSent",
                &message.runtime_id,
                &message.thread_id,
                &message.queue_id,
                Some("已自动发送队列中的下一条消息"),
            )?;
        }
    }
    // Both outcomes change the queue, so the refreshed listing is broadcast last.
    self.emit_thread_queue_event("thread/queueUpdated", &message.thread_id)
}
/// Spawns a background task that re-processes a thread's queue after
/// `QUEUE_SETTLE_DELAY_MS` milliseconds.
///
/// The task holds its own `Arc` clone of the state. The join handle is
/// discarded and the result of `process_thread_queue` is ignored.
pub(in super::super) fn schedule_thread_queue_recheck(
    self: &Arc<Self>,
    thread_id: String,
    completed_turn_id: Option<String>,
) {
    let settle_delay = tokio::time::Duration::from_millis(QUEUE_SETTLE_DELAY_MS);
    let bridge = Arc::clone(self);
    tokio::spawn(async move {
        tokio::time::sleep(settle_delay).await;
        // Best effort: the task is detached, so nobody could receive an error.
        bridge
            .process_thread_queue(&thread_id, completed_turn_id.as_deref())
            .await
            .ok();
    });
}
/// Releases every queue message reserved for editing by `device_id`.
///
/// For each record that storage reports as released, a
/// `thread/queueEditReleased` event and a `thread/queueUpdated` event are
/// emitted and the queue of that thread is processed.
///
/// Storage has already released all reservations by the time notifications
/// start, so a failure while handling one record does not stop the remaining
/// records from being handled. Otherwise their threads would never learn
/// about the release and their queues would not be re-processed.
///
/// # Errors
/// Fails immediately when the storage release call fails. Otherwise returns
/// the first error hit while notifying or processing, after all records have
/// been attempted.
pub(crate) async fn release_reserved_queue_messages_for_device(
    &self,
    device_id: &str,
) -> Result<()> {
    let released = self
        .storage
        .release_reserved_queue_messages_for_device(device_id)?;
    let mut first_failure = None;
    for record in released {
        let step: Result<()> = async {
            self.emit_thread_queue_edit_event(
                "thread/queueEditReleased",
                &record.runtime_id,
                &record.thread_id,
                None,
            )?;
            self.emit_thread_queue_event("thread/queueUpdated", &record.thread_id)?;
            self.process_thread_queue(&record.thread_id, None).await
        }
        .await;
        if let Err(error) = step {
            // Keep only the earliest failure; later records are still handled.
            first_failure.get_or_insert(error);
        }
    }
    first_failure.map_or(Ok(()), Err)
}
/// Builds the JSON payload describing the current queue of a thread.
///
/// The object has the keys `runtimeId`, `threadId` and `queuedMessages`; the
/// last one is read from storage at call time.
///
/// # Errors
/// Fails when the queue listing cannot be read from storage.
pub(super) fn thread_queue_payload_json(
    &self,
    runtime_id: &str,
    thread_id: &str,
) -> Result<Value> {
    let queued_messages = self.storage.list_queued_thread_messages(thread_id)?;
    let payload = json!({
        "runtimeId": runtime_id,
        "threadId": thread_id,
        "queuedMessages": queued_messages,
    });
    Ok(payload)
}
/// Emits `event_type` with the current queue listing of `thread_id`.
///
/// The runtime id attached to the event is taken from the first queued
/// record. When the queue is empty it comes from the thread index instead,
/// and when that lookup errors or finds nothing it falls back to `"primary"`.
///
/// # Errors
/// Fails when the queue listing cannot be read or when emitting fails.
fn emit_thread_queue_event(&self, event_type: &str, thread_id: &str) -> Result<()> {
    let queued_messages = self.storage.list_queued_thread_messages(thread_id)?;
    let runtime_id = match queued_messages.first() {
        Some(record) => record.runtime_id.clone(),
        // Lookup failures are deliberately tolerated here in favour of the default.
        None => match self.storage.get_thread_index(thread_id) {
            Ok(Some(thread)) => thread.runtime_id,
            _ => "primary".to_string(),
        },
    };
    let payload = json!({
        "runtimeId": runtime_id,
        "threadId": thread_id,
        "queuedMessages": queued_messages,
    });
    self.emit_event(event_type, Some(&runtime_id), Some(thread_id), payload)
}
/// Emits an edit-related queue event for a thread.
///
/// The payload carries the runtime id, the thread id, the current queue
/// listing and the optional `message` (serialized as JSON `null` when absent).
///
/// # Errors
/// Fails when the queue listing cannot be read or when emitting fails.
fn emit_thread_queue_edit_event(
    &self,
    event_type: &str,
    runtime_id: &str,
    thread_id: &str,
    message: Option<QueuedThreadMessageRecord>,
) -> Result<()> {
    let queued_messages = self.storage.list_queued_thread_messages(thread_id)?;
    let payload = json!({
        "runtimeId": runtime_id,
        "threadId": thread_id,
        "queuedMessages": queued_messages,
        "message": message,
    });
    self.emit_event(event_type, Some(runtime_id), Some(thread_id), payload)
}
/// Emits a notice event about one specific queue entry.
///
/// The payload carries the runtime id, the thread id, the queue id, the
/// optional notice text and the current queue listing of the thread.
///
/// # Errors
/// Fails when the queue listing cannot be read or when emitting fails.
fn emit_thread_queue_notice(
    &self,
    event_type: &str,
    runtime_id: &str,
    thread_id: &str,
    queue_id: &str,
    notice: Option<&str>,
) -> Result<()> {
    let queued_messages = self.storage.list_queued_thread_messages(thread_id)?;
    let payload = json!({
        "runtimeId": runtime_id,
        "threadId": thread_id,
        "queueId": queue_id,
        "notice": notice,
        "queuedMessages": queued_messages,
    });
    self.emit_event(event_type, Some(runtime_id), Some(thread_id), payload)
}
/// Decides whether the head of a queue may be sent right now.
///
/// Returns `false` whenever the indexed thread is flagged active (either via
/// `is_active` or a status equal to `"active"`). Otherwise:
/// * `AfterThreadIdle` and `InterruptAndSend` are always ready;
/// * `AfterNextResultBatch` is ready unless both an armed turn id and a
///   completed turn id are present and they differ.
///
/// # Errors
/// Fails when the thread index lookup fails.
fn queue_head_ready(
    &self,
    head: &QueuedThreadMessageRecord,
    completed_turn_id: Option<&str>,
) -> Result<bool> {
    let thread_busy = match self.storage.get_thread_index(&head.thread_id)? {
        Some(thread) => thread.is_active || thread.status == "active",
        None => false,
    };
    if thread_busy {
        return Ok(false);
    }
    let ready = match head.dispatch_mode {
        QueueDispatchMode::AfterThreadIdle | QueueDispatchMode::InterruptAndSend => true,
        QueueDispatchMode::AfterNextResultBatch => {
            match (head.armed_turn_id.as_deref(), completed_turn_id) {
                (Some(armed), Some(completed)) => armed == completed,
                // With either id missing there is nothing to compare against.
                _ => true,
            }
        }
    };
    Ok(ready)
}
/// Sends a queued message to its runtime as a `turn/start` request.
///
/// The runtime is resolved from the indexed thread. When the record carries a
/// working directory, it is expanded and canonicalized before being passed
/// along; otherwise no working directory is sent.
///
/// # Errors
/// Fails when the thread is not found in storage, when the runtime cannot be
/// resolved, when the working directory cannot be expanded or canonicalized,
/// or when the request itself fails.
async fn send_prepared_thread_message(
    &self,
    record: &QueuedThreadMessageRecord,
    input_items: Vec<Value>,
) -> Result<()> {
    let thread = self
        .storage
        .get_thread_index(&record.thread_id)?
        .context("线程不存在或尚未缓存")?;
    let runtime = self.require_runtime(Some(&thread.runtime_id)).await?;
    let cwd = record
        .cwd
        .as_deref()
        .map(|raw| -> Result<String> {
            let expanded = expand_path(std::path::Path::new(raw))?;
            let canonical = canonicalize_directory(&expanded)?;
            Ok(canonical.to_string_lossy().into_owned())
        })
        .transpose()?;
    let payload =
        request_payloads::build_turn_start_request_payload(&record.thread_id, input_items, cwd);
    runtime.app_server.request("turn/start", payload).await?;
    Ok(())
}
/// Sends a `turn/interrupt` request for `turn_id` of `thread_id` to the
/// runtime identified by `runtime_id`. The response body is discarded.
///
/// # Errors
/// Fails when the runtime cannot be resolved or when the request fails.
async fn interrupt_thread_turn(
    &self,
    runtime_id: &str,
    thread_id: &str,
    turn_id: &str,
) -> Result<()> {
    let runtime = self.require_runtime(Some(runtime_id)).await?;
    let payload =
        request_payloads::build_turn_interrupt_request_payload(thread_id, Some(turn_id));
    runtime.app_server.request("turn/interrupt", payload).await?;
    Ok(())
}
}
/// Converts request input into the JSON items stored alongside a queued message.
///
/// Text and image items whose content is blank after trimming are skipped.
/// A `LocalImage` item is rejected with an error. Any other item is ignored.
/// When no item survives, the plain `text` is used as a single text item,
/// provided it is not blank.
///
/// # Errors
/// Fails on a `LocalImage` item, or when neither the items nor `text` yield
/// any content.
fn normalize_queue_input_items(
    text: &str,
    input_items: Option<Vec<SendTurnInputItem>>,
) -> Result<Vec<Value>> {
    // Single place that defines the JSON shape of a text item.
    let text_item = |content: &str| {
        json!({
            "type": "text",
            "text": content,
            "text_elements": [],
        })
    };

    let mut normalized = Vec::new();
    for item in input_items.into_iter().flatten() {
        match item {
            SendTurnInputItem::LocalImage { .. } => {
                bail!("消息队列暂不支持本地图片占位输入");
            }
            SendTurnInputItem::Text { text: content } if !content.trim().is_empty() => {
                normalized.push(text_item(&content));
            }
            SendTurnInputItem::Image { url } if !url.trim().is_empty() => {
                normalized.push(json!({
                    "type": "image",
                    "url": url,
                }));
            }
            _ => {}
        }
    }

    if normalized.is_empty() {
        // Fall back to the plain draft text; a blank draft leaves nothing to send.
        if text.trim().is_empty() {
            bail!("输入内容不能为空");
        }
        normalized.push(text_item(text));
    }
    Ok(normalized)
}