codex-mobile-bridge 1.0.2

Remote bridge and service manager for codex-mobile.
Documentation
use rusqlite::{OptionalExtension, params};
use serde::Serialize;
use serde_json::Value;

use super::Storage;
use super::decode::decode_json_row;
use crate::bridge_protocol::{QueueMessageStatus, QueuedThreadMessageRecord, now_millis};

#[derive(Debug, Clone)]
pub struct StoredQueuedThreadMessage {
    pub record: QueuedThreadMessageRecord,
    pub input_items: Vec<Value>,
}

impl Storage {
    pub fn insert_queued_thread_message(
        &self,
        record: &QueuedThreadMessageRecord,
        input_items: &[Value],
    ) -> anyhow::Result<()> {
        let conn = self.connect()?;
        conn.execute(
            "INSERT INTO queued_thread_messages (
                 queue_id, runtime_id, thread_id, position, dispatch_mode, status, draft_text,
                 draft_images, image_send_mode, cwd, armed_turn_id, failure_message,
                 reserved_by_device_id, input_items, created_at_ms, updated_at_ms, raw_json
             )
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
            params![
                record.queue_id,
                record.runtime_id,
                record.thread_id,
                record.position,
                enum_label(&record.dispatch_mode)?,
                enum_label(&record.status)?,
                record.draft_text,
                serde_json::to_string(&record.draft_images)?,
                enum_label(&record.image_send_mode)?,
                record.cwd,
                record.armed_turn_id,
                record.failure_message,
                record.reserved_by_device_id,
                serde_json::to_string(input_items)?,
                record.created_at_ms,
                record.updated_at_ms,
                serde_json::to_string(record)?,
            ],
        )?;
        Ok(())
    }

    pub fn next_thread_queue_position(&self, thread_id: &str) -> anyhow::Result<i64> {
        let conn = self.connect()?;
        let position = conn
            .query_row(
                "SELECT COALESCE(MAX(position), 0) + 1
                 FROM queued_thread_messages
                 WHERE thread_id = ?1",
                params![thread_id],
                |row| row.get(0),
            )
            .optional()?
            .unwrap_or(1_i64);
        Ok(position)
    }

    pub fn list_queued_thread_messages(
        &self,
        thread_id: &str,
    ) -> anyhow::Result<Vec<QueuedThreadMessageRecord>> {
        let conn = self.connect()?;
        let mut stmt = conn.prepare(
            "SELECT raw_json
             FROM queued_thread_messages
             WHERE thread_id = ?1
             ORDER BY position ASC",
        )?;
        let rows = stmt.query_map(params![thread_id], |row| {
            let raw: String = row.get(0)?;
            decode_json_row(raw)
        })?;
        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
    }

    pub fn get_stored_queued_thread_message(
        &self,
        queue_id: &str,
    ) -> anyhow::Result<Option<StoredQueuedThreadMessage>> {
        let conn = self.connect()?;
        let record = conn
            .query_row(
                "SELECT raw_json, input_items
                 FROM queued_thread_messages
                 WHERE queue_id = ?1",
                params![queue_id],
                |row| {
                    let raw_json: String = row.get(0)?;
                    let input_items_raw: String = row.get(1)?;
                    Ok(StoredQueuedThreadMessage {
                        record: decode_json_row(raw_json)?,
                        input_items: decode_json_row(input_items_raw)?,
                    })
                },
            )
            .optional()?;
        Ok(record)
    }

    pub fn update_queued_thread_message(
        &self,
        record: &QueuedThreadMessageRecord,
        input_items: &[Value],
    ) -> anyhow::Result<()> {
        let conn = self.connect()?;
        conn.execute(
            "UPDATE queued_thread_messages
             SET dispatch_mode = ?2,
                 status = ?3,
                 draft_text = ?4,
                 draft_images = ?5,
                 image_send_mode = ?6,
                 cwd = ?7,
                 armed_turn_id = ?8,
                 failure_message = ?9,
                 reserved_by_device_id = ?10,
                 input_items = ?11,
                 updated_at_ms = ?12,
                 raw_json = ?13
             WHERE queue_id = ?1",
            params![
                record.queue_id,
                enum_label(&record.dispatch_mode)?,
                enum_label(&record.status)?,
                record.draft_text,
                serde_json::to_string(&record.draft_images)?,
                enum_label(&record.image_send_mode)?,
                record.cwd,
                record.armed_turn_id,
                record.failure_message,
                record.reserved_by_device_id,
                serde_json::to_string(input_items)?,
                record.updated_at_ms,
                serde_json::to_string(record)?,
            ],
        )?;
        Ok(())
    }

    pub fn delete_queued_thread_message(
        &self,
        queue_id: &str,
    ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
        let Some(existing) = self.get_stored_queued_thread_message(queue_id)? else {
            return Ok(None);
        };
        let conn = self.connect()?;
        conn.execute(
            "DELETE FROM queued_thread_messages WHERE queue_id = ?1",
            params![queue_id],
        )?;
        compact_thread_queue_positions(&conn, &existing.record.thread_id)?;
        Ok(Some(existing.record))
    }

    pub fn clear_thread_queue(&self, thread_id: &str) -> anyhow::Result<()> {
        let conn = self.connect()?;
        conn.execute(
            "DELETE FROM queued_thread_messages
             WHERE thread_id = ?1 AND status != ?2",
            params![thread_id, enum_label(&QueueMessageStatus::Sending)?],
        )?;
        compact_thread_queue_positions(&conn, thread_id)?;
        Ok(())
    }

    pub fn reserve_queued_thread_message(
        &self,
        queue_id: &str,
        device_id: &str,
    ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
        let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
            return Ok(None);
        };
        if matches!(existing.record.status, QueueMessageStatus::Sending) {
            return Ok(None);
        }
        existing.record.status = QueueMessageStatus::ReservedForEdit;
        existing.record.reserved_by_device_id = Some(device_id.to_string());
        existing.record.failure_message = None;
        existing.record.updated_at_ms = now_millis();
        self.update_queued_thread_message(&existing.record, &existing.input_items)?;
        Ok(Some(existing.record))
    }

    pub fn cancel_queued_thread_message_edit(
        &self,
        queue_id: &str,
    ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
        reset_reserved_message(self, queue_id)
    }

    pub fn release_reserved_queue_messages_for_device(
        &self,
        device_id: &str,
    ) -> anyhow::Result<Vec<QueuedThreadMessageRecord>> {
        let conn = self.connect()?;
        let mut stmt = conn.prepare(
            "SELECT queue_id
             FROM queued_thread_messages
             WHERE reserved_by_device_id = ?1",
        )?;
        let queue_ids = stmt
            .query_map(params![device_id], |row| row.get::<_, String>(0))?
            .collect::<rusqlite::Result<Vec<_>>>()?;
        let mut released = Vec::new();
        for queue_id in queue_ids {
            if let Some(record) = reset_reserved_message(self, &queue_id)? {
                released.push(record);
            }
        }
        Ok(released)
    }

    pub fn try_mark_queued_thread_message_sending(
        &self,
        queue_id: &str,
    ) -> anyhow::Result<Option<StoredQueuedThreadMessage>> {
        let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
            return Ok(None);
        };
        if !matches!(existing.record.status, QueueMessageStatus::Queued) {
            return Ok(None);
        }
        existing.record.status = QueueMessageStatus::Sending;
        existing.record.failure_message = None;
        existing.record.reserved_by_device_id = None;
        existing.record.updated_at_ms = now_millis();
        self.update_queued_thread_message(&existing.record, &existing.input_items)?;
        Ok(Some(existing))
    }

    pub fn mark_queued_thread_message_failed(
        &self,
        queue_id: &str,
        failure_message: &str,
    ) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
        let Some(mut existing) = self.get_stored_queued_thread_message(queue_id)? else {
            return Ok(None);
        };
        existing.record.status = QueueMessageStatus::Failed;
        existing.record.failure_message = Some(failure_message.to_string());
        existing.record.updated_at_ms = now_millis();
        self.update_queued_thread_message(&existing.record, &existing.input_items)?;
        Ok(Some(existing.record))
    }
}

