1use std::collections::HashMap;
13use std::fs::{self, OpenOptions};
14use std::io::{BufRead, BufReader, Write};
15use std::path::{Path, PathBuf};
16
17use anyhow::{Context, Result};
18use chrono::Utc;
19use rusqlite::{Connection, OptionalExtension, params};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
/// Lifecycle status recorded for a thread in the `threads.status` column.
///
/// Serde (de)serializes the variants using snake_case names (for example
/// `running`, `archived`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ThreadStatus {
    /// NOTE(review): presumably a turn is actively executing — confirm with callers.
    Running,
    /// NOTE(review): presumably the thread is waiting for input — confirm with callers.
    Idle,
    /// NOTE(review): presumably the thread finished successfully — confirm with callers.
    Completed,
    /// NOTE(review): presumably the thread ended with an error — confirm with callers.
    Failed,
    /// NOTE(review): presumably the thread was suspended — confirm with callers.
    Paused,
    /// Written by `StateStore::mark_archived` together with the `archived` flag.
    Archived,
}
42
/// Origin of a session, persisted in the `threads.source` column.
///
/// Serde (de)serializes the variants using snake_case names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SessionSource {
    /// NOTE(review): presumably a session started from the interactive UI — confirm.
    Interactive,
    /// NOTE(review): presumably a session resumed from earlier state — confirm.
    Resume,
    /// NOTE(review): presumably a session forked from another thread — confirm.
    Fork,
    /// NOTE(review): presumably a session created through a programmatic API — confirm.
    Api,
    /// Fallback for sources that are not otherwise classified.
    Unknown,
}
60
/// One row of the `threads` table: everything the store tracks about a thread
/// apart from its messages, tools, checkpoints and goal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThreadMetadata {
    /// Primary key of the thread.
    pub id: String,
    /// Optional rollout file path; also forwarded to the session index on upsert.
    pub rollout_path: Option<PathBuf>,
    /// Preview text for the thread (stored as a non-null column).
    pub preview: String,
    /// Stored as an integer flag.
    /// NOTE(review): presumably marks threads that should not outlive the session — confirm.
    pub ephemeral: bool,
    /// Name of the model provider used by the thread.
    pub model_provider: String,
    /// Creation timestamp supplied by the caller.
    /// NOTE(review): presumably Unix seconds, like `archived_at` — confirm.
    pub created_at: i64,
    /// Last-update timestamp; `list_threads` orders by this value, newest first.
    pub updated_at: i64,
    /// Current lifecycle status.
    pub status: ThreadStatus,
    /// Optional path associated with the thread.
    pub path: Option<PathBuf>,
    /// Working directory; persisted via `Path::display`, so non-UTF-8 paths are
    /// stored lossily.
    pub cwd: PathBuf,
    /// Version string of the CLI that wrote the row.
    pub cli_version: String,
    /// Where the session originated.
    pub source: SessionSource,
    /// Optional display name; persisted in the `title` column.
    pub name: Option<String>,
    /// Optional sandbox policy label.
    pub sandbox_policy: Option<String>,
    /// Optional approval mode label.
    pub approval_mode: Option<String>,
    /// Whether the thread is archived; `list_threads` hides archived threads
    /// unless asked to include them.
    pub archived: bool,
    /// Unix timestamp (seconds) set by `mark_archived`; cleared by `mark_unarchived`.
    pub archived_at: Option<i64>,
    /// Optional git commit SHA recorded for the thread.
    pub git_sha: Option<String>,
    /// Optional git branch recorded for the thread.
    pub git_branch: Option<String>,
    /// Optional git origin URL recorded for the thread.
    pub git_origin_url: Option<String>,
    /// Optional memory mode; see `set_thread_memory_mode`.
    pub memory_mode: Option<String>,
    /// Id of the message at the tip of the active branch. Maintained by
    /// `append_message`, `fork_at_message`, `clear_messages` and
    /// `set_current_leaf_id`; `upsert_thread` does not write this column.
    pub current_leaf_id: Option<i64>,
}
112
/// A dynamic tool definition attached to a thread (`thread_dynamic_tools` table).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamicToolRecord {
    /// Position of the tool within the thread; unique per thread and used as the
    /// sort key when tools are read back.
    pub position: i64,
    /// Tool name.
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Input schema; stored in the database as JSON text.
    pub input_schema: Value,
}
125
/// One row of the `messages` table. Messages form a tree per thread through
/// `parent_entry_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageRecord {
    /// Auto-incremented primary key.
    pub id: i64,
    /// Id of the owning thread.
    pub thread_id: String,
    /// Role label of the message author.
    pub role: String,
    /// Text content of the message.
    pub content: String,
    /// Optional structured payload, stored as JSON text in `item_json`.
    pub item: Option<Value>,
    /// Unix timestamp (seconds) taken when the message was inserted.
    pub created_at: i64,
    /// Id of the parent message, or `None` for a root message.
    pub parent_entry_id: Option<i64>,
}
147
/// One row of the `checkpoints` table, keyed by `(thread_id, checkpoint_id)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointRecord {
    /// Id of the owning thread.
    pub thread_id: String,
    /// Caller-chosen identifier, unique within the thread.
    pub checkpoint_id: String,
    /// Checkpoint payload; stored as JSON text in `state_json`.
    pub state: Value,
    /// Unix timestamp (seconds) of the most recent save of this checkpoint.
    pub created_at: i64,
}
160
/// Status of a background job.
///
/// Serde (de)serializes the variants using snake_case names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum JobStateStatus {
    /// The job has been created but has not started.
    Queued,
    /// The job is in progress.
    Running,
    /// The job finished successfully.
    Completed,
    /// The job finished with an error.
    Failed,
    /// The job was cancelled before finishing.
    Cancelled,
}
178
/// State of a job; the field set mirrors the columns of the `jobs` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobStateRecord {
    /// Primary key of the job.
    pub id: String,
    /// Job name.
    pub name: String,
    /// Current job status.
    pub status: JobStateStatus,
    /// Optional progress indicator.
    /// NOTE(review): presumably a percentage in `0..=100` — confirm with writers.
    pub progress: Option<u8>,
    /// Optional free-form detail text.
    pub detail: Option<String>,
    /// Creation timestamp. NOTE(review): presumably Unix seconds — confirm.
    pub created_at: i64,
    /// Last-update timestamp. NOTE(review): presumably Unix seconds — confirm.
    pub updated_at: i64,
}
197
/// Status of a thread goal.
///
/// Serde uses snake_case names; the `thread_goals.status` column has a CHECK
/// constraint that only admits `active`, `paused`, `blocked`, `usage_limited`,
/// `budget_limited` and `complete`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ThreadGoalStatus {
    /// Stored as `active`.
    Active,
    /// Stored as `paused`.
    Paused,
    /// Stored as `blocked`.
    Blocked,
    /// Stored as `usage_limited`.
    UsageLimited,
    /// Stored as `budget_limited`.
    BudgetLimited,
    /// Stored as `complete`.
    Complete,
}
215
/// The goal attached to a thread (`thread_goals` table). `thread_id` is the
/// primary key, so a thread has at most one goal row.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadGoalRecord {
    /// Id of the owning thread.
    pub thread_id: String,
    /// Identifier of the goal itself.
    pub goal_id: String,
    /// Text describing the objective.
    pub objective: String,
    /// Current goal status.
    pub status: ThreadGoalStatus,
    /// Optional token budget for the goal.
    pub token_budget: Option<i64>,
    /// Running total incremented by `record_thread_goal_usage`.
    pub tokens_used: i64,
    /// Running total of seconds incremented by `record_thread_goal_usage`.
    pub time_used_seconds: i64,
    /// Counter incremented by `record_thread_goal_continuation`.
    pub continuation_count: i64,
    /// Creation timestamp supplied by the caller.
    pub created_at: i64,
    /// Last-update timestamp; the usage/continuation recorders never move it backwards.
    pub updated_at: i64,
}
240
/// Options accepted by `StateStore::list_threads`.
#[derive(Debug, Clone)]
pub struct ThreadListFilters {
    /// When `false`, only rows with `archived = 0` are returned.
    pub include_archived: bool,
    /// Maximum number of rows to return; `None` is treated as 50 by `list_threads`.
    pub limit: Option<usize>,
}
249
250impl Default for ThreadListFilters {
251 fn default() -> Self {
252 Self {
253 include_archived: false,
254 limit: Some(50),
255 }
256 }
257}
258
/// Serializable entry describing a thread's name at a point in time.
///
/// NOTE(review): presumably one JSON line of `session_index.jsonl`; the code
/// that writes it is outside this view — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SessionIndexEntry {
    /// Id of the thread the entry refers to.
    thread_id: String,
    /// Optional thread name at the time of the entry.
    thread_name: Option<String>,
    /// Timestamp associated with the entry.
    updated_at: i64,
    /// Optional rollout file path for the thread.
    rollout_path: Option<PathBuf>,
}
266
/// Handle to the SQLite-backed state store.
///
/// The handle only holds paths; each operation opens its own connection to
/// `db_path`.
#[derive(Debug, Clone)]
pub struct StateStore {
    /// Location of the SQLite database file.
    db_path: PathBuf,
    /// Location of `session_index.jsonl`, placed in the same directory as the database.
    session_index_path: PathBuf,
}
276
277impl StateStore {
278 pub fn open(path: Option<PathBuf>) -> Result<Self> {
284 let db_path = path.unwrap_or_else(default_state_db_path);
285 let session_index_path = db_path
286 .parent()
287 .unwrap_or_else(|| Path::new("."))
288 .join("session_index.jsonl");
289 if let Some(parent) = db_path.parent() {
290 fs::create_dir_all(parent).with_context(|| {
291 format!("failed to create state directory {}", parent.display())
292 })?;
293 }
294 let store = Self {
295 db_path,
296 session_index_path,
297 };
298 store.init_schema()?;
299 Ok(store)
300 }
301
302 pub fn db_path(&self) -> &Path {
304 &self.db_path
305 }
306
307 fn conn(&self) -> Result<Connection> {
308 Connection::open(&self.db_path)
309 .with_context(|| format!("failed to open state db {}", self.db_path.display()))
310 }
311
    /// Creates or migrates the database schema up to version 4.
    ///
    /// The current version is read from `PRAGMA user_version`, and every
    /// migration step whose target version has not been reached yet is run in
    /// order. Each step is one batch wrapped in `BEGIN`/`COMMIT` that also bumps
    /// `user_version`.
    ///
    /// # Errors
    ///
    /// Fails when the connection cannot be opened, the version cannot be read,
    /// or any migration batch fails.
    fn init_schema(&self) -> Result<()> {
        let conn = self.conn()?;
        // Tracks the version reached so far so later steps can chain within one call.
        let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
        // Version 0 -> 1: core thread/message/checkpoint/job tables plus the
        // message-tree columns (`parent_entry_id`, `current_leaf_id`), which are
        // backfilled for any rows that already exist.
        if user_version == 0 {
            conn.execute_batch(
                r#"
                BEGIN;
                CREATE TABLE IF NOT EXISTS threads (
                    id TEXT PRIMARY KEY,
                    rollout_path TEXT,
                    preview TEXT NOT NULL,
                    ephemeral INTEGER NOT NULL,
                    model_provider TEXT NOT NULL,
                    created_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL,
                    status TEXT NOT NULL,
                    path TEXT,
                    cwd TEXT NOT NULL,
                    cli_version TEXT NOT NULL,
                    source TEXT NOT NULL,
                    title TEXT,
                    sandbox_policy TEXT,
                    approval_mode TEXT,
                    archived INTEGER NOT NULL DEFAULT 0,
                    archived_at INTEGER,
                    git_sha TEXT,
                    git_branch TEXT,
                    git_origin_url TEXT,
                    memory_mode TEXT
                );
                CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
                CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
                CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);

                CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
                    thread_id TEXT NOT NULL,
                    position INTEGER NOT NULL,
                    name TEXT NOT NULL,
                    description TEXT,
                    input_schema TEXT NOT NULL,
                    PRIMARY KEY (thread_id, position),
                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
                );

                CREATE TABLE IF NOT EXISTS messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    thread_id TEXT NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    item_json TEXT,
                    created_at INTEGER NOT NULL,
                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);

                CREATE TABLE IF NOT EXISTS checkpoints (
                    thread_id TEXT NOT NULL,
                    checkpoint_id TEXT NOT NULL,
                    state_json TEXT NOT NULL,
                    created_at INTEGER NOT NULL,
                    PRIMARY KEY(thread_id, checkpoint_id),
                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);

                CREATE TABLE IF NOT EXISTS jobs (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    status TEXT NOT NULL,
                    progress INTEGER,
                    detail TEXT,
                    created_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL
                );
                CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);

                -- Add parent_entry_id column, and set to last message before current message
                ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
                UPDATE messages
                SET parent_entry_id = (
                    SELECT m2.id
                    FROM messages m2
                    WHERE m2.thread_id = messages.thread_id
                        AND (
                            m2.created_at < messages.created_at
                            OR (
                                m2.created_at = messages.created_at
                                AND m2.id < messages.id
                            )
                        )
                    ORDER BY m2.created_at DESC, m2.id DESC
                    LIMIT 1
                );
                CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);

                -- Add current_leaf_id column, and set to last message in thread
                ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
                UPDATE threads
                SET current_leaf_id = (
                    SELECT m.id
                    FROM messages m
                    WHERE m.thread_id = threads.id
                    ORDER BY m.id DESC
                    LIMIT 1
                );

                PRAGMA user_version = 1;
                COMMIT;
                "#,
            )
            .context("failed to initialize thread schema")?;
            user_version = 1;
        }
        // Version 1 -> 2: workflow trace tables (workflow, branch, leaf and
        // control-node runs, plus teacher candidates).
        if user_version < 2 {
            conn.execute_batch(
                r#"
                BEGIN;
                CREATE TABLE IF NOT EXISTS workflow_runs (
                    id TEXT PRIMARY KEY,
                    workflow_id TEXT NOT NULL,
                    goal TEXT NOT NULL,
                    status TEXT NOT NULL,
                    input_hash TEXT,
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    metadata_json TEXT NOT NULL DEFAULT '{}'
                );
                CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at
                    ON workflow_runs(status, started_at DESC);
                CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at
                    ON workflow_runs(workflow_id, started_at DESC);

                CREATE TABLE IF NOT EXISTS branch_runs (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    branch_id TEXT NOT NULL,
                    node_id TEXT NOT NULL,
                    status TEXT NOT NULL,
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    result_json TEXT NOT NULL DEFAULT '{}',
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id
                    ON branch_runs(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id
                    ON branch_runs(branch_id);

                CREATE TABLE IF NOT EXISTS leaf_runs (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    branch_run_id TEXT,
                    leaf_id TEXT NOT NULL,
                    task_id TEXT NOT NULL,
                    input_hash TEXT,
                    status TEXT NOT NULL,
                    output_json TEXT NOT NULL DEFAULT '{}',
                    artifacts_json TEXT NOT NULL DEFAULT '[]',
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
                );
                CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id
                    ON leaf_runs(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup
                    ON leaf_runs(workflow_run_id, leaf_id, input_hash);

                CREATE TABLE IF NOT EXISTS control_node_runs (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    node_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    selected_children_json TEXT NOT NULL DEFAULT '[]',
                    result_json TEXT NOT NULL DEFAULT '{}',
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id
                    ON control_node_runs(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id
                    ON control_node_runs(node_id);

                CREATE TABLE IF NOT EXISTS teacher_candidates (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    control_node_run_id TEXT NOT NULL,
                    candidate_id TEXT NOT NULL,
                    branch_run_id TEXT,
                    score REAL,
                    passed INTEGER,
                    rationale_json TEXT NOT NULL DEFAULT '{}',
                    created_at INTEGER NOT NULL,
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
                    FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE,
                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
                );
                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id
                    ON teacher_candidates(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id
                    ON teacher_candidates(control_node_run_id);

                PRAGMA user_version = 2;
                COMMIT;
                "#,
            )
            .context("failed to initialize workflow trace schema")?;
            user_version = 2;
        }
        // Version 2 -> 3: per-thread goal table; `status` is restricted by a
        // CHECK constraint to the snake_case names of `ThreadGoalStatus`.
        if user_version < 3 {
            conn.execute_batch(
                r#"
                BEGIN;
                CREATE TABLE IF NOT EXISTS thread_goals (
                    thread_id TEXT PRIMARY KEY NOT NULL,
                    goal_id TEXT NOT NULL,
                    objective TEXT NOT NULL,
                    status TEXT NOT NULL CHECK(status IN (
                        'active',
                        'paused',
                        'blocked',
                        'usage_limited',
                        'budget_limited',
                        'complete'
                    )),
                    token_budget INTEGER,
                    tokens_used INTEGER NOT NULL DEFAULT 0,
                    time_used_seconds INTEGER NOT NULL DEFAULT 0,
                    created_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL,
                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
                );

                PRAGMA user_version = 3;
                COMMIT;
                "#,
            )
            .context("failed to initialize thread goal schema")?;
            user_version = 3;
        }
        // Version 3 -> 4: continuation counter on thread goals. The local
        // `user_version` is not updated afterwards because nothing reads it again.
        if user_version < 4 {
            conn.execute_batch(
                r#"
                BEGIN;
                ALTER TABLE thread_goals
                    ADD COLUMN continuation_count INTEGER NOT NULL DEFAULT 0;

                PRAGMA user_version = 4;
                COMMIT;
                "#,
            )
            .context("failed to initialize thread goal continuation schema")?;
        }
        Ok(())
    }
569
570 pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
575 let conn = self.conn()?;
576 conn.execute(
577 r#"
578 INSERT INTO threads (
579 id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
580 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
581 git_sha, git_branch, git_origin_url, memory_mode
582 ) VALUES (
583 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
584 ?11, ?12, ?13, ?14, ?15, ?16, ?17,
585 ?18, ?19, ?20, ?21
586 )
587 ON CONFLICT(id) DO UPDATE SET
588 rollout_path=excluded.rollout_path,
589 preview=excluded.preview,
590 ephemeral=excluded.ephemeral,
591 model_provider=excluded.model_provider,
592 created_at=excluded.created_at,
593 updated_at=excluded.updated_at,
594 status=excluded.status,
595 path=excluded.path,
596 cwd=excluded.cwd,
597 cli_version=excluded.cli_version,
598 source=excluded.source,
599 title=excluded.title,
600 sandbox_policy=excluded.sandbox_policy,
601 approval_mode=excluded.approval_mode,
602 archived=excluded.archived,
603 archived_at=excluded.archived_at,
604 git_sha=excluded.git_sha,
605 git_branch=excluded.git_branch,
606 git_origin_url=excluded.git_origin_url,
607 memory_mode=excluded.memory_mode
608 "#,
609 params![
610 thread.id,
611 path_to_opt_string(thread.rollout_path.as_deref()),
612 thread.preview,
613 bool_to_i64(thread.ephemeral),
614 thread.model_provider,
615 thread.created_at,
616 thread.updated_at,
617 thread_status_to_str(&thread.status),
618 path_to_opt_string(thread.path.as_deref()),
619 thread.cwd.display().to_string(),
620 thread.cli_version,
621 session_source_to_str(&thread.source),
622 thread.name,
623 thread.sandbox_policy,
624 thread.approval_mode,
625 bool_to_i64(thread.archived),
626 thread.archived_at,
627 thread.git_sha,
628 thread.git_branch,
629 thread.git_origin_url,
630 thread.memory_mode,
631 ],
632 )
633 .context("failed to upsert thread metadata")?;
634
635 self.append_thread_name(
636 &thread.id,
637 thread.name.clone(),
638 thread.updated_at,
639 thread.rollout_path.clone(),
640 )?;
641 Ok(())
642 }
643
644 pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
648 let conn = self.conn()?;
649 conn.query_row(
650 r#"
651 SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
652 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
653 git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
654 FROM threads
655 WHERE id = ?1
656 "#,
657 params![id],
658 row_to_thread,
659 )
660 .optional()
661 .context("failed to read thread")
662 }
663
664 pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
669 let conn = self.conn()?;
670 let sql = if filters.include_archived {
671 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
672 } else {
673 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
674 };
675
676 let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
677 let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
678 let mut rows = stmt
679 .query(params![limit])
680 .context("failed to query threads")?;
681 let mut out = Vec::new();
682 while let Some(row) = rows.next().context("failed to iterate thread rows")? {
683 out.push(row_to_thread(row)?);
684 }
685 Ok(out)
686 }
687
688 pub fn mark_archived(&self, id: &str) -> Result<()> {
691 let conn = self.conn()?;
692 conn.execute(
693 "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
694 params![
695 id,
696 Utc::now().timestamp(),
697 thread_status_to_str(&ThreadStatus::Archived)
698 ],
699 )
700 .context("failed to archive thread")?;
701 Ok(())
702 }
703
704 pub fn mark_unarchived(&self, id: &str) -> Result<()> {
706 let conn = self.conn()?;
707 conn.execute(
708 "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
709 params![id],
710 )
711 .context("failed to unarchive thread")?;
712 Ok(())
713 }
714
715 pub fn delete_thread(&self, id: &str) -> Result<()> {
718 let conn = self.conn()?;
719 conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
720 .context("failed to delete thread")?;
721 Ok(())
722 }
723
724 pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
728 let conn = self.conn()?;
729 conn.execute(
730 "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
731 params![id, mode],
732 )
733 .context("failed to update thread memory mode")?;
734 Ok(())
735 }
736
737 pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
741 let conn = self.conn()?;
742 conn.query_row(
743 "SELECT memory_mode FROM threads WHERE id = ?1",
744 params![id],
745 |row| row.get::<_, Option<String>>(0),
746 )
747 .optional()
748 .context("failed to read thread memory mode")
749 .map(Option::flatten)
750 }
751
752 pub fn upsert_thread_goal(&self, goal: &ThreadGoalRecord) -> Result<()> {
754 let conn = self.conn()?;
755 let exists: Option<i64> = conn
756 .query_row(
757 "SELECT 1 FROM threads WHERE id = ?1",
758 params![goal.thread_id],
759 |row| row.get(0),
760 )
761 .optional()
762 .context("failed to verify thread before saving goal")?;
763 if exists.is_none() {
764 anyhow::bail!("thread {} not found", goal.thread_id);
765 }
766
767 conn.execute(
768 r#"
769 INSERT INTO thread_goals (
770 thread_id, goal_id, objective, status, token_budget, tokens_used,
771 time_used_seconds, continuation_count, created_at, updated_at
772 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
773 ON CONFLICT(thread_id) DO UPDATE SET
774 goal_id=excluded.goal_id,
775 objective=excluded.objective,
776 status=excluded.status,
777 token_budget=excluded.token_budget,
778 tokens_used=excluded.tokens_used,
779 time_used_seconds=excluded.time_used_seconds,
780 continuation_count=excluded.continuation_count,
781 created_at=excluded.created_at,
782 updated_at=excluded.updated_at
783 "#,
784 params![
785 goal.thread_id,
786 goal.goal_id,
787 goal.objective,
788 thread_goal_status_to_str(&goal.status),
789 goal.token_budget,
790 goal.tokens_used,
791 goal.time_used_seconds,
792 goal.continuation_count,
793 goal.created_at,
794 goal.updated_at,
795 ],
796 )
797 .context("failed to upsert thread goal")?;
798 Ok(())
799 }
800
801 pub fn record_thread_goal_usage(
817 &self,
818 thread_id: &str,
819 token_delta: i64,
820 time_delta_seconds: i64,
821 now: i64,
822 ) -> Result<Option<ThreadGoalRecord>> {
823 let conn = self.conn()?;
824 let changed = conn
825 .execute(
826 r#"
827 UPDATE thread_goals
828 SET tokens_used = tokens_used + ?2,
829 time_used_seconds = time_used_seconds + ?3,
830 updated_at = MAX(updated_at, ?4)
831 WHERE thread_id = ?1
832 "#,
833 params![thread_id, token_delta, time_delta_seconds, now],
834 )
835 .context("failed to record thread goal usage")?;
836 if changed == 0 {
837 return Ok(None);
838 }
839 self.get_thread_goal(thread_id)
840 }
841
842 pub fn record_thread_goal_continuation(
848 &self,
849 thread_id: &str,
850 now: i64,
851 ) -> Result<Option<ThreadGoalRecord>> {
852 let conn = self.conn()?;
853 let changed = conn
854 .execute(
855 r#"
856 UPDATE thread_goals
857 SET continuation_count = continuation_count + 1,
858 updated_at = MAX(updated_at, ?2)
859 WHERE thread_id = ?1
860 "#,
861 params![thread_id, now],
862 )
863 .context("failed to record thread goal continuation")?;
864 if changed == 0 {
865 return Ok(None);
866 }
867 self.get_thread_goal(thread_id)
868 }
869
870 pub fn get_thread_goal(&self, thread_id: &str) -> Result<Option<ThreadGoalRecord>> {
872 let conn = self.conn()?;
873 conn.query_row(
874 r#"
875 SELECT thread_id, goal_id, objective, status, token_budget, tokens_used,
876 time_used_seconds, continuation_count, created_at, updated_at
877 FROM thread_goals
878 WHERE thread_id = ?1
879 "#,
880 params![thread_id],
881 row_to_thread_goal,
882 )
883 .optional()
884 .context("failed to read thread goal")
885 }
886
887 pub fn delete_thread_goal(&self, thread_id: &str) -> Result<bool> {
889 let conn = self.conn()?;
890 let changed = conn
891 .execute(
892 "DELETE FROM thread_goals WHERE thread_id = ?1",
893 params![thread_id],
894 )
895 .context("failed to delete thread goal")?;
896 Ok(changed > 0)
897 }
898
899 pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
904 let conn = self.conn()?;
905 let mut stmt = conn
906 .prepare(
907 r#"
908 SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
909 FROM messages m1
910 LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
911 WHERE m1.thread_id = ?1 AND m2.id IS NULL
912 "#,
913 )
914 .context("failed to prepare message listing query")?;
915 let mut rows = stmt
916 .query(params![thread_id])
917 .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
918 let mut out = Vec::new();
919 while let Some(row) = rows.next().context("failed to iterate message rows")? {
920 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
921 let item = item_json
922 .as_deref()
923 .map(serde_json::from_str)
924 .transpose()
925 .with_context(|| {
926 format!("failed to parse message item json in thread {thread_id}")
927 })?;
928 out.push(MessageRecord {
929 id: row.get(0).context("failed to read message id")?,
930 thread_id: row.get(1).context("failed to read message thread id")?,
931 role: row.get(2).context("failed to read message role")?,
932 content: row.get(3).context("failed to read message content")?,
933 item,
934 created_at: row.get(5).context("failed to read message timestamp")?,
935 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
936 });
937 }
938 Ok(out)
939 }
940
941 pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
946 let conn = self.conn()?;
947 conn.execute(
948 "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
949 params![current_leaf_id, thread_id],
950 )
951 .context("failed to update thread current leaf id")?;
952 Ok(())
953 }
954
955 pub fn persist_dynamic_tools(
960 &self,
961 thread_id: &str,
962 tools: &[DynamicToolRecord],
963 ) -> Result<()> {
964 let mut conn = self.conn()?;
965 let tx = conn
966 .transaction()
967 .context("failed to begin dynamic tools transaction")?;
968 tx.execute(
969 "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
970 params![thread_id],
971 )
972 .context("failed to clear dynamic tools")?;
973 for tool in tools {
974 tx.execute(
975 "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
976 params![
977 thread_id,
978 tool.position,
979 tool.name,
980 tool.description,
981 tool.input_schema.to_string()
982 ],
983 )
984 .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
985 }
986 tx.commit().context("failed to commit dynamic tools")?;
987 Ok(())
988 }
989
990 pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
992 let conn = self.conn()?;
993 let mut stmt = conn
994 .prepare(
995 "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
996 )
997 .context("failed to prepare get dynamic tools query")?;
998 let mut rows = stmt
999 .query(params![thread_id])
1000 .context("failed to query dynamic tools")?;
1001 let mut out = Vec::new();
1002 while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
1003 let input_schema_raw: String =
1004 row.get(3).context("failed to read tool input schema")?;
1005 let input_schema: Value =
1006 serde_json::from_str(&input_schema_raw).with_context(|| {
1007 format!("failed to parse input schema for dynamic tool in thread {thread_id}")
1008 })?;
1009 out.push(DynamicToolRecord {
1010 position: row.get(0).context("failed to read tool position")?,
1011 name: row.get(1).context("failed to read tool name")?,
1012 description: row.get(2).context("failed to read tool description")?,
1013 input_schema,
1014 });
1015 }
1016 Ok(out)
1017 }
1018
1019 pub fn append_message(
1025 &self,
1026 thread_id: &str,
1027 role: &str,
1028 content: &str,
1029 item: Option<Value>,
1030 ) -> Result<i64> {
1031 let mut conn = self.conn()?;
1032 let created_at = Utc::now().timestamp();
1033 let item_json = item
1034 .as_ref()
1035 .map(serde_json::to_string)
1036 .transpose()
1037 .context("failed to serialize message item payload")?;
1038
1039 let tx = conn
1040 .transaction()
1041 .context("failed to begin append message transaction")?;
1042
1043 let current_leaf_id: Option<i64> = tx
1044 .query_row(
1045 "SELECT current_leaf_id FROM threads WHERE id = ?1",
1046 params![thread_id],
1047 |row| row.get(0),
1048 )
1049 .with_context(|| {
1050 format!("failed to query thread current leaf id for thread {thread_id}")
1051 })?;
1052
1053 let next_leaf_id: i64 = tx.query_row(
1054 r#"
1055 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
1056 SELECT ?1, ?2, ?3, ?4, ?5, ?6
1057 RETURNING id
1058 "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
1059 ).with_context(|| format!("failed to append message for thread {thread_id}"))?;
1060
1061 tx.execute(
1062 r#"
1063 UPDATE threads
1064 SET current_leaf_id = ?1
1065 WHERE id = ?2;
1066 "#,
1067 params![next_leaf_id, thread_id],
1068 )
1069 .with_context(|| {
1070 format!("failed to update thread current leaf id for thread {thread_id}")
1071 })?;
1072
1073 tx.commit()
1074 .context("failed to commit append message transaction")?;
1075
1076 Ok(next_leaf_id)
1077 }
1078
1079 pub fn list_messages(
1085 &self,
1086 thread_id: &str,
1087 limit: Option<usize>,
1088 ) -> Result<Vec<MessageRecord>> {
1089 let conn = self.conn()?;
1090 let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
1091 let mut stmt = conn
1092 .prepare(
1093 r#"
1094 WITH RECURSIVE
1095 leaf_id AS (
1096 SELECT current_leaf_id FROM threads WHERE id = ?1
1097 ),
1098 ancestors AS (
1099 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
1100 FROM messages
1101 WHERE id = (SELECT current_leaf_id FROM leaf_id)
1102
1103 UNION ALL
1104
1105 SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
1106 FROM messages m
1107 JOIN ancestors a ON m.id = a.parent_entry_id
1108 WHERE a.depth < ?2
1109 )
1110 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
1111 ORDER BY depth DESC
1112 "#
1113 )
1114 .context("failed to prepare message listing query")?;
1115 let mut rows = stmt
1116 .query(params![thread_id, limit - 1])
1117 .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
1118 let mut out = Vec::new();
1119 while let Some(row) = rows.next().context("failed to iterate message rows")? {
1120 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
1121 let item = item_json
1122 .as_deref()
1123 .map(serde_json::from_str)
1124 .transpose()
1125 .with_context(|| {
1126 format!("failed to parse message item json in thread {thread_id}")
1127 })?;
1128 out.push(MessageRecord {
1129 id: row.get(0).context("failed to read message id")?,
1130 thread_id: row.get(1).context("failed to read message thread id")?,
1131 role: row.get(2).context("failed to read message role")?,
1132 content: row.get(3).context("failed to read message content")?,
1133 item,
1134 created_at: row.get(5).context("failed to read message timestamp")?,
1135 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
1136 });
1137 }
1138 Ok(out)
1139 }
1140
1141 pub fn fork_at_message(
1147 &self,
1148 message_id: &str,
1149 role: &str,
1150 content: &str,
1151 item: Option<Value>,
1152 ) -> Result<i64> {
1153 let mut conn = self.conn()?;
1154 let created_at = Utc::now().timestamp();
1155 let item_json = item
1156 .as_ref()
1157 .map(serde_json::to_string)
1158 .transpose()
1159 .context("failed to serialize message item payload")?;
1160
1161 let tx = conn
1162 .transaction()
1163 .context("failed to begin fork message transaction")?;
1164
1165 let thread_id: String = tx
1166 .query_row(
1167 "SELECT thread_id FROM messages WHERE id = ?1",
1168 params![message_id],
1169 |row| row.get(0),
1170 )
1171 .with_context(|| format!("failed to query thread id for message {message_id}"))?;
1172
1173 let next_leaf_id: i64 = tx.query_row(
1174 r#"
1175 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
1176 SELECT ?1, ?2, ?3, ?4, ?5, ?6
1177 RETURNING id
1178 "#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
1179 ).with_context(|| format!("failed to fork at message for thread {thread_id:?}"))?;
1180
1181 tx.execute(
1182 r#"
1183 UPDATE threads
1184 SET current_leaf_id = ?1
1185 WHERE id = ?2;
1186 "#,
1187 params![next_leaf_id, thread_id],
1188 )
1189 .with_context(|| {
1190 format!("failed to update thread current leaf id for thread {thread_id:?}")
1191 })?;
1192
1193 tx.commit()
1194 .context("failed to commit fork message transaction")?;
1195
1196 Ok(next_leaf_id)
1197 }
1198
1199 pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
1203 let mut conn = self.conn()?;
1204 let tx = conn
1205 .transaction()
1206 .context("failed to begin clear messages transaction")?;
1207
1208 tx.execute(
1209 r#"
1210 UPDATE threads
1211 SET current_leaf_id = NULL
1212 WHERE id = ?1;
1213 "#,
1214 params![thread_id],
1215 )
1216 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
1217 let result = tx
1218 .execute(
1219 r#"
1220 DELETE FROM messages WHERE thread_id = ?1
1221 "#,
1222 params![thread_id],
1223 )
1224 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
1225 tx.commit()
1226 .context("failed to commit clear messages transaction")?;
1227
1228 Ok(result)
1229 }
1230
1231 pub fn save_checkpoint(
1236 &self,
1237 thread_id: &str,
1238 checkpoint_id: &str,
1239 state: &Value,
1240 ) -> Result<()> {
1241 let conn = self.conn()?;
1242 let state_json =
1243 serde_json::to_string(state).context("failed to encode checkpoint state")?;
1244 conn.execute(
1245 r#"
1246 INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
1247 VALUES (?1, ?2, ?3, ?4)
1248 ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
1249 state_json = excluded.state_json,
1250 created_at = excluded.created_at
1251 "#,
1252 params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
1253 )
1254 .with_context(|| {
1255 format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
1256 })?;
1257 Ok(())
1258 }
1259
1260 pub fn load_checkpoint(
1266 &self,
1267 thread_id: &str,
1268 checkpoint_id: Option<&str>,
1269 ) -> Result<Option<CheckpointRecord>> {
1270 let conn = self.conn()?;
1271 if let Some(checkpoint_id) = checkpoint_id {
1272 let row = conn
1273 .query_row(
1274 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1275 params![thread_id, checkpoint_id],
1276 |row| {
1277 let state_json: String = row.get(2)?;
1278 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1279 Ok(CheckpointRecord {
1280 thread_id: row.get(0)?,
1281 checkpoint_id: row.get(1)?,
1282 state,
1283 created_at: row.get(3)?,
1284 })
1285 },
1286 )
1287 .optional()
1288 .with_context(|| {
1289 format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
1290 })?;
1291 return Ok(row);
1292 }
1293
1294 conn.query_row(
1295 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
1296 params![thread_id],
1297 |row| {
1298 let state_json: String = row.get(2)?;
1299 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1300 Ok(CheckpointRecord {
1301 thread_id: row.get(0)?,
1302 checkpoint_id: row.get(1)?,
1303 state,
1304 created_at: row.get(3)?,
1305 })
1306 },
1307 )
1308 .optional()
1309 .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
1310 }
1311
1312 pub fn list_checkpoints(
1316 &self,
1317 thread_id: &str,
1318 limit: Option<usize>,
1319 ) -> Result<Vec<CheckpointRecord>> {
1320 let conn = self.conn()?;
1321 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1322 let mut stmt = conn
1323 .prepare(
1324 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
1325 )
1326 .context("failed to prepare checkpoint list query")?;
1327 let mut rows = stmt
1328 .query(params![thread_id, limit])
1329 .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
1330
1331 let mut out = Vec::new();
1332 while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
1333 let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
1334 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1335 out.push(CheckpointRecord {
1336 thread_id: row.get(0).context("failed to read checkpoint thread id")?,
1337 checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
1338 state,
1339 created_at: row.get(3).context("failed to read checkpoint timestamp")?,
1340 });
1341 }
1342 Ok(out)
1343 }
1344
1345 pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
1347 let conn = self.conn()?;
1348 conn.execute(
1349 "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1350 params![thread_id, checkpoint_id],
1351 )
1352 .with_context(|| {
1353 format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
1354 })?;
1355 Ok(())
1356 }
1357
1358 pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
1360 let conn = self.conn()?;
1361 conn.execute(
1362 r#"
1363 INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
1364 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1365 ON CONFLICT(id) DO UPDATE SET
1366 name = excluded.name,
1367 status = excluded.status,
1368 progress = excluded.progress,
1369 detail = excluded.detail,
1370 created_at = excluded.created_at,
1371 updated_at = excluded.updated_at
1372 "#,
1373 params![
1374 job.id,
1375 job.name,
1376 job_state_status_to_str(&job.status),
1377 job.progress.map(i64::from),
1378 job.detail,
1379 job.created_at,
1380 job.updated_at
1381 ],
1382 )
1383 .with_context(|| format!("failed to upsert job {}", job.id))?;
1384 Ok(())
1385 }
1386
1387 pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
1391 let conn = self.conn()?;
1392 conn.query_row(
1393 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
1394 params![id],
1395 |row| {
1396 let status_raw: String = row.get(2)?;
1397 let progress: Option<i64> = row.get(3)?;
1398 Ok(JobStateRecord {
1399 id: row.get(0)?,
1400 name: row.get(1)?,
1401 status: job_state_status_from_str(&status_raw),
1402 progress: progress.and_then(|v| u8::try_from(v).ok()),
1403 detail: row.get(4)?,
1404 created_at: row.get(5)?,
1405 updated_at: row.get(6)?,
1406 })
1407 },
1408 )
1409 .optional()
1410 .with_context(|| format!("failed to read job {id}"))
1411 }
1412
1413 pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
1417 let conn = self.conn()?;
1418 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1419 let mut stmt = conn
1420 .prepare(
1421 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
1422 )
1423 .context("failed to prepare job list query")?;
1424 let mut rows = stmt
1425 .query(params![limit])
1426 .context("failed to query persisted jobs")?;
1427 let mut out = Vec::new();
1428 while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
1429 let status_raw: String = row.get(2).context("failed to read job status")?;
1430 let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
1431 out.push(JobStateRecord {
1432 id: row.get(0).context("failed to read job id")?,
1433 name: row.get(1).context("failed to read job name")?,
1434 status: job_state_status_from_str(&status_raw),
1435 progress: progress.and_then(|v| u8::try_from(v).ok()),
1436 detail: row.get(4).context("failed to read job detail")?,
1437 created_at: row.get(5).context("failed to read job created_at")?,
1438 updated_at: row.get(6).context("failed to read job updated_at")?,
1439 });
1440 }
1441 Ok(out)
1442 }
1443
1444 pub fn delete_job(&self, id: &str) -> Result<()> {
1446 let conn = self.conn()?;
1447 conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
1448 .with_context(|| format!("failed to delete job {id}"))?;
1449 Ok(())
1450 }
1451
1452 pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
1454 let conn = self.conn()?;
1455 conn.query_row(
1456 "SELECT rollout_path FROM threads WHERE id = ?1",
1457 params![id],
1458 |row| row.get::<_, Option<String>>(0),
1459 )
1460 .optional()
1461 .context("failed to lookup rollout path")
1462 .map(|opt| opt.flatten().map(PathBuf::from))
1463 }
1464
1465 pub fn append_thread_name(
1471 &self,
1472 thread_id: &str,
1473 thread_name: Option<String>,
1474 updated_at: i64,
1475 rollout_path: Option<PathBuf>,
1476 ) -> Result<()> {
1477 if let Some(parent) = self.session_index_path.parent() {
1478 fs::create_dir_all(parent).with_context(|| {
1479 format!(
1480 "failed to create session index directory {}",
1481 parent.display()
1482 )
1483 })?;
1484 }
1485 let entry = SessionIndexEntry {
1486 thread_id: thread_id.to_string(),
1487 thread_name,
1488 updated_at,
1489 rollout_path,
1490 };
1491 let encoded =
1492 serde_json::to_string(&entry).context("failed to serialize session index entry")?;
1493 let mut file = OpenOptions::new()
1494 .create(true)
1495 .append(true)
1496 .open(&self.session_index_path)
1497 .with_context(|| {
1498 format!(
1499 "failed to open session index {}",
1500 self.session_index_path.display()
1501 )
1502 })?;
1503 writeln!(file, "{encoded}").context("failed to append session index entry")?;
1504 Ok(())
1505 }
1506
1507 pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
1511 let map = self.session_index_map()?;
1512 Ok(map
1513 .get(thread_id)
1514 .and_then(|entry| entry.thread_name.clone()))
1515 }
1516
1517 pub fn find_thread_names_by_ids(
1521 &self,
1522 ids: &[String],
1523 ) -> Result<HashMap<String, Option<String>>> {
1524 let map = self.session_index_map()?;
1525 let mut out = HashMap::new();
1526 for id in ids {
1527 let name = map.get(id).and_then(|entry| entry.thread_name.clone());
1528 out.insert(id.clone(), name);
1529 }
1530 Ok(out)
1531 }
1532
1533 pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
1538 let map = self.session_index_map()?;
1539 let matched = map
1540 .values()
1541 .filter(|entry| {
1542 entry
1543 .thread_name
1544 .as_deref()
1545 .is_some_and(|n| n.eq_ignore_ascii_case(name))
1546 })
1547 .max_by_key(|entry| entry.updated_at);
1548 Ok(matched.and_then(|entry| entry.rollout_path.clone()))
1549 }
1550
1551 fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
1552 if !self.session_index_path.exists() {
1553 return Ok(HashMap::new());
1554 }
1555 let file = OpenOptions::new()
1556 .read(true)
1557 .open(&self.session_index_path)
1558 .with_context(|| {
1559 format!(
1560 "failed to read session index {}",
1561 self.session_index_path.display()
1562 )
1563 })?;
1564 let reader = BufReader::new(file);
1565 let mut latest = HashMap::<String, SessionIndexEntry>::new();
1566 for line in reader.lines() {
1567 let line = line.context("failed to read session index line")?;
1568 if line.trim().is_empty() {
1569 continue;
1570 }
1571 let parsed: SessionIndexEntry =
1572 serde_json::from_str(&line).context("failed to parse session index entry")?;
1573 latest.insert(parsed.thread_id.clone(), parsed);
1574 }
1575 Ok(latest)
1576 }
1577}
1578
1579fn default_state_db_path() -> PathBuf {
1580 if let Some(overridden) = codewhale_home_override() {
1587 return overridden.join("state.db");
1588 }
1589 let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
1590 let primary = home.join(".codewhale").join("state.db");
1593 if primary.exists() || !home.join(".deepseek").join("state.db").exists() {
1594 primary
1595 } else {
1596 home.join(".deepseek").join("state.db")
1597 }
1598}
1599
/// Returns the directory named by the `CODEWHALE_HOME` environment variable.
///
/// Yields `None` when the variable is unset or set to the empty string. Any
/// other value, including whitespace, is returned unchanged as a path.
fn codewhale_home_override() -> Option<PathBuf> {
    let raw = std::env::var_os("CODEWHALE_HOME")?;
    if raw.is_empty() {
        return None;
    }
    Some(PathBuf::from(raw))
}
1614
/// Encodes a flag as an integer: `true` becomes 1, `false` becomes 0.
fn bool_to_i64(value: bool) -> i64 {
    match value {
        true => 1,
        false => 0,
    }
}
1618
/// Decodes an integer flag: 0 is `false`, every other value is `true`.
fn i64_to_bool(value: i64) -> bool {
    match value {
        0 => false,
        _ => true,
    }
}
1622
1623fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
1624 match status {
1625 ThreadStatus::Running => "running",
1626 ThreadStatus::Idle => "idle",
1627 ThreadStatus::Completed => "completed",
1628 ThreadStatus::Failed => "failed",
1629 ThreadStatus::Paused => "paused",
1630 ThreadStatus::Archived => "archived",
1631 }
1632}
1633
1634fn thread_status_from_str(value: &str) -> ThreadStatus {
1635 match value {
1636 "running" => ThreadStatus::Running,
1637 "idle" => ThreadStatus::Idle,
1638 "completed" => ThreadStatus::Completed,
1639 "failed" => ThreadStatus::Failed,
1640 "paused" => ThreadStatus::Paused,
1641 "archived" => ThreadStatus::Archived,
1642 _ => ThreadStatus::Idle,
1643 }
1644}
1645
1646fn session_source_to_str(source: &SessionSource) -> &'static str {
1647 match source {
1648 SessionSource::Interactive => "interactive",
1649 SessionSource::Resume => "resume",
1650 SessionSource::Fork => "fork",
1651 SessionSource::Api => "api",
1652 SessionSource::Unknown => "unknown",
1653 }
1654}
1655
1656fn session_source_from_str(value: &str) -> SessionSource {
1657 match value {
1658 "interactive" => SessionSource::Interactive,
1659 "resume" => SessionSource::Resume,
1660 "fork" => SessionSource::Fork,
1661 "api" => SessionSource::Api,
1662 _ => SessionSource::Unknown,
1663 }
1664}
1665
/// Renders an optional path as an optional owned string.
///
/// The text is produced by the path's `display()` adapter, so a path that is
/// not valid Unicode is converted lossily.
fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
    let path = path?;
    Some(path.display().to_string())
}
1669
1670fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
1671 match status {
1672 JobStateStatus::Queued => "queued",
1673 JobStateStatus::Running => "running",
1674 JobStateStatus::Completed => "completed",
1675 JobStateStatus::Failed => "failed",
1676 JobStateStatus::Cancelled => "cancelled",
1677 }
1678}
1679
1680fn job_state_status_from_str(value: &str) -> JobStateStatus {
1681 match value {
1682 "queued" => JobStateStatus::Queued,
1683 "running" => JobStateStatus::Running,
1684 "completed" => JobStateStatus::Completed,
1685 "failed" => JobStateStatus::Failed,
1686 "cancelled" => JobStateStatus::Cancelled,
1687 _ => JobStateStatus::Queued,
1688 }
1689}
1690
1691fn thread_goal_status_to_str(status: &ThreadGoalStatus) -> &'static str {
1692 match status {
1693 ThreadGoalStatus::Active => "active",
1694 ThreadGoalStatus::Paused => "paused",
1695 ThreadGoalStatus::Blocked => "blocked",
1696 ThreadGoalStatus::UsageLimited => "usage_limited",
1697 ThreadGoalStatus::BudgetLimited => "budget_limited",
1698 ThreadGoalStatus::Complete => "complete",
1699 }
1700}
1701
1702fn thread_goal_status_from_str(value: &str) -> ThreadGoalStatus {
1703 match value {
1704 "active" => ThreadGoalStatus::Active,
1705 "paused" => ThreadGoalStatus::Paused,
1706 "blocked" => ThreadGoalStatus::Blocked,
1707 "usage_limited" => ThreadGoalStatus::UsageLimited,
1708 "budget_limited" => ThreadGoalStatus::BudgetLimited,
1709 "complete" => ThreadGoalStatus::Complete,
1710 _ => ThreadGoalStatus::Active,
1711 }
1712}
1713
1714fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
1715 let status_raw: String = row.get(7)?;
1716 let source_raw: String = row.get(11)?;
1717 let rollout_path: Option<String> = row.get(1)?;
1718 let path: Option<String> = row.get(8)?;
1719 Ok(ThreadMetadata {
1720 id: row.get(0)?,
1721 rollout_path: rollout_path.map(PathBuf::from),
1722 preview: row.get(2)?,
1723 ephemeral: i64_to_bool(row.get(3)?),
1724 model_provider: row.get(4)?,
1725 created_at: row.get(5)?,
1726 updated_at: row.get(6)?,
1727 status: thread_status_from_str(&status_raw),
1728 path: path.map(PathBuf::from),
1729 cwd: PathBuf::from(row.get::<_, String>(9)?),
1730 cli_version: row.get(10)?,
1731 source: session_source_from_str(&source_raw),
1732 name: row.get(12)?,
1733 sandbox_policy: row.get(13)?,
1734 approval_mode: row.get(14)?,
1735 archived: i64_to_bool(row.get(15)?),
1736 archived_at: row.get(16)?,
1737 git_sha: row.get(17)?,
1738 git_branch: row.get(18)?,
1739 git_origin_url: row.get(19)?,
1740 memory_mode: row.get(20)?,
1741 current_leaf_id: row.get(21)?,
1742 })
1743}
1744
1745fn row_to_thread_goal(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadGoalRecord> {
1746 let status_raw: String = row.get(3)?;
1747 Ok(ThreadGoalRecord {
1748 thread_id: row.get(0)?,
1749 goal_id: row.get(1)?,
1750 objective: row.get(2)?,
1751 status: thread_goal_status_from_str(&status_raw),
1752 token_budget: row.get(4)?,
1753 tokens_used: row.get(5)?,
1754 time_used_seconds: row.get(6)?,
1755 continuation_count: row.get(7)?,
1756 created_at: row.get(8)?,
1757 updated_at: row.get(9)?,
1758 })
1759}
1760
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Opens a `StateStore` backed by a fresh directory under the system
    /// temp dir. The directory name combines `name`, the process id and a
    /// nanosecond timestamp so separate tests do not share a database.
    fn temp_state_store(name: &str) -> StateStore {
        let suffix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time")
            .as_nanos();
        let dir = std::env::temp_dir().join(format!(
            "codewhale-state-{name}-{}-{suffix}",
            std::process::id()
        ));
        fs::create_dir_all(&dir).expect("create temp state dir");
        StateStore::open(Some(dir.join("state.db"))).expect("open state store")
    }

    /// Minimal running, non-archived thread with every optional field unset.
    fn test_thread(id: &str) -> ThreadMetadata {
        ThreadMetadata {
            id: id.to_string(),
            rollout_path: None,
            preview: "test thread".to_string(),
            ephemeral: false,
            model_provider: "deepseek".to_string(),
            created_at: 10,
            updated_at: 10,
            status: ThreadStatus::Running,
            path: None,
            cwd: PathBuf::from("/tmp/codewhale"),
            cli_version: "0.0.0-test".to_string(),
            source: SessionSource::Interactive,
            name: None,
            sandbox_policy: None,
            approval_mode: None,
            archived: false,
            archived_at: None,
            git_sha: None,
            git_branch: None,
            git_origin_url: None,
            memory_mode: None,
            current_leaf_id: None,
        }
    }

    /// Active goal `goal-1` for `thread_id` with small non-zero usage counters.
    fn test_goal(thread_id: &str, objective: &str) -> ThreadGoalRecord {
        ThreadGoalRecord {
            thread_id: thread_id.to_string(),
            goal_id: "goal-1".to_string(),
            objective: objective.to_string(),
            status: ThreadGoalStatus::Active,
            token_budget: Some(123),
            tokens_used: 7,
            time_used_seconds: 11,
            continuation_count: 0,
            created_at: 100,
            updated_at: 101,
        }
    }

    #[test]
    fn thread_goal_crud_round_trips_and_replaces() {
        let store = temp_state_store("thread-goal-crud");
        store
            .upsert_thread(&test_thread("thread-1"))
            .expect("upsert thread");

        let goal = test_goal("thread-1", "Ship v0.8.59");
        store.upsert_thread_goal(&goal).expect("upsert goal");
        assert_eq!(
            store
                .get_thread_goal("thread-1")
                .expect("read goal")
                .as_ref(),
            Some(&goal)
        );

        // A second upsert for the same thread replaces the stored goal.
        let mut replacement = test_goal("thread-1", "Ship v0.8.59 safely");
        replacement.goal_id = "goal-2".to_string();
        replacement.status = ThreadGoalStatus::BudgetLimited;
        replacement.token_budget = None;
        replacement.updated_at = 202;
        store
            .upsert_thread_goal(&replacement)
            .expect("replace goal");
        assert_eq!(
            store.get_thread_goal("thread-1").expect("read replacement"),
            Some(replacement)
        );

        // Deleting reports whether a row was removed.
        assert!(store.delete_thread_goal("thread-1").expect("delete goal"));
        assert!(
            store
                .get_thread_goal("thread-1")
                .expect("read empty")
                .is_none()
        );
        assert!(!store.delete_thread_goal("thread-1").expect("delete empty"));
    }

    #[test]
    fn thread_goal_requires_existing_thread() {
        let store = temp_state_store("thread-goal-missing-thread");
        let err = store
            .upsert_thread_goal(&test_goal("missing-thread", "nope"))
            .expect_err("goal without a thread should fail");
        assert!(err.to_string().contains("thread missing-thread not found"));
    }

    #[test]
    fn record_thread_goal_usage_accumulates_tokens_and_time() {
        let store = temp_state_store("thread-goal-usage");
        store
            .upsert_thread(&test_thread("thread-1"))
            .expect("upsert thread");

        let mut goal = test_goal("thread-1", "Ship the persistent goal loop");
        goal.tokens_used = 0;
        goal.time_used_seconds = 0;
        goal.updated_at = 100;
        store.upsert_thread_goal(&goal).expect("upsert goal");

        // First delta lands on the zeroed counters and leaves the rest alone.
        let after_first = store
            .record_thread_goal_usage("thread-1", 250, 12, 150)
            .expect("record usage")
            .expect("goal exists");
        assert_eq!(after_first.tokens_used, 250);
        assert_eq!(after_first.time_used_seconds, 12);
        assert_eq!(after_first.updated_at, 150);
        assert_eq!(after_first.goal_id, goal.goal_id);
        assert_eq!(after_first.objective, goal.objective);
        assert_eq!(after_first.status, goal.status);
        assert_eq!(after_first.token_budget, goal.token_budget);
        assert_eq!(after_first.created_at, goal.created_at);
        assert_eq!(after_first.continuation_count, 0);

        // Second delta is added on top of the first.
        let after_second = store
            .record_thread_goal_usage("thread-1", 75, 8, 200)
            .expect("record usage")
            .expect("goal exists");
        assert_eq!(after_second.tokens_used, 325);
        assert_eq!(after_second.time_used_seconds, 20);
        assert_eq!(after_second.updated_at, 200);

        // An older timestamp still adds usage but does not move `updated_at` back.
        let after_stale = store
            .record_thread_goal_usage("thread-1", 5, 1, 1)
            .expect("record usage")
            .expect("goal exists");
        assert_eq!(after_stale.tokens_used, 330);
        assert_eq!(after_stale.time_used_seconds, 21);
        assert_eq!(after_stale.updated_at, 200);

        let persisted = store
            .get_thread_goal("thread-1")
            .expect("read goal")
            .expect("goal exists");
        assert_eq!(persisted.tokens_used, 330);
        assert_eq!(persisted.time_used_seconds, 21);
    }

    #[test]
    fn record_thread_goal_usage_returns_none_without_goal() {
        let store = temp_state_store("thread-goal-usage-missing");
        store
            .upsert_thread(&test_thread("thread-1"))
            .expect("upsert thread");
        let result = store
            .record_thread_goal_usage("thread-1", 100, 5, 999)
            .expect("record usage on goalless thread");
        assert!(result.is_none());
        // Recording usage must not create a goal as a side effect.
        assert!(
            store
                .get_thread_goal("thread-1")
                .expect("read goal")
                .is_none()
        );
    }

    #[test]
    fn record_thread_goal_continuation_accumulates_durably() {
        let store = temp_state_store("thread-goal-continuation");
        store
            .upsert_thread(&test_thread("thread-1"))
            .expect("upsert thread");

        let mut goal = test_goal("thread-1", "Keep working across turns");
        goal.updated_at = 100;
        store.upsert_thread_goal(&goal).expect("upsert goal");

        let after_first = store
            .record_thread_goal_continuation("thread-1", 120)
            .expect("record continuation")
            .expect("goal exists");
        assert_eq!(after_first.continuation_count, 1);
        assert_eq!(after_first.tokens_used, goal.tokens_used);
        assert_eq!(after_first.time_used_seconds, goal.time_used_seconds);
        assert_eq!(after_first.updated_at, 120);

        // An older timestamp still counts but does not move `updated_at` back.
        let after_second = store
            .record_thread_goal_continuation("thread-1", 110)
            .expect("record second continuation")
            .expect("goal exists");
        assert_eq!(after_second.continuation_count, 2);
        assert_eq!(after_second.updated_at, 120);

        let persisted = store
            .get_thread_goal("thread-1")
            .expect("read goal")
            .expect("goal exists");
        assert_eq!(persisted.continuation_count, 2);
    }

    /// Serialises the tests that mutate `CODEWHALE_HOME`, which is
    /// process-global while the test harness runs tests on several threads.
    static CODEWHALE_HOME_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquires `CODEWHALE_HOME_TEST_LOCK`, ignoring poisoning.
    ///
    /// The mutex guards no data, so a panic in one env test must not turn
    /// into `PoisonError` failures in every other env test.
    fn lock_codewhale_home() -> std::sync::MutexGuard<'static, ()> {
        CODEWHALE_HOME_TEST_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Sets or removes `CODEWHALE_HOME` and restores the previous value when
    /// dropped. Declare it after the lock guard so it is dropped (and the
    /// variable restored) before the lock is released.
    struct CodeWhaleHomeGuard {
        prior: Option<std::ffi::OsString>,
    }
    impl CodeWhaleHomeGuard {
        fn set(value: &str) -> Self {
            let prior = std::env::var_os("CODEWHALE_HOME");
            // SAFETY: callers hold `CODEWHALE_HOME_TEST_LOCK`, so no other
            // test in this module touches the variable concurrently.
            // NOTE(review): other threads could still read the environment.
            unsafe { std::env::set_var("CODEWHALE_HOME", value) };
            Self { prior }
        }
        fn remove() -> Self {
            let prior = std::env::var_os("CODEWHALE_HOME");
            // SAFETY: see `set`; the caller holds `CODEWHALE_HOME_TEST_LOCK`.
            unsafe { std::env::remove_var("CODEWHALE_HOME") };
            Self { prior }
        }
    }
    impl Drop for CodeWhaleHomeGuard {
        fn drop(&mut self) {
            // SAFETY: the guard is dropped while the creating test still
            // holds `CODEWHALE_HOME_TEST_LOCK` (locals drop in reverse order).
            unsafe {
                match &self.prior {
                    Some(value) => std::env::set_var("CODEWHALE_HOME", value),
                    None => std::env::remove_var("CODEWHALE_HOME"),
                }
            }
        }
    }

    #[test]
    fn codewhale_home_override_returns_the_env_value_verbatim() {
        let _lock = lock_codewhale_home();
        let _g = CodeWhaleHomeGuard::set("/tmp/cw-isolated-state");
        assert_eq!(
            codewhale_home_override().as_deref(),
            Some(std::path::Path::new("/tmp/cw-isolated-state"))
        );
    }

    #[test]
    fn codewhale_home_override_none_when_unset() {
        let _lock = lock_codewhale_home();
        let _g = CodeWhaleHomeGuard::remove();
        assert!(codewhale_home_override().is_none());
    }

    #[test]
    fn codewhale_home_override_none_when_empty() {
        let _lock = lock_codewhale_home();
        let _g = CodeWhaleHomeGuard::set("");
        assert!(
            codewhale_home_override().is_none(),
            "an empty CODEWHALE_HOME must be treated as unset"
        );
    }

    #[test]
    fn codewhale_home_override_some_when_whitespace_only() {
        let _lock = lock_codewhale_home();
        let _g = CodeWhaleHomeGuard::set(" ");
        assert!(
            codewhale_home_override().is_some(),
            "non-empty (even whitespace) counts as set; trimming is the caller's job"
        );
    }

    #[test]
    fn default_state_db_path_uses_codewhale_home_when_set() {
        let _lock = lock_codewhale_home();
        let dir = std::env::temp_dir().join(format!(
            "cw-home-state-{}-{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        let _g = CodeWhaleHomeGuard::set(dir.to_str().unwrap());
        assert_eq!(default_state_db_path(), dir.join("state.db"));
    }
}