use rusqlite::{OptionalExtension, params};
use serde::Serialize;
use serde_json::Value;
use super::Storage;
use super::decode::decode_json_row;
use crate::bridge_protocol::{QueueMessageStatus, QueuedThreadMessageRecord, now_millis};
/// A queued thread message as loaded from the `queued_thread_messages` table:
/// the protocol-level record together with the input items stored alongside it.
#[derive(Debug, Clone)]
pub struct StoredQueuedThreadMessage {
/// Record decoded from the row's `raw_json` column.
pub record: QueuedThreadMessageRecord,
/// JSON values decoded from the row's `input_items` column.
pub input_items: Vec<Value>,
}
impl Storage {
pub fn insert_queued_thread_message(
&self,
record: &QueuedThreadMessageRecord,
input_items: &[Value],
) -> anyhow::Result<()> {
let conn = self.connect()?;
conn.execute(
"INSERT INTO queued_thread_messages (
queue_id, runtime_id, thread_id, position, dispatch_mode, status, draft_text,
draft_images, image_send_mode, cwd, armed_turn_id, failure_message,
reserved_by_device_id, input_items, created_at_ms, updated_at_ms, raw_json
)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
params![
record.queue_id,
record.runtime_id,
record.thread_id,
record.position,
enum_label(&record.dispatch_mode)?,
enum_label(&record.status)?,
record.draft_text,
serde_json::to_string(&record.draft_images)?,
enum_label(&record.image_send_mode)?,
record.cwd,
record.armed_turn_id,
record.failure_message,
record.reserved_by_device_id,
serde_json::to_string(input_items)?,
record.created_at_ms,
record.updated_at_ms,
serde_json::to_string(record)?,
],
)?;
Ok(())
}
pub fn next_thread_queue_position(&self, thread_id: &str) -> anyhow::Result<i64> {
let conn = self.connect()?;
let position = conn
.query_row(
"SELECT COALESCE(MAX(position), 0) + 1
FROM queued_thread_messages
WHERE thread_id = ?1",
params![thread_id],
|row| row.get(0),
)
.optional()?
.unwrap_or(1_i64);
Ok(position)
}
pub fn list_queued_thread_messages(
&self,
thread_id: &str,
) -> anyhow::Result<Vec<QueuedThreadMessageRecord>> {
let conn = self.connect()?;
let mut stmt = conn.prepare(
"SELECT raw_json
FROM queued_thread_messages
WHERE thread_id = ?1
ORDER BY position ASC",
)?;
let rows = stmt.query_map(params![thread_id], |row| {
let raw: String = row.get(0)?;
decode_json_row(raw)
})?;
Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
}
pub fn get_stored_queued_thread_message(
&self,
queue_id: &str,
) -> anyhow::Result<Option<StoredQueuedThreadMessage>> {
let conn = self.connect()?;
let record = conn
.query_row(
"SELECT raw_json, input_items
FROM queued_thread_messages
WHERE queue_id = ?1",
params![queue_id],
|row| {
let raw_json: String = row.get(0)?;
let input_items_raw: String = row.get(1)?;
Ok(StoredQueuedThreadMessage {
record: decode_json_row(raw_json)?,
input_items: decode_json_row(input_items_raw)?,
})
},
)
.optional()?;
Ok(record)
}
pub fn update_queued_thread_message(
&self,
record: &QueuedThreadMessageRecord,
input_items: &[Value],
) -> anyhow::Result<()> {
let conn = self.connect()?;
conn.execute(
"UPDATE queued_thread_messages
SET dispatch_mode = ?2,
status = ?3,
draft_text = ?4,
draft_images = ?5,
image_send_mode = ?6,
cwd = ?7,
armed_turn_id = ?8,
failure_message = ?9,
reserved_by_device_id = ?10,
input_items = ?11,
updated_at_ms = ?12,
raw_json = ?13
WHERE queue_id = ?1",
params![
record.queue_id,
enum_label(&record.dispatch_mode)?,
enum_label(&record.status)?,
record.draft_text,
serde_json::to_string(&record.draft_images)?,
enum_label(&record.image_send_mode)?,
record.cwd,
record.armed_turn_id,
record.failure_message,
record.reserved_by_device_id,
serde_json::to_string(input_items)?,
record.updated_at_ms,
serde_json::to_string(record)?,
],
)?;
Ok(())
}
pub fn delete_queued_thread_message(
&self,
queue_id: &str,
) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
let Some(existing) = self.get_stored_queued_thread_message(queue_id)? else {
return Ok(None);
};
let conn = self.connect()?;
conn.execute(
"DELETE FROM queued_thread_messages WHERE queue_id = ?1",
params![queue_id],
)?;
compact_thread_queue_positions(&conn, &existing.record.thread_id)?;
Ok(Some(existing.record))
}
pub fn clear_thread_queue(&self, thread_id: &str) -> anyhow::Result<()> {
let conn = self.connect()?;
conn.execute(
"DELETE FROM queued_thread_messages
WHERE thread_id = ?1 AND status != ?2",
params![thread_id, enum_label(&QueueMessageStatus::Sending)?],
)?;
compact_thread_queue_positions(&conn, thread_id)?;
Ok(())
}
pub fn reserve_queued_thread_message(
&self,
queue_id: &str,
device_id: &str,
) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
return Ok(None);
};
if matches!(existing.record.status, QueueMessageStatus::Sending) {
return Ok(None);
}
existing.record.status = QueueMessageStatus::ReservedForEdit;
existing.record.reserved_by_device_id = Some(device_id.to_string());
existing.record.failure_message = None;
existing.record.updated_at_ms = now_millis();
self.update_queued_thread_message(&existing.record, &existing.input_items)?;
Ok(Some(existing.record))
}
pub fn cancel_queued_thread_message_edit(
&self,
queue_id: &str,
) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
reset_reserved_message(self, queue_id)
}
pub fn release_reserved_queue_messages_for_device(
&self,
device_id: &str,
) -> anyhow::Result<Vec<QueuedThreadMessageRecord>> {
let conn = self.connect()?;
let mut stmt = conn.prepare(
"SELECT queue_id
FROM queued_thread_messages
WHERE reserved_by_device_id = ?1",
)?;
let queue_ids = stmt
.query_map(params![device_id], |row| row.get::<_, String>(0))?
.collect::<rusqlite::Result<Vec<_>>>()?;
let mut released = Vec::new();
for queue_id in queue_ids {
if let Some(record) = reset_reserved_message(self, &queue_id)? {
released.push(record);
}
}
Ok(released)
}
pub fn try_mark_queued_thread_message_sending(
&self,
queue_id: &str,
) -> anyhow::Result<Option<StoredQueuedThreadMessage>> {
let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
return Ok(None);
};
if !matches!(existing.record.status, QueueMessageStatus::Queued) {
return Ok(None);
}
existing.record.status = QueueMessageStatus::Sending;
existing.record.failure_message = None;
existing.record.reserved_by_device_id = None;
existing.record.updated_at_ms = now_millis();
self.update_queued_thread_message(&existing.record, &existing.input_items)?;
Ok(Some(existing))
}
pub fn mark_queued_thread_message_failed(
&self,
queue_id: &str,
failure_message: &str,
) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
return Ok(None);
};
existing.record.status = QueueMessageStatus::Failed;
existing.record.failure_message = Some(failure_message.to_string());
existing.record.updated_at_ms = now_millis();
self.update_queued_thread_message(&existing.record, &existing.input_items)?;
Ok(Some(existing.record))
}
}
/// Renumbers the rows of `thread_id` so that their `position` column runs
/// 1, 2, 3, … in the rows' current position order.
///
/// Only the `position` column is rewritten; the `raw_json` column is left as
/// it is. No transaction is opened here, so the caller decides whether the
/// renumbering is part of a larger atomic unit.
///
/// # Errors
/// Fails when preparing or executing any of the statements fails.
fn compact_thread_queue_positions(
    conn: &rusqlite::Connection,
    thread_id: &str,
) -> anyhow::Result<()> {
    let mut select_ids = conn.prepare(
        "SELECT queue_id
         FROM queued_thread_messages
         WHERE thread_id = ?1
         ORDER BY position ASC",
    )?;
    // Materialise the ids first so the read cursor is finished before any row
    // of the same table is modified.
    let queue_ids = select_ids
        .query_map(params![thread_id], |row| row.get::<_, String>(0))?
        .collect::<rusqlite::Result<Vec<_>>>()?;

    // Prepared once and reused for every row instead of re-parsing the SQL on
    // each iteration.
    let mut set_position = conn.prepare(
        "UPDATE queued_thread_messages
         SET position = ?2
         WHERE queue_id = ?1",
    )?;
    for (index, queue_id) in queue_ids.iter().enumerate() {
        // Positions are 1-based; saturate rather than wrap on an (unrealistic) overflow.
        let position = i64::try_from(index + 1).unwrap_or(i64::MAX);
        set_position.execute(params![queue_id, position])?;
    }
    Ok(())
}
fn reset_reserved_message(
storage: &Storage,
queue_id: &str,
) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
let Some(mut existing) = storage.get_stored_queued_thread_message(queue_id)? else {
return Ok(None);
};
if !matches!(existing.record.status, QueueMessageStatus::ReservedForEdit) {
return Ok(None);
}
existing.record.status = QueueMessageStatus::Queued;
existing.record.reserved_by_device_id = None;
existing.record.updated_at_ms = now_millis();
storage.update_queued_thread_message(&existing.record, &existing.input_items)?;
Ok(Some(existing.record))
}
fn enum_label<T: Serialize>(value: &T) -> anyhow::Result<String> {
let Some(label) = serde_json::to_value(value)?.as_str().map(ToOwned::to_owned) else {
anyhow::bail!("枚举序列化结果不是字符串");
};
Ok(label)
}