codex_mobile_bridge/storage/
message_queue.rs1use rusqlite::{OptionalExtension, params};
2use serde::Serialize;
3use serde_json::Value;
4
5use super::Storage;
6use super::decode::decode_json_row;
7use crate::bridge_protocol::{QueueMessageStatus, QueuedThreadMessageRecord, now_millis};
8
9#[derive(Debug, Clone)]
10pub struct StoredQueuedThreadMessage {
11 pub record: QueuedThreadMessageRecord,
12 pub input_items: Vec<Value>,
13}
14
15impl Storage {
16 pub fn insert_queued_thread_message(
17 &self,
18 record: &QueuedThreadMessageRecord,
19 input_items: &[Value],
20 ) -> anyhow::Result<()> {
21 let conn = self.connect()?;
22 conn.execute(
23 "INSERT INTO queued_thread_messages (
24 queue_id, runtime_id, thread_id, position, dispatch_mode, status, draft_text,
25 draft_images, image_send_mode, cwd, armed_turn_id, failure_message,
26 reserved_by_device_id, input_items, created_at_ms, updated_at_ms, raw_json
27 )
28 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
29 params![
30 record.queue_id,
31 record.runtime_id,
32 record.thread_id,
33 record.position,
34 enum_label(&record.dispatch_mode)?,
35 enum_label(&record.status)?,
36 record.draft_text,
37 serde_json::to_string(&record.draft_images)?,
38 enum_label(&record.image_send_mode)?,
39 record.cwd,
40 record.armed_turn_id,
41 record.failure_message,
42 record.reserved_by_device_id,
43 serde_json::to_string(input_items)?,
44 record.created_at_ms,
45 record.updated_at_ms,
46 serde_json::to_string(record)?,
47 ],
48 )?;
49 Ok(())
50 }
51
52 pub fn next_thread_queue_position(&self, thread_id: &str) -> anyhow::Result<i64> {
53 let conn = self.connect()?;
54 let position = conn
55 .query_row(
56 "SELECT COALESCE(MAX(position), 0) + 1
57 FROM queued_thread_messages
58 WHERE thread_id = ?1",
59 params![thread_id],
60 |row| row.get(0),
61 )
62 .optional()?
63 .unwrap_or(1_i64);
64 Ok(position)
65 }
66
67 pub fn list_queued_thread_messages(
68 &self,
69 thread_id: &str,
70 ) -> anyhow::Result<Vec<QueuedThreadMessageRecord>> {
71 let conn = self.connect()?;
72 let mut stmt = conn.prepare(
73 "SELECT raw_json
74 FROM queued_thread_messages
75 WHERE thread_id = ?1
76 ORDER BY position ASC",
77 )?;
78 let rows = stmt.query_map(params![thread_id], |row| {
79 let raw: String = row.get(0)?;
80 decode_json_row(raw)
81 })?;
82 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
83 }
84
85 pub fn get_stored_queued_thread_message(
86 &self,
87 queue_id: &str,
88 ) -> anyhow::Result<Option<StoredQueuedThreadMessage>> {
89 let conn = self.connect()?;
90 let record = conn
91 .query_row(
92 "SELECT raw_json, input_items
93 FROM queued_thread_messages
94 WHERE queue_id = ?1",
95 params![queue_id],
96 |row| {
97 let raw_json: String = row.get(0)?;
98 let input_items_raw: String = row.get(1)?;
99 Ok(StoredQueuedThreadMessage {
100 record: decode_json_row(raw_json)?,
101 input_items: decode_json_row(input_items_raw)?,
102 })
103 },
104 )
105 .optional()?;
106 Ok(record)
107 }
108
109 pub fn update_queued_thread_message(
110 &self,
111 record: &QueuedThreadMessageRecord,
112 input_items: &[Value],
113 ) -> anyhow::Result<()> {
114 let conn = self.connect()?;
115 conn.execute(
116 "UPDATE queued_thread_messages
117 SET dispatch_mode = ?2,
118 status = ?3,
119 draft_text = ?4,
120 draft_images = ?5,
121 image_send_mode = ?6,
122 cwd = ?7,
123 armed_turn_id = ?8,
124 failure_message = ?9,
125 reserved_by_device_id = ?10,
126 input_items = ?11,
127 updated_at_ms = ?12,
128 raw_json = ?13
129 WHERE queue_id = ?1",
130 params![
131 record.queue_id,
132 enum_label(&record.dispatch_mode)?,
133 enum_label(&record.status)?,
134 record.draft_text,
135 serde_json::to_string(&record.draft_images)?,
136 enum_label(&record.image_send_mode)?,
137 record.cwd,
138 record.armed_turn_id,
139 record.failure_message,
140 record.reserved_by_device_id,
141 serde_json::to_string(input_items)?,
142 record.updated_at_ms,
143 serde_json::to_string(record)?,
144 ],
145 )?;
146 Ok(())
147 }
148
149 pub fn delete_queued_thread_message(
150 &self,
151 queue_id: &str,
152 ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
153 let Some(existing) = self.get_stored_queued_thread_message(queue_id)? else {
154 return Ok(None);
155 };
156 let conn = self.connect()?;
157 conn.execute(
158 "DELETE FROM queued_thread_messages WHERE queue_id = ?1",
159 params![queue_id],
160 )?;
161 compact_thread_queue_positions(&conn, &existing.record.thread_id)?;
162 Ok(Some(existing.record))
163 }
164
165 pub fn clear_thread_queue(&self, thread_id: &str) -> anyhow::Result<()> {
166 let conn = self.connect()?;
167 conn.execute(
168 "DELETE FROM queued_thread_messages
169 WHERE thread_id = ?1 AND status != ?2",
170 params![thread_id, enum_label(&QueueMessageStatus::Sending)?],
171 )?;
172 compact_thread_queue_positions(&conn, thread_id)?;
173 Ok(())
174 }
175
176 pub fn reserve_queued_thread_message(
177 &self,
178 queue_id: &str,
179 device_id: &str,
180 ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
181 let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
182 return Ok(None);
183 };
184 if matches!(existing.record.status, QueueMessageStatus::Sending) {
185 return Ok(None);
186 }
187 existing.record.status = QueueMessageStatus::ReservedForEdit;
188 existing.record.reserved_by_device_id = Some(device_id.to_string());
189 existing.record.failure_message = None;
190 existing.record.updated_at_ms = now_millis();
191 self.update_queued_thread_message(&existing.record, &existing.input_items)?;
192 Ok(Some(existing.record))
193 }
194
195 pub fn cancel_queued_thread_message_edit(
196 &self,
197 queue_id: &str,
198 ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
199 reset_reserved_message(self, queue_id)
200 }
201
202 pub fn release_reserved_queue_messages_for_device(
203 &self,
204 device_id: &str,
205 ) -> anyhow::Result<Vec<QueuedThreadMessageRecord>> {
206 let conn = self.connect()?;
207 let mut stmt = conn.prepare(
208 "SELECT queue_id
209 FROM queued_thread_messages
210 WHERE reserved_by_device_id = ?1",
211 )?;
212 let queue_ids = stmt
213 .query_map(params![device_id], |row| row.get::<_, String>(0))?
214 .collect::<rusqlite::Result<Vec<_>>>()?;
215 let mut released = Vec::new();
216 for queue_id in queue_ids {
217 if let Some(record) = reset_reserved_message(self, &queue_id)? {
218 released.push(record);
219 }
220 }
221 Ok(released)
222 }
223
224 pub fn try_mark_queued_thread_message_sending(
225 &self,
226 queue_id: &str,
227 ) -> anyhow::Result<Option<StoredQueuedThreadMessage>> {
228 let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
229 return Ok(None);
230 };
231 if !matches!(existing.record.status, QueueMessageStatus::Queued) {
232 return Ok(None);
233 }
234 existing.record.status = QueueMessageStatus::Sending;
235 existing.record.failure_message = None;
236 existing.record.reserved_by_device_id = None;
237 existing.record.updated_at_ms = now_millis();
238 self.update_queued_thread_message(&existing.record, &existing.input_items)?;
239 Ok(Some(existing))
240 }
241
242 pub fn mark_queued_thread_message_failed(
243 &self,
244 queue_id: &str,
245 failure_message: &str,
246 ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
247 let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
248 return Ok(None);
249 };
250 existing.record.status = QueueMessageStatus::Failed;
251 existing.record.failure_message = Some(failure_message.to_string());
252 existing.record.updated_at_ms = now_millis();
253 self.update_queued_thread_message(&existing.record, &existing.input_items)?;
254 Ok(Some(existing.record))
255 }
256}
257
258fn compact_thread_queue_positions(
259 conn: &rusqlite::Connection,
260 thread_id: &str,
261) -> anyhow::Result<()> {
262 let mut stmt = conn.prepare(
263 "SELECT queue_id
264 FROM queued_thread_messages
265 WHERE thread_id = ?1
266 ORDER BY position ASC",
267 )?;
268 let queue_ids = stmt
269 .query_map(params![thread_id], |row| row.get::<_, String>(0))?
270 .collect::<rusqlite::Result<Vec<_>>>()?;
271 for (index, queue_id) in queue_ids.iter().enumerate() {
272 conn.execute(
273 "UPDATE queued_thread_messages
274 SET position = ?2
275 WHERE queue_id = ?1",
276 params![queue_id, i64::try_from(index + 1).unwrap_or(i64::MAX)],
277 )?;
278 }
279 Ok(())
280}
281
282fn reset_reserved_message(
283 storage: &Storage,
284 queue_id: &str,
285) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
286 let Some(mut existing) = storage.get_stored_queued_thread_message(queue_id)? else {
287 return Ok(None);
288 };
289 if !matches!(existing.record.status, QueueMessageStatus::ReservedForEdit) {
290 return Ok(None);
291 }
292 existing.record.status = QueueMessageStatus::Queued;
293 existing.record.reserved_by_device_id = None;
294 existing.record.updated_at_ms = now_millis();
295 storage.update_queued_thread_message(&existing.record, &existing.input_items)?;
296 Ok(Some(existing.record))
297}
298
299fn enum_label<T: Serialize>(value: &T) -> anyhow::Result<String> {
300 let Some(label) = serde_json::to_value(value)?.as_str().map(ToOwned::to_owned) else {
301 anyhow::bail!("枚举序列化结果不是字符串");
302 };
303 Ok(label)
304}