Skip to main content

codex_mobile_bridge/
storage.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, Result};
5use rusqlite::types::Type;
6use rusqlite::{Connection, OptionalExtension, params};
7use serde::de::DeserializeOwned;
8
9use crate::bridge_protocol::{
10    DirectoryBookmarkRecord, DirectoryHistoryRecord, PendingServerRequestRecord, PersistedEvent,
11    RuntimeRecord, ThreadSummary, now_millis,
12};
13use crate::directory::{
14    canonicalize_directory, default_display_name, directory_contains, normalize_absolute_directory,
15};
16
17pub const PRIMARY_RUNTIME_ID: &str = "primary";
18
19#[derive(Debug, Clone)]
20pub struct Storage {
21    db_path: PathBuf,
22}
23
24impl Storage {
25    pub fn open(db_path: PathBuf) -> Result<Self> {
26        if let Some(parent) = db_path.parent() {
27            fs::create_dir_all(parent)
28                .with_context(|| format!("创建数据库目录失败: {}", parent.display()))?;
29        }
30
31        let storage = Self { db_path };
32        storage.migrate()?;
33        storage.clear_pending_requests()?;
34        storage.clear_legacy_pending_approvals()?;
35        Ok(storage)
36    }
37
38    pub fn ensure_primary_runtime(
39        &self,
40        codex_home: Option<String>,
41        codex_binary: String,
42    ) -> Result<RuntimeRecord> {
43        if let Some(existing) = self.get_runtime(PRIMARY_RUNTIME_ID)? {
44            let desired_home = codex_home.or(existing.codex_home.clone());
45            let desired_binary = if codex_binary.trim().is_empty() {
46                existing.codex_binary.clone()
47            } else {
48                codex_binary
49            };
50            let needs_update = existing.codex_home != desired_home
51                || existing.codex_binary != desired_binary
52                || !existing.is_primary
53                || !existing.auto_start;
54
55            if !needs_update {
56                return Ok(existing);
57            }
58
59            let updated = RuntimeRecord {
60                codex_home: desired_home,
61                codex_binary: desired_binary,
62                is_primary: true,
63                auto_start: true,
64                updated_at_ms: now_millis(),
65                ..existing
66            };
67            self.upsert_runtime(&updated)?;
68            return Ok(updated);
69        }
70
71        let now = now_millis();
72        let record = RuntimeRecord {
73            runtime_id: PRIMARY_RUNTIME_ID.to_string(),
74            display_name: "Primary".to_string(),
75            codex_home,
76            codex_binary,
77            is_primary: true,
78            auto_start: true,
79            created_at_ms: now,
80            updated_at_ms: now,
81        };
82        self.upsert_runtime(&record)?;
83        Ok(record)
84    }
85
86    pub fn list_runtimes(&self) -> Result<Vec<RuntimeRecord>> {
87        let conn = self.connect()?;
88        let mut stmt = conn.prepare(
89            "SELECT raw_json
90             FROM runtimes
91             ORDER BY is_primary DESC, created_at_ms ASC",
92        )?;
93
94        let rows = stmt.query_map([], |row| {
95            let raw: String = row.get(0)?;
96            decode_json_row(raw)
97        })?;
98
99        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
100    }
101
102    pub fn get_runtime(&self, runtime_id: &str) -> Result<Option<RuntimeRecord>> {
103        let conn = self.connect()?;
104        let record = conn
105            .query_row(
106                "SELECT raw_json FROM runtimes WHERE runtime_id = ?1",
107                params![runtime_id],
108                |row| {
109                    let raw: String = row.get(0)?;
110                    decode_json_row(raw)
111                },
112            )
113            .optional()?;
114        Ok(record)
115    }
116
117    pub fn upsert_runtime(&self, runtime: &RuntimeRecord) -> Result<()> {
118        let conn = self.connect()?;
119        conn.execute(
120            "INSERT INTO runtimes (
121                 runtime_id, display_name, codex_home, codex_binary, is_primary,
122                 auto_start, created_at_ms, updated_at_ms, raw_json
123             )
124             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
125             ON CONFLICT(runtime_id) DO UPDATE SET
126                 display_name = excluded.display_name,
127                 codex_home = excluded.codex_home,
128                 codex_binary = excluded.codex_binary,
129                 is_primary = excluded.is_primary,
130                 auto_start = excluded.auto_start,
131                 created_at_ms = excluded.created_at_ms,
132                 updated_at_ms = excluded.updated_at_ms,
133                 raw_json = excluded.raw_json",
134            params![
135                runtime.runtime_id,
136                runtime.display_name,
137                runtime.codex_home,
138                runtime.codex_binary,
139                if runtime.is_primary { 1_i64 } else { 0_i64 },
140                if runtime.auto_start { 1_i64 } else { 0_i64 },
141                runtime.created_at_ms,
142                runtime.updated_at_ms,
143                serde_json::to_string(runtime)?,
144            ],
145        )?;
146        Ok(())
147    }
148
149    pub fn remove_runtime(&self, runtime_id: &str) -> Result<()> {
150        let conn = self.connect()?;
151        conn.execute(
152            "DELETE FROM runtimes WHERE runtime_id = ?1",
153            params![runtime_id],
154        )?;
155        Ok(())
156    }
157
158    pub fn list_directory_bookmarks(&self) -> Result<Vec<DirectoryBookmarkRecord>> {
159        let conn = self.connect()?;
160        let mut stmt = conn.prepare(
161            "SELECT path, display_name, created_at_ms, updated_at_ms
162             FROM directory_bookmarks
163             ORDER BY display_name COLLATE NOCASE ASC",
164        )?;
165
166        let rows = stmt.query_map([], |row| {
167            Ok(DirectoryBookmarkRecord {
168                path: row.get(0)?,
169                display_name: row.get(1)?,
170                created_at_ms: row.get(2)?,
171                updated_at_ms: row.get(3)?,
172            })
173        })?;
174
175        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
176    }
177
178    pub fn upsert_directory_bookmark(
179        &self,
180        path: &Path,
181        display_name: Option<&str>,
182    ) -> Result<DirectoryBookmarkRecord> {
183        let canonical = canonicalize_directory(path)?;
184        let path_string = canonical.to_string_lossy().to_string();
185        let now = now_millis();
186        let conn = self.connect()?;
187        let existing = conn
188            .query_row(
189                "SELECT created_at_ms FROM directory_bookmarks WHERE path = ?1",
190                params![path_string],
191                |row| row.get::<_, i64>(0),
192            )
193            .optional()?;
194        let created_at_ms = existing.unwrap_or(now);
195        let display_name = display_name
196            .map(str::trim)
197            .filter(|value| !value.is_empty())
198            .map(ToOwned::to_owned)
199            .unwrap_or_else(|| default_display_name(&canonical));
200        conn.execute(
201            "INSERT INTO directory_bookmarks (path, display_name, created_at_ms, updated_at_ms)
202             VALUES (?1, ?2, ?3, ?4)
203             ON CONFLICT(path) DO UPDATE SET
204                 display_name = excluded.display_name,
205                 updated_at_ms = excluded.updated_at_ms",
206            params![path_string, display_name, created_at_ms, now],
207        )?;
208
209        Ok(DirectoryBookmarkRecord {
210            path: canonical.to_string_lossy().to_string(),
211            display_name,
212            created_at_ms,
213            updated_at_ms: now,
214        })
215    }
216
217    pub fn remove_directory_bookmark(&self, path: &Path) -> Result<()> {
218        let canonical = normalize_absolute_directory(path)?;
219        let conn = self.connect()?;
220        conn.execute(
221            "DELETE FROM directory_bookmarks WHERE path = ?1",
222            params![canonical.to_string_lossy().to_string()],
223        )?;
224        Ok(())
225    }
226
227    pub fn list_directory_history(&self, limit: usize) -> Result<Vec<DirectoryHistoryRecord>> {
228        let conn = self.connect()?;
229        let mut stmt = conn.prepare(
230            "SELECT path, last_used_at_ms, use_count
231             FROM directory_history
232             ORDER BY last_used_at_ms DESC
233             LIMIT ?1",
234        )?;
235        let rows = stmt.query_map(params![limit.max(1) as i64], |row| {
236            let path: String = row.get(0)?;
237            Ok(DirectoryHistoryRecord {
238                display_name: default_display_name(Path::new(&path)),
239                path,
240                last_used_at_ms: row.get(1)?,
241                use_count: row.get(2)?,
242            })
243        })?;
244        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
245    }
246
247    pub fn record_directory_usage(&self, path: &Path) -> Result<DirectoryHistoryRecord> {
248        let normalized = normalize_absolute_directory(path)?;
249        let path_string = normalized.to_string_lossy().to_string();
250        let now = now_millis();
251        let conn = self.connect()?;
252        let existing = conn
253            .query_row(
254                "SELECT use_count FROM directory_history WHERE path = ?1",
255                params![&path_string],
256                |row| row.get::<_, i64>(0),
257            )
258            .optional()?
259            .unwrap_or(0);
260        let use_count = existing + 1;
261        conn.execute(
262            "INSERT INTO directory_history (path, last_used_at_ms, use_count)
263             VALUES (?1, ?2, ?3)
264             ON CONFLICT(path) DO UPDATE SET
265                 last_used_at_ms = excluded.last_used_at_ms,
266                 use_count = directory_history.use_count + 1",
267            params![path_string, now, use_count],
268        )?;
269        Ok(DirectoryHistoryRecord {
270            path: normalized.to_string_lossy().to_string(),
271            display_name: default_display_name(&normalized),
272            last_used_at_ms: now,
273            use_count,
274        })
275    }
276
277    pub fn append_event(
278        &self,
279        event_type: &str,
280        runtime_id: Option<&str>,
281        thread_id: Option<&str>,
282        payload: &serde_json::Value,
283    ) -> Result<PersistedEvent> {
284        let now = now_millis();
285        let conn = self.connect()?;
286        conn.execute(
287            "INSERT INTO events (event_type, runtime_id, thread_id, payload, created_at_ms)
288             VALUES (?1, ?2, ?3, ?4, ?5)",
289            params![
290                event_type,
291                runtime_id,
292                thread_id,
293                serde_json::to_string(payload)?,
294                now
295            ],
296        )?;
297
298        let seq = conn.last_insert_rowid();
299        Ok(PersistedEvent {
300            seq,
301            event_type: event_type.to_string(),
302            runtime_id: runtime_id.map(ToOwned::to_owned),
303            thread_id: thread_id.map(ToOwned::to_owned),
304            payload: payload.clone(),
305            created_at_ms: now,
306        })
307    }
308
309    pub fn replay_events_after(&self, last_seq: i64) -> Result<Vec<PersistedEvent>> {
310        let conn = self.connect()?;
311        let mut stmt = conn.prepare(
312            "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
313             FROM events
314             WHERE seq > ?1
315             ORDER BY seq ASC",
316        )?;
317
318        let rows = stmt.query_map(params![last_seq], |row| {
319            Ok(PersistedEvent {
320                seq: row.get(0)?,
321                event_type: row.get(1)?,
322                runtime_id: row.get(2)?,
323                thread_id: row.get(3)?,
324                payload: decode_json_row(row.get::<_, String>(4)?)?,
325                created_at_ms: row.get(5)?,
326            })
327        })?;
328
329        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
330    }
331
332    pub fn load_thread_events(&self, thread_id: &str) -> Result<Vec<PersistedEvent>> {
333        let conn = self.connect()?;
334        let mut stmt = conn.prepare(
335            "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
336             FROM events
337             WHERE thread_id = ?1
338             ORDER BY seq ASC",
339        )?;
340
341        let rows = stmt.query_map(params![thread_id], |row| {
342            Ok(PersistedEvent {
343                seq: row.get(0)?,
344                event_type: row.get(1)?,
345                runtime_id: row.get(2)?,
346                thread_id: row.get(3)?,
347                payload: decode_json_row(row.get::<_, String>(4)?)?,
348                created_at_ms: row.get(5)?,
349            })
350        })?;
351
352        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
353    }
354
355    pub fn save_mobile_session_ack(&self, device_id: &str, last_ack_seq: i64) -> Result<()> {
356        let conn = self.connect()?;
357        let now = now_millis();
358        conn.execute(
359            "INSERT INTO mobile_sessions (device_id, last_ack_seq, updated_at_ms)
360             VALUES (?1, ?2, ?3)
361             ON CONFLICT(device_id) DO UPDATE SET
362                 last_ack_seq = excluded.last_ack_seq,
363                 updated_at_ms = excluded.updated_at_ms",
364            params![device_id, last_ack_seq, now],
365        )?;
366        Ok(())
367    }
368
369    pub fn get_mobile_session_ack(&self, device_id: &str) -> Result<Option<i64>> {
370        let conn = self.connect()?;
371        let value = conn
372            .query_row(
373                "SELECT last_ack_seq FROM mobile_sessions WHERE device_id = ?1",
374                params![device_id],
375                |row| row.get(0),
376            )
377            .optional()?;
378        Ok(value)
379    }
380
381    pub fn upsert_thread_index(&self, thread: &ThreadSummary) -> Result<()> {
382        let conn = self.connect()?;
383        conn.execute(
384            "INSERT INTO thread_index (
385                 thread_id, runtime_id, name, note, preview, cwd, status,
386                 model_provider, source, created_at_ms, updated_at_ms, is_loaded, is_active,
387                 archived, raw_json
388             )
389             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
390             ON CONFLICT(thread_id) DO UPDATE SET
391                 runtime_id = excluded.runtime_id,
392                 name = excluded.name,
393                 note = COALESCE(excluded.note, thread_index.note),
394                 preview = excluded.preview,
395                 cwd = excluded.cwd,
396                 status = excluded.status,
397                 model_provider = excluded.model_provider,
398                 source = excluded.source,
399                 created_at_ms = excluded.created_at_ms,
400                 updated_at_ms = excluded.updated_at_ms,
401                 is_loaded = excluded.is_loaded,
402                 is_active = excluded.is_active,
403                 archived = excluded.archived,
404                 raw_json = excluded.raw_json",
405            params![
406                thread.id,
407                thread.runtime_id,
408                thread.name,
409                thread.note,
410                thread.preview,
411                thread.cwd,
412                thread.status,
413                thread.model_provider,
414                thread.source,
415                thread.created_at,
416                thread.updated_at,
417                if thread.is_loaded { 1_i64 } else { 0_i64 },
418                if thread.is_active { 1_i64 } else { 0_i64 },
419                if thread.archived { 1_i64 } else { 0_i64 },
420                serde_json::to_string(thread)?
421            ],
422        )?;
423        Ok(())
424    }
425
426    pub fn get_thread_index(&self, thread_id: &str) -> Result<Option<ThreadSummary>> {
427        let conn = self.connect()?;
428        let record = conn
429            .query_row(
430                "SELECT raw_json, note, archived FROM thread_index WHERE thread_id = ?1",
431                params![thread_id],
432                |row| {
433                    decode_thread_row(
434                        row.get::<_, String>(0)?,
435                        row.get::<_, Option<String>>(1)?,
436                        row.get::<_, i64>(2)?,
437                    )
438                },
439            )
440            .optional()?;
441        Ok(record)
442    }
443
444    pub fn list_thread_index(
445        &self,
446        directory_prefix: Option<&str>,
447        runtime_id: Option<&str>,
448        archived: Option<bool>,
449        search_term: Option<&str>,
450    ) -> Result<Vec<ThreadSummary>> {
451        let conn = self.connect()?;
452        let mut sql = String::from(
453            "SELECT raw_json, note, archived
454             FROM thread_index",
455        );
456        let mut clauses = Vec::new();
457        let mut values = Vec::new();
458
459        if let Some(runtime_id) = runtime_id {
460            clauses.push("runtime_id = ?");
461            values.push(rusqlite::types::Value::from(runtime_id.to_string()));
462        }
463
464        if let Some(archived) = archived {
465            clauses.push("archived = ?");
466            values.push(rusqlite::types::Value::from(if archived {
467                1_i64
468            } else {
469                0_i64
470            }));
471        }
472
473        if let Some(search_term) = search_term.filter(|value| !value.trim().is_empty()) {
474            clauses.push(
475                "(LOWER(COALESCE(name, '')) LIKE ? OR LOWER(preview) LIKE ? OR \
476                 LOWER(cwd) LIKE ? OR LOWER(COALESCE(note, '')) LIKE ?)",
477            );
478            let pattern = format!("%{}%", search_term.trim().to_lowercase());
479            values.push(rusqlite::types::Value::from(pattern.clone()));
480            values.push(rusqlite::types::Value::from(pattern.clone()));
481            values.push(rusqlite::types::Value::from(pattern.clone()));
482            values.push(rusqlite::types::Value::from(pattern));
483        }
484
485        if !clauses.is_empty() {
486            sql.push_str(" WHERE ");
487            sql.push_str(&clauses.join(" AND "));
488        }
489        sql.push_str(" ORDER BY updated_at_ms DESC");
490
491        let mut stmt = conn.prepare(&sql)?;
492        let rows = stmt.query_map(rusqlite::params_from_iter(values), |row| {
493            decode_thread_row(
494                row.get::<_, String>(0)?,
495                row.get::<_, Option<String>>(1)?,
496                row.get::<_, i64>(2)?,
497            )
498        })?;
499
500        let mut threads = rows.collect::<rusqlite::Result<Vec<_>>>()?;
501
502        if let Some(directory_prefix) = directory_prefix {
503            let prefix = normalize_absolute_directory(Path::new(directory_prefix))?;
504            threads.retain(|thread| directory_contains(&prefix, Path::new(&thread.cwd)));
505        }
506
507        Ok(threads)
508    }
509
510    pub fn save_thread_note(&self, thread_id: &str, note: Option<&str>) -> Result<()> {
511        let conn = self.connect()?;
512        conn.execute(
513            "UPDATE thread_index
514             SET note = ?2
515             WHERE thread_id = ?1",
516            params![thread_id, note],
517        )?;
518        Ok(())
519    }
520
521    pub fn set_thread_archived(&self, thread_id: &str, archived: bool) -> Result<()> {
522        let conn = self.connect()?;
523        conn.execute(
524            "UPDATE thread_index
525             SET archived = ?2
526             WHERE thread_id = ?1",
527            params![thread_id, if archived { 1_i64 } else { 0_i64 }],
528        )?;
529        Ok(())
530    }
531
532    pub fn put_pending_request(&self, request: &PendingServerRequestRecord) -> Result<()> {
533        let conn = self.connect()?;
534        conn.execute(
535            "INSERT INTO pending_server_requests (
536                 request_id, runtime_id, request_type, thread_id, turn_id, item_id, title,
537                 reason, command, cwd, grant_root, tool_name, arguments, questions,
538                 proposed_execpolicy_amendment, network_approval_context, schema,
539                 available_decisions, raw_payload, created_at_ms, raw_json
540             )
541             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21)
542             ON CONFLICT(request_id) DO UPDATE SET
543                 runtime_id = excluded.runtime_id,
544                 request_type = excluded.request_type,
545                 thread_id = excluded.thread_id,
546                 turn_id = excluded.turn_id,
547                 item_id = excluded.item_id,
548                 title = excluded.title,
549                 reason = excluded.reason,
550                 command = excluded.command,
551                 cwd = excluded.cwd,
552                 grant_root = excluded.grant_root,
553                 tool_name = excluded.tool_name,
554                 arguments = excluded.arguments,
555                 questions = excluded.questions,
556                 proposed_execpolicy_amendment = excluded.proposed_execpolicy_amendment,
557                 network_approval_context = excluded.network_approval_context,
558                 schema = excluded.schema,
559                 available_decisions = excluded.available_decisions,
560                 raw_payload = excluded.raw_payload,
561                 created_at_ms = excluded.created_at_ms,
562                 raw_json = excluded.raw_json",
563            params![
564                request.request_id,
565                request.runtime_id,
566                request.request_type,
567                request.thread_id,
568                request.turn_id,
569                request.item_id,
570                request.title,
571                request.reason,
572                request.command,
573                request.cwd,
574                request.grant_root,
575                request.tool_name,
576                request.arguments.as_ref().map(serde_json::to_string).transpose()?,
577                serde_json::to_string(&request.questions)?,
578                request
579                    .proposed_execpolicy_amendment
580                    .as_ref()
581                    .map(serde_json::to_string)
582                    .transpose()?,
583                request
584                    .network_approval_context
585                    .as_ref()
586                    .map(serde_json::to_string)
587                    .transpose()?,
588                request.schema.as_ref().map(serde_json::to_string).transpose()?,
589                serde_json::to_string(&request.available_decisions)?,
590                serde_json::to_string(&request.raw_payload)?,
591                request.created_at_ms,
592                serde_json::to_string(request)?
593            ],
594        )?;
595        Ok(())
596    }
597
598    pub fn get_pending_request(
599        &self,
600        request_id: &str,
601    ) -> Result<Option<PendingServerRequestRecord>> {
602        let conn = self.connect()?;
603        let record = conn
604            .query_row(
605                "SELECT raw_json FROM pending_server_requests WHERE request_id = ?1",
606                params![request_id],
607                |row| {
608                    let raw: String = row.get(0)?;
609                    decode_json_row(raw)
610                },
611            )
612            .optional()?;
613        Ok(record)
614    }
615
616    pub fn list_pending_requests(&self) -> Result<Vec<PendingServerRequestRecord>> {
617        let conn = self.connect()?;
618        let mut stmt = conn.prepare(
619            "SELECT raw_json
620             FROM pending_server_requests
621             ORDER BY created_at_ms ASC",
622        )?;
623
624        let rows = stmt.query_map([], |row| {
625            let raw: String = row.get(0)?;
626            decode_json_row(raw)
627        })?;
628
629        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
630    }
631
632    pub fn remove_pending_request(&self, request_id: &str) -> Result<()> {
633        let conn = self.connect()?;
634        conn.execute(
635            "DELETE FROM pending_server_requests WHERE request_id = ?1",
636            params![request_id],
637        )?;
638        Ok(())
639    }
640
641    pub fn clear_pending_requests(&self) -> Result<()> {
642        let conn = self.connect()?;
643        conn.execute("DELETE FROM pending_server_requests", [])?;
644        Ok(())
645    }
646
647    fn clear_legacy_pending_approvals(&self) -> Result<()> {
648        let conn = self.connect()?;
649        conn.execute("DELETE FROM pending_approvals", [])?;
650        Ok(())
651    }
652
653    fn connect(&self) -> Result<Connection> {
654        let conn = Connection::open(&self.db_path)
655            .with_context(|| format!("打开数据库失败: {}", self.db_path.display()))?;
656        conn.execute_batch(
657            "PRAGMA foreign_keys = ON;
658             PRAGMA journal_mode = WAL;",
659        )?;
660        Ok(conn)
661    }
662
663    fn migrate(&self) -> Result<()> {
664        let conn = self.connect()?;
665        conn.execute_batch(
666            "CREATE TABLE IF NOT EXISTS directory_bookmarks (
667                path TEXT PRIMARY KEY,
668                display_name TEXT NOT NULL,
669                created_at_ms INTEGER NOT NULL,
670                updated_at_ms INTEGER NOT NULL
671            );
672
673            CREATE TABLE IF NOT EXISTS directory_history (
674                path TEXT PRIMARY KEY,
675                last_used_at_ms INTEGER NOT NULL,
676                use_count INTEGER NOT NULL
677            );
678
679            CREATE TABLE IF NOT EXISTS runtimes (
680                runtime_id TEXT PRIMARY KEY,
681                display_name TEXT NOT NULL,
682                codex_home TEXT NULL,
683                codex_binary TEXT NOT NULL,
684                is_primary INTEGER NOT NULL,
685                auto_start INTEGER NOT NULL,
686                created_at_ms INTEGER NOT NULL,
687                updated_at_ms INTEGER NOT NULL,
688                raw_json TEXT NOT NULL
689            );
690
691            CREATE TABLE IF NOT EXISTS mobile_sessions (
692                device_id TEXT PRIMARY KEY,
693                last_ack_seq INTEGER NOT NULL,
694                updated_at_ms INTEGER NOT NULL
695            );
696
697            CREATE TABLE IF NOT EXISTS events (
698                seq INTEGER PRIMARY KEY AUTOINCREMENT,
699                event_type TEXT NOT NULL,
700                runtime_id TEXT NULL,
701                thread_id TEXT NULL,
702                payload TEXT NOT NULL,
703                created_at_ms INTEGER NOT NULL
704            );
705
706            CREATE TABLE IF NOT EXISTS pending_approvals (
707                approval_id TEXT PRIMARY KEY,
708                runtime_id TEXT NOT NULL DEFAULT 'primary',
709                thread_id TEXT NOT NULL,
710                turn_id TEXT NOT NULL,
711                item_id TEXT NOT NULL,
712                kind TEXT NOT NULL,
713                reason TEXT NULL,
714                command TEXT NULL,
715                cwd TEXT NULL,
716                grant_root TEXT NULL,
717                available_decisions TEXT NOT NULL,
718                created_at_ms INTEGER NOT NULL,
719                raw_json TEXT NOT NULL
720            );
721
722            CREATE TABLE IF NOT EXISTS pending_server_requests (
723                request_id TEXT PRIMARY KEY,
724                runtime_id TEXT NOT NULL DEFAULT 'primary',
725                request_type TEXT NOT NULL,
726                thread_id TEXT NULL,
727                turn_id TEXT NULL,
728                item_id TEXT NULL,
729                title TEXT NULL,
730                reason TEXT NULL,
731                command TEXT NULL,
732                cwd TEXT NULL,
733                grant_root TEXT NULL,
734                tool_name TEXT NULL,
735                arguments TEXT NULL,
736                questions TEXT NOT NULL,
737                proposed_execpolicy_amendment TEXT NULL,
738                network_approval_context TEXT NULL,
739                schema TEXT NULL,
740                available_decisions TEXT NOT NULL,
741                raw_payload TEXT NOT NULL,
742                created_at_ms INTEGER NOT NULL,
743                raw_json TEXT NOT NULL
744            );",
745        )?;
746
747        ensure_thread_index_schema(&conn)?;
748        migrate_legacy_workspaces(&conn)?;
749        ensure_column(
750            &conn,
751            "thread_index",
752            "runtime_id",
753            "TEXT NOT NULL DEFAULT 'primary'",
754        )?;
755        ensure_column(&conn, "thread_index", "note", "TEXT NULL")?;
756        ensure_column(
757            &conn,
758            "thread_index",
759            "archived",
760            "INTEGER NOT NULL DEFAULT 0",
761        )?;
762        ensure_column(&conn, "events", "runtime_id", "TEXT NULL")?;
763        ensure_column(
764            &conn,
765            "pending_approvals",
766            "runtime_id",
767            "TEXT NOT NULL DEFAULT 'primary'",
768        )?;
769        ensure_column(
770            &conn,
771            "pending_server_requests",
772            "runtime_id",
773            "TEXT NOT NULL DEFAULT 'primary'",
774        )?;
775
776        Ok(())
777    }
778}
779
780fn ensure_column(conn: &Connection, table: &str, column: &str, definition: &str) -> Result<()> {
781    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
782    let mut rows = stmt.query([])?;
783    while let Some(row) = rows.next()? {
784        let existing: String = row.get(1)?;
785        if existing == column {
786            return Ok(());
787        }
788    }
789
790    conn.execute_batch(&format!(
791        "ALTER TABLE {table} ADD COLUMN {column} {definition};"
792    ))?;
793    Ok(())
794}
795
796fn ensure_thread_index_schema(conn: &Connection) -> Result<()> {
797    if !table_exists(conn, "thread_index")? {
798        create_thread_index_table(conn)?;
799        return Ok(());
800    }
801
802    ensure_column(
803        conn,
804        "thread_index",
805        "runtime_id",
806        "TEXT NOT NULL DEFAULT 'primary'",
807    )?;
808    ensure_column(conn, "thread_index", "note", "TEXT NULL")?;
809    ensure_column(
810        conn,
811        "thread_index",
812        "archived",
813        "INTEGER NOT NULL DEFAULT 0",
814    )?;
815
816    if column_exists(conn, "thread_index", "workspace_id")? {
817        rebuild_thread_index_without_workspace(conn)?;
818    }
819
820    Ok(())
821}
822
823fn create_thread_index_table(conn: &Connection) -> Result<()> {
824    conn.execute_batch(
825        "CREATE TABLE IF NOT EXISTS thread_index (
826            thread_id TEXT PRIMARY KEY,
827            runtime_id TEXT NOT NULL DEFAULT 'primary',
828            name TEXT NULL,
829            note TEXT NULL,
830            preview TEXT NOT NULL,
831            cwd TEXT NOT NULL,
832            status TEXT NOT NULL,
833            model_provider TEXT NOT NULL,
834            source TEXT NOT NULL,
835            created_at_ms INTEGER NOT NULL,
836            updated_at_ms INTEGER NOT NULL,
837            is_loaded INTEGER NOT NULL,
838            is_active INTEGER NOT NULL,
839            archived INTEGER NOT NULL DEFAULT 0,
840            raw_json TEXT NOT NULL
841        );",
842    )?;
843    Ok(())
844}
845
846fn rebuild_thread_index_without_workspace(conn: &Connection) -> Result<()> {
847    conn.execute_batch(
848        "ALTER TABLE thread_index RENAME TO thread_index_legacy;
849
850        CREATE TABLE thread_index (
851            thread_id TEXT PRIMARY KEY,
852            runtime_id TEXT NOT NULL DEFAULT 'primary',
853            name TEXT NULL,
854            note TEXT NULL,
855            preview TEXT NOT NULL,
856            cwd TEXT NOT NULL,
857            status TEXT NOT NULL,
858            model_provider TEXT NOT NULL,
859            source TEXT NOT NULL,
860            created_at_ms INTEGER NOT NULL,
861            updated_at_ms INTEGER NOT NULL,
862            is_loaded INTEGER NOT NULL,
863            is_active INTEGER NOT NULL,
864            archived INTEGER NOT NULL DEFAULT 0,
865            raw_json TEXT NOT NULL
866        );
867
868        INSERT INTO thread_index (
869            thread_id, runtime_id, name, note, preview, cwd, status,
870            model_provider, source, created_at_ms, updated_at_ms, is_loaded,
871            is_active, archived, raw_json
872        )
873        SELECT
874            thread_id,
875            COALESCE(runtime_id, 'primary'),
876            name,
877            note,
878            preview,
879            cwd,
880            status,
881            model_provider,
882            source,
883            created_at_ms,
884            updated_at_ms,
885            is_loaded,
886            is_active,
887            archived,
888            raw_json
889        FROM thread_index_legacy;
890
891        DROP TABLE thread_index_legacy;",
892    )?;
893    Ok(())
894}
895
896fn migrate_legacy_workspaces(conn: &Connection) -> Result<()> {
897    if !table_exists(conn, "workspaces")? {
898        return Ok(());
899    }
900
901    conn.execute_batch(
902        "INSERT INTO directory_bookmarks (path, display_name, created_at_ms, updated_at_ms)
903         SELECT root_path, display_name, created_at_ms, updated_at_ms
904         FROM workspaces
905         ON CONFLICT(path) DO NOTHING;
906
907         DROP TABLE workspaces;",
908    )?;
909    Ok(())
910}
911
912fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
913    let exists = conn
914        .query_row(
915            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1 LIMIT 1",
916            params![table],
917            |_| Ok(()),
918        )
919        .optional()?
920        .is_some();
921    Ok(exists)
922}
923
924fn column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
925    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
926    let mut rows = stmt.query([])?;
927    while let Some(row) = rows.next()? {
928        let existing: String = row.get(1)?;
929        if existing == column {
930            return Ok(true);
931        }
932    }
933    Ok(false)
934}
935
936#[cfg(test)]
937mod tests {
938    use std::env;
939    use std::fs;
940
941    use super::Storage;
942
943    #[test]
944    fn ensure_primary_runtime_refreshes_existing_binary() {
945        let base_dir =
946            env::temp_dir().join(format!("codex-mobile-storage-test-{}", std::process::id()));
947        fs::create_dir_all(&base_dir).expect("创建测试目录失败");
948        let db_path = base_dir.join("bridge.db");
949        let storage = Storage::open(db_path).expect("打开存储失败");
950
951        let initial = storage
952            .ensure_primary_runtime(None, "codex".to_string())
953            .expect("创建 primary runtime 失败");
954        assert_eq!(initial.codex_binary, "codex");
955
956        let refreshed = storage
957            .ensure_primary_runtime(None, "/home/test/.npm-global/bin/codex".to_string())
958            .expect("刷新 primary runtime 失败");
959        assert_eq!(refreshed.codex_binary, "/home/test/.npm-global/bin/codex");
960    }
961}
962
963fn decode_json_row<T: DeserializeOwned>(raw: String) -> rusqlite::Result<T> {
964    serde_json::from_str(&raw)
965        .map_err(|error| rusqlite::Error::FromSqlConversionFailure(0, Type::Text, Box::new(error)))
966}
967
968fn decode_thread_row(
969    raw: String,
970    note: Option<String>,
971    archived: i64,
972) -> rusqlite::Result<ThreadSummary> {
973    let mut thread: ThreadSummary = decode_json_row(raw)?;
974    thread.note = note;
975    thread.archived = archived != 0;
976    Ok(thread)
977}