Skip to main content

codex_mobile_bridge/storage/
message_queue.rs

1use rusqlite::{OptionalExtension, params};
2use serde::Serialize;
3use serde_json::Value;
4
5use super::Storage;
6use super::decode::decode_json_row;
7use crate::bridge_protocol::{QueueMessageStatus, QueuedThreadMessageRecord, now_millis};
8
9#[derive(Debug, Clone)]
10pub struct StoredQueuedThreadMessage {
11    pub record: QueuedThreadMessageRecord,
12    pub input_items: Vec<Value>,
13}
14
15impl Storage {
16    pub fn insert_queued_thread_message(
17        &self,
18        record: &QueuedThreadMessageRecord,
19        input_items: &[Value],
20    ) -> anyhow::Result<()> {
21        let conn = self.connect()?;
22        conn.execute(
23            "INSERT INTO queued_thread_messages (
24                 queue_id, runtime_id, thread_id, position, dispatch_mode, status, draft_text,
25                 draft_images, image_send_mode, cwd, armed_turn_id, failure_message,
26                 reserved_by_device_id, input_items, created_at_ms, updated_at_ms, raw_json
27             )
28             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
29            params![
30                record.queue_id,
31                record.runtime_id,
32                record.thread_id,
33                record.position,
34                enum_label(&record.dispatch_mode)?,
35                enum_label(&record.status)?,
36                record.draft_text,
37                serde_json::to_string(&record.draft_images)?,
38                enum_label(&record.image_send_mode)?,
39                record.cwd,
40                record.armed_turn_id,
41                record.failure_message,
42                record.reserved_by_device_id,
43                serde_json::to_string(input_items)?,
44                record.created_at_ms,
45                record.updated_at_ms,
46                serde_json::to_string(record)?,
47            ],
48        )?;
49        Ok(())
50    }
51
52    pub fn next_thread_queue_position(&self, thread_id: &str) -> anyhow::Result<i64> {
53        let conn = self.connect()?;
54        let position = conn
55            .query_row(
56                "SELECT COALESCE(MAX(position), 0) + 1
57                 FROM queued_thread_messages
58                 WHERE thread_id = ?1",
59                params![thread_id],
60                |row| row.get(0),
61            )
62            .optional()?
63            .unwrap_or(1_i64);
64        Ok(position)
65    }
66
67    pub fn list_queued_thread_messages(
68        &self,
69        thread_id: &str,
70    ) -> anyhow::Result<Vec<QueuedThreadMessageRecord>> {
71        let conn = self.connect()?;
72        let mut stmt = conn.prepare(
73            "SELECT raw_json
74             FROM queued_thread_messages
75             WHERE thread_id = ?1
76             ORDER BY position ASC",
77        )?;
78        let rows = stmt.query_map(params![thread_id], |row| {
79            let raw: String = row.get(0)?;
80            decode_json_row(raw)
81        })?;
82        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
83    }
84
85    pub fn get_stored_queued_thread_message(
86        &self,
87        queue_id: &str,
88    ) -> anyhow::Result<Option<StoredQueuedThreadMessage>> {
89        let conn = self.connect()?;
90        let record = conn
91            .query_row(
92                "SELECT raw_json, input_items
93                 FROM queued_thread_messages
94                 WHERE queue_id = ?1",
95                params![queue_id],
96                |row| {
97                    let raw_json: String = row.get(0)?;
98                    let input_items_raw: String = row.get(1)?;
99                    Ok(StoredQueuedThreadMessage {
100                        record: decode_json_row(raw_json)?,
101                        input_items: decode_json_row(input_items_raw)?,
102                    })
103                },
104            )
105            .optional()?;
106        Ok(record)
107    }
108
109    pub fn update_queued_thread_message(
110        &self,
111        record: &QueuedThreadMessageRecord,
112        input_items: &[Value],
113    ) -> anyhow::Result<()> {
114        let conn = self.connect()?;
115        conn.execute(
116            "UPDATE queued_thread_messages
117             SET dispatch_mode = ?2,
118                 status = ?3,
119                 draft_text = ?4,
120                 draft_images = ?5,
121                 image_send_mode = ?6,
122                 cwd = ?7,
123                 armed_turn_id = ?8,
124                 failure_message = ?9,
125                 reserved_by_device_id = ?10,
126                 input_items = ?11,
127                 updated_at_ms = ?12,
128                 raw_json = ?13
129             WHERE queue_id = ?1",
130            params![
131                record.queue_id,
132                enum_label(&record.dispatch_mode)?,
133                enum_label(&record.status)?,
134                record.draft_text,
135                serde_json::to_string(&record.draft_images)?,
136                enum_label(&record.image_send_mode)?,
137                record.cwd,
138                record.armed_turn_id,
139                record.failure_message,
140                record.reserved_by_device_id,
141                serde_json::to_string(input_items)?,
142                record.updated_at_ms,
143                serde_json::to_string(record)?,
144            ],
145        )?;
146        Ok(())
147    }
148
149    pub fn delete_queued_thread_message(
150        &self,
151        queue_id: &str,
152    ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
153        let Some(existing) = self.get_stored_queued_thread_message(queue_id)? else {
154            return Ok(None);
155        };
156        let conn = self.connect()?;
157        conn.execute(
158            "DELETE FROM queued_thread_messages WHERE queue_id = ?1",
159            params![queue_id],
160        )?;
161        compact_thread_queue_positions(&conn, &existing.record.thread_id)?;
162        Ok(Some(existing.record))
163    }
164
165    pub fn clear_thread_queue(&self, thread_id: &str) -> anyhow::Result<()> {
166        let conn = self.connect()?;
167        conn.execute(
168            "DELETE FROM queued_thread_messages
169             WHERE thread_id = ?1 AND status != ?2",
170            params![thread_id, enum_label(&QueueMessageStatus::Sending)?],
171        )?;
172        compact_thread_queue_positions(&conn, thread_id)?;
173        Ok(())
174    }
175
176    pub fn reserve_queued_thread_message(
177        &self,
178        queue_id: &str,
179        device_id: &str,
180    ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
181        let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
182            return Ok(None);
183        };
184        if matches!(existing.record.status, QueueMessageStatus::Sending) {
185            return Ok(None);
186        }
187        existing.record.status = QueueMessageStatus::ReservedForEdit;
188        existing.record.reserved_by_device_id = Some(device_id.to_string());
189        existing.record.failure_message = None;
190        existing.record.updated_at_ms = now_millis();
191        self.update_queued_thread_message(&existing.record, &existing.input_items)?;
192        Ok(Some(existing.record))
193    }
194
195    pub fn cancel_queued_thread_message_edit(
196        &self,
197        queue_id: &str,
198    ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
199        reset_reserved_message(self, queue_id)
200    }
201
202    pub fn release_reserved_queue_messages_for_device(
203        &self,
204        device_id: &str,
205    ) -> anyhow::Result<Vec<QueuedThreadMessageRecord>> {
206        let conn = self.connect()?;
207        let mut stmt = conn.prepare(
208            "SELECT queue_id
209             FROM queued_thread_messages
210             WHERE reserved_by_device_id = ?1",
211        )?;
212        let queue_ids = stmt
213            .query_map(params![device_id], |row| row.get::<_, String>(0))?
214            .collect::<rusqlite::Result<Vec<_>>>()?;
215        let mut released = Vec::new();
216        for queue_id in queue_ids {
217            if let Some(record) = reset_reserved_message(self, &queue_id)? {
218                released.push(record);
219            }
220        }
221        Ok(released)
222    }
223
224    pub fn try_mark_queued_thread_message_sending(
225        &self,
226        queue_id: &str,
227    ) -> anyhow::Result<Option<StoredQueuedThreadMessage>> {
228        let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
229            return Ok(None);
230        };
231        if !matches!(existing.record.status, QueueMessageStatus::Queued) {
232            return Ok(None);
233        }
234        existing.record.status = QueueMessageStatus::Sending;
235        existing.record.failure_message = None;
236        existing.record.reserved_by_device_id = None;
237        existing.record.updated_at_ms = now_millis();
238        self.update_queued_thread_message(&existing.record, &existing.input_items)?;
239        Ok(Some(existing))
240    }
241
242    pub fn mark_queued_thread_message_failed(
243        &self,
244        queue_id: &str,
245        failure_message: &str,
246    ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
247        let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
248            return Ok(None);
249        };
250        existing.record.status = QueueMessageStatus::Failed;
251        existing.record.failure_message = Some(failure_message.to_string());
252        existing.record.updated_at_ms = now_millis();
253        self.update_queued_thread_message(&existing.record, &existing.input_items)?;
254        Ok(Some(existing.record))
255    }
256}
257
258fn compact_thread_queue_positions(
259    conn: &rusqlite::Connection,
260    thread_id: &str,
261) -> anyhow::Result<()> {
262    let mut stmt = conn.prepare(
263        "SELECT queue_id
264         FROM queued_thread_messages
265         WHERE thread_id = ?1
266         ORDER BY position ASC",
267    )?;
268    let queue_ids = stmt
269        .query_map(params![thread_id], |row| row.get::<_, String>(0))?
270        .collect::<rusqlite::Result<Vec<_>>>()?;
271    for (index, queue_id) in queue_ids.iter().enumerate() {
272        conn.execute(
273            "UPDATE queued_thread_messages
274             SET position = ?2
275             WHERE queue_id = ?1",
276            params![queue_id, i64::try_from(index + 1).unwrap_or(i64::MAX)],
277        )?;
278    }
279    Ok(())
280}
281
282fn reset_reserved_message(
283    storage: &Storage,
284    queue_id: &str,
285) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
286    let Some(mut existing) = storage.get_stored_queued_thread_message(queue_id)? else {
287        return Ok(None);
288    };
289    if !matches!(existing.record.status, QueueMessageStatus::ReservedForEdit) {
290        return Ok(None);
291    }
292    existing.record.status = QueueMessageStatus::Queued;
293    existing.record.reserved_by_device_id = None;
294    existing.record.updated_at_ms = now_millis();
295    storage.update_queued_thread_message(&existing.record, &existing.input_items)?;
296    Ok(Some(existing.record))
297}
298
299fn enum_label<T: Serialize>(value: &T) -> anyhow::Result<String> {
300    let Some(label) = serde_json::to_value(value)?.as_str().map(ToOwned::to_owned) else {
301        anyhow::bail!("枚举序列化结果不是字符串");
302    };
303    Ok(label)
304}