Skip to main content

codewhale_state/
lib.rs

1//! Persistent state management for conversation threads, messages, and jobs.
2//!
3//! The [`StateStore`] is the primary entry point, backed by a SQLite database and an
4//! append-only JSONL session index file. It provides CRUD operations for:
5//!
6//! - **Threads** — conversation metadata, archival, and session indexing.
7//! - **Messages** — append-only message storage with tree-structured branching.
8//! - **Checkpoints** — named state snapshots for restoring conversation progress.
9//! - **Jobs** — background task tracking with status and progress.
10//! - **Dynamic tools** — per-thread tool registrations.
11
12use std::collections::HashMap;
13use std::fs::{self, OpenOptions};
14use std::io::{BufRead, BufReader, Write};
15use std::path::{Path, PathBuf};
16
17use anyhow::{Context, Result};
18use chrono::Utc;
19use rusqlite::{Connection, OptionalExtension, params};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
23/// Lifecycle status of a conversation thread.
24///
25/// Serialized as lowercase snake_case strings (e.g. `"running"`, `"archived"`).
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
27#[serde(rename_all = "snake_case")]
28pub enum ThreadStatus {
29    /// Thread is actively being worked on.
30    Running,
31    /// Thread exists but has no active work in progress.
32    Idle,
33    /// Thread has finished its task successfully.
34    Completed,
35    /// Thread encountered an unrecoverable error.
36    Failed,
37    /// Thread has been temporarily paused by the user.
38    Paused,
39    /// Thread has been archived and is hidden from default listings.
40    Archived,
41}
42
43/// Indicates how a session was initiated.
44///
45/// Serialized as lowercase snake_case strings.
46#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "snake_case")]
48pub enum SessionSource {
49    /// Started by a user interacting with the CLI.
50    Interactive,
51    /// Resumed from a previously persisted session.
52    Resume,
53    /// Created by forking an existing conversation at a specific message.
54    Fork,
55    /// Initiated programmatically via the API.
56    Api,
57    /// Source is unknown or unspecified.
58    Unknown,
59}
60
61/// Metadata for a persisted conversation thread.
62///
63/// Each thread represents a single conversation session and stores its
64/// configuration, git context, and current status.
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ThreadMetadata {
67    /// Unique identifier for this thread.
68    pub id: String,
69    /// Optional filesystem path to the rollout (JSONL transcript) file.
70    pub rollout_path: Option<PathBuf>,
71    /// Short preview or summary of the thread content.
72    pub preview: String,
73    /// Whether this thread is ephemeral (not persisted long-term).
74    pub ephemeral: bool,
75    /// Identifier of the model provider used for this thread (e.g. `"openai"`).
76    pub model_provider: String,
77    /// Unix timestamp (seconds) when the thread was created.
78    pub created_at: i64,
79    /// Unix timestamp (seconds) of the most recent update to the thread.
80    pub updated_at: i64,
81    /// Current lifecycle status of the thread.
82    pub status: ThreadStatus,
83    /// Optional filesystem path associated with the thread working context.
84    pub path: Option<PathBuf>,
85    /// Working directory that was active when the thread was created.
86    pub cwd: PathBuf,
87    /// Version of the CLI that created this thread.
88    pub cli_version: String,
89    /// How this session was initiated.
90    pub source: SessionSource,
91    /// User-assigned display name for the thread.
92    pub name: Option<String>,
93    /// Serialized sandbox policy applied to this thread, if any.
94    pub sandbox_policy: Option<String>,
95    /// Approval mode configured for tool calls in this thread.
96    pub approval_mode: Option<String>,
97    /// Whether the thread has been archived.
98    pub archived: bool,
99    /// Unix timestamp (seconds) when the thread was archived, or `None` if not archived.
100    pub archived_at: Option<i64>,
101    /// Git commit SHA of the working tree when the thread was created.
102    pub git_sha: Option<String>,
103    /// Git branch checked out when the thread was created.
104    pub git_branch: Option<String>,
105    /// URL of the git remote origin, if available.
106    pub git_origin_url: Option<String>,
107    /// Memory mode configured for this thread (e.g. `"local"`, `"remote"`).
108    pub memory_mode: Option<String>,
109    /// ID of the current leaf message in the conversation tree.
110    pub current_leaf_id: Option<i64>,
111}
112
113/// A dynamically registered tool associated with a thread.
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct DynamicToolRecord {
116    /// Ordinal position of this tool in the thread tool list.
117    pub position: i64,
118    /// Unique name identifying the tool.
119    pub name: String,
120    /// Human-readable description of what the tool does.
121    pub description: Option<String>,
122    /// JSON Schema describing the tool input parameters.
123    pub input_schema: Value,
124}
125
126/// A single message entry in a conversation thread.
127///
128/// Messages form a tree structure via [`parent_entry_id`](Self::parent_entry_id),
129/// enabling conversation branching and forking.
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct MessageRecord {
132    /// Auto-incremented unique identifier for this message.
133    pub id: i64,
134    /// ID of the thread this message belongs to.
135    pub thread_id: String,
136    /// Role of the message sender (e.g. `"user"`, `"assistant"`, `"system"`).
137    pub role: String,
138    /// Text content of the message.
139    pub content: String,
140    /// Optional structured item payload (tool calls, tool results, etc.).
141    pub item: Option<Value>,
142    /// Unix timestamp (seconds) when the message was created.
143    pub created_at: i64,
144    /// ID of the parent message, forming a tree structure. `None` for root messages.
145    pub parent_entry_id: Option<i64>,
146}
147
148/// A named checkpoint capturing the state of a thread at a point in time.
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct CheckpointRecord {
151    /// ID of the thread this checkpoint belongs to.
152    pub thread_id: String,
153    /// Unique identifier for this checkpoint within its thread.
154    pub checkpoint_id: String,
155    /// Serialized state snapshot stored as a JSON value.
156    pub state: Value,
157    /// Unix timestamp (seconds) when the checkpoint was created or last updated.
158    pub created_at: i64,
159}
160
161/// Status of a background job.
162///
163/// Serialized as lowercase snake_case strings.
164#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
165#[serde(rename_all = "snake_case")]
166pub enum JobStateStatus {
167    /// Job is waiting to be executed.
168    Queued,
169    /// Job is currently executing.
170    Running,
171    /// Job has finished successfully.
172    Completed,
173    /// Job has failed with an error.
174    Failed,
175    /// Job was cancelled before completion.
176    Cancelled,
177}
178
179/// Persisted state of a background job.
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct JobStateRecord {
182    /// Unique identifier for the job.
183    pub id: String,
184    /// Human-readable name describing the job.
185    pub name: String,
186    /// Current lifecycle status of the job.
187    pub status: JobStateStatus,
188    /// Completion progress as a percentage (0--100), if available.
189    pub progress: Option<u8>,
190    /// Optional detail message providing additional status information.
191    pub detail: Option<String>,
192    /// Unix timestamp (seconds) when the job was created.
193    pub created_at: i64,
194    /// Unix timestamp (seconds) of the most recent status update.
195    pub updated_at: i64,
196}
197
198/// Persisted lifecycle status for a thread goal.
199#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
200#[serde(rename_all = "snake_case")]
201pub enum ThreadGoalStatus {
202    /// Goal is active and should continue receiving work.
203    Active,
204    /// Goal is paused by the user.
205    Paused,
206    /// Goal is blocked and cannot make meaningful progress.
207    Blocked,
208    /// Goal stopped because account/service usage limits were reached.
209    UsageLimited,
210    /// Goal stopped because its explicit token budget was reached.
211    BudgetLimited,
212    /// Goal has been completed.
213    Complete,
214}
215
216/// Persisted goal state attached to a thread.
217#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
218pub struct ThreadGoalRecord {
219    /// Thread this goal belongs to.
220    pub thread_id: String,
221    /// Stable identifier for this goal revision.
222    pub goal_id: String,
223    /// User-visible objective.
224    pub objective: String,
225    /// Current lifecycle status.
226    pub status: ThreadGoalStatus,
227    /// Optional token budget requested by the user.
228    pub token_budget: Option<i64>,
229    /// Tokens consumed while pursuing the goal.
230    pub tokens_used: i64,
231    /// Elapsed wall-clock work time in seconds.
232    pub time_used_seconds: i64,
233    /// Durable continuation passes dispatched for this objective.
234    pub continuation_count: i64,
235    /// Unix timestamp (seconds) when the goal was created.
236    pub created_at: i64,
237    /// Unix timestamp (seconds) when the goal was last updated.
238    pub updated_at: i64,
239}
240
/// Options that narrow a thread listing.
#[derive(Clone, Debug)]
pub struct ThreadListFilters {
    /// When `true`, archived threads are part of the results.
    pub include_archived: bool,
    /// Upper bound on the number of threads returned. Defaults to 50.
    pub limit: Option<usize>,
}

