1use std::fs;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, Result};
5use rusqlite::types::Type;
6use rusqlite::{Connection, OptionalExtension, params};
7use serde::de::DeserializeOwned;
8
9use crate::bridge_protocol::{
10 PendingServerRequestRecord, PersistedEvent, RuntimeRecord, ThreadSummary, WorkspaceRecord,
11 now_millis,
12};
13use crate::workspace::{build_workspace_id, canonicalize_directory};
14
15pub const PRIMARY_RUNTIME_ID: &str = "primary";
16
17#[derive(Debug, Clone)]
18pub struct Storage {
19 db_path: PathBuf,
20}
21
22impl Storage {
23 pub fn open(db_path: PathBuf) -> Result<Self> {
24 if let Some(parent) = db_path.parent() {
25 fs::create_dir_all(parent)
26 .with_context(|| format!("创建数据库目录失败: {}", parent.display()))?;
27 }
28
29 let storage = Self { db_path };
30 storage.migrate()?;
31 storage.clear_pending_requests()?;
32 storage.clear_legacy_pending_approvals()?;
33 Ok(storage)
34 }
35
36 pub fn ensure_primary_runtime(
37 &self,
38 codex_home: Option<String>,
39 codex_binary: String,
40 ) -> Result<RuntimeRecord> {
41 if let Some(existing) = self.get_runtime(PRIMARY_RUNTIME_ID)? {
42 let desired_home = codex_home.or(existing.codex_home.clone());
43 let desired_binary = if codex_binary.trim().is_empty() {
44 existing.codex_binary.clone()
45 } else {
46 codex_binary
47 };
48 let needs_update = existing.codex_home != desired_home
49 || existing.codex_binary != desired_binary
50 || !existing.is_primary
51 || !existing.auto_start;
52
53 if !needs_update {
54 return Ok(existing);
55 }
56
57 let updated = RuntimeRecord {
58 codex_home: desired_home,
59 codex_binary: desired_binary,
60 is_primary: true,
61 auto_start: true,
62 updated_at_ms: now_millis(),
63 ..existing
64 };
65 self.upsert_runtime(&updated)?;
66 return Ok(updated);
67 }
68
69 let now = now_millis();
70 let record = RuntimeRecord {
71 runtime_id: PRIMARY_RUNTIME_ID.to_string(),
72 display_name: "Primary".to_string(),
73 codex_home,
74 codex_binary,
75 is_primary: true,
76 auto_start: true,
77 created_at_ms: now,
78 updated_at_ms: now,
79 };
80 self.upsert_runtime(&record)?;
81 Ok(record)
82 }
83
84 pub fn list_runtimes(&self) -> Result<Vec<RuntimeRecord>> {
85 let conn = self.connect()?;
86 let mut stmt = conn.prepare(
87 "SELECT raw_json
88 FROM runtimes
89 ORDER BY is_primary DESC, created_at_ms ASC",
90 )?;
91
92 let rows = stmt.query_map([], |row| {
93 let raw: String = row.get(0)?;
94 decode_json_row(raw)
95 })?;
96
97 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
98 }
99
100 pub fn get_runtime(&self, runtime_id: &str) -> Result<Option<RuntimeRecord>> {
101 let conn = self.connect()?;
102 let record = conn
103 .query_row(
104 "SELECT raw_json FROM runtimes WHERE runtime_id = ?1",
105 params![runtime_id],
106 |row| {
107 let raw: String = row.get(0)?;
108 decode_json_row(raw)
109 },
110 )
111 .optional()?;
112 Ok(record)
113 }
114
115 pub fn upsert_runtime(&self, runtime: &RuntimeRecord) -> Result<()> {
116 let conn = self.connect()?;
117 conn.execute(
118 "INSERT INTO runtimes (
119 runtime_id, display_name, codex_home, codex_binary, is_primary,
120 auto_start, created_at_ms, updated_at_ms, raw_json
121 )
122 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
123 ON CONFLICT(runtime_id) DO UPDATE SET
124 display_name = excluded.display_name,
125 codex_home = excluded.codex_home,
126 codex_binary = excluded.codex_binary,
127 is_primary = excluded.is_primary,
128 auto_start = excluded.auto_start,
129 created_at_ms = excluded.created_at_ms,
130 updated_at_ms = excluded.updated_at_ms,
131 raw_json = excluded.raw_json",
132 params![
133 runtime.runtime_id,
134 runtime.display_name,
135 runtime.codex_home,
136 runtime.codex_binary,
137 if runtime.is_primary { 1_i64 } else { 0_i64 },
138 if runtime.auto_start { 1_i64 } else { 0_i64 },
139 runtime.created_at_ms,
140 runtime.updated_at_ms,
141 serde_json::to_string(runtime)?,
142 ],
143 )?;
144 Ok(())
145 }
146
147 pub fn remove_runtime(&self, runtime_id: &str) -> Result<()> {
148 let conn = self.connect()?;
149 conn.execute(
150 "DELETE FROM runtimes WHERE runtime_id = ?1",
151 params![runtime_id],
152 )?;
153 Ok(())
154 }
155
156 pub fn list_workspaces(&self) -> Result<Vec<WorkspaceRecord>> {
157 let conn = self.connect()?;
158 let mut stmt = conn.prepare(
159 "SELECT id, display_name, root_path, trusted, created_at_ms, updated_at_ms
160 FROM workspaces
161 ORDER BY display_name COLLATE NOCASE ASC",
162 )?;
163
164 let rows = stmt.query_map([], |row| {
165 Ok(WorkspaceRecord {
166 id: row.get(0)?,
167 display_name: row.get(1)?,
168 root_path: row.get(2)?,
169 trusted: row.get::<_, i64>(3)? != 0,
170 created_at_ms: row.get(4)?,
171 updated_at_ms: row.get(5)?,
172 })
173 })?;
174
175 let workspaces = rows.collect::<rusqlite::Result<Vec<_>>>()?;
176 Ok(workspaces)
177 }
178
179 pub fn get_workspace(&self, workspace_id: &str) -> Result<Option<WorkspaceRecord>> {
180 let conn = self.connect()?;
181 let workspace = conn
182 .query_row(
183 "SELECT id, display_name, root_path, trusted, created_at_ms, updated_at_ms
184 FROM workspaces
185 WHERE id = ?1",
186 params![workspace_id],
187 |row| {
188 Ok(WorkspaceRecord {
189 id: row.get(0)?,
190 display_name: row.get(1)?,
191 root_path: row.get(2)?,
192 trusted: row.get::<_, i64>(3)? != 0,
193 created_at_ms: row.get(4)?,
194 updated_at_ms: row.get(5)?,
195 })
196 },
197 )
198 .optional()?;
199
200 Ok(workspace)
201 }
202
203 pub fn upsert_workspace(
204 &self,
205 display_name: &str,
206 root_path: &Path,
207 trusted: bool,
208 ) -> Result<WorkspaceRecord> {
209 let canonical = canonicalize_directory(root_path)?;
210 let now = now_millis();
211 let conn = self.connect()?;
212 let existing = conn
213 .query_row(
214 "SELECT id, created_at_ms FROM workspaces WHERE root_path = ?1",
215 params![canonical.to_string_lossy().to_string()],
216 |row| Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?)),
217 )
218 .optional()?;
219
220 let (id, created_at_ms) = existing.unwrap_or_else(|| (build_workspace_id(&canonical), now));
221 conn.execute(
222 "INSERT INTO workspaces (id, display_name, root_path, trusted, created_at_ms, updated_at_ms)
223 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
224 ON CONFLICT(id) DO UPDATE SET
225 display_name = excluded.display_name,
226 root_path = excluded.root_path,
227 trusted = excluded.trusted,
228 updated_at_ms = excluded.updated_at_ms",
229 params![
230 id,
231 display_name,
232 canonical.to_string_lossy().to_string(),
233 if trusted { 1_i64 } else { 0_i64 },
234 created_at_ms,
235 now
236 ],
237 )?;
238
239 Ok(WorkspaceRecord {
240 id,
241 display_name: display_name.to_string(),
242 root_path: canonical.to_string_lossy().to_string(),
243 trusted,
244 created_at_ms,
245 updated_at_ms: now,
246 })
247 }
248
249 pub fn append_event(
250 &self,
251 event_type: &str,
252 runtime_id: Option<&str>,
253 thread_id: Option<&str>,
254 payload: &serde_json::Value,
255 ) -> Result<PersistedEvent> {
256 let now = now_millis();
257 let conn = self.connect()?;
258 conn.execute(
259 "INSERT INTO events (event_type, runtime_id, thread_id, payload, created_at_ms)
260 VALUES (?1, ?2, ?3, ?4, ?5)",
261 params![
262 event_type,
263 runtime_id,
264 thread_id,
265 serde_json::to_string(payload)?,
266 now
267 ],
268 )?;
269
270 let seq = conn.last_insert_rowid();
271 Ok(PersistedEvent {
272 seq,
273 event_type: event_type.to_string(),
274 runtime_id: runtime_id.map(ToOwned::to_owned),
275 thread_id: thread_id.map(ToOwned::to_owned),
276 payload: payload.clone(),
277 created_at_ms: now,
278 })
279 }
280
281 pub fn replay_events_after(&self, last_seq: i64) -> Result<Vec<PersistedEvent>> {
282 let conn = self.connect()?;
283 let mut stmt = conn.prepare(
284 "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
285 FROM events
286 WHERE seq > ?1
287 ORDER BY seq ASC",
288 )?;
289
290 let rows = stmt.query_map(params![last_seq], |row| {
291 Ok(PersistedEvent {
292 seq: row.get(0)?,
293 event_type: row.get(1)?,
294 runtime_id: row.get(2)?,
295 thread_id: row.get(3)?,
296 payload: decode_json_row(row.get::<_, String>(4)?)?,
297 created_at_ms: row.get(5)?,
298 })
299 })?;
300
301 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
302 }
303
304 pub fn load_thread_events(&self, thread_id: &str) -> Result<Vec<PersistedEvent>> {
305 let conn = self.connect()?;
306 let mut stmt = conn.prepare(
307 "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
308 FROM events
309 WHERE thread_id = ?1
310 ORDER BY seq ASC",
311 )?;
312
313 let rows = stmt.query_map(params![thread_id], |row| {
314 Ok(PersistedEvent {
315 seq: row.get(0)?,
316 event_type: row.get(1)?,
317 runtime_id: row.get(2)?,
318 thread_id: row.get(3)?,
319 payload: decode_json_row(row.get::<_, String>(4)?)?,
320 created_at_ms: row.get(5)?,
321 })
322 })?;
323
324 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
325 }
326
327 pub fn save_mobile_session_ack(&self, device_id: &str, last_ack_seq: i64) -> Result<()> {
328 let conn = self.connect()?;
329 let now = now_millis();
330 conn.execute(
331 "INSERT INTO mobile_sessions (device_id, last_ack_seq, updated_at_ms)
332 VALUES (?1, ?2, ?3)
333 ON CONFLICT(device_id) DO UPDATE SET
334 last_ack_seq = excluded.last_ack_seq,
335 updated_at_ms = excluded.updated_at_ms",
336 params![device_id, last_ack_seq, now],
337 )?;
338 Ok(())
339 }
340
341 pub fn get_mobile_session_ack(&self, device_id: &str) -> Result<Option<i64>> {
342 let conn = self.connect()?;
343 let value = conn
344 .query_row(
345 "SELECT last_ack_seq FROM mobile_sessions WHERE device_id = ?1",
346 params![device_id],
347 |row| row.get(0),
348 )
349 .optional()?;
350 Ok(value)
351 }
352
353 pub fn upsert_thread_index(&self, thread: &ThreadSummary) -> Result<()> {
354 let conn = self.connect()?;
355 conn.execute(
356 "INSERT INTO thread_index (
357 thread_id, runtime_id, workspace_id, name, note, preview, cwd, status,
358 model_provider, source, created_at_ms, updated_at_ms, is_loaded, is_active,
359 archived, raw_json
360 )
361 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)
362 ON CONFLICT(thread_id) DO UPDATE SET
363 runtime_id = excluded.runtime_id,
364 workspace_id = excluded.workspace_id,
365 name = excluded.name,
366 note = COALESCE(excluded.note, thread_index.note),
367 preview = excluded.preview,
368 cwd = excluded.cwd,
369 status = excluded.status,
370 model_provider = excluded.model_provider,
371 source = excluded.source,
372 created_at_ms = excluded.created_at_ms,
373 updated_at_ms = excluded.updated_at_ms,
374 is_loaded = excluded.is_loaded,
375 is_active = excluded.is_active,
376 archived = excluded.archived,
377 raw_json = excluded.raw_json",
378 params![
379 thread.id,
380 thread.runtime_id,
381 thread.workspace_id,
382 thread.name,
383 thread.note,
384 thread.preview,
385 thread.cwd,
386 thread.status,
387 thread.model_provider,
388 thread.source,
389 thread.created_at,
390 thread.updated_at,
391 if thread.is_loaded { 1_i64 } else { 0_i64 },
392 if thread.is_active { 1_i64 } else { 0_i64 },
393 if thread.archived { 1_i64 } else { 0_i64 },
394 serde_json::to_string(thread)?
395 ],
396 )?;
397 Ok(())
398 }
399
400 pub fn get_thread_index(&self, thread_id: &str) -> Result<Option<ThreadSummary>> {
401 let conn = self.connect()?;
402 let record = conn
403 .query_row(
404 "SELECT raw_json, note, archived FROM thread_index WHERE thread_id = ?1",
405 params![thread_id],
406 |row| {
407 decode_thread_row(
408 row.get::<_, String>(0)?,
409 row.get::<_, Option<String>>(1)?,
410 row.get::<_, i64>(2)?,
411 )
412 },
413 )
414 .optional()?;
415 Ok(record)
416 }
417
418 pub fn list_thread_index(
419 &self,
420 workspace_id: Option<&str>,
421 runtime_id: Option<&str>,
422 archived: Option<bool>,
423 search_term: Option<&str>,
424 ) -> Result<Vec<ThreadSummary>> {
425 let conn = self.connect()?;
426 let mut sql = String::from(
427 "SELECT raw_json, note, archived
428 FROM thread_index",
429 );
430 let mut clauses = Vec::new();
431 let mut values = Vec::new();
432
433 if let Some(workspace_id) = workspace_id {
434 clauses.push("workspace_id = ?");
435 values.push(rusqlite::types::Value::from(workspace_id.to_string()));
436 }
437
438 if let Some(runtime_id) = runtime_id {
439 clauses.push("runtime_id = ?");
440 values.push(rusqlite::types::Value::from(runtime_id.to_string()));
441 }
442
443 if let Some(archived) = archived {
444 clauses.push("archived = ?");
445 values.push(rusqlite::types::Value::from(if archived {
446 1_i64
447 } else {
448 0_i64
449 }));
450 }
451
452 if let Some(search_term) = search_term.filter(|value| !value.trim().is_empty()) {
453 clauses.push(
454 "(LOWER(COALESCE(name, '')) LIKE ? OR LOWER(preview) LIKE ? OR \
455 LOWER(cwd) LIKE ? OR LOWER(COALESCE(note, '')) LIKE ?)",
456 );
457 let pattern = format!("%{}%", search_term.trim().to_lowercase());
458 values.push(rusqlite::types::Value::from(pattern.clone()));
459 values.push(rusqlite::types::Value::from(pattern.clone()));
460 values.push(rusqlite::types::Value::from(pattern.clone()));
461 values.push(rusqlite::types::Value::from(pattern));
462 }
463
464 if !clauses.is_empty() {
465 sql.push_str(" WHERE ");
466 sql.push_str(&clauses.join(" AND "));
467 }
468 sql.push_str(" ORDER BY updated_at_ms DESC");
469
470 let mut stmt = conn.prepare(&sql)?;
471 let rows = stmt.query_map(rusqlite::params_from_iter(values), |row| {
472 decode_thread_row(
473 row.get::<_, String>(0)?,
474 row.get::<_, Option<String>>(1)?,
475 row.get::<_, i64>(2)?,
476 )
477 })?;
478
479 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
480 }
481
482 pub fn save_thread_note(&self, thread_id: &str, note: Option<&str>) -> Result<()> {
483 let conn = self.connect()?;
484 conn.execute(
485 "UPDATE thread_index
486 SET note = ?2
487 WHERE thread_id = ?1",
488 params![thread_id, note],
489 )?;
490 Ok(())
491 }
492
493 pub fn set_thread_archived(&self, thread_id: &str, archived: bool) -> Result<()> {
494 let conn = self.connect()?;
495 conn.execute(
496 "UPDATE thread_index
497 SET archived = ?2
498 WHERE thread_id = ?1",
499 params![thread_id, if archived { 1_i64 } else { 0_i64 }],
500 )?;
501 Ok(())
502 }
503
504 pub fn put_pending_request(&self, request: &PendingServerRequestRecord) -> Result<()> {
505 let conn = self.connect()?;
506 conn.execute(
507 "INSERT INTO pending_server_requests (
508 request_id, runtime_id, request_type, thread_id, turn_id, item_id, title,
509 reason, command, cwd, grant_root, tool_name, arguments, questions,
510 proposed_execpolicy_amendment, network_approval_context, schema,
511 available_decisions, raw_payload, created_at_ms, raw_json
512 )
513 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21)
514 ON CONFLICT(request_id) DO UPDATE SET
515 runtime_id = excluded.runtime_id,
516 request_type = excluded.request_type,
517 thread_id = excluded.thread_id,
518 turn_id = excluded.turn_id,
519 item_id = excluded.item_id,
520 title = excluded.title,
521 reason = excluded.reason,
522 command = excluded.command,
523 cwd = excluded.cwd,
524 grant_root = excluded.grant_root,
525 tool_name = excluded.tool_name,
526 arguments = excluded.arguments,
527 questions = excluded.questions,
528 proposed_execpolicy_amendment = excluded.proposed_execpolicy_amendment,
529 network_approval_context = excluded.network_approval_context,
530 schema = excluded.schema,
531 available_decisions = excluded.available_decisions,
532 raw_payload = excluded.raw_payload,
533 created_at_ms = excluded.created_at_ms,
534 raw_json = excluded.raw_json",
535 params![
536 request.request_id,
537 request.runtime_id,
538 request.request_type,
539 request.thread_id,
540 request.turn_id,
541 request.item_id,
542 request.title,
543 request.reason,
544 request.command,
545 request.cwd,
546 request.grant_root,
547 request.tool_name,
548 request.arguments.as_ref().map(serde_json::to_string).transpose()?,
549 serde_json::to_string(&request.questions)?,
550 request
551 .proposed_execpolicy_amendment
552 .as_ref()
553 .map(serde_json::to_string)
554 .transpose()?,
555 request
556 .network_approval_context
557 .as_ref()
558 .map(serde_json::to_string)
559 .transpose()?,
560 request.schema.as_ref().map(serde_json::to_string).transpose()?,
561 serde_json::to_string(&request.available_decisions)?,
562 serde_json::to_string(&request.raw_payload)?,
563 request.created_at_ms,
564 serde_json::to_string(request)?
565 ],
566 )?;
567 Ok(())
568 }
569
570 pub fn get_pending_request(
571 &self,
572 request_id: &str,
573 ) -> Result<Option<PendingServerRequestRecord>> {
574 let conn = self.connect()?;
575 let record = conn
576 .query_row(
577 "SELECT raw_json FROM pending_server_requests WHERE request_id = ?1",
578 params![request_id],
579 |row| {
580 let raw: String = row.get(0)?;
581 decode_json_row(raw)
582 },
583 )
584 .optional()?;
585 Ok(record)
586 }
587
588 pub fn list_pending_requests(&self) -> Result<Vec<PendingServerRequestRecord>> {
589 let conn = self.connect()?;
590 let mut stmt = conn.prepare(
591 "SELECT raw_json
592 FROM pending_server_requests
593 ORDER BY created_at_ms ASC",
594 )?;
595
596 let rows = stmt.query_map([], |row| {
597 let raw: String = row.get(0)?;
598 decode_json_row(raw)
599 })?;
600
601 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
602 }
603
604 pub fn remove_pending_request(&self, request_id: &str) -> Result<()> {
605 let conn = self.connect()?;
606 conn.execute(
607 "DELETE FROM pending_server_requests WHERE request_id = ?1",
608 params![request_id],
609 )?;
610 Ok(())
611 }
612
613 pub fn clear_pending_requests(&self) -> Result<()> {
614 let conn = self.connect()?;
615 conn.execute("DELETE FROM pending_server_requests", [])?;
616 Ok(())
617 }
618
619 fn clear_legacy_pending_approvals(&self) -> Result<()> {
620 let conn = self.connect()?;
621 conn.execute("DELETE FROM pending_approvals", [])?;
622 Ok(())
623 }
624
625 fn connect(&self) -> Result<Connection> {
626 let conn = Connection::open(&self.db_path)
627 .with_context(|| format!("打开数据库失败: {}", self.db_path.display()))?;
628 conn.execute_batch(
629 "PRAGMA foreign_keys = ON;
630 PRAGMA journal_mode = WAL;",
631 )?;
632 Ok(conn)
633 }
634
635 fn migrate(&self) -> Result<()> {
636 let conn = self.connect()?;
637 conn.execute_batch(
638 "CREATE TABLE IF NOT EXISTS workspaces (
639 id TEXT PRIMARY KEY,
640 display_name TEXT NOT NULL,
641 root_path TEXT NOT NULL UNIQUE,
642 trusted INTEGER NOT NULL,
643 created_at_ms INTEGER NOT NULL,
644 updated_at_ms INTEGER NOT NULL
645 );
646
647 CREATE TABLE IF NOT EXISTS runtimes (
648 runtime_id TEXT PRIMARY KEY,
649 display_name TEXT NOT NULL,
650 codex_home TEXT NULL,
651 codex_binary TEXT NOT NULL,
652 is_primary INTEGER NOT NULL,
653 auto_start INTEGER NOT NULL,
654 created_at_ms INTEGER NOT NULL,
655 updated_at_ms INTEGER NOT NULL,
656 raw_json TEXT NOT NULL
657 );
658
659 CREATE TABLE IF NOT EXISTS thread_index (
660 thread_id TEXT PRIMARY KEY,
661 runtime_id TEXT NOT NULL DEFAULT 'primary',
662 workspace_id TEXT NULL,
663 name TEXT NULL,
664 note TEXT NULL,
665 preview TEXT NOT NULL,
666 cwd TEXT NOT NULL,
667 status TEXT NOT NULL,
668 model_provider TEXT NOT NULL,
669 source TEXT NOT NULL,
670 created_at_ms INTEGER NOT NULL,
671 updated_at_ms INTEGER NOT NULL,
672 is_loaded INTEGER NOT NULL,
673 is_active INTEGER NOT NULL,
674 archived INTEGER NOT NULL DEFAULT 0,
675 raw_json TEXT NOT NULL
676 );
677
678 CREATE TABLE IF NOT EXISTS mobile_sessions (
679 device_id TEXT PRIMARY KEY,
680 last_ack_seq INTEGER NOT NULL,
681 updated_at_ms INTEGER NOT NULL
682 );
683
684 CREATE TABLE IF NOT EXISTS events (
685 seq INTEGER PRIMARY KEY AUTOINCREMENT,
686 event_type TEXT NOT NULL,
687 runtime_id TEXT NULL,
688 thread_id TEXT NULL,
689 payload TEXT NOT NULL,
690 created_at_ms INTEGER NOT NULL
691 );
692
693 CREATE TABLE IF NOT EXISTS pending_approvals (
694 approval_id TEXT PRIMARY KEY,
695 runtime_id TEXT NOT NULL DEFAULT 'primary',
696 thread_id TEXT NOT NULL,
697 turn_id TEXT NOT NULL,
698 item_id TEXT NOT NULL,
699 kind TEXT NOT NULL,
700 reason TEXT NULL,
701 command TEXT NULL,
702 cwd TEXT NULL,
703 grant_root TEXT NULL,
704 available_decisions TEXT NOT NULL,
705 created_at_ms INTEGER NOT NULL,
706 raw_json TEXT NOT NULL
707 );
708
709 CREATE TABLE IF NOT EXISTS pending_server_requests (
710 request_id TEXT PRIMARY KEY,
711 runtime_id TEXT NOT NULL DEFAULT 'primary',
712 request_type TEXT NOT NULL,
713 thread_id TEXT NULL,
714 turn_id TEXT NULL,
715 item_id TEXT NULL,
716 title TEXT NULL,
717 reason TEXT NULL,
718 command TEXT NULL,
719 cwd TEXT NULL,
720 grant_root TEXT NULL,
721 tool_name TEXT NULL,
722 arguments TEXT NULL,
723 questions TEXT NOT NULL,
724 proposed_execpolicy_amendment TEXT NULL,
725 network_approval_context TEXT NULL,
726 schema TEXT NULL,
727 available_decisions TEXT NOT NULL,
728 raw_payload TEXT NOT NULL,
729 created_at_ms INTEGER NOT NULL,
730 raw_json TEXT NOT NULL
731 );",
732 )?;
733
734 ensure_column(
735 &conn,
736 "thread_index",
737 "runtime_id",
738 "TEXT NOT NULL DEFAULT 'primary'",
739 )?;
740 ensure_column(&conn, "thread_index", "note", "TEXT NULL")?;
741 ensure_column(
742 &conn,
743 "thread_index",
744 "archived",
745 "INTEGER NOT NULL DEFAULT 0",
746 )?;
747 ensure_column(&conn, "events", "runtime_id", "TEXT NULL")?;
748 ensure_column(
749 &conn,
750 "pending_approvals",
751 "runtime_id",
752 "TEXT NOT NULL DEFAULT 'primary'",
753 )?;
754 ensure_column(
755 &conn,
756 "pending_server_requests",
757 "runtime_id",
758 "TEXT NOT NULL DEFAULT 'primary'",
759 )?;
760
761 Ok(())
762 }
763}
764
765fn ensure_column(conn: &Connection, table: &str, column: &str, definition: &str) -> Result<()> {
766 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
767 let mut rows = stmt.query([])?;
768 while let Some(row) = rows.next()? {
769 let existing: String = row.get(1)?;
770 if existing == column {
771 return Ok(());
772 }
773 }
774
775 conn.execute_batch(&format!(
776 "ALTER TABLE {table} ADD COLUMN {column} {definition};"
777 ))?;
778 Ok(())
779}
780
781#[cfg(test)]
782mod tests {
783 use std::env;
784 use std::fs;
785
786 use super::Storage;
787
788 #[test]
789 fn ensure_primary_runtime_refreshes_existing_binary() {
790 let base_dir =
791 env::temp_dir().join(format!("codex-mobile-storage-test-{}", std::process::id()));
792 fs::create_dir_all(&base_dir).expect("创建测试目录失败");
793 let db_path = base_dir.join("bridge.db");
794 let storage = Storage::open(db_path).expect("打开存储失败");
795
796 let initial = storage
797 .ensure_primary_runtime(None, "codex".to_string())
798 .expect("创建 primary runtime 失败");
799 assert_eq!(initial.codex_binary, "codex");
800
801 let refreshed = storage
802 .ensure_primary_runtime(None, "/home/test/.npm-global/bin/codex".to_string())
803 .expect("刷新 primary runtime 失败");
804 assert_eq!(refreshed.codex_binary, "/home/test/.npm-global/bin/codex");
805 }
806}
807
808fn decode_json_row<T: DeserializeOwned>(raw: String) -> rusqlite::Result<T> {
809 serde_json::from_str(&raw)
810 .map_err(|error| rusqlite::Error::FromSqlConversionFailure(0, Type::Text, Box::new(error)))
811}
812
813fn decode_thread_row(
814 raw: String,
815 note: Option<String>,
816 archived: i64,
817) -> rusqlite::Result<ThreadSummary> {
818 let mut thread: ThreadSummary = decode_json_row(raw)?;
819 thread.note = note;
820 thread.archived = archived != 0;
821 Ok(thread)
822}