1use std::collections::HashMap;
13use std::fs::{self, OpenOptions};
14use std::io::{BufRead, BufReader, Write};
15use std::path::{Path, PathBuf};
16
17use anyhow::{Context, Result};
18use chrono::Utc;
19use rusqlite::{Connection, OptionalExtension, params};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
27#[serde(rename_all = "snake_case")]
28pub enum ThreadStatus {
29 Running,
31 Idle,
33 Completed,
35 Failed,
37 Paused,
39 Archived,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "snake_case")]
48pub enum SessionSource {
49 Interactive,
51 Resume,
53 Fork,
55 Api,
57 Unknown,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ThreadMetadata {
67 pub id: String,
69 pub rollout_path: Option<PathBuf>,
71 pub preview: String,
73 pub ephemeral: bool,
75 pub model_provider: String,
77 pub created_at: i64,
79 pub updated_at: i64,
81 pub status: ThreadStatus,
83 pub path: Option<PathBuf>,
85 pub cwd: PathBuf,
87 pub cli_version: String,
89 pub source: SessionSource,
91 pub name: Option<String>,
93 pub sandbox_policy: Option<String>,
95 pub approval_mode: Option<String>,
97 pub archived: bool,
99 pub archived_at: Option<i64>,
101 pub git_sha: Option<String>,
103 pub git_branch: Option<String>,
105 pub git_origin_url: Option<String>,
107 pub memory_mode: Option<String>,
109 pub current_leaf_id: Option<i64>,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct DynamicToolRecord {
116 pub position: i64,
118 pub name: String,
120 pub description: Option<String>,
122 pub input_schema: Value,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct MessageRecord {
132 pub id: i64,
134 pub thread_id: String,
136 pub role: String,
138 pub content: String,
140 pub item: Option<Value>,
142 pub created_at: i64,
144 pub parent_entry_id: Option<i64>,
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct CheckpointRecord {
151 pub thread_id: String,
153 pub checkpoint_id: String,
155 pub state: Value,
157 pub created_at: i64,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
165#[serde(rename_all = "snake_case")]
166pub enum JobStateStatus {
167 Queued,
169 Running,
171 Completed,
173 Failed,
175 Cancelled,
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct JobStateRecord {
182 pub id: String,
184 pub name: String,
186 pub status: JobStateStatus,
188 pub progress: Option<u8>,
190 pub detail: Option<String>,
192 pub created_at: i64,
194 pub updated_at: i64,
196}
197
/// Options controlling `StateStore::list_threads`.
#[derive(Clone, Debug)]
pub struct ThreadListFilters {
    /// When `false`, rows with `archived` set are excluded.
    pub include_archived: bool,
    /// Maximum number of rows to return; `None` falls back to 50 in `list_threads`.
    pub limit: Option<usize>,
}

impl Default for ThreadListFilters {
    /// Defaults: archived threads hidden, at most 50 results.
    fn default() -> Self {
        let limit = Some(50);
        Self {
            limit,
            include_archived: false,
        }
    }
}
215
216#[derive(Debug, Clone, Serialize, Deserialize)]
217struct SessionIndexEntry {
218 thread_id: String,
219 thread_name: Option<String>,
220 updated_at: i64,
221 rollout_path: Option<PathBuf>,
222}
223
/// Handle to the on-disk state: a SQLite database plus a JSONL session index.
///
/// The handle only stores paths; each operation opens its own database
/// connection through `conn()`.
#[derive(Clone, Debug)]
pub struct StateStore {
    /// Location of the SQLite database file.
    db_path: PathBuf,
    /// Location of `session_index.jsonl`, placed next to the database file.
    session_index_path: PathBuf,
}
233
234impl StateStore {
235 pub fn open(path: Option<PathBuf>) -> Result<Self> {
241 let db_path = path.unwrap_or_else(default_state_db_path);
242 let session_index_path = db_path
243 .parent()
244 .unwrap_or_else(|| Path::new("."))
245 .join("session_index.jsonl");
246 if let Some(parent) = db_path.parent() {
247 fs::create_dir_all(parent).with_context(|| {
248 format!("failed to create state directory {}", parent.display())
249 })?;
250 }
251 let store = Self {
252 db_path,
253 session_index_path,
254 };
255 store.init_schema()?;
256 Ok(store)
257 }
258
259 pub fn db_path(&self) -> &Path {
261 &self.db_path
262 }
263
264 fn conn(&self) -> Result<Connection> {
265 Connection::open(&self.db_path)
266 .with_context(|| format!("failed to open state db {}", self.db_path.display()))
267 }
268
269 fn init_schema(&self) -> Result<()> {
270 let conn = self.conn()?;
271 let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
272 if user_version == 0 {
273 conn.execute_batch(
274 r#"
275 BEGIN;
276 CREATE TABLE IF NOT EXISTS threads (
277 id TEXT PRIMARY KEY,
278 rollout_path TEXT,
279 preview TEXT NOT NULL,
280 ephemeral INTEGER NOT NULL,
281 model_provider TEXT NOT NULL,
282 created_at INTEGER NOT NULL,
283 updated_at INTEGER NOT NULL,
284 status TEXT NOT NULL,
285 path TEXT,
286 cwd TEXT NOT NULL,
287 cli_version TEXT NOT NULL,
288 source TEXT NOT NULL,
289 title TEXT,
290 sandbox_policy TEXT,
291 approval_mode TEXT,
292 archived INTEGER NOT NULL DEFAULT 0,
293 archived_at INTEGER,
294 git_sha TEXT,
295 git_branch TEXT,
296 git_origin_url TEXT,
297 memory_mode TEXT
298 );
299 CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
300 CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
301 CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);
302
303 CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
304 thread_id TEXT NOT NULL,
305 position INTEGER NOT NULL,
306 name TEXT NOT NULL,
307 description TEXT,
308 input_schema TEXT NOT NULL,
309 PRIMARY KEY (thread_id, position),
310 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
311 );
312
313 CREATE TABLE IF NOT EXISTS messages (
314 id INTEGER PRIMARY KEY AUTOINCREMENT,
315 thread_id TEXT NOT NULL,
316 role TEXT NOT NULL,
317 content TEXT NOT NULL,
318 item_json TEXT,
319 created_at INTEGER NOT NULL,
320 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
321 );
322 CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);
323
324 CREATE TABLE IF NOT EXISTS checkpoints (
325 thread_id TEXT NOT NULL,
326 checkpoint_id TEXT NOT NULL,
327 state_json TEXT NOT NULL,
328 created_at INTEGER NOT NULL,
329 PRIMARY KEY(thread_id, checkpoint_id),
330 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
331 );
332 CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);
333
334 CREATE TABLE IF NOT EXISTS jobs (
335 id TEXT PRIMARY KEY,
336 name TEXT NOT NULL,
337 status TEXT NOT NULL,
338 progress INTEGER,
339 detail TEXT,
340 created_at INTEGER NOT NULL,
341 updated_at INTEGER NOT NULL
342 );
343 CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
344
345 -- Add parent_entry_id column, and set to last message before current message
346 ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
347 UPDATE messages
348 SET parent_entry_id = (
349 SELECT m2.id
350 FROM messages m2
351 WHERE m2.thread_id = messages.thread_id
352 AND (
353 m2.created_at < messages.created_at
354 OR (
355 m2.created_at = messages.created_at
356 AND m2.id < messages.id
357 )
358 )
359 ORDER BY m2.created_at DESC, m2.id DESC
360 LIMIT 1
361 );
362 CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);
363
364 -- Add current_leaf_id column, and set to last message in thread
365 ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
366 UPDATE threads
367 SET current_leaf_id = (
368 SELECT m.id
369 FROM messages m
370 WHERE m.thread_id = threads.id
371 ORDER BY m.id DESC
372 LIMIT 1
373 );
374
375 PRAGMA user_version = 1;
376 COMMIT;
377 "#,
378 )
379 .context("failed to initialize thread schema")?;
380 user_version = 1;
381 }
382 if user_version < 2 {
383 conn.execute_batch(
384 r#"
385 BEGIN;
386 CREATE TABLE IF NOT EXISTS workflow_runs (
387 id TEXT PRIMARY KEY,
388 workflow_id TEXT NOT NULL,
389 goal TEXT NOT NULL,
390 status TEXT NOT NULL,
391 input_hash TEXT,
392 started_at INTEGER NOT NULL,
393 completed_at INTEGER,
394 metadata_json TEXT NOT NULL DEFAULT '{}'
395 );
396 CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at
397 ON workflow_runs(status, started_at DESC);
398 CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at
399 ON workflow_runs(workflow_id, started_at DESC);
400
401 CREATE TABLE IF NOT EXISTS branch_runs (
402 id TEXT PRIMARY KEY,
403 workflow_run_id TEXT NOT NULL,
404 branch_id TEXT NOT NULL,
405 node_id TEXT NOT NULL,
406 status TEXT NOT NULL,
407 started_at INTEGER NOT NULL,
408 completed_at INTEGER,
409 result_json TEXT NOT NULL DEFAULT '{}',
410 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
411 );
412 CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id
413 ON branch_runs(workflow_run_id);
414 CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id
415 ON branch_runs(branch_id);
416
417 CREATE TABLE IF NOT EXISTS leaf_runs (
418 id TEXT PRIMARY KEY,
419 workflow_run_id TEXT NOT NULL,
420 branch_run_id TEXT,
421 leaf_id TEXT NOT NULL,
422 task_id TEXT NOT NULL,
423 input_hash TEXT,
424 status TEXT NOT NULL,
425 output_json TEXT NOT NULL DEFAULT '{}',
426 artifacts_json TEXT NOT NULL DEFAULT '[]',
427 started_at INTEGER NOT NULL,
428 completed_at INTEGER,
429 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
430 FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
431 );
432 CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id
433 ON leaf_runs(workflow_run_id);
434 CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup
435 ON leaf_runs(workflow_run_id, leaf_id, input_hash);
436
437 CREATE TABLE IF NOT EXISTS control_node_runs (
438 id TEXT PRIMARY KEY,
439 workflow_run_id TEXT NOT NULL,
440 node_id TEXT NOT NULL,
441 kind TEXT NOT NULL,
442 status TEXT NOT NULL,
443 selected_children_json TEXT NOT NULL DEFAULT '[]',
444 result_json TEXT NOT NULL DEFAULT '{}',
445 started_at INTEGER NOT NULL,
446 completed_at INTEGER,
447 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
448 );
449 CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id
450 ON control_node_runs(workflow_run_id);
451 CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id
452 ON control_node_runs(node_id);
453
454 CREATE TABLE IF NOT EXISTS teacher_candidates (
455 id TEXT PRIMARY KEY,
456 workflow_run_id TEXT NOT NULL,
457 control_node_run_id TEXT NOT NULL,
458 candidate_id TEXT NOT NULL,
459 branch_run_id TEXT,
460 score REAL,
461 passed INTEGER,
462 rationale_json TEXT NOT NULL DEFAULT '{}',
463 created_at INTEGER NOT NULL,
464 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
465 FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE,
466 FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
467 );
468 CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id
469 ON teacher_candidates(workflow_run_id);
470 CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id
471 ON teacher_candidates(control_node_run_id);
472
473 PRAGMA user_version = 2;
474 COMMIT;
475 "#,
476 )
477 .context("failed to initialize workflow trace schema")?;
478 }
479 Ok(())
480 }
481
482 pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
487 let conn = self.conn()?;
488 conn.execute(
489 r#"
490 INSERT INTO threads (
491 id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
492 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
493 git_sha, git_branch, git_origin_url, memory_mode
494 ) VALUES (
495 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
496 ?11, ?12, ?13, ?14, ?15, ?16, ?17,
497 ?18, ?19, ?20, ?21
498 )
499 ON CONFLICT(id) DO UPDATE SET
500 rollout_path=excluded.rollout_path,
501 preview=excluded.preview,
502 ephemeral=excluded.ephemeral,
503 model_provider=excluded.model_provider,
504 created_at=excluded.created_at,
505 updated_at=excluded.updated_at,
506 status=excluded.status,
507 path=excluded.path,
508 cwd=excluded.cwd,
509 cli_version=excluded.cli_version,
510 source=excluded.source,
511 title=excluded.title,
512 sandbox_policy=excluded.sandbox_policy,
513 approval_mode=excluded.approval_mode,
514 archived=excluded.archived,
515 archived_at=excluded.archived_at,
516 git_sha=excluded.git_sha,
517 git_branch=excluded.git_branch,
518 git_origin_url=excluded.git_origin_url,
519 memory_mode=excluded.memory_mode
520 "#,
521 params![
522 thread.id,
523 path_to_opt_string(thread.rollout_path.as_deref()),
524 thread.preview,
525 bool_to_i64(thread.ephemeral),
526 thread.model_provider,
527 thread.created_at,
528 thread.updated_at,
529 thread_status_to_str(&thread.status),
530 path_to_opt_string(thread.path.as_deref()),
531 thread.cwd.display().to_string(),
532 thread.cli_version,
533 session_source_to_str(&thread.source),
534 thread.name,
535 thread.sandbox_policy,
536 thread.approval_mode,
537 bool_to_i64(thread.archived),
538 thread.archived_at,
539 thread.git_sha,
540 thread.git_branch,
541 thread.git_origin_url,
542 thread.memory_mode,
543 ],
544 )
545 .context("failed to upsert thread metadata")?;
546
547 self.append_thread_name(
548 &thread.id,
549 thread.name.clone(),
550 thread.updated_at,
551 thread.rollout_path.clone(),
552 )?;
553 Ok(())
554 }
555
556 pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
560 let conn = self.conn()?;
561 conn.query_row(
562 r#"
563 SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
564 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
565 git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
566 FROM threads
567 WHERE id = ?1
568 "#,
569 params![id],
570 row_to_thread,
571 )
572 .optional()
573 .context("failed to read thread")
574 }
575
576 pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
581 let conn = self.conn()?;
582 let sql = if filters.include_archived {
583 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
584 } else {
585 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
586 };
587
588 let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
589 let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
590 let mut rows = stmt
591 .query(params![limit])
592 .context("failed to query threads")?;
593 let mut out = Vec::new();
594 while let Some(row) = rows.next().context("failed to iterate thread rows")? {
595 out.push(row_to_thread(row)?);
596 }
597 Ok(out)
598 }
599
600 pub fn mark_archived(&self, id: &str) -> Result<()> {
603 let conn = self.conn()?;
604 conn.execute(
605 "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
606 params![
607 id,
608 Utc::now().timestamp(),
609 thread_status_to_str(&ThreadStatus::Archived)
610 ],
611 )
612 .context("failed to archive thread")?;
613 Ok(())
614 }
615
616 pub fn mark_unarchived(&self, id: &str) -> Result<()> {
618 let conn = self.conn()?;
619 conn.execute(
620 "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
621 params![id],
622 )
623 .context("failed to unarchive thread")?;
624 Ok(())
625 }
626
627 pub fn delete_thread(&self, id: &str) -> Result<()> {
630 let conn = self.conn()?;
631 conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
632 .context("failed to delete thread")?;
633 Ok(())
634 }
635
636 pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
640 let conn = self.conn()?;
641 conn.execute(
642 "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
643 params![id, mode],
644 )
645 .context("failed to update thread memory mode")?;
646 Ok(())
647 }
648
649 pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
653 let conn = self.conn()?;
654 conn.query_row(
655 "SELECT memory_mode FROM threads WHERE id = ?1",
656 params![id],
657 |row| row.get::<_, Option<String>>(0),
658 )
659 .optional()
660 .context("failed to read thread memory mode")
661 .map(Option::flatten)
662 }
663
664 pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
669 let conn = self.conn()?;
670 let mut stmt = conn
671 .prepare(
672 r#"
673 SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
674 FROM messages m1
675 LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
676 WHERE m1.thread_id = ?1 AND m2.id IS NULL
677 "#,
678 )
679 .context("failed to prepare message listing query")?;
680 let mut rows = stmt
681 .query(params![thread_id])
682 .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
683 let mut out = Vec::new();
684 while let Some(row) = rows.next().context("failed to iterate message rows")? {
685 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
686 let item = item_json
687 .as_deref()
688 .map(serde_json::from_str)
689 .transpose()
690 .with_context(|| {
691 format!("failed to parse message item json in thread {thread_id}")
692 })?;
693 out.push(MessageRecord {
694 id: row.get(0).context("failed to read message id")?,
695 thread_id: row.get(1).context("failed to read message thread id")?,
696 role: row.get(2).context("failed to read message role")?,
697 content: row.get(3).context("failed to read message content")?,
698 item,
699 created_at: row.get(5).context("failed to read message timestamp")?,
700 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
701 });
702 }
703 Ok(out)
704 }
705
706 pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
711 let conn = self.conn()?;
712 conn.execute(
713 "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
714 params![current_leaf_id, thread_id],
715 )
716 .context("failed to update thread current leaf id")?;
717 Ok(())
718 }
719
720 pub fn persist_dynamic_tools(
725 &self,
726 thread_id: &str,
727 tools: &[DynamicToolRecord],
728 ) -> Result<()> {
729 let mut conn = self.conn()?;
730 let tx = conn
731 .transaction()
732 .context("failed to begin dynamic tools transaction")?;
733 tx.execute(
734 "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
735 params![thread_id],
736 )
737 .context("failed to clear dynamic tools")?;
738 for tool in tools {
739 tx.execute(
740 "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
741 params![
742 thread_id,
743 tool.position,
744 tool.name,
745 tool.description,
746 tool.input_schema.to_string()
747 ],
748 )
749 .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
750 }
751 tx.commit().context("failed to commit dynamic tools")?;
752 Ok(())
753 }
754
755 pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
757 let conn = self.conn()?;
758 let mut stmt = conn
759 .prepare(
760 "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
761 )
762 .context("failed to prepare get dynamic tools query")?;
763 let mut rows = stmt
764 .query(params![thread_id])
765 .context("failed to query dynamic tools")?;
766 let mut out = Vec::new();
767 while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
768 let input_schema_raw: String =
769 row.get(3).context("failed to read tool input schema")?;
770 let input_schema: Value =
771 serde_json::from_str(&input_schema_raw).with_context(|| {
772 format!("failed to parse input schema for dynamic tool in thread {thread_id}")
773 })?;
774 out.push(DynamicToolRecord {
775 position: row.get(0).context("failed to read tool position")?,
776 name: row.get(1).context("failed to read tool name")?,
777 description: row.get(2).context("failed to read tool description")?,
778 input_schema,
779 });
780 }
781 Ok(out)
782 }
783
784 pub fn append_message(
790 &self,
791 thread_id: &str,
792 role: &str,
793 content: &str,
794 item: Option<Value>,
795 ) -> Result<i64> {
796 let mut conn = self.conn()?;
797 let created_at = Utc::now().timestamp();
798 let item_json = item
799 .as_ref()
800 .map(serde_json::to_string)
801 .transpose()
802 .context("failed to serialize message item payload")?;
803
804 let tx = conn
805 .transaction()
806 .context("failed to begin append message transaction")?;
807
808 let current_leaf_id: Option<i64> = tx
809 .query_row(
810 "SELECT current_leaf_id FROM threads WHERE id = ?1",
811 params![thread_id],
812 |row| row.get(0),
813 )
814 .with_context(|| {
815 format!("failed to query thread current leaf id for thread {thread_id}")
816 })?;
817
818 let next_leaf_id: i64 = tx.query_row(
819 r#"
820 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
821 SELECT ?1, ?2, ?3, ?4, ?5, ?6
822 RETURNING id
823 "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
824 ).with_context(|| format!("failed to append message for thread {thread_id}"))?;
825
826 tx.execute(
827 r#"
828 UPDATE threads
829 SET current_leaf_id = ?1
830 WHERE id = ?2;
831 "#,
832 params![next_leaf_id, thread_id],
833 )
834 .with_context(|| {
835 format!("failed to update thread current leaf id for thread {thread_id}")
836 })?;
837
838 tx.commit()
839 .context("failed to commit append message transaction")?;
840
841 Ok(next_leaf_id)
842 }
843
844 pub fn list_messages(
850 &self,
851 thread_id: &str,
852 limit: Option<usize>,
853 ) -> Result<Vec<MessageRecord>> {
854 let conn = self.conn()?;
855 let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
856 let mut stmt = conn
857 .prepare(
858 r#"
859 WITH RECURSIVE
860 leaf_id AS (
861 SELECT current_leaf_id FROM threads WHERE id = ?1
862 ),
863 ancestors AS (
864 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
865 FROM messages
866 WHERE id = (SELECT current_leaf_id FROM leaf_id)
867
868 UNION ALL
869
870 SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
871 FROM messages m
872 JOIN ancestors a ON m.id = a.parent_entry_id
873 WHERE a.depth < ?2
874 )
875 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
876 ORDER BY depth DESC
877 "#
878 )
879 .context("failed to prepare message listing query")?;
880 let mut rows = stmt
881 .query(params![thread_id, limit - 1])
882 .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
883 let mut out = Vec::new();
884 while let Some(row) = rows.next().context("failed to iterate message rows")? {
885 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
886 let item = item_json
887 .as_deref()
888 .map(serde_json::from_str)
889 .transpose()
890 .with_context(|| {
891 format!("failed to parse message item json in thread {thread_id}")
892 })?;
893 out.push(MessageRecord {
894 id: row.get(0).context("failed to read message id")?,
895 thread_id: row.get(1).context("failed to read message thread id")?,
896 role: row.get(2).context("failed to read message role")?,
897 content: row.get(3).context("failed to read message content")?,
898 item,
899 created_at: row.get(5).context("failed to read message timestamp")?,
900 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
901 });
902 }
903 Ok(out)
904 }
905
906 pub fn fork_at_message(
912 &self,
913 message_id: &str,
914 role: &str,
915 content: &str,
916 item: Option<Value>,
917 ) -> Result<i64> {
918 let mut conn = self.conn()?;
919 let created_at = Utc::now().timestamp();
920 let item_json = item
921 .as_ref()
922 .map(serde_json::to_string)
923 .transpose()
924 .context("failed to serialize message item payload")?;
925
926 let tx = conn
927 .transaction()
928 .context("failed to begin fork message transaction")?;
929
930 let thread_id: String = tx
931 .query_row(
932 "SELECT thread_id FROM messages WHERE id = ?1",
933 params![message_id],
934 |row| row.get(0),
935 )
936 .with_context(|| format!("failed to query thread id for message {message_id}"))?;
937
938 let next_leaf_id: i64 = tx.query_row(
939 r#"
940 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
941 SELECT ?1, ?2, ?3, ?4, ?5, ?6
942 RETURNING id
943 "#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
944 ).with_context(|| format!("failed to fork at message for thread {:?}", thread_id))?;
945
946 tx.execute(
947 r#"
948 UPDATE threads
949 SET current_leaf_id = ?1
950 WHERE id = ?2;
951 "#,
952 params![next_leaf_id, thread_id],
953 )
954 .with_context(|| {
955 format!(
956 "failed to update thread current leaf id for thread {:?}",
957 thread_id
958 )
959 })?;
960
961 tx.commit()
962 .context("failed to commit fork message transaction")?;
963
964 Ok(next_leaf_id)
965 }
966
967 pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
971 let mut conn = self.conn()?;
972 let tx = conn
973 .transaction()
974 .context("failed to begin clear messages transaction")?;
975
976 tx.execute(
977 r#"
978 UPDATE threads
979 SET current_leaf_id = NULL
980 WHERE id = ?1;
981 "#,
982 params![thread_id],
983 )
984 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
985 let result = tx
986 .execute(
987 r#"
988 DELETE FROM messages WHERE thread_id = ?1
989 "#,
990 params![thread_id],
991 )
992 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
993 tx.commit()
994 .context("failed to commit clear messages transaction")?;
995
996 Ok(result)
997 }
998
999 pub fn save_checkpoint(
1004 &self,
1005 thread_id: &str,
1006 checkpoint_id: &str,
1007 state: &Value,
1008 ) -> Result<()> {
1009 let conn = self.conn()?;
1010 let state_json =
1011 serde_json::to_string(state).context("failed to encode checkpoint state")?;
1012 conn.execute(
1013 r#"
1014 INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
1015 VALUES (?1, ?2, ?3, ?4)
1016 ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
1017 state_json = excluded.state_json,
1018 created_at = excluded.created_at
1019 "#,
1020 params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
1021 )
1022 .with_context(|| {
1023 format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
1024 })?;
1025 Ok(())
1026 }
1027
1028 pub fn load_checkpoint(
1034 &self,
1035 thread_id: &str,
1036 checkpoint_id: Option<&str>,
1037 ) -> Result<Option<CheckpointRecord>> {
1038 let conn = self.conn()?;
1039 if let Some(checkpoint_id) = checkpoint_id {
1040 let row = conn
1041 .query_row(
1042 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1043 params![thread_id, checkpoint_id],
1044 |row| {
1045 let state_json: String = row.get(2)?;
1046 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1047 Ok(CheckpointRecord {
1048 thread_id: row.get(0)?,
1049 checkpoint_id: row.get(1)?,
1050 state,
1051 created_at: row.get(3)?,
1052 })
1053 },
1054 )
1055 .optional()
1056 .with_context(|| {
1057 format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
1058 })?;
1059 return Ok(row);
1060 }
1061
1062 conn.query_row(
1063 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
1064 params![thread_id],
1065 |row| {
1066 let state_json: String = row.get(2)?;
1067 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1068 Ok(CheckpointRecord {
1069 thread_id: row.get(0)?,
1070 checkpoint_id: row.get(1)?,
1071 state,
1072 created_at: row.get(3)?,
1073 })
1074 },
1075 )
1076 .optional()
1077 .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
1078 }
1079
1080 pub fn list_checkpoints(
1084 &self,
1085 thread_id: &str,
1086 limit: Option<usize>,
1087 ) -> Result<Vec<CheckpointRecord>> {
1088 let conn = self.conn()?;
1089 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1090 let mut stmt = conn
1091 .prepare(
1092 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
1093 )
1094 .context("failed to prepare checkpoint list query")?;
1095 let mut rows = stmt
1096 .query(params![thread_id, limit])
1097 .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
1098
1099 let mut out = Vec::new();
1100 while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
1101 let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
1102 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1103 out.push(CheckpointRecord {
1104 thread_id: row.get(0).context("failed to read checkpoint thread id")?,
1105 checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
1106 state,
1107 created_at: row.get(3).context("failed to read checkpoint timestamp")?,
1108 });
1109 }
1110 Ok(out)
1111 }
1112
1113 pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
1115 let conn = self.conn()?;
1116 conn.execute(
1117 "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1118 params![thread_id, checkpoint_id],
1119 )
1120 .with_context(|| {
1121 format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
1122 })?;
1123 Ok(())
1124 }
1125
1126 pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
1128 let conn = self.conn()?;
1129 conn.execute(
1130 r#"
1131 INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
1132 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1133 ON CONFLICT(id) DO UPDATE SET
1134 name = excluded.name,
1135 status = excluded.status,
1136 progress = excluded.progress,
1137 detail = excluded.detail,
1138 created_at = excluded.created_at,
1139 updated_at = excluded.updated_at
1140 "#,
1141 params![
1142 job.id,
1143 job.name,
1144 job_state_status_to_str(&job.status),
1145 job.progress.map(i64::from),
1146 job.detail,
1147 job.created_at,
1148 job.updated_at
1149 ],
1150 )
1151 .with_context(|| format!("failed to upsert job {}", job.id))?;
1152 Ok(())
1153 }
1154
1155 pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
1159 let conn = self.conn()?;
1160 conn.query_row(
1161 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
1162 params![id],
1163 |row| {
1164 let status_raw: String = row.get(2)?;
1165 let progress: Option<i64> = row.get(3)?;
1166 Ok(JobStateRecord {
1167 id: row.get(0)?,
1168 name: row.get(1)?,
1169 status: job_state_status_from_str(&status_raw),
1170 progress: progress.and_then(|v| u8::try_from(v).ok()),
1171 detail: row.get(4)?,
1172 created_at: row.get(5)?,
1173 updated_at: row.get(6)?,
1174 })
1175 },
1176 )
1177 .optional()
1178 .with_context(|| format!("failed to read job {id}"))
1179 }
1180
1181 pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
1185 let conn = self.conn()?;
1186 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1187 let mut stmt = conn
1188 .prepare(
1189 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
1190 )
1191 .context("failed to prepare job list query")?;
1192 let mut rows = stmt
1193 .query(params![limit])
1194 .context("failed to query persisted jobs")?;
1195 let mut out = Vec::new();
1196 while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
1197 let status_raw: String = row.get(2).context("failed to read job status")?;
1198 let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
1199 out.push(JobStateRecord {
1200 id: row.get(0).context("failed to read job id")?,
1201 name: row.get(1).context("failed to read job name")?,
1202 status: job_state_status_from_str(&status_raw),
1203 progress: progress.and_then(|v| u8::try_from(v).ok()),
1204 detail: row.get(4).context("failed to read job detail")?,
1205 created_at: row.get(5).context("failed to read job created_at")?,
1206 updated_at: row.get(6).context("failed to read job updated_at")?,
1207 });
1208 }
1209 Ok(out)
1210 }
1211
1212 pub fn delete_job(&self, id: &str) -> Result<()> {
1214 let conn = self.conn()?;
1215 conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
1216 .with_context(|| format!("failed to delete job {id}"))?;
1217 Ok(())
1218 }
1219
1220 pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
1222 let conn = self.conn()?;
1223 conn.query_row(
1224 "SELECT rollout_path FROM threads WHERE id = ?1",
1225 params![id],
1226 |row| row.get::<_, Option<String>>(0),
1227 )
1228 .optional()
1229 .context("failed to lookup rollout path")
1230 .map(|opt| opt.flatten().map(PathBuf::from))
1231 }
1232
1233 pub fn append_thread_name(
1239 &self,
1240 thread_id: &str,
1241 thread_name: Option<String>,
1242 updated_at: i64,
1243 rollout_path: Option<PathBuf>,
1244 ) -> Result<()> {
1245 if let Some(parent) = self.session_index_path.parent() {
1246 fs::create_dir_all(parent).with_context(|| {
1247 format!(
1248 "failed to create session index directory {}",
1249 parent.display()
1250 )
1251 })?;
1252 }
1253 let entry = SessionIndexEntry {
1254 thread_id: thread_id.to_string(),
1255 thread_name,
1256 updated_at,
1257 rollout_path,
1258 };
1259 let encoded =
1260 serde_json::to_string(&entry).context("failed to serialize session index entry")?;
1261 let mut file = OpenOptions::new()
1262 .create(true)
1263 .append(true)
1264 .open(&self.session_index_path)
1265 .with_context(|| {
1266 format!(
1267 "failed to open session index {}",
1268 self.session_index_path.display()
1269 )
1270 })?;
1271 writeln!(file, "{encoded}").context("failed to append session index entry")?;
1272 Ok(())
1273 }
1274
1275 pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
1279 let map = self.session_index_map()?;
1280 Ok(map
1281 .get(thread_id)
1282 .and_then(|entry| entry.thread_name.clone()))
1283 }
1284
1285 pub fn find_thread_names_by_ids(
1289 &self,
1290 ids: &[String],
1291 ) -> Result<HashMap<String, Option<String>>> {
1292 let map = self.session_index_map()?;
1293 let mut out = HashMap::new();
1294 for id in ids {
1295 let name = map.get(id).and_then(|entry| entry.thread_name.clone());
1296 out.insert(id.clone(), name);
1297 }
1298 Ok(out)
1299 }
1300
1301 pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
1306 let map = self.session_index_map()?;
1307 let matched = map
1308 .values()
1309 .filter(|entry| {
1310 entry
1311 .thread_name
1312 .as_deref()
1313 .is_some_and(|n| n.eq_ignore_ascii_case(name))
1314 })
1315 .max_by_key(|entry| entry.updated_at);
1316 Ok(matched.and_then(|entry| entry.rollout_path.clone()))
1317 }
1318
1319 fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
1320 if !self.session_index_path.exists() {
1321 return Ok(HashMap::new());
1322 }
1323 let file = OpenOptions::new()
1324 .read(true)
1325 .open(&self.session_index_path)
1326 .with_context(|| {
1327 format!(
1328 "failed to read session index {}",
1329 self.session_index_path.display()
1330 )
1331 })?;
1332 let reader = BufReader::new(file);
1333 let mut latest = HashMap::<String, SessionIndexEntry>::new();
1334 for line in reader.lines() {
1335 let line = line.context("failed to read session index line")?;
1336 if line.trim().is_empty() {
1337 continue;
1338 }
1339 let parsed: SessionIndexEntry =
1340 serde_json::from_str(&line).context("failed to parse session index entry")?;
1341 latest.insert(parsed.thread_id.clone(), parsed);
1342 }
1343 Ok(latest)
1344 }
1345}
1346
1347fn default_state_db_path() -> PathBuf {
1348 let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
1349 let primary = home.join(".codewhale").join("state.db");
1352 if primary.exists() || !home.join(".deepseek").join("state.db").exists() {
1353 primary
1354 } else {
1355 home.join(".deepseek").join("state.db")
1356 }
1357}
1358
/// Encodes a boolean as an integer: `true` becomes `1`, `false` becomes `0`.
fn bool_to_i64(value: bool) -> i64 {
    i64::from(value)
}
1362
/// Decodes an integer flag: `0` is `false`, every other value is `true`.
fn i64_to_bool(value: i64) -> bool {
    !matches!(value, 0)
}
1366
1367fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
1368 match status {
1369 ThreadStatus::Running => "running",
1370 ThreadStatus::Idle => "idle",
1371 ThreadStatus::Completed => "completed",
1372 ThreadStatus::Failed => "failed",
1373 ThreadStatus::Paused => "paused",
1374 ThreadStatus::Archived => "archived",
1375 }
1376}
1377
1378fn thread_status_from_str(value: &str) -> ThreadStatus {
1379 match value {
1380 "running" => ThreadStatus::Running,
1381 "idle" => ThreadStatus::Idle,
1382 "completed" => ThreadStatus::Completed,
1383 "failed" => ThreadStatus::Failed,
1384 "paused" => ThreadStatus::Paused,
1385 "archived" => ThreadStatus::Archived,
1386 _ => ThreadStatus::Idle,
1387 }
1388}
1389
1390fn session_source_to_str(source: &SessionSource) -> &'static str {
1391 match source {
1392 SessionSource::Interactive => "interactive",
1393 SessionSource::Resume => "resume",
1394 SessionSource::Fork => "fork",
1395 SessionSource::Api => "api",
1396 SessionSource::Unknown => "unknown",
1397 }
1398}
1399
1400fn session_source_from_str(value: &str) -> SessionSource {
1401 match value {
1402 "interactive" => SessionSource::Interactive,
1403 "resume" => SessionSource::Resume,
1404 "fork" => SessionSource::Fork,
1405 "api" => SessionSource::Api,
1406 _ => SessionSource::Unknown,
1407 }
1408}
1409
/// Renders an optional path as an optional owned string.
///
/// `None` stays `None`. The text comes from [`Path::display`], which may be
/// lossy for paths that are not valid Unicode.
fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
    let rendered = format!("{}", path?.display());
    Some(rendered)
}
1413
1414fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
1415 match status {
1416 JobStateStatus::Queued => "queued",
1417 JobStateStatus::Running => "running",
1418 JobStateStatus::Completed => "completed",
1419 JobStateStatus::Failed => "failed",
1420 JobStateStatus::Cancelled => "cancelled",
1421 }
1422}
1423
1424fn job_state_status_from_str(value: &str) -> JobStateStatus {
1425 match value {
1426 "queued" => JobStateStatus::Queued,
1427 "running" => JobStateStatus::Running,
1428 "completed" => JobStateStatus::Completed,
1429 "failed" => JobStateStatus::Failed,
1430 "cancelled" => JobStateStatus::Cancelled,
1431 _ => JobStateStatus::Queued,
1432 }
1433}
1434
1435fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
1436 let status_raw: String = row.get(7)?;
1437 let source_raw: String = row.get(11)?;
1438 let rollout_path: Option<String> = row.get(1)?;
1439 let path: Option<String> = row.get(8)?;
1440 Ok(ThreadMetadata {
1441 id: row.get(0)?,
1442 rollout_path: rollout_path.map(PathBuf::from),
1443 preview: row.get(2)?,
1444 ephemeral: i64_to_bool(row.get(3)?),
1445 model_provider: row.get(4)?,
1446 created_at: row.get(5)?,
1447 updated_at: row.get(6)?,
1448 status: thread_status_from_str(&status_raw),
1449 path: path.map(PathBuf::from),
1450 cwd: PathBuf::from(row.get::<_, String>(9)?),
1451 cli_version: row.get(10)?,
1452 source: session_source_from_str(&source_raw),
1453 name: row.get(12)?,
1454 sandbox_policy: row.get(13)?,
1455 approval_mode: row.get(14)?,
1456 archived: i64_to_bool(row.get(15)?),
1457 archived_at: row.get(16)?,
1458 git_sha: row.get(17)?,
1459 git_branch: row.get(18)?,
1460 git_origin_url: row.get(19)?,
1461 memory_mode: row.get(20)?,
1462 current_leaf_id: row.get(21)?,
1463 })
1464}