1use std::collections::HashMap;
13use std::fs::{self, OpenOptions};
14use std::io::{BufRead, BufReader, Write};
15use std::path::{Path, PathBuf};
16
17use anyhow::{Context, Result};
18use chrono::Utc;
19use rusqlite::{Connection, OptionalExtension, params};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
27#[serde(rename_all = "snake_case")]
28pub enum ThreadStatus {
29 Running,
31 Idle,
33 Completed,
35 Failed,
37 Paused,
39 Archived,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "snake_case")]
48pub enum SessionSource {
49 Interactive,
51 Resume,
53 Fork,
55 Api,
57 Unknown,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ThreadMetadata {
67 pub id: String,
69 pub rollout_path: Option<PathBuf>,
71 pub preview: String,
73 pub ephemeral: bool,
75 pub model_provider: String,
77 pub created_at: i64,
79 pub updated_at: i64,
81 pub status: ThreadStatus,
83 pub path: Option<PathBuf>,
85 pub cwd: PathBuf,
87 pub cli_version: String,
89 pub source: SessionSource,
91 pub name: Option<String>,
93 pub sandbox_policy: Option<String>,
95 pub approval_mode: Option<String>,
97 pub archived: bool,
99 pub archived_at: Option<i64>,
101 pub git_sha: Option<String>,
103 pub git_branch: Option<String>,
105 pub git_origin_url: Option<String>,
107 pub memory_mode: Option<String>,
109 pub current_leaf_id: Option<i64>,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct DynamicToolRecord {
116 pub position: i64,
118 pub name: String,
120 pub description: Option<String>,
122 pub input_schema: Value,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct MessageRecord {
132 pub id: i64,
134 pub thread_id: String,
136 pub role: String,
138 pub content: String,
140 pub item: Option<Value>,
142 pub created_at: i64,
144 pub parent_entry_id: Option<i64>,
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct CheckpointRecord {
151 pub thread_id: String,
153 pub checkpoint_id: String,
155 pub state: Value,
157 pub created_at: i64,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
165#[serde(rename_all = "snake_case")]
166pub enum JobStateStatus {
167 Queued,
169 Running,
171 Completed,
173 Failed,
175 Cancelled,
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct JobStateRecord {
182 pub id: String,
184 pub name: String,
186 pub status: JobStateStatus,
188 pub progress: Option<u8>,
190 pub detail: Option<String>,
192 pub created_at: i64,
194 pub updated_at: i64,
196}
197
198#[derive(Debug, Clone)]
200pub struct ThreadListFilters {
201 pub include_archived: bool,
203 pub limit: Option<usize>,
205}
206
207impl Default for ThreadListFilters {
208 fn default() -> Self {
209 Self {
210 include_archived: false,
211 limit: Some(50),
212 }
213 }
214}
215
216#[derive(Debug, Clone, Serialize, Deserialize)]
217struct SessionIndexEntry {
218 thread_id: String,
219 thread_name: Option<String>,
220 updated_at: i64,
221 rollout_path: Option<PathBuf>,
222}
223
224#[derive(Debug, Clone)]
229pub struct StateStore {
230 db_path: PathBuf,
231 session_index_path: PathBuf,
232}
233
234impl StateStore {
235 pub fn open(path: Option<PathBuf>) -> Result<Self> {
240 let db_path = path.unwrap_or_else(default_state_db_path);
241 let session_index_path = db_path
242 .parent()
243 .unwrap_or_else(|| Path::new("."))
244 .join("session_index.jsonl");
245 if let Some(parent) = db_path.parent() {
246 fs::create_dir_all(parent).with_context(|| {
247 format!("failed to create state directory {}", parent.display())
248 })?;
249 }
250 let store = Self {
251 db_path,
252 session_index_path,
253 };
254 store.init_schema()?;
255 Ok(store)
256 }
257
258 pub fn db_path(&self) -> &Path {
260 &self.db_path
261 }
262
263 fn conn(&self) -> Result<Connection> {
264 Connection::open(&self.db_path)
265 .with_context(|| format!("failed to open state db {}", self.db_path.display()))
266 }
267
268 fn init_schema(&self) -> Result<()> {
269 let conn = self.conn()?;
270 let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
271 if user_version == 0 {
272 conn.execute_batch(
273 r#"
274 BEGIN;
275 CREATE TABLE IF NOT EXISTS threads (
276 id TEXT PRIMARY KEY,
277 rollout_path TEXT,
278 preview TEXT NOT NULL,
279 ephemeral INTEGER NOT NULL,
280 model_provider TEXT NOT NULL,
281 created_at INTEGER NOT NULL,
282 updated_at INTEGER NOT NULL,
283 status TEXT NOT NULL,
284 path TEXT,
285 cwd TEXT NOT NULL,
286 cli_version TEXT NOT NULL,
287 source TEXT NOT NULL,
288 title TEXT,
289 sandbox_policy TEXT,
290 approval_mode TEXT,
291 archived INTEGER NOT NULL DEFAULT 0,
292 archived_at INTEGER,
293 git_sha TEXT,
294 git_branch TEXT,
295 git_origin_url TEXT,
296 memory_mode TEXT
297 );
298 CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
299 CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
300 CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);
301
302 CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
303 thread_id TEXT NOT NULL,
304 position INTEGER NOT NULL,
305 name TEXT NOT NULL,
306 description TEXT,
307 input_schema TEXT NOT NULL,
308 PRIMARY KEY (thread_id, position),
309 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
310 );
311
312 CREATE TABLE IF NOT EXISTS messages (
313 id INTEGER PRIMARY KEY AUTOINCREMENT,
314 thread_id TEXT NOT NULL,
315 role TEXT NOT NULL,
316 content TEXT NOT NULL,
317 item_json TEXT,
318 created_at INTEGER NOT NULL,
319 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
320 );
321 CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);
322
323 CREATE TABLE IF NOT EXISTS checkpoints (
324 thread_id TEXT NOT NULL,
325 checkpoint_id TEXT NOT NULL,
326 state_json TEXT NOT NULL,
327 created_at INTEGER NOT NULL,
328 PRIMARY KEY(thread_id, checkpoint_id),
329 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
330 );
331 CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);
332
333 CREATE TABLE IF NOT EXISTS jobs (
334 id TEXT PRIMARY KEY,
335 name TEXT NOT NULL,
336 status TEXT NOT NULL,
337 progress INTEGER,
338 detail TEXT,
339 created_at INTEGER NOT NULL,
340 updated_at INTEGER NOT NULL
341 );
342 CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
343
344 -- Add parent_entry_id column, and set to last message before current message
345 ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
346 UPDATE messages
347 SET parent_entry_id = (
348 SELECT m2.id
349 FROM messages m2
350 WHERE m2.thread_id = messages.thread_id
351 AND (
352 m2.created_at < messages.created_at
353 OR (
354 m2.created_at = messages.created_at
355 AND m2.id < messages.id
356 )
357 )
358 ORDER BY m2.created_at DESC, m2.id DESC
359 LIMIT 1
360 );
361 CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);
362
363 -- Add current_leaf_id column, and set to last message in thread
364 ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
365 UPDATE threads
366 SET current_leaf_id = (
367 SELECT m.id
368 FROM messages m
369 WHERE m.thread_id = threads.id
370 ORDER BY m.id DESC
371 LIMIT 1
372 );
373
374 PRAGMA user_version = 1;
375 COMMIT;
376 "#,
377 )
378 .context("failed to initialize thread schema")?;
379 user_version = 1;
380 }
381 if user_version < 2 {
382 conn.execute_batch(
383 r#"
384 BEGIN;
385 CREATE TABLE IF NOT EXISTS workflow_runs (
386 id TEXT PRIMARY KEY,
387 workflow_id TEXT NOT NULL,
388 goal TEXT NOT NULL,
389 status TEXT NOT NULL,
390 input_hash TEXT,
391 started_at INTEGER NOT NULL,
392 completed_at INTEGER,
393 metadata_json TEXT NOT NULL DEFAULT '{}'
394 );
395 CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at
396 ON workflow_runs(status, started_at DESC);
397 CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at
398 ON workflow_runs(workflow_id, started_at DESC);
399
400 CREATE TABLE IF NOT EXISTS branch_runs (
401 id TEXT PRIMARY KEY,
402 workflow_run_id TEXT NOT NULL,
403 branch_id TEXT NOT NULL,
404 node_id TEXT NOT NULL,
405 status TEXT NOT NULL,
406 started_at INTEGER NOT NULL,
407 completed_at INTEGER,
408 result_json TEXT NOT NULL DEFAULT '{}',
409 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
410 );
411 CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id
412 ON branch_runs(workflow_run_id);
413 CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id
414 ON branch_runs(branch_id);
415
416 CREATE TABLE IF NOT EXISTS leaf_runs (
417 id TEXT PRIMARY KEY,
418 workflow_run_id TEXT NOT NULL,
419 branch_run_id TEXT,
420 leaf_id TEXT NOT NULL,
421 task_id TEXT NOT NULL,
422 input_hash TEXT,
423 status TEXT NOT NULL,
424 output_json TEXT NOT NULL DEFAULT '{}',
425 artifacts_json TEXT NOT NULL DEFAULT '[]',
426 started_at INTEGER NOT NULL,
427 completed_at INTEGER,
428 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
429 FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
430 );
431 CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id
432 ON leaf_runs(workflow_run_id);
433 CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup
434 ON leaf_runs(workflow_run_id, leaf_id, input_hash);
435
436 CREATE TABLE IF NOT EXISTS control_node_runs (
437 id TEXT PRIMARY KEY,
438 workflow_run_id TEXT NOT NULL,
439 node_id TEXT NOT NULL,
440 kind TEXT NOT NULL,
441 status TEXT NOT NULL,
442 selected_children_json TEXT NOT NULL DEFAULT '[]',
443 result_json TEXT NOT NULL DEFAULT '{}',
444 started_at INTEGER NOT NULL,
445 completed_at INTEGER,
446 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
447 );
448 CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id
449 ON control_node_runs(workflow_run_id);
450 CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id
451 ON control_node_runs(node_id);
452
453 CREATE TABLE IF NOT EXISTS teacher_candidates (
454 id TEXT PRIMARY KEY,
455 workflow_run_id TEXT NOT NULL,
456 control_node_run_id TEXT NOT NULL,
457 candidate_id TEXT NOT NULL,
458 branch_run_id TEXT,
459 score REAL,
460 passed INTEGER,
461 rationale_json TEXT NOT NULL DEFAULT '{}',
462 created_at INTEGER NOT NULL,
463 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
464 FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE,
465 FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
466 );
467 CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id
468 ON teacher_candidates(workflow_run_id);
469 CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id
470 ON teacher_candidates(control_node_run_id);
471
472 PRAGMA user_version = 2;
473 COMMIT;
474 "#,
475 )
476 .context("failed to initialize workflow trace schema")?;
477 }
478 Ok(())
479 }
480
481 pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
486 let conn = self.conn()?;
487 conn.execute(
488 r#"
489 INSERT INTO threads (
490 id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
491 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
492 git_sha, git_branch, git_origin_url, memory_mode
493 ) VALUES (
494 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
495 ?11, ?12, ?13, ?14, ?15, ?16, ?17,
496 ?18, ?19, ?20, ?21
497 )
498 ON CONFLICT(id) DO UPDATE SET
499 rollout_path=excluded.rollout_path,
500 preview=excluded.preview,
501 ephemeral=excluded.ephemeral,
502 model_provider=excluded.model_provider,
503 created_at=excluded.created_at,
504 updated_at=excluded.updated_at,
505 status=excluded.status,
506 path=excluded.path,
507 cwd=excluded.cwd,
508 cli_version=excluded.cli_version,
509 source=excluded.source,
510 title=excluded.title,
511 sandbox_policy=excluded.sandbox_policy,
512 approval_mode=excluded.approval_mode,
513 archived=excluded.archived,
514 archived_at=excluded.archived_at,
515 git_sha=excluded.git_sha,
516 git_branch=excluded.git_branch,
517 git_origin_url=excluded.git_origin_url,
518 memory_mode=excluded.memory_mode
519 "#,
520 params![
521 thread.id,
522 path_to_opt_string(thread.rollout_path.as_deref()),
523 thread.preview,
524 bool_to_i64(thread.ephemeral),
525 thread.model_provider,
526 thread.created_at,
527 thread.updated_at,
528 thread_status_to_str(&thread.status),
529 path_to_opt_string(thread.path.as_deref()),
530 thread.cwd.display().to_string(),
531 thread.cli_version,
532 session_source_to_str(&thread.source),
533 thread.name,
534 thread.sandbox_policy,
535 thread.approval_mode,
536 bool_to_i64(thread.archived),
537 thread.archived_at,
538 thread.git_sha,
539 thread.git_branch,
540 thread.git_origin_url,
541 thread.memory_mode,
542 ],
543 )
544 .context("failed to upsert thread metadata")?;
545
546 self.append_thread_name(
547 &thread.id,
548 thread.name.clone(),
549 thread.updated_at,
550 thread.rollout_path.clone(),
551 )?;
552 Ok(())
553 }
554
555 pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
559 let conn = self.conn()?;
560 conn.query_row(
561 r#"
562 SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
563 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
564 git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
565 FROM threads
566 WHERE id = ?1
567 "#,
568 params![id],
569 row_to_thread,
570 )
571 .optional()
572 .context("failed to read thread")
573 }
574
575 pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
580 let conn = self.conn()?;
581 let sql = if filters.include_archived {
582 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
583 } else {
584 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
585 };
586
587 let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
588 let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
589 let mut rows = stmt
590 .query(params![limit])
591 .context("failed to query threads")?;
592 let mut out = Vec::new();
593 while let Some(row) = rows.next().context("failed to iterate thread rows")? {
594 out.push(row_to_thread(row)?);
595 }
596 Ok(out)
597 }
598
599 pub fn mark_archived(&self, id: &str) -> Result<()> {
602 let conn = self.conn()?;
603 conn.execute(
604 "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
605 params![
606 id,
607 Utc::now().timestamp(),
608 thread_status_to_str(&ThreadStatus::Archived)
609 ],
610 )
611 .context("failed to archive thread")?;
612 Ok(())
613 }
614
615 pub fn mark_unarchived(&self, id: &str) -> Result<()> {
617 let conn = self.conn()?;
618 conn.execute(
619 "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
620 params![id],
621 )
622 .context("failed to unarchive thread")?;
623 Ok(())
624 }
625
626 pub fn delete_thread(&self, id: &str) -> Result<()> {
629 let conn = self.conn()?;
630 conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
631 .context("failed to delete thread")?;
632 Ok(())
633 }
634
635 pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
639 let conn = self.conn()?;
640 conn.execute(
641 "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
642 params![id, mode],
643 )
644 .context("failed to update thread memory mode")?;
645 Ok(())
646 }
647
648 pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
652 let conn = self.conn()?;
653 conn.query_row(
654 "SELECT memory_mode FROM threads WHERE id = ?1",
655 params![id],
656 |row| row.get::<_, Option<String>>(0),
657 )
658 .optional()
659 .context("failed to read thread memory mode")
660 .map(Option::flatten)
661 }
662
663 pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
668 let conn = self.conn()?;
669 let mut stmt = conn
670 .prepare(
671 r#"
672 SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
673 FROM messages m1
674 LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
675 WHERE m1.thread_id = ?1 AND m2.id IS NULL
676 "#,
677 )
678 .context("failed to prepare message listing query")?;
679 let mut rows = stmt
680 .query(params![thread_id])
681 .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
682 let mut out = Vec::new();
683 while let Some(row) = rows.next().context("failed to iterate message rows")? {
684 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
685 let item = item_json
686 .as_deref()
687 .map(serde_json::from_str)
688 .transpose()
689 .with_context(|| {
690 format!("failed to parse message item json in thread {thread_id}")
691 })?;
692 out.push(MessageRecord {
693 id: row.get(0).context("failed to read message id")?,
694 thread_id: row.get(1).context("failed to read message thread id")?,
695 role: row.get(2).context("failed to read message role")?,
696 content: row.get(3).context("failed to read message content")?,
697 item,
698 created_at: row.get(5).context("failed to read message timestamp")?,
699 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
700 });
701 }
702 Ok(out)
703 }
704
705 pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
710 let conn = self.conn()?;
711 conn.execute(
712 "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
713 params![current_leaf_id, thread_id],
714 )
715 .context("failed to update thread current leaf id")?;
716 Ok(())
717 }
718
719 pub fn persist_dynamic_tools(
724 &self,
725 thread_id: &str,
726 tools: &[DynamicToolRecord],
727 ) -> Result<()> {
728 let mut conn = self.conn()?;
729 let tx = conn
730 .transaction()
731 .context("failed to begin dynamic tools transaction")?;
732 tx.execute(
733 "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
734 params![thread_id],
735 )
736 .context("failed to clear dynamic tools")?;
737 for tool in tools {
738 tx.execute(
739 "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
740 params![
741 thread_id,
742 tool.position,
743 tool.name,
744 tool.description,
745 tool.input_schema.to_string()
746 ],
747 )
748 .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
749 }
750 tx.commit().context("failed to commit dynamic tools")?;
751 Ok(())
752 }
753
754 pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
756 let conn = self.conn()?;
757 let mut stmt = conn
758 .prepare(
759 "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
760 )
761 .context("failed to prepare get dynamic tools query")?;
762 let mut rows = stmt
763 .query(params![thread_id])
764 .context("failed to query dynamic tools")?;
765 let mut out = Vec::new();
766 while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
767 let input_schema_raw: String =
768 row.get(3).context("failed to read tool input schema")?;
769 let input_schema: Value =
770 serde_json::from_str(&input_schema_raw).with_context(|| {
771 format!("failed to parse input schema for dynamic tool in thread {thread_id}")
772 })?;
773 out.push(DynamicToolRecord {
774 position: row.get(0).context("failed to read tool position")?,
775 name: row.get(1).context("failed to read tool name")?,
776 description: row.get(2).context("failed to read tool description")?,
777 input_schema,
778 });
779 }
780 Ok(out)
781 }
782
783 pub fn append_message(
789 &self,
790 thread_id: &str,
791 role: &str,
792 content: &str,
793 item: Option<Value>,
794 ) -> Result<i64> {
795 let mut conn = self.conn()?;
796 let created_at = Utc::now().timestamp();
797 let item_json = item
798 .as_ref()
799 .map(serde_json::to_string)
800 .transpose()
801 .context("failed to serialize message item payload")?;
802
803 let tx = conn
804 .transaction()
805 .context("failed to begin append message transaction")?;
806
807 let current_leaf_id: Option<i64> = tx
808 .query_row(
809 "SELECT current_leaf_id FROM threads WHERE id = ?1",
810 params![thread_id],
811 |row| row.get(0),
812 )
813 .with_context(|| {
814 format!("failed to query thread current leaf id for thread {thread_id}")
815 })?;
816
817 let next_leaf_id: i64 = tx.query_row(
818 r#"
819 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
820 SELECT ?1, ?2, ?3, ?4, ?5, ?6
821 RETURNING id
822 "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
823 ).with_context(|| format!("failed to append message for thread {thread_id}"))?;
824
825 tx.execute(
826 r#"
827 UPDATE threads
828 SET current_leaf_id = ?1
829 WHERE id = ?2;
830 "#,
831 params![next_leaf_id, thread_id],
832 )
833 .with_context(|| {
834 format!("failed to update thread current leaf id for thread {thread_id}")
835 })?;
836
837 tx.commit()
838 .context("failed to commit append message transaction")?;
839
840 Ok(next_leaf_id)
841 }
842
843 pub fn list_messages(
849 &self,
850 thread_id: &str,
851 limit: Option<usize>,
852 ) -> Result<Vec<MessageRecord>> {
853 let conn = self.conn()?;
854 let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
855 let mut stmt = conn
856 .prepare(
857 r#"
858 WITH RECURSIVE
859 leaf_id AS (
860 SELECT current_leaf_id FROM threads WHERE id = ?1
861 ),
862 ancestors AS (
863 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
864 FROM messages
865 WHERE id = (SELECT current_leaf_id FROM leaf_id)
866
867 UNION ALL
868
869 SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
870 FROM messages m
871 JOIN ancestors a ON m.id = a.parent_entry_id
872 WHERE a.depth < ?2
873 )
874 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
875 ORDER BY depth DESC
876 "#
877 )
878 .context("failed to prepare message listing query")?;
879 let mut rows = stmt
880 .query(params![thread_id, limit - 1])
881 .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
882 let mut out = Vec::new();
883 while let Some(row) = rows.next().context("failed to iterate message rows")? {
884 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
885 let item = item_json
886 .as_deref()
887 .map(serde_json::from_str)
888 .transpose()
889 .with_context(|| {
890 format!("failed to parse message item json in thread {thread_id}")
891 })?;
892 out.push(MessageRecord {
893 id: row.get(0).context("failed to read message id")?,
894 thread_id: row.get(1).context("failed to read message thread id")?,
895 role: row.get(2).context("failed to read message role")?,
896 content: row.get(3).context("failed to read message content")?,
897 item,
898 created_at: row.get(5).context("failed to read message timestamp")?,
899 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
900 });
901 }
902 Ok(out)
903 }
904
905 pub fn fork_at_message(
911 &self,
912 message_id: &str,
913 role: &str,
914 content: &str,
915 item: Option<Value>,
916 ) -> Result<i64> {
917 let mut conn = self.conn()?;
918 let created_at = Utc::now().timestamp();
919 let item_json = item
920 .as_ref()
921 .map(serde_json::to_string)
922 .transpose()
923 .context("failed to serialize message item payload")?;
924
925 let tx = conn
926 .transaction()
927 .context("failed to begin fork message transaction")?;
928
929 let thread_id: String = tx
930 .query_row(
931 "SELECT thread_id FROM messages WHERE id = ?1",
932 params![message_id],
933 |row| row.get(0),
934 )
935 .with_context(|| format!("failed to query thread id for message {message_id}"))?;
936
937 let next_leaf_id: i64 = tx.query_row(
938 r#"
939 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
940 SELECT ?1, ?2, ?3, ?4, ?5, ?6
941 RETURNING id
942 "#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
943 ).with_context(|| format!("failed to fork at message for thread {:?}", thread_id))?;
944
945 tx.execute(
946 r#"
947 UPDATE threads
948 SET current_leaf_id = ?1
949 WHERE id = ?2;
950 "#,
951 params![next_leaf_id, thread_id],
952 )
953 .with_context(|| {
954 format!(
955 "failed to update thread current leaf id for thread {:?}",
956 thread_id
957 )
958 })?;
959
960 tx.commit()
961 .context("failed to commit fork message transaction")?;
962
963 Ok(next_leaf_id)
964 }
965
966 pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
970 let mut conn = self.conn()?;
971 let tx = conn
972 .transaction()
973 .context("failed to begin clear messages transaction")?;
974
975 tx.execute(
976 r#"
977 UPDATE threads
978 SET current_leaf_id = NULL
979 WHERE id = ?1;
980 "#,
981 params![thread_id],
982 )
983 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
984 let result = tx
985 .execute(
986 r#"
987 DELETE FROM messages WHERE thread_id = ?1
988 "#,
989 params![thread_id],
990 )
991 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
992 tx.commit()
993 .context("failed to commit clear messages transaction")?;
994
995 Ok(result)
996 }
997
998 pub fn save_checkpoint(
1003 &self,
1004 thread_id: &str,
1005 checkpoint_id: &str,
1006 state: &Value,
1007 ) -> Result<()> {
1008 let conn = self.conn()?;
1009 let state_json =
1010 serde_json::to_string(state).context("failed to encode checkpoint state")?;
1011 conn.execute(
1012 r#"
1013 INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
1014 VALUES (?1, ?2, ?3, ?4)
1015 ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
1016 state_json = excluded.state_json,
1017 created_at = excluded.created_at
1018 "#,
1019 params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
1020 )
1021 .with_context(|| {
1022 format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
1023 })?;
1024 Ok(())
1025 }
1026
1027 pub fn load_checkpoint(
1033 &self,
1034 thread_id: &str,
1035 checkpoint_id: Option<&str>,
1036 ) -> Result<Option<CheckpointRecord>> {
1037 let conn = self.conn()?;
1038 if let Some(checkpoint_id) = checkpoint_id {
1039 let row = conn
1040 .query_row(
1041 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1042 params![thread_id, checkpoint_id],
1043 |row| {
1044 let state_json: String = row.get(2)?;
1045 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1046 Ok(CheckpointRecord {
1047 thread_id: row.get(0)?,
1048 checkpoint_id: row.get(1)?,
1049 state,
1050 created_at: row.get(3)?,
1051 })
1052 },
1053 )
1054 .optional()
1055 .with_context(|| {
1056 format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
1057 })?;
1058 return Ok(row);
1059 }
1060
1061 conn.query_row(
1062 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
1063 params![thread_id],
1064 |row| {
1065 let state_json: String = row.get(2)?;
1066 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1067 Ok(CheckpointRecord {
1068 thread_id: row.get(0)?,
1069 checkpoint_id: row.get(1)?,
1070 state,
1071 created_at: row.get(3)?,
1072 })
1073 },
1074 )
1075 .optional()
1076 .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
1077 }
1078
1079 pub fn list_checkpoints(
1083 &self,
1084 thread_id: &str,
1085 limit: Option<usize>,
1086 ) -> Result<Vec<CheckpointRecord>> {
1087 let conn = self.conn()?;
1088 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1089 let mut stmt = conn
1090 .prepare(
1091 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
1092 )
1093 .context("failed to prepare checkpoint list query")?;
1094 let mut rows = stmt
1095 .query(params![thread_id, limit])
1096 .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
1097
1098 let mut out = Vec::new();
1099 while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
1100 let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
1101 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1102 out.push(CheckpointRecord {
1103 thread_id: row.get(0).context("failed to read checkpoint thread id")?,
1104 checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
1105 state,
1106 created_at: row.get(3).context("failed to read checkpoint timestamp")?,
1107 });
1108 }
1109 Ok(out)
1110 }
1111
1112 pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
1114 let conn = self.conn()?;
1115 conn.execute(
1116 "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1117 params![thread_id, checkpoint_id],
1118 )
1119 .with_context(|| {
1120 format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
1121 })?;
1122 Ok(())
1123 }
1124
1125 pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
1127 let conn = self.conn()?;
1128 conn.execute(
1129 r#"
1130 INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
1131 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1132 ON CONFLICT(id) DO UPDATE SET
1133 name = excluded.name,
1134 status = excluded.status,
1135 progress = excluded.progress,
1136 detail = excluded.detail,
1137 created_at = excluded.created_at,
1138 updated_at = excluded.updated_at
1139 "#,
1140 params![
1141 job.id,
1142 job.name,
1143 job_state_status_to_str(&job.status),
1144 job.progress.map(i64::from),
1145 job.detail,
1146 job.created_at,
1147 job.updated_at
1148 ],
1149 )
1150 .with_context(|| format!("failed to upsert job {}", job.id))?;
1151 Ok(())
1152 }
1153
1154 pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
1158 let conn = self.conn()?;
1159 conn.query_row(
1160 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
1161 params![id],
1162 |row| {
1163 let status_raw: String = row.get(2)?;
1164 let progress: Option<i64> = row.get(3)?;
1165 Ok(JobStateRecord {
1166 id: row.get(0)?,
1167 name: row.get(1)?,
1168 status: job_state_status_from_str(&status_raw),
1169 progress: progress.and_then(|v| u8::try_from(v).ok()),
1170 detail: row.get(4)?,
1171 created_at: row.get(5)?,
1172 updated_at: row.get(6)?,
1173 })
1174 },
1175 )
1176 .optional()
1177 .with_context(|| format!("failed to read job {id}"))
1178 }
1179
1180 pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
1184 let conn = self.conn()?;
1185 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1186 let mut stmt = conn
1187 .prepare(
1188 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
1189 )
1190 .context("failed to prepare job list query")?;
1191 let mut rows = stmt
1192 .query(params![limit])
1193 .context("failed to query persisted jobs")?;
1194 let mut out = Vec::new();
1195 while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
1196 let status_raw: String = row.get(2).context("failed to read job status")?;
1197 let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
1198 out.push(JobStateRecord {
1199 id: row.get(0).context("failed to read job id")?,
1200 name: row.get(1).context("failed to read job name")?,
1201 status: job_state_status_from_str(&status_raw),
1202 progress: progress.and_then(|v| u8::try_from(v).ok()),
1203 detail: row.get(4).context("failed to read job detail")?,
1204 created_at: row.get(5).context("failed to read job created_at")?,
1205 updated_at: row.get(6).context("failed to read job updated_at")?,
1206 });
1207 }
1208 Ok(out)
1209 }
1210
1211 pub fn delete_job(&self, id: &str) -> Result<()> {
1213 let conn = self.conn()?;
1214 conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
1215 .with_context(|| format!("failed to delete job {id}"))?;
1216 Ok(())
1217 }
1218
1219 pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
1221 let conn = self.conn()?;
1222 conn.query_row(
1223 "SELECT rollout_path FROM threads WHERE id = ?1",
1224 params![id],
1225 |row| row.get::<_, Option<String>>(0),
1226 )
1227 .optional()
1228 .context("failed to lookup rollout path")
1229 .map(|opt| opt.flatten().map(PathBuf::from))
1230 }
1231
1232 pub fn append_thread_name(
1238 &self,
1239 thread_id: &str,
1240 thread_name: Option<String>,
1241 updated_at: i64,
1242 rollout_path: Option<PathBuf>,
1243 ) -> Result<()> {
1244 if let Some(parent) = self.session_index_path.parent() {
1245 fs::create_dir_all(parent).with_context(|| {
1246 format!(
1247 "failed to create session index directory {}",
1248 parent.display()
1249 )
1250 })?;
1251 }
1252 let entry = SessionIndexEntry {
1253 thread_id: thread_id.to_string(),
1254 thread_name,
1255 updated_at,
1256 rollout_path,
1257 };
1258 let encoded =
1259 serde_json::to_string(&entry).context("failed to serialize session index entry")?;
1260 let mut file = OpenOptions::new()
1261 .create(true)
1262 .append(true)
1263 .open(&self.session_index_path)
1264 .with_context(|| {
1265 format!(
1266 "failed to open session index {}",
1267 self.session_index_path.display()
1268 )
1269 })?;
1270 writeln!(file, "{encoded}").context("failed to append session index entry")?;
1271 Ok(())
1272 }
1273
1274 pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
1278 let map = self.session_index_map()?;
1279 Ok(map
1280 .get(thread_id)
1281 .and_then(|entry| entry.thread_name.clone()))
1282 }
1283
1284 pub fn find_thread_names_by_ids(
1288 &self,
1289 ids: &[String],
1290 ) -> Result<HashMap<String, Option<String>>> {
1291 let map = self.session_index_map()?;
1292 let mut out = HashMap::new();
1293 for id in ids {
1294 let name = map.get(id).and_then(|entry| entry.thread_name.clone());
1295 out.insert(id.clone(), name);
1296 }
1297 Ok(out)
1298 }
1299
1300 pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
1305 let map = self.session_index_map()?;
1306 let matched = map
1307 .values()
1308 .filter(|entry| {
1309 entry
1310 .thread_name
1311 .as_deref()
1312 .is_some_and(|n| n.eq_ignore_ascii_case(name))
1313 })
1314 .max_by_key(|entry| entry.updated_at);
1315 Ok(matched.and_then(|entry| entry.rollout_path.clone()))
1316 }
1317
1318 fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
1319 if !self.session_index_path.exists() {
1320 return Ok(HashMap::new());
1321 }
1322 let file = OpenOptions::new()
1323 .read(true)
1324 .open(&self.session_index_path)
1325 .with_context(|| {
1326 format!(
1327 "failed to read session index {}",
1328 self.session_index_path.display()
1329 )
1330 })?;
1331 let reader = BufReader::new(file);
1332 let mut latest = HashMap::<String, SessionIndexEntry>::new();
1333 for line in reader.lines() {
1334 let line = line.context("failed to read session index line")?;
1335 if line.trim().is_empty() {
1336 continue;
1337 }
1338 let parsed: SessionIndexEntry =
1339 serde_json::from_str(&line).context("failed to parse session index entry")?;
1340 latest.insert(parsed.thread_id.clone(), parsed);
1341 }
1342 Ok(latest)
1343 }
1344}
1345
1346fn default_state_db_path() -> PathBuf {
1347 dirs::home_dir()
1348 .unwrap_or_else(|| PathBuf::from("."))
1349 .join(".deepseek")
1350 .join("state.db")
1351}
1352
1353fn bool_to_i64(value: bool) -> i64 {
1354 if value { 1 } else { 0 }
1355}
1356
1357fn i64_to_bool(value: i64) -> bool {
1358 value != 0
1359}
1360
1361fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
1362 match status {
1363 ThreadStatus::Running => "running",
1364 ThreadStatus::Idle => "idle",
1365 ThreadStatus::Completed => "completed",
1366 ThreadStatus::Failed => "failed",
1367 ThreadStatus::Paused => "paused",
1368 ThreadStatus::Archived => "archived",
1369 }
1370}
1371
1372fn thread_status_from_str(value: &str) -> ThreadStatus {
1373 match value {
1374 "running" => ThreadStatus::Running,
1375 "idle" => ThreadStatus::Idle,
1376 "completed" => ThreadStatus::Completed,
1377 "failed" => ThreadStatus::Failed,
1378 "paused" => ThreadStatus::Paused,
1379 "archived" => ThreadStatus::Archived,
1380 _ => ThreadStatus::Idle,
1381 }
1382}
1383
1384fn session_source_to_str(source: &SessionSource) -> &'static str {
1385 match source {
1386 SessionSource::Interactive => "interactive",
1387 SessionSource::Resume => "resume",
1388 SessionSource::Fork => "fork",
1389 SessionSource::Api => "api",
1390 SessionSource::Unknown => "unknown",
1391 }
1392}
1393
1394fn session_source_from_str(value: &str) -> SessionSource {
1395 match value {
1396 "interactive" => SessionSource::Interactive,
1397 "resume" => SessionSource::Resume,
1398 "fork" => SessionSource::Fork,
1399 "api" => SessionSource::Api,
1400 _ => SessionSource::Unknown,
1401 }
1402}
1403
1404fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
1405 path.map(|p| p.display().to_string())
1406}
1407
1408fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
1409 match status {
1410 JobStateStatus::Queued => "queued",
1411 JobStateStatus::Running => "running",
1412 JobStateStatus::Completed => "completed",
1413 JobStateStatus::Failed => "failed",
1414 JobStateStatus::Cancelled => "cancelled",
1415 }
1416}
1417
1418fn job_state_status_from_str(value: &str) -> JobStateStatus {
1419 match value {
1420 "queued" => JobStateStatus::Queued,
1421 "running" => JobStateStatus::Running,
1422 "completed" => JobStateStatus::Completed,
1423 "failed" => JobStateStatus::Failed,
1424 "cancelled" => JobStateStatus::Cancelled,
1425 _ => JobStateStatus::Queued,
1426 }
1427}
1428
1429fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
1430 let status_raw: String = row.get(7)?;
1431 let source_raw: String = row.get(11)?;
1432 let rollout_path: Option<String> = row.get(1)?;
1433 let path: Option<String> = row.get(8)?;
1434 Ok(ThreadMetadata {
1435 id: row.get(0)?,
1436 rollout_path: rollout_path.map(PathBuf::from),
1437 preview: row.get(2)?,
1438 ephemeral: i64_to_bool(row.get(3)?),
1439 model_provider: row.get(4)?,
1440 created_at: row.get(5)?,
1441 updated_at: row.get(6)?,
1442 status: thread_status_from_str(&status_raw),
1443 path: path.map(PathBuf::from),
1444 cwd: PathBuf::from(row.get::<_, String>(9)?),
1445 cli_version: row.get(10)?,
1446 source: session_source_from_str(&source_raw),
1447 name: row.get(12)?,
1448 sandbox_policy: row.get(13)?,
1449 approval_mode: row.get(14)?,
1450 archived: i64_to_bool(row.get(15)?),
1451 archived_at: row.get(16)?,
1452 git_sha: row.get(17)?,
1453 git_branch: row.get(18)?,
1454 git_origin_url: row.get(19)?,
1455 memory_mode: row.get(20)?,
1456 current_leaf_id: row.get(21)?,
1457 })
1458}