Skip to main content

codewhale_state/
lib.rs

//! Persistent state management for conversation threads, messages, and jobs.
//!
//! The [`StateStore`] is the primary entry point, backed by a SQLite database and an
//! append-only JSONL session index file. It provides CRUD operations for:
//!
//! - **Threads** — conversation metadata, archival, and session indexing.
//! - **Messages** — append-only message storage with tree-structured branching.
//! - **Checkpoints** — named state snapshots for restoring conversation progress.
//! - **Jobs** — background task tracking with status and progress.
//! - **Dynamic tools** — per-thread tool registrations.
//! - **Thread goals** — one persisted objective per thread, with status and usage counters.
11
12use std::collections::HashMap;
13use std::fs::{self, OpenOptions};
14use std::io::{BufRead, BufReader, Write};
15use std::path::{Path, PathBuf};
16
17use anyhow::{Context, Result};
18use chrono::Utc;
19use rusqlite::{Connection, OptionalExtension, params};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
/// Lifecycle status of a conversation thread.
///
/// Serialized by serde as snake_case strings (e.g. `"running"`, `"archived"`),
/// as configured by the `rename_all = "snake_case"` attribute below.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ThreadStatus {
    /// Thread is actively being worked on.
    Running,
    /// Thread exists but has no active work in progress.
    Idle,
    /// Thread has finished its task successfully.
    Completed,
    /// Thread encountered an unrecoverable error.
    Failed,
    /// Thread has been temporarily paused by the user.
    Paused,
    /// Thread has been archived and is hidden from default listings.
    ///
    /// [`StateStore::mark_archived`] writes this status together with the
    /// `archived` flag and the `archived_at` timestamp.
    Archived,
}
42
/// Indicates how a session was initiated.
///
/// Serialized by serde as snake_case strings (e.g. `"interactive"`, `"api"`),
/// as configured by the `rename_all = "snake_case"` attribute below.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SessionSource {
    /// Started by a user interacting with the CLI.
    Interactive,
    /// Resumed from a previously persisted session.
    Resume,
    /// Created by forking an existing conversation at a specific message.
    Fork,
    /// Initiated programmatically via the API.
    Api,
    /// Source is unknown or unspecified.
    Unknown,
}
60
/// Metadata for a persisted conversation thread.
///
/// Each thread represents a single conversation session and stores its
/// configuration, git context, and current status. One value corresponds to
/// one row of the `threads` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThreadMetadata {
    /// Unique identifier for this thread (primary key of the `threads` table).
    pub id: String,
    /// Optional filesystem path to the rollout (JSONL transcript) file.
    pub rollout_path: Option<PathBuf>,
    /// Short preview or summary of the thread content.
    pub preview: String,
    /// Whether this thread is ephemeral (not persisted long-term).
    pub ephemeral: bool,
    /// Identifier of the model provider used for this thread (e.g. `"openai"`).
    pub model_provider: String,
    /// Unix timestamp (seconds) when the thread was created.
    pub created_at: i64,
    /// Unix timestamp (seconds) of the most recent update to the thread.
    ///
    /// [`StateStore::list_threads`] orders its results by this value, newest first.
    pub updated_at: i64,
    /// Current lifecycle status of the thread.
    pub status: ThreadStatus,
    /// Optional filesystem path associated with the thread working context.
    pub path: Option<PathBuf>,
    /// Working directory that was active when the thread was created.
    ///
    /// Persisted via `Path::display`, so a non-UTF-8 path is stored lossily.
    pub cwd: PathBuf,
    /// Version of the CLI that created this thread.
    pub cli_version: String,
    /// How this session was initiated.
    pub source: SessionSource,
    /// User-assigned display name for the thread.
    ///
    /// Persisted in the `title` column of the `threads` table.
    pub name: Option<String>,
    /// Serialized sandbox policy applied to this thread, if any.
    pub sandbox_policy: Option<String>,
    /// Approval mode configured for tool calls in this thread.
    pub approval_mode: Option<String>,
    /// Whether the thread has been archived.
    ///
    /// This flag (not `status`) is what [`StateStore::list_threads`] filters on.
    pub archived: bool,
    /// Unix timestamp (seconds) when the thread was archived, or `None` if not archived.
    pub archived_at: Option<i64>,
    /// Git commit SHA of the working tree when the thread was created.
    pub git_sha: Option<String>,
    /// Git branch checked out when the thread was created.
    pub git_branch: Option<String>,
    /// URL of the git remote origin, if available.
    pub git_origin_url: Option<String>,
    /// Memory mode configured for this thread (e.g. `"local"`, `"remote"`).
    pub memory_mode: Option<String>,
    /// ID of the current leaf message in the conversation tree.
    ///
    /// Read back from the database, but not written by
    /// [`StateStore::upsert_thread`], whose SQL omits this column.
    pub current_leaf_id: Option<i64>,
}
112
/// A dynamically registered tool associated with a thread.
///
/// Corresponds to a row of the `thread_dynamic_tools` table, which is keyed by
/// `(thread_id, position)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamicToolRecord {
    /// Ordinal position of this tool in the thread tool list.
    ///
    /// Unique within a thread, since it is part of the table's primary key.
    pub position: i64,
    /// Unique name identifying the tool.
    pub name: String,
    /// Human-readable description of what the tool does.
    pub description: Option<String>,
    /// JSON Schema describing the tool input parameters.
    ///
    /// The backing `input_schema` column is `TEXT NOT NULL`.
    pub input_schema: Value,
}
125
/// A single message entry in a conversation thread.
///
/// Messages form a tree structure via [`parent_entry_id`](Self::parent_entry_id),
/// enabling conversation branching and forking. Corresponds to a row of the
/// `messages` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageRecord {
    /// Auto-incremented unique identifier for this message
    /// (`INTEGER PRIMARY KEY AUTOINCREMENT` in the schema).
    pub id: i64,
    /// ID of the thread this message belongs to.
    pub thread_id: String,
    /// Role of the message sender (e.g. `"user"`, `"assistant"`, `"system"`).
    pub role: String,
    /// Text content of the message.
    pub content: String,
    /// Optional structured item payload (tool calls, tool results, etc.).
    // NOTE(review): presumably stored in the nullable `item_json` column; the
    // mapping code is not in this part of the file — confirm.
    pub item: Option<Value>,
    /// Unix timestamp (seconds) when the message was created.
    pub created_at: i64,
    /// ID of the parent message, forming a tree structure. `None` for root messages.
    ///
    /// For rows that predate this column, the first schema migration backfills
    /// it with the preceding message of the same thread, ordered by
    /// `created_at` and then `id`.
    pub parent_entry_id: Option<i64>,
}
147
/// A named checkpoint capturing the state of a thread at a point in time.
///
/// Corresponds to a row of the `checkpoints` table, which is keyed by
/// `(thread_id, checkpoint_id)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointRecord {
    /// ID of the thread this checkpoint belongs to.
    pub thread_id: String,
    /// Unique identifier for this checkpoint within its thread.
    pub checkpoint_id: String,
    /// Serialized state snapshot stored as a JSON value.
    ///
    /// The backing `state_json` column is `TEXT NOT NULL`.
    pub state: Value,
    /// Unix timestamp (seconds) when the checkpoint was created or last updated.
    pub created_at: i64,
}
160
/// Status of a background job.
///
/// Serialized by serde as snake_case strings (e.g. `"queued"`, `"cancelled"`),
/// as configured by the `rename_all = "snake_case"` attribute below.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum JobStateStatus {
    /// Job is waiting to be executed.
    Queued,
    /// Job is currently executing.
    Running,
    /// Job has finished successfully.
    Completed,
    /// Job has failed with an error.
    Failed,
    /// Job was cancelled before completion.
    Cancelled,
}
178
/// Persisted state of a background job.
///
/// Corresponds to a row of the `jobs` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobStateRecord {
    /// Unique identifier for the job (primary key of the `jobs` table).
    pub id: String,
    /// Human-readable name describing the job.
    pub name: String,
    /// Current lifecycle status of the job.
    pub status: JobStateStatus,
    /// Completion progress as a percentage (0--100), if available.
    ///
    /// The range is a convention: neither `u8` nor the schema (a plain
    /// nullable `INTEGER` column) rejects values above 100.
    pub progress: Option<u8>,
    /// Optional detail message providing additional status information.
    pub detail: Option<String>,
    /// Unix timestamp (seconds) when the job was created.
    pub created_at: i64,
    /// Unix timestamp (seconds) of the most recent status update.
    pub updated_at: i64,
}
197
/// Persisted lifecycle status for a thread goal.
///
/// Serialized by serde as snake_case strings (e.g. `"usage_limited"`). The
/// `thread_goals.status` column carries a `CHECK` constraint that accepts
/// exactly `'active'`, `'paused'`, `'blocked'`, `'usage_limited'`,
/// `'budget_limited'` and `'complete'`, so a new variant needs a schema
/// migration as well.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ThreadGoalStatus {
    /// Goal is active and should continue receiving work.
    Active,
    /// Goal is paused by the user.
    Paused,
    /// Goal is blocked and cannot make meaningful progress.
    Blocked,
    /// Goal stopped because account/service usage limits were reached.
    UsageLimited,
    /// Goal stopped because its explicit token budget was reached.
    BudgetLimited,
    /// Goal has been completed.
    Complete,
}
215
/// Persisted goal state attached to a thread.
///
/// At most one goal is stored per thread: `thread_id` is the primary key of
/// the `thread_goals` table, and [`StateStore::upsert_thread_goal`] overwrites
/// the existing row on conflict.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadGoalRecord {
    /// Thread this goal belongs to.
    pub thread_id: String,
    /// Stable identifier for this goal revision.
    pub goal_id: String,
    /// User-visible objective.
    pub objective: String,
    /// Current lifecycle status.
    pub status: ThreadGoalStatus,
    /// Optional token budget requested by the user.
    pub token_budget: Option<i64>,
    /// Tokens consumed while pursuing the goal.
    pub tokens_used: i64,
    /// Elapsed wall-clock work time in seconds.
    pub time_used_seconds: i64,
    /// Unix timestamp (seconds) when the goal was created.
    pub created_at: i64,
    /// Unix timestamp (seconds) when the goal was last updated.
    pub updated_at: i64,
}
238
/// Options controlling which threads a listing query returns.
#[derive(Clone, Debug)]
pub struct ThreadListFilters {
    /// When `true`, archived threads are listed alongside live ones.
    pub include_archived: bool,
    /// Upper bound on the number of threads returned. Defaults to 50.
    pub limit: Option<usize>,
}

