Skip to main content

codex_mobile_bridge/
storage.rs

1use std::fs;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, Result};
5use rusqlite::types::Type;
6use rusqlite::{Connection, OptionalExtension, params};
7use serde::de::DeserializeOwned;
8
9use crate::bridge_protocol::{
10    PendingServerRequestRecord, PersistedEvent, RuntimeRecord, ThreadSummary, WorkspaceRecord,
11    now_millis,
12};
13use crate::workspace::{build_workspace_id, canonicalize_directory};
14
/// Runtime id under which `Storage::ensure_primary_runtime` looks up and
/// stores the primary runtime record. The same literal, `'primary'`, is the
/// default for the `runtime_id` columns created by the schema migration.
pub const PRIMARY_RUNTIME_ID: &str = "primary";
16
/// Handle to the bridge's SQLite database.
///
/// Only the database path is kept; every operation obtains its own
/// connection through `connect`, so cloning the handle is cheap.
#[derive(Debug, Clone)]
pub struct Storage {
    /// Location of the SQLite database file.
    db_path: PathBuf,
}
21
22impl Storage {
23    pub fn open(db_path: PathBuf) -> Result<Self> {
24        if let Some(parent) = db_path.parent() {
25            fs::create_dir_all(parent)
26                .with_context(|| format!("创建数据库目录失败: {}", parent.display()))?;
27        }
28
29        let storage = Self { db_path };
30        storage.migrate()?;
31        storage.clear_pending_requests()?;
32        storage.clear_legacy_pending_approvals()?;
33        Ok(storage)
34    }
35
36    pub fn ensure_primary_runtime(
37        &self,
38        codex_home: Option<String>,
39        codex_binary: String,
40    ) -> Result<RuntimeRecord> {
41        if let Some(existing) = self.get_runtime(PRIMARY_RUNTIME_ID)? {
42            let desired_home = codex_home.or(existing.codex_home.clone());
43            let desired_binary = if codex_binary.trim().is_empty() {
44                existing.codex_binary.clone()
45            } else {
46                codex_binary
47            };
48            let needs_update = existing.codex_home != desired_home
49                || existing.codex_binary != desired_binary
50                || !existing.is_primary
51                || !existing.auto_start;
52
53            if !needs_update {
54                return Ok(existing);
55            }
56
57            let updated = RuntimeRecord {
58                codex_home: desired_home,
59                codex_binary: desired_binary,
60                is_primary: true,
61                auto_start: true,
62                updated_at_ms: now_millis(),
63                ..existing
64            };
65            self.upsert_runtime(&updated)?;
66            return Ok(updated);
67        }
68
69        let now = now_millis();
70        let record = RuntimeRecord {
71            runtime_id: PRIMARY_RUNTIME_ID.to_string(),
72            display_name: "Primary".to_string(),
73            codex_home,
74            codex_binary,
75            is_primary: true,
76            auto_start: true,
77            created_at_ms: now,
78            updated_at_ms: now,
79        };
80        self.upsert_runtime(&record)?;
81        Ok(record)
82    }
83
84    pub fn list_runtimes(&self) -> Result<Vec<RuntimeRecord>> {
85        let conn = self.connect()?;
86        let mut stmt = conn.prepare(
87            "SELECT raw_json
88             FROM runtimes
89             ORDER BY is_primary DESC, created_at_ms ASC",
90        )?;
91
92        let rows = stmt.query_map([], |row| {
93            let raw: String = row.get(0)?;
94            decode_json_row(raw)
95        })?;
96
97        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
98    }
99
100    pub fn get_runtime(&self, runtime_id: &str) -> Result<Option<RuntimeRecord>> {
101        let conn = self.connect()?;
102        let record = conn
103            .query_row(
104                "SELECT raw_json FROM runtimes WHERE runtime_id = ?1",
105                params![runtime_id],
106                |row| {
107                    let raw: String = row.get(0)?;
108                    decode_json_row(raw)
109                },
110            )
111            .optional()?;
112        Ok(record)
113    }
114
115    pub fn upsert_runtime(&self, runtime: &RuntimeRecord) -> Result<()> {
116        let conn = self.connect()?;
117        conn.execute(
118            "INSERT INTO runtimes (
119                 runtime_id, display_name, codex_home, codex_binary, is_primary,
120                 auto_start, created_at_ms, updated_at_ms, raw_json
121             )
122             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
123             ON CONFLICT(runtime_id) DO UPDATE SET
124                 display_name = excluded.display_name,
125                 codex_home = excluded.codex_home,
126                 codex_binary = excluded.codex_binary,
127                 is_primary = excluded.is_primary,
128                 auto_start = excluded.auto_start,
129                 created_at_ms = excluded.created_at_ms,
130                 updated_at_ms = excluded.updated_at_ms,
131                 raw_json = excluded.raw_json",
132            params![
133                runtime.runtime_id,
134                runtime.display_name,
135                runtime.codex_home,
136                runtime.codex_binary,
137                if runtime.is_primary { 1_i64 } else { 0_i64 },
138                if runtime.auto_start { 1_i64 } else { 0_i64 },
139                runtime.created_at_ms,
140                runtime.updated_at_ms,
141                serde_json::to_string(runtime)?,
142            ],
143        )?;
144        Ok(())
145    }
146
147    pub fn remove_runtime(&self, runtime_id: &str) -> Result<()> {
148        let conn = self.connect()?;
149        conn.execute(
150            "DELETE FROM runtimes WHERE runtime_id = ?1",
151            params![runtime_id],
152        )?;
153        Ok(())
154    }
155
156    pub fn list_workspaces(&self) -> Result<Vec<WorkspaceRecord>> {
157        let conn = self.connect()?;
158        let mut stmt = conn.prepare(
159            "SELECT id, display_name, root_path, trusted, created_at_ms, updated_at_ms
160             FROM workspaces
161             ORDER BY display_name COLLATE NOCASE ASC",
162        )?;
163
164        let rows = stmt.query_map([], |row| {
165            Ok(WorkspaceRecord {
166                id: row.get(0)?,
167                display_name: row.get(1)?,
168                root_path: row.get(2)?,
169                trusted: row.get::<_, i64>(3)? != 0,
170                created_at_ms: row.get(4)?,
171                updated_at_ms: row.get(5)?,
172            })
173        })?;
174
175        let workspaces = rows.collect::<rusqlite::Result<Vec<_>>>()?;
176        Ok(workspaces)
177    }
178
179    pub fn get_workspace(&self, workspace_id: &str) -> Result<Option<WorkspaceRecord>> {
180        let conn = self.connect()?;
181        let workspace = conn
182            .query_row(
183                "SELECT id, display_name, root_path, trusted, created_at_ms, updated_at_ms
184                 FROM workspaces
185                 WHERE id = ?1",
186                params![workspace_id],
187                |row| {
188                    Ok(WorkspaceRecord {
189                        id: row.get(0)?,
190                        display_name: row.get(1)?,
191                        root_path: row.get(2)?,
192                        trusted: row.get::<_, i64>(3)? != 0,
193                        created_at_ms: row.get(4)?,
194                        updated_at_ms: row.get(5)?,
195                    })
196                },
197            )
198            .optional()?;
199
200        Ok(workspace)
201    }
202
203    pub fn upsert_workspace(
204        &self,
205        display_name: &str,
206        root_path: &Path,
207        trusted: bool,
208    ) -> Result<WorkspaceRecord> {
209        let canonical = canonicalize_directory(root_path)?;
210        let now = now_millis();
211        let conn = self.connect()?;
212        let existing = conn
213            .query_row(
214                "SELECT id, created_at_ms FROM workspaces WHERE root_path = ?1",
215                params![canonical.to_string_lossy().to_string()],
216                |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
217            )
218            .optional()?;
219
220        let (id, created_at_ms) = existing.unwrap_or_else(|| (build_workspace_id(&canonical), now));
221        conn.execute(
222            "INSERT INTO workspaces (id, display_name, root_path, trusted, created_at_ms, updated_at_ms)
223             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
224             ON CONFLICT(id) DO UPDATE SET
225                 display_name = excluded.display_name,
226                 root_path = excluded.root_path,
227                 trusted = excluded.trusted,
228                 updated_at_ms = excluded.updated_at_ms",
229            params![
230                id,
231                display_name,
232                canonical.to_string_lossy().to_string(),
233                if trusted { 1_i64 } else { 0_i64 },
234                created_at_ms,
235                now
236            ],
237        )?;
238
239        Ok(WorkspaceRecord {
240            id,
241            display_name: display_name.to_string(),
242            root_path: canonical.to_string_lossy().to_string(),
243            trusted,
244            created_at_ms,
245            updated_at_ms: now,
246        })
247    }
248
249    pub fn append_event(
250        &self,
251        event_type: &str,
252        runtime_id: Option<&str>,
253        thread_id: Option<&str>,
254        payload: &serde_json::Value,
255    ) -> Result<PersistedEvent> {
256        let now = now_millis();
257        let conn = self.connect()?;
258        conn.execute(
259            "INSERT INTO events (event_type, runtime_id, thread_id, payload, created_at_ms)
260             VALUES (?1, ?2, ?3, ?4, ?5)",
261            params![
262                event_type,
263                runtime_id,
264                thread_id,
265                serde_json::to_string(payload)?,
266                now
267            ],
268        )?;
269
270        let seq = conn.last_insert_rowid();
271        Ok(PersistedEvent {
272            seq,
273            event_type: event_type.to_string(),
274            runtime_id: runtime_id.map(ToOwned::to_owned),
275            thread_id: thread_id.map(ToOwned::to_owned),
276            payload: payload.clone(),
277            created_at_ms: now,
278        })
279    }
280
281    pub fn replay_events_after(&self, last_seq: i64) -> Result<Vec<PersistedEvent>> {
282        let conn = self.connect()?;
283        let mut stmt = conn.prepare(
284            "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
285             FROM events
286             WHERE seq > ?1
287             ORDER BY seq ASC",
288        )?;
289
290        let rows = stmt.query_map(params![last_seq], |row| {
291            Ok(PersistedEvent {
292                seq: row.get(0)?,
293                event_type: row.get(1)?,
294                runtime_id: row.get(2)?,
295                thread_id: row.get(3)?,
296                payload: decode_json_row(row.get::<_, String>(4)?)?,
297                created_at_ms: row.get(5)?,
298            })
299        })?;
300
301        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
302    }
303
304    pub fn load_thread_events(&self, thread_id: &str) -> Result<Vec<PersistedEvent>> {
305        let conn = self.connect()?;
306        let mut stmt = conn.prepare(
307            "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
308             FROM events
309             WHERE thread_id = ?1
310             ORDER BY seq ASC",
311        )?;
312
313        let rows = stmt.query_map(params![thread_id], |row| {
314            Ok(PersistedEvent {
315                seq: row.get(0)?,
316                event_type: row.get(1)?,
317                runtime_id: row.get(2)?,
318                thread_id: row.get(3)?,
319                payload: decode_json_row(row.get::<_, String>(4)?)?,
320                created_at_ms: row.get(5)?,
321            })
322        })?;
323
324        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
325    }
326
327    pub fn save_mobile_session_ack(&self, device_id: &str, last_ack_seq: i64) -> Result<()> {
328        let conn = self.connect()?;
329        let now = now_millis();
330        conn.execute(
331            "INSERT INTO mobile_sessions (device_id, last_ack_seq, updated_at_ms)
332             VALUES (?1, ?2, ?3)
333             ON CONFLICT(device_id) DO UPDATE SET
334                 last_ack_seq = excluded.last_ack_seq,
335                 updated_at_ms = excluded.updated_at_ms",
336            params![device_id, last_ack_seq, now],
337        )?;
338        Ok(())
339    }
340
341    pub fn get_mobile_session_ack(&self, device_id: &str) -> Result<Option<i64>> {
342        let conn = self.connect()?;
343        let value = conn
344            .query_row(
345                "SELECT last_ack_seq FROM mobile_sessions WHERE device_id = ?1",
346                params![device_id],
347                |row| row.get(0),
348            )
349            .optional()?;
350        Ok(value)
351    }
352
353    pub fn upsert_thread_index(&self, thread: &ThreadSummary) -> Result<()> {
354        let conn = self.connect()?;
355        conn.execute(
356            "INSERT INTO thread_index (
357                 thread_id, runtime_id, workspace_id, name, note, preview, cwd, status,
358                 model_provider, source, created_at_ms, updated_at_ms, is_loaded, is_active,
359                 archived, raw_json
360             )
361             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
362             ON CONFLICT(thread_id) DO UPDATE SET
363                 runtime_id = excluded.runtime_id,
364                 workspace_id = excluded.workspace_id,
365                 name = excluded.name,
366                 note = COALESCE(excluded.note, thread_index.note),
367                 preview = excluded.preview,
368                 cwd = excluded.cwd,
369                 status = excluded.status,
370                 model_provider = excluded.model_provider,
371                 source = excluded.source,
372                 created_at_ms = excluded.created_at_ms,
373                 updated_at_ms = excluded.updated_at_ms,
374                 is_loaded = excluded.is_loaded,
375                 is_active = excluded.is_active,
376                 archived = excluded.archived,
377                 raw_json = excluded.raw_json",
378            params![
379                thread.id,
380                thread.runtime_id,
381                thread.workspace_id,
382                thread.name,
383                thread.note,
384                thread.preview,
385                thread.cwd,
386                thread.status,
387                thread.model_provider,
388                thread.source,
389                thread.created_at,
390                thread.updated_at,
391                if thread.is_loaded { 1_i64 } else { 0_i64 },
392                if thread.is_active { 1_i64 } else { 0_i64 },
393                if thread.archived { 1_i64 } else { 0_i64 },
394                serde_json::to_string(thread)?
395            ],
396        )?;
397        Ok(())
398    }
399
400    pub fn get_thread_index(&self, thread_id: &str) -> Result<Option<ThreadSummary>> {
401        let conn = self.connect()?;
402        let record = conn
403            .query_row(
404                "SELECT raw_json, note, archived FROM thread_index WHERE thread_id = ?1",
405                params![thread_id],
406                |row| {
407                    decode_thread_row(
408                        row.get::<_, String>(0)?,
409                        row.get::<_, Option<String>>(1)?,
410                        row.get::<_, i64>(2)?,
411                    )
412                },
413            )
414            .optional()?;
415        Ok(record)
416    }
417
418    pub fn list_thread_index(
419        &self,
420        workspace_id: Option<&str>,
421        runtime_id: Option<&str>,
422        archived: Option<bool>,
423        search_term: Option<&str>,
424    ) -> Result<Vec<ThreadSummary>> {
425        let conn = self.connect()?;
426        let mut sql = String::from(
427            "SELECT raw_json, note, archived
428             FROM thread_index",
429        );
430        let mut clauses = Vec::new();
431        let mut values = Vec::new();
432
433        if let Some(workspace_id) = workspace_id {
434            clauses.push("workspace_id = ?");
435            values.push(rusqlite::types::Value::from(workspace_id.to_string()));
436        }
437
438        if let Some(runtime_id) = runtime_id {
439            clauses.push("runtime_id = ?");
440            values.push(rusqlite::types::Value::from(runtime_id.to_string()));
441        }
442
443        if let Some(archived) = archived {
444            clauses.push("archived = ?");
445            values.push(rusqlite::types::Value::from(if archived {
446                1_i64
447            } else {
448                0_i64
449            }));
450        }
451
452        if let Some(search_term) = search_term.filter(|value| !value.trim().is_empty()) {
453            clauses.push(
454                "(LOWER(COALESCE(name, '')) LIKE ? OR LOWER(preview) LIKE ? OR \
455                 LOWER(cwd) LIKE ? OR LOWER(COALESCE(note, '')) LIKE ?)",
456            );
457            let pattern = format!("%{}%", search_term.trim().to_lowercase());
458            values.push(rusqlite::types::Value::from(pattern.clone()));
459            values.push(rusqlite::types::Value::from(pattern.clone()));
460            values.push(rusqlite::types::Value::from(pattern.clone()));
461            values.push(rusqlite::types::Value::from(pattern));
462        }
463
464        if !clauses.is_empty() {
465            sql.push_str(" WHERE ");
466            sql.push_str(&clauses.join(" AND "));
467        }
468        sql.push_str(" ORDER BY updated_at_ms DESC");
469
470        let mut stmt = conn.prepare(&sql)?;
471        let rows = stmt.query_map(rusqlite::params_from_iter(values), |row| {
472            decode_thread_row(
473                row.get::<_, String>(0)?,
474                row.get::<_, Option<String>>(1)?,
475                row.get::<_, i64>(2)?,
476            )
477        })?;
478
479        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
480    }
481
482    pub fn save_thread_note(&self, thread_id: &str, note: Option<&str>) -> Result<()> {
483        let conn = self.connect()?;
484        conn.execute(
485            "UPDATE thread_index
486             SET note = ?2
487             WHERE thread_id = ?1",
488            params![thread_id, note],
489        )?;
490        Ok(())
491    }
492
493    pub fn set_thread_archived(&self, thread_id: &str, archived: bool) -> Result<()> {
494        let conn = self.connect()?;
495        conn.execute(
496            "UPDATE thread_index
497             SET archived = ?2
498             WHERE thread_id = ?1",
499            params![thread_id, if archived { 1_i64 } else { 0_i64 }],
500        )?;
501        Ok(())
502    }
503
504    pub fn put_pending_request(&self, request: &PendingServerRequestRecord) -> Result<()> {
505        let conn = self.connect()?;
506        conn.execute(
507            "INSERT INTO pending_server_requests (
508                 request_id, runtime_id, request_type, thread_id, turn_id, item_id, title,
509                 reason, command, cwd, grant_root, tool_name, arguments, questions,
510                 proposed_execpolicy_amendment, network_approval_context, schema,
511                 available_decisions, raw_payload, created_at_ms, raw_json
512             )
513             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21)
514             ON CONFLICT(request_id) DO UPDATE SET
515                 runtime_id = excluded.runtime_id,
516                 request_type = excluded.request_type,
517                 thread_id = excluded.thread_id,
518                 turn_id = excluded.turn_id,
519                 item_id = excluded.item_id,
520                 title = excluded.title,
521                 reason = excluded.reason,
522                 command = excluded.command,
523                 cwd = excluded.cwd,
524                 grant_root = excluded.grant_root,
525                 tool_name = excluded.tool_name,
526                 arguments = excluded.arguments,
527                 questions = excluded.questions,
528                 proposed_execpolicy_amendment = excluded.proposed_execpolicy_amendment,
529                 network_approval_context = excluded.network_approval_context,
530                 schema = excluded.schema,
531                 available_decisions = excluded.available_decisions,
532                 raw_payload = excluded.raw_payload,
533                 created_at_ms = excluded.created_at_ms,
534                 raw_json = excluded.raw_json",
535            params![
536                request.request_id,
537                request.runtime_id,
538                request.request_type,
539                request.thread_id,
540                request.turn_id,
541                request.item_id,
542                request.title,
543                request.reason,
544                request.command,
545                request.cwd,
546                request.grant_root,
547                request.tool_name,
548                request.arguments.as_ref().map(serde_json::to_string).transpose()?,
549                serde_json::to_string(&request.questions)?,
550                request
551                    .proposed_execpolicy_amendment
552                    .as_ref()
553                    .map(serde_json::to_string)
554                    .transpose()?,
555                request
556                    .network_approval_context
557                    .as_ref()
558                    .map(serde_json::to_string)
559                    .transpose()?,
560                request.schema.as_ref().map(serde_json::to_string).transpose()?,
561                serde_json::to_string(&request.available_decisions)?,
562                serde_json::to_string(&request.raw_payload)?,
563                request.created_at_ms,
564                serde_json::to_string(request)?
565            ],
566        )?;
567        Ok(())
568    }
569
570    pub fn get_pending_request(
571        &self,
572        request_id: &str,
573    ) -> Result<Option<PendingServerRequestRecord>> {
574        let conn = self.connect()?;
575        let record = conn
576            .query_row(
577                "SELECT raw_json FROM pending_server_requests WHERE request_id = ?1",
578                params![request_id],
579                |row| {
580                    let raw: String = row.get(0)?;
581                    decode_json_row(raw)
582                },
583            )
584            .optional()?;
585        Ok(record)
586    }
587
588    pub fn list_pending_requests(&self) -> Result<Vec<PendingServerRequestRecord>> {
589        let conn = self.connect()?;
590        let mut stmt = conn.prepare(
591            "SELECT raw_json
592             FROM pending_server_requests
593             ORDER BY created_at_ms ASC",
594        )?;
595
596        let rows = stmt.query_map([], |row| {
597            let raw: String = row.get(0)?;
598            decode_json_row(raw)
599        })?;
600
601        Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
602    }
603
604    pub fn remove_pending_request(&self, request_id: &str) -> Result<()> {
605        let conn = self.connect()?;
606        conn.execute(
607            "DELETE FROM pending_server_requests WHERE request_id = ?1",
608            params![request_id],
609        )?;
610        Ok(())
611    }
612
613    pub fn clear_pending_requests(&self) -> Result<()> {
614        let conn = self.connect()?;
615        conn.execute("DELETE FROM pending_server_requests", [])?;
616        Ok(())
617    }
618
619    fn clear_legacy_pending_approvals(&self) -> Result<()> {
620        let conn = self.connect()?;
621        conn.execute("DELETE FROM pending_approvals", [])?;
622        Ok(())
623    }
624
625    fn connect(&self) -> Result<Connection> {
626        let conn = Connection::open(&self.db_path)
627            .with_context(|| format!("打开数据库失败: {}", self.db_path.display()))?;
628        conn.execute_batch(
629            "PRAGMA foreign_keys = ON;
630             PRAGMA journal_mode = WAL;",
631        )?;
632        Ok(conn)
633    }
634
635    fn migrate(&self) -> Result<()> {
636        let conn = self.connect()?;
637        conn.execute_batch(
638            "CREATE TABLE IF NOT EXISTS workspaces (
639                id TEXT PRIMARY KEY,
640                display_name TEXT NOT NULL,
641                root_path TEXT NOT NULL UNIQUE,
642                trusted INTEGER NOT NULL,
643                created_at_ms INTEGER NOT NULL,
644                updated_at_ms INTEGER NOT NULL
645            );
646
647            CREATE TABLE IF NOT EXISTS runtimes (
648                runtime_id TEXT PRIMARY KEY,
649                display_name TEXT NOT NULL,
650                codex_home TEXT NULL,
651                codex_binary TEXT NOT NULL,
652                is_primary INTEGER NOT NULL,
653                auto_start INTEGER NOT NULL,
654                created_at_ms INTEGER NOT NULL,
655                updated_at_ms INTEGER NOT NULL,
656                raw_json TEXT NOT NULL
657            );
658
659            CREATE TABLE IF NOT EXISTS thread_index (
660                thread_id TEXT PRIMARY KEY,
661                runtime_id TEXT NOT NULL DEFAULT 'primary',
662                workspace_id TEXT NULL,
663                name TEXT NULL,
664                note TEXT NULL,
665                preview TEXT NOT NULL,
666                cwd TEXT NOT NULL,
667                status TEXT NOT NULL,
668                model_provider TEXT NOT NULL,
669                source TEXT NOT NULL,
670                created_at_ms INTEGER NOT NULL,
671                updated_at_ms INTEGER NOT NULL,
672                is_loaded INTEGER NOT NULL,
673                is_active INTEGER NOT NULL,
674                archived INTEGER NOT NULL DEFAULT 0,
675                raw_json TEXT NOT NULL
676            );
677
678            CREATE TABLE IF NOT EXISTS mobile_sessions (
679                device_id TEXT PRIMARY KEY,
680                last_ack_seq INTEGER NOT NULL,
681                updated_at_ms INTEGER NOT NULL
682            );
683
684            CREATE TABLE IF NOT EXISTS events (
685                seq INTEGER PRIMARY KEY AUTOINCREMENT,
686                event_type TEXT NOT NULL,
687                runtime_id TEXT NULL,
688                thread_id TEXT NULL,
689                payload TEXT NOT NULL,
690                created_at_ms INTEGER NOT NULL
691            );
692
693            CREATE TABLE IF NOT EXISTS pending_approvals (
694                approval_id TEXT PRIMARY KEY,
695                runtime_id TEXT NOT NULL DEFAULT 'primary',
696                thread_id TEXT NOT NULL,
697                turn_id TEXT NOT NULL,
698                item_id TEXT NOT NULL,
699                kind TEXT NOT NULL,
700                reason TEXT NULL,
701                command TEXT NULL,
702                cwd TEXT NULL,
703                grant_root TEXT NULL,
704                available_decisions TEXT NOT NULL,
705                created_at_ms INTEGER NOT NULL,
706                raw_json TEXT NOT NULL
707            );
708
709            CREATE TABLE IF NOT EXISTS pending_server_requests (
710                request_id TEXT PRIMARY KEY,
711                runtime_id TEXT NOT NULL DEFAULT 'primary',
712                request_type TEXT NOT NULL,
713                thread_id TEXT NULL,
714                turn_id TEXT NULL,
715                item_id TEXT NULL,
716                title TEXT NULL,
717                reason TEXT NULL,
718                command TEXT NULL,
719                cwd TEXT NULL,
720                grant_root TEXT NULL,
721                tool_name TEXT NULL,
722                arguments TEXT NULL,
723                questions TEXT NOT NULL,
724                proposed_execpolicy_amendment TEXT NULL,
725                network_approval_context TEXT NULL,
726                schema TEXT NULL,
727                available_decisions TEXT NOT NULL,
728                raw_payload TEXT NOT NULL,
729                created_at_ms INTEGER NOT NULL,
730                raw_json TEXT NOT NULL
731            );",
732        )?;
733
734        ensure_column(
735            &conn,
736            "thread_index",
737            "runtime_id",
738            "TEXT NOT NULL DEFAULT 'primary'",
739        )?;
740        ensure_column(&conn, "thread_index", "note", "TEXT NULL")?;
741        ensure_column(
742            &conn,
743            "thread_index",
744            "archived",
745            "INTEGER NOT NULL DEFAULT 0",
746        )?;
747        ensure_column(&conn, "events", "runtime_id", "TEXT NULL")?;
748        ensure_column(
749            &conn,
750            "pending_approvals",
751            "runtime_id",
752            "TEXT NOT NULL DEFAULT 'primary'",
753        )?;
754        ensure_column(
755            &conn,
756            "pending_server_requests",
757            "runtime_id",
758            "TEXT NOT NULL DEFAULT 'primary'",
759        )?;
760
761        Ok(())
762    }
763}
764
765fn ensure_column(conn: &Connection, table: &str, column: &str, definition: &str) -> Result<()> {
766    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
767    let mut rows = stmt.query([])?;
768    while let Some(row) = rows.next()? {
769        let existing: String = row.get(1)?;
770        if existing == column {
771            return Ok(());
772        }
773    }
774
775    conn.execute_batch(&format!(
776        "ALTER TABLE {table} ADD COLUMN {column} {definition};"
777    ))?;
778    Ok(())
779}
780
#[cfg(test)]
mod tests {
    use std::env;
    use std::fs;

