1use std::fs;
2use std::path::{Path, PathBuf};
3
4use anyhow::{Context, Result};
5use rusqlite::types::Type;
6use rusqlite::{Connection, OptionalExtension, params};
7use serde::de::DeserializeOwned;
8
9use crate::bridge_protocol::{
10 DirectoryBookmarkRecord, DirectoryHistoryRecord, PendingServerRequestRecord, PersistedEvent,
11 RuntimeRecord, ThreadSummary, now_millis,
12};
13use crate::directory::{
14 canonicalize_directory, default_display_name, directory_contains, normalize_absolute_directory,
15};
16
17pub const PRIMARY_RUNTIME_ID: &str = "primary";
18
19#[derive(Debug, Clone)]
20pub struct Storage {
21 db_path: PathBuf,
22}
23
24impl Storage {
25 pub fn open(db_path: PathBuf) -> Result<Self> {
26 if let Some(parent) = db_path.parent() {
27 fs::create_dir_all(parent)
28 .with_context(|| format!("创建数据库目录失败: {}", parent.display()))?;
29 }
30
31 let storage = Self { db_path };
32 storage.migrate()?;
33 storage.clear_pending_requests()?;
34 storage.clear_legacy_pending_approvals()?;
35 Ok(storage)
36 }
37
38 pub fn ensure_primary_runtime(
39 &self,
40 codex_home: Option<String>,
41 codex_binary: String,
42 ) -> Result<RuntimeRecord> {
43 if let Some(existing) = self.get_runtime(PRIMARY_RUNTIME_ID)? {
44 let desired_home = codex_home.or(existing.codex_home.clone());
45 let desired_binary = if codex_binary.trim().is_empty() {
46 existing.codex_binary.clone()
47 } else {
48 codex_binary
49 };
50 let needs_update = existing.codex_home != desired_home
51 || existing.codex_binary != desired_binary
52 || !existing.is_primary
53 || !existing.auto_start;
54
55 if !needs_update {
56 return Ok(existing);
57 }
58
59 let updated = RuntimeRecord {
60 codex_home: desired_home,
61 codex_binary: desired_binary,
62 is_primary: true,
63 auto_start: true,
64 updated_at_ms: now_millis(),
65 ..existing
66 };
67 self.upsert_runtime(&updated)?;
68 return Ok(updated);
69 }
70
71 let now = now_millis();
72 let record = RuntimeRecord {
73 runtime_id: PRIMARY_RUNTIME_ID.to_string(),
74 display_name: "Primary".to_string(),
75 codex_home,
76 codex_binary,
77 is_primary: true,
78 auto_start: true,
79 created_at_ms: now,
80 updated_at_ms: now,
81 };
82 self.upsert_runtime(&record)?;
83 Ok(record)
84 }
85
86 pub fn list_runtimes(&self) -> Result<Vec<RuntimeRecord>> {
87 let conn = self.connect()?;
88 let mut stmt = conn.prepare(
89 "SELECT raw_json
90 FROM runtimes
91 ORDER BY is_primary DESC, created_at_ms ASC",
92 )?;
93
94 let rows = stmt.query_map([], |row| {
95 let raw: String = row.get(0)?;
96 decode_json_row(raw)
97 })?;
98
99 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
100 }
101
102 pub fn get_runtime(&self, runtime_id: &str) -> Result<Option<RuntimeRecord>> {
103 let conn = self.connect()?;
104 let record = conn
105 .query_row(
106 "SELECT raw_json FROM runtimes WHERE runtime_id = ?1",
107 params![runtime_id],
108 |row| {
109 let raw: String = row.get(0)?;
110 decode_json_row(raw)
111 },
112 )
113 .optional()?;
114 Ok(record)
115 }
116
117 pub fn upsert_runtime(&self, runtime: &RuntimeRecord) -> Result<()> {
118 let conn = self.connect()?;
119 conn.execute(
120 "INSERT INTO runtimes (
121 runtime_id, display_name, codex_home, codex_binary, is_primary,
122 auto_start, created_at_ms, updated_at_ms, raw_json
123 )
124 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
125 ON CONFLICT(runtime_id) DO UPDATE SET
126 display_name = excluded.display_name,
127 codex_home = excluded.codex_home,
128 codex_binary = excluded.codex_binary,
129 is_primary = excluded.is_primary,
130 auto_start = excluded.auto_start,
131 created_at_ms = excluded.created_at_ms,
132 updated_at_ms = excluded.updated_at_ms,
133 raw_json = excluded.raw_json",
134 params![
135 runtime.runtime_id,
136 runtime.display_name,
137 runtime.codex_home,
138 runtime.codex_binary,
139 if runtime.is_primary { 1_i64 } else { 0_i64 },
140 if runtime.auto_start { 1_i64 } else { 0_i64 },
141 runtime.created_at_ms,
142 runtime.updated_at_ms,
143 serde_json::to_string(runtime)?,
144 ],
145 )?;
146 Ok(())
147 }
148
149 pub fn remove_runtime(&self, runtime_id: &str) -> Result<()> {
150 let conn = self.connect()?;
151 conn.execute(
152 "DELETE FROM runtimes WHERE runtime_id = ?1",
153 params![runtime_id],
154 )?;
155 Ok(())
156 }
157
158 pub fn list_directory_bookmarks(&self) -> Result<Vec<DirectoryBookmarkRecord>> {
159 let conn = self.connect()?;
160 let mut stmt = conn.prepare(
161 "SELECT path, display_name, created_at_ms, updated_at_ms
162 FROM directory_bookmarks
163 ORDER BY display_name COLLATE NOCASE ASC",
164 )?;
165
166 let rows = stmt.query_map([], |row| {
167 Ok(DirectoryBookmarkRecord {
168 path: row.get(0)?,
169 display_name: row.get(1)?,
170 created_at_ms: row.get(2)?,
171 updated_at_ms: row.get(3)?,
172 })
173 })?;
174
175 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
176 }
177
178 pub fn upsert_directory_bookmark(
179 &self,
180 path: &Path,
181 display_name: Option<&str>,
182 ) -> Result<DirectoryBookmarkRecord> {
183 let canonical = canonicalize_directory(path)?;
184 let path_string = canonical.to_string_lossy().to_string();
185 let now = now_millis();
186 let conn = self.connect()?;
187 let existing = conn
188 .query_row(
189 "SELECT created_at_ms FROM directory_bookmarks WHERE path = ?1",
190 params![path_string],
191 |row| row.get::<_, i64>(0),
192 )
193 .optional()?;
194 let created_at_ms = existing.unwrap_or(now);
195 let display_name = display_name
196 .map(str::trim)
197 .filter(|value| !value.is_empty())
198 .map(ToOwned::to_owned)
199 .unwrap_or_else(|| default_display_name(&canonical));
200 conn.execute(
201 "INSERT INTO directory_bookmarks (path, display_name, created_at_ms, updated_at_ms)
202 VALUES (?1, ?2, ?3, ?4)
203 ON CONFLICT(path) DO UPDATE SET
204 display_name = excluded.display_name,
205 updated_at_ms = excluded.updated_at_ms",
206 params![path_string, display_name, created_at_ms, now],
207 )?;
208
209 Ok(DirectoryBookmarkRecord {
210 path: canonical.to_string_lossy().to_string(),
211 display_name,
212 created_at_ms,
213 updated_at_ms: now,
214 })
215 }
216
217 pub fn remove_directory_bookmark(&self, path: &Path) -> Result<()> {
218 let canonical = normalize_absolute_directory(path)?;
219 let conn = self.connect()?;
220 conn.execute(
221 "DELETE FROM directory_bookmarks WHERE path = ?1",
222 params![canonical.to_string_lossy().to_string()],
223 )?;
224 Ok(())
225 }
226
227 pub fn list_directory_history(&self, limit: usize) -> Result<Vec<DirectoryHistoryRecord>> {
228 let conn = self.connect()?;
229 let mut stmt = conn.prepare(
230 "SELECT path, last_used_at_ms, use_count
231 FROM directory_history
232 ORDER BY last_used_at_ms DESC
233 LIMIT ?1",
234 )?;
235 let rows = stmt.query_map(params![limit.max(1) as i64], |row| {
236 let path: String = row.get(0)?;
237 Ok(DirectoryHistoryRecord {
238 display_name: default_display_name(Path::new(&path)),
239 path,
240 last_used_at_ms: row.get(1)?,
241 use_count: row.get(2)?,
242 })
243 })?;
244 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
245 }
246
247 pub fn record_directory_usage(&self, path: &Path) -> Result<DirectoryHistoryRecord> {
248 let normalized = normalize_absolute_directory(path)?;
249 let path_string = normalized.to_string_lossy().to_string();
250 let now = now_millis();
251 let conn = self.connect()?;
252 let existing = conn
253 .query_row(
254 "SELECT use_count FROM directory_history WHERE path = ?1",
255 params![&path_string],
256 |row| row.get::<_, i64>(0),
257 )
258 .optional()?
259 .unwrap_or(0);
260 let use_count = existing + 1;
261 conn.execute(
262 "INSERT INTO directory_history (path, last_used_at_ms, use_count)
263 VALUES (?1, ?2, ?3)
264 ON CONFLICT(path) DO UPDATE SET
265 last_used_at_ms = excluded.last_used_at_ms,
266 use_count = directory_history.use_count + 1",
267 params![path_string, now, use_count],
268 )?;
269 Ok(DirectoryHistoryRecord {
270 path: normalized.to_string_lossy().to_string(),
271 display_name: default_display_name(&normalized),
272 last_used_at_ms: now,
273 use_count,
274 })
275 }
276
277 pub fn append_event(
278 &self,
279 event_type: &str,
280 runtime_id: Option<&str>,
281 thread_id: Option<&str>,
282 payload: &serde_json::Value,
283 ) -> Result<PersistedEvent> {
284 let now = now_millis();
285 let conn = self.connect()?;
286 conn.execute(
287 "INSERT INTO events (event_type, runtime_id, thread_id, payload, created_at_ms)
288 VALUES (?1, ?2, ?3, ?4, ?5)",
289 params![
290 event_type,
291 runtime_id,
292 thread_id,
293 serde_json::to_string(payload)?,
294 now
295 ],
296 )?;
297
298 let seq = conn.last_insert_rowid();
299 Ok(PersistedEvent {
300 seq,
301 event_type: event_type.to_string(),
302 runtime_id: runtime_id.map(ToOwned::to_owned),
303 thread_id: thread_id.map(ToOwned::to_owned),
304 payload: payload.clone(),
305 created_at_ms: now,
306 })
307 }
308
309 pub fn replay_events_after(&self, last_seq: i64) -> Result<Vec<PersistedEvent>> {
310 let conn = self.connect()?;
311 let mut stmt = conn.prepare(
312 "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
313 FROM events
314 WHERE seq > ?1
315 ORDER BY seq ASC",
316 )?;
317
318 let rows = stmt.query_map(params![last_seq], |row| {
319 Ok(PersistedEvent {
320 seq: row.get(0)?,
321 event_type: row.get(1)?,
322 runtime_id: row.get(2)?,
323 thread_id: row.get(3)?,
324 payload: decode_json_row(row.get::<_, String>(4)?)?,
325 created_at_ms: row.get(5)?,
326 })
327 })?;
328
329 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
330 }
331
332 pub fn load_thread_events(&self, thread_id: &str) -> Result<Vec<PersistedEvent>> {
333 let conn = self.connect()?;
334 let mut stmt = conn.prepare(
335 "SELECT seq, event_type, runtime_id, thread_id, payload, created_at_ms
336 FROM events
337 WHERE thread_id = ?1
338 ORDER BY seq ASC",
339 )?;
340
341 let rows = stmt.query_map(params![thread_id], |row| {
342 Ok(PersistedEvent {
343 seq: row.get(0)?,
344 event_type: row.get(1)?,
345 runtime_id: row.get(2)?,
346 thread_id: row.get(3)?,
347 payload: decode_json_row(row.get::<_, String>(4)?)?,
348 created_at_ms: row.get(5)?,
349 })
350 })?;
351
352 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
353 }
354
355 pub fn save_mobile_session_ack(&self, device_id: &str, last_ack_seq: i64) -> Result<()> {
356 let conn = self.connect()?;
357 let now = now_millis();
358 conn.execute(
359 "INSERT INTO mobile_sessions (device_id, last_ack_seq, updated_at_ms)
360 VALUES (?1, ?2, ?3)
361 ON CONFLICT(device_id) DO UPDATE SET
362 last_ack_seq = excluded.last_ack_seq,
363 updated_at_ms = excluded.updated_at_ms",
364 params![device_id, last_ack_seq, now],
365 )?;
366 Ok(())
367 }
368
369 pub fn get_mobile_session_ack(&self, device_id: &str) -> Result<Option<i64>> {
370 let conn = self.connect()?;
371 let value = conn
372 .query_row(
373 "SELECT last_ack_seq FROM mobile_sessions WHERE device_id = ?1",
374 params![device_id],
375 |row| row.get(0),
376 )
377 .optional()?;
378 Ok(value)
379 }
380
381 pub fn upsert_thread_index(&self, thread: &ThreadSummary) -> Result<()> {
382 let conn = self.connect()?;
383 conn.execute(
384 "INSERT INTO thread_index (
385 thread_id, runtime_id, name, note, preview, cwd, status,
386 model_provider, source, created_at_ms, updated_at_ms, is_loaded, is_active,
387 archived, raw_json
388 )
389 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
390 ON CONFLICT(thread_id) DO UPDATE SET
391 runtime_id = excluded.runtime_id,
392 name = excluded.name,
393 note = COALESCE(excluded.note, thread_index.note),
394 preview = excluded.preview,
395 cwd = excluded.cwd,
396 status = excluded.status,
397 model_provider = excluded.model_provider,
398 source = excluded.source,
399 created_at_ms = excluded.created_at_ms,
400 updated_at_ms = excluded.updated_at_ms,
401 is_loaded = excluded.is_loaded,
402 is_active = excluded.is_active,
403 archived = excluded.archived,
404 raw_json = excluded.raw_json",
405 params![
406 thread.id,
407 thread.runtime_id,
408 thread.name,
409 thread.note,
410 thread.preview,
411 thread.cwd,
412 thread.status,
413 thread.model_provider,
414 thread.source,
415 thread.created_at,
416 thread.updated_at,
417 if thread.is_loaded { 1_i64 } else { 0_i64 },
418 if thread.is_active { 1_i64 } else { 0_i64 },
419 if thread.archived { 1_i64 } else { 0_i64 },
420 serde_json::to_string(thread)?
421 ],
422 )?;
423 Ok(())
424 }
425
426 pub fn get_thread_index(&self, thread_id: &str) -> Result<Option<ThreadSummary>> {
427 let conn = self.connect()?;
428 let record = conn
429 .query_row(
430 "SELECT raw_json, note, archived FROM thread_index WHERE thread_id = ?1",
431 params![thread_id],
432 |row| {
433 decode_thread_row(
434 row.get::<_, String>(0)?,
435 row.get::<_, Option<String>>(1)?,
436 row.get::<_, i64>(2)?,
437 )
438 },
439 )
440 .optional()?;
441 Ok(record)
442 }
443
444 pub fn list_thread_index(
445 &self,
446 directory_prefix: Option<&str>,
447 runtime_id: Option<&str>,
448 archived: Option<bool>,
449 search_term: Option<&str>,
450 ) -> Result<Vec<ThreadSummary>> {
451 let conn = self.connect()?;
452 let mut sql = String::from(
453 "SELECT raw_json, note, archived
454 FROM thread_index",
455 );
456 let mut clauses = Vec::new();
457 let mut values = Vec::new();
458
459 if let Some(runtime_id) = runtime_id {
460 clauses.push("runtime_id = ?");
461 values.push(rusqlite::types::Value::from(runtime_id.to_string()));
462 }
463
464 if let Some(archived) = archived {
465 clauses.push("archived = ?");
466 values.push(rusqlite::types::Value::from(if archived {
467 1_i64
468 } else {
469 0_i64
470 }));
471 }
472
473 if let Some(search_term) = search_term.filter(|value| !value.trim().is_empty()) {
474 clauses.push(
475 "(LOWER(COALESCE(name, '')) LIKE ? OR LOWER(preview) LIKE ? OR \
476 LOWER(cwd) LIKE ? OR LOWER(COALESCE(note, '')) LIKE ?)",
477 );
478 let pattern = format!("%{}%", search_term.trim().to_lowercase());
479 values.push(rusqlite::types::Value::from(pattern.clone()));
480 values.push(rusqlite::types::Value::from(pattern.clone()));
481 values.push(rusqlite::types::Value::from(pattern.clone()));
482 values.push(rusqlite::types::Value::from(pattern));
483 }
484
485 if !clauses.is_empty() {
486 sql.push_str(" WHERE ");
487 sql.push_str(&clauses.join(" AND "));
488 }
489 sql.push_str(" ORDER BY updated_at_ms DESC");
490
491 let mut stmt = conn.prepare(&sql)?;
492 let rows = stmt.query_map(rusqlite::params_from_iter(values), |row| {
493 decode_thread_row(
494 row.get::<_, String>(0)?,
495 row.get::<_, Option<String>>(1)?,
496 row.get::<_, i64>(2)?,
497 )
498 })?;
499
500 let mut threads = rows.collect::<rusqlite::Result<Vec<_>>>()?;
501
502 if let Some(directory_prefix) = directory_prefix {
503 let prefix = normalize_absolute_directory(Path::new(directory_prefix))?;
504 threads.retain(|thread| directory_contains(&prefix, Path::new(&thread.cwd)));
505 }
506
507 Ok(threads)
508 }
509
510 pub fn save_thread_note(&self, thread_id: &str, note: Option<&str>) -> Result<()> {
511 let conn = self.connect()?;
512 conn.execute(
513 "UPDATE thread_index
514 SET note = ?2
515 WHERE thread_id = ?1",
516 params![thread_id, note],
517 )?;
518 Ok(())
519 }
520
521 pub fn set_thread_archived(&self, thread_id: &str, archived: bool) -> Result<()> {
522 let conn = self.connect()?;
523 conn.execute(
524 "UPDATE thread_index
525 SET archived = ?2
526 WHERE thread_id = ?1",
527 params![thread_id, if archived { 1_i64 } else { 0_i64 }],
528 )?;
529 Ok(())
530 }
531
532 pub fn put_pending_request(&self, request: &PendingServerRequestRecord) -> Result<()> {
533 let conn = self.connect()?;
534 conn.execute(
535 "INSERT INTO pending_server_requests (
536 request_id, runtime_id, request_type, thread_id, turn_id, item_id, title,
537 reason, command, cwd, grant_root, tool_name, arguments, questions,
538 proposed_execpolicy_amendment, network_approval_context, schema,
539 available_decisions, raw_payload, created_at_ms, raw_json
540 )
541 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21)
542 ON CONFLICT(request_id) DO UPDATE SET
543 runtime_id = excluded.runtime_id,
544 request_type = excluded.request_type,
545 thread_id = excluded.thread_id,
546 turn_id = excluded.turn_id,
547 item_id = excluded.item_id,
548 title = excluded.title,
549 reason = excluded.reason,
550 command = excluded.command,
551 cwd = excluded.cwd,
552 grant_root = excluded.grant_root,
553 tool_name = excluded.tool_name,
554 arguments = excluded.arguments,
555 questions = excluded.questions,
556 proposed_execpolicy_amendment = excluded.proposed_execpolicy_amendment,
557 network_approval_context = excluded.network_approval_context,
558 schema = excluded.schema,
559 available_decisions = excluded.available_decisions,
560 raw_payload = excluded.raw_payload,
561 created_at_ms = excluded.created_at_ms,
562 raw_json = excluded.raw_json",
563 params![
564 request.request_id,
565 request.runtime_id,
566 request.request_type,
567 request.thread_id,
568 request.turn_id,
569 request.item_id,
570 request.title,
571 request.reason,
572 request.command,
573 request.cwd,
574 request.grant_root,
575 request.tool_name,
576 request.arguments.as_ref().map(serde_json::to_string).transpose()?,
577 serde_json::to_string(&request.questions)?,
578 request
579 .proposed_execpolicy_amendment
580 .as_ref()
581 .map(serde_json::to_string)
582 .transpose()?,
583 request
584 .network_approval_context
585 .as_ref()
586 .map(serde_json::to_string)
587 .transpose()?,
588 request.schema.as_ref().map(serde_json::to_string).transpose()?,
589 serde_json::to_string(&request.available_decisions)?,
590 serde_json::to_string(&request.raw_payload)?,
591 request.created_at_ms,
592 serde_json::to_string(request)?
593 ],
594 )?;
595 Ok(())
596 }
597
598 pub fn get_pending_request(
599 &self,
600 request_id: &str,
601 ) -> Result<Option<PendingServerRequestRecord>> {
602 let conn = self.connect()?;
603 let record = conn
604 .query_row(
605 "SELECT raw_json FROM pending_server_requests WHERE request_id = ?1",
606 params![request_id],
607 |row| {
608 let raw: String = row.get(0)?;
609 decode_json_row(raw)
610 },
611 )
612 .optional()?;
613 Ok(record)
614 }
615
616 pub fn list_pending_requests(&self) -> Result<Vec<PendingServerRequestRecord>> {
617 let conn = self.connect()?;
618 let mut stmt = conn.prepare(
619 "SELECT raw_json
620 FROM pending_server_requests
621 ORDER BY created_at_ms ASC",
622 )?;
623
624 let rows = stmt.query_map([], |row| {
625 let raw: String = row.get(0)?;
626 decode_json_row(raw)
627 })?;
628
629 Ok(rows.collect::<rusqlite::Result<Vec<_>>>()?)
630 }
631
632 pub fn remove_pending_request(&self, request_id: &str) -> Result<()> {
633 let conn = self.connect()?;
634 conn.execute(
635 "DELETE FROM pending_server_requests WHERE request_id = ?1",
636 params![request_id],
637 )?;
638 Ok(())
639 }
640
641 pub fn clear_pending_requests(&self) -> Result<()> {
642 let conn = self.connect()?;
643 conn.execute("DELETE FROM pending_server_requests", [])?;
644 Ok(())
645 }
646
647 fn clear_legacy_pending_approvals(&self) -> Result<()> {
648 let conn = self.connect()?;
649 conn.execute("DELETE FROM pending_approvals", [])?;
650 Ok(())
651 }
652
653 fn connect(&self) -> Result<Connection> {
654 let conn = Connection::open(&self.db_path)
655 .with_context(|| format!("打开数据库失败: {}", self.db_path.display()))?;
656 conn.execute_batch(
657 "PRAGMA foreign_keys = ON;
658 PRAGMA journal_mode = WAL;",
659 )?;
660 Ok(conn)
661 }
662
663 fn migrate(&self) -> Result<()> {
664 let conn = self.connect()?;
665 conn.execute_batch(
666 "CREATE TABLE IF NOT EXISTS directory_bookmarks (
667 path TEXT PRIMARY KEY,
668 display_name TEXT NOT NULL,
669 created_at_ms INTEGER NOT NULL,
670 updated_at_ms INTEGER NOT NULL
671 );
672
673 CREATE TABLE IF NOT EXISTS directory_history (
674 path TEXT PRIMARY KEY,
675 last_used_at_ms INTEGER NOT NULL,
676 use_count INTEGER NOT NULL
677 );
678
679 CREATE TABLE IF NOT EXISTS runtimes (
680 runtime_id TEXT PRIMARY KEY,
681 display_name TEXT NOT NULL,
682 codex_home TEXT NULL,
683 codex_binary TEXT NOT NULL,
684 is_primary INTEGER NOT NULL,
685 auto_start INTEGER NOT NULL,
686 created_at_ms INTEGER NOT NULL,
687 updated_at_ms INTEGER NOT NULL,
688 raw_json TEXT NOT NULL
689 );
690
691 CREATE TABLE IF NOT EXISTS mobile_sessions (
692 device_id TEXT PRIMARY KEY,
693 last_ack_seq INTEGER NOT NULL,
694 updated_at_ms INTEGER NOT NULL
695 );
696
697 CREATE TABLE IF NOT EXISTS events (
698 seq INTEGER PRIMARY KEY AUTOINCREMENT,
699 event_type TEXT NOT NULL,
700 runtime_id TEXT NULL,
701 thread_id TEXT NULL,
702 payload TEXT NOT NULL,
703 created_at_ms INTEGER NOT NULL
704 );
705
706 CREATE TABLE IF NOT EXISTS pending_approvals (
707 approval_id TEXT PRIMARY KEY,
708 runtime_id TEXT NOT NULL DEFAULT 'primary',
709 thread_id TEXT NOT NULL,
710 turn_id TEXT NOT NULL,
711 item_id TEXT NOT NULL,
712 kind TEXT NOT NULL,
713 reason TEXT NULL,
714 command TEXT NULL,
715 cwd TEXT NULL,
716 grant_root TEXT NULL,
717 available_decisions TEXT NOT NULL,
718 created_at_ms INTEGER NOT NULL,
719 raw_json TEXT NOT NULL
720 );
721
722 CREATE TABLE IF NOT EXISTS pending_server_requests (
723 request_id TEXT PRIMARY KEY,
724 runtime_id TEXT NOT NULL DEFAULT 'primary',
725 request_type TEXT NOT NULL,
726 thread_id TEXT NULL,
727 turn_id TEXT NULL,
728 item_id TEXT NULL,
729 title TEXT NULL,
730 reason TEXT NULL,
731 command TEXT NULL,
732 cwd TEXT NULL,
733 grant_root TEXT NULL,
734 tool_name TEXT NULL,
735 arguments TEXT NULL,
736 questions TEXT NOT NULL,
737 proposed_execpolicy_amendment TEXT NULL,
738 network_approval_context TEXT NULL,
739 schema TEXT NULL,
740 available_decisions TEXT NOT NULL,
741 raw_payload TEXT NOT NULL,
742 created_at_ms INTEGER NOT NULL,
743 raw_json TEXT NOT NULL
744 );",
745 )?;
746
747 ensure_thread_index_schema(&conn)?;
748 migrate_legacy_workspaces(&conn)?;
749 ensure_column(
750 &conn,
751 "thread_index",
752 "runtime_id",
753 "TEXT NOT NULL DEFAULT 'primary'",
754 )?;
755 ensure_column(&conn, "thread_index", "note", "TEXT NULL")?;
756 ensure_column(
757 &conn,
758 "thread_index",
759 "archived",
760 "INTEGER NOT NULL DEFAULT 0",
761 )?;
762 ensure_column(&conn, "events", "runtime_id", "TEXT NULL")?;
763 ensure_column(
764 &conn,
765 "pending_approvals",
766 "runtime_id",
767 "TEXT NOT NULL DEFAULT 'primary'",
768 )?;
769 ensure_column(
770 &conn,
771 "pending_server_requests",
772 "runtime_id",
773 "TEXT NOT NULL DEFAULT 'primary'",
774 )?;
775
776 Ok(())
777 }
778}
779
780fn ensure_column(conn: &Connection, table: &str, column: &str, definition: &str) -> Result<()> {
781 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
782 let mut rows = stmt.query([])?;
783 while let Some(row) = rows.next()? {
784 let existing: String = row.get(1)?;
785 if existing == column {
786 return Ok(());
787 }
788 }
789
790 conn.execute_batch(&format!(
791 "ALTER TABLE {table} ADD COLUMN {column} {definition};"
792 ))?;
793 Ok(())
794}
795
796fn ensure_thread_index_schema(conn: &Connection) -> Result<()> {
797 if !table_exists(conn, "thread_index")? {
798 create_thread_index_table(conn)?;
799 return Ok(());
800 }
801
802 ensure_column(
803 conn,
804 "thread_index",
805 "runtime_id",
806 "TEXT NOT NULL DEFAULT 'primary'",
807 )?;
808 ensure_column(conn, "thread_index", "note", "TEXT NULL")?;
809 ensure_column(
810 conn,
811 "thread_index",
812 "archived",
813 "INTEGER NOT NULL DEFAULT 0",
814 )?;
815
816 if column_exists(conn, "thread_index", "workspace_id")? {
817 rebuild_thread_index_without_workspace(conn)?;
818 }
819
820 Ok(())
821}
822
823fn create_thread_index_table(conn: &Connection) -> Result<()> {
824 conn.execute_batch(
825 "CREATE TABLE IF NOT EXISTS thread_index (
826 thread_id TEXT PRIMARY KEY,
827 runtime_id TEXT NOT NULL DEFAULT 'primary',
828 name TEXT NULL,
829 note TEXT NULL,
830 preview TEXT NOT NULL,
831 cwd TEXT NOT NULL,
832 status TEXT NOT NULL,
833 model_provider TEXT NOT NULL,
834 source TEXT NOT NULL,
835 created_at_ms INTEGER NOT NULL,
836 updated_at_ms INTEGER NOT NULL,
837 is_loaded INTEGER NOT NULL,
838 is_active INTEGER NOT NULL,
839 archived INTEGER NOT NULL DEFAULT 0,
840 raw_json TEXT NOT NULL
841 );",
842 )?;
843 Ok(())
844}
845
846fn rebuild_thread_index_without_workspace(conn: &Connection) -> Result<()> {
847 conn.execute_batch(
848 "ALTER TABLE thread_index RENAME TO thread_index_legacy;
849
850 CREATE TABLE thread_index (
851 thread_id TEXT PRIMARY KEY,
852 runtime_id TEXT NOT NULL DEFAULT 'primary',
853 name TEXT NULL,
854 note TEXT NULL,
855 preview TEXT NOT NULL,
856 cwd TEXT NOT NULL,
857 status TEXT NOT NULL,
858 model_provider TEXT NOT NULL,
859 source TEXT NOT NULL,
860 created_at_ms INTEGER NOT NULL,
861 updated_at_ms INTEGER NOT NULL,
862 is_loaded INTEGER NOT NULL,
863 is_active INTEGER NOT NULL,
864 archived INTEGER NOT NULL DEFAULT 0,
865 raw_json TEXT NOT NULL
866 );
867
868 INSERT INTO thread_index (
869 thread_id, runtime_id, name, note, preview, cwd, status,
870 model_provider, source, created_at_ms, updated_at_ms, is_loaded,
871 is_active, archived, raw_json
872 )
873 SELECT
874 thread_id,
875 COALESCE(runtime_id, 'primary'),
876 name,
877 note,
878 preview,
879 cwd,
880 status,
881 model_provider,
882 source,
883 created_at_ms,
884 updated_at_ms,
885 is_loaded,
886 is_active,
887 archived,
888 raw_json
889 FROM thread_index_legacy;
890
891 DROP TABLE thread_index_legacy;",
892 )?;
893 Ok(())
894}
895
896fn migrate_legacy_workspaces(conn: &Connection) -> Result<()> {
897 if !table_exists(conn, "workspaces")? {
898 return Ok(());
899 }
900
901 conn.execute_batch(
902 "INSERT INTO directory_bookmarks (path, display_name, created_at_ms, updated_at_ms)
903 SELECT root_path, display_name, created_at_ms, updated_at_ms
904 FROM workspaces
905 ON CONFLICT(path) DO NOTHING;
906
907 DROP TABLE workspaces;",
908 )?;
909 Ok(())
910}
911
912fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
913 let exists = conn
914 .query_row(
915 "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1 LIMIT 1",
916 params![table],
917 |_| Ok(()),
918 )
919 .optional()?
920 .is_some();
921 Ok(exists)
922}
923
924fn column_exists(conn: &Connection, table: &str, column: &str) -> Result<bool> {
925 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
926 let mut rows = stmt.query([])?;
927 while let Some(row) = rows.next()? {
928 let existing: String = row.get(1)?;
929 if existing == column {
930 return Ok(true);
931 }
932 }
933 Ok(false)
934}
935
936#[cfg(test)]
937mod tests {
938 use std::env;
939 use std::fs;
940
941 use super::Storage;
942
943 #[test]
944 fn ensure_primary_runtime_refreshes_existing_binary() {
945 let base_dir =
946 env::temp_dir().join(format!("codex-mobile-storage-test-{}", std::process::id()));
947 fs::create_dir_all(&base_dir).expect("创建测试目录失败");
948 let db_path = base_dir.join("bridge.db");
949 let storage = Storage::open(db_path).expect("打开存储失败");
950
951 let initial = storage
952 .ensure_primary_runtime(None, "codex".to_string())
953 .expect("创建 primary runtime 失败");
954 assert_eq!(initial.codex_binary, "codex");
955
956 let refreshed = storage
957 .ensure_primary_runtime(None, "/home/test/.npm-global/bin/codex".to_string())
958 .expect("刷新 primary runtime 失败");
959 assert_eq!(refreshed.codex_binary, "/home/test/.npm-global/bin/codex");
960 }
961}
962
963fn decode_json_row<T: DeserializeOwned>(raw: String) -> rusqlite::Result<T> {
964 serde_json::from_str(&raw)
965 .map_err(|error| rusqlite::Error::FromSqlConversionFailure(0, Type::Text, Box::new(error)))
966}
967
968fn decode_thread_row(
969 raw: String,
970 note: Option<String>,
971 archived: i64,
972) -> rusqlite::Result<ThreadSummary> {
973 let mut thread: ThreadSummary = decode_json_row(raw)?;
974 thread.note = note;
975 thread.archived = archived != 0;
976 Ok(thread)
977}