1use std::collections::HashMap;
13use std::fs::{self, OpenOptions};
14use std::io::{BufRead, BufReader, Write};
15use std::path::{Path, PathBuf};
16
17use anyhow::{Context, Result};
18use chrono::Utc;
19use rusqlite::{Connection, OptionalExtension, params};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
27#[serde(rename_all = "snake_case")]
28pub enum ThreadStatus {
29 Running,
31 Idle,
33 Completed,
35 Failed,
37 Paused,
39 Archived,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "snake_case")]
48pub enum SessionSource {
49 Interactive,
51 Resume,
53 Fork,
55 Api,
57 Unknown,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ThreadMetadata {
67 pub id: String,
69 pub rollout_path: Option<PathBuf>,
71 pub preview: String,
73 pub ephemeral: bool,
75 pub model_provider: String,
77 pub created_at: i64,
79 pub updated_at: i64,
81 pub status: ThreadStatus,
83 pub path: Option<PathBuf>,
85 pub cwd: PathBuf,
87 pub cli_version: String,
89 pub source: SessionSource,
91 pub name: Option<String>,
93 pub sandbox_policy: Option<String>,
95 pub approval_mode: Option<String>,
97 pub archived: bool,
99 pub archived_at: Option<i64>,
101 pub git_sha: Option<String>,
103 pub git_branch: Option<String>,
105 pub git_origin_url: Option<String>,
107 pub memory_mode: Option<String>,
109 pub current_leaf_id: Option<i64>,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct DynamicToolRecord {
116 pub position: i64,
118 pub name: String,
120 pub description: Option<String>,
122 pub input_schema: Value,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct MessageRecord {
132 pub id: i64,
134 pub thread_id: String,
136 pub role: String,
138 pub content: String,
140 pub item: Option<Value>,
142 pub created_at: i64,
144 pub parent_entry_id: Option<i64>,
146}
147
148#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct CheckpointRecord {
151 pub thread_id: String,
153 pub checkpoint_id: String,
155 pub state: Value,
157 pub created_at: i64,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
165#[serde(rename_all = "snake_case")]
166pub enum JobStateStatus {
167 Queued,
169 Running,
171 Completed,
173 Failed,
175 Cancelled,
177}
178
179#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct JobStateRecord {
182 pub id: String,
184 pub name: String,
186 pub status: JobStateStatus,
188 pub progress: Option<u8>,
190 pub detail: Option<String>,
192 pub created_at: i64,
194 pub updated_at: i64,
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
200#[serde(rename_all = "snake_case")]
201pub enum ThreadGoalStatus {
202 Active,
204 Paused,
206 Blocked,
208 UsageLimited,
210 BudgetLimited,
212 Complete,
214}
215
216#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
218pub struct ThreadGoalRecord {
219 pub thread_id: String,
221 pub goal_id: String,
223 pub objective: String,
225 pub status: ThreadGoalStatus,
227 pub token_budget: Option<i64>,
229 pub tokens_used: i64,
231 pub time_used_seconds: i64,
233 pub continuation_count: i64,
235 pub created_at: i64,
237 pub updated_at: i64,
239}
240
/// Options accepted by [`StateStore::list_threads`].
#[derive(Clone, Debug)]
pub struct ThreadListFilters {
    /// When `false`, rows with `archived` set are left out of the listing.
    pub include_archived: bool,
    /// Maximum number of rows to return; `None` is treated as 50 by the query.
    pub limit: Option<usize>,
}

impl Default for ThreadListFilters {
    /// Archived threads excluded, at most 50 rows.
    fn default() -> Self {
        let limit = Some(50);
        Self {
            limit,
            include_archived: false,
        }
    }
}
258
259#[derive(Debug, Clone, Serialize, Deserialize)]
260struct SessionIndexEntry {
261 thread_id: String,
262 thread_name: Option<String>,
263 updated_at: i64,
264 rollout_path: Option<PathBuf>,
265}
266
/// Handle to the SQLite-backed state store.
///
/// Only paths are held here; each operation opens its own connection to
/// `db_path`.
#[derive(Clone, Debug)]
pub struct StateStore {
    /// Location of the SQLite database file.
    db_path: PathBuf,
    /// `session_index.jsonl` in the same directory as the database.
    session_index_path: PathBuf,
}
276
277impl StateStore {
278 pub fn open(path: Option<PathBuf>) -> Result<Self> {
284 let db_path = path.unwrap_or_else(default_state_db_path);
285 let session_index_path = db_path
286 .parent()
287 .unwrap_or_else(|| Path::new("."))
288 .join("session_index.jsonl");
289 if let Some(parent) = db_path.parent() {
290 fs::create_dir_all(parent).with_context(|| {
291 format!("failed to create state directory {}", parent.display())
292 })?;
293 }
294 let store = Self {
295 db_path,
296 session_index_path,
297 };
298 store.init_schema()?;
299 Ok(store)
300 }
301
302 pub fn db_path(&self) -> &Path {
304 &self.db_path
305 }
306
307 fn conn(&self) -> Result<Connection> {
308 Connection::open(&self.db_path)
309 .with_context(|| format!("failed to open state db {}", self.db_path.display()))
310 }
311
312 fn init_schema(&self) -> Result<()> {
313 let conn = self.conn()?;
314 let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
315 if user_version == 0 {
316 conn.execute_batch(
317 r#"
318 BEGIN;
319 CREATE TABLE IF NOT EXISTS threads (
320 id TEXT PRIMARY KEY,
321 rollout_path TEXT,
322 preview TEXT NOT NULL,
323 ephemeral INTEGER NOT NULL,
324 model_provider TEXT NOT NULL,
325 created_at INTEGER NOT NULL,
326 updated_at INTEGER NOT NULL,
327 status TEXT NOT NULL,
328 path TEXT,
329 cwd TEXT NOT NULL,
330 cli_version TEXT NOT NULL,
331 source TEXT NOT NULL,
332 title TEXT,
333 sandbox_policy TEXT,
334 approval_mode TEXT,
335 archived INTEGER NOT NULL DEFAULT 0,
336 archived_at INTEGER,
337 git_sha TEXT,
338 git_branch TEXT,
339 git_origin_url TEXT,
340 memory_mode TEXT
341 );
342 CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
343 CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
344 CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);
345
346 CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
347 thread_id TEXT NOT NULL,
348 position INTEGER NOT NULL,
349 name TEXT NOT NULL,
350 description TEXT,
351 input_schema TEXT NOT NULL,
352 PRIMARY KEY (thread_id, position),
353 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
354 );
355
356 CREATE TABLE IF NOT EXISTS messages (
357 id INTEGER PRIMARY KEY AUTOINCREMENT,
358 thread_id TEXT NOT NULL,
359 role TEXT NOT NULL,
360 content TEXT NOT NULL,
361 item_json TEXT,
362 created_at INTEGER NOT NULL,
363 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
364 );
365 CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);
366
367 CREATE TABLE IF NOT EXISTS checkpoints (
368 thread_id TEXT NOT NULL,
369 checkpoint_id TEXT NOT NULL,
370 state_json TEXT NOT NULL,
371 created_at INTEGER NOT NULL,
372 PRIMARY KEY(thread_id, checkpoint_id),
373 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
374 );
375 CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);
376
377 CREATE TABLE IF NOT EXISTS jobs (
378 id TEXT PRIMARY KEY,
379 name TEXT NOT NULL,
380 status TEXT NOT NULL,
381 progress INTEGER,
382 detail TEXT,
383 created_at INTEGER NOT NULL,
384 updated_at INTEGER NOT NULL
385 );
386 CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
387
388 -- Add parent_entry_id column, and set to last message before current message
389 ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
390 UPDATE messages
391 SET parent_entry_id = (
392 SELECT m2.id
393 FROM messages m2
394 WHERE m2.thread_id = messages.thread_id
395 AND (
396 m2.created_at < messages.created_at
397 OR (
398 m2.created_at = messages.created_at
399 AND m2.id < messages.id
400 )
401 )
402 ORDER BY m2.created_at DESC, m2.id DESC
403 LIMIT 1
404 );
405 CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);
406
407 -- Add current_leaf_id column, and set to last message in thread
408 ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
409 UPDATE threads
410 SET current_leaf_id = (
411 SELECT m.id
412 FROM messages m
413 WHERE m.thread_id = threads.id
414 ORDER BY m.id DESC
415 LIMIT 1
416 );
417
418 PRAGMA user_version = 1;
419 COMMIT;
420 "#,
421 )
422 .context("failed to initialize thread schema")?;
423 user_version = 1;
424 }
425 if user_version < 2 {
426 conn.execute_batch(
427 r#"
428 BEGIN;
429 CREATE TABLE IF NOT EXISTS workflow_runs (
430 id TEXT PRIMARY KEY,
431 workflow_id TEXT NOT NULL,
432 goal TEXT NOT NULL,
433 status TEXT NOT NULL,
434 input_hash TEXT,
435 started_at INTEGER NOT NULL,
436 completed_at INTEGER,
437 metadata_json TEXT NOT NULL DEFAULT '{}'
438 );
439 CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at
440 ON workflow_runs(status, started_at DESC);
441 CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at
442 ON workflow_runs(workflow_id, started_at DESC);
443
444 CREATE TABLE IF NOT EXISTS branch_runs (
445 id TEXT PRIMARY KEY,
446 workflow_run_id TEXT NOT NULL,
447 branch_id TEXT NOT NULL,
448 node_id TEXT NOT NULL,
449 status TEXT NOT NULL,
450 started_at INTEGER NOT NULL,
451 completed_at INTEGER,
452 result_json TEXT NOT NULL DEFAULT '{}',
453 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
454 );
455 CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id
456 ON branch_runs(workflow_run_id);
457 CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id
458 ON branch_runs(branch_id);
459
460 CREATE TABLE IF NOT EXISTS leaf_runs (
461 id TEXT PRIMARY KEY,
462 workflow_run_id TEXT NOT NULL,
463 branch_run_id TEXT,
464 leaf_id TEXT NOT NULL,
465 task_id TEXT NOT NULL,
466 input_hash TEXT,
467 status TEXT NOT NULL,
468 output_json TEXT NOT NULL DEFAULT '{}',
469 artifacts_json TEXT NOT NULL DEFAULT '[]',
470 started_at INTEGER NOT NULL,
471 completed_at INTEGER,
472 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
473 FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
474 );
475 CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id
476 ON leaf_runs(workflow_run_id);
477 CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup
478 ON leaf_runs(workflow_run_id, leaf_id, input_hash);
479
480 CREATE TABLE IF NOT EXISTS control_node_runs (
481 id TEXT PRIMARY KEY,
482 workflow_run_id TEXT NOT NULL,
483 node_id TEXT NOT NULL,
484 kind TEXT NOT NULL,
485 status TEXT NOT NULL,
486 selected_children_json TEXT NOT NULL DEFAULT '[]',
487 result_json TEXT NOT NULL DEFAULT '{}',
488 started_at INTEGER NOT NULL,
489 completed_at INTEGER,
490 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
491 );
492 CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id
493 ON control_node_runs(workflow_run_id);
494 CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id
495 ON control_node_runs(node_id);
496
497 CREATE TABLE IF NOT EXISTS teacher_candidates (
498 id TEXT PRIMARY KEY,
499 workflow_run_id TEXT NOT NULL,
500 control_node_run_id TEXT NOT NULL,
501 candidate_id TEXT NOT NULL,
502 branch_run_id TEXT,
503 score REAL,
504 passed INTEGER,
505 rationale_json TEXT NOT NULL DEFAULT '{}',
506 created_at INTEGER NOT NULL,
507 FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
508 FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE,
509 FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
510 );
511 CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id
512 ON teacher_candidates(workflow_run_id);
513 CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id
514 ON teacher_candidates(control_node_run_id);
515
516 PRAGMA user_version = 2;
517 COMMIT;
518 "#,
519 )
520 .context("failed to initialize workflow trace schema")?;
521 user_version = 2;
522 }
523 if user_version < 3 {
524 conn.execute_batch(
525 r#"
526 BEGIN;
527 CREATE TABLE IF NOT EXISTS thread_goals (
528 thread_id TEXT PRIMARY KEY NOT NULL,
529 goal_id TEXT NOT NULL,
530 objective TEXT NOT NULL,
531 status TEXT NOT NULL CHECK(status IN (
532 'active',
533 'paused',
534 'blocked',
535 'usage_limited',
536 'budget_limited',
537 'complete'
538 )),
539 token_budget INTEGER,
540 tokens_used INTEGER NOT NULL DEFAULT 0,
541 time_used_seconds INTEGER NOT NULL DEFAULT 0,
542 created_at INTEGER NOT NULL,
543 updated_at INTEGER NOT NULL,
544 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
545 );
546
547 PRAGMA user_version = 3;
548 COMMIT;
549 "#,
550 )
551 .context("failed to initialize thread goal schema")?;
552 user_version = 3;
553 }
554 if user_version < 4 {
555 conn.execute_batch(
556 r#"
557 BEGIN;
558 ALTER TABLE thread_goals
559 ADD COLUMN continuation_count INTEGER NOT NULL DEFAULT 0;
560
561 PRAGMA user_version = 4;
562 COMMIT;
563 "#,
564 )
565 .context("failed to initialize thread goal continuation schema")?;
566 }
567 Ok(())
568 }
569
570 pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
575 let conn = self.conn()?;
576 conn.execute(
577 r#"
578 INSERT INTO threads (
579 id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
580 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
581 git_sha, git_branch, git_origin_url, memory_mode
582 ) VALUES (
583 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
584 ?11, ?12, ?13, ?14, ?15, ?16, ?17,
585 ?18, ?19, ?20, ?21
586 )
587 ON CONFLICT(id) DO UPDATE SET
588 rollout_path=excluded.rollout_path,
589 preview=excluded.preview,
590 ephemeral=excluded.ephemeral,
591 model_provider=excluded.model_provider,
592 created_at=excluded.created_at,
593 updated_at=excluded.updated_at,
594 status=excluded.status,
595 path=excluded.path,
596 cwd=excluded.cwd,
597 cli_version=excluded.cli_version,
598 source=excluded.source,
599 title=excluded.title,
600 sandbox_policy=excluded.sandbox_policy,
601 approval_mode=excluded.approval_mode,
602 archived=excluded.archived,
603 archived_at=excluded.archived_at,
604 git_sha=excluded.git_sha,
605 git_branch=excluded.git_branch,
606 git_origin_url=excluded.git_origin_url,
607 memory_mode=excluded.memory_mode
608 "#,
609 params![
610 thread.id,
611 path_to_opt_string(thread.rollout_path.as_deref()),
612 thread.preview,
613 bool_to_i64(thread.ephemeral),
614 thread.model_provider,
615 thread.created_at,
616 thread.updated_at,
617 thread_status_to_str(&thread.status),
618 path_to_opt_string(thread.path.as_deref()),
619 thread.cwd.display().to_string(),
620 thread.cli_version,
621 session_source_to_str(&thread.source),
622 thread.name,
623 thread.sandbox_policy,
624 thread.approval_mode,
625 bool_to_i64(thread.archived),
626 thread.archived_at,
627 thread.git_sha,
628 thread.git_branch,
629 thread.git_origin_url,
630 thread.memory_mode,
631 ],
632 )
633 .context("failed to upsert thread metadata")?;
634
635 self.append_thread_name(
636 &thread.id,
637 thread.name.clone(),
638 thread.updated_at,
639 thread.rollout_path.clone(),
640 )?;
641 Ok(())
642 }
643
644 pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
648 let conn = self.conn()?;
649 conn.query_row(
650 r#"
651 SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
652 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
653 git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
654 FROM threads
655 WHERE id = ?1
656 "#,
657 params![id],
658 row_to_thread,
659 )
660 .optional()
661 .context("failed to read thread")
662 }
663
664 pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
669 let conn = self.conn()?;
670 let sql = if filters.include_archived {
671 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
672 } else {
673 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
674 };
675
676 let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
677 let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
678 let mut rows = stmt
679 .query(params![limit])
680 .context("failed to query threads")?;
681 let mut out = Vec::new();
682 while let Some(row) = rows.next().context("failed to iterate thread rows")? {
683 out.push(row_to_thread(row)?);
684 }
685 Ok(out)
686 }
687
688 pub fn mark_archived(&self, id: &str) -> Result<()> {
691 let conn = self.conn()?;
692 conn.execute(
693 "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
694 params![
695 id,
696 Utc::now().timestamp(),
697 thread_status_to_str(&ThreadStatus::Archived)
698 ],
699 )
700 .context("failed to archive thread")?;
701 Ok(())
702 }
703
704 pub fn mark_unarchived(&self, id: &str) -> Result<()> {
706 let conn = self.conn()?;
707 conn.execute(
708 "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
709 params![id],
710 )
711 .context("failed to unarchive thread")?;
712 Ok(())
713 }
714
715 pub fn delete_thread(&self, id: &str) -> Result<()> {
718 let conn = self.conn()?;
719 conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
720 .context("failed to delete thread")?;
721 Ok(())
722 }
723
724 pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
728 let conn = self.conn()?;
729 conn.execute(
730 "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
731 params![id, mode],
732 )
733 .context("failed to update thread memory mode")?;
734 Ok(())
735 }
736
737 pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
741 let conn = self.conn()?;
742 conn.query_row(
743 "SELECT memory_mode FROM threads WHERE id = ?1",
744 params![id],
745 |row| row.get::<_, Option<String>>(0),
746 )
747 .optional()
748 .context("failed to read thread memory mode")
749 .map(Option::flatten)
750 }
751
752 pub fn upsert_thread_goal(&self, goal: &ThreadGoalRecord) -> Result<()> {
754 let conn = self.conn()?;
755 let exists: Option<i64> = conn
756 .query_row(
757 "SELECT 1 FROM threads WHERE id = ?1",
758 params![goal.thread_id],
759 |row| row.get(0),
760 )
761 .optional()
762 .context("failed to verify thread before saving goal")?;
763 if exists.is_none() {
764 anyhow::bail!("thread {} not found", goal.thread_id);
765 }
766
767 conn.execute(
768 r#"
769 INSERT INTO thread_goals (
770 thread_id, goal_id, objective, status, token_budget, tokens_used,
771 time_used_seconds, continuation_count, created_at, updated_at
772 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
773 ON CONFLICT(thread_id) DO UPDATE SET
774 goal_id=excluded.goal_id,
775 objective=excluded.objective,
776 status=excluded.status,
777 token_budget=excluded.token_budget,
778 tokens_used=excluded.tokens_used,
779 time_used_seconds=excluded.time_used_seconds,
780 continuation_count=excluded.continuation_count,
781 created_at=excluded.created_at,
782 updated_at=excluded.updated_at
783 "#,
784 params![
785 goal.thread_id,
786 goal.goal_id,
787 goal.objective,
788 thread_goal_status_to_str(&goal.status),
789 goal.token_budget,
790 goal.tokens_used,
791 goal.time_used_seconds,
792 goal.continuation_count,
793 goal.created_at,
794 goal.updated_at,
795 ],
796 )
797 .context("failed to upsert thread goal")?;
798 Ok(())
799 }
800
801 pub fn record_thread_goal_usage(
817 &self,
818 thread_id: &str,
819 token_delta: i64,
820 time_delta_seconds: i64,
821 now: i64,
822 ) -> Result<Option<ThreadGoalRecord>> {
823 let conn = self.conn()?;
824 let changed = conn
825 .execute(
826 r#"
827 UPDATE thread_goals
828 SET tokens_used = tokens_used + ?2,
829 time_used_seconds = time_used_seconds + ?3,
830 updated_at = MAX(updated_at, ?4)
831 WHERE thread_id = ?1
832 "#,
833 params![thread_id, token_delta, time_delta_seconds, now],
834 )
835 .context("failed to record thread goal usage")?;
836 if changed == 0 {
837 return Ok(None);
838 }
839 self.get_thread_goal(thread_id)
840 }
841
842 pub fn record_thread_goal_continuation(
848 &self,
849 thread_id: &str,
850 now: i64,
851 ) -> Result<Option<ThreadGoalRecord>> {
852 let conn = self.conn()?;
853 let changed = conn
854 .execute(
855 r#"
856 UPDATE thread_goals
857 SET continuation_count = continuation_count + 1,
858 updated_at = MAX(updated_at, ?2)
859 WHERE thread_id = ?1
860 "#,
861 params![thread_id, now],
862 )
863 .context("failed to record thread goal continuation")?;
864 if changed == 0 {
865 return Ok(None);
866 }
867 self.get_thread_goal(thread_id)
868 }
869
870 pub fn get_thread_goal(&self, thread_id: &str) -> Result<Option<ThreadGoalRecord>> {
872 let conn = self.conn()?;
873 conn.query_row(
874 r#"
875 SELECT thread_id, goal_id, objective, status, token_budget, tokens_used,
876 time_used_seconds, continuation_count, created_at, updated_at
877 FROM thread_goals
878 WHERE thread_id = ?1
879 "#,
880 params![thread_id],
881 row_to_thread_goal,
882 )
883 .optional()
884 .context("failed to read thread goal")
885 }
886
887 pub fn delete_thread_goal(&self, thread_id: &str) -> Result<bool> {
889 let conn = self.conn()?;
890 let changed = conn
891 .execute(
892 "DELETE FROM thread_goals WHERE thread_id = ?1",
893 params![thread_id],
894 )
895 .context("failed to delete thread goal")?;
896 Ok(changed > 0)
897 }
898
899 pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
904 let conn = self.conn()?;
905 let mut stmt = conn
906 .prepare(
907 r#"
908 SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
909 FROM messages m1
910 LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
911 WHERE m1.thread_id = ?1 AND m2.id IS NULL
912 "#,
913 )
914 .context("failed to prepare message listing query")?;
915 let mut rows = stmt
916 .query(params![thread_id])
917 .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
918 let mut out = Vec::new();
919 while let Some(row) = rows.next().context("failed to iterate message rows")? {
920 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
921 let item = item_json
922 .as_deref()
923 .map(serde_json::from_str)
924 .transpose()
925 .with_context(|| {
926 format!("failed to parse message item json in thread {thread_id}")
927 })?;
928 out.push(MessageRecord {
929 id: row.get(0).context("failed to read message id")?,
930 thread_id: row.get(1).context("failed to read message thread id")?,
931 role: row.get(2).context("failed to read message role")?,
932 content: row.get(3).context("failed to read message content")?,
933 item,
934 created_at: row.get(5).context("failed to read message timestamp")?,
935 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
936 });
937 }
938 Ok(out)
939 }
940
941 pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
946 let conn = self.conn()?;
947 conn.execute(
948 "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
949 params![current_leaf_id, thread_id],
950 )
951 .context("failed to update thread current leaf id")?;
952 Ok(())
953 }
954
955 pub fn persist_dynamic_tools(
960 &self,
961 thread_id: &str,
962 tools: &[DynamicToolRecord],
963 ) -> Result<()> {
964 let mut conn = self.conn()?;
965 let tx = conn
966 .transaction()
967 .context("failed to begin dynamic tools transaction")?;
968 tx.execute(
969 "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
970 params![thread_id],
971 )
972 .context("failed to clear dynamic tools")?;
973 for tool in tools {
974 tx.execute(
975 "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
976 params![
977 thread_id,
978 tool.position,
979 tool.name,
980 tool.description,
981 tool.input_schema.to_string()
982 ],
983 )
984 .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
985 }
986 tx.commit().context("failed to commit dynamic tools")?;
987 Ok(())
988 }
989
990 pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
992 let conn = self.conn()?;
993 let mut stmt = conn
994 .prepare(
995 "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
996 )
997 .context("failed to prepare get dynamic tools query")?;
998 let mut rows = stmt
999 .query(params![thread_id])
1000 .context("failed to query dynamic tools")?;
1001 let mut out = Vec::new();
1002 while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
1003 let input_schema_raw: String =
1004 row.get(3).context("failed to read tool input schema")?;
1005 let input_schema: Value =
1006 serde_json::from_str(&input_schema_raw).with_context(|| {
1007 format!("failed to parse input schema for dynamic tool in thread {thread_id}")
1008 })?;
1009 out.push(DynamicToolRecord {
1010 position: row.get(0).context("failed to read tool position")?,
1011 name: row.get(1).context("failed to read tool name")?,
1012 description: row.get(2).context("failed to read tool description")?,
1013 input_schema,
1014 });
1015 }
1016 Ok(out)
1017 }
1018
1019 pub fn append_message(
1025 &self,
1026 thread_id: &str,
1027 role: &str,
1028 content: &str,
1029 item: Option<Value>,
1030 ) -> Result<i64> {
1031 let mut conn = self.conn()?;
1032 let created_at = Utc::now().timestamp();
1033 let item_json = item
1034 .as_ref()
1035 .map(serde_json::to_string)
1036 .transpose()
1037 .context("failed to serialize message item payload")?;
1038
1039 let tx = conn
1040 .transaction()
1041 .context("failed to begin append message transaction")?;
1042
1043 let current_leaf_id: Option<i64> = tx
1044 .query_row(
1045 "SELECT current_leaf_id FROM threads WHERE id = ?1",
1046 params![thread_id],
1047 |row| row.get(0),
1048 )
1049 .with_context(|| {
1050 format!("failed to query thread current leaf id for thread {thread_id}")
1051 })?;
1052
1053 let next_leaf_id: i64 = tx.query_row(
1054 r#"
1055 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
1056 SELECT ?1, ?2, ?3, ?4, ?5, ?6
1057 RETURNING id
1058 "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
1059 ).with_context(|| format!("failed to append message for thread {thread_id}"))?;
1060
1061 tx.execute(
1062 r#"
1063 UPDATE threads
1064 SET current_leaf_id = ?1
1065 WHERE id = ?2;
1066 "#,
1067 params![next_leaf_id, thread_id],
1068 )
1069 .with_context(|| {
1070 format!("failed to update thread current leaf id for thread {thread_id}")
1071 })?;
1072
1073 tx.commit()
1074 .context("failed to commit append message transaction")?;
1075
1076 Ok(next_leaf_id)
1077 }
1078
1079 pub fn list_messages(
1085 &self,
1086 thread_id: &str,
1087 limit: Option<usize>,
1088 ) -> Result<Vec<MessageRecord>> {
1089 let conn = self.conn()?;
1090 let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
1091 let mut stmt = conn
1092 .prepare(
1093 r#"
1094 WITH RECURSIVE
1095 leaf_id AS (
1096 SELECT current_leaf_id FROM threads WHERE id = ?1
1097 ),
1098 ancestors AS (
1099 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
1100 FROM messages
1101 WHERE id = (SELECT current_leaf_id FROM leaf_id)
1102
1103 UNION ALL
1104
1105 SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
1106 FROM messages m
1107 JOIN ancestors a ON m.id = a.parent_entry_id
1108 WHERE a.depth < ?2
1109 )
1110 SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
1111 ORDER BY depth DESC
1112 "#
1113 )
1114 .context("failed to prepare message listing query")?;
1115 let mut rows = stmt
1116 .query(params![thread_id, limit - 1])
1117 .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
1118 let mut out = Vec::new();
1119 while let Some(row) = rows.next().context("failed to iterate message rows")? {
1120 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
1121 let item = item_json
1122 .as_deref()
1123 .map(serde_json::from_str)
1124 .transpose()
1125 .with_context(|| {
1126 format!("failed to parse message item json in thread {thread_id}")
1127 })?;
1128 out.push(MessageRecord {
1129 id: row.get(0).context("failed to read message id")?,
1130 thread_id: row.get(1).context("failed to read message thread id")?,
1131 role: row.get(2).context("failed to read message role")?,
1132 content: row.get(3).context("failed to read message content")?,
1133 item,
1134 created_at: row.get(5).context("failed to read message timestamp")?,
1135 parent_entry_id: row.get(6).context("failed to read parent entry id")?,
1136 });
1137 }
1138 Ok(out)
1139 }
1140
1141 pub fn fork_at_message(
1147 &self,
1148 message_id: &str,
1149 role: &str,
1150 content: &str,
1151 item: Option<Value>,
1152 ) -> Result<i64> {
1153 let mut conn = self.conn()?;
1154 let created_at = Utc::now().timestamp();
1155 let item_json = item
1156 .as_ref()
1157 .map(serde_json::to_string)
1158 .transpose()
1159 .context("failed to serialize message item payload")?;
1160
1161 let tx = conn
1162 .transaction()
1163 .context("failed to begin fork message transaction")?;
1164
1165 let thread_id: String = tx
1166 .query_row(
1167 "SELECT thread_id FROM messages WHERE id = ?1",
1168 params![message_id],
1169 |row| row.get(0),
1170 )
1171 .with_context(|| format!("failed to query thread id for message {message_id}"))?;
1172
1173 let next_leaf_id: i64 = tx.query_row(
1174 r#"
1175 INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
1176 SELECT ?1, ?2, ?3, ?4, ?5, ?6
1177 RETURNING id
1178 "#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
1179 ).with_context(|| format!("failed to fork at message for thread {:?}", thread_id))?;
1180
1181 tx.execute(
1182 r#"
1183 UPDATE threads
1184 SET current_leaf_id = ?1
1185 WHERE id = ?2;
1186 "#,
1187 params![next_leaf_id, thread_id],
1188 )
1189 .with_context(|| {
1190 format!(
1191 "failed to update thread current leaf id for thread {:?}",
1192 thread_id
1193 )
1194 })?;
1195
1196 tx.commit()
1197 .context("failed to commit fork message transaction")?;
1198
1199 Ok(next_leaf_id)
1200 }
1201
1202 pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
1206 let mut conn = self.conn()?;
1207 let tx = conn
1208 .transaction()
1209 .context("failed to begin clear messages transaction")?;
1210
1211 tx.execute(
1212 r#"
1213 UPDATE threads
1214 SET current_leaf_id = NULL
1215 WHERE id = ?1;
1216 "#,
1217 params![thread_id],
1218 )
1219 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
1220 let result = tx
1221 .execute(
1222 r#"
1223 DELETE FROM messages WHERE thread_id = ?1
1224 "#,
1225 params![thread_id],
1226 )
1227 .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
1228 tx.commit()
1229 .context("failed to commit clear messages transaction")?;
1230
1231 Ok(result)
1232 }
1233
1234 pub fn save_checkpoint(
1239 &self,
1240 thread_id: &str,
1241 checkpoint_id: &str,
1242 state: &Value,
1243 ) -> Result<()> {
1244 let conn = self.conn()?;
1245 let state_json =
1246 serde_json::to_string(state).context("failed to encode checkpoint state")?;
1247 conn.execute(
1248 r#"
1249 INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
1250 VALUES (?1, ?2, ?3, ?4)
1251 ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
1252 state_json = excluded.state_json,
1253 created_at = excluded.created_at
1254 "#,
1255 params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
1256 )
1257 .with_context(|| {
1258 format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
1259 })?;
1260 Ok(())
1261 }
1262
1263 pub fn load_checkpoint(
1269 &self,
1270 thread_id: &str,
1271 checkpoint_id: Option<&str>,
1272 ) -> Result<Option<CheckpointRecord>> {
1273 let conn = self.conn()?;
1274 if let Some(checkpoint_id) = checkpoint_id {
1275 let row = conn
1276 .query_row(
1277 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1278 params![thread_id, checkpoint_id],
1279 |row| {
1280 let state_json: String = row.get(2)?;
1281 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1282 Ok(CheckpointRecord {
1283 thread_id: row.get(0)?,
1284 checkpoint_id: row.get(1)?,
1285 state,
1286 created_at: row.get(3)?,
1287 })
1288 },
1289 )
1290 .optional()
1291 .with_context(|| {
1292 format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
1293 })?;
1294 return Ok(row);
1295 }
1296
1297 conn.query_row(
1298 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
1299 params![thread_id],
1300 |row| {
1301 let state_json: String = row.get(2)?;
1302 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1303 Ok(CheckpointRecord {
1304 thread_id: row.get(0)?,
1305 checkpoint_id: row.get(1)?,
1306 state,
1307 created_at: row.get(3)?,
1308 })
1309 },
1310 )
1311 .optional()
1312 .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
1313 }
1314
1315 pub fn list_checkpoints(
1319 &self,
1320 thread_id: &str,
1321 limit: Option<usize>,
1322 ) -> Result<Vec<CheckpointRecord>> {
1323 let conn = self.conn()?;
1324 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1325 let mut stmt = conn
1326 .prepare(
1327 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
1328 )
1329 .context("failed to prepare checkpoint list query")?;
1330 let mut rows = stmt
1331 .query(params![thread_id, limit])
1332 .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
1333
1334 let mut out = Vec::new();
1335 while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
1336 let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
1337 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1338 out.push(CheckpointRecord {
1339 thread_id: row.get(0).context("failed to read checkpoint thread id")?,
1340 checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
1341 state,
1342 created_at: row.get(3).context("failed to read checkpoint timestamp")?,
1343 });
1344 }
1345 Ok(out)
1346 }
1347
1348 pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
1350 let conn = self.conn()?;
1351 conn.execute(
1352 "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1353 params![thread_id, checkpoint_id],
1354 )
1355 .with_context(|| {
1356 format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
1357 })?;
1358 Ok(())
1359 }
1360
1361 pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
1363 let conn = self.conn()?;
1364 conn.execute(
1365 r#"
1366 INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
1367 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1368 ON CONFLICT(id) DO UPDATE SET
1369 name = excluded.name,
1370 status = excluded.status,
1371 progress = excluded.progress,
1372 detail = excluded.detail,
1373 created_at = excluded.created_at,
1374 updated_at = excluded.updated_at
1375 "#,
1376 params![
1377 job.id,
1378 job.name,
1379 job_state_status_to_str(&job.status),
1380 job.progress.map(i64::from),
1381 job.detail,
1382 job.created_at,
1383 job.updated_at
1384 ],
1385 )
1386 .with_context(|| format!("failed to upsert job {}", job.id))?;
1387 Ok(())
1388 }
1389
1390 pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
1394 let conn = self.conn()?;
1395 conn.query_row(
1396 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
1397 params![id],
1398 |row| {
1399 let status_raw: String = row.get(2)?;
1400 let progress: Option<i64> = row.get(3)?;
1401 Ok(JobStateRecord {
1402 id: row.get(0)?,
1403 name: row.get(1)?,
1404 status: job_state_status_from_str(&status_raw),
1405 progress: progress.and_then(|v| u8::try_from(v).ok()),
1406 detail: row.get(4)?,
1407 created_at: row.get(5)?,
1408 updated_at: row.get(6)?,
1409 })
1410 },
1411 )
1412 .optional()
1413 .with_context(|| format!("failed to read job {id}"))
1414 }
1415
1416 pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
1420 let conn = self.conn()?;
1421 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1422 let mut stmt = conn
1423 .prepare(
1424 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
1425 )
1426 .context("failed to prepare job list query")?;
1427 let mut rows = stmt
1428 .query(params![limit])
1429 .context("failed to query persisted jobs")?;
1430 let mut out = Vec::new();
1431 while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
1432 let status_raw: String = row.get(2).context("failed to read job status")?;
1433 let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
1434 out.push(JobStateRecord {
1435 id: row.get(0).context("failed to read job id")?,
1436 name: row.get(1).context("failed to read job name")?,
1437 status: job_state_status_from_str(&status_raw),
1438 progress: progress.and_then(|v| u8::try_from(v).ok()),
1439 detail: row.get(4).context("failed to read job detail")?,
1440 created_at: row.get(5).context("failed to read job created_at")?,
1441 updated_at: row.get(6).context("failed to read job updated_at")?,
1442 });
1443 }
1444 Ok(out)
1445 }
1446
1447 pub fn delete_job(&self, id: &str) -> Result<()> {
1449 let conn = self.conn()?;
1450 conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
1451 .with_context(|| format!("failed to delete job {id}"))?;
1452 Ok(())
1453 }
1454
1455 pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
1457 let conn = self.conn()?;
1458 conn.query_row(
1459 "SELECT rollout_path FROM threads WHERE id = ?1",
1460 params![id],
1461 |row| row.get::<_, Option<String>>(0),
1462 )
1463 .optional()
1464 .context("failed to lookup rollout path")
1465 .map(|opt| opt.flatten().map(PathBuf::from))
1466 }
1467
1468 pub fn append_thread_name(
1474 &self,
1475 thread_id: &str,
1476 thread_name: Option<String>,
1477 updated_at: i64,
1478 rollout_path: Option<PathBuf>,
1479 ) -> Result<()> {
1480 if let Some(parent) = self.session_index_path.parent() {
1481 fs::create_dir_all(parent).with_context(|| {
1482 format!(
1483 "failed to create session index directory {}",
1484 parent.display()
1485 )
1486 })?;
1487 }
1488 let entry = SessionIndexEntry {
1489 thread_id: thread_id.to_string(),
1490 thread_name,
1491 updated_at,
1492 rollout_path,
1493 };
1494 let encoded =
1495 serde_json::to_string(&entry).context("failed to serialize session index entry")?;
1496 let mut file = OpenOptions::new()
1497 .create(true)
1498 .append(true)
1499 .open(&self.session_index_path)
1500 .with_context(|| {
1501 format!(
1502 "failed to open session index {}",
1503 self.session_index_path.display()
1504 )
1505 })?;
1506 writeln!(file, "{encoded}").context("failed to append session index entry")?;
1507 Ok(())
1508 }
1509
1510 pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
1514 let map = self.session_index_map()?;
1515 Ok(map
1516 .get(thread_id)
1517 .and_then(|entry| entry.thread_name.clone()))
1518 }
1519
1520 pub fn find_thread_names_by_ids(
1524 &self,
1525 ids: &[String],
1526 ) -> Result<HashMap<String, Option<String>>> {
1527 let map = self.session_index_map()?;
1528 let mut out = HashMap::new();
1529 for id in ids {
1530 let name = map.get(id).and_then(|entry| entry.thread_name.clone());
1531 out.insert(id.clone(), name);
1532 }
1533 Ok(out)
1534 }
1535
1536 pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
1541 let map = self.session_index_map()?;
1542 let matched = map
1543 .values()
1544 .filter(|entry| {
1545 entry
1546 .thread_name
1547 .as_deref()
1548 .is_some_and(|n| n.eq_ignore_ascii_case(name))
1549 })
1550 .max_by_key(|entry| entry.updated_at);
1551 Ok(matched.and_then(|entry| entry.rollout_path.clone()))
1552 }
1553
1554 fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
1555 if !self.session_index_path.exists() {
1556 return Ok(HashMap::new());
1557 }
1558 let file = OpenOptions::new()
1559 .read(true)
1560 .open(&self.session_index_path)
1561 .with_context(|| {
1562 format!(
1563 "failed to read session index {}",
1564 self.session_index_path.display()
1565 )
1566 })?;
1567 let reader = BufReader::new(file);
1568 let mut latest = HashMap::<String, SessionIndexEntry>::new();
1569 for line in reader.lines() {
1570 let line = line.context("failed to read session index line")?;
1571 if line.trim().is_empty() {
1572 continue;
1573 }
1574 let parsed: SessionIndexEntry =
1575 serde_json::from_str(&line).context("failed to parse session index entry")?;
1576 latest.insert(parsed.thread_id.clone(), parsed);
1577 }
1578 Ok(latest)
1579 }
1580}
1581
1582fn default_state_db_path() -> PathBuf {
1583 let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
1584 let primary = home.join(".codewhale").join("state.db");
1587 if primary.exists() || !home.join(".deepseek").join("state.db").exists() {
1588 primary
1589 } else {
1590 home.join(".deepseek").join("state.db")
1591 }
1592}
1593
/// Encodes a flag as an integer: `true` becomes `1`, `false` becomes `0`.
fn bool_to_i64(value: bool) -> i64 {
    i64::from(value)
}
1597
/// Decodes an integer flag: `0` is `false`, every other value is `true`.
fn i64_to_bool(value: i64) -> bool {
    match value {
        0 => false,
        _ => true,
    }
}
1601
1602fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
1603 match status {
1604 ThreadStatus::Running => "running",
1605 ThreadStatus::Idle => "idle",
1606 ThreadStatus::Completed => "completed",
1607 ThreadStatus::Failed => "failed",
1608 ThreadStatus::Paused => "paused",
1609 ThreadStatus::Archived => "archived",
1610 }
1611}
1612
1613fn thread_status_from_str(value: &str) -> ThreadStatus {
1614 match value {
1615 "running" => ThreadStatus::Running,
1616 "idle" => ThreadStatus::Idle,
1617 "completed" => ThreadStatus::Completed,
1618 "failed" => ThreadStatus::Failed,
1619 "paused" => ThreadStatus::Paused,
1620 "archived" => ThreadStatus::Archived,
1621 _ => ThreadStatus::Idle,
1622 }
1623}
1624
1625fn session_source_to_str(source: &SessionSource) -> &'static str {
1626 match source {
1627 SessionSource::Interactive => "interactive",
1628 SessionSource::Resume => "resume",
1629 SessionSource::Fork => "fork",
1630 SessionSource::Api => "api",
1631 SessionSource::Unknown => "unknown",
1632 }
1633}
1634
1635fn session_source_from_str(value: &str) -> SessionSource {
1636 match value {
1637 "interactive" => SessionSource::Interactive,
1638 "resume" => SessionSource::Resume,
1639 "fork" => SessionSource::Fork,
1640 "api" => SessionSource::Api,
1641 _ => SessionSource::Unknown,
1642 }
1643}
1644
/// Renders an optional path as an optional owned string using [`Path::display`].
///
/// `None` stays `None`. Non-UTF-8 components are rendered lossily, as `display` does.
fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
    let present = path?;
    Some(present.display().to_string())
}
1648
1649fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
1650 match status {
1651 JobStateStatus::Queued => "queued",
1652 JobStateStatus::Running => "running",
1653 JobStateStatus::Completed => "completed",
1654 JobStateStatus::Failed => "failed",
1655 JobStateStatus::Cancelled => "cancelled",
1656 }
1657}
1658
1659fn job_state_status_from_str(value: &str) -> JobStateStatus {
1660 match value {
1661 "queued" => JobStateStatus::Queued,
1662 "running" => JobStateStatus::Running,
1663 "completed" => JobStateStatus::Completed,
1664 "failed" => JobStateStatus::Failed,
1665 "cancelled" => JobStateStatus::Cancelled,
1666 _ => JobStateStatus::Queued,
1667 }
1668}
1669
1670fn thread_goal_status_to_str(status: &ThreadGoalStatus) -> &'static str {
1671 match status {
1672 ThreadGoalStatus::Active => "active",
1673 ThreadGoalStatus::Paused => "paused",
1674 ThreadGoalStatus::Blocked => "blocked",
1675 ThreadGoalStatus::UsageLimited => "usage_limited",
1676 ThreadGoalStatus::BudgetLimited => "budget_limited",
1677 ThreadGoalStatus::Complete => "complete",
1678 }
1679}
1680
1681fn thread_goal_status_from_str(value: &str) -> ThreadGoalStatus {
1682 match value {
1683 "active" => ThreadGoalStatus::Active,
1684 "paused" => ThreadGoalStatus::Paused,
1685 "blocked" => ThreadGoalStatus::Blocked,
1686 "usage_limited" => ThreadGoalStatus::UsageLimited,
1687 "budget_limited" => ThreadGoalStatus::BudgetLimited,
1688 "complete" => ThreadGoalStatus::Complete,
1689 _ => ThreadGoalStatus::Active,
1690 }
1691}
1692
1693fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
1694 let status_raw: String = row.get(7)?;
1695 let source_raw: String = row.get(11)?;
1696 let rollout_path: Option<String> = row.get(1)?;
1697 let path: Option<String> = row.get(8)?;
1698 Ok(ThreadMetadata {
1699 id: row.get(0)?,
1700 rollout_path: rollout_path.map(PathBuf::from),
1701 preview: row.get(2)?,
1702 ephemeral: i64_to_bool(row.get(3)?),
1703 model_provider: row.get(4)?,
1704 created_at: row.get(5)?,
1705 updated_at: row.get(6)?,
1706 status: thread_status_from_str(&status_raw),
1707 path: path.map(PathBuf::from),
1708 cwd: PathBuf::from(row.get::<_, String>(9)?),
1709 cli_version: row.get(10)?,
1710 source: session_source_from_str(&source_raw),
1711 name: row.get(12)?,
1712 sandbox_policy: row.get(13)?,
1713 approval_mode: row.get(14)?,
1714 archived: i64_to_bool(row.get(15)?),
1715 archived_at: row.get(16)?,
1716 git_sha: row.get(17)?,
1717 git_branch: row.get(18)?,
1718 git_origin_url: row.get(19)?,
1719 memory_mode: row.get(20)?,
1720 current_leaf_id: row.get(21)?,
1721 })
1722}
1723
1724fn row_to_thread_goal(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadGoalRecord> {
1725 let status_raw: String = row.get(3)?;
1726 Ok(ThreadGoalRecord {
1727 thread_id: row.get(0)?,
1728 goal_id: row.get(1)?,
1729 objective: row.get(2)?,
1730 status: thread_goal_status_from_str(&status_raw),
1731 token_budget: row.get(4)?,
1732 tokens_used: row.get(5)?,
1733 time_used_seconds: row.get(6)?,
1734 continuation_count: row.get(7)?,
1735 created_at: row.get(8)?,
1736 updated_at: row.get(9)?,
1737 })
1738}
1739
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Id of the thread that most tests attach their goal to.
    const THREAD_ID: &str = "thread-1";

    /// Opens a store whose database lives in a freshly created, uniquely named directory
    /// under the system temp dir (name, process id and a nanosecond timestamp).
    fn temp_state_store(name: &str) -> StateStore {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time")
            .as_nanos();
        let pid = std::process::id();
        let dir = std::env::temp_dir().join(format!("codewhale-state-{name}-{pid}-{nanos}"));
        fs::create_dir_all(&dir).expect("create temp state dir");
        StateStore::open(Some(dir.join("state.db"))).expect("open state store")
    }

    /// Opens a temp store that already contains the thread [`THREAD_ID`].
    fn seeded_store(name: &str) -> StateStore {
        let store = temp_state_store(name);
        store
            .upsert_thread(&test_thread(THREAD_ID))
            .expect("upsert thread");
        store
    }

    /// Minimal running, non-archived thread with every optional field unset.
    fn test_thread(id: &str) -> ThreadMetadata {
        ThreadMetadata {
            id: id.to_string(),
            preview: "test thread".to_string(),
            model_provider: "deepseek".to_string(),
            cli_version: "0.0.0-test".to_string(),
            cwd: PathBuf::from("/tmp/codewhale"),
            status: ThreadStatus::Running,
            source: SessionSource::Interactive,
            created_at: 10,
            updated_at: 10,
            ephemeral: false,
            archived: false,
            rollout_path: None,
            path: None,
            name: None,
            sandbox_policy: None,
            approval_mode: None,
            archived_at: None,
            git_sha: None,
            git_branch: None,
            git_origin_url: None,
            memory_mode: None,
            current_leaf_id: None,
        }
    }

    /// Active goal `goal-1` with a small amount of recorded usage.
    fn test_goal(thread_id: &str, objective: &str) -> ThreadGoalRecord {
        ThreadGoalRecord {
            thread_id: thread_id.to_string(),
            goal_id: "goal-1".to_string(),
            objective: objective.to_string(),
            status: ThreadGoalStatus::Active,
            token_budget: Some(123),
            tokens_used: 7,
            time_used_seconds: 11,
            continuation_count: 0,
            created_at: 100,
            updated_at: 101,
        }
    }

    #[test]
    fn thread_goal_crud_round_trips_and_replaces() {
        let store = seeded_store("thread-goal-crud");

        // Create, then read back unchanged.
        let original = test_goal(THREAD_ID, "Ship v0.8.59");
        store.upsert_thread_goal(&original).expect("upsert goal");
        let fetched = store.get_thread_goal(THREAD_ID).expect("read goal");
        assert_eq!(fetched.as_ref(), Some(&original));

        // A second upsert for the same thread replaces the stored goal wholesale.
        let replacement = ThreadGoalRecord {
            goal_id: "goal-2".to_string(),
            status: ThreadGoalStatus::BudgetLimited,
            token_budget: None,
            updated_at: 202,
            ..test_goal(THREAD_ID, "Ship v0.8.59 safely")
        };
        store
            .upsert_thread_goal(&replacement)
            .expect("replace goal");
        let refetched = store.get_thread_goal(THREAD_ID).expect("read replacement");
        assert_eq!(refetched, Some(replacement));

        // Delete reports whether a row was removed.
        assert!(store.delete_thread_goal(THREAD_ID).expect("delete goal"));
        let after_delete = store.get_thread_goal(THREAD_ID).expect("read empty");
        assert!(after_delete.is_none());
        assert!(!store.delete_thread_goal(THREAD_ID).expect("delete empty"));
    }

    #[test]
    fn thread_goal_requires_existing_thread() {
        let store = temp_state_store("thread-goal-missing-thread");
        let orphan = test_goal("missing-thread", "nope");

        let err = store
            .upsert_thread_goal(&orphan)
            .expect_err("goal without a thread should fail");

        let message = err.to_string();
        assert!(message.contains("thread missing-thread not found"));
    }

    #[test]
    fn record_thread_goal_usage_accumulates_tokens_and_time() {
        let store = seeded_store("thread-goal-usage");

        // Start from zero usage so the running totals are easy to follow.
        let baseline = ThreadGoalRecord {
            tokens_used: 0,
            time_used_seconds: 0,
            updated_at: 100,
            ..test_goal(THREAD_ID, "Ship the persistent goal loop")
        };
        store.upsert_thread_goal(&baseline).expect("upsert goal");

        let first = store
            .record_thread_goal_usage(THREAD_ID, 250, 12, 150)
            .expect("record usage")
            .expect("goal exists");
        assert_eq!(first.tokens_used, 250);
        assert_eq!(first.time_used_seconds, 12);
        assert_eq!(first.updated_at, 150);
        // Everything other than the usage counters and timestamp is untouched.
        assert_eq!(first.goal_id, baseline.goal_id);
        assert_eq!(first.objective, baseline.objective);
        assert_eq!(first.status, baseline.status);
        assert_eq!(first.token_budget, baseline.token_budget);
        assert_eq!(first.created_at, baseline.created_at);
        assert_eq!(first.continuation_count, 0);

        let second = store
            .record_thread_goal_usage(THREAD_ID, 75, 8, 200)
            .expect("record usage")
            .expect("goal exists");
        assert_eq!(second.tokens_used, 325);
        assert_eq!(second.time_used_seconds, 20);
        assert_eq!(second.updated_at, 200);

        // An older timestamp still adds usage but must not move `updated_at` backwards.
        let stale = store
            .record_thread_goal_usage(THREAD_ID, 5, 1, 1)
            .expect("record usage")
            .expect("goal exists");
        assert_eq!(stale.tokens_used, 330);
        assert_eq!(stale.time_used_seconds, 21);
        assert_eq!(stale.updated_at, 200);

        let persisted = store
            .get_thread_goal(THREAD_ID)
            .expect("read goal")
            .expect("goal exists");
        assert_eq!(persisted.tokens_used, 330);
        assert_eq!(persisted.time_used_seconds, 21);
    }

    #[test]
    fn record_thread_goal_usage_returns_none_without_goal() {
        let store = seeded_store("thread-goal-usage-missing");

        let outcome = store
            .record_thread_goal_usage(THREAD_ID, 100, 5, 999)
            .expect("record usage on goalless thread");
        assert!(outcome.is_none());

        // Recording usage must not create a goal as a side effect.
        let stored = store.get_thread_goal(THREAD_ID).expect("read goal");
        assert!(stored.is_none());
    }

    #[test]
    fn record_thread_goal_continuation_accumulates_durably() {
        let store = seeded_store("thread-goal-continuation");

        let baseline = ThreadGoalRecord {
            updated_at: 100,
            ..test_goal(THREAD_ID, "Keep working across turns")
        };
        store.upsert_thread_goal(&baseline).expect("upsert goal");

        let first = store
            .record_thread_goal_continuation(THREAD_ID, 120)
            .expect("record continuation")
            .expect("goal exists");
        assert_eq!(first.continuation_count, 1);
        assert_eq!(first.tokens_used, baseline.tokens_used);
        assert_eq!(first.time_used_seconds, baseline.time_used_seconds);
        assert_eq!(first.updated_at, 120);

        // An older timestamp still counts the continuation but keeps the newer `updated_at`.
        let second = store
            .record_thread_goal_continuation(THREAD_ID, 110)
            .expect("record second continuation")
            .expect("goal exists");
        assert_eq!(second.continuation_count, 2);
        assert_eq!(second.updated_at, 120);

        let persisted = store
            .get_thread_goal(THREAD_ID)
            .expect("read goal")
            .expect("goal exists");
        assert_eq!(persisted.continuation_count, 2);
    }
}