fn compact_thread_queue_positions(
    conn: &rusqlite::Connection,
    thread_id: &str,
) -> anyhow::Result<()> {
    let mut stmt = conn.prepare(
        "SELECT queue_id
         FROM queued_thread_messages
         WHERE thread_id = ?1
         ORDER BY position ASC",
    )?;
    let queue_ids = stmt
        .query_map(params![thread_id], |row| row.get::<_, String>(0))?
        .collect::<rusqlite::Result<Vec<_>>>()?;
    for (index, queue_id) in queue_ids.iter().enumerate() {
        conn.execute(
            "UPDATE queued_thread_messages
             SET position = ?2
             WHERE queue_id = ?1",
            params![queue_id, i64::try_from(index + 1).unwrap_or(i64::MAX)],
        )?;
    }
    Ok(())
}

fn reset_reserved_message(
    storage: &Storage,
    queue_id: &str,
) -> anyhow::Result<Option<QueuedThreadMessageRecord>> {
    let Some(mut existing) = storage.get_stored_queued_thread_message(queue_id)? else {
        return Ok(None);
    };
    if !matches!(existing.record.status, QueueMessageStatus::ReservedForEdit) {
        return Ok(None);
    }
    existing.record.status = QueueMessageStatus::Queued;
    existing.record.reserved_by_device_id = None;
    existing.record.updated_at_ms = now_millis();
    storage.update_queued_thread_message(&existing.record, &existing.input_items)?;
    Ok(Some(existing.record))
}

fn enum_label<T: Serialize>(value: &T) -> anyhow::Result<String> {
    let Some(label) = serde_json::to_value(value)?.as_str().map(ToOwned::to_owned) else {
        anyhow::bail!("枚举序列化结果不是字符串");
    };
    Ok(label)
}