1use std::collections::HashMap;
13use std::fs::{self, OpenOptions};
14use std::io::{BufRead, BufReader, Write};
15use std::path::{Path, PathBuf};
16
17use anyhow::{Context, Result};
18use chrono::Utc;
19use rusqlite::{Connection, OptionalExtension, params};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
/// Status value recorded for a thread.
///
/// Serde (de)serializes the variants in `snake_case`. The `threads.status`
/// column is written through `thread_status_to_str`; `StateStore::mark_archived`
/// stores [`ThreadStatus::Archived`] there.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ThreadStatus {
    Running,
    Idle,
    Completed,
    Failed,
    Paused,
    /// Written by `StateStore::mark_archived` together with `archived = 1`.
    Archived,
}
42
/// Origin tag attached to a thread.
///
/// Serde (de)serializes the variants in `snake_case`; `StateStore::upsert_thread`
/// persists the value in the `threads.source` column via `session_source_to_str`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SessionSource {
    Interactive,
    Resume,
    Fork,
    Api,
    Unknown,
}
60
/// One row of the `threads` table, as written by `StateStore::upsert_thread`
/// and read back by `StateStore::get_thread` / `StateStore::list_threads`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThreadMetadata {
    /// Primary key of the `threads` table.
    pub id: String,
    /// Optional rollout file path; also forwarded to `append_thread_name` on upsert.
    pub rollout_path: Option<PathBuf>,
    /// Preview text (`NOT NULL` in the schema).
    pub preview: String,
    /// Stored as an INTEGER column through `bool_to_i64`.
    pub ephemeral: bool,
    pub model_provider: String,
    /// Creation timestamp supplied by the caller.
    /// NOTE(review): presumably Unix seconds like `archived_at` — confirm with callers.
    pub created_at: i64,
    /// Last-update timestamp; `list_threads` orders by this column, newest first.
    pub updated_at: i64,
    pub status: ThreadStatus,
    pub path: Option<PathBuf>,
    /// Working directory; stored using `Path::display()`.
    pub cwd: PathBuf,
    pub cli_version: String,
    pub source: SessionSource,
    /// Human-readable name. Persisted in the `title` column of `threads`.
    pub name: Option<String>,
    pub sandbox_policy: Option<String>,
    pub approval_mode: Option<String>,
    /// Archive flag; `list_threads` hides rows with `archived != 0` unless asked otherwise.
    pub archived: bool,
    /// Set to `Utc::now().timestamp()` by `mark_archived`, reset to NULL by `mark_unarchived`.
    pub archived_at: Option<i64>,
    pub git_sha: Option<String>,
    pub git_branch: Option<String>,
    pub git_origin_url: Option<String>,
    /// Also updated individually through `set_thread_memory_mode`.
    pub memory_mode: Option<String>,
    /// Id of the message recorded as the thread's current leaf.
    ///
    /// Read from `threads.current_leaf_id`, but NOT written by `upsert_thread`;
    /// it is maintained by `append_message`, `fork_at_message`,
    /// `set_current_leaf_id` and `clear_messages`.
    pub current_leaf_id: Option<i64>,
}
112
/// One row of `thread_dynamic_tools`, minus the owning thread id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamicToolRecord {
    /// Forms the primary key together with the thread id; `get_dynamic_tools`
    /// returns records sorted by this value ascending.
    pub position: i64,
    pub name: String,
    pub description: Option<String>,
    /// JSON value stored as text in the `input_schema` column.
    pub input_schema: Value,
}
125
/// One row of the `messages` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageRecord {
    /// `INTEGER PRIMARY KEY AUTOINCREMENT` id assigned by SQLite on insert.
    pub id: i64,
    pub thread_id: String,
    pub role: String,
    pub content: String,
    /// Optional JSON payload, stored as text in the `item_json` column.
    pub item: Option<Value>,
    /// Unix timestamp in seconds (`Utc::now().timestamp()` at insert time).
    pub created_at: i64,
    /// Id of the message this one was appended after; `None` when the thread
    /// had no current leaf at insert time.
    pub parent_entry_id: Option<i64>,
}
147
/// One row of the `checkpoints` table, keyed by `(thread_id, checkpoint_id)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointRecord {
    pub thread_id: String,
    pub checkpoint_id: String,
    /// JSON value stored as text in `state_json`. The loaders in this file
    /// substitute `Value::Null` when the stored text does not parse.
    pub state: Value,
    /// Unix timestamp in seconds, refreshed on every `save_checkpoint`.
    pub created_at: i64,
}
160
/// Status value recorded for a job in the `jobs` table.
///
/// Serde (de)serializes the variants in `snake_case`; the database column is
/// written through `job_state_status_to_str`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum JobStateStatus {
    Queued,
    Running,
    Completed,
    Failed,
    Cancelled,
}
178
/// One row of the `jobs` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobStateRecord {
    /// Primary key of the `jobs` table.
    pub id: String,
    pub name: String,
    pub status: JobStateStatus,
    /// Optional progress value, widened to an INTEGER column on write.
    /// NOTE(review): presumably a 0-100 percentage — confirm with producers.
    pub progress: Option<u8>,
    pub detail: Option<String>,
    pub created_at: i64,
    pub updated_at: i64,
}
197
/// Status of a thread goal.
///
/// Serde (de)serializes the variants in `snake_case`. The `thread_goals.status`
/// column has a CHECK constraint that only admits `active`, `paused`,
/// `blocked`, `usage_limited`, `budget_limited` and `complete`, so any new
/// variant needs a schema migration as well.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ThreadGoalStatus {
    Active,
    Paused,
    Blocked,
    UsageLimited,
    BudgetLimited,
    Complete,
}
215
/// One row of the `thread_goals` table.
///
/// `thread_id` is the table's primary key, so a thread has at most one goal;
/// `StateStore::upsert_thread_goal` replaces every column on conflict.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadGoalRecord {
    /// Owning thread; must exist in `threads` for the upsert to succeed.
    pub thread_id: String,
    pub goal_id: String,
    pub objective: String,
    pub status: ThreadGoalStatus,
    /// Optional token budget (nullable column).
    pub token_budget: Option<i64>,
    pub tokens_used: i64,
    pub time_used_seconds: i64,
    pub created_at: i64,
    pub updated_at: i64,
}
238
/// Options accepted by `StateStore::list_threads`.
#[derive(Debug, Clone)]
pub struct ThreadListFilters {
    /// When `false`, rows with `archived != 0` are left out of the listing.
    pub include_archived: bool,
    /// Maximum number of rows to return; `None` is treated as 50 by `list_threads`.
    pub limit: Option<usize>,
}

