1use std::collections::HashMap;
13use std::fs::{self, OpenOptions};
14use std::io::{BufRead, BufReader, Write};
15use std::path::{Path, PathBuf};
16
17use anyhow::{Context, Result};
18use chrono::Utc;
19use rusqlite::{Connection, OptionalExtension, params};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
27#[serde(rename_all = "snake_case")]
28pub enum ThreadStatus {
29 Running,
31 Idle,
33 Completed,
35 Failed,
37 Paused,
39 Archived,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "snake_case")]
48pub enum SessionSource {
49 Interactive,
51 Resume,
53 Fork,
55 Api,
57 Unknown,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ThreadMetadata {
67 pub id: String,
69 pub rollout_path: Option<PathBuf>,
71 pub preview: String,
73 pub ephemeral: bool,
75 pub model_provider: String,
77 pub created_at: i64,
79 pub updated_at: i64,
81 pub status: ThreadStatus,
83 pub path: Option<PathBuf>,
85 pub cwd: PathBuf,
87 pub cli_version: String,
89 pub source: SessionSource,
91 pub name: Option<String>,
93 pub sandbox_policy: Option<String>,
95 pub approval_mode: Option<String>,
97 pub archived: bool,
99 pub archived_at: Option<i64>,
101 pub git_sha: Option<String>,
103 pub git_branch: Option<String>,
105 pub git_origin_url: Option<String>,
107 pub memory_mode: Option<String>,
109 pub current_leaf_id: Option<i64>,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct DynamicToolRecord {
116 pub position: i64,
118 pub name: String,
120 pub description: Option<String>,
122 pub input_schema: Value,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct MessageRecord {
132 pub id: i64,
134 pub thread_id: String,
136 pub role: String,
138 pub content: String,
140 pub item: Option<Value>,
142 pub created_at: i64,
144 pub parent_entry_id: Option<i64>,
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct CheckpointRecord {
151 pub thread_id: String,
153 pub checkpoint_id: String,
155 pub state: Value,
157 pub created_at: i64,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
165#[serde(rename_all = "snake_case")]
166pub enum JobStateStatus {
167 Queued,
169 Running,
171 Completed,
173 Failed,
175 Cancelled,
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct JobStateRecord {
182 pub id: String,
184 pub name: String,
186 pub status: JobStateStatus,
188 pub progress: Option<u8>,
190 pub detail: Option<String>,
192 pub created_at: i64,
194 pub updated_at: i64,
196}
197
198#[derive(Debug, Clone)]
200pub struct ThreadListFilters {
201 pub include_archived: bool,
203 pub limit: Option<usize>,
205}
206
207impl Default for ThreadListFilters {
208 fn default() -> Self {
209 Self {
210 include_archived: false,
211 limit: Some(50),
212 }
213 }
214}
215
216#[derive(Debug, Clone, Serialize, Deserialize)]
217struct SessionIndexEntry {
218 thread_id: String,
219 thread_name: Option<String>,
220 updated_at: i64,
221 rollout_path: Option<PathBuf>,
222}
223
224#[derive(Debug, Clone)]
229pub struct StateStore {
230 db_path: PathBuf,
231 session_index_path: PathBuf,
232}
233
234impl StateStore {
235 pub fn open(path: Option<PathBuf>) -> Result<Self> {
240 let db_path = path.unwrap_or_else(default_state_db_path);
241 let session_index_path = db_path
242 .parent()
243 .unwrap_or_else(|| Path::new("."))
244 .join("session_index.jsonl");
245 if let Some(parent) = db_path.parent() {
246 fs::create_dir_all(parent).with_context(|| {
247 format!("failed to create state directory {}", parent.display())
248 })?;
249 }
250 let store = Self {
251 db_path,
252 session_index_path,
253 };
254 store.init_schema()?;
255 Ok(store)
256 }
257
258 pub fn db_path(&self) -> &Path {
260 &self.db_path
261 }
262
263 fn conn(&self) -> Result<Connection> {
264 Connection::open(&self.db_path)
265 .with_context(|| format!("failed to open state db {}", self.db_path.display()))
266 }
267
268 fn init_schema(&self) -> Result<()> {
269 let conn = self.conn()?;
270 let user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
271 if user_version == 0 {
272 conn.execute_batch(
273 r#"
274 BEGIN;
275 CREATE TABLE IF NOT EXISTS threads (
276 id TEXT PRIMARY KEY,
277 rollout_path TEXT,
278 preview TEXT NOT NULL,
279 ephemeral INTEGER NOT NULL,
280 model_provider TEXT NOT NULL,
281 created_at INTEGER NOT NULL,
282 updated_at INTEGER NOT NULL,
283 status TEXT NOT NULL,
284 path TEXT,
285 cwd TEXT NOT NULL,
286 cli_version TEXT NOT NULL,
287 source TEXT NOT NULL,
288 title TEXT,
289 sandbox_policy TEXT,
290 approval_mode TEXT,
291 archived INTEGER NOT NULL DEFAULT 0,
292 archived_at INTEGER,
293 git_sha TEXT,
294 git_branch TEXT,
295 git_origin_url TEXT,
296 memory_mode TEXT
297 );
298 CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
299 CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
300 CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);
301
302 CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
303 thread_id TEXT NOT NULL,
304 position INTEGER NOT NULL,
305 name TEXT NOT NULL,
306 description TEXT,
307 input_schema TEXT NOT NULL,
308 PRIMARY KEY (thread_id, position),
309 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
310 );
311
312 CREATE TABLE IF NOT EXISTS messages (
313 id INTEGER PRIMARY KEY AUTOINCREMENT,
314 thread_id TEXT NOT NULL,
315 role TEXT NOT NULL,
316 content TEXT NOT NULL,
317 item_json TEXT,
318 created_at INTEGER NOT NULL,
319 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
320 );
321 CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);
322
323 CREATE TABLE IF NOT EXISTS checkpoints (
324 thread_id TEXT NOT NULL,
325 checkpoint_id TEXT NOT NULL,
326 state_json TEXT NOT NULL,
327 created_at INTEGER NOT NULL,
328 PRIMARY KEY(thread_id, checkpoint_id),
329 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
330 );
331 CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);
332
333 CREATE TABLE IF NOT EXISTS jobs (
334 id TEXT PRIMARY KEY,
335 name TEXT NOT NULL,
336 status TEXT NOT NULL,
337 progress INTEGER,
338 detail TEXT,
339 created_at INTEGER NOT NULL,
340 updated_at INTEGER NOT NULL
341 );
342 CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
343
344 -- Add parent_entry_id column, and set to last message before current message
345 ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
346 UPDATE messages
347 SET parent_entry_id = (
348 SELECT m2.id
349 FROM messages m2
350 WHERE m2.thread_id = messages.thread_id
351 AND (
352 m2.created_at < messages.created_at
353 OR (
354 m2.created_at = messages.created_at
355 AND m2.id < messages.id
356 )
357 )
358 ORDER BY m2.created_at DESC, m2.id DESC
359 LIMIT 1
360 );
361 CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);
362
363 -- Add current_leaf_id column, and set to last message in thread
364 ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
365 UPDATE threads
366 SET current_leaf_id = (
367 SELECT m.id
368 FROM messages m
369 WHERE m.thread_id = threads.id
370 ORDER BY m.id DESC
371 LIMIT 1
372 );
373
374 PRAGMA user_version = 1;
375 COMMIT;
376 "#,
377 )
378 .context("failed to initialize thread schema")?;
379 }
380 Ok(())
381 }
382
383 pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
388 let conn = self.conn()?;
389 conn.execute(
390 r#"
391 INSERT INTO threads (
392 id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
393 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
394 git_sha, git_branch, git_origin_url, memory_mode
395 ) VALUES (
396 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
397 ?11, ?12, ?13, ?14, ?15, ?16, ?17,
398 ?18, ?19, ?20, ?21
399 )
400 ON CONFLICT(id) DO UPDATE SET
401 rollout_path=excluded.rollout_path,
402 preview=excluded.preview,
403 ephemeral=excluded.ephemeral,
404 model_provider=excluded.model_provider,
405 created_at=excluded.created_at,
406 updated_at=excluded.updated_at,
407 status=excluded.status,
408 path=excluded.path,
409 cwd=excluded.cwd,
410 cli_version=excluded.cli_version,
411 source=excluded.source,
412 title=excluded.title,
413 sandbox_policy=excluded.sandbox_policy,
414 approval_mode=excluded.approval_mode,
415 archived=excluded.archived,
416 archived_at=excluded.archived_at,
417 git_sha=excluded.git_sha,
418 git_branch=excluded.git_branch,
419 git_origin_url=excluded.git_origin_url,
420 memory_mode=excluded.memory_mode
421 "#,
422 params![
423 thread.id,
424 path_to_opt_string(thread.rollout_path.as_deref()),
425 thread.preview,
426 bool_to_i64(thread.ephemeral),
427 thread.model_provider,
428 thread.created_at,
429 thread.updated_at,
430 thread_status_to_str(&thread.status),
431 path_to_opt_string(thread.path.as_deref()),
432 thread.cwd.display().to_string(),
433 thread.cli_version,
434 session_source_to_str(&thread.source),
435 thread.name,
436 thread.sandbox_policy,
437 thread.approval_mode,
438 bool_to_i64(thread.archived),
439 thread.archived_at,
440 thread.git_sha,
441 thread.git_branch,
442 thread.git_origin_url,
443 thread.memory_mode,
444 ],
445 )
446 .context("failed to upsert thread metadata")?;
447
448 self.append_thread_name(
449 &thread.id,
450 thread.name.clone(),
451 thread.updated_at,
452 thread.rollout_path.clone(),
453 )?;
454 Ok(())
455 }
456
457 pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
461 let conn = self.conn()?;
462 conn.query_row(
463 r#"
464 SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
465 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
466 git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
467 FROM threads
468 WHERE id = ?1
469 "#,
470 params![id],
471 row_to_thread,
472 )
473 .optional()
474 .context("failed to read thread")
475 }
476
477 pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
482 let conn = self.conn()?;
483 let sql = if filters.include_archived {
484 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
485 } else {
486 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
487 };
488
489 let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
490 let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
491 let mut rows = stmt
492 .query(params![limit])
493 .context("failed to query threads")?;
494 let mut out = Vec::new();
495 while let Some(row) = rows.next().context("failed to iterate thread rows")? {
496 out.push(row_to_thread(row)?);
497 }
498 Ok(out)
499 }
500
501 pub fn mark_archived(&self, id: &str) -> Result<()> {
504 let conn = self.conn()?;
505 conn.execute(
506 "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
507 params![
508 id,
509 Utc::now().timestamp(),
510 thread_status_to_str(&ThreadStatus::Archived)
511 ],
512 )
513 .context("failed to archive thread")?;
514 Ok(())
515 }
516
517 pub fn mark_unarchived(&self, id: &str) -> Result<()> {
519 let conn = self.conn()?;
520 conn.execute(
521 "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
522 params![id],
523 )
524 .context("failed to unarchive thread")?;
525 Ok(())
526 }
527
528 pub fn delete_thread(&self, id: &str) -> Result<()> {
531 let conn = self.conn()?;
532 conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
533 .context("failed to delete thread")?;
534 Ok(())
535 }
536
537 pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
541 let conn = self.conn()?;
542 conn.execute(
543 "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
544 params![id, mode],
545 )
546 .context("failed to update thread memory mode")?;
547 Ok(())
548 }
549
550 pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
554 let conn = self.conn()?;
555 conn.query_row(
556 "SELECT memory_mode FROM threads WHERE id = ?1",
557 params![id],
558 |row| row.get::<_, Option<String>>(0),
559 )
560 .optional()
561 .context("failed to read thread memory mode")
562 .map(Option::flatten)
563 }
564
565 pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
570 let conn = self.conn()?;
571 let mut stmt = conn
572 .prepare(
573 r#"
574 SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
575 FROM messages m1
576 LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
577 WHERE m1.thread_id = ?1 AND m2.id IS NULL
578 "#,
579 )
580 .context("failed to prepare message listing query")?;
581 let mut rows = stmt
582 .query(params![thread_id])
583 .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
584 let mut out = Vec::new();
585 while let Some(row) = rows.next().context("failed to iterate message rows")? {
586 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
587 let item = item_json
588 .as_deref()
589 .map(serde_json::from_str)
590 .transpose()
591 .with_context(|| {
592 format!("failed to parse message item json in thread {thread_id}")
593 })?;
594 out.push(MessageRecord {
595 id: row.get(0).context("failed to read message id")?,
596 thread_id: row.get(1).context("failed to read message thread id")?,
597 role: row.get(2).context("failed to read message role")?,
598 content: row.get(3).context("failed to read message content")?,
599 item,
600 created_at: row.get(5).context("failed to read message timestamp")?,
601 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
602 });
603 }
604 Ok(out)
605 }
606
607 pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
612 let conn = self.conn()?;
613 conn.execute(
614 "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
615 params![current_leaf_id, thread_id],
616 )
617 .context("failed to update thread current leaf id")?;
618 Ok(())
619 }
620
621 pub fn persist_dynamic_tools(
626 &self,
627 thread_id: &str,
628 tools: &[DynamicToolRecord],
629 ) -> Result<()> {
630 let mut conn = self.conn()?;
631 let tx = conn
632 .transaction()
633 .context("failed to begin dynamic tools transaction")?;
634 tx.execute(
635 "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
636 params![thread_id],
637 )
638 .context("failed to clear dynamic tools")?;
639 for tool in tools {
640 tx.execute(
641 "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
642 params![
643 thread_id,
644 tool.position,
645 tool.name,
646 tool.description,
647 tool.input_schema.to_string()
648 ],
649 )
650 .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
651 }
652 tx.commit().context("failed to commit dynamic tools")?;
653 Ok(())
654 }
655
656 pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
658 let conn = self.conn()?;
659 let mut stmt = conn
660 .prepare(
661 "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
662 )
663 .context("failed to prepare get dynamic tools query")?;
664 let mut rows = stmt
665 .query(params![thread_id])
666 .context("failed to query dynamic tools")?;
667 let mut out = Vec::new();
668 while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
669 let input_schema_raw: String =
670 row.get(3).context("failed to read tool input schema")?;
671 let input_schema: Value =
672 serde_json::from_str(&input_schema_raw).with_context(|| {
673 format!("failed to parse input schema for dynamic tool in thread {thread_id}")
674 })?;
675 out.push(DynamicToolRecord {
676 position: row.get(0).context("failed to read tool position")?,
677 name: row.get(1).context("failed to read tool name")?,
678 description: row.get(2).context("failed to read tool description")?,
679 input_schema,
680 });
681 }
682 Ok(out)
683 }
684
685 pub fn append_message(
691 &self,
692 thread_id: &str,
693 role: &str,
694 content: &str,
695 item: Option<Value>,
696 ) -> Result<i64> {
697 let mut conn = self.conn()?;
698 let created_at = Utc::now().timestamp();
699 let item_json = item
700 .as_ref()
701 .map(serde_json::to_string)
702 .transpose()
703 .context("failed to serialize message item payload")?;
704
705 let tx = conn
706 .transaction()
707 .context("failed to begin append message transaction")?;
708
709 let current_leaf_id: Option<i64> = tx
710 .query_row(
711 "SELECT current_leaf_id FROM threads WHERE id = ?1",
712 params![thread_id],
713 |row| row.get(0),
714 )
715 .with_context(|| {
716 format!("failed to query thread current leaf id for thread {thread_id}")
717 })?;
718
719 let next_leaf_id: i64 = tx.query_row(
720 r#"
721 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
722 SELECT ?1, ?2, ?3, ?4, ?5, ?6
723 RETURNING id
724 "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
725 ).with_context(|| format!("failed to append message for thread {thread_id}"))?;
726
727 tx.execute(
728 r#"
729 UPDATE threads
730 SET current_leaf_id = ?1
731 WHERE id = ?2;
732 "#,
733 params![next_leaf_id, thread_id],
734 )
735 .with_context(|| {
736 format!("failed to update thread current leaf id for thread {thread_id}")
737 })?;
738
739 tx.commit()
740 .context("failed to commit append message transaction")?;
741
742 Ok(next_leaf_id)
743 }
744
745 pub fn list_messages(
751 &self,
752 thread_id: &str,
753 limit: Option<usize>,
754 ) -> Result<Vec<MessageRecord>> {
755 let conn = self.conn()?;
756 let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
757 let mut stmt = conn
758 .prepare(
759 r#"
760 WITH RECURSIVE
761 leaf_id AS (
762 SELECT current_leaf_id FROM threads WHERE id = ?1
763 ),
764 ancestors AS (
765 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
766 FROM messages
767 WHERE id = (SELECT current_leaf_id FROM leaf_id)
768
769 UNION ALL
770
771 SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
772 FROM messages m
773 JOIN ancestors a ON m.id = a.parent_entry_id
774 WHERE a.depth < ?2
775 )
776 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
777 ORDER BY depth DESC
778 "#
779 )
780 .context("failed to prepare message listing query")?;
781 let mut rows = stmt
782 .query(params![thread_id, limit - 1])
783 .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
784 let mut out = Vec::new();
785 while let Some(row) = rows.next().context("failed to iterate message rows")? {
786 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
787 let item = item_json
788 .as_deref()
789 .map(serde_json::from_str)
790 .transpose()
791 .with_context(|| {
792 format!("failed to parse message item json in thread {thread_id}")
793 })?;
794 out.push(MessageRecord {
795 id: row.get(0).context("failed to read message id")?,
796 thread_id: row.get(1).context("failed to read message thread id")?,
797 role: row.get(2).context("failed to read message role")?,
798 content: row.get(3).context("failed to read message content")?,
799 item,
800 created_at: row.get(5).context("failed to read message timestamp")?,
801 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
802 });
803 }
804 Ok(out)
805 }
806
807 pub fn fork_at_message(
813 &self,
814 message_id: &str,
815 role: &str,
816 content: &str,
817 item: Option<Value>,
818 ) -> Result<i64> {
819 let mut conn = self.conn()?;
820 let created_at = Utc::now().timestamp();
821 let item_json = item
822 .as_ref()
823 .map(serde_json::to_string)
824 .transpose()
825 .context("failed to serialize message item payload")?;
826
827 let tx = conn
828 .transaction()
829 .context("failed to begin fork message transaction")?;
830
831 let thread_id: String = tx
832 .query_row(
833 "SELECT thread_id FROM messages WHERE id = ?1",
834 params![message_id],
835 |row| row.get(0),
836 )
837 .with_context(|| format!("failed to query thread id for message {message_id}"))?;
838
839 let next_leaf_id: i64 = tx.query_row(
840 r#"
841 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
842 SELECT ?1, ?2, ?3, ?4, ?5, ?6
843 RETURNING id
844 "#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
845 ).with_context(|| format!("failed to fork at message for thread {:?}", thread_id))?;
846
847 tx.execute(
848 r#"
849 UPDATE threads
850 SET current_leaf_id = ?1
851 WHERE id = ?2;
852 "#,
853 params![next_leaf_id, thread_id],
854 )
855 .with_context(|| {
856 format!(
857 "failed to update thread current leaf id for thread {:?}",
858 thread_id
859 )
860 })?;
861
862 tx.commit()
863 .context("failed to commit fork message transaction")?;
864
865 Ok(next_leaf_id)
866 }
867
868 pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
872 let mut conn = self.conn()?;
873 let tx = conn
874 .transaction()
875 .context("failed to begin clear messages transaction")?;
876
877 tx.execute(
878 r#"
879 UPDATE threads
880 SET current_leaf_id = NULL
881 WHERE id = ?1;
882 "#,
883 params![thread_id],
884 )
885 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
886 let result = tx
887 .execute(
888 r#"
889 DELETE FROM messages WHERE thread_id = ?1
890 "#,
891 params![thread_id],
892 )
893 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
894 tx.commit()
895 .context("failed to commit clear messages transaction")?;
896
897 Ok(result)
898 }
899
900 pub fn save_checkpoint(
905 &self,
906 thread_id: &str,
907 checkpoint_id: &str,
908 state: &Value,
909 ) -> Result<()> {
910 let conn = self.conn()?;
911 let state_json =
912 serde_json::to_string(state).context("failed to encode checkpoint state")?;
913 conn.execute(
914 r#"
915 INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
916 VALUES (?1, ?2, ?3, ?4)
917 ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
918 state_json = excluded.state_json,
919 created_at = excluded.created_at
920 "#,
921 params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
922 )
923 .with_context(|| {
924 format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
925 })?;
926 Ok(())
927 }
928
929 pub fn load_checkpoint(
935 &self,
936 thread_id: &str,
937 checkpoint_id: Option<&str>,
938 ) -> Result<Option<CheckpointRecord>> {
939 let conn = self.conn()?;
940 if let Some(checkpoint_id) = checkpoint_id {
941 let row = conn
942 .query_row(
943 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
944 params![thread_id, checkpoint_id],
945 |row| {
946 let state_json: String = row.get(2)?;
947 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
948 Ok(CheckpointRecord {
949 thread_id: row.get(0)?,
950 checkpoint_id: row.get(1)?,
951 state,
952 created_at: row.get(3)?,
953 })
954 },
955 )
956 .optional()
957 .with_context(|| {
958 format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
959 })?;
960 return Ok(row);
961 }
962
963 conn.query_row(
964 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
965 params![thread_id],
966 |row| {
967 let state_json: String = row.get(2)?;
968 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
969 Ok(CheckpointRecord {
970 thread_id: row.get(0)?,
971 checkpoint_id: row.get(1)?,
972 state,
973 created_at: row.get(3)?,
974 })
975 },
976 )
977 .optional()
978 .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
979 }
980
981 pub fn list_checkpoints(
985 &self,
986 thread_id: &str,
987 limit: Option<usize>,
988 ) -> Result<Vec<CheckpointRecord>> {
989 let conn = self.conn()?;
990 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
991 let mut stmt = conn
992 .prepare(
993 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
994 )
995 .context("failed to prepare checkpoint list query")?;
996 let mut rows = stmt
997 .query(params![thread_id, limit])
998 .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
999
1000 let mut out = Vec::new();
1001 while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
1002 let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
1003 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1004 out.push(CheckpointRecord {
1005 thread_id: row.get(0).context("failed to read checkpoint thread id")?,
1006 checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
1007 state,
1008 created_at: row.get(3).context("failed to read checkpoint timestamp")?,
1009 });
1010 }
1011 Ok(out)
1012 }
1013
1014 pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
1016 let conn = self.conn()?;
1017 conn.execute(
1018 "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1019 params![thread_id, checkpoint_id],
1020 )
1021 .with_context(|| {
1022 format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
1023 })?;
1024 Ok(())
1025 }
1026
1027 pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
1029 let conn = self.conn()?;
1030 conn.execute(
1031 r#"
1032 INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
1033 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1034 ON CONFLICT(id) DO UPDATE SET
1035 name = excluded.name,
1036 status = excluded.status,
1037 progress = excluded.progress,
1038 detail = excluded.detail,
1039 created_at = excluded.created_at,
1040 updated_at = excluded.updated_at
1041 "#,
1042 params![
1043 job.id,
1044 job.name,
1045 job_state_status_to_str(&job.status),
1046 job.progress.map(i64::from),
1047 job.detail,
1048 job.created_at,
1049 job.updated_at
1050 ],
1051 )
1052 .with_context(|| format!("failed to upsert job {}", job.id))?;
1053 Ok(())
1054 }
1055
1056 pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
1060 let conn = self.conn()?;
1061 conn.query_row(
1062 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
1063 params![id],
1064 |row| {
1065 let status_raw: String = row.get(2)?;
1066 let progress: Option<i64> = row.get(3)?;
1067 Ok(JobStateRecord {
1068 id: row.get(0)?,
1069 name: row.get(1)?,
1070 status: job_state_status_from_str(&status_raw),
1071 progress: progress.and_then(|v| u8::try_from(v).ok()),
1072 detail: row.get(4)?,
1073 created_at: row.get(5)?,
1074 updated_at: row.get(6)?,
1075 })
1076 },
1077 )
1078 .optional()
1079 .with_context(|| format!("failed to read job {id}"))
1080 }
1081
1082 pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
1086 let conn = self.conn()?;
1087 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1088 let mut stmt = conn
1089 .prepare(
1090 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
1091 )
1092 .context("failed to prepare job list query")?;
1093 let mut rows = stmt
1094 .query(params![limit])
1095 .context("failed to query persisted jobs")?;
1096 let mut out = Vec::new();
1097 while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
1098 let status_raw: String = row.get(2).context("failed to read job status")?;
1099 let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
1100 out.push(JobStateRecord {
1101 id: row.get(0).context("failed to read job id")?,
1102 name: row.get(1).context("failed to read job name")?,
1103 status: job_state_status_from_str(&status_raw),
1104 progress: progress.and_then(|v| u8::try_from(v).ok()),
1105 detail: row.get(4).context("failed to read job detail")?,
1106 created_at: row.get(5).context("failed to read job created_at")?,
1107 updated_at: row.get(6).context("failed to read job updated_at")?,
1108 });
1109 }
1110 Ok(out)
1111 }
1112
1113 pub fn delete_job(&self, id: &str) -> Result<()> {
1115 let conn = self.conn()?;
1116 conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
1117 .with_context(|| format!("failed to delete job {id}"))?;
1118 Ok(())
1119 }
1120
1121 pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
1123 let conn = self.conn()?;
1124 conn.query_row(
1125 "SELECT rollout_path FROM threads WHERE id = ?1",
1126 params![id],
1127 |row| row.get::<_, Option<String>>(0),
1128 )
1129 .optional()
1130 .context("failed to lookup rollout path")
1131 .map(|opt| opt.flatten().map(PathBuf::from))
1132 }
1133
1134 pub fn append_thread_name(
1140 &self,
1141 thread_id: &str,
1142 thread_name: Option<String>,
1143 updated_at: i64,
1144 rollout_path: Option<PathBuf>,
1145 ) -> Result<()> {
1146 if let Some(parent) = self.session_index_path.parent() {
1147 fs::create_dir_all(parent).with_context(|| {
1148 format!(
1149 "failed to create session index directory {}",
1150 parent.display()
1151 )
1152 })?;
1153 }
1154 let entry = SessionIndexEntry {
1155 thread_id: thread_id.to_string(),
1156 thread_name,
1157 updated_at,
1158 rollout_path,
1159 };
1160 let encoded =
1161 serde_json::to_string(&entry).context("failed to serialize session index entry")?;
1162 let mut file = OpenOptions::new()
1163 .create(true)
1164 .append(true)
1165 .open(&self.session_index_path)
1166 .with_context(|| {
1167 format!(
1168 "failed to open session index {}",
1169 self.session_index_path.display()
1170 )
1171 })?;
1172 writeln!(file, "{encoded}").context("failed to append session index entry")?;
1173 Ok(())
1174 }
1175
1176 pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
1180 let map = self.session_index_map()?;
1181 Ok(map
1182 .get(thread_id)
1183 .and_then(|entry| entry.thread_name.clone()))
1184 }
1185
1186 pub fn find_thread_names_by_ids(
1190 &self,
1191 ids: &[String],
1192 ) -> Result<HashMap<String, Option<String>>> {
1193 let map = self.session_index_map()?;
1194 let mut out = HashMap::new();
1195 for id in ids {
1196 let name = map.get(id).and_then(|entry| entry.thread_name.clone());
1197 out.insert(id.clone(), name);
1198 }
1199 Ok(out)
1200 }
1201
1202 pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
1207 let map = self.session_index_map()?;
1208 let matched = map
1209 .values()
1210 .filter(|entry| {
1211 entry
1212 .thread_name
1213 .as_deref()
1214 .is_some_and(|n| n.eq_ignore_ascii_case(name))
1215 })
1216 .max_by_key(|entry| entry.updated_at);
1217 Ok(matched.and_then(|entry| entry.rollout_path.clone()))
1218 }
1219
1220 fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
1221 if !self.session_index_path.exists() {
1222 return Ok(HashMap::new());
1223 }
1224 let file = OpenOptions::new()
1225 .read(true)
1226 .open(&self.session_index_path)
1227 .with_context(|| {
1228 format!(
1229 "failed to read session index {}",
1230 self.session_index_path.display()
1231 )
1232 })?;
1233 let reader = BufReader::new(file);
1234 let mut latest = HashMap::<String, SessionIndexEntry>::new();
1235 for line in reader.lines() {
1236 let line = line.context("failed to read session index line")?;
1237 if line.trim().is_empty() {
1238 continue;
1239 }
1240 let parsed: SessionIndexEntry =
1241 serde_json::from_str(&line).context("failed to parse session index entry")?;
1242 latest.insert(parsed.thread_id.clone(), parsed);
1243 }
1244 Ok(latest)
1245 }
1246}
1247
1248fn default_state_db_path() -> PathBuf {
1249 dirs::home_dir()
1250 .unwrap_or_else(|| PathBuf::from("."))
1251 .join(".deepseek")
1252 .join("state.db")
1253}
1254
1255fn bool_to_i64(value: bool) -> i64 {
1256 if value { 1 } else { 0 }
1257}
1258
1259fn i64_to_bool(value: i64) -> bool {
1260 value != 0
1261}
1262
1263fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
1264 match status {
1265 ThreadStatus::Running => "running",
1266 ThreadStatus::Idle => "idle",
1267 ThreadStatus::Completed => "completed",
1268 ThreadStatus::Failed => "failed",
1269 ThreadStatus::Paused => "paused",
1270 ThreadStatus::Archived => "archived",
1271 }
1272}
1273
1274fn thread_status_from_str(value: &str) -> ThreadStatus {
1275 match value {
1276 "running" => ThreadStatus::Running,
1277 "idle" => ThreadStatus::Idle,
1278 "completed" => ThreadStatus::Completed,
1279 "failed" => ThreadStatus::Failed,
1280 "paused" => ThreadStatus::Paused,
1281 "archived" => ThreadStatus::Archived,
1282 _ => ThreadStatus::Idle,
1283 }
1284}
1285
1286fn session_source_to_str(source: &SessionSource) -> &'static str {
1287 match source {
1288 SessionSource::Interactive => "interactive",
1289 SessionSource::Resume => "resume",
1290 SessionSource::Fork => "fork",
1291 SessionSource::Api => "api",
1292 SessionSource::Unknown => "unknown",
1293 }
1294}
1295
1296fn session_source_from_str(value: &str) -> SessionSource {
1297 match value {
1298 "interactive" => SessionSource::Interactive,
1299 "resume" => SessionSource::Resume,
1300 "fork" => SessionSource::Fork,
1301 "api" => SessionSource::Api,
1302 _ => SessionSource::Unknown,
1303 }
1304}
1305
1306fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
1307 path.map(|p| p.display().to_string())
1308}
1309
1310fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
1311 match status {
1312 JobStateStatus::Queued => "queued",
1313 JobStateStatus::Running => "running",
1314 JobStateStatus::Completed => "completed",
1315 JobStateStatus::Failed => "failed",
1316 JobStateStatus::Cancelled => "cancelled",
1317 }
1318}
1319
1320fn job_state_status_from_str(value: &str) -> JobStateStatus {
1321 match value {
1322 "queued" => JobStateStatus::Queued,
1323 "running" => JobStateStatus::Running,
1324 "completed" => JobStateStatus::Completed,
1325 "failed" => JobStateStatus::Failed,
1326 "cancelled" => JobStateStatus::Cancelled,
1327 _ => JobStateStatus::Queued,
1328 }
1329}
1330
1331fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
1332 let status_raw: String = row.get(7)?;
1333 let source_raw: String = row.get(11)?;
1334 let rollout_path: Option<String> = row.get(1)?;
1335 let path: Option<String> = row.get(8)?;
1336 Ok(ThreadMetadata {
1337 id: row.get(0)?,
1338 rollout_path: rollout_path.map(PathBuf::from),
1339 preview: row.get(2)?,
1340 ephemeral: i64_to_bool(row.get(3)?),
1341 model_provider: row.get(4)?,
1342 created_at: row.get(5)?,
1343 updated_at: row.get(6)?,
1344 status: thread_status_from_str(&status_raw),
1345 path: path.map(PathBuf::from),
1346 cwd: PathBuf::from(row.get::<_, String>(9)?),
1347 cli_version: row.get(10)?,
1348 source: session_source_from_str(&source_raw),
1349 name: row.get(12)?,
1350 sandbox_policy: row.get(13)?,
1351 approval_mode: row.get(14)?,
1352 archived: i64_to_bool(row.get(15)?),
1353 archived_at: row.get(16)?,
1354 git_sha: row.get(17)?,
1355 git_branch: row.get(18)?,
1356 git_origin_url: row.get(19)?,
1357 memory_mode: row.get(20)?,
1358 current_leaf_id: row.get(21)?,
1359 })
1360}