Skip to main content

codex_mobile_bridge/storage/
threads.rs

1use std::path::Path;
2
3use rusqlite::{OptionalExtension, params, params_from_iter};
4use serde_json::json;
5
6use super::Storage;
7use super::decode::decode_thread_row;
8use crate::bridge_protocol::{ThreadStatusInfo, ThreadSummary};
9use crate::directory::{directory_contains, normalize_absolute_directory};
10
11impl Storage {
12    pub fn upsert_thread_index(&self, thread: &ThreadSummary) -> anyhow::Result<()> {
13        let conn = self.connect()?;
14        conn.execute(
15            "INSERT INTO thread_index (
16                 thread_id, runtime_id, name, preview, cwd, status,
17                 model_provider, source, created_at_ms, updated_at_ms, is_loaded, is_active,
18                 archived, raw_json
19             )
20             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)
21             ON CONFLICT(thread_id) DO UPDATE SET
22                 runtime_id = excluded.runtime_id,
23                 name = excluded.name,
24                 preview = excluded.preview,
25                 cwd = excluded.cwd,
26                 status = excluded.status,
27                 model_provider = excluded.model_provider,
28                 source = excluded.source,
29                 created_at_ms = excluded.created_at_ms,
30                 updated_at_ms = excluded.updated_at_ms,
31                 is_loaded = excluded.is_loaded,
32                 is_active = excluded.is_active,
33                 archived = excluded.archived,
34                 raw_json = excluded.raw_json",
35            params![
36                thread.id,
37                thread.runtime_id,
38                thread.name,
39                thread.preview,
40                thread.cwd,
41                thread.status,
42                thread.model_provider,
43                thread.source,
44                thread.created_at,
45                thread.updated_at,
46                if thread.is_loaded { 1_i64 } else { 0_i64 },
47                if thread.is_active { 1_i64 } else { 0_i64 },
48                if thread.archived { 1_i64 } else { 0_i64 },
49                serde_json::to_string(thread)?
50            ],
51        )?;
52        Ok(())
53    }
54
55    pub fn get_thread_index(&self, thread_id: &str) -> anyhow::Result<Option<ThreadSummary>> {
56        let conn = self.connect()?;
57        let record = conn
58            .query_row(
59                "SELECT raw_json, archived FROM thread_index WHERE thread_id = ?1",
60                params![thread_id],
61                |row| decode_thread_row(row.get::<_, String>(0)?, row.get::<_, i64>(1)?),
62            )
63            .optional()?;
64        Ok(record)
65    }
66
67    pub fn list_thread_index(
68        &self,
69        directory_prefix: Option<&str>,
70        runtime_id: Option<&str>,
71        archived: Option<bool>,
72        search_term: Option<&str>,
73    ) -> anyhow::Result<Vec<ThreadSummary>> {
74        let conn = self.connect()?;
75        let mut sql = String::from(
76            "SELECT raw_json, archived
77             FROM thread_index",
78        );
79        let mut clauses = Vec::new();
80        let mut values = Vec::new();
81
82        if let Some(runtime_id) = runtime_id {
83            clauses.push("runtime_id = ?");
84            values.push(rusqlite::types::Value::from(runtime_id.to_string()));
85        }
86
87        if let Some(archived) = archived {
88            clauses.push("archived = ?");
89            values.push(rusqlite::types::Value::from(if archived {
90                1_i64
91            } else {
92                0_i64
93            }));
94        }
95
96        if let Some(search_term) = search_term.filter(|value| !value.trim().is_empty()) {
97            clauses.push(
98                "(LOWER(COALESCE(name, '')) LIKE ? OR LOWER(preview) LIKE ? OR LOWER(cwd) LIKE ?)",
99            );
100            let pattern = format!("%{}%", search_term.trim().to_lowercase());
101            values.push(rusqlite::types::Value::from(pattern.clone()));
102            values.push(rusqlite::types::Value::from(pattern.clone()));
103            values.push(rusqlite::types::Value::from(pattern));
104        }
105
106        if !clauses.is_empty() {
107            sql.push_str(" WHERE ");
108            sql.push_str(&clauses.join(" AND "));
109        }
110        sql.push_str(" ORDER BY updated_at_ms DESC");
111
112        let mut stmt = conn.prepare(&sql)?;
113        let rows = stmt.query_map(params_from_iter(values), |row| {
114            decode_thread_row(row.get::<_, String>(0)?, row.get::<_, i64>(1)?)
115        })?;
116
117        let mut threads = rows.collect::<rusqlite::Result<Vec<_>>>()?;
118
119        if let Some(directory_prefix) = directory_prefix {
120            let prefix = normalize_absolute_directory(Path::new(directory_prefix))?;
121            threads.retain(|thread| directory_contains(&prefix, Path::new(&thread.cwd)));
122        }
123
124        Ok(threads)
125    }
126    pub fn set_thread_archived(&self, thread_id: &str, archived: bool) -> anyhow::Result<()> {
127        let conn = self.connect()?;
128        conn.execute(
129            "UPDATE thread_index
130             SET archived = ?2
131             WHERE thread_id = ?1",
132            params![thread_id, if archived { 1_i64 } else { 0_i64 }],
133        )?;
134        Ok(())
135    }
136
137    pub fn downgrade_runtime_threads(
138        &self,
139        runtime_id: &str,
140        reason: Option<&str>,
141    ) -> anyhow::Result<Vec<ThreadSummary>> {
142        let mut updated_threads = Vec::new();
143        for mut thread in self.list_thread_index(None, Some(runtime_id), None, None)? {
144            if thread.status == "closed"
145                || (!thread.is_active && !thread.is_loaded && thread.status == "notLoaded")
146            {
147                continue;
148            }
149            thread.status = "notLoaded".to_string();
150            thread.status_info = ThreadStatusInfo {
151                kind: "notLoaded".to_string(),
152                reason: reason.map(|value| value.to_string()),
153                raw: json!({
154                    "type": "notLoaded",
155                    "reason": reason,
156                }),
157            };
158            thread.is_loaded = false;
159            thread.is_active = false;
160            self.upsert_thread_index(&thread)?;
161            updated_threads.push(thread);
162        }
163        Ok(updated_threads)
164    }
165}