impl Default for ThreadListFilters {
    /// Default filters: archived threads excluded, at most 50 results.
    fn default() -> Self {
        ThreadListFilters {
            limit: Some(50),
            include_archived: false,
        }
    }
}
258
259#[derive(Debug, Clone, Serialize, Deserialize)]
260struct SessionIndexEntry {
261    thread_id: String,
262    thread_name: Option<String>,
263    updated_at: i64,
264    rollout_path: Option<PathBuf>,
265}
266
/// Durable store for conversation threads, messages, checkpoints, and jobs.
///
/// Data lives in a SQLite database, accompanied by an append-only JSONL
/// session index file. [`open`](Self::open) initializes and migrates the
/// database schema automatically.
#[derive(Clone, Debug)]
pub struct StateStore {
    // Location of the SQLite database file.
    db_path: PathBuf,
    // Location of the JSONL session index file.
    session_index_path: PathBuf,
}
276
277impl StateStore {
278    /// Open (or create) a state store at the given database path.
279    ///
280    /// If `path` is `None`, the default location (`~/.codewhale/state.db`, with
281    /// `~/.deepseek/state.db` as a legacy fallback) is used.
282    /// The database schema is created automatically if it does not exist.
283    pub fn open(path: Option<PathBuf>) -> Result<Self> {
284        let db_path = path.unwrap_or_else(default_state_db_path);
285        let session_index_path = db_path
286            .parent()
287            .unwrap_or_else(|| Path::new("."))
288            .join("session_index.jsonl");
289        if let Some(parent) = db_path.parent() {
290            fs::create_dir_all(parent).with_context(|| {
291                format!("failed to create state directory {}", parent.display())
292            })?;
293        }
294        let store = Self {
295            db_path,
296            session_index_path,
297        };
298        store.init_schema()?;
299        Ok(store)
300    }
301
302    /// Returns the filesystem path of the underlying SQLite database.
303    pub fn db_path(&self) -> &Path {
304        &self.db_path
305    }
306
307    fn conn(&self) -> Result<Connection> {
308        Connection::open(&self.db_path)
309            .with_context(|| format!("failed to open state db {}", self.db_path.display()))
310    }
311
312    fn init_schema(&self) -> Result<()> {
313        let conn = self.conn()?;
314        let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
315        if user_version == 0 {
316            conn.execute_batch(
317                r#"
318                BEGIN;
319                CREATE TABLE IF NOT EXISTS threads (
320                    id TEXT PRIMARY KEY,
321                    rollout_path TEXT,
322                    preview TEXT NOT NULL,
323                    ephemeral INTEGER NOT NULL,
324                    model_provider TEXT NOT NULL,
325                    created_at INTEGER NOT NULL,
326                    updated_at INTEGER NOT NULL,
327                    status TEXT NOT NULL,
328                    path TEXT,
329                    cwd TEXT NOT NULL,
330                    cli_version TEXT NOT NULL,
331                    source TEXT NOT NULL,
332                    title TEXT,
333                    sandbox_policy TEXT,
334                    approval_mode TEXT,
335                    archived INTEGER NOT NULL DEFAULT 0,
336                    archived_at INTEGER,
337                    git_sha TEXT,
338                    git_branch TEXT,
339                    git_origin_url TEXT,
340                    memory_mode TEXT
341                );
342                CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
343                CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
344                CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);
345
346                CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
347                    thread_id TEXT NOT NULL,
348                    position INTEGER NOT NULL,
349                    name TEXT NOT NULL,
350                    description TEXT,
351                    input_schema TEXT NOT NULL,
352                    PRIMARY KEY (thread_id, position),
353                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
354                );
355
356                CREATE TABLE IF NOT EXISTS messages (
357                    id INTEGER PRIMARY KEY AUTOINCREMENT,
358                    thread_id TEXT NOT NULL,
359                    role TEXT NOT NULL,
360                    content TEXT NOT NULL,
361                    item_json TEXT,
362                    created_at INTEGER NOT NULL,
363                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
364                );
365                CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);
366
367                CREATE TABLE IF NOT EXISTS checkpoints (
368                    thread_id TEXT NOT NULL,
369                    checkpoint_id TEXT NOT NULL,
370                    state_json TEXT NOT NULL,
371                    created_at INTEGER NOT NULL,
372                    PRIMARY KEY(thread_id, checkpoint_id),
373                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
374                );
375                CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);
376
377                CREATE TABLE IF NOT EXISTS jobs (
378                    id TEXT PRIMARY KEY,
379                    name TEXT NOT NULL,
380                    status TEXT NOT NULL,
381                    progress INTEGER,
382                    detail TEXT,
383                    created_at INTEGER NOT NULL,
384                    updated_at INTEGER NOT NULL
385                );
386                CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
387
388                -- Add parent_entry_id column, and set to last message before current message
389                ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
390                UPDATE messages
391                    SET parent_entry_id = (
392                        SELECT m2.id
393                        FROM messages m2
394                        WHERE m2.thread_id = messages.thread_id
395                            AND (
396                                m2.created_at < messages.created_at
397                                OR (
398                                    m2.created_at = messages.created_at
399                                    AND m2.id < messages.id
400                                )
401                            )
402                        ORDER BY m2.created_at DESC, m2.id DESC
403                        LIMIT 1
404                    );
405                CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);
406
407                -- Add current_leaf_id column, and set to last message in thread
408                ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
409                UPDATE threads
410                    SET current_leaf_id = (
411                        SELECT m.id
412                        FROM messages m
413                        WHERE m.thread_id = threads.id
414                        ORDER BY m.id DESC
415                        LIMIT 1
416                    );
417
418                PRAGMA user_version = 1;
419                COMMIT;
420                "#,
421            )
422            .context("failed to initialize thread schema")?;
423            user_version = 1;
424        }
425        if user_version < 2 {
426            conn.execute_batch(
427                r#"
428                BEGIN;
429                CREATE TABLE IF NOT EXISTS workflow_runs (
430                    id TEXT PRIMARY KEY,
431                    workflow_id TEXT NOT NULL,
432                    goal TEXT NOT NULL,
433                    status TEXT NOT NULL,
434                    input_hash TEXT,
435                    started_at INTEGER NOT NULL,
436                    completed_at INTEGER,
437                    metadata_json TEXT NOT NULL DEFAULT '{}'
438                );
439                CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at
440                    ON workflow_runs(status, started_at DESC);
441                CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at
442                    ON workflow_runs(workflow_id, started_at DESC);
443
444                CREATE TABLE IF NOT EXISTS branch_runs (
445                    id TEXT PRIMARY KEY,
446                    workflow_run_id TEXT NOT NULL,
447                    branch_id TEXT NOT NULL,
448                    node_id TEXT NOT NULL,
449                    status TEXT NOT NULL,
450                    started_at INTEGER NOT NULL,
451                    completed_at INTEGER,
452                    result_json TEXT NOT NULL DEFAULT '{}',
453                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
454                );
455                CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id
456                    ON branch_runs(workflow_run_id);
457                CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id
458                    ON branch_runs(branch_id);
459
460                CREATE TABLE IF NOT EXISTS leaf_runs (
461                    id TEXT PRIMARY KEY,
462                    workflow_run_id TEXT NOT NULL,
463                    branch_run_id TEXT,
464                    leaf_id TEXT NOT NULL,
465                    task_id TEXT NOT NULL,
466                    input_hash TEXT,
467                    status TEXT NOT NULL,
468                    output_json TEXT NOT NULL DEFAULT '{}',
469                    artifacts_json TEXT NOT NULL DEFAULT '[]',
470                    started_at INTEGER NOT NULL,
471                    completed_at INTEGER,
472                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
473                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
474                );
475                CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id
476                    ON leaf_runs(workflow_run_id);
477                CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup
478                    ON leaf_runs(workflow_run_id, leaf_id, input_hash);
479
480                CREATE TABLE IF NOT EXISTS control_node_runs (
481                    id TEXT PRIMARY KEY,
482                    workflow_run_id TEXT NOT NULL,
483                    node_id TEXT NOT NULL,
484                    kind TEXT NOT NULL,
485                    status TEXT NOT NULL,
486                    selected_children_json TEXT NOT NULL DEFAULT '[]',
487                    result_json TEXT NOT NULL DEFAULT '{}',
488                    started_at INTEGER NOT NULL,
489                    completed_at INTEGER,
490                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
491                );
492                CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id
493                    ON control_node_runs(workflow_run_id);
494                CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id
495                    ON control_node_runs(node_id);
496
497                CREATE TABLE IF NOT EXISTS teacher_candidates (
498                    id TEXT PRIMARY KEY,
499                    workflow_run_id TEXT NOT NULL,
500                    control_node_run_id TEXT NOT NULL,
501                    candidate_id TEXT NOT NULL,
502                    branch_run_id TEXT,
503                    score REAL,
504                    passed INTEGER,
505                    rationale_json TEXT NOT NULL DEFAULT '{}',
506                    created_at INTEGER NOT NULL,
507                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
508                    FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE,
509                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
510                );
511                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id
512                    ON teacher_candidates(workflow_run_id);
513                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id
514                    ON teacher_candidates(control_node_run_id);
515
516                PRAGMA user_version = 2;
517                COMMIT;
518                "#,
519            )
520            .context("failed to initialize workflow trace schema")?;
521            user_version = 2;
522        }
523        if user_version < 3 {
524            conn.execute_batch(
525                r#"
526                BEGIN;
527                CREATE TABLE IF NOT EXISTS thread_goals (
528                    thread_id TEXT PRIMARY KEY NOT NULL,
529                    goal_id TEXT NOT NULL,
530                    objective TEXT NOT NULL,
531                    status TEXT NOT NULL CHECK(status IN (
532                        'active',
533                        'paused',
534                        'blocked',
535                        'usage_limited',
536                        'budget_limited',
537                        'complete'
538                    )),
539                    token_budget INTEGER,
540                    tokens_used INTEGER NOT NULL DEFAULT 0,
541                    time_used_seconds INTEGER NOT NULL DEFAULT 0,
542                    created_at INTEGER NOT NULL,
543                    updated_at INTEGER NOT NULL,
544                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
545                );
546
547                PRAGMA user_version = 3;
548                COMMIT;
549                "#,
550            )
551            .context("failed to initialize thread goal schema")?;
552            user_version = 3;
553        }
554        if user_version < 4 {
555            conn.execute_batch(
556                r#"
557                BEGIN;
558                ALTER TABLE thread_goals
559                    ADD COLUMN continuation_count INTEGER NOT NULL DEFAULT 0;
560
561                PRAGMA user_version = 4;
562                COMMIT;
563                "#,
564            )
565            .context("failed to initialize thread goal continuation schema")?;
566        }
567        Ok(())
568    }
569
570    /// Insert or update thread metadata.
571    ///
572    /// This does **not** update `current_leaf_id`; use [`append_message`](Self::append_message)
573    /// or [`set_current_leaf_id`](Self::set_current_leaf_id) for that.
574    pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
575        let conn = self.conn()?;
576        conn.execute(
577            r#"
578            INSERT INTO threads (
579                id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
580                cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
581                git_sha, git_branch, git_origin_url, memory_mode
582            ) VALUES (
583                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
584                ?11, ?12, ?13, ?14, ?15, ?16, ?17,
585                ?18, ?19, ?20, ?21
586            )
587            ON CONFLICT(id) DO UPDATE SET
588                rollout_path=excluded.rollout_path,
589                preview=excluded.preview,
590                ephemeral=excluded.ephemeral,
591                model_provider=excluded.model_provider,
592                created_at=excluded.created_at,
593                updated_at=excluded.updated_at,
594                status=excluded.status,
595                path=excluded.path,
596                cwd=excluded.cwd,
597                cli_version=excluded.cli_version,
598                source=excluded.source,
599                title=excluded.title,
600                sandbox_policy=excluded.sandbox_policy,
601                approval_mode=excluded.approval_mode,
602                archived=excluded.archived,
603                archived_at=excluded.archived_at,
604                git_sha=excluded.git_sha,
605                git_branch=excluded.git_branch,
606                git_origin_url=excluded.git_origin_url,
607                memory_mode=excluded.memory_mode
608            "#,
609            params![
610                thread.id,
611                path_to_opt_string(thread.rollout_path.as_deref()),
612                thread.preview,
613                bool_to_i64(thread.ephemeral),
614                thread.model_provider,
615                thread.created_at,
616                thread.updated_at,
617                thread_status_to_str(&thread.status),
618                path_to_opt_string(thread.path.as_deref()),
619                thread.cwd.display().to_string(),
620                thread.cli_version,
621                session_source_to_str(&thread.source),
622                thread.name,
623                thread.sandbox_policy,
624                thread.approval_mode,
625                bool_to_i64(thread.archived),
626                thread.archived_at,
627                thread.git_sha,
628                thread.git_branch,
629                thread.git_origin_url,
630                thread.memory_mode,
631            ],
632        )
633        .context("failed to upsert thread metadata")?;
634
635        self.append_thread_name(
636            &thread.id,
637            thread.name.clone(),
638            thread.updated_at,
639            thread.rollout_path.clone(),
640        )?;
641        Ok(())
642    }
643
644    /// Retrieve a single thread by its ID.
645    ///
646    /// Returns `None` if no thread with the given ID exists.
647    pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
648        let conn = self.conn()?;
649        conn.query_row(
650            r#"
651            SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
652                   cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
653                   git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
654            FROM threads
655            WHERE id = ?1
656            "#,
657            params![id],
658            row_to_thread,
659        )
660        .optional()
661        .context("failed to read thread")
662    }
663
664    /// List threads ordered by most recently updated.
665    ///
666    /// Use [`ThreadListFilters`] to control whether archived threads are included
667    /// and the maximum number of results returned.
668    pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
669        let conn = self.conn()?;
670        let sql = if filters.include_archived {
671            "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
672        } else {
673            "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
674        };
675
676        let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
677        let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
678        let mut rows = stmt
679            .query(params![limit])
680            .context("failed to query threads")?;
681        let mut out = Vec::new();
682        while let Some(row) = rows.next().context("failed to iterate thread rows")? {
683            out.push(row_to_thread(row)?);
684        }
685        Ok(out)
686    }
687
688    /// Archive a thread, setting its status to [`ThreadStatus::Archived`] and
689    /// recording the current timestamp.
690    pub fn mark_archived(&self, id: &str) -> Result<()> {
691        let conn = self.conn()?;
692        conn.execute(
693            "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
694            params![
695                id,
696                Utc::now().timestamp(),
697                thread_status_to_str(&ThreadStatus::Archived)
698            ],
699        )
700        .context("failed to archive thread")?;
701        Ok(())
702    }
703
704    /// Unarchive a thread, removing the archived flag and clearing `archived_at`.
705    pub fn mark_unarchived(&self, id: &str) -> Result<()> {
706        let conn = self.conn()?;
707        conn.execute(
708            "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
709            params![id],
710        )
711        .context("failed to unarchive thread")?;
712        Ok(())
713    }
714
715    /// Permanently delete a thread and all of its associated data
716    /// (messages, checkpoints, dynamic tools) via cascading foreign keys.
717    pub fn delete_thread(&self, id: &str) -> Result<()> {
718        let conn = self.conn()?;
719        conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
720            .context("failed to delete thread")?;
721        Ok(())
722    }
723
724    /// Set the memory mode for a thread.
725    ///
726    /// Pass `None` to clear the memory mode.
727    pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
728        let conn = self.conn()?;
729        conn.execute(
730            "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
731            params![id, mode],
732        )
733        .context("failed to update thread memory mode")?;
734        Ok(())
735    }
736
737    /// Get the memory mode configured for a thread.
738    ///
739    /// Returns `None` if the thread does not exist or has no memory mode set.
740    pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
741        let conn = self.conn()?;
742        conn.query_row(
743            "SELECT memory_mode FROM threads WHERE id = ?1",
744            params![id],
745            |row| row.get::<_, Option<String>>(0),
746        )
747        .optional()
748        .context("failed to read thread memory mode")
749        .map(Option::flatten)
750    }
751
752    /// Insert or replace the persisted goal for a thread.
753    pub fn upsert_thread_goal(&self, goal: &ThreadGoalRecord) -> Result<()> {
754        let conn = self.conn()?;
755        let exists: Option<i64> = conn
756            .query_row(
757                "SELECT 1 FROM threads WHERE id = ?1",
758                params![goal.thread_id],
759                |row| row.get(0),
760            )
761            .optional()
762            .context("failed to verify thread before saving goal")?;
763        if exists.is_none() {
764            anyhow::bail!("thread {} not found", goal.thread_id);
765        }
766
767        conn.execute(
768            r#"
769            INSERT INTO thread_goals (
770                thread_id, goal_id, objective, status, token_budget, tokens_used,
771                time_used_seconds, continuation_count, created_at, updated_at
772            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
773            ON CONFLICT(thread_id) DO UPDATE SET
774                goal_id=excluded.goal_id,
775                objective=excluded.objective,
776                status=excluded.status,
777                token_budget=excluded.token_budget,
778                tokens_used=excluded.tokens_used,
779                time_used_seconds=excluded.time_used_seconds,
780                continuation_count=excluded.continuation_count,
781                created_at=excluded.created_at,
782                updated_at=excluded.updated_at
783            "#,
784            params![
785                goal.thread_id,
786                goal.goal_id,
787                goal.objective,
788                thread_goal_status_to_str(&goal.status),
789                goal.token_budget,
790                goal.tokens_used,
791                goal.time_used_seconds,
792                goal.continuation_count,
793                goal.created_at,
794                goal.updated_at,
795            ],
796        )
797        .context("failed to upsert thread goal")?;
798        Ok(())
799    }
800
801    /// Accrue additional token and wall-clock usage onto a thread's persisted goal.
802    ///
803    /// This is the durable, additive accounting path for the persistent goal loop: it
804    /// increments `tokens_used` and `time_used_seconds` in a single atomic SQL `UPDATE`
805    /// (`col = col + ?`) so concurrent accruals do not race a read-modify-write. The
806    /// goal's `updated_at` is advanced to the larger of its current value and `now`,
807    /// keeping the timestamp monotonic even if a stale `now` is supplied.
808    ///
809    /// `token_delta` and `time_delta_seconds` are added on the database side; callers
810    /// should pass non-negative deltas (negative values are accepted and will decrement,
811    /// which is intentionally left to the caller's discretion).
812    ///
813    /// Returns the updated [`ThreadGoalRecord`], or `Ok(None)` if the thread has no
814    /// persisted goal. Unlike [`upsert_thread_goal`](Self::upsert_thread_goal) this never
815    /// creates a goal row; it only accumulates onto an existing one.
816    pub fn record_thread_goal_usage(
817        &self,
818        thread_id: &str,
819        token_delta: i64,
820        time_delta_seconds: i64,
821        now: i64,
822    ) -> Result<Option<ThreadGoalRecord>> {
823        let conn = self.conn()?;
824        let changed = conn
825            .execute(
826                r#"
827                UPDATE thread_goals
828                SET tokens_used = tokens_used + ?2,
829                    time_used_seconds = time_used_seconds + ?3,
830                    updated_at = MAX(updated_at, ?4)
831                WHERE thread_id = ?1
832                "#,
833                params![thread_id, token_delta, time_delta_seconds, now],
834            )
835            .context("failed to record thread goal usage")?;
836        if changed == 0 {
837            return Ok(None);
838        }
839        self.get_thread_goal(thread_id)
840    }
841
842    /// Increment the durable cross-turn continuation counter for a thread goal.
843    ///
844    /// The older TUI continuation guard is scoped to one engine turn. This
845    /// counter is intentionally persisted so a resumed goal loop can feed
846    /// `goal_loop::decide_continuation` with the true cross-turn count.
847    pub fn record_thread_goal_continuation(
848        &self,
849        thread_id: &str,
850        now: i64,
851    ) -> Result<Option<ThreadGoalRecord>> {
852        let conn = self.conn()?;
853        let changed = conn
854            .execute(
855                r#"
856                UPDATE thread_goals
857                SET continuation_count = continuation_count + 1,
858                    updated_at = MAX(updated_at, ?2)
859                WHERE thread_id = ?1
860                "#,
861                params![thread_id, now],
862            )
863            .context("failed to record thread goal continuation")?;
864        if changed == 0 {
865            return Ok(None);
866        }
867        self.get_thread_goal(thread_id)
868    }
869
870    /// Retrieve the persisted goal for a thread.
871    pub fn get_thread_goal(&self, thread_id: &str) -> Result<Option<ThreadGoalRecord>> {
872        let conn = self.conn()?;
873        conn.query_row(
874            r#"
875            SELECT thread_id, goal_id, objective, status, token_budget, tokens_used,
876                   time_used_seconds, continuation_count, created_at, updated_at
877            FROM thread_goals
878            WHERE thread_id = ?1
879            "#,
880            params![thread_id],
881            row_to_thread_goal,
882        )
883        .optional()
884        .context("failed to read thread goal")
885    }
886
887    /// Delete the persisted goal for a thread.
888    pub fn delete_thread_goal(&self, thread_id: &str) -> Result<bool> {
889        let conn = self.conn()?;
890        let changed = conn
891            .execute(
892                "DELETE FROM thread_goals WHERE thread_id = ?1",
893                params![thread_id],
894            )
895            .context("failed to delete thread goal")?;
896        Ok(changed > 0)
897    }
898
899    /// List all leaf messages in a thread.
900    ///
901    /// A leaf message is one that has no other message referencing it as a parent.
902    /// In a branching conversation tree, there may be multiple leaf messages.
903    pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
904        let conn = self.conn()?;
905        let mut stmt = conn
906            .prepare(
907                r#"
908                SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
909                FROM messages m1
910                LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
911                WHERE m1.thread_id = ?1 AND m2.id IS NULL
912                "#,
913            )
914            .context("failed to prepare message listing query")?;
915        let mut rows = stmt
916            .query(params![thread_id])
917            .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
918        let mut out = Vec::new();
919        while let Some(row) = rows.next().context("failed to iterate message rows")? {
920            let item_json: Option<String> = row.get(4).context("failed to read item json")?;
921            let item = item_json
922                .as_deref()
923                .map(serde_json::from_str)
924                .transpose()
925                .with_context(|| {
926                    format!("failed to parse message item json in thread {thread_id}")
927                })?;
928            out.push(MessageRecord {
929                id: row.get(0).context("failed to read message id")?,
930                thread_id: row.get(1).context("failed to read message thread id")?,
931                role: row.get(2).context("failed to read message role")?,
932                content: row.get(3).context("failed to read message content")?,
933                item,
934                created_at: row.get(5).context("failed to read message timestamp")?,
935                parent_entry_id: row.get(6).context("failed to read parent entry id")?,
936            });
937        }
938        Ok(out)
939    }
940
941    /// Update the current leaf message pointer for a thread.
942    ///
943    /// This controls which branch of the conversation tree is considered active
944    /// when listing messages via [`list_messages`](Self::list_messages).
945    pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
946        let conn = self.conn()?;
947        conn.execute(
948            "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
949            params![current_leaf_id, thread_id],
950        )
951        .context("failed to update thread current leaf id")?;
952        Ok(())
953    }
954
955    /// Replace the dynamic tools for a thread.
956    ///
957    /// All existing dynamic tools for the thread are deleted and replaced with the
958    /// provided list. The operation is performed within a transaction.
959    pub fn persist_dynamic_tools(
960        &self,
961        thread_id: &str,
962        tools: &[DynamicToolRecord],
963    ) -> Result<()> {
964        let mut conn = self.conn()?;
965        let tx = conn
966            .transaction()
967            .context("failed to begin dynamic tools transaction")?;
968        tx.execute(
969            "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
970            params![thread_id],
971        )
972        .context("failed to clear dynamic tools")?;
973        for tool in tools {
974            tx.execute(
975                "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
976                params![
977                    thread_id,
978                    tool.position,
979                    tool.name,
980                    tool.description,
981                    tool.input_schema.to_string()
982                ],
983            )
984            .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
985        }
986        tx.commit().context("failed to commit dynamic tools")?;
987        Ok(())
988    }
989
990    /// Retrieve all dynamic tools registered for a thread, ordered by position.
991    pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
992        let conn = self.conn()?;
993        let mut stmt = conn
994            .prepare(
995                "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
996            )
997            .context("failed to prepare get dynamic tools query")?;
998        let mut rows = stmt
999            .query(params![thread_id])
1000            .context("failed to query dynamic tools")?;
1001        let mut out = Vec::new();
1002        while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
1003            let input_schema_raw: String =
1004                row.get(3).context("failed to read tool input schema")?;
1005            let input_schema: Value =
1006                serde_json::from_str(&input_schema_raw).with_context(|| {
1007                    format!("failed to parse input schema for dynamic tool in thread {thread_id}")
1008                })?;
1009            out.push(DynamicToolRecord {
1010                position: row.get(0).context("failed to read tool position")?,
1011                name: row.get(1).context("failed to read tool name")?,
1012                description: row.get(2).context("failed to read tool description")?,
1013                input_schema,
1014            });
1015        }
1016        Ok(out)
1017    }
1018
1019    /// Append a new message to a thread.
1020    ///
1021    /// The message is linked to the thread's current leaf as its parent, and the
1022    /// thread's `current_leaf_id` is updated to the new message. Returns the ID
1023    /// of the newly created message.
1024    pub fn append_message(
1025        &self,
1026        thread_id: &str,
1027        role: &str,
1028        content: &str,
1029        item: Option<Value>,
1030    ) -> Result<i64> {
1031        let mut conn = self.conn()?;
1032        let created_at = Utc::now().timestamp();
1033        let item_json = item
1034            .as_ref()
1035            .map(serde_json::to_string)
1036            .transpose()
1037            .context("failed to serialize message item payload")?;
1038
1039        let tx = conn
1040            .transaction()
1041            .context("failed to begin append message transaction")?;
1042
1043        let current_leaf_id: Option<i64> = tx
1044            .query_row(
1045                "SELECT current_leaf_id FROM threads WHERE id = ?1",
1046                params![thread_id],
1047                |row| row.get(0),
1048            )
1049            .with_context(|| {
1050                format!("failed to query thread current leaf id for thread {thread_id}")
1051            })?;
1052
1053        let next_leaf_id: i64 = tx.query_row(
1054            r#"
1055                INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
1056                SELECT ?1, ?2, ?3, ?4, ?5, ?6
1057                RETURNING id
1058            "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
1059        ).with_context(|| format!("failed to append message for thread {thread_id}"))?;
1060
1061        tx.execute(
1062            r#"
1063            UPDATE threads
1064            SET current_leaf_id = ?1
1065            WHERE id = ?2;
1066            "#,
1067            params![next_leaf_id, thread_id],
1068        )
1069        .with_context(|| {
1070            format!("failed to update thread current leaf id for thread {thread_id}")
1071        })?;
1072
1073        tx.commit()
1074            .context("failed to commit append message transaction")?;
1075
1076        Ok(next_leaf_id)
1077    }
1078
1079    /// List messages in the current conversation branch, walking backwards from
1080    /// the thread's `current_leaf_id`.
1081    ///
1082    /// Messages are returned in chronological order (oldest first). The `limit`
1083    /// parameter caps how many ancestor messages are traversed; it defaults to 500.
1084    pub fn list_messages(
1085        &self,
1086        thread_id: &str,
1087        limit: Option<usize>,
1088    ) -> Result<Vec<MessageRecord>> {
1089        let conn = self.conn()?;
1090        let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
1091        let mut stmt = conn
1092            .prepare(
1093                r#"
1094                WITH RECURSIVE
1095                    leaf_id AS (
1096                        SELECT current_leaf_id FROM threads WHERE id = ?1
1097                    ),
1098                    ancestors AS (
1099                        SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
1100                        FROM messages
1101                        WHERE id = (SELECT current_leaf_id FROM leaf_id)
1102
1103                        UNION ALL
1104
1105                        SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
1106                        FROM messages m
1107                        JOIN ancestors a ON m.id = a.parent_entry_id
1108                        WHERE a.depth < ?2
1109                    )
1110                    SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
1111                    ORDER BY depth DESC
1112                "#
1113            )
1114            .context("failed to prepare message listing query")?;
1115        let mut rows = stmt
1116            .query(params![thread_id, limit - 1])
1117            .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
1118        let mut out = Vec::new();
1119        while let Some(row) = rows.next().context("failed to iterate message rows")? {
1120            let item_json: Option<String> = row.get(4).context("failed to read item json")?;
1121            let item = item_json
1122                .as_deref()
1123                .map(serde_json::from_str)
1124                .transpose()
1125                .with_context(|| {
1126                    format!("failed to parse message item json in thread {thread_id}")
1127                })?;
1128            out.push(MessageRecord {
1129                id: row.get(0).context("failed to read message id")?,
1130                thread_id: row.get(1).context("failed to read message thread id")?,
1131                role: row.get(2).context("failed to read message role")?,
1132                content: row.get(3).context("failed to read message content")?,
1133                item,
1134                created_at: row.get(5).context("failed to read message timestamp")?,
1135                parent_entry_id: row.get(6).context("failed to read parent entry id")?,
1136            });
1137        }
1138        Ok(out)
1139    }
1140
1141    /// Fork the conversation at a specific message.
1142    ///
1143    /// Creates a new message whose parent is `message_id` and updates the thread's
1144    /// `current_leaf_id` to the new message. Returns the ID of the new message.
1145    /// This enables branching conversations from any point in the history.
1146    pub fn fork_at_message(
1147        &self,
1148        message_id: &str,
1149        role: &str,
1150        content: &str,
1151        item: Option<Value>,
1152    ) -> Result<i64> {
1153        let mut conn = self.conn()?;
1154        let created_at = Utc::now().timestamp();
1155        let item_json = item
1156            .as_ref()
1157            .map(serde_json::to_string)
1158            .transpose()
1159            .context("failed to serialize message item payload")?;
1160
1161        let tx = conn
1162            .transaction()
1163            .context("failed to begin fork message transaction")?;
1164
1165        let thread_id: String = tx
1166            .query_row(
1167                "SELECT thread_id FROM messages WHERE id = ?1",
1168                params![message_id],
1169                |row| row.get(0),
1170            )
1171            .with_context(|| format!("failed to query thread id for message {message_id}"))?;
1172
1173        let next_leaf_id: i64 = tx.query_row(
1174            r#"
1175                INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
1176                SELECT ?1, ?2, ?3, ?4, ?5, ?6
1177                RETURNING id
1178            "#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
1179        ).with_context(|| format!("failed to fork at message for thread {thread_id:?}"))?;
1180
1181        tx.execute(
1182            r#"
1183            UPDATE threads
1184            SET current_leaf_id = ?1
1185            WHERE id = ?2;
1186            "#,
1187            params![next_leaf_id, thread_id],
1188        )
1189        .with_context(|| {
1190            format!("failed to update thread current leaf id for thread {thread_id:?}")
1191        })?;
1192
1193        tx.commit()
1194            .context("failed to commit fork message transaction")?;
1195
1196        Ok(next_leaf_id)
1197    }
1198
1199    /// Delete all messages belonging to a thread and reset its `current_leaf_id`.
1200    ///
1201    /// Returns the number of messages deleted.
1202    pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
1203        let mut conn = self.conn()?;
1204        let tx = conn
1205            .transaction()
1206            .context("failed to begin clear messages transaction")?;
1207
1208        tx.execute(
1209            r#"
1210            UPDATE threads
1211            SET current_leaf_id = NULL
1212            WHERE id = ?1;
1213            "#,
1214            params![thread_id],
1215        )
1216        .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
1217        let result = tx
1218            .execute(
1219                r#"
1220                DELETE FROM messages WHERE thread_id = ?1
1221                "#,
1222                params![thread_id],
1223            )
1224            .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
1225        tx.commit()
1226            .context("failed to commit clear messages transaction")?;
1227
1228        Ok(result)
1229    }
1230
1231    /// Save (or update) a named checkpoint for a thread.
1232    ///
1233    /// If a checkpoint with the same `thread_id` and `checkpoint_id` already exists,
1234    /// its state and timestamp are overwritten.
1235    pub fn save_checkpoint(
1236        &self,
1237        thread_id: &str,
1238        checkpoint_id: &str,
1239        state: &Value,
1240    ) -> Result<()> {
1241        let conn = self.conn()?;
1242        let state_json =
1243            serde_json::to_string(state).context("failed to encode checkpoint state")?;
1244        conn.execute(
1245            r#"
1246            INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
1247            VALUES (?1, ?2, ?3, ?4)
1248            ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
1249                state_json = excluded.state_json,
1250                created_at = excluded.created_at
1251            "#,
1252            params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
1253        )
1254        .with_context(|| {
1255            format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
1256        })?;
1257        Ok(())
1258    }
1259
1260    /// Load a checkpoint for a thread.
1261    ///
1262    /// If `checkpoint_id` is provided, loads that specific checkpoint. Otherwise,
1263    /// loads the most recently created checkpoint for the thread. Returns `None`
1264    /// if no matching checkpoint exists.
1265    pub fn load_checkpoint(
1266        &self,
1267        thread_id: &str,
1268        checkpoint_id: Option<&str>,
1269    ) -> Result<Option<CheckpointRecord>> {
1270        let conn = self.conn()?;
1271        if let Some(checkpoint_id) = checkpoint_id {
1272            let row = conn
1273                .query_row(
1274                    "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1275                    params![thread_id, checkpoint_id],
1276                    |row| {
1277                        let state_json: String = row.get(2)?;
1278                        let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1279                        Ok(CheckpointRecord {
1280                            thread_id: row.get(0)?,
1281                            checkpoint_id: row.get(1)?,
1282                            state,
1283                            created_at: row.get(3)?,
1284                        })
1285                    },
1286                )
1287                .optional()
1288                .with_context(|| {
1289                    format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
1290                })?;
1291            return Ok(row);
1292        }
1293
1294        conn.query_row(
1295            "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
1296            params![thread_id],
1297            |row| {
1298                let state_json: String = row.get(2)?;
1299                let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1300                Ok(CheckpointRecord {
1301                    thread_id: row.get(0)?,
1302                    checkpoint_id: row.get(1)?,
1303                    state,
1304                    created_at: row.get(3)?,
1305                })
1306            },
1307        )
1308        .optional()
1309        .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
1310    }
1311
1312    /// List checkpoints for a thread, ordered by creation time (newest first).
1313    ///
1314    /// The `limit` parameter caps the number of results and defaults to 100.
1315    pub fn list_checkpoints(
1316        &self,
1317        thread_id: &str,
1318        limit: Option<usize>,
1319    ) -> Result<Vec<CheckpointRecord>> {
1320        let conn = self.conn()?;
1321        let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1322        let mut stmt = conn
1323            .prepare(
1324                "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
1325            )
1326            .context("failed to prepare checkpoint list query")?;
1327        let mut rows = stmt
1328            .query(params![thread_id, limit])
1329            .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
1330
1331        let mut out = Vec::new();
1332        while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
1333            let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
1334            let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1335            out.push(CheckpointRecord {
1336                thread_id: row.get(0).context("failed to read checkpoint thread id")?,
1337                checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
1338                state,
1339                created_at: row.get(3).context("failed to read checkpoint timestamp")?,
1340            });
1341        }
1342        Ok(out)
1343    }
1344
1345    /// Delete a specific checkpoint from a thread.
1346    pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
1347        let conn = self.conn()?;
1348        conn.execute(
1349            "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1350            params![thread_id, checkpoint_id],
1351        )
1352        .with_context(|| {
1353            format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
1354        })?;
1355        Ok(())
1356    }
1357
1358    /// Insert or update a background job record.
1359    pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
1360        let conn = self.conn()?;
1361        conn.execute(
1362            r#"
1363            INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
1364            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1365            ON CONFLICT(id) DO UPDATE SET
1366                name = excluded.name,
1367                status = excluded.status,
1368                progress = excluded.progress,
1369                detail = excluded.detail,
1370                created_at = excluded.created_at,
1371                updated_at = excluded.updated_at
1372            "#,
1373            params![
1374                job.id,
1375                job.name,
1376                job_state_status_to_str(&job.status),
1377                job.progress.map(i64::from),
1378                job.detail,
1379                job.created_at,
1380                job.updated_at
1381            ],
1382        )
1383        .with_context(|| format!("failed to upsert job {}", job.id))?;
1384        Ok(())
1385    }
1386
1387    /// Retrieve a single job by its ID.
1388    ///
1389    /// Returns `None` if no job with the given ID exists.
1390    pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
1391        let conn = self.conn()?;
1392        conn.query_row(
1393            "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
1394            params![id],
1395            |row| {
1396                let status_raw: String = row.get(2)?;
1397                let progress: Option<i64> = row.get(3)?;
1398                Ok(JobStateRecord {
1399                    id: row.get(0)?,
1400                    name: row.get(1)?,
1401                    status: job_state_status_from_str(&status_raw),
1402                    progress: progress.and_then(|v| u8::try_from(v).ok()),
1403                    detail: row.get(4)?,
1404                    created_at: row.get(5)?,
1405                    updated_at: row.get(6)?,
1406                })
1407            },
1408        )
1409        .optional()
1410        .with_context(|| format!("failed to read job {id}"))
1411    }
1412
1413    /// List jobs ordered by most recently updated.
1414    ///
1415    /// The `limit` parameter caps the number of results and defaults to 100.
1416    pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
1417        let conn = self.conn()?;
1418        let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1419        let mut stmt = conn
1420            .prepare(
1421                "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
1422            )
1423            .context("failed to prepare job list query")?;
1424        let mut rows = stmt
1425            .query(params![limit])
1426            .context("failed to query persisted jobs")?;
1427        let mut out = Vec::new();
1428        while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
1429            let status_raw: String = row.get(2).context("failed to read job status")?;
1430            let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
1431            out.push(JobStateRecord {
1432                id: row.get(0).context("failed to read job id")?,
1433                name: row.get(1).context("failed to read job name")?,
1434                status: job_state_status_from_str(&status_raw),
1435                progress: progress.and_then(|v| u8::try_from(v).ok()),
1436                detail: row.get(4).context("failed to read job detail")?,
1437                created_at: row.get(5).context("failed to read job created_at")?,
1438                updated_at: row.get(6).context("failed to read job updated_at")?,
1439            });
1440        }
1441        Ok(out)
1442    }
1443
1444    /// Permanently delete a job record.
1445    pub fn delete_job(&self, id: &str) -> Result<()> {
1446        let conn = self.conn()?;
1447        conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
1448            .with_context(|| format!("failed to delete job {id}"))?;
1449        Ok(())
1450    }
1451
1452    /// Look up the rollout file path for a thread by its ID.
1453    pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
1454        let conn = self.conn()?;
1455        conn.query_row(
1456            "SELECT rollout_path FROM threads WHERE id = ?1",
1457            params![id],
1458            |row| row.get::<_, Option<String>>(0),
1459        )
1460        .optional()
1461        .context("failed to lookup rollout path")
1462        .map(|opt| opt.flatten().map(PathBuf::from))
1463    }
1464
1465    /// Append an entry to the JSONL session index file.
1466    ///
1467    /// The session index is an append-only log that maps thread IDs to their names,
1468    /// update timestamps, and rollout paths. It is used for fast name-based lookups
1469    /// without opening the SQLite database.
1470    pub fn append_thread_name(
1471        &self,
1472        thread_id: &str,
1473        thread_name: Option<String>,
1474        updated_at: i64,
1475        rollout_path: Option<PathBuf>,
1476    ) -> Result<()> {
1477        if let Some(parent) = self.session_index_path.parent() {
1478            fs::create_dir_all(parent).with_context(|| {
1479                format!(
1480                    "failed to create session index directory {}",
1481                    parent.display()
1482                )
1483            })?;
1484        }
1485        let entry = SessionIndexEntry {
1486            thread_id: thread_id.to_string(),
1487            thread_name,
1488            updated_at,
1489            rollout_path,
1490        };
1491        let encoded =
1492            serde_json::to_string(&entry).context("failed to serialize session index entry")?;
1493        let mut file = OpenOptions::new()
1494            .create(true)
1495            .append(true)
1496            .open(&self.session_index_path)
1497            .with_context(|| {
1498                format!(
1499                    "failed to open session index {}",
1500                    self.session_index_path.display()
1501                )
1502            })?;
1503        writeln!(file, "{encoded}").context("failed to append session index entry")?;
1504        Ok(())
1505    }
1506
1507    /// Find the display name for a thread by its ID, using the session index.
1508    ///
1509    /// Returns `None` if the thread is not in the index or has no name.
1510    pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
1511        let map = self.session_index_map()?;
1512        Ok(map
1513            .get(thread_id)
1514            .and_then(|entry| entry.thread_name.clone()))
1515    }
1516
1517    /// Look up display names for multiple thread IDs at once.
1518    ///
1519    /// Returns a map from thread ID to its name (which may be `None`).
1520    pub fn find_thread_names_by_ids(
1521        &self,
1522        ids: &[String],
1523    ) -> Result<HashMap<String, Option<String>>> {
1524        let map = self.session_index_map()?;
1525        let mut out = HashMap::new();
1526        for id in ids {
1527            let name = map.get(id).and_then(|entry| entry.thread_name.clone());
1528            out.insert(id.clone(), name);
1529        }
1530        Ok(out)
1531    }
1532
1533    /// Find the rollout path for a thread by its display name (case-insensitive).
1534    ///
1535    /// If multiple threads share the same name, the most recently updated one is returned.
1536    /// Returns `None` if no matching thread is found.
1537    pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
1538        let map = self.session_index_map()?;
1539        let matched = map
1540            .values()
1541            .filter(|entry| {
1542                entry
1543                    .thread_name
1544                    .as_deref()
1545                    .is_some_and(|n| n.eq_ignore_ascii_case(name))
1546            })
1547            .max_by_key(|entry| entry.updated_at);
1548        Ok(matched.and_then(|entry| entry.rollout_path.clone()))
1549    }
1550
1551    fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
1552        if !self.session_index_path.exists() {
1553            return Ok(HashMap::new());
1554        }
1555        let file = OpenOptions::new()
1556            .read(true)
1557            .open(&self.session_index_path)
1558            .with_context(|| {
1559                format!(
1560                    "failed to read session index {}",
1561                    self.session_index_path.display()
1562                )
1563            })?;
1564        let reader = BufReader::new(file);
1565        let mut latest = HashMap::<String, SessionIndexEntry>::new();
1566        for line in reader.lines() {
1567            let line = line.context("failed to read session index line")?;
1568            if line.trim().is_empty() {
1569                continue;
1570            }
1571            let parsed: SessionIndexEntry =
1572                serde_json::from_str(&line).context("failed to parse session index entry")?;
1573            latest.insert(parsed.thread_id.clone(), parsed);
1574        }
1575        Ok(latest)
1576    }
1577}
1578
1579fn default_state_db_path() -> PathBuf {
1580    // $CODEWHALE_HOME is a hard override of the base data directory
1581    // (docs/CONFIGURATION.md): when set, the state DB lives under it and we do
1582    // NOT fall back to the legacy ~/.deepseek path — silent fallback would
1583    // defeat the isolation the override promises (CI, containers, multi-project,
1584    // test harnesses). Legacy ~/.deepseek migration only applies to the default
1585    // home location.
1586    if let Some(overridden) = codewhale_home_override() {
1587        return overridden.join("state.db");
1588    }
1589    let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
1590    // Prefer the CodeWhale directory, falling back to legacy DeepSeek path
1591    // so existing installs don't lose their session history.
1592    let primary = home.join(".codewhale").join("state.db");
1593    if primary.exists() || !home.join(".deepseek").join("state.db").exists() {
1594        primary
1595    } else {
1596        home.join(".deepseek").join("state.db")
1597    }
1598}
1599
/// Read `$CODEWHALE_HOME` as a hard override of the data directory root.
///
/// The value is returned verbatim: the variable IS the home directory, matching
/// `codewhale_home()` in config, so `$CODEWHALE_HOME=/data/cw` means the home is
/// `/data/cw`, not `/data/cw/.codewhale`. An unset or empty variable yields
/// `None`, letting callers distinguish "explicit override" from "default home +
/// legacy fallback". No trimming is done, so a whitespace-only value counts as
/// set.
///
/// This mirrors config's helper instead of depending on it: state is a
/// low-level leaf crate, and pulling in config would invert the layering.
fn codewhale_home_override() -> Option<PathBuf> {
    match std::env::var_os("CODEWHALE_HOME") {
        Some(raw) if !raw.is_empty() => Some(PathBuf::from(raw)),
        _ => None,
    }
}
1614
/// Encode a flag as an integer: `true` becomes `1`, `false` becomes `0`.
fn bool_to_i64(value: bool) -> i64 {
    i64::from(value)
}
1618
/// Decode an integer flag: `0` is `false`, every other value is `true`.
fn i64_to_bool(value: i64) -> bool {
    match value {
        0 => false,
        _ => true,
    }
}
1622
1623fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
1624    match status {
1625        ThreadStatus::Running => "running",
1626        ThreadStatus::Idle => "idle",
1627        ThreadStatus::Completed => "completed",
1628        ThreadStatus::Failed => "failed",
1629        ThreadStatus::Paused => "paused",
1630        ThreadStatus::Archived => "archived",
1631    }
1632}
1633
1634fn thread_status_from_str(value: &str) -> ThreadStatus {
1635    match value {
1636        "running" => ThreadStatus::Running,
1637        "idle" => ThreadStatus::Idle,
1638        "completed" => ThreadStatus::Completed,
1639        "failed" => ThreadStatus::Failed,
1640        "paused" => ThreadStatus::Paused,
1641        "archived" => ThreadStatus::Archived,
1642        _ => ThreadStatus::Idle,
1643    }
1644}
1645
1646fn session_source_to_str(source: &SessionSource) -> &'static str {
1647    match source {
1648        SessionSource::Interactive => "interactive",
1649        SessionSource::Resume => "resume",
1650        SessionSource::Fork => "fork",
1651        SessionSource::Api => "api",
1652        SessionSource::Unknown => "unknown",
1653    }
1654}
1655
1656fn session_source_from_str(value: &str) -> SessionSource {
1657    match value {
1658        "interactive" => SessionSource::Interactive,
1659        "resume" => SessionSource::Resume,
1660        "fork" => SessionSource::Fork,
1661        "api" => SessionSource::Api,
1662        _ => SessionSource::Unknown,
1663    }
1664}
1665
/// Render an optional path as an optional owned string.
///
/// The text comes from `Path::display`, so a path that is not valid Unicode is
/// rendered lossily.
fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
    let present = path?;
    Some(present.display().to_string())
}
1669
1670fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
1671    match status {
1672        JobStateStatus::Queued => "queued",
1673        JobStateStatus::Running => "running",
1674        JobStateStatus::Completed => "completed",
1675        JobStateStatus::Failed => "failed",
1676        JobStateStatus::Cancelled => "cancelled",
1677    }
1678}
1679
1680fn job_state_status_from_str(value: &str) -> JobStateStatus {
1681    match value {
1682        "queued" => JobStateStatus::Queued,
1683        "running" => JobStateStatus::Running,
1684        "completed" => JobStateStatus::Completed,
1685        "failed" => JobStateStatus::Failed,
1686        "cancelled" => JobStateStatus::Cancelled,
1687        _ => JobStateStatus::Queued,
1688    }
1689}
1690
1691fn thread_goal_status_to_str(status: &ThreadGoalStatus) -> &'static str {
1692    match status {
1693        ThreadGoalStatus::Active => "active",
1694        ThreadGoalStatus::Paused => "paused",
1695        ThreadGoalStatus::Blocked => "blocked",
1696        ThreadGoalStatus::UsageLimited => "usage_limited",
1697        ThreadGoalStatus::BudgetLimited => "budget_limited",
1698        ThreadGoalStatus::Complete => "complete",
1699    }
1700}
1701
1702fn thread_goal_status_from_str(value: &str) -> ThreadGoalStatus {
1703    match value {
1704        "active" => ThreadGoalStatus::Active,
1705        "paused" => ThreadGoalStatus::Paused,
1706        "blocked" => ThreadGoalStatus::Blocked,
1707        "usage_limited" => ThreadGoalStatus::UsageLimited,
1708        "budget_limited" => ThreadGoalStatus::BudgetLimited,
1709        "complete" => ThreadGoalStatus::Complete,
1710        _ => ThreadGoalStatus::Active,
1711    }
1712}
1713
1714fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
1715    let status_raw: String = row.get(7)?;
1716    let source_raw: String = row.get(11)?;
1717    let rollout_path: Option<String> = row.get(1)?;
1718    let path: Option<String> = row.get(8)?;
1719    Ok(ThreadMetadata {
1720        id: row.get(0)?,
1721        rollout_path: rollout_path.map(PathBuf::from),
1722        preview: row.get(2)?,
1723        ephemeral: i64_to_bool(row.get(3)?),
1724        model_provider: row.get(4)?,
1725        created_at: row.get(5)?,
1726        updated_at: row.get(6)?,
1727        status: thread_status_from_str(&status_raw),
1728        path: path.map(PathBuf::from),
1729        cwd: PathBuf::from(row.get::<_, String>(9)?),
1730        cli_version: row.get(10)?,
1731        source: session_source_from_str(&source_raw),
1732        name: row.get(12)?,
1733        sandbox_policy: row.get(13)?,
1734        approval_mode: row.get(14)?,
1735        archived: i64_to_bool(row.get(15)?),
1736        archived_at: row.get(16)?,
1737        git_sha: row.get(17)?,
1738        git_branch: row.get(18)?,
1739        git_origin_url: row.get(19)?,
1740        memory_mode: row.get(20)?,
1741        current_leaf_id: row.get(21)?,
1742    })
1743}
1744
1745fn row_to_thread_goal(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadGoalRecord> {
1746    let status_raw: String = row.get(3)?;
1747    Ok(ThreadGoalRecord {
1748        thread_id: row.get(0)?,
1749        goal_id: row.get(1)?,
1750        objective: row.get(2)?,
1751        status: thread_goal_status_from_str(&status_raw),
1752        token_budget: row.get(4)?,
1753        tokens_used: row.get(5)?,
1754        time_used_seconds: row.get(6)?,
1755        continuation_count: row.get(7)?,
1756        created_at: row.get(8)?,
1757        updated_at: row.get(9)?,
1758    })
1759}
1760
1761#[cfg(test)]
1762mod tests {
1763    use super::*;
1764    use std::time::{SystemTime, UNIX_EPOCH};
1765
1766    fn temp_state_store(name: &str) -> StateStore {
1767        let suffix = SystemTime::now()
1768            .duration_since(UNIX_EPOCH)
1769            .expect("system time")
1770            .as_nanos();
1771        let dir = std::env::temp_dir().join(format!(
1772            "codewhale-state-{name}-{}-{suffix}",
1773            std::process::id()
1774        ));
1775        fs::create_dir_all(&dir).expect("create temp state dir");
1776        StateStore::open(Some(dir.join("state.db"))).expect("open state store")
1777    }
1778
1779    fn test_thread(id: &str) -> ThreadMetadata {
1780        ThreadMetadata {
1781            id: id.to_string(),
1782            rollout_path: None,
1783            preview: "test thread".to_string(),
1784            ephemeral: false,
1785            model_provider: "deepseek".to_string(),
1786            created_at: 10,
1787            updated_at: 10,
1788            status: ThreadStatus::Running,
1789            path: None,
1790            cwd: PathBuf::from("/tmp/codewhale"),
1791            cli_version: "0.0.0-test".to_string(),
1792            source: SessionSource::Interactive,
1793            name: None,
1794            sandbox_policy: None,
1795            approval_mode: None,
1796            archived: false,
1797            archived_at: None,
1798            git_sha: None,
1799            git_branch: None,
1800            git_origin_url: None,
1801            memory_mode: None,
1802            current_leaf_id: None,
1803        }
1804    }
1805
1806    fn test_goal(thread_id: &str, objective: &str) -> ThreadGoalRecord {
1807        ThreadGoalRecord {
1808            thread_id: thread_id.to_string(),
1809            goal_id: "goal-1".to_string(),
1810            objective: objective.to_string(),
1811            status: ThreadGoalStatus::Active,
1812            token_budget: Some(123),
1813            tokens_used: 7,
1814            time_used_seconds: 11,
1815            continuation_count: 0,
1816            created_at: 100,
1817            updated_at: 101,
1818        }
1819    }
1820
1821    #[test]
1822    fn thread_goal_crud_round_trips_and_replaces() {
1823        let store = temp_state_store("thread-goal-crud");
1824        store
1825            .upsert_thread(&test_thread("thread-1"))
1826            .expect("upsert thread");
1827
1828        let goal = test_goal("thread-1", "Ship v0.8.59");
1829        store.upsert_thread_goal(&goal).expect("upsert goal");
1830        assert_eq!(
1831            store
1832                .get_thread_goal("thread-1")
1833                .expect("read goal")
1834                .as_ref(),
1835            Some(&goal)
1836        );
1837
1838        let mut replacement = test_goal("thread-1", "Ship v0.8.59 safely");
1839        replacement.goal_id = "goal-2".to_string();
1840        replacement.status = ThreadGoalStatus::BudgetLimited;
1841        replacement.token_budget = None;
1842        replacement.updated_at = 202;
1843        store
1844            .upsert_thread_goal(&replacement)
1845            .expect("replace goal");
1846        assert_eq!(
1847            store.get_thread_goal("thread-1").expect("read replacement"),
1848            Some(replacement)
1849        );
1850
1851        assert!(store.delete_thread_goal("thread-1").expect("delete goal"));
1852        assert!(
1853            store
1854                .get_thread_goal("thread-1")
1855                .expect("read empty")
1856                .is_none()
1857        );
1858        assert!(!store.delete_thread_goal("thread-1").expect("delete empty"));
1859    }
1860
1861    #[test]
1862    fn thread_goal_requires_existing_thread() {
1863        let store = temp_state_store("thread-goal-missing-thread");
1864        let err = store
1865            .upsert_thread_goal(&test_goal("missing-thread", "nope"))
1866            .expect_err("goal without a thread should fail");
1867        assert!(err.to_string().contains("thread missing-thread not found"));
1868    }
1869
1870    #[test]
1871    fn record_thread_goal_usage_accumulates_tokens_and_time() {
1872        let store = temp_state_store("thread-goal-usage");
1873        store
1874            .upsert_thread(&test_thread("thread-1"))
1875            .expect("upsert thread");
1876
1877        // Mirror the runtime, which creates goals with zeroed accounting.
1878        let mut goal = test_goal("thread-1", "Ship the persistent goal loop");
1879        goal.tokens_used = 0;
1880        goal.time_used_seconds = 0;
1881        goal.updated_at = 100;
1882        store.upsert_thread_goal(&goal).expect("upsert goal");
1883
1884        // First accrual lands the deltas and advances updated_at.
1885        let after_first = store
1886            .record_thread_goal_usage("thread-1", 250, 12, 150)
1887            .expect("record usage")
1888            .expect("goal exists");
1889        assert_eq!(after_first.tokens_used, 250);
1890        assert_eq!(after_first.time_used_seconds, 12);
1891        assert_eq!(after_first.updated_at, 150);
1892        // Identity fields are preserved across accrual.
1893        assert_eq!(after_first.goal_id, goal.goal_id);
1894        assert_eq!(after_first.objective, goal.objective);
1895        assert_eq!(after_first.status, goal.status);
1896        assert_eq!(after_first.token_budget, goal.token_budget);
1897        assert_eq!(after_first.created_at, goal.created_at);
1898        assert_eq!(after_first.continuation_count, 0);
1899
1900        // Second accrual adds on top of the first (additive, not replacing).
1901        let after_second = store
1902            .record_thread_goal_usage("thread-1", 75, 8, 200)
1903            .expect("record usage")
1904            .expect("goal exists");
1905        assert_eq!(after_second.tokens_used, 325);
1906        assert_eq!(after_second.time_used_seconds, 20);
1907        assert_eq!(after_second.updated_at, 200);
1908
1909        // A stale `now` must not move updated_at backwards.
1910        let after_stale = store
1911            .record_thread_goal_usage("thread-1", 5, 1, 1)
1912            .expect("record usage")
1913            .expect("goal exists");
1914        assert_eq!(after_stale.tokens_used, 330);
1915        assert_eq!(after_stale.time_used_seconds, 21);
1916        assert_eq!(after_stale.updated_at, 200);
1917
1918        // Read back through the normal getter to confirm durability.
1919        let persisted = store
1920            .get_thread_goal("thread-1")
1921            .expect("read goal")
1922            .expect("goal exists");
1923        assert_eq!(persisted.tokens_used, 330);
1924        assert_eq!(persisted.time_used_seconds, 21);
1925    }
1926
1927    #[test]
1928    fn record_thread_goal_usage_returns_none_without_goal() {
1929        let store = temp_state_store("thread-goal-usage-missing");
1930        store
1931            .upsert_thread(&test_thread("thread-1"))
1932            .expect("upsert thread");
1933        // Thread exists but has no goal row yet: accrual is a no-op, not an error,
1934        // and must not create a goal.
1935        let result = store
1936            .record_thread_goal_usage("thread-1", 100, 5, 999)
1937            .expect("record usage on goalless thread");
1938        assert!(result.is_none());
1939        assert!(
1940            store
1941                .get_thread_goal("thread-1")
1942                .expect("read goal")
1943                .is_none()
1944        );
1945    }
1946
1947    #[test]
1948    fn record_thread_goal_continuation_accumulates_durably() {
1949        let store = temp_state_store("thread-goal-continuation");
1950        store
1951            .upsert_thread(&test_thread("thread-1"))
1952            .expect("upsert thread");
1953
1954        let mut goal = test_goal("thread-1", "Keep working across turns");
1955        goal.updated_at = 100;
1956        store.upsert_thread_goal(&goal).expect("upsert goal");
1957
1958        let after_first = store
1959            .record_thread_goal_continuation("thread-1", 120)
1960            .expect("record continuation")
1961            .expect("goal exists");
1962        assert_eq!(after_first.continuation_count, 1);
1963        assert_eq!(after_first.tokens_used, goal.tokens_used);
1964        assert_eq!(after_first.time_used_seconds, goal.time_used_seconds);
1965        assert_eq!(after_first.updated_at, 120);
1966
1967        let after_second = store
1968            .record_thread_goal_continuation("thread-1", 110)
1969            .expect("record second continuation")
1970            .expect("goal exists");
1971        assert_eq!(after_second.continuation_count, 2);
1972        assert_eq!(after_second.updated_at, 120);
1973
1974        let persisted = store
1975            .get_thread_goal("thread-1")
1976            .expect("read goal")
1977            .expect("goal exists");
1978        assert_eq!(persisted.continuation_count, 2);
1979    }
1980
1981    // ── $CODEWHALE_HOME override tests ──────────────────────────────
1982    //
1983    // These touch a process-global env var, so they serialize against each
1984    // other (and restore the prior value) to stay hermetic under parallel test
1985    // runs — the same concern AGENTS.md flags for config_command_allow_shell_*.
1986
1987    static CODEWHALE_HOME_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
1988
1989    struct CodeWhaleHomeGuard {
1990        prior: Option<std::ffi::OsString>,
1991    }
1992    impl CodeWhaleHomeGuard {
1993        fn set(value: &str) -> Self {
1994            let prior = std::env::var_os("CODEWHALE_HOME");
1995            // SAFETY: serialised by CODEWHALE_HOME_TEST_LOCK.
1996            unsafe { std::env::set_var("CODEWHALE_HOME", value) };
1997            Self { prior }
1998        }
1999        fn remove() -> Self {
2000            let prior = std::env::var_os("CODEWHALE_HOME");
2001            // SAFETY: serialised by CODEWHALE_HOME_TEST_LOCK.
2002            unsafe { std::env::remove_var("CODEWHALE_HOME") };
2003            Self { prior }
2004        }
2005    }
2006    impl Drop for CodeWhaleHomeGuard {
2007        fn drop(&mut self) {
2008            // SAFETY: serialised by CODEWHALE_HOME_TEST_LOCK.
2009            unsafe {
2010                match &self.prior {
2011                    Some(value) => std::env::set_var("CODEWHALE_HOME", value),
2012                    None => std::env::remove_var("CODEWHALE_HOME"),
2013                }
2014            }
2015        }
2016    }
2017
2018    #[test]
2019    fn codewhale_home_override_returns_the_env_value_verbatim() {
2020        let _lock = CODEWHALE_HOME_TEST_LOCK.lock().unwrap();
2021        let _g = CodeWhaleHomeGuard::set("/tmp/cw-isolated-state");
2022        // The env var IS the home dir — no ".codewhale" appended. This matches
2023        // codewhale_home() in config ($CODEWHALE_HOME=/x means home is /x).
2024        assert_eq!(
2025            codewhale_home_override().as_deref(),
2026            Some(std::path::Path::new("/tmp/cw-isolated-state"))
2027        );
2028    }
2029
2030    #[test]
2031    fn codewhale_home_override_none_when_unset() {
2032        let _lock = CODEWHALE_HOME_TEST_LOCK.lock().unwrap();
2033        let _g = CodeWhaleHomeGuard::remove();
2034        assert!(codewhale_home_override().is_none());
2035    }
2036
2037    #[test]
2038    fn codewhale_home_override_none_when_empty() {
2039        let _lock = CODEWHALE_HOME_TEST_LOCK.lock().unwrap();
2040        let _g = CodeWhaleHomeGuard::set("   ");
2041        // The helper filters empty values (after the OsString check). Note:
2042        // var_os returns the raw "   ", and our filter only catches truly-empty,
2043        // so this documents that whitespace-only is NOT treated as unset at the
2044        // override layer (config's codewhale_home trims; we don't here — the
2045        // branch is "was it set at all").
2046        assert!(
2047            codewhale_home_override().is_some(),
2048            "non-empty (even whitespace) counts as set; trimming is the caller's job"
2049        );
2050    }
2051
2052    #[test]
2053    fn default_state_db_path_uses_codewhale_home_when_set() {
2054        let _lock = CODEWHALE_HOME_TEST_LOCK.lock().unwrap();
2055        let dir = std::env::temp_dir().join(format!(
2056            "cw-home-state-{}-{}",
2057            std::process::id(),
2058            std::time::SystemTime::now()
2059                .duration_since(std::time::UNIX_EPOCH)
2060                .unwrap()
2061                .as_nanos()
2062        ));
2063        let _g = CodeWhaleHomeGuard::set(dir.to_str().unwrap());
2064        // Hard override: the DB is <CODEWHALE_HOME>/state.db, NOT
2065        // <CODEWHALE_HOME>/.codewhale/state.db, and the legacy ~/.deepseek
2066        // fallback is bypassed entirely.
2067        assert_eq!(default_state_db_path(), dir.join("state.db"));
2068    }
2069}