1use std::collections::HashMap;
13use std::fs::{self, OpenOptions};
14use std::io::{BufRead, BufReader, Write};
15use std::path::{Path, PathBuf};
16
17use anyhow::{Context, Result};
18use chrono::Utc;
19use rusqlite::{Connection, OptionalExtension, params};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
27#[serde(rename_all = "snake_case")]
28pub enum ThreadStatus {
29 Running,
31 Idle,
33 Completed,
35 Failed,
37 Paused,
39 Archived,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "snake_case")]
48pub enum SessionSource {
49 Interactive,
51 Resume,
53 Fork,
55 Api,
57 Unknown,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ThreadMetadata {
67 pub id: String,
69 pub rollout_path: Option<PathBuf>,
71 pub preview: String,
73 pub ephemeral: bool,
75 pub model_provider: String,
77 pub created_at: i64,
79 pub updated_at: i64,
81 pub status: ThreadStatus,
83 pub path: Option<PathBuf>,
85 pub cwd: PathBuf,
87 pub cli_version: String,
89 pub source: SessionSource,
91 pub name: Option<String>,
93 pub sandbox_policy: Option<String>,
95 pub approval_mode: Option<String>,
97 pub archived: bool,
99 pub archived_at: Option<i64>,
101 pub git_sha: Option<String>,
103 pub git_branch: Option<String>,
105 pub git_origin_url: Option<String>,
107 pub memory_mode: Option<String>,
109 pub current_leaf_id: Option<i64>,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct DynamicToolRecord {
116 pub position: i64,
118 pub name: String,
120 pub description: Option<String>,
122 pub input_schema: Value,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct MessageRecord {
132 pub id: i64,
134 pub thread_id: String,
136 pub role: String,
138 pub content: String,
140 pub item: Option<Value>,
142 pub created_at: i64,
144 pub parent_entry_id: Option<i64>,
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct CheckpointRecord {
151 pub thread_id: String,
153 pub checkpoint_id: String,
155 pub state: Value,
157 pub created_at: i64,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
165#[serde(rename_all = "snake_case")]
166pub enum JobStateStatus {
167 Queued,
169 Running,
171 Completed,
173 Failed,
175 Cancelled,
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct JobStateRecord {
182 pub id: String,
184 pub name: String,
186 pub status: JobStateStatus,
188 pub progress: Option<u8>,
190 pub detail: Option<String>,
192 pub created_at: i64,
194 pub updated_at: i64,
196}
197
/// Options accepted by `StateStore::list_threads`.
#[derive(Debug, Clone)]
pub struct ThreadListFilters {
    /// When `false`, rows whose `archived` flag is set are left out.
    pub include_archived: bool,
    /// Maximum number of rows to return; `None` means the default of 50.
    pub limit: Option<usize>,
}

impl Default for ThreadListFilters {
    /// Hides archived threads and caps the listing at 50 rows.
    fn default() -> Self {
        let include_archived = false;
        let limit = Some(50);
        ThreadListFilters {
            include_archived,
            limit,
        }
    }
}
215
216#[derive(Debug, Clone, Serialize, Deserialize)]
217struct SessionIndexEntry {
218 thread_id: String,
219 thread_name: Option<String>,
220 updated_at: i64,
221 rollout_path: Option<PathBuf>,
222}
223
/// Handle to the persisted state: a SQLite database plus a JSON-lines session
/// index kept in the same directory.
///
/// The handle only stores paths; each operation opens its own database
/// connection.
#[derive(Debug, Clone)]
pub struct StateStore {
    /// Location of the SQLite database file.
    db_path: PathBuf,
    /// Location of the `session_index.jsonl` file.
    session_index_path: PathBuf,
}
233
234impl StateStore {
235 pub fn open(path: Option<PathBuf>) -> Result<Self> {
240 let db_path = path.unwrap_or_else(default_state_db_path);
241 let session_index_path = db_path
242 .parent()
243 .unwrap_or_else(|| Path::new("."))
244 .join("session_index.jsonl");
245 if let Some(parent) = db_path.parent() {
246 fs::create_dir_all(parent).with_context(|| {
247 format!("failed to create state directory {}", parent.display())
248 })?;
249 }
250 let store = Self {
251 db_path,
252 session_index_path,
253 };
254 store.init_schema()?;
255 Ok(store)
256 }
257
258 pub fn db_path(&self) -> &Path {
260 &self.db_path
261 }
262
263 fn conn(&self) -> Result<Connection> {
264 Connection::open(&self.db_path)
265 .with_context(|| format!("failed to open state db {}", self.db_path.display()))
266 }
267
268 fn init_schema(&self) -> Result<()> {
269 let conn = self.conn()?;
270 let user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
271 if user_version == 0 {
272 conn.execute_batch(
273 r#"
274 BEGIN;
275 CREATE TABLE IF NOT EXISTS threads (
276 id TEXT PRIMARY KEY,
277 rollout_path TEXT,
278 preview TEXT NOT NULL,
279 ephemeral INTEGER NOT NULL,
280 model_provider TEXT NOT NULL,
281 created_at INTEGER NOT NULL,
282 updated_at INTEGER NOT NULL,
283 status TEXT NOT NULL,
284 path TEXT,
285 cwd TEXT NOT NULL,
286 cli_version TEXT NOT NULL,
287 source TEXT NOT NULL,
288 title TEXT,
289 sandbox_policy TEXT,
290 approval_mode TEXT,
291 archived INTEGER NOT NULL DEFAULT 0,
292 archived_at INTEGER,
293 git_sha TEXT,
294 git_branch TEXT,
295 git_origin_url TEXT,
296 memory_mode TEXT
297 );
298 CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
299 CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
300 CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);
301
302 CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
303 thread_id TEXT NOT NULL,
304 position INTEGER NOT NULL,
305 name TEXT NOT NULL,
306 description TEXT,
307 input_schema TEXT NOT NULL,
308 PRIMARY KEY (thread_id, position),
309 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
310 );
311
312 CREATE TABLE IF NOT EXISTS messages (
313 id INTEGER PRIMARY KEY AUTOINCREMENT,
314 thread_id TEXT NOT NULL,
315 role TEXT NOT NULL,
316 content TEXT NOT NULL,
317 item_json TEXT,
318 created_at INTEGER NOT NULL,
319 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
320 );
321 CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);
322
323 CREATE TABLE IF NOT EXISTS checkpoints (
324 thread_id TEXT NOT NULL,
325 checkpoint_id TEXT NOT NULL,
326 state_json TEXT NOT NULL,
327 created_at INTEGER NOT NULL,
328 PRIMARY KEY(thread_id, checkpoint_id),
329 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
330 );
331 CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);
332
333 CREATE TABLE IF NOT EXISTS jobs (
334 id TEXT PRIMARY KEY,
335 name TEXT NOT NULL,
336 status TEXT NOT NULL,
337 progress INTEGER,
338 detail TEXT,
339 created_at INTEGER NOT NULL,
340 updated_at INTEGER NOT NULL
341 );
342 CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
343
344 -- Add parent_entry_id column, and set to last message before current message
345 ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
346 UPDATE messages
347 SET parent_entry_id = (
348 SELECT m2.id
349 FROM messages m2
350 WHERE m2.created_at < messages.created_at AND m2.thread_id = messages.thread_id
351 ORDER BY m2.id DESC
352 LIMIT 1
353 );
354 CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);
355
356 -- Add current_leaf_id column, and set to last message in thread
357 ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
358 UPDATE threads
359 SET current_leaf_id = (
360 SELECT m.id
361 FROM messages m
362 WHERE m.thread_id = threads.id
363 ORDER BY m.id DESC
364 LIMIT 1
365 );
366
367 PRAGMA user_version = 1;
368 COMMIT;
369 "#,
370 )
371 .context("failed to initialize thread schema")?;
372 }
373 Ok(())
374 }
375
376 pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
381 let conn = self.conn()?;
382 conn.execute(
383 r#"
384 INSERT INTO threads (
385 id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
386 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
387 git_sha, git_branch, git_origin_url, memory_mode
388 ) VALUES (
389 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
390 ?11, ?12, ?13, ?14, ?15, ?16, ?17,
391 ?18, ?19, ?20, ?21
392 )
393 ON CONFLICT(id) DO UPDATE SET
394 rollout_path=excluded.rollout_path,
395 preview=excluded.preview,
396 ephemeral=excluded.ephemeral,
397 model_provider=excluded.model_provider,
398 created_at=excluded.created_at,
399 updated_at=excluded.updated_at,
400 status=excluded.status,
401 path=excluded.path,
402 cwd=excluded.cwd,
403 cli_version=excluded.cli_version,
404 source=excluded.source,
405 title=excluded.title,
406 sandbox_policy=excluded.sandbox_policy,
407 approval_mode=excluded.approval_mode,
408 archived=excluded.archived,
409 archived_at=excluded.archived_at,
410 git_sha=excluded.git_sha,
411 git_branch=excluded.git_branch,
412 git_origin_url=excluded.git_origin_url,
413 memory_mode=excluded.memory_mode
414 "#,
415 params![
416 thread.id,
417 path_to_opt_string(thread.rollout_path.as_deref()),
418 thread.preview,
419 bool_to_i64(thread.ephemeral),
420 thread.model_provider,
421 thread.created_at,
422 thread.updated_at,
423 thread_status_to_str(&thread.status),
424 path_to_opt_string(thread.path.as_deref()),
425 thread.cwd.display().to_string(),
426 thread.cli_version,
427 session_source_to_str(&thread.source),
428 thread.name,
429 thread.sandbox_policy,
430 thread.approval_mode,
431 bool_to_i64(thread.archived),
432 thread.archived_at,
433 thread.git_sha,
434 thread.git_branch,
435 thread.git_origin_url,
436 thread.memory_mode,
437 ],
438 )
439 .context("failed to upsert thread metadata")?;
440
441 self.append_thread_name(
442 &thread.id,
443 thread.name.clone(),
444 thread.updated_at,
445 thread.rollout_path.clone(),
446 )?;
447 Ok(())
448 }
449
450 pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
454 let conn = self.conn()?;
455 conn.query_row(
456 r#"
457 SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
458 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
459 git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
460 FROM threads
461 WHERE id = ?1
462 "#,
463 params![id],
464 row_to_thread,
465 )
466 .optional()
467 .context("failed to read thread")
468 }
469
470 pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
475 let conn = self.conn()?;
476 let sql = if filters.include_archived {
477 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
478 } else {
479 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
480 };
481
482 let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
483 let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
484 let mut rows = stmt
485 .query(params![limit])
486 .context("failed to query threads")?;
487 let mut out = Vec::new();
488 while let Some(row) = rows.next().context("failed to iterate thread rows")? {
489 out.push(row_to_thread(row)?);
490 }
491 Ok(out)
492 }
493
494 pub fn mark_archived(&self, id: &str) -> Result<()> {
497 let conn = self.conn()?;
498 conn.execute(
499 "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
500 params![
501 id,
502 Utc::now().timestamp(),
503 thread_status_to_str(&ThreadStatus::Archived)
504 ],
505 )
506 .context("failed to archive thread")?;
507 Ok(())
508 }
509
510 pub fn mark_unarchived(&self, id: &str) -> Result<()> {
512 let conn = self.conn()?;
513 conn.execute(
514 "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
515 params![id],
516 )
517 .context("failed to unarchive thread")?;
518 Ok(())
519 }
520
521 pub fn delete_thread(&self, id: &str) -> Result<()> {
524 let conn = self.conn()?;
525 conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
526 .context("failed to delete thread")?;
527 Ok(())
528 }
529
530 pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
534 let conn = self.conn()?;
535 conn.execute(
536 "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
537 params![id, mode],
538 )
539 .context("failed to update thread memory mode")?;
540 Ok(())
541 }
542
543 pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
547 let conn = self.conn()?;
548 conn.query_row(
549 "SELECT memory_mode FROM threads WHERE id = ?1",
550 params![id],
551 |row| row.get::<_, Option<String>>(0),
552 )
553 .optional()
554 .context("failed to read thread memory mode")
555 .map(Option::flatten)
556 }
557
558 pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
563 let conn = self.conn()?;
564 let mut stmt = conn
565 .prepare(
566 r#"
567 SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
568 FROM messages m1
569 LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
570 WHERE m1.thread_id = ?1 AND m2.id IS NULL
571 "#,
572 )
573 .context("failed to prepare message listing query")?;
574 let mut rows = stmt
575 .query(params![thread_id])
576 .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
577 let mut out = Vec::new();
578 while let Some(row) = rows.next().context("failed to iterate message rows")? {
579 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
580 let item = item_json
581 .as_deref()
582 .map(serde_json::from_str)
583 .transpose()
584 .with_context(|| {
585 format!("failed to parse message item json in thread {thread_id}")
586 })?;
587 out.push(MessageRecord {
588 id: row.get(0).context("failed to read message id")?,
589 thread_id: row.get(1).context("failed to read message thread id")?,
590 role: row.get(2).context("failed to read message role")?,
591 content: row.get(3).context("failed to read message content")?,
592 item,
593 created_at: row.get(5).context("failed to read message timestamp")?,
594 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
595 });
596 }
597 Ok(out)
598 }
599
600 pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
605 let conn = self.conn()?;
606 conn.execute(
607 "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
608 params![current_leaf_id, thread_id],
609 )
610 .context("failed to update thread current leaf id")?;
611 Ok(())
612 }
613
614 pub fn persist_dynamic_tools(
619 &self,
620 thread_id: &str,
621 tools: &[DynamicToolRecord],
622 ) -> Result<()> {
623 let mut conn = self.conn()?;
624 let tx = conn
625 .transaction()
626 .context("failed to begin dynamic tools transaction")?;
627 tx.execute(
628 "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
629 params![thread_id],
630 )
631 .context("failed to clear dynamic tools")?;
632 for tool in tools {
633 tx.execute(
634 "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
635 params![
636 thread_id,
637 tool.position,
638 tool.name,
639 tool.description,
640 tool.input_schema.to_string()
641 ],
642 )
643 .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
644 }
645 tx.commit().context("failed to commit dynamic tools")?;
646 Ok(())
647 }
648
649 pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
651 let conn = self.conn()?;
652 let mut stmt = conn
653 .prepare(
654 "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
655 )
656 .context("failed to prepare get dynamic tools query")?;
657 let mut rows = stmt
658 .query(params![thread_id])
659 .context("failed to query dynamic tools")?;
660 let mut out = Vec::new();
661 while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
662 let input_schema_raw: String =
663 row.get(3).context("failed to read tool input schema")?;
664 let input_schema: Value =
665 serde_json::from_str(&input_schema_raw).with_context(|| {
666 format!("failed to parse input schema for dynamic tool in thread {thread_id}")
667 })?;
668 out.push(DynamicToolRecord {
669 position: row.get(0).context("failed to read tool position")?,
670 name: row.get(1).context("failed to read tool name")?,
671 description: row.get(2).context("failed to read tool description")?,
672 input_schema,
673 });
674 }
675 Ok(out)
676 }
677
678 pub fn append_message(
684 &self,
685 thread_id: &str,
686 role: &str,
687 content: &str,
688 item: Option<Value>,
689 ) -> Result<i64> {
690 let mut conn = self.conn()?;
691 let created_at = Utc::now().timestamp();
692 let item_json = item
693 .as_ref()
694 .map(serde_json::to_string)
695 .transpose()
696 .context("failed to serialize message item payload")?;
697
698 let tx = conn
699 .transaction()
700 .context("failed to begin append message transaction")?;
701
702 let current_leaf_id: Option<i64> = tx
703 .query_row(
704 "SELECT current_leaf_id FROM threads WHERE id = ?1",
705 params![thread_id],
706 |row| row.get(0),
707 )
708 .with_context(|| {
709 format!("failed to query thread current leaf id for thread {thread_id}")
710 })?;
711
712 let next_leaf_id: i64 = tx.query_row(
713 r#"
714 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
715 SELECT ?1, ?2, ?3, ?4, ?5, ?6
716 RETURNING id
717 "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
718 ).with_context(|| format!("failed to append message for thread {thread_id}"))?;
719
720 tx.execute(
721 r#"
722 UPDATE threads
723 SET current_leaf_id = ?1
724 WHERE id = ?2;
725 "#,
726 params![next_leaf_id, thread_id],
727 )
728 .with_context(|| {
729 format!("failed to update thread current leaf id for thread {thread_id}")
730 })?;
731
732 tx.commit()
733 .context("failed to commit append message transaction")?;
734
735 Ok(next_leaf_id)
736 }
737
738 pub fn list_messages(
744 &self,
745 thread_id: &str,
746 limit: Option<usize>,
747 ) -> Result<Vec<MessageRecord>> {
748 let conn = self.conn()?;
749 let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
750 let mut stmt = conn
751 .prepare(
752 r#"
753 WITH RECURSIVE
754 leaf_id AS (
755 SELECT current_leaf_id FROM threads WHERE id = ?1
756 ),
757 ancestors AS (
758 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
759 FROM messages
760 WHERE id = (SELECT current_leaf_id FROM leaf_id)
761
762 UNION ALL
763
764 SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
765 FROM messages m
766 JOIN ancestors a ON m.id = a.parent_entry_id
767 WHERE a.depth < ?2
768 )
769 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
770 ORDER BY depth DESC
771 "#
772 )
773 .context("failed to prepare message listing query")?;
774 let mut rows = stmt
775 .query(params![thread_id, limit - 1])
776 .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
777 let mut out = Vec::new();
778 while let Some(row) = rows.next().context("failed to iterate message rows")? {
779 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
780 let item = item_json
781 .as_deref()
782 .map(serde_json::from_str)
783 .transpose()
784 .with_context(|| {
785 format!("failed to parse message item json in thread {thread_id}")
786 })?;
787 out.push(MessageRecord {
788 id: row.get(0).context("failed to read message id")?,
789 thread_id: row.get(1).context("failed to read message thread id")?,
790 role: row.get(2).context("failed to read message role")?,
791 content: row.get(3).context("failed to read message content")?,
792 item,
793 created_at: row.get(5).context("failed to read message timestamp")?,
794 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
795 });
796 }
797 Ok(out)
798 }
799
800 pub fn fork_at_message(
806 &self,
807 message_id: &str,
808 role: &str,
809 content: &str,
810 item: Option<Value>,
811 ) -> Result<i64> {
812 let mut conn = self.conn()?;
813 let created_at = Utc::now().timestamp();
814 let item_json = item
815 .as_ref()
816 .map(serde_json::to_string)
817 .transpose()
818 .context("failed to serialize message item payload")?;
819
820 let tx = conn
821 .transaction()
822 .context("failed to begin fork message transaction")?;
823
824 let thread_id: String = tx
825 .query_row(
826 "SELECT thread_id FROM messages WHERE id = ?1",
827 params![message_id],
828 |row| row.get(0),
829 )
830 .with_context(|| format!("failed to query thread id for message {message_id}"))?;
831
832 let next_leaf_id: i64 = tx.query_row(
833 r#"
834 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
835 SELECT ?1, ?2, ?3, ?4, ?5, ?6
836 RETURNING id
837 "#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
838 ).with_context(|| format!("failed to fork at message for thread {:?}", thread_id))?;
839
840 tx.execute(
841 r#"
842 UPDATE threads
843 SET current_leaf_id = ?1
844 WHERE id = ?2;
845 "#,
846 params![next_leaf_id, thread_id],
847 )
848 .with_context(|| {
849 format!(
850 "failed to update thread current leaf id for thread {:?}",
851 thread_id
852 )
853 })?;
854
855 tx.commit()
856 .context("failed to commit fork message transaction")?;
857
858 Ok(next_leaf_id)
859 }
860
861 pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
865 let mut conn = self.conn()?;
866 let tx = conn
867 .transaction()
868 .context("failed to begin clear messages transaction")?;
869
870 tx.execute(
871 r#"
872 UPDATE threads
873 SET current_leaf_id = NULL
874 WHERE id = ?1;
875 "#,
876 params![thread_id],
877 )
878 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
879 let result = tx
880 .execute(
881 r#"
882 DELETE FROM messages WHERE thread_id = ?1
883 "#,
884 params![thread_id],
885 )
886 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
887 tx.commit()
888 .context("failed to commit clear messages transaction")?;
889
890 Ok(result)
891 }
892
893 pub fn save_checkpoint(
898 &self,
899 thread_id: &str,
900 checkpoint_id: &str,
901 state: &Value,
902 ) -> Result<()> {
903 let conn = self.conn()?;
904 let state_json =
905 serde_json::to_string(state).context("failed to encode checkpoint state")?;
906 conn.execute(
907 r#"
908 INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
909 VALUES (?1, ?2, ?3, ?4)
910 ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
911 state_json = excluded.state_json,
912 created_at = excluded.created_at
913 "#,
914 params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
915 )
916 .with_context(|| {
917 format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
918 })?;
919 Ok(())
920 }
921
922 pub fn load_checkpoint(
928 &self,
929 thread_id: &str,
930 checkpoint_id: Option<&str>,
931 ) -> Result<Option<CheckpointRecord>> {
932 let conn = self.conn()?;
933 if let Some(checkpoint_id) = checkpoint_id {
934 let row = conn
935 .query_row(
936 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
937 params![thread_id, checkpoint_id],
938 |row| {
939 let state_json: String = row.get(2)?;
940 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
941 Ok(CheckpointRecord {
942 thread_id: row.get(0)?,
943 checkpoint_id: row.get(1)?,
944 state,
945 created_at: row.get(3)?,
946 })
947 },
948 )
949 .optional()
950 .with_context(|| {
951 format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
952 })?;
953 return Ok(row);
954 }
955
956 conn.query_row(
957 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
958 params![thread_id],
959 |row| {
960 let state_json: String = row.get(2)?;
961 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
962 Ok(CheckpointRecord {
963 thread_id: row.get(0)?,
964 checkpoint_id: row.get(1)?,
965 state,
966 created_at: row.get(3)?,
967 })
968 },
969 )
970 .optional()
971 .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
972 }
973
974 pub fn list_checkpoints(
978 &self,
979 thread_id: &str,
980 limit: Option<usize>,
981 ) -> Result<Vec<CheckpointRecord>> {
982 let conn = self.conn()?;
983 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
984 let mut stmt = conn
985 .prepare(
986 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
987 )
988 .context("failed to prepare checkpoint list query")?;
989 let mut rows = stmt
990 .query(params![thread_id, limit])
991 .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
992
993 let mut out = Vec::new();
994 while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
995 let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
996 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
997 out.push(CheckpointRecord {
998 thread_id: row.get(0).context("failed to read checkpoint thread id")?,
999 checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
1000 state,
1001 created_at: row.get(3).context("failed to read checkpoint timestamp")?,
1002 });
1003 }
1004 Ok(out)
1005 }
1006
1007 pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
1009 let conn = self.conn()?;
1010 conn.execute(
1011 "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1012 params![thread_id, checkpoint_id],
1013 )
1014 .with_context(|| {
1015 format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
1016 })?;
1017 Ok(())
1018 }
1019
1020 pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
1022 let conn = self.conn()?;
1023 conn.execute(
1024 r#"
1025 INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
1026 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1027 ON CONFLICT(id) DO UPDATE SET
1028 name = excluded.name,
1029 status = excluded.status,
1030 progress = excluded.progress,
1031 detail = excluded.detail,
1032 created_at = excluded.created_at,
1033 updated_at = excluded.updated_at
1034 "#,
1035 params![
1036 job.id,
1037 job.name,
1038 job_state_status_to_str(&job.status),
1039 job.progress.map(i64::from),
1040 job.detail,
1041 job.created_at,
1042 job.updated_at
1043 ],
1044 )
1045 .with_context(|| format!("failed to upsert job {}", job.id))?;
1046 Ok(())
1047 }
1048
1049 pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
1053 let conn = self.conn()?;
1054 conn.query_row(
1055 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
1056 params![id],
1057 |row| {
1058 let status_raw: String = row.get(2)?;
1059 let progress: Option<i64> = row.get(3)?;
1060 Ok(JobStateRecord {
1061 id: row.get(0)?,
1062 name: row.get(1)?,
1063 status: job_state_status_from_str(&status_raw),
1064 progress: progress.and_then(|v| u8::try_from(v).ok()),
1065 detail: row.get(4)?,
1066 created_at: row.get(5)?,
1067 updated_at: row.get(6)?,
1068 })
1069 },
1070 )
1071 .optional()
1072 .with_context(|| format!("failed to read job {id}"))
1073 }
1074
1075 pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
1079 let conn = self.conn()?;
1080 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1081 let mut stmt = conn
1082 .prepare(
1083 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
1084 )
1085 .context("failed to prepare job list query")?;
1086 let mut rows = stmt
1087 .query(params![limit])
1088 .context("failed to query persisted jobs")?;
1089 let mut out = Vec::new();
1090 while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
1091 let status_raw: String = row.get(2).context("failed to read job status")?;
1092 let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
1093 out.push(JobStateRecord {
1094 id: row.get(0).context("failed to read job id")?,
1095 name: row.get(1).context("failed to read job name")?,
1096 status: job_state_status_from_str(&status_raw),
1097 progress: progress.and_then(|v| u8::try_from(v).ok()),
1098 detail: row.get(4).context("failed to read job detail")?,
1099 created_at: row.get(5).context("failed to read job created_at")?,
1100 updated_at: row.get(6).context("failed to read job updated_at")?,
1101 });
1102 }
1103 Ok(out)
1104 }
1105
1106 pub fn delete_job(&self, id: &str) -> Result<()> {
1108 let conn = self.conn()?;
1109 conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
1110 .with_context(|| format!("failed to delete job {id}"))?;
1111 Ok(())
1112 }
1113
1114 pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
1116 let conn = self.conn()?;
1117 conn.query_row(
1118 "SELECT rollout_path FROM threads WHERE id = ?1",
1119 params![id],
1120 |row| row.get::<_, Option<String>>(0),
1121 )
1122 .optional()
1123 .context("failed to lookup rollout path")
1124 .map(|opt| opt.flatten().map(PathBuf::from))
1125 }
1126
1127 pub fn append_thread_name(
1133 &self,
1134 thread_id: &str,
1135 thread_name: Option<String>,
1136 updated_at: i64,
1137 rollout_path: Option<PathBuf>,
1138 ) -> Result<()> {
1139 if let Some(parent) = self.session_index_path.parent() {
1140 fs::create_dir_all(parent).with_context(|| {
1141 format!(
1142 "failed to create session index directory {}",
1143 parent.display()
1144 )
1145 })?;
1146 }
1147 let entry = SessionIndexEntry {
1148 thread_id: thread_id.to_string(),
1149 thread_name,
1150 updated_at,
1151 rollout_path,
1152 };
1153 let encoded =
1154 serde_json::to_string(&entry).context("failed to serialize session index entry")?;
1155 let mut file = OpenOptions::new()
1156 .create(true)
1157 .append(true)
1158 .open(&self.session_index_path)
1159 .with_context(|| {
1160 format!(
1161 "failed to open session index {}",
1162 self.session_index_path.display()
1163 )
1164 })?;
1165 writeln!(file, "{encoded}").context("failed to append session index entry")?;
1166 Ok(())
1167 }
1168
1169 pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
1173 let map = self.session_index_map()?;
1174 Ok(map
1175 .get(thread_id)
1176 .and_then(|entry| entry.thread_name.clone()))
1177 }
1178
1179 pub fn find_thread_names_by_ids(
1183 &self,
1184 ids: &[String],
1185 ) -> Result<HashMap<String, Option<String>>> {
1186 let map = self.session_index_map()?;
1187 let mut out = HashMap::new();
1188 for id in ids {
1189 let name = map.get(id).and_then(|entry| entry.thread_name.clone());
1190 out.insert(id.clone(), name);
1191 }
1192 Ok(out)
1193 }
1194
1195 pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
1200 let map = self.session_index_map()?;
1201 let matched = map
1202 .values()
1203 .filter(|entry| {
1204 entry
1205 .thread_name
1206 .as_deref()
1207 .is_some_and(|n| n.eq_ignore_ascii_case(name))
1208 })
1209 .max_by_key(|entry| entry.updated_at);
1210 Ok(matched.and_then(|entry| entry.rollout_path.clone()))
1211 }
1212
1213 fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
1214 if !self.session_index_path.exists() {
1215 return Ok(HashMap::new());
1216 }
1217 let file = OpenOptions::new()
1218 .read(true)
1219 .open(&self.session_index_path)
1220 .with_context(|| {
1221 format!(
1222 "failed to read session index {}",
1223 self.session_index_path.display()
1224 )
1225 })?;
1226 let reader = BufReader::new(file);
1227 let mut latest = HashMap::<String, SessionIndexEntry>::new();
1228 for line in reader.lines() {
1229 let line = line.context("failed to read session index line")?;
1230 if line.trim().is_empty() {
1231 continue;
1232 }
1233 let parsed: SessionIndexEntry =
1234 serde_json::from_str(&line).context("failed to parse session index entry")?;
1235 latest.insert(parsed.thread_id.clone(), parsed);
1236 }
1237 Ok(latest)
1238 }
1239}
1240
1241fn default_state_db_path() -> PathBuf {
1242 dirs::home_dir()
1243 .unwrap_or_else(|| PathBuf::from("."))
1244 .join(".deepseek")
1245 .join("state.db")
1246}
1247
/// Encodes a flag as the integer stored in the database: `true` → 1,
/// `false` → 0.
fn bool_to_i64(value: bool) -> i64 {
    i64::from(value)
}
1251
/// Decodes an integer flag read from the database: zero is `false`, every
/// other value is `true`.
fn i64_to_bool(value: i64) -> bool {
    !matches!(value, 0)
}
1255
1256fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
1257 match status {
1258 ThreadStatus::Running => "running",
1259 ThreadStatus::Idle => "idle",
1260 ThreadStatus::Completed => "completed",
1261 ThreadStatus::Failed => "failed",
1262 ThreadStatus::Paused => "paused",
1263 ThreadStatus::Archived => "archived",
1264 }
1265}
1266
1267fn thread_status_from_str(value: &str) -> ThreadStatus {
1268 match value {
1269 "running" => ThreadStatus::Running,
1270 "idle" => ThreadStatus::Idle,
1271 "completed" => ThreadStatus::Completed,
1272 "failed" => ThreadStatus::Failed,
1273 "paused" => ThreadStatus::Paused,
1274 "archived" => ThreadStatus::Archived,
1275 _ => ThreadStatus::Idle,
1276 }
1277}
1278
1279fn session_source_to_str(source: &SessionSource) -> &'static str {
1280 match source {
1281 SessionSource::Interactive => "interactive",
1282 SessionSource::Resume => "resume",
1283 SessionSource::Fork => "fork",
1284 SessionSource::Api => "api",
1285 SessionSource::Unknown => "unknown",
1286 }
1287}
1288
1289fn session_source_from_str(value: &str) -> SessionSource {
1290 match value {
1291 "interactive" => SessionSource::Interactive,
1292 "resume" => SessionSource::Resume,
1293 "fork" => SessionSource::Fork,
1294 "api" => SessionSource::Api,
1295 _ => SessionSource::Unknown,
1296 }
1297}
1298
/// Renders an optional path as the text stored in the database.
///
/// `None` stays `None`; a path is rendered through `Path::display`.
fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
    let present = path?;
    Some(present.display().to_string())
}
1302
1303fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
1304 match status {
1305 JobStateStatus::Queued => "queued",
1306 JobStateStatus::Running => "running",
1307 JobStateStatus::Completed => "completed",
1308 JobStateStatus::Failed => "failed",
1309 JobStateStatus::Cancelled => "cancelled",
1310 }
1311}
1312
1313fn job_state_status_from_str(value: &str) -> JobStateStatus {
1314 match value {
1315 "queued" => JobStateStatus::Queued,
1316 "running" => JobStateStatus::Running,
1317 "completed" => JobStateStatus::Completed,
1318 "failed" => JobStateStatus::Failed,
1319 "cancelled" => JobStateStatus::Cancelled,
1320 _ => JobStateStatus::Queued,
1321 }
1322}
1323
1324fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
1325 let status_raw: String = row.get(7)?;
1326 let source_raw: String = row.get(11)?;
1327 let rollout_path: Option<String> = row.get(1)?;
1328 let path: Option<String> = row.get(8)?;
1329 Ok(ThreadMetadata {
1330 id: row.get(0)?,
1331 rollout_path: rollout_path.map(PathBuf::from),
1332 preview: row.get(2)?,
1333 ephemeral: i64_to_bool(row.get(3)?),
1334 model_provider: row.get(4)?,
1335 created_at: row.get(5)?,
1336 updated_at: row.get(6)?,
1337 status: thread_status_from_str(&status_raw),
1338 path: path.map(PathBuf::from),
1339 cwd: PathBuf::from(row.get::<_, String>(9)?),
1340 cli_version: row.get(10)?,
1341 source: session_source_from_str(&source_raw),
1342 name: row.get(12)?,
1343 sandbox_policy: row.get(13)?,
1344 approval_mode: row.get(14)?,
1345 archived: i64_to_bool(row.get(15)?),
1346 archived_at: row.get(16)?,
1347 git_sha: row.get(17)?,
1348 git_branch: row.get(18)?,
1349 git_origin_url: row.get(19)?,
1350 memory_mode: row.get(20)?,
1351 current_leaf_id: row.get(21)?,
1352 })
1353}