impl Default for ThreadListFilters {
    /// Hides archived threads and caps the listing at 50 rows.
    fn default() -> Self {
        const DEFAULT_LIMIT: usize = 50;
        ThreadListFilters {
            limit: Some(DEFAULT_LIMIT),
            include_archived: false,
        }
    }
}
256
/// Private serde record carrying a thread id, optional name, update timestamp
/// and optional rollout path.
///
/// NOTE(review): the fields mirror the arguments `upsert_thread` passes to
/// `append_thread_name`; presumably one entry is written per line of
/// `session_index.jsonl` — confirm against `append_thread_name`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SessionIndexEntry {
    thread_id: String,
    thread_name: Option<String>,
    updated_at: i64,
    rollout_path: Option<PathBuf>,
}
264
/// Handle to the on-disk state database.
///
/// The struct only holds paths; the methods visible in this file open a fresh
/// SQLite connection per call through `conn`.
#[derive(Debug, Clone)]
pub struct StateStore {
    /// Location of the SQLite database file.
    db_path: PathBuf,
    /// `session_index.jsonl` next to the database file (same parent directory).
    session_index_path: PathBuf,
}
274
275impl StateStore {
276 pub fn open(path: Option<PathBuf>) -> Result<Self> {
282 let db_path = path.unwrap_or_else(default_state_db_path);
283 let session_index_path = db_path
284 .parent()
285 .unwrap_or_else(|| Path::new("."))
286 .join("session_index.jsonl");
287 if let Some(parent) = db_path.parent() {
288 fs::create_dir_all(parent).with_context(|| {
289 format!("failed to create state directory {}", parent.display())
290 })?;
291 }
292 let store = Self {
293 db_path,
294 session_index_path,
295 };
296 store.init_schema()?;
297 Ok(store)
298 }
299
300 pub fn db_path(&self) -> &Path {
302 &self.db_path
303 }
304
305 fn conn(&self) -> Result<Connection> {
306 Connection::open(&self.db_path)
307 .with_context(|| format!("failed to open state db {}", self.db_path.display()))
308 }
309
    /// Creates or migrates the database schema, using `PRAGMA user_version`
    /// as the schema version marker.
    ///
    /// Three migration steps run in order, each only when the stored version
    /// is below its target:
    ///
    /// 1. version 0 -> 1: thread, dynamic-tool, message, checkpoint and job
    ///    tables, plus the `messages.parent_entry_id` and
    ///    `threads.current_leaf_id` columns with a backfill from existing rows.
    /// 2. version < 2 -> 2: workflow trace tables (`workflow_runs`,
    ///    `branch_runs`, `leaf_runs`, `control_node_runs`, `teacher_candidates`).
    /// 3. version < 3 -> 3: the `thread_goals` table.
    ///
    /// Every step is one `BEGIN; ... COMMIT;` batch that also sets the new
    /// `user_version`, so the version bump is part of the same transaction as
    /// the DDL it describes.
    ///
    /// # Errors
    /// Fails when the connection cannot be opened, the version cannot be
    /// read, or any migration batch fails.
    fn init_schema(&self) -> Result<()> {
        let conn = self.conn()?;
        // Tracks the version locally so later steps see the result of earlier ones.
        let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
        // Step 1: base schema. The ALTER TABLE / CREATE INDEX statements below are
        // not guarded by IF NOT EXISTS, so this batch relies on running exactly once.
        if user_version == 0 {
            conn.execute_batch(
                r#"
                BEGIN;
                CREATE TABLE IF NOT EXISTS threads (
                    id TEXT PRIMARY KEY,
                    rollout_path TEXT,
                    preview TEXT NOT NULL,
                    ephemeral INTEGER NOT NULL,
                    model_provider TEXT NOT NULL,
                    created_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL,
                    status TEXT NOT NULL,
                    path TEXT,
                    cwd TEXT NOT NULL,
                    cli_version TEXT NOT NULL,
                    source TEXT NOT NULL,
                    title TEXT,
                    sandbox_policy TEXT,
                    approval_mode TEXT,
                    archived INTEGER NOT NULL DEFAULT 0,
                    archived_at INTEGER,
                    git_sha TEXT,
                    git_branch TEXT,
                    git_origin_url TEXT,
                    memory_mode TEXT
                );
                CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
                CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
                CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);

                CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
                    thread_id TEXT NOT NULL,
                    position INTEGER NOT NULL,
                    name TEXT NOT NULL,
                    description TEXT,
                    input_schema TEXT NOT NULL,
                    PRIMARY KEY (thread_id, position),
                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
                );

                CREATE TABLE IF NOT EXISTS messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    thread_id TEXT NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    item_json TEXT,
                    created_at INTEGER NOT NULL,
                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);

                CREATE TABLE IF NOT EXISTS checkpoints (
                    thread_id TEXT NOT NULL,
                    checkpoint_id TEXT NOT NULL,
                    state_json TEXT NOT NULL,
                    created_at INTEGER NOT NULL,
                    PRIMARY KEY(thread_id, checkpoint_id),
                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);

                CREATE TABLE IF NOT EXISTS jobs (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    status TEXT NOT NULL,
                    progress INTEGER,
                    detail TEXT,
                    created_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL
                );
                CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);

                -- Add parent_entry_id column, and set to last message before current message
                ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
                UPDATE messages
                SET parent_entry_id = (
                    SELECT m2.id
                    FROM messages m2
                    WHERE m2.thread_id = messages.thread_id
                    AND (
                        m2.created_at < messages.created_at
                        OR (
                            m2.created_at = messages.created_at
                            AND m2.id < messages.id
                        )
                    )
                    ORDER BY m2.created_at DESC, m2.id DESC
                    LIMIT 1
                );
                CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);

                -- Add current_leaf_id column, and set to last message in thread
                ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
                UPDATE threads
                SET current_leaf_id = (
                    SELECT m.id
                    FROM messages m
                    WHERE m.thread_id = threads.id
                    ORDER BY m.id DESC
                    LIMIT 1
                );

                PRAGMA user_version = 1;
                COMMIT;
                "#,
            )
            .context("failed to initialize thread schema")?;
            user_version = 1;
        }
        // Step 2: workflow trace tables.
        if user_version < 2 {
            conn.execute_batch(
                r#"
                BEGIN;
                CREATE TABLE IF NOT EXISTS workflow_runs (
                    id TEXT PRIMARY KEY,
                    workflow_id TEXT NOT NULL,
                    goal TEXT NOT NULL,
                    status TEXT NOT NULL,
                    input_hash TEXT,
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    metadata_json TEXT NOT NULL DEFAULT '{}'
                );
                CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at
                    ON workflow_runs(status, started_at DESC);
                CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at
                    ON workflow_runs(workflow_id, started_at DESC);

                CREATE TABLE IF NOT EXISTS branch_runs (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    branch_id TEXT NOT NULL,
                    node_id TEXT NOT NULL,
                    status TEXT NOT NULL,
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    result_json TEXT NOT NULL DEFAULT '{}',
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id
                    ON branch_runs(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id
                    ON branch_runs(branch_id);

                CREATE TABLE IF NOT EXISTS leaf_runs (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    branch_run_id TEXT,
                    leaf_id TEXT NOT NULL,
                    task_id TEXT NOT NULL,
                    input_hash TEXT,
                    status TEXT NOT NULL,
                    output_json TEXT NOT NULL DEFAULT '{}',
                    artifacts_json TEXT NOT NULL DEFAULT '[]',
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
                );
                CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id
                    ON leaf_runs(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup
                    ON leaf_runs(workflow_run_id, leaf_id, input_hash);

                CREATE TABLE IF NOT EXISTS control_node_runs (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    node_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    selected_children_json TEXT NOT NULL DEFAULT '[]',
                    result_json TEXT NOT NULL DEFAULT '{}',
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id
                    ON control_node_runs(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id
                    ON control_node_runs(node_id);

                CREATE TABLE IF NOT EXISTS teacher_candidates (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    control_node_run_id TEXT NOT NULL,
                    candidate_id TEXT NOT NULL,
                    branch_run_id TEXT,
                    score REAL,
                    passed INTEGER,
                    rationale_json TEXT NOT NULL DEFAULT '{}',
                    created_at INTEGER NOT NULL,
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
                    FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE,
                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
                );
                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id
                    ON teacher_candidates(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id
                    ON teacher_candidates(control_node_run_id);

                PRAGMA user_version = 2;
                COMMIT;
                "#,
            )
            .context("failed to initialize workflow trace schema")?;
            user_version = 2;
        }
        // Step 3: per-thread goal table. The CHECK constraint lists the snake_case
        // names of the `ThreadGoalStatus` variants.
        if user_version < 3 {
            conn.execute_batch(
                r#"
                BEGIN;
                CREATE TABLE IF NOT EXISTS thread_goals (
                    thread_id TEXT PRIMARY KEY NOT NULL,
                    goal_id TEXT NOT NULL,
                    objective TEXT NOT NULL,
                    status TEXT NOT NULL CHECK(status IN (
                        'active',
                        'paused',
                        'blocked',
                        'usage_limited',
                        'budget_limited',
                        'complete'
                    )),
                    token_budget INTEGER,
                    tokens_used INTEGER NOT NULL DEFAULT 0,
                    time_used_seconds INTEGER NOT NULL DEFAULT 0,
                    created_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL,
                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
                );

                PRAGMA user_version = 3;
                COMMIT;
                "#,
            )
            .context("failed to initialize thread goal schema")?;
        }
        Ok(())
    }
553
554 pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
559 let conn = self.conn()?;
560 conn.execute(
561 r#"
562 INSERT INTO threads (
563 id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
564 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
565 git_sha, git_branch, git_origin_url, memory_mode
566 ) VALUES (
567 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
568 ?11, ?12, ?13, ?14, ?15, ?16, ?17,
569 ?18, ?19, ?20, ?21
570 )
571 ON CONFLICT(id) DO UPDATE SET
572 rollout_path=excluded.rollout_path,
573 preview=excluded.preview,
574 ephemeral=excluded.ephemeral,
575 model_provider=excluded.model_provider,
576 created_at=excluded.created_at,
577 updated_at=excluded.updated_at,
578 status=excluded.status,
579 path=excluded.path,
580 cwd=excluded.cwd,
581 cli_version=excluded.cli_version,
582 source=excluded.source,
583 title=excluded.title,
584 sandbox_policy=excluded.sandbox_policy,
585 approval_mode=excluded.approval_mode,
586 archived=excluded.archived,
587 archived_at=excluded.archived_at,
588 git_sha=excluded.git_sha,
589 git_branch=excluded.git_branch,
590 git_origin_url=excluded.git_origin_url,
591 memory_mode=excluded.memory_mode
592 "#,
593 params![
594 thread.id,
595 path_to_opt_string(thread.rollout_path.as_deref()),
596 thread.preview,
597 bool_to_i64(thread.ephemeral),
598 thread.model_provider,
599 thread.created_at,
600 thread.updated_at,
601 thread_status_to_str(&thread.status),
602 path_to_opt_string(thread.path.as_deref()),
603 thread.cwd.display().to_string(),
604 thread.cli_version,
605 session_source_to_str(&thread.source),
606 thread.name,
607 thread.sandbox_policy,
608 thread.approval_mode,
609 bool_to_i64(thread.archived),
610 thread.archived_at,
611 thread.git_sha,
612 thread.git_branch,
613 thread.git_origin_url,
614 thread.memory_mode,
615 ],
616 )
617 .context("failed to upsert thread metadata")?;
618
619 self.append_thread_name(
620 &thread.id,
621 thread.name.clone(),
622 thread.updated_at,
623 thread.rollout_path.clone(),
624 )?;
625 Ok(())
626 }
627
628 pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
632 let conn = self.conn()?;
633 conn.query_row(
634 r#"
635 SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
636 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
637 git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
638 FROM threads
639 WHERE id = ?1
640 "#,
641 params![id],
642 row_to_thread,
643 )
644 .optional()
645 .context("failed to read thread")
646 }
647
648 pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
653 let conn = self.conn()?;
654 let sql = if filters.include_archived {
655 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
656 } else {
657 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
658 };
659
660 let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
661 let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
662 let mut rows = stmt
663 .query(params![limit])
664 .context("failed to query threads")?;
665 let mut out = Vec::new();
666 while let Some(row) = rows.next().context("failed to iterate thread rows")? {
667 out.push(row_to_thread(row)?);
668 }
669 Ok(out)
670 }
671
672 pub fn mark_archived(&self, id: &str) -> Result<()> {
675 let conn = self.conn()?;
676 conn.execute(
677 "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
678 params![
679 id,
680 Utc::now().timestamp(),
681 thread_status_to_str(&ThreadStatus::Archived)
682 ],
683 )
684 .context("failed to archive thread")?;
685 Ok(())
686 }
687
688 pub fn mark_unarchived(&self, id: &str) -> Result<()> {
690 let conn = self.conn()?;
691 conn.execute(
692 "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
693 params![id],
694 )
695 .context("failed to unarchive thread")?;
696 Ok(())
697 }
698
699 pub fn delete_thread(&self, id: &str) -> Result<()> {
702 let conn = self.conn()?;
703 conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
704 .context("failed to delete thread")?;
705 Ok(())
706 }
707
708 pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
712 let conn = self.conn()?;
713 conn.execute(
714 "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
715 params![id, mode],
716 )
717 .context("failed to update thread memory mode")?;
718 Ok(())
719 }
720
721 pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
725 let conn = self.conn()?;
726 conn.query_row(
727 "SELECT memory_mode FROM threads WHERE id = ?1",
728 params![id],
729 |row| row.get::<_, Option<String>>(0),
730 )
731 .optional()
732 .context("failed to read thread memory mode")
733 .map(Option::flatten)
734 }
735
736 pub fn upsert_thread_goal(&self, goal: &ThreadGoalRecord) -> Result<()> {
738 let conn = self.conn()?;
739 let exists: Option<i64> = conn
740 .query_row(
741 "SELECT 1 FROM threads WHERE id = ?1",
742 params![goal.thread_id],
743 |row| row.get(0),
744 )
745 .optional()
746 .context("failed to verify thread before saving goal")?;
747 if exists.is_none() {
748 anyhow::bail!("thread {} not found", goal.thread_id);
749 }
750
751 conn.execute(
752 r#"
753 INSERT INTO thread_goals (
754 thread_id, goal_id, objective, status, token_budget, tokens_used,
755 time_used_seconds, created_at, updated_at
756 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
757 ON CONFLICT(thread_id) DO UPDATE SET
758 goal_id=excluded.goal_id,
759 objective=excluded.objective,
760 status=excluded.status,
761 token_budget=excluded.token_budget,
762 tokens_used=excluded.tokens_used,
763 time_used_seconds=excluded.time_used_seconds,
764 created_at=excluded.created_at,
765 updated_at=excluded.updated_at
766 "#,
767 params![
768 goal.thread_id,
769 goal.goal_id,
770 goal.objective,
771 thread_goal_status_to_str(&goal.status),
772 goal.token_budget,
773 goal.tokens_used,
774 goal.time_used_seconds,
775 goal.created_at,
776 goal.updated_at,
777 ],
778 )
779 .context("failed to upsert thread goal")?;
780 Ok(())
781 }
782
783 pub fn get_thread_goal(&self, thread_id: &str) -> Result<Option<ThreadGoalRecord>> {
785 let conn = self.conn()?;
786 conn.query_row(
787 r#"
788 SELECT thread_id, goal_id, objective, status, token_budget, tokens_used,
789 time_used_seconds, created_at, updated_at
790 FROM thread_goals
791 WHERE thread_id = ?1
792 "#,
793 params![thread_id],
794 row_to_thread_goal,
795 )
796 .optional()
797 .context("failed to read thread goal")
798 }
799
800 pub fn delete_thread_goal(&self, thread_id: &str) -> Result<bool> {
802 let conn = self.conn()?;
803 let changed = conn
804 .execute(
805 "DELETE FROM thread_goals WHERE thread_id = ?1",
806 params![thread_id],
807 )
808 .context("failed to delete thread goal")?;
809 Ok(changed > 0)
810 }
811
812 pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
817 let conn = self.conn()?;
818 let mut stmt = conn
819 .prepare(
820 r#"
821 SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
822 FROM messages m1
823 LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
824 WHERE m1.thread_id = ?1 AND m2.id IS NULL
825 "#,
826 )
827 .context("failed to prepare message listing query")?;
828 let mut rows = stmt
829 .query(params![thread_id])
830 .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
831 let mut out = Vec::new();
832 while let Some(row) = rows.next().context("failed to iterate message rows")? {
833 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
834 let item = item_json
835 .as_deref()
836 .map(serde_json::from_str)
837 .transpose()
838 .with_context(|| {
839 format!("failed to parse message item json in thread {thread_id}")
840 })?;
841 out.push(MessageRecord {
842 id: row.get(0).context("failed to read message id")?,
843 thread_id: row.get(1).context("failed to read message thread id")?,
844 role: row.get(2).context("failed to read message role")?,
845 content: row.get(3).context("failed to read message content")?,
846 item,
847 created_at: row.get(5).context("failed to read message timestamp")?,
848 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
849 });
850 }
851 Ok(out)
852 }
853
854 pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
859 let conn = self.conn()?;
860 conn.execute(
861 "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
862 params![current_leaf_id, thread_id],
863 )
864 .context("failed to update thread current leaf id")?;
865 Ok(())
866 }
867
868 pub fn persist_dynamic_tools(
873 &self,
874 thread_id: &str,
875 tools: &[DynamicToolRecord],
876 ) -> Result<()> {
877 let mut conn = self.conn()?;
878 let tx = conn
879 .transaction()
880 .context("failed to begin dynamic tools transaction")?;
881 tx.execute(
882 "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
883 params![thread_id],
884 )
885 .context("failed to clear dynamic tools")?;
886 for tool in tools {
887 tx.execute(
888 "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
889 params![
890 thread_id,
891 tool.position,
892 tool.name,
893 tool.description,
894 tool.input_schema.to_string()
895 ],
896 )
897 .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
898 }
899 tx.commit().context("failed to commit dynamic tools")?;
900 Ok(())
901 }
902
903 pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
905 let conn = self.conn()?;
906 let mut stmt = conn
907 .prepare(
908 "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
909 )
910 .context("failed to prepare get dynamic tools query")?;
911 let mut rows = stmt
912 .query(params![thread_id])
913 .context("failed to query dynamic tools")?;
914 let mut out = Vec::new();
915 while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
916 let input_schema_raw: String =
917 row.get(3).context("failed to read tool input schema")?;
918 let input_schema: Value =
919 serde_json::from_str(&input_schema_raw).with_context(|| {
920 format!("failed to parse input schema for dynamic tool in thread {thread_id}")
921 })?;
922 out.push(DynamicToolRecord {
923 position: row.get(0).context("failed to read tool position")?,
924 name: row.get(1).context("failed to read tool name")?,
925 description: row.get(2).context("failed to read tool description")?,
926 input_schema,
927 });
928 }
929 Ok(out)
930 }
931
932 pub fn append_message(
938 &self,
939 thread_id: &str,
940 role: &str,
941 content: &str,
942 item: Option<Value>,
943 ) -> Result<i64> {
944 let mut conn = self.conn()?;
945 let created_at = Utc::now().timestamp();
946 let item_json = item
947 .as_ref()
948 .map(serde_json::to_string)
949 .transpose()
950 .context("failed to serialize message item payload")?;
951
952 let tx = conn
953 .transaction()
954 .context("failed to begin append message transaction")?;
955
956 let current_leaf_id: Option<i64> = tx
957 .query_row(
958 "SELECT current_leaf_id FROM threads WHERE id = ?1",
959 params![thread_id],
960 |row| row.get(0),
961 )
962 .with_context(|| {
963 format!("failed to query thread current leaf id for thread {thread_id}")
964 })?;
965
966 let next_leaf_id: i64 = tx.query_row(
967 r#"
968 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
969 SELECT ?1, ?2, ?3, ?4, ?5, ?6
970 RETURNING id
971 "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
972 ).with_context(|| format!("failed to append message for thread {thread_id}"))?;
973
974 tx.execute(
975 r#"
976 UPDATE threads
977 SET current_leaf_id = ?1
978 WHERE id = ?2;
979 "#,
980 params![next_leaf_id, thread_id],
981 )
982 .with_context(|| {
983 format!("failed to update thread current leaf id for thread {thread_id}")
984 })?;
985
986 tx.commit()
987 .context("failed to commit append message transaction")?;
988
989 Ok(next_leaf_id)
990 }
991
992 pub fn list_messages(
998 &self,
999 thread_id: &str,
1000 limit: Option<usize>,
1001 ) -> Result<Vec<MessageRecord>> {
1002 let conn = self.conn()?;
1003 let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
1004 let mut stmt = conn
1005 .prepare(
1006 r#"
1007 WITH RECURSIVE
1008 leaf_id AS (
1009 SELECT current_leaf_id FROM threads WHERE id = ?1
1010 ),
1011 ancestors AS (
1012 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
1013 FROM messages
1014 WHERE id = (SELECT current_leaf_id FROM leaf_id)
1015
1016 UNION ALL
1017
1018 SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
1019 FROM messages m
1020 JOIN ancestors a ON m.id = a.parent_entry_id
1021 WHERE a.depth < ?2
1022 )
1023 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
1024 ORDER BY depth DESC
1025 "#
1026 )
1027 .context("failed to prepare message listing query")?;
1028 let mut rows = stmt
1029 .query(params![thread_id, limit - 1])
1030 .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
1031 let mut out = Vec::new();
1032 while let Some(row) = rows.next().context("failed to iterate message rows")? {
1033 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
1034 let item = item_json
1035 .as_deref()
1036 .map(serde_json::from_str)
1037 .transpose()
1038 .with_context(|| {
1039 format!("failed to parse message item json in thread {thread_id}")
1040 })?;
1041 out.push(MessageRecord {
1042 id: row.get(0).context("failed to read message id")?,
1043 thread_id: row.get(1).context("failed to read message thread id")?,
1044 role: row.get(2).context("failed to read message role")?,
1045 content: row.get(3).context("failed to read message content")?,
1046 item,
1047 created_at: row.get(5).context("failed to read message timestamp")?,
1048 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
1049 });
1050 }
1051 Ok(out)
1052 }
1053
1054 pub fn fork_at_message(
1060 &self,
1061 message_id: &str,
1062 role: &str,
1063 content: &str,
1064 item: Option<Value>,
1065 ) -> Result<i64> {
1066 let mut conn = self.conn()?;
1067 let created_at = Utc::now().timestamp();
1068 let item_json = item
1069 .as_ref()
1070 .map(serde_json::to_string)
1071 .transpose()
1072 .context("failed to serialize message item payload")?;
1073
1074 let tx = conn
1075 .transaction()
1076 .context("failed to begin fork message transaction")?;
1077
1078 let thread_id: String = tx
1079 .query_row(
1080 "SELECT thread_id FROM messages WHERE id = ?1",
1081 params![message_id],
1082 |row| row.get(0),
1083 )
1084 .with_context(|| format!("failed to query thread id for message {message_id}"))?;
1085
1086 let next_leaf_id: i64 = tx.query_row(
1087 r#"
1088 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
1089 SELECT ?1, ?2, ?3, ?4, ?5, ?6
1090 RETURNING id
1091 "#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
1092 ).with_context(|| format!("failed to fork at message for thread {:?}", thread_id))?;
1093
1094 tx.execute(
1095 r#"
1096 UPDATE threads
1097 SET current_leaf_id = ?1
1098 WHERE id = ?2;
1099 "#,
1100 params![next_leaf_id, thread_id],
1101 )
1102 .with_context(|| {
1103 format!(
1104 "failed to update thread current leaf id for thread {:?}",
1105 thread_id
1106 )
1107 })?;
1108
1109 tx.commit()
1110 .context("failed to commit fork message transaction")?;
1111
1112 Ok(next_leaf_id)
1113 }
1114
1115 pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
1119 let mut conn = self.conn()?;
1120 let tx = conn
1121 .transaction()
1122 .context("failed to begin clear messages transaction")?;
1123
1124 tx.execute(
1125 r#"
1126 UPDATE threads
1127 SET current_leaf_id = NULL
1128 WHERE id = ?1;
1129 "#,
1130 params![thread_id],
1131 )
1132 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
1133 let result = tx
1134 .execute(
1135 r#"
1136 DELETE FROM messages WHERE thread_id = ?1
1137 "#,
1138 params![thread_id],
1139 )
1140 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
1141 tx.commit()
1142 .context("failed to commit clear messages transaction")?;
1143
1144 Ok(result)
1145 }
1146
1147 pub fn save_checkpoint(
1152 &self,
1153 thread_id: &str,
1154 checkpoint_id: &str,
1155 state: &Value,
1156 ) -> Result<()> {
1157 let conn = self.conn()?;
1158 let state_json =
1159 serde_json::to_string(state).context("failed to encode checkpoint state")?;
1160 conn.execute(
1161 r#"
1162 INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
1163 VALUES (?1, ?2, ?3, ?4)
1164 ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
1165 state_json = excluded.state_json,
1166 created_at = excluded.created_at
1167 "#,
1168 params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
1169 )
1170 .with_context(|| {
1171 format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
1172 })?;
1173 Ok(())
1174 }
1175
1176 pub fn load_checkpoint(
1182 &self,
1183 thread_id: &str,
1184 checkpoint_id: Option<&str>,
1185 ) -> Result<Option<CheckpointRecord>> {
1186 let conn = self.conn()?;
1187 if let Some(checkpoint_id) = checkpoint_id {
1188 let row = conn
1189 .query_row(
1190 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1191 params![thread_id, checkpoint_id],
1192 |row| {
1193 let state_json: String = row.get(2)?;
1194 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1195 Ok(CheckpointRecord {
1196 thread_id: row.get(0)?,
1197 checkpoint_id: row.get(1)?,
1198 state,
1199 created_at: row.get(3)?,
1200 })
1201 },
1202 )
1203 .optional()
1204 .with_context(|| {
1205 format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
1206 })?;
1207 return Ok(row);
1208 }
1209
1210 conn.query_row(
1211 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
1212 params![thread_id],
1213 |row| {
1214 let state_json: String = row.get(2)?;
1215 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1216 Ok(CheckpointRecord {
1217 thread_id: row.get(0)?,
1218 checkpoint_id: row.get(1)?,
1219 state,
1220 created_at: row.get(3)?,
1221 })
1222 },
1223 )
1224 .optional()
1225 .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
1226 }
1227
1228 pub fn list_checkpoints(
1232 &self,
1233 thread_id: &str,
1234 limit: Option<usize>,
1235 ) -> Result<Vec<CheckpointRecord>> {
1236 let conn = self.conn()?;
1237 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1238 let mut stmt = conn
1239 .prepare(
1240 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
1241 )
1242 .context("failed to prepare checkpoint list query")?;
1243 let mut rows = stmt
1244 .query(params![thread_id, limit])
1245 .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
1246
1247 let mut out = Vec::new();
1248 while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
1249 let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
1250 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1251 out.push(CheckpointRecord {
1252 thread_id: row.get(0).context("failed to read checkpoint thread id")?,
1253 checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
1254 state,
1255 created_at: row.get(3).context("failed to read checkpoint timestamp")?,
1256 });
1257 }
1258 Ok(out)
1259 }
1260
1261 pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
1263 let conn = self.conn()?;
1264 conn.execute(
1265 "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1266 params![thread_id, checkpoint_id],
1267 )
1268 .with_context(|| {
1269 format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
1270 })?;
1271 Ok(())
1272 }
1273
1274 pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
1276 let conn = self.conn()?;
1277 conn.execute(
1278 r#"
1279 INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
1280 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1281 ON CONFLICT(id) DO UPDATE SET
1282 name = excluded.name,
1283 status = excluded.status,
1284 progress = excluded.progress,
1285 detail = excluded.detail,
1286 created_at = excluded.created_at,
1287 updated_at = excluded.updated_at
1288 "#,
1289 params![
1290 job.id,
1291 job.name,
1292 job_state_status_to_str(&job.status),
1293 job.progress.map(i64::from),
1294 job.detail,
1295 job.created_at,
1296 job.updated_at
1297 ],
1298 )
1299 .with_context(|| format!("failed to upsert job {}", job.id))?;
1300 Ok(())
1301 }
1302
1303 pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
1307 let conn = self.conn()?;
1308 conn.query_row(
1309 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
1310 params![id],
1311 |row| {
1312 let status_raw: String = row.get(2)?;
1313 let progress: Option<i64> = row.get(3)?;
1314 Ok(JobStateRecord {
1315 id: row.get(0)?,
1316 name: row.get(1)?,
1317 status: job_state_status_from_str(&status_raw),
1318 progress: progress.and_then(|v| u8::try_from(v).ok()),
1319 detail: row.get(4)?,
1320 created_at: row.get(5)?,
1321 updated_at: row.get(6)?,
1322 })
1323 },
1324 )
1325 .optional()
1326 .with_context(|| format!("failed to read job {id}"))
1327 }
1328
1329 pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
1333 let conn = self.conn()?;
1334 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1335 let mut stmt = conn
1336 .prepare(
1337 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
1338 )
1339 .context("failed to prepare job list query")?;
1340 let mut rows = stmt
1341 .query(params![limit])
1342 .context("failed to query persisted jobs")?;
1343 let mut out = Vec::new();
1344 while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
1345 let status_raw: String = row.get(2).context("failed to read job status")?;
1346 let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
1347 out.push(JobStateRecord {
1348 id: row.get(0).context("failed to read job id")?,
1349 name: row.get(1).context("failed to read job name")?,
1350 status: job_state_status_from_str(&status_raw),
1351 progress: progress.and_then(|v| u8::try_from(v).ok()),
1352 detail: row.get(4).context("failed to read job detail")?,
1353 created_at: row.get(5).context("failed to read job created_at")?,
1354 updated_at: row.get(6).context("failed to read job updated_at")?,
1355 });
1356 }
1357 Ok(out)
1358 }
1359
1360 pub fn delete_job(&self, id: &str) -> Result<()> {
1362 let conn = self.conn()?;
1363 conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
1364 .with_context(|| format!("failed to delete job {id}"))?;
1365 Ok(())
1366 }
1367
1368 pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
1370 let conn = self.conn()?;
1371 conn.query_row(
1372 "SELECT rollout_path FROM threads WHERE id = ?1",
1373 params![id],
1374 |row| row.get::<_, Option<String>>(0),
1375 )
1376 .optional()
1377 .context("failed to lookup rollout path")
1378 .map(|opt| opt.flatten().map(PathBuf::from))
1379 }
1380
1381 pub fn append_thread_name(
1387 &self,
1388 thread_id: &str,
1389 thread_name: Option<String>,
1390 updated_at: i64,
1391 rollout_path: Option<PathBuf>,
1392 ) -> Result<()> {
1393 if let Some(parent) = self.session_index_path.parent() {
1394 fs::create_dir_all(parent).with_context(|| {
1395 format!(
1396 "failed to create session index directory {}",
1397 parent.display()
1398 )
1399 })?;
1400 }
1401 let entry = SessionIndexEntry {
1402 thread_id: thread_id.to_string(),
1403 thread_name,
1404 updated_at,
1405 rollout_path,
1406 };
1407 let encoded =
1408 serde_json::to_string(&entry).context("failed to serialize session index entry")?;
1409 let mut file = OpenOptions::new()
1410 .create(true)
1411 .append(true)
1412 .open(&self.session_index_path)
1413 .with_context(|| {
1414 format!(
1415 "failed to open session index {}",
1416 self.session_index_path.display()
1417 )
1418 })?;
1419 writeln!(file, "{encoded}").context("failed to append session index entry")?;
1420 Ok(())
1421 }
1422
1423 pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
1427 let map = self.session_index_map()?;
1428 Ok(map
1429 .get(thread_id)
1430 .and_then(|entry| entry.thread_name.clone()))
1431 }
1432
1433 pub fn find_thread_names_by_ids(
1437 &self,
1438 ids: &[String],
1439 ) -> Result<HashMap<String, Option<String>>> {
1440 let map = self.session_index_map()?;
1441 let mut out = HashMap::new();
1442 for id in ids {
1443 let name = map.get(id).and_then(|entry| entry.thread_name.clone());
1444 out.insert(id.clone(), name);
1445 }
1446 Ok(out)
1447 }
1448
1449 pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
1454 let map = self.session_index_map()?;
1455 let matched = map
1456 .values()
1457 .filter(|entry| {
1458 entry
1459 .thread_name
1460 .as_deref()
1461 .is_some_and(|n| n.eq_ignore_ascii_case(name))
1462 })
1463 .max_by_key(|entry| entry.updated_at);
1464 Ok(matched.and_then(|entry| entry.rollout_path.clone()))
1465 }
1466
1467 fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
1468 if !self.session_index_path.exists() {
1469 return Ok(HashMap::new());
1470 }
1471 let file = OpenOptions::new()
1472 .read(true)
1473 .open(&self.session_index_path)
1474 .with_context(|| {
1475 format!(
1476 "failed to read session index {}",
1477 self.session_index_path.display()
1478 )
1479 })?;
1480 let reader = BufReader::new(file);
1481 let mut latest = HashMap::<String, SessionIndexEntry>::new();
1482 for line in reader.lines() {
1483 let line = line.context("failed to read session index line")?;
1484 if line.trim().is_empty() {
1485 continue;
1486 }
1487 let parsed: SessionIndexEntry =
1488 serde_json::from_str(&line).context("failed to parse session index entry")?;
1489 latest.insert(parsed.thread_id.clone(), parsed);
1490 }
1491 Ok(latest)
1492 }
1493}
1494
1495fn default_state_db_path() -> PathBuf {
1496 let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
1497 let primary = home.join(".codewhale").join("state.db");
1500 if primary.exists() || !home.join(".deepseek").join("state.db").exists() {
1501 primary
1502 } else {
1503 home.join(".deepseek").join("state.db")
1504 }
1505}
1506
/// Converts a boolean into an integer: `true` becomes 1, `false` becomes 0.
fn bool_to_i64(value: bool) -> i64 {
    i64::from(value)
}
1510
/// Converts an integer into a boolean: 0 is `false`, every other value is
/// `true`.
fn i64_to_bool(value: i64) -> bool {
    match value {
        0 => false,
        _ => true,
    }
}
1514
1515fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
1516 match status {
1517 ThreadStatus::Running => "running",
1518 ThreadStatus::Idle => "idle",
1519 ThreadStatus::Completed => "completed",
1520 ThreadStatus::Failed => "failed",
1521 ThreadStatus::Paused => "paused",
1522 ThreadStatus::Archived => "archived",
1523 }
1524}
1525
1526fn thread_status_from_str(value: &str) -> ThreadStatus {
1527 match value {
1528 "running" => ThreadStatus::Running,
1529 "idle" => ThreadStatus::Idle,
1530 "completed" => ThreadStatus::Completed,
1531 "failed" => ThreadStatus::Failed,
1532 "paused" => ThreadStatus::Paused,
1533 "archived" => ThreadStatus::Archived,
1534 _ => ThreadStatus::Idle,
1535 }
1536}
1537
1538fn session_source_to_str(source: &SessionSource) -> &'static str {
1539 match source {
1540 SessionSource::Interactive => "interactive",
1541 SessionSource::Resume => "resume",
1542 SessionSource::Fork => "fork",
1543 SessionSource::Api => "api",
1544 SessionSource::Unknown => "unknown",
1545 }
1546}
1547
1548fn session_source_from_str(value: &str) -> SessionSource {
1549 match value {
1550 "interactive" => SessionSource::Interactive,
1551 "resume" => SessionSource::Resume,
1552 "fork" => SessionSource::Fork,
1553 "api" => SessionSource::Api,
1554 _ => SessionSource::Unknown,
1555 }
1556}
1557
/// Renders an optional path as an optional `String` using `Path::display`.
///
/// `None` stays `None`.
fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
    let rendered = path?.display().to_string();
    Some(rendered)
}
1561
1562fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
1563 match status {
1564 JobStateStatus::Queued => "queued",
1565 JobStateStatus::Running => "running",
1566 JobStateStatus::Completed => "completed",
1567 JobStateStatus::Failed => "failed",
1568 JobStateStatus::Cancelled => "cancelled",
1569 }
1570}
1571
1572fn job_state_status_from_str(value: &str) -> JobStateStatus {
1573 match value {
1574 "queued" => JobStateStatus::Queued,
1575 "running" => JobStateStatus::Running,
1576 "completed" => JobStateStatus::Completed,
1577 "failed" => JobStateStatus::Failed,
1578 "cancelled" => JobStateStatus::Cancelled,
1579 _ => JobStateStatus::Queued,
1580 }
1581}
1582
1583fn thread_goal_status_to_str(status: &ThreadGoalStatus) -> &'static str {
1584 match status {
1585 ThreadGoalStatus::Active => "active",
1586 ThreadGoalStatus::Paused => "paused",
1587 ThreadGoalStatus::Blocked => "blocked",
1588 ThreadGoalStatus::UsageLimited => "usage_limited",
1589 ThreadGoalStatus::BudgetLimited => "budget_limited",
1590 ThreadGoalStatus::Complete => "complete",
1591 }
1592}
1593
1594fn thread_goal_status_from_str(value: &str) -> ThreadGoalStatus {
1595 match value {
1596 "active" => ThreadGoalStatus::Active,
1597 "paused" => ThreadGoalStatus::Paused,
1598 "blocked" => ThreadGoalStatus::Blocked,
1599 "usage_limited" => ThreadGoalStatus::UsageLimited,
1600 "budget_limited" => ThreadGoalStatus::BudgetLimited,
1601 "complete" => ThreadGoalStatus::Complete,
1602 _ => ThreadGoalStatus::Active,
1603 }
1604}
1605
1606fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
1607 let status_raw: String = row.get(7)?;
1608 let source_raw: String = row.get(11)?;
1609 let rollout_path: Option<String> = row.get(1)?;
1610 let path: Option<String> = row.get(8)?;
1611 Ok(ThreadMetadata {
1612 id: row.get(0)?,
1613 rollout_path: rollout_path.map(PathBuf::from),
1614 preview: row.get(2)?,
1615 ephemeral: i64_to_bool(row.get(3)?),
1616 model_provider: row.get(4)?,
1617 created_at: row.get(5)?,
1618 updated_at: row.get(6)?,
1619 status: thread_status_from_str(&status_raw),
1620 path: path.map(PathBuf::from),
1621 cwd: PathBuf::from(row.get::<_, String>(9)?),
1622 cli_version: row.get(10)?,
1623 source: session_source_from_str(&source_raw),
1624 name: row.get(12)?,
1625 sandbox_policy: row.get(13)?,
1626 approval_mode: row.get(14)?,
1627 archived: i64_to_bool(row.get(15)?),
1628 archived_at: row.get(16)?,
1629 git_sha: row.get(17)?,
1630 git_branch: row.get(18)?,
1631 git_origin_url: row.get(19)?,
1632 memory_mode: row.get(20)?,
1633 current_leaf_id: row.get(21)?,
1634 })
1635}
1636
1637fn row_to_thread_goal(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadGoalRecord> {
1638 let status_raw: String = row.get(3)?;
1639 Ok(ThreadGoalRecord {
1640 thread_id: row.get(0)?,
1641 goal_id: row.get(1)?,
1642 objective: row.get(2)?,
1643 status: thread_goal_status_from_str(&status_raw),
1644 token_budget: row.get(4)?,
1645 tokens_used: row.get(5)?,
1646 time_used_seconds: row.get(6)?,
1647 created_at: row.get(7)?,
1648 updated_at: row.get(8)?,
1649 })
1650}
1651
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Opens a `StateStore` whose database lives in a freshly created
    /// directory under the system temp dir. The directory name combines
    /// `name`, the process id and a nanosecond timestamp.
    fn temp_state_store(name: &str) -> StateStore {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time");
        let suffix = since_epoch.as_nanos();
        let dir_name = format!(
            "codewhale-state-{name}-{}-{suffix}",
            std::process::id()
        );
        let dir = std::env::temp_dir().join(dir_name);
        fs::create_dir_all(&dir).expect("create temp state dir");
        let db_path = dir.join("state.db");
        StateStore::open(Some(db_path)).expect("open state store")
    }

    /// Builds a minimal running, non-archived thread with the given id and
    /// every optional field left empty.
    fn test_thread(id: &str) -> ThreadMetadata {
        ThreadMetadata {
            id: id.to_owned(),
            preview: String::from("test thread"),
            model_provider: String::from("deepseek"),
            cli_version: String::from("0.0.0-test"),
            cwd: PathBuf::from("/tmp/codewhale"),
            status: ThreadStatus::Running,
            source: SessionSource::Interactive,
            created_at: 10,
            updated_at: 10,
            ephemeral: false,
            archived: false,
            rollout_path: None,
            path: None,
            name: None,
            sandbox_policy: None,
            approval_mode: None,
            archived_at: None,
            git_sha: None,
            git_branch: None,
            git_origin_url: None,
            memory_mode: None,
            current_leaf_id: None,
        }
    }

    /// Builds an active goal with fixed counters for `thread_id`.
    fn test_goal(thread_id: &str, objective: &str) -> ThreadGoalRecord {
        ThreadGoalRecord {
            thread_id: thread_id.to_owned(),
            goal_id: String::from("goal-1"),
            objective: objective.to_owned(),
            status: ThreadGoalStatus::Active,
            token_budget: Some(123),
            tokens_used: 7,
            time_used_seconds: 11,
            created_at: 100,
            updated_at: 101,
        }
    }

    /// Insert, read back, replace, delete, and delete again.
    #[test]
    fn thread_goal_crud_round_trips_and_replaces() {
        const THREAD: &str = "thread-1";

        let store = temp_state_store("thread-goal-crud");
        let thread = test_thread(THREAD);
        store.upsert_thread(&thread).expect("upsert thread");

        // A stored goal reads back unchanged.
        let goal = test_goal(THREAD, "Ship v0.8.59");
        store.upsert_thread_goal(&goal).expect("upsert goal");
        let stored = store.get_thread_goal(THREAD).expect("read goal");
        assert_eq!(stored.as_ref(), Some(&goal));

        // A second upsert for the same thread replaces the first goal.
        let replacement = ThreadGoalRecord {
            goal_id: "goal-2".to_string(),
            status: ThreadGoalStatus::BudgetLimited,
            token_budget: None,
            updated_at: 202,
            ..test_goal(THREAD, "Ship v0.8.59 safely")
        };
        store
            .upsert_thread_goal(&replacement)
            .expect("replace goal");
        let stored = store.get_thread_goal(THREAD).expect("read replacement");
        assert_eq!(stored, Some(replacement));

        // Deleting reports whether a goal was actually removed.
        let removed = store.delete_thread_goal(THREAD).expect("delete goal");
        assert!(removed);
        let after_delete = store.get_thread_goal(THREAD).expect("read empty");
        assert!(after_delete.is_none());
        let removed_again = store.delete_thread_goal(THREAD).expect("delete empty");
        assert!(!removed_again);
    }

    /// Upserting a goal for a thread that was never stored must fail.
    #[test]
    fn thread_goal_requires_existing_thread() {
        let store = temp_state_store("thread-goal-missing-thread");
        let orphan_goal = test_goal("missing-thread", "nope");
        let err = store
            .upsert_thread_goal(&orphan_goal)
            .expect_err("goal without a thread should fail");
        let message = err.to_string();
        assert!(message.contains("thread missing-thread not found"));
    }
}