impl Default for ThreadListFilters {
    /// Archived threads excluded, at most 50 results.
    fn default() -> ThreadListFilters {
        let limit = Some(50);
        ThreadListFilters {
            limit,
            include_archived: false,
        }
    }
}
256
/// A thread's name, last-update time and rollout location in serializable form.
// NOTE(review): presumably one line of the `session_index.jsonl` file; the
// code that reads and writes it is not in this part of the file — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SessionIndexEntry {
    /// ID of the thread the entry describes.
    thread_id: String,
    /// Display name of the thread, if one is set.
    thread_name: Option<String>,
    /// Unix timestamp (seconds) associated with the entry.
    updated_at: i64,
    /// Path to the thread's rollout file, if any.
    rollout_path: Option<PathBuf>,
}
264
/// Persistent storage for conversation threads, messages, checkpoints, and jobs.
///
/// Backed by a SQLite database and an append-only JSONL session index file.
/// The database schema is automatically initialized and migrated on [`open`](Self::open).
///
/// The store holds only filesystem paths, so cloning it is cheap; database
/// operations open their own connection to `db_path`.
#[derive(Debug, Clone)]
pub struct StateStore {
    /// Location of the SQLite database file.
    db_path: PathBuf,
    /// Location of `session_index.jsonl`, placed next to the database file.
    session_index_path: PathBuf,
}
274
275impl StateStore {
276    /// Open (or create) a state store at the given database path.
277    ///
278    /// If `path` is `None`, the default location (`~/.codewhale/state.db`, with
279    /// `~/.deepseek/state.db` as a legacy fallback) is used.
280    /// The database schema is created automatically if it does not exist.
281    pub fn open(path: Option<PathBuf>) -> Result<Self> {
282        let db_path = path.unwrap_or_else(default_state_db_path);
283        let session_index_path = db_path
284            .parent()
285            .unwrap_or_else(|| Path::new("."))
286            .join("session_index.jsonl");
287        if let Some(parent) = db_path.parent() {
288            fs::create_dir_all(parent).with_context(|| {
289                format!("failed to create state directory {}", parent.display())
290            })?;
291        }
292        let store = Self {
293            db_path,
294            session_index_path,
295        };
296        store.init_schema()?;
297        Ok(store)
298    }
299
300    /// Returns the filesystem path of the underlying SQLite database.
301    pub fn db_path(&self) -> &Path {
302        &self.db_path
303    }
304
305    fn conn(&self) -> Result<Connection> {
306        Connection::open(&self.db_path)
307            .with_context(|| format!("failed to open state db {}", self.db_path.display()))
308    }
309
310    fn init_schema(&self) -> Result<()> {
311        let conn = self.conn()?;
312        let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
313        if user_version == 0 {
314            conn.execute_batch(
315                r#"
316                BEGIN;
317                CREATE TABLE IF NOT EXISTS threads (
318                    id TEXT PRIMARY KEY,
319                    rollout_path TEXT,
320                    preview TEXT NOT NULL,
321                    ephemeral INTEGER NOT NULL,
322                    model_provider TEXT NOT NULL,
323                    created_at INTEGER NOT NULL,
324                    updated_at INTEGER NOT NULL,
325                    status TEXT NOT NULL,
326                    path TEXT,
327                    cwd TEXT NOT NULL,
328                    cli_version TEXT NOT NULL,
329                    source TEXT NOT NULL,
330                    title TEXT,
331                    sandbox_policy TEXT,
332                    approval_mode TEXT,
333                    archived INTEGER NOT NULL DEFAULT 0,
334                    archived_at INTEGER,
335                    git_sha TEXT,
336                    git_branch TEXT,
337                    git_origin_url TEXT,
338                    memory_mode TEXT
339                );
340                CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
341                CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
342                CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);
343
344                CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
345                    thread_id TEXT NOT NULL,
346                    position INTEGER NOT NULL,
347                    name TEXT NOT NULL,
348                    description TEXT,
349                    input_schema TEXT NOT NULL,
350                    PRIMARY KEY (thread_id, position),
351                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
352                );
353
354                CREATE TABLE IF NOT EXISTS messages (
355                    id INTEGER PRIMARY KEY AUTOINCREMENT,
356                    thread_id TEXT NOT NULL,
357                    role TEXT NOT NULL,
358                    content TEXT NOT NULL,
359                    item_json TEXT,
360                    created_at INTEGER NOT NULL,
361                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
362                );
363                CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);
364
365                CREATE TABLE IF NOT EXISTS checkpoints (
366                    thread_id TEXT NOT NULL,
367                    checkpoint_id TEXT NOT NULL,
368                    state_json TEXT NOT NULL,
369                    created_at INTEGER NOT NULL,
370                    PRIMARY KEY(thread_id, checkpoint_id),
371                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
372                );
373                CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);
374
375                CREATE TABLE IF NOT EXISTS jobs (
376                    id TEXT PRIMARY KEY,
377                    name TEXT NOT NULL,
378                    status TEXT NOT NULL,
379                    progress INTEGER,
380                    detail TEXT,
381                    created_at INTEGER NOT NULL,
382                    updated_at INTEGER NOT NULL
383                );
384                CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
385
386                -- Add parent_entry_id column, and set to last message before current message
387                ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
388                UPDATE messages
389                    SET parent_entry_id = (
390                        SELECT m2.id
391                        FROM messages m2
392                        WHERE m2.thread_id = messages.thread_id
393                            AND (
394                                m2.created_at < messages.created_at
395                                OR (
396                                    m2.created_at = messages.created_at
397                                    AND m2.id < messages.id
398                                )
399                            )
400                        ORDER BY m2.created_at DESC, m2.id DESC
401                        LIMIT 1
402                    );
403                CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);
404
405                -- Add current_leaf_id column, and set to last message in thread
406                ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
407                UPDATE threads
408                    SET current_leaf_id = (
409                        SELECT m.id
410                        FROM messages m
411                        WHERE m.thread_id = threads.id
412                        ORDER BY m.id DESC
413                        LIMIT 1
414                    );
415
416                PRAGMA user_version = 1;
417                COMMIT;
418                "#,
419            )
420            .context("failed to initialize thread schema")?;
421            user_version = 1;
422        }
423        if user_version < 2 {
424            conn.execute_batch(
425                r#"
426                BEGIN;
427                CREATE TABLE IF NOT EXISTS workflow_runs (
428                    id TEXT PRIMARY KEY,
429                    workflow_id TEXT NOT NULL,
430                    goal TEXT NOT NULL,
431                    status TEXT NOT NULL,
432                    input_hash TEXT,
433                    started_at INTEGER NOT NULL,
434                    completed_at INTEGER,
435                    metadata_json TEXT NOT NULL DEFAULT '{}'
436                );
437                CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at
438                    ON workflow_runs(status, started_at DESC);
439                CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at
440                    ON workflow_runs(workflow_id, started_at DESC);
441
442                CREATE TABLE IF NOT EXISTS branch_runs (
443                    id TEXT PRIMARY KEY,
444                    workflow_run_id TEXT NOT NULL,
445                    branch_id TEXT NOT NULL,
446                    node_id TEXT NOT NULL,
447                    status TEXT NOT NULL,
448                    started_at INTEGER NOT NULL,
449                    completed_at INTEGER,
450                    result_json TEXT NOT NULL DEFAULT '{}',
451                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
452                );
453                CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id
454                    ON branch_runs(workflow_run_id);
455                CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id
456                    ON branch_runs(branch_id);
457
458                CREATE TABLE IF NOT EXISTS leaf_runs (
459                    id TEXT PRIMARY KEY,
460                    workflow_run_id TEXT NOT NULL,
461                    branch_run_id TEXT,
462                    leaf_id TEXT NOT NULL,
463                    task_id TEXT NOT NULL,
464                    input_hash TEXT,
465                    status TEXT NOT NULL,
466                    output_json TEXT NOT NULL DEFAULT '{}',
467                    artifacts_json TEXT NOT NULL DEFAULT '[]',
468                    started_at INTEGER NOT NULL,
469                    completed_at INTEGER,
470                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
471                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
472                );
473                CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id
474                    ON leaf_runs(workflow_run_id);
475                CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup
476                    ON leaf_runs(workflow_run_id, leaf_id, input_hash);
477
478                CREATE TABLE IF NOT EXISTS control_node_runs (
479                    id TEXT PRIMARY KEY,
480                    workflow_run_id TEXT NOT NULL,
481                    node_id TEXT NOT NULL,
482                    kind TEXT NOT NULL,
483                    status TEXT NOT NULL,
484                    selected_children_json TEXT NOT NULL DEFAULT '[]',
485                    result_json TEXT NOT NULL DEFAULT '{}',
486                    started_at INTEGER NOT NULL,
487                    completed_at INTEGER,
488                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
489                );
490                CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id
491                    ON control_node_runs(workflow_run_id);
492                CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id
493                    ON control_node_runs(node_id);
494
495                CREATE TABLE IF NOT EXISTS teacher_candidates (
496                    id TEXT PRIMARY KEY,
497                    workflow_run_id TEXT NOT NULL,
498                    control_node_run_id TEXT NOT NULL,
499                    candidate_id TEXT NOT NULL,
500                    branch_run_id TEXT,
501                    score REAL,
502                    passed INTEGER,
503                    rationale_json TEXT NOT NULL DEFAULT '{}',
504                    created_at INTEGER NOT NULL,
505                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
506                    FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE,
507                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
508                );
509                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id
510                    ON teacher_candidates(workflow_run_id);
511                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id
512                    ON teacher_candidates(control_node_run_id);
513
514                PRAGMA user_version = 2;
515                COMMIT;
516                "#,
517            )
518            .context("failed to initialize workflow trace schema")?;
519            user_version = 2;
520        }
521        if user_version < 3 {
522            conn.execute_batch(
523                r#"
524                BEGIN;
525                CREATE TABLE IF NOT EXISTS thread_goals (
526                    thread_id TEXT PRIMARY KEY NOT NULL,
527                    goal_id TEXT NOT NULL,
528                    objective TEXT NOT NULL,
529                    status TEXT NOT NULL CHECK(status IN (
530                        'active',
531                        'paused',
532                        'blocked',
533                        'usage_limited',
534                        'budget_limited',
535                        'complete'
536                    )),
537                    token_budget INTEGER,
538                    tokens_used INTEGER NOT NULL DEFAULT 0,
539                    time_used_seconds INTEGER NOT NULL DEFAULT 0,
540                    created_at INTEGER NOT NULL,
541                    updated_at INTEGER NOT NULL,
542                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
543                );
544
545                PRAGMA user_version = 3;
546                COMMIT;
547                "#,
548            )
549            .context("failed to initialize thread goal schema")?;
550        }
551        Ok(())
552    }
553
554    /// Insert or update thread metadata.
555    ///
556    /// This does **not** update `current_leaf_id`; use [`append_message`](Self::append_message)
557    /// or [`set_current_leaf_id`](Self::set_current_leaf_id) for that.
558    pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
559        let conn = self.conn()?;
560        conn.execute(
561            r#"
562            INSERT INTO threads (
563                id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
564                cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
565                git_sha, git_branch, git_origin_url, memory_mode
566            ) VALUES (
567                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
568                ?11, ?12, ?13, ?14, ?15, ?16, ?17,
569                ?18, ?19, ?20, ?21
570            )
571            ON CONFLICT(id) DO UPDATE SET
572                rollout_path=excluded.rollout_path,
573                preview=excluded.preview,
574                ephemeral=excluded.ephemeral,
575                model_provider=excluded.model_provider,
576                created_at=excluded.created_at,
577                updated_at=excluded.updated_at,
578                status=excluded.status,
579                path=excluded.path,
580                cwd=excluded.cwd,
581                cli_version=excluded.cli_version,
582                source=excluded.source,
583                title=excluded.title,
584                sandbox_policy=excluded.sandbox_policy,
585                approval_mode=excluded.approval_mode,
586                archived=excluded.archived,
587                archived_at=excluded.archived_at,
588                git_sha=excluded.git_sha,
589                git_branch=excluded.git_branch,
590                git_origin_url=excluded.git_origin_url,
591                memory_mode=excluded.memory_mode
592            "#,
593            params![
594                thread.id,
595                path_to_opt_string(thread.rollout_path.as_deref()),
596                thread.preview,
597                bool_to_i64(thread.ephemeral),
598                thread.model_provider,
599                thread.created_at,
600                thread.updated_at,
601                thread_status_to_str(&thread.status),
602                path_to_opt_string(thread.path.as_deref()),
603                thread.cwd.display().to_string(),
604                thread.cli_version,
605                session_source_to_str(&thread.source),
606                thread.name,
607                thread.sandbox_policy,
608                thread.approval_mode,
609                bool_to_i64(thread.archived),
610                thread.archived_at,
611                thread.git_sha,
612                thread.git_branch,
613                thread.git_origin_url,
614                thread.memory_mode,
615            ],
616        )
617        .context("failed to upsert thread metadata")?;
618
619        self.append_thread_name(
620            &thread.id,
621            thread.name.clone(),
622            thread.updated_at,
623            thread.rollout_path.clone(),
624        )?;
625        Ok(())
626    }
627
628    /// Retrieve a single thread by its ID.
629    ///
630    /// Returns `None` if no thread with the given ID exists.
631    pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
632        let conn = self.conn()?;
633        conn.query_row(
634            r#"
635            SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
636                   cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
637                   git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
638            FROM threads
639            WHERE id = ?1
640            "#,
641            params![id],
642            row_to_thread,
643        )
644        .optional()
645        .context("failed to read thread")
646    }
647
648    /// List threads ordered by most recently updated.
649    ///
650    /// Use [`ThreadListFilters`] to control whether archived threads are included
651    /// and the maximum number of results returned.
652    pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
653        let conn = self.conn()?;
654        let sql = if filters.include_archived {
655            "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
656        } else {
657            "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
658        };
659
660        let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
661        let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
662        let mut rows = stmt
663            .query(params![limit])
664            .context("failed to query threads")?;
665        let mut out = Vec::new();
666        while let Some(row) = rows.next().context("failed to iterate thread rows")? {
667            out.push(row_to_thread(row)?);
668        }
669        Ok(out)
670    }
671
672    /// Archive a thread, setting its status to [`ThreadStatus::Archived`] and
673    /// recording the current timestamp.
674    pub fn mark_archived(&self, id: &str) -> Result<()> {
675        let conn = self.conn()?;
676        conn.execute(
677            "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
678            params![
679                id,
680                Utc::now().timestamp(),
681                thread_status_to_str(&ThreadStatus::Archived)
682            ],
683        )
684        .context("failed to archive thread")?;
685        Ok(())
686    }
687
688    /// Unarchive a thread, removing the archived flag and clearing `archived_at`.
689    pub fn mark_unarchived(&self, id: &str) -> Result<()> {
690        let conn = self.conn()?;
691        conn.execute(
692            "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
693            params![id],
694        )
695        .context("failed to unarchive thread")?;
696        Ok(())
697    }
698
699    /// Permanently delete a thread and all of its associated data
700    /// (messages, checkpoints, dynamic tools) via cascading foreign keys.
701    pub fn delete_thread(&self, id: &str) -> Result<()> {
702        let conn = self.conn()?;
703        conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
704            .context("failed to delete thread")?;
705        Ok(())
706    }
707
708    /// Set the memory mode for a thread.
709    ///
710    /// Pass `None` to clear the memory mode.
711    pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
712        let conn = self.conn()?;
713        conn.execute(
714            "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
715            params![id, mode],
716        )
717        .context("failed to update thread memory mode")?;
718        Ok(())
719    }
720
721    /// Get the memory mode configured for a thread.
722    ///
723    /// Returns `None` if the thread does not exist or has no memory mode set.
724    pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
725        let conn = self.conn()?;
726        conn.query_row(
727            "SELECT memory_mode FROM threads WHERE id = ?1",
728            params![id],
729            |row| row.get::<_, Option<String>>(0),
730        )
731        .optional()
732        .context("failed to read thread memory mode")
733        .map(Option::flatten)
734    }
735
736    /// Insert or replace the persisted goal for a thread.
737    pub fn upsert_thread_goal(&self, goal: &ThreadGoalRecord) -> Result<()> {
738        let conn = self.conn()?;
739        let exists: Option<i64> = conn
740            .query_row(
741                "SELECT 1 FROM threads WHERE id = ?1",
742                params![goal.thread_id],
743                |row| row.get(0),
744            )
745            .optional()
746            .context("failed to verify thread before saving goal")?;
747        if exists.is_none() {
748            anyhow::bail!("thread {} not found", goal.thread_id);
749        }
750
751        conn.execute(
752            r#"
753            INSERT INTO thread_goals (
754                thread_id, goal_id, objective, status, token_budget, tokens_used,
755                time_used_seconds, created_at, updated_at
756            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
757            ON CONFLICT(thread_id) DO UPDATE SET
758                goal_id=excluded.goal_id,
759                objective=excluded.objective,
760                status=excluded.status,
761                token_budget=excluded.token_budget,
762                tokens_used=excluded.tokens_used,
763                time_used_seconds=excluded.time_used_seconds,
764                created_at=excluded.created_at,
765                updated_at=excluded.updated_at
766            "#,
767            params![
768                goal.thread_id,
769                goal.goal_id,
770                goal.objective,
771                thread_goal_status_to_str(&goal.status),
772                goal.token_budget,
773                goal.tokens_used,
774                goal.time_used_seconds,
775                goal.created_at,
776                goal.updated_at,
777            ],
778        )
779        .context("failed to upsert thread goal")?;
780        Ok(())
781    }
782
783    /// Retrieve the persisted goal for a thread.
784    pub fn get_thread_goal(&self, thread_id: &str) -> Result<Option<ThreadGoalRecord>> {
785        let conn = self.conn()?;
786        conn.query_row(
787            r#"
788            SELECT thread_id, goal_id, objective, status, token_budget, tokens_used,
789                   time_used_seconds, created_at, updated_at
790            FROM thread_goals
791            WHERE thread_id = ?1
792            "#,
793            params![thread_id],
794            row_to_thread_goal,
795        )
796        .optional()
797        .context("failed to read thread goal")
798    }
799
800    /// Delete the persisted goal for a thread.
801    pub fn delete_thread_goal(&self, thread_id: &str) -> Result<bool> {
802        let conn = self.conn()?;
803        let changed = conn
804            .execute(
805                "DELETE FROM thread_goals WHERE thread_id = ?1",
806                params![thread_id],
807            )
808            .context("failed to delete thread goal")?;
809        Ok(changed > 0)
810    }
811
812    /// List all leaf messages in a thread.
813    ///
814    /// A leaf message is one that has no other message referencing it as a parent.
815    /// In a branching conversation tree, there may be multiple leaf messages.
816    pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
817        let conn = self.conn()?;
818        let mut stmt = conn
819            .prepare(
820                r#"
821                SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
822                FROM messages m1
823                LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
824                WHERE m1.thread_id = ?1 AND m2.id IS NULL
825                "#,
826            )
827            .context("failed to prepare message listing query")?;
828        let mut rows = stmt
829            .query(params![thread_id])
830            .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
831        let mut out = Vec::new();
832        while let Some(row) = rows.next().context("failed to iterate message rows")? {
833            let item_json: Option<String> = row.get(4).context("failed to read item json")?;
834            let item = item_json
835                .as_deref()
836                .map(serde_json::from_str)
837                .transpose()
838                .with_context(|| {
839                    format!("failed to parse message item json in thread {thread_id}")
840                })?;
841            out.push(MessageRecord {
842                id: row.get(0).context("failed to read message id")?,
843                thread_id: row.get(1).context("failed to read message thread id")?,
844                role: row.get(2).context("failed to read message role")?,
845                content: row.get(3).context("failed to read message content")?,
846                item,
847                created_at: row.get(5).context("failed to read message timestamp")?,
848                parent_entry_id: row.get(6).context("failed to read parent entry id")?,
849            });
850        }
851        Ok(out)
852    }
853
854    /// Update the current leaf message pointer for a thread.
855    ///
856    /// This controls which branch of the conversation tree is considered active
857    /// when listing messages via [`list_messages`](Self::list_messages).
858    pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
859        let conn = self.conn()?;
860        conn.execute(
861            "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
862            params![current_leaf_id, thread_id],
863        )
864        .context("failed to update thread current leaf id")?;
865        Ok(())
866    }
867
868    /// Replace the dynamic tools for a thread.
869    ///
870    /// All existing dynamic tools for the thread are deleted and replaced with the
871    /// provided list. The operation is performed within a transaction.
872    pub fn persist_dynamic_tools(
873        &self,
874        thread_id: &str,
875        tools: &[DynamicToolRecord],
876    ) -> Result<()> {
877        let mut conn = self.conn()?;
878        let tx = conn
879            .transaction()
880            .context("failed to begin dynamic tools transaction")?;
881        tx.execute(
882            "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
883            params![thread_id],
884        )
885        .context("failed to clear dynamic tools")?;
886        for tool in tools {
887            tx.execute(
888                "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
889                params![
890                    thread_id,
891                    tool.position,
892                    tool.name,
893                    tool.description,
894                    tool.input_schema.to_string()
895                ],
896            )
897            .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
898        }
899        tx.commit().context("failed to commit dynamic tools")?;
900        Ok(())
901    }
902
903    /// Retrieve all dynamic tools registered for a thread, ordered by position.
904    pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
905        let conn = self.conn()?;
906        let mut stmt = conn
907            .prepare(
908                "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
909            )
910            .context("failed to prepare get dynamic tools query")?;
911        let mut rows = stmt
912            .query(params![thread_id])
913            .context("failed to query dynamic tools")?;
914        let mut out = Vec::new();
915        while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
916            let input_schema_raw: String =
917                row.get(3).context("failed to read tool input schema")?;
918            let input_schema: Value =
919                serde_json::from_str(&input_schema_raw).with_context(|| {
920                    format!("failed to parse input schema for dynamic tool in thread {thread_id}")
921                })?;
922            out.push(DynamicToolRecord {
923                position: row.get(0).context("failed to read tool position")?,
924                name: row.get(1).context("failed to read tool name")?,
925                description: row.get(2).context("failed to read tool description")?,
926                input_schema,
927            });
928        }
929        Ok(out)
930    }
931
932    /// Append a new message to a thread.
933    ///
934    /// The message is linked to the thread's current leaf as its parent, and the
935    /// thread's `current_leaf_id` is updated to the new message. Returns the ID
936    /// of the newly created message.
937    pub fn append_message(
938        &self,
939        thread_id: &str,
940        role: &str,
941        content: &str,
942        item: Option<Value>,
943    ) -> Result<i64> {
944        let mut conn = self.conn()?;
945        let created_at = Utc::now().timestamp();
946        let item_json = item
947            .as_ref()
948            .map(serde_json::to_string)
949            .transpose()
950            .context("failed to serialize message item payload")?;
951
952        let tx = conn
953            .transaction()
954            .context("failed to begin append message transaction")?;
955
956        let current_leaf_id: Option<i64> = tx
957            .query_row(
958                "SELECT current_leaf_id FROM threads WHERE id = ?1",
959                params![thread_id],
960                |row| row.get(0),
961            )
962            .with_context(|| {
963                format!("failed to query thread current leaf id for thread {thread_id}")
964            })?;
965
966        let next_leaf_id: i64 = tx.query_row(
967            r#"
968                INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
969                SELECT ?1, ?2, ?3, ?4, ?5, ?6
970                RETURNING id
971            "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
972        ).with_context(|| format!("failed to append message for thread {thread_id}"))?;
973
974        tx.execute(
975            r#"
976            UPDATE threads
977            SET current_leaf_id = ?1
978            WHERE id = ?2;
979            "#,
980            params![next_leaf_id, thread_id],
981        )
982        .with_context(|| {
983            format!("failed to update thread current leaf id for thread {thread_id}")
984        })?;
985
986        tx.commit()
987            .context("failed to commit append message transaction")?;
988
989        Ok(next_leaf_id)
990    }
991
992    /// List messages in the current conversation branch, walking backwards from
993    /// the thread's `current_leaf_id`.
994    ///
995    /// Messages are returned in chronological order (oldest first). The `limit`
996    /// parameter caps how many ancestor messages are traversed; it defaults to 500.
997    pub fn list_messages(
998        &self,
999        thread_id: &str,
1000        limit: Option<usize>,
1001    ) -> Result<Vec<MessageRecord>> {
1002        let conn = self.conn()?;
1003        let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
1004        let mut stmt = conn
1005            .prepare(
1006                r#"
1007                WITH RECURSIVE
1008                    leaf_id AS (
1009                        SELECT current_leaf_id FROM threads WHERE id = ?1
1010                    ),
1011                    ancestors AS (
1012                        SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
1013                        FROM messages
1014                        WHERE id = (SELECT current_leaf_id FROM leaf_id)
1015
1016                        UNION ALL
1017
1018                        SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
1019                        FROM messages m
1020                        JOIN ancestors a ON m.id = a.parent_entry_id
1021                        WHERE a.depth < ?2
1022                    )
1023                    SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
1024                    ORDER BY depth DESC
1025                "#
1026            )
1027            .context("failed to prepare message listing query")?;
1028        let mut rows = stmt
1029            .query(params![thread_id, limit - 1])
1030            .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
1031        let mut out = Vec::new();
1032        while let Some(row) = rows.next().context("failed to iterate message rows")? {
1033            let item_json: Option<String> = row.get(4).context("failed to read item json")?;
1034            let item = item_json
1035                .as_deref()
1036                .map(serde_json::from_str)
1037                .transpose()
1038                .with_context(|| {
1039                    format!("failed to parse message item json in thread {thread_id}")
1040                })?;
1041            out.push(MessageRecord {
1042                id: row.get(0).context("failed to read message id")?,
1043                thread_id: row.get(1).context("failed to read message thread id")?,
1044                role: row.get(2).context("failed to read message role")?,
1045                content: row.get(3).context("failed to read message content")?,
1046                item,
1047                created_at: row.get(5).context("failed to read message timestamp")?,
1048                parent_entry_id: row.get(6).context("failed to read parent entry id")?,
1049            });
1050        }
1051        Ok(out)
1052    }
1053
1054    /// Fork the conversation at a specific message.
1055    ///
1056    /// Creates a new message whose parent is `message_id` and updates the thread's
1057    /// `current_leaf_id` to the new message. Returns the ID of the new message.
1058    /// This enables branching conversations from any point in the history.
1059    pub fn fork_at_message(
1060        &self,
1061        message_id: &str,
1062        role: &str,
1063        content: &str,
1064        item: Option<Value>,
1065    ) -> Result<i64> {
1066        let mut conn = self.conn()?;
1067        let created_at = Utc::now().timestamp();
1068        let item_json = item
1069            .as_ref()
1070            .map(serde_json::to_string)
1071            .transpose()
1072            .context("failed to serialize message item payload")?;
1073
1074        let tx = conn
1075            .transaction()
1076            .context("failed to begin fork message transaction")?;
1077
1078        let thread_id: String = tx
1079            .query_row(
1080                "SELECT thread_id FROM messages WHERE id = ?1",
1081                params![message_id],
1082                |row| row.get(0),
1083            )
1084            .with_context(|| format!("failed to query thread id for message {message_id}"))?;
1085
1086        let next_leaf_id: i64 = tx.query_row(
1087            r#"
1088                INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
1089                SELECT ?1, ?2, ?3, ?4, ?5, ?6
1090                RETURNING id
1091            "#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
1092        ).with_context(|| format!("failed to fork at message for thread {:?}", thread_id))?;
1093
1094        tx.execute(
1095            r#"
1096            UPDATE threads
1097            SET current_leaf_id = ?1
1098            WHERE id = ?2;
1099            "#,
1100            params![next_leaf_id, thread_id],
1101        )
1102        .with_context(|| {
1103            format!(
1104                "failed to update thread current leaf id for thread {:?}",
1105                thread_id
1106            )
1107        })?;
1108
1109        tx.commit()
1110            .context("failed to commit fork message transaction")?;
1111
1112        Ok(next_leaf_id)
1113    }
1114
1115    /// Delete all messages belonging to a thread and reset its `current_leaf_id`.
1116    ///
1117    /// Returns the number of messages deleted.
1118    pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
1119        let mut conn = self.conn()?;
1120        let tx = conn
1121            .transaction()
1122            .context("failed to begin clear messages transaction")?;
1123
1124        tx.execute(
1125            r#"
1126            UPDATE threads
1127            SET current_leaf_id = NULL
1128            WHERE id = ?1;
1129            "#,
1130            params![thread_id],
1131        )
1132        .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
1133        let result = tx
1134            .execute(
1135                r#"
1136                DELETE FROM messages WHERE thread_id = ?1
1137                "#,
1138                params![thread_id],
1139            )
1140            .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
1141        tx.commit()
1142            .context("failed to commit clear messages transaction")?;
1143
1144        Ok(result)
1145    }
1146
1147    /// Save (or update) a named checkpoint for a thread.
1148    ///
1149    /// If a checkpoint with the same `thread_id` and `checkpoint_id` already exists,
1150    /// its state and timestamp are overwritten.
1151    pub fn save_checkpoint(
1152        &self,
1153        thread_id: &str,
1154        checkpoint_id: &str,
1155        state: &Value,
1156    ) -> Result<()> {
1157        let conn = self.conn()?;
1158        let state_json =
1159            serde_json::to_string(state).context("failed to encode checkpoint state")?;
1160        conn.execute(
1161            r#"
1162            INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
1163            VALUES (?1, ?2, ?3, ?4)
1164            ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
1165                state_json = excluded.state_json,
1166                created_at = excluded.created_at
1167            "#,
1168            params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
1169        )
1170        .with_context(|| {
1171            format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
1172        })?;
1173        Ok(())
1174    }
1175
1176    /// Load a checkpoint for a thread.
1177    ///
1178    /// If `checkpoint_id` is provided, loads that specific checkpoint. Otherwise,
1179    /// loads the most recently created checkpoint for the thread. Returns `None`
1180    /// if no matching checkpoint exists.
1181    pub fn load_checkpoint(
1182        &self,
1183        thread_id: &str,
1184        checkpoint_id: Option<&str>,
1185    ) -> Result<Option<CheckpointRecord>> {
1186        let conn = self.conn()?;
1187        if let Some(checkpoint_id) = checkpoint_id {
1188            let row = conn
1189                .query_row(
1190                    "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1191                    params![thread_id, checkpoint_id],
1192                    |row| {
1193                        let state_json: String = row.get(2)?;
1194                        let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1195                        Ok(CheckpointRecord {
1196                            thread_id: row.get(0)?,
1197                            checkpoint_id: row.get(1)?,
1198                            state,
1199                            created_at: row.get(3)?,
1200                        })
1201                    },
1202                )
1203                .optional()
1204                .with_context(|| {
1205                    format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
1206                })?;
1207            return Ok(row);
1208        }
1209
1210        conn.query_row(
1211            "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
1212            params![thread_id],
1213            |row| {
1214                let state_json: String = row.get(2)?;
1215                let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1216                Ok(CheckpointRecord {
1217                    thread_id: row.get(0)?,
1218                    checkpoint_id: row.get(1)?,
1219                    state,
1220                    created_at: row.get(3)?,
1221                })
1222            },
1223        )
1224        .optional()
1225        .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
1226    }
1227
1228    /// List checkpoints for a thread, ordered by creation time (newest first).
1229    ///
1230    /// The `limit` parameter caps the number of results and defaults to 100.
1231    pub fn list_checkpoints(
1232        &self,
1233        thread_id: &str,
1234        limit: Option<usize>,
1235    ) -> Result<Vec<CheckpointRecord>> {
1236        let conn = self.conn()?;
1237        let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1238        let mut stmt = conn
1239            .prepare(
1240                "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
1241            )
1242            .context("failed to prepare checkpoint list query")?;
1243        let mut rows = stmt
1244            .query(params![thread_id, limit])
1245            .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
1246
1247        let mut out = Vec::new();
1248        while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
1249            let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
1250            let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1251            out.push(CheckpointRecord {
1252                thread_id: row.get(0).context("failed to read checkpoint thread id")?,
1253                checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
1254                state,
1255                created_at: row.get(3).context("failed to read checkpoint timestamp")?,
1256            });
1257        }
1258        Ok(out)
1259    }
1260
1261    /// Delete a specific checkpoint from a thread.
1262    pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
1263        let conn = self.conn()?;
1264        conn.execute(
1265            "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1266            params![thread_id, checkpoint_id],
1267        )
1268        .with_context(|| {
1269            format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
1270        })?;
1271        Ok(())
1272    }
1273
1274    /// Insert or update a background job record.
1275    pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
1276        let conn = self.conn()?;
1277        conn.execute(
1278            r#"
1279            INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
1280            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1281            ON CONFLICT(id) DO UPDATE SET
1282                name = excluded.name,
1283                status = excluded.status,
1284                progress = excluded.progress,
1285                detail = excluded.detail,
1286                created_at = excluded.created_at,
1287                updated_at = excluded.updated_at
1288            "#,
1289            params![
1290                job.id,
1291                job.name,
1292                job_state_status_to_str(&job.status),
1293                job.progress.map(i64::from),
1294                job.detail,
1295                job.created_at,
1296                job.updated_at
1297            ],
1298        )
1299        .with_context(|| format!("failed to upsert job {}", job.id))?;
1300        Ok(())
1301    }
1302
1303    /// Retrieve a single job by its ID.
1304    ///
1305    /// Returns `None` if no job with the given ID exists.
1306    pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
1307        let conn = self.conn()?;
1308        conn.query_row(
1309            "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
1310            params![id],
1311            |row| {
1312                let status_raw: String = row.get(2)?;
1313                let progress: Option<i64> = row.get(3)?;
1314                Ok(JobStateRecord {
1315                    id: row.get(0)?,
1316                    name: row.get(1)?,
1317                    status: job_state_status_from_str(&status_raw),
1318                    progress: progress.and_then(|v| u8::try_from(v).ok()),
1319                    detail: row.get(4)?,
1320                    created_at: row.get(5)?,
1321                    updated_at: row.get(6)?,
1322                })
1323            },
1324        )
1325        .optional()
1326        .with_context(|| format!("failed to read job {id}"))
1327    }
1328
1329    /// List jobs ordered by most recently updated.
1330    ///
1331    /// The `limit` parameter caps the number of results and defaults to 100.
1332    pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
1333        let conn = self.conn()?;
1334        let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1335        let mut stmt = conn
1336            .prepare(
1337                "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
1338            )
1339            .context("failed to prepare job list query")?;
1340        let mut rows = stmt
1341            .query(params![limit])
1342            .context("failed to query persisted jobs")?;
1343        let mut out = Vec::new();
1344        while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
1345            let status_raw: String = row.get(2).context("failed to read job status")?;
1346            let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
1347            out.push(JobStateRecord {
1348                id: row.get(0).context("failed to read job id")?,
1349                name: row.get(1).context("failed to read job name")?,
1350                status: job_state_status_from_str(&status_raw),
1351                progress: progress.and_then(|v| u8::try_from(v).ok()),
1352                detail: row.get(4).context("failed to read job detail")?,
1353                created_at: row.get(5).context("failed to read job created_at")?,
1354                updated_at: row.get(6).context("failed to read job updated_at")?,
1355            });
1356        }
1357        Ok(out)
1358    }
1359
1360    /// Permanently delete a job record.
1361    pub fn delete_job(&self, id: &str) -> Result<()> {
1362        let conn = self.conn()?;
1363        conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
1364            .with_context(|| format!("failed to delete job {id}"))?;
1365        Ok(())
1366    }
1367
1368    /// Look up the rollout file path for a thread by its ID.
1369    pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
1370        let conn = self.conn()?;
1371        conn.query_row(
1372            "SELECT rollout_path FROM threads WHERE id = ?1",
1373            params![id],
1374            |row| row.get::<_, Option<String>>(0),
1375        )
1376        .optional()
1377        .context("failed to lookup rollout path")
1378        .map(|opt| opt.flatten().map(PathBuf::from))
1379    }
1380
1381    /// Append an entry to the JSONL session index file.
1382    ///
1383    /// The session index is an append-only log that maps thread IDs to their names,
1384    /// update timestamps, and rollout paths. It is used for fast name-based lookups
1385    /// without opening the SQLite database.
1386    pub fn append_thread_name(
1387        &self,
1388        thread_id: &str,
1389        thread_name: Option<String>,
1390        updated_at: i64,
1391        rollout_path: Option<PathBuf>,
1392    ) -> Result<()> {
1393        if let Some(parent) = self.session_index_path.parent() {
1394            fs::create_dir_all(parent).with_context(|| {
1395                format!(
1396                    "failed to create session index directory {}",
1397                    parent.display()
1398                )
1399            })?;
1400        }
1401        let entry = SessionIndexEntry {
1402            thread_id: thread_id.to_string(),
1403            thread_name,
1404            updated_at,
1405            rollout_path,
1406        };
1407        let encoded =
1408            serde_json::to_string(&entry).context("failed to serialize session index entry")?;
1409        let mut file = OpenOptions::new()
1410            .create(true)
1411            .append(true)
1412            .open(&self.session_index_path)
1413            .with_context(|| {
1414                format!(
1415                    "failed to open session index {}",
1416                    self.session_index_path.display()
1417                )
1418            })?;
1419        writeln!(file, "{encoded}").context("failed to append session index entry")?;
1420        Ok(())
1421    }
1422
1423    /// Find the display name for a thread by its ID, using the session index.
1424    ///
1425    /// Returns `None` if the thread is not in the index or has no name.
1426    pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
1427        let map = self.session_index_map()?;
1428        Ok(map
1429            .get(thread_id)
1430            .and_then(|entry| entry.thread_name.clone()))
1431    }
1432
1433    /// Look up display names for multiple thread IDs at once.
1434    ///
1435    /// Returns a map from thread ID to its name (which may be `None`).
1436    pub fn find_thread_names_by_ids(
1437        &self,
1438        ids: &[String],
1439    ) -> Result<HashMap<String, Option<String>>> {
1440        let map = self.session_index_map()?;
1441        let mut out = HashMap::new();
1442        for id in ids {
1443            let name = map.get(id).and_then(|entry| entry.thread_name.clone());
1444            out.insert(id.clone(), name);
1445        }
1446        Ok(out)
1447    }
1448
1449    /// Find the rollout path for a thread by its display name (case-insensitive).
1450    ///
1451    /// If multiple threads share the same name, the most recently updated one is returned.
1452    /// Returns `None` if no matching thread is found.
1453    pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
1454        let map = self.session_index_map()?;
1455        let matched = map
1456            .values()
1457            .filter(|entry| {
1458                entry
1459                    .thread_name
1460                    .as_deref()
1461                    .is_some_and(|n| n.eq_ignore_ascii_case(name))
1462            })
1463            .max_by_key(|entry| entry.updated_at);
1464        Ok(matched.and_then(|entry| entry.rollout_path.clone()))
1465    }
1466
1467    fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
1468        if !self.session_index_path.exists() {
1469            return Ok(HashMap::new());
1470        }
1471        let file = OpenOptions::new()
1472            .read(true)
1473            .open(&self.session_index_path)
1474            .with_context(|| {
1475                format!(
1476                    "failed to read session index {}",
1477                    self.session_index_path.display()
1478                )
1479            })?;
1480        let reader = BufReader::new(file);
1481        let mut latest = HashMap::<String, SessionIndexEntry>::new();
1482        for line in reader.lines() {
1483            let line = line.context("failed to read session index line")?;
1484            if line.trim().is_empty() {
1485                continue;
1486            }
1487            let parsed: SessionIndexEntry =
1488                serde_json::from_str(&line).context("failed to parse session index entry")?;
1489            latest.insert(parsed.thread_id.clone(), parsed);
1490        }
1491        Ok(latest)
1492    }
1493}
1494
1495fn default_state_db_path() -> PathBuf {
1496    let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
1497    // Prefer the CodeWhale directory, falling back to legacy DeepSeek path
1498    // so existing installs don't lose their session history.
1499    let primary = home.join(".codewhale").join("state.db");
1500    if primary.exists() || !home.join(".deepseek").join("state.db").exists() {
1501        primary
1502    } else {
1503        home.join(".deepseek").join("state.db")
1504    }
1505}
1506
/// Encode a boolean as an integer: `true` becomes `1`, `false` becomes `0`.
fn bool_to_i64(value: bool) -> i64 {
    i64::from(value)
}
1510
/// Decode an integer flag: `0` is `false`, every other value is `true`.
fn i64_to_bool(value: i64) -> bool {
    match value {
        0 => false,
        _ => true,
    }
}
1514
1515fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
1516    match status {
1517        ThreadStatus::Running => "running",
1518        ThreadStatus::Idle => "idle",
1519        ThreadStatus::Completed => "completed",
1520        ThreadStatus::Failed => "failed",
1521        ThreadStatus::Paused => "paused",
1522        ThreadStatus::Archived => "archived",
1523    }
1524}
1525
1526fn thread_status_from_str(value: &str) -> ThreadStatus {
1527    match value {
1528        "running" => ThreadStatus::Running,
1529        "idle" => ThreadStatus::Idle,
1530        "completed" => ThreadStatus::Completed,
1531        "failed" => ThreadStatus::Failed,
1532        "paused" => ThreadStatus::Paused,
1533        "archived" => ThreadStatus::Archived,
1534        _ => ThreadStatus::Idle,
1535    }
1536}
1537
1538fn session_source_to_str(source: &SessionSource) -> &'static str {
1539    match source {
1540        SessionSource::Interactive => "interactive",
1541        SessionSource::Resume => "resume",
1542        SessionSource::Fork => "fork",
1543        SessionSource::Api => "api",
1544        SessionSource::Unknown => "unknown",
1545    }
1546}
1547
1548fn session_source_from_str(value: &str) -> SessionSource {
1549    match value {
1550        "interactive" => SessionSource::Interactive,
1551        "resume" => SessionSource::Resume,
1552        "fork" => SessionSource::Fork,
1553        "api" => SessionSource::Api,
1554        _ => SessionSource::Unknown,
1555    }
1556}
1557
/// Render an optional path as an optional `String` using [`Path::display`].
///
/// `None` stays `None`. Because `display()` is used, path components that are not
/// valid Unicode are rendered lossily.
fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
    let present = path?;
    Some(present.display().to_string())
}
1561
1562fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
1563    match status {
1564        JobStateStatus::Queued => "queued",
1565        JobStateStatus::Running => "running",
1566        JobStateStatus::Completed => "completed",
1567        JobStateStatus::Failed => "failed",
1568        JobStateStatus::Cancelled => "cancelled",
1569    }
1570}
1571
1572fn job_state_status_from_str(value: &str) -> JobStateStatus {
1573    match value {
1574        "queued" => JobStateStatus::Queued,
1575        "running" => JobStateStatus::Running,
1576        "completed" => JobStateStatus::Completed,
1577        "failed" => JobStateStatus::Failed,
1578        "cancelled" => JobStateStatus::Cancelled,
1579        _ => JobStateStatus::Queued,
1580    }
1581}
1582
1583fn thread_goal_status_to_str(status: &ThreadGoalStatus) -> &'static str {
1584    match status {
1585        ThreadGoalStatus::Active => "active",
1586        ThreadGoalStatus::Paused => "paused",
1587        ThreadGoalStatus::Blocked => "blocked",
1588        ThreadGoalStatus::UsageLimited => "usage_limited",
1589        ThreadGoalStatus::BudgetLimited => "budget_limited",
1590        ThreadGoalStatus::Complete => "complete",
1591    }
1592}
1593
1594fn thread_goal_status_from_str(value: &str) -> ThreadGoalStatus {
1595    match value {
1596        "active" => ThreadGoalStatus::Active,
1597        "paused" => ThreadGoalStatus::Paused,
1598        "blocked" => ThreadGoalStatus::Blocked,
1599        "usage_limited" => ThreadGoalStatus::UsageLimited,
1600        "budget_limited" => ThreadGoalStatus::BudgetLimited,
1601        "complete" => ThreadGoalStatus::Complete,
1602        _ => ThreadGoalStatus::Active,
1603    }
1604}
1605
1606fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
1607    let status_raw: String = row.get(7)?;
1608    let source_raw: String = row.get(11)?;
1609    let rollout_path: Option<String> = row.get(1)?;
1610    let path: Option<String> = row.get(8)?;
1611    Ok(ThreadMetadata {
1612        id: row.get(0)?,
1613        rollout_path: rollout_path.map(PathBuf::from),
1614        preview: row.get(2)?,
1615        ephemeral: i64_to_bool(row.get(3)?),
1616        model_provider: row.get(4)?,
1617        created_at: row.get(5)?,
1618        updated_at: row.get(6)?,
1619        status: thread_status_from_str(&status_raw),
1620        path: path.map(PathBuf::from),
1621        cwd: PathBuf::from(row.get::<_, String>(9)?),
1622        cli_version: row.get(10)?,
1623        source: session_source_from_str(&source_raw),
1624        name: row.get(12)?,
1625        sandbox_policy: row.get(13)?,
1626        approval_mode: row.get(14)?,
1627        archived: i64_to_bool(row.get(15)?),
1628        archived_at: row.get(16)?,
1629        git_sha: row.get(17)?,
1630        git_branch: row.get(18)?,
1631        git_origin_url: row.get(19)?,
1632        memory_mode: row.get(20)?,
1633        current_leaf_id: row.get(21)?,
1634    })
1635}
1636
1637fn row_to_thread_goal(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadGoalRecord> {
1638    let status_raw: String = row.get(3)?;
1639    Ok(ThreadGoalRecord {
1640        thread_id: row.get(0)?,
1641        goal_id: row.get(1)?,
1642        objective: row.get(2)?,
1643        status: thread_goal_status_from_str(&status_raw),
1644        token_budget: row.get(4)?,
1645        tokens_used: row.get(5)?,
1646        time_used_seconds: row.get(6)?,
1647        created_at: row.get(7)?,
1648        updated_at: row.get(8)?,
1649    })
1650}
1651
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Scratch directory that is removed again when the guard is dropped, so test
    /// runs do not accumulate `codewhale-state-*` directories in the temp dir.
    struct TempDirGuard(PathBuf);

    impl Drop for TempDirGuard {
        fn drop(&mut self) {
            // Best-effort cleanup: a failed removal must never fail a test or cause
            // a second panic while unwinding from a failed assertion.
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    /// Open a [`StateStore`] inside a fresh, uniquely named temp directory.
    ///
    /// The directory name combines `name`, the process ID and a nanosecond
    /// timestamp. The guard is returned first so that callers binding
    /// `(_dir, store)` drop the store before the directory is deleted.
    fn temp_state_store(name: &str) -> (TempDirGuard, StateStore) {
        let suffix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time")
            .as_nanos();
        let dir = std::env::temp_dir().join(format!(
            "codewhale-state-{name}-{}-{suffix}",
            std::process::id()
        ));
        fs::create_dir_all(&dir).expect("create temp state dir");
        // Create the guard before opening the store so the directory is cleaned
        // up even if opening panics.
        let guard = TempDirGuard(dir);
        let store = StateStore::open(Some(guard.0.join("state.db"))).expect("open state store");
        (guard, store)
    }

    /// Minimal running, interactive thread with every optional field unset.
    fn test_thread(id: &str) -> ThreadMetadata {
        ThreadMetadata {
            id: id.to_string(),
            rollout_path: None,
            preview: "test thread".to_string(),
            ephemeral: false,
            model_provider: "deepseek".to_string(),
            created_at: 10,
            updated_at: 10,
            status: ThreadStatus::Running,
            path: None,
            cwd: PathBuf::from("/tmp/codewhale"),
            cli_version: "0.0.0-test".to_string(),
            source: SessionSource::Interactive,
            name: None,
            sandbox_policy: None,
            approval_mode: None,
            archived: false,
            archived_at: None,
            git_sha: None,
            git_branch: None,
            git_origin_url: None,
            memory_mode: None,
            current_leaf_id: None,
        }
    }

    /// Active goal `goal-1` for `thread_id` with fixed budget and usage numbers.
    fn test_goal(thread_id: &str, objective: &str) -> ThreadGoalRecord {
        ThreadGoalRecord {
            thread_id: thread_id.to_string(),
            goal_id: "goal-1".to_string(),
            objective: objective.to_string(),
            status: ThreadGoalStatus::Active,
            token_budget: Some(123),
            tokens_used: 7,
            time_used_seconds: 11,
            created_at: 100,
            updated_at: 101,
        }
    }

    #[test]
    fn thread_goal_crud_round_trips_and_replaces() {
        let (_dir, store) = temp_state_store("thread-goal-crud");
        store
            .upsert_thread(&test_thread("thread-1"))
            .expect("upsert thread");

        // Insert, then read back unchanged.
        let goal = test_goal("thread-1", "Ship v0.8.59");
        store.upsert_thread_goal(&goal).expect("upsert goal");
        assert_eq!(
            store
                .get_thread_goal("thread-1")
                .expect("read goal")
                .as_ref(),
            Some(&goal)
        );

        // A second upsert for the same thread replaces the stored goal.
        let mut replacement = test_goal("thread-1", "Ship v0.8.59 safely");
        replacement.goal_id = "goal-2".to_string();
        replacement.status = ThreadGoalStatus::BudgetLimited;
        replacement.token_budget = None;
        replacement.updated_at = 202;
        store
            .upsert_thread_goal(&replacement)
            .expect("replace goal");
        assert_eq!(
            store.get_thread_goal("thread-1").expect("read replacement"),
            Some(replacement)
        );

        // Deleting reports whether a goal was actually removed.
        assert!(store.delete_thread_goal("thread-1").expect("delete goal"));
        assert!(
            store
                .get_thread_goal("thread-1")
                .expect("read empty")
                .is_none()
        );
        assert!(!store.delete_thread_goal("thread-1").expect("delete empty"));
    }

    #[test]
    fn thread_goal_requires_existing_thread() {
        let (_dir, store) = temp_state_store("thread-goal-missing-thread");
        let err = store
            .upsert_thread_goal(&test_goal("missing-thread", "nope"))
            .expect_err("goal without a thread should fail");
        assert!(err.to_string().contains("thread missing-thread not found"));
    }
}