    use super::Storage;

    /// A second `ensure_primary_runtime` call with a different, non-blank
    /// binary must replace the binary stored by the first call.
    #[test]
    fn ensure_primary_runtime_refreshes_existing_binary() {
        let base_dir =
            env::temp_dir().join(format!("codex-mobile-storage-test-{}", std::process::id()));
        // A recycled PID may have left an old database behind; start clean.
        let _ = fs::remove_dir_all(&base_dir);
        fs::create_dir_all(&base_dir).expect("创建测试目录失败");
        let db_path = base_dir.join("bridge.db");
        let storage = Storage::open(db_path).expect("打开存储失败");

        let initial = storage
            .ensure_primary_runtime(None, "codex".to_string())
            .expect("创建 primary runtime 失败");
        assert_eq!(initial.codex_binary, "codex");

        let refreshed = storage
            .ensure_primary_runtime(None, "/home/test/.npm-global/bin/codex".to_string())
            .expect("刷新 primary runtime 失败");
        assert_eq!(refreshed.codex_binary, "/home/test/.npm-global/bin/codex");

        // Best-effort cleanup so the temp directory does not accumulate
        // databases (and their WAL side files) across test runs.
        let _ = fs::remove_dir_all(&base_dir);
    }
}
807
808fn decode_json_row<T: DeserializeOwned>(raw: String) -> rusqlite::Result<T> {
809    serde_json::from_str(&raw)
810        .map_err(|error| rusqlite::Error::FromSqlConversionFailure(0, Type::Text, Box::new(error)))
811}
812
813fn decode_thread_row(
814    raw: String,
815    note: Option<String>,
816    archived: i64,
817) -> rusqlite::Result<ThreadSummary> {
818    let mut thread: ThreadSummary = decode_json_row(raw)?;
819    thread.note = note;
820    thread.archived = archived != 0;
821    Ok(thread)
822}