Skip to main content

codewhale_state/
lib.rs

1//! Persistent state management for conversation threads, messages, and jobs.
2//!
3//! The [`StateStore`] is the primary entry point, backed by a SQLite database and an
4//! append-only JSONL session index file. It provides CRUD operations for:
5//!
6//! - **Threads** — conversation metadata, archival, and session indexing.
7//! - **Messages** — append-only message storage with tree-structured branching.
8//! - **Checkpoints** — named state snapshots for restoring conversation progress.
9//! - **Jobs** — background task tracking with status and progress.
10//! - **Dynamic tools** — per-thread tool registrations.
11
12use std::collections::HashMap;
13use std::fs::{self, OpenOptions};
14use std::io::{BufRead, BufReader, Write};
15use std::path::{Path, PathBuf};
16
17use anyhow::{Context, Result};
18use chrono::Utc;
19use rusqlite::{Connection, OptionalExtension, params};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
23/// Lifecycle status of a conversation thread.
24///
25/// Serialized as lowercase snake_case strings (e.g. `"running"`, `"archived"`).
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
27#[serde(rename_all = "snake_case")]
28pub enum ThreadStatus {
29    /// Thread is actively being worked on.
30    Running,
31    /// Thread exists but has no active work in progress.
32    Idle,
33    /// Thread has finished its task successfully.
34    Completed,
35    /// Thread encountered an unrecoverable error.
36    Failed,
37    /// Thread has been temporarily paused by the user.
38    Paused,
39    /// Thread has been archived and is hidden from default listings.
40    Archived,
41}
42
43/// Indicates how a session was initiated.
44///
45/// Serialized as lowercase snake_case strings.
46#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
47#[serde(rename_all = "snake_case")]
48pub enum SessionSource {
49    /// Started by a user interacting with the CLI.
50    Interactive,
51    /// Resumed from a previously persisted session.
52    Resume,
53    /// Created by forking an existing conversation at a specific message.
54    Fork,
55    /// Initiated programmatically via the API.
56    Api,
57    /// Source is unknown or unspecified.
58    Unknown,
59}
60
61/// Metadata for a persisted conversation thread.
62///
63/// Each thread represents a single conversation session and stores its
64/// configuration, git context, and current status.
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct ThreadMetadata {
67    /// Unique identifier for this thread.
68    pub id: String,
69    /// Optional filesystem path to the rollout (JSONL transcript) file.
70    pub rollout_path: Option<PathBuf>,
71    /// Short preview or summary of the thread content.
72    pub preview: String,
73    /// Whether this thread is ephemeral (not persisted long-term).
74    pub ephemeral: bool,
75    /// Identifier of the model provider used for this thread (e.g. `"openai"`).
76    pub model_provider: String,
77    /// Unix timestamp (seconds) when the thread was created.
78    pub created_at: i64,
79    /// Unix timestamp (seconds) of the most recent update to the thread.
80    pub updated_at: i64,
81    /// Current lifecycle status of the thread.
82    pub status: ThreadStatus,
83    /// Optional filesystem path associated with the thread working context.
84    pub path: Option<PathBuf>,
85    /// Working directory that was active when the thread was created.
86    pub cwd: PathBuf,
87    /// Version of the CLI that created this thread.
88    pub cli_version: String,
89    /// How this session was initiated.
90    pub source: SessionSource,
91    /// User-assigned display name for the thread.
92    pub name: Option<String>,
93    /// Serialized sandbox policy applied to this thread, if any.
94    pub sandbox_policy: Option<String>,
95    /// Approval mode configured for tool calls in this thread.
96    pub approval_mode: Option<String>,
97    /// Whether the thread has been archived.
98    pub archived: bool,
99    /// Unix timestamp (seconds) when the thread was archived, or `None` if not archived.
100    pub archived_at: Option<i64>,
101    /// Git commit SHA of the working tree when the thread was created.
102    pub git_sha: Option<String>,
103    /// Git branch checked out when the thread was created.
104    pub git_branch: Option<String>,
105    /// URL of the git remote origin, if available.
106    pub git_origin_url: Option<String>,
107    /// Memory mode configured for this thread (e.g. `"local"`, `"remote"`).
108    pub memory_mode: Option<String>,
109    /// ID of the current leaf message in the conversation tree.
110    pub current_leaf_id: Option<i64>,
111}
112
113/// A dynamically registered tool associated with a thread.
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct DynamicToolRecord {
116    /// Ordinal position of this tool in the thread tool list.
117    pub position: i64,
118    /// Unique name identifying the tool.
119    pub name: String,
120    /// Human-readable description of what the tool does.
121    pub description: Option<String>,
122    /// JSON Schema describing the tool input parameters.
123    pub input_schema: Value,
124}
125
126/// A single message entry in a conversation thread.
127///
128/// Messages form a tree structure via [`parent_entry_id`](Self::parent_entry_id),
129/// enabling conversation branching and forking.
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct MessageRecord {
132    /// Auto-incremented unique identifier for this message.
133    pub id: i64,
134    /// ID of the thread this message belongs to.
135    pub thread_id: String,
136    /// Role of the message sender (e.g. `"user"`, `"assistant"`, `"system"`).
137    pub role: String,
138    /// Text content of the message.
139    pub content: String,
140    /// Optional structured item payload (tool calls, tool results, etc.).
141    pub item: Option<Value>,
142    /// Unix timestamp (seconds) when the message was created.
143    pub created_at: i64,
144    /// ID of the parent message, forming a tree structure. `None` for root messages.
145    pub parent_entry_id: Option<i64>,
146}
147
148/// A named checkpoint capturing the state of a thread at a point in time.
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct CheckpointRecord {
151    /// ID of the thread this checkpoint belongs to.
152    pub thread_id: String,
153    /// Unique identifier for this checkpoint within its thread.
154    pub checkpoint_id: String,
155    /// Serialized state snapshot stored as a JSON value.
156    pub state: Value,
157    /// Unix timestamp (seconds) when the checkpoint was created or last updated.
158    pub created_at: i64,
159}
160
161/// Status of a background job.
162///
163/// Serialized as lowercase snake_case strings.
164#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
165#[serde(rename_all = "snake_case")]
166pub enum JobStateStatus {
167    /// Job is waiting to be executed.
168    Queued,
169    /// Job is currently executing.
170    Running,
171    /// Job has finished successfully.
172    Completed,
173    /// Job has failed with an error.
174    Failed,
175    /// Job was cancelled before completion.
176    Cancelled,
177}
178
179/// Persisted state of a background job.
180#[derive(Debug, Clone, Serialize, Deserialize)]
181pub struct JobStateRecord {
182    /// Unique identifier for the job.
183    pub id: String,
184    /// Human-readable name describing the job.
185    pub name: String,
186    /// Current lifecycle status of the job.
187    pub status: JobStateStatus,
188    /// Completion progress as a percentage (0--100), if available.
189    pub progress: Option<u8>,
190    /// Optional detail message providing additional status information.
191    pub detail: Option<String>,
192    /// Unix timestamp (seconds) when the job was created.
193    pub created_at: i64,
194    /// Unix timestamp (seconds) of the most recent status update.
195    pub updated_at: i64,
196}
197
198/// Filters for listing conversation threads.
199#[derive(Debug, Clone)]
200pub struct ThreadListFilters {
201    /// Whether to include archived threads in the results.
202    pub include_archived: bool,
203    /// Maximum number of threads to return. Defaults to 50.
204    pub limit: Option<usize>,
205}
206
207impl Default for ThreadListFilters {
208    fn default() -> Self {
209        Self {
210            include_archived: false,
211            limit: Some(50),
212        }
213    }
214}
215
216#[derive(Debug, Clone, Serialize, Deserialize)]
217struct SessionIndexEntry {
218    thread_id: String,
219    thread_name: Option<String>,
220    updated_at: i64,
221    rollout_path: Option<PathBuf>,
222}
223
224/// Persistent storage for conversation threads, messages, checkpoints, and jobs.
225///
226/// Backed by a SQLite database and an append-only JSONL session index file.
227/// The database schema is automatically initialized and migrated on [`open`](Self::open).
228#[derive(Debug, Clone)]
229pub struct StateStore {
230    db_path: PathBuf,
231    session_index_path: PathBuf,
232}
233
234impl StateStore {
235    /// Open (or create) a state store at the given database path.
236    ///
237    /// If `path` is `None`, the default location (`~/.codewhale/state.db`, with
238    /// `~/.deepseek/state.db` as a legacy fallback) is used.
239    /// The database schema is created automatically if it does not exist.
240    pub fn open(path: Option<PathBuf>) -> Result<Self> {
241        let db_path = path.unwrap_or_else(default_state_db_path);
242        let session_index_path = db_path
243            .parent()
244            .unwrap_or_else(|| Path::new("."))
245            .join("session_index.jsonl");
246        if let Some(parent) = db_path.parent() {
247            fs::create_dir_all(parent).with_context(|| {
248                format!("failed to create state directory {}", parent.display())
249            })?;
250        }
251        let store = Self {
252            db_path,
253            session_index_path,
254        };
255        store.init_schema()?;
256        Ok(store)
257    }
258
259    /// Returns the filesystem path of the underlying SQLite database.
260    pub fn db_path(&self) -> &Path {
261        &self.db_path
262    }
263
264    fn conn(&self) -> Result<Connection> {
265        Connection::open(&self.db_path)
266            .with_context(|| format!("failed to open state db {}", self.db_path.display()))
267    }
268
269    fn init_schema(&self) -> Result<()> {
270        let conn = self.conn()?;
271        let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
272        if user_version == 0 {
273            conn.execute_batch(
274                r#"
275                BEGIN;
276                CREATE TABLE IF NOT EXISTS threads (
277                    id TEXT PRIMARY KEY,
278                    rollout_path TEXT,
279                    preview TEXT NOT NULL,
280                    ephemeral INTEGER NOT NULL,
281                    model_provider TEXT NOT NULL,
282                    created_at INTEGER NOT NULL,
283                    updated_at INTEGER NOT NULL,
284                    status TEXT NOT NULL,
285                    path TEXT,
286                    cwd TEXT NOT NULL,
287                    cli_version TEXT NOT NULL,
288                    source TEXT NOT NULL,
289                    title TEXT,
290                    sandbox_policy TEXT,
291                    approval_mode TEXT,
292                    archived INTEGER NOT NULL DEFAULT 0,
293                    archived_at INTEGER,
294                    git_sha TEXT,
295                    git_branch TEXT,
296                    git_origin_url TEXT,
297                    memory_mode TEXT
298                );
299                CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
300                CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
301                CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);
302
303                CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
304                    thread_id TEXT NOT NULL,
305                    position INTEGER NOT NULL,
306                    name TEXT NOT NULL,
307                    description TEXT,
308                    input_schema TEXT NOT NULL,
309                    PRIMARY KEY (thread_id, position),
310                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
311                );
312
313                CREATE TABLE IF NOT EXISTS messages (
314                    id INTEGER PRIMARY KEY AUTOINCREMENT,
315                    thread_id TEXT NOT NULL,
316                    role TEXT NOT NULL,
317                    content TEXT NOT NULL,
318                    item_json TEXT,
319                    created_at INTEGER NOT NULL,
320                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
321                );
322                CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);
323
324                CREATE TABLE IF NOT EXISTS checkpoints (
325                    thread_id TEXT NOT NULL,
326                    checkpoint_id TEXT NOT NULL,
327                    state_json TEXT NOT NULL,
328                    created_at INTEGER NOT NULL,
329                    PRIMARY KEY(thread_id, checkpoint_id),
330                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
331                );
332                CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);
333
334                CREATE TABLE IF NOT EXISTS jobs (
335                    id TEXT PRIMARY KEY,
336                    name TEXT NOT NULL,
337                    status TEXT NOT NULL,
338                    progress INTEGER,
339                    detail TEXT,
340                    created_at INTEGER NOT NULL,
341                    updated_at INTEGER NOT NULL
342                );
343                CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
344
345                -- Add parent_entry_id column, and set to last message before current message
346                ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
347                UPDATE messages
348                    SET parent_entry_id = (
349                        SELECT m2.id
350                        FROM messages m2
351                        WHERE m2.thread_id = messages.thread_id
352                            AND (
353                                m2.created_at < messages.created_at
354                                OR (
355                                    m2.created_at = messages.created_at
356                                    AND m2.id < messages.id
357                                )
358                            )
359                        ORDER BY m2.created_at DESC, m2.id DESC
360                        LIMIT 1
361                    );
362                CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);
363
364                -- Add current_leaf_id column, and set to last message in thread
365                ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
366                UPDATE threads
367                    SET current_leaf_id = (
368                        SELECT m.id
369                        FROM messages m
370                        WHERE m.thread_id = threads.id
371                        ORDER BY m.id DESC
372                        LIMIT 1
373                    );
374
375                PRAGMA user_version = 1;
376                COMMIT;
377                "#,
378            )
379            .context("failed to initialize thread schema")?;
380            user_version = 1;
381        }
382        if user_version < 2 {
383            conn.execute_batch(
384                r#"
385                BEGIN;
386                CREATE TABLE IF NOT EXISTS workflow_runs (
387                    id TEXT PRIMARY KEY,
388                    workflow_id TEXT NOT NULL,
389                    goal TEXT NOT NULL,
390                    status TEXT NOT NULL,
391                    input_hash TEXT,
392                    started_at INTEGER NOT NULL,
393                    completed_at INTEGER,
394                    metadata_json TEXT NOT NULL DEFAULT '{}'
395                );
396                CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at
397                    ON workflow_runs(status, started_at DESC);
398                CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at
399                    ON workflow_runs(workflow_id, started_at DESC);
400
401                CREATE TABLE IF NOT EXISTS branch_runs (
402                    id TEXT PRIMARY KEY,
403                    workflow_run_id TEXT NOT NULL,
404                    branch_id TEXT NOT NULL,
405                    node_id TEXT NOT NULL,
406                    status TEXT NOT NULL,
407                    started_at INTEGER NOT NULL,
408                    completed_at INTEGER,
409                    result_json TEXT NOT NULL DEFAULT '{}',
410                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
411                );
412                CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id
413                    ON branch_runs(workflow_run_id);
414                CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id
415                    ON branch_runs(branch_id);
416
417                CREATE TABLE IF NOT EXISTS leaf_runs (
418                    id TEXT PRIMARY KEY,
419                    workflow_run_id TEXT NOT NULL,
420                    branch_run_id TEXT,
421                    leaf_id TEXT NOT NULL,
422                    task_id TEXT NOT NULL,
423                    input_hash TEXT,
424                    status TEXT NOT NULL,
425                    output_json TEXT NOT NULL DEFAULT '{}',
426                    artifacts_json TEXT NOT NULL DEFAULT '[]',
427                    started_at INTEGER NOT NULL,
428                    completed_at INTEGER,
429                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
430                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
431                );
432                CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id
433                    ON leaf_runs(workflow_run_id);
434                CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup
435                    ON leaf_runs(workflow_run_id, leaf_id, input_hash);
436
437                CREATE TABLE IF NOT EXISTS control_node_runs (
438                    id TEXT PRIMARY KEY,
439                    workflow_run_id TEXT NOT NULL,
440                    node_id TEXT NOT NULL,
441                    kind TEXT NOT NULL,
442                    status TEXT NOT NULL,
443                    selected_children_json TEXT NOT NULL DEFAULT '[]',
444                    result_json TEXT NOT NULL DEFAULT '{}',
445                    started_at INTEGER NOT NULL,
446                    completed_at INTEGER,
447                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
448                );
449                CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id
450                    ON control_node_runs(workflow_run_id);
451                CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id
452                    ON control_node_runs(node_id);
453
454                CREATE TABLE IF NOT EXISTS teacher_candidates (
455                    id TEXT PRIMARY KEY,
456                    workflow_run_id TEXT NOT NULL,
457                    control_node_run_id TEXT NOT NULL,
458                    candidate_id TEXT NOT NULL,
459                    branch_run_id TEXT,
460                    score REAL,
461                    passed INTEGER,
462                    rationale_json TEXT NOT NULL DEFAULT '{}',
463                    created_at INTEGER NOT NULL,
464                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
465                    FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE,
466                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
467                );
468                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id
469                    ON teacher_candidates(workflow_run_id);
470                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id
471                    ON teacher_candidates(control_node_run_id);
472
473                PRAGMA user_version = 2;
474                COMMIT;
475                "#,
476            )
477            .context("failed to initialize workflow trace schema")?;
478        }
479        Ok(())
480    }
481
482    /// Insert or update thread metadata.
483    ///
484    /// This does **not** update `current_leaf_id`; use [`append_message`](Self::append_message)
485    /// or [`set_current_leaf_id`](Self::set_current_leaf_id) for that.
486    pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
487        let conn = self.conn()?;
488        conn.execute(
489            r#"
490            INSERT INTO threads (
491                id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
492                cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
493                git_sha, git_branch, git_origin_url, memory_mode
494            ) VALUES (
495                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
496                ?11, ?12, ?13, ?14, ?15, ?16, ?17,
497                ?18, ?19, ?20, ?21
498            )
499            ON CONFLICT(id) DO UPDATE SET
500                rollout_path=excluded.rollout_path,
501                preview=excluded.preview,
502                ephemeral=excluded.ephemeral,
503                model_provider=excluded.model_provider,
504                created_at=excluded.created_at,
505                updated_at=excluded.updated_at,
506                status=excluded.status,
507                path=excluded.path,
508                cwd=excluded.cwd,
509                cli_version=excluded.cli_version,
510                source=excluded.source,
511                title=excluded.title,
512                sandbox_policy=excluded.sandbox_policy,
513                approval_mode=excluded.approval_mode,
514                archived=excluded.archived,
515                archived_at=excluded.archived_at,
516                git_sha=excluded.git_sha,
517                git_branch=excluded.git_branch,
518                git_origin_url=excluded.git_origin_url,
519                memory_mode=excluded.memory_mode
520            "#,
521            params![
522                thread.id,
523                path_to_opt_string(thread.rollout_path.as_deref()),
524                thread.preview,
525                bool_to_i64(thread.ephemeral),
526                thread.model_provider,
527                thread.created_at,
528                thread.updated_at,
529                thread_status_to_str(&thread.status),
530                path_to_opt_string(thread.path.as_deref()),
531                thread.cwd.display().to_string(),
532                thread.cli_version,
533                session_source_to_str(&thread.source),
534                thread.name,
535                thread.sandbox_policy,
536                thread.approval_mode,
537                bool_to_i64(thread.archived),
538                thread.archived_at,
539                thread.git_sha,
540                thread.git_branch,
541                thread.git_origin_url,
542                thread.memory_mode,
543            ],
544        )
545        .context("failed to upsert thread metadata")?;
546
547        self.append_thread_name(
548            &thread.id,
549            thread.name.clone(),
550            thread.updated_at,
551            thread.rollout_path.clone(),
552        )?;
553        Ok(())
554    }
555
556    /// Retrieve a single thread by its ID.
557    ///
558    /// Returns `None` if no thread with the given ID exists.
559    pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
560        let conn = self.conn()?;
561        conn.query_row(
562            r#"
563            SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
564                   cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
565                   git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
566            FROM threads
567            WHERE id = ?1
568            "#,
569            params![id],
570            row_to_thread,
571        )
572        .optional()
573        .context("failed to read thread")
574    }
575
576    /// List threads ordered by most recently updated.
577    ///
578    /// Use [`ThreadListFilters`] to control whether archived threads are included
579    /// and the maximum number of results returned.
580    pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
581        let conn = self.conn()?;
582        let sql = if filters.include_archived {
583            "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
584        } else {
585            "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
586        };
587
588        let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
589        let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
590        let mut rows = stmt
591            .query(params![limit])
592            .context("failed to query threads")?;
593        let mut out = Vec::new();
594        while let Some(row) = rows.next().context("failed to iterate thread rows")? {
595            out.push(row_to_thread(row)?);
596        }
597        Ok(out)
598    }
599
600    /// Archive a thread, setting its status to [`ThreadStatus::Archived`] and
601    /// recording the current timestamp.
602    pub fn mark_archived(&self, id: &str) -> Result<()> {
603        let conn = self.conn()?;
604        conn.execute(
605            "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
606            params![
607                id,
608                Utc::now().timestamp(),
609                thread_status_to_str(&ThreadStatus::Archived)
610            ],
611        )
612        .context("failed to archive thread")?;
613        Ok(())
614    }
615
616    /// Unarchive a thread, removing the archived flag and clearing `archived_at`.
617    pub fn mark_unarchived(&self, id: &str) -> Result<()> {
618        let conn = self.conn()?;
619        conn.execute(
620            "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
621            params![id],
622        )
623        .context("failed to unarchive thread")?;
624        Ok(())
625    }
626
627    /// Permanently delete a thread and all of its associated data
628    /// (messages, checkpoints, dynamic tools) via cascading foreign keys.
629    pub fn delete_thread(&self, id: &str) -> Result<()> {
630        let conn = self.conn()?;
631        conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
632            .context("failed to delete thread")?;
633        Ok(())
634    }
635
636    /// Set the memory mode for a thread.
637    ///
638    /// Pass `None` to clear the memory mode.
639    pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
640        let conn = self.conn()?;
641        conn.execute(
642            "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
643            params![id, mode],
644        )
645        .context("failed to update thread memory mode")?;
646        Ok(())
647    }
648
649    /// Get the memory mode configured for a thread.
650    ///
651    /// Returns `None` if the thread does not exist or has no memory mode set.
652    pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
653        let conn = self.conn()?;
654        conn.query_row(
655            "SELECT memory_mode FROM threads WHERE id = ?1",
656            params![id],
657            |row| row.get::<_, Option<String>>(0),
658        )
659        .optional()
660        .context("failed to read thread memory mode")
661        .map(Option::flatten)
662    }
663
664    /// List all leaf messages in a thread.
665    ///
666    /// A leaf message is one that has no other message referencing it as a parent.
667    /// In a branching conversation tree, there may be multiple leaf messages.
668    pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
669        let conn = self.conn()?;
670        let mut stmt = conn
671            .prepare(
672                r#"
673                SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
674                FROM messages m1
675                LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
676                WHERE m1.thread_id = ?1 AND m2.id IS NULL
677                "#,
678            )
679            .context("failed to prepare message listing query")?;
680        let mut rows = stmt
681            .query(params![thread_id])
682            .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
683        let mut out = Vec::new();
684        while let Some(row) = rows.next().context("failed to iterate message rows")? {
685            let item_json: Option<String> = row.get(4).context("failed to read item json")?;
686            let item = item_json
687                .as_deref()
688                .map(serde_json::from_str)
689                .transpose()
690                .with_context(|| {
691                    format!("failed to parse message item json in thread {thread_id}")
692                })?;
693            out.push(MessageRecord {
694                id: row.get(0).context("failed to read message id")?,
695                thread_id: row.get(1).context("failed to read message thread id")?,
696                role: row.get(2).context("failed to read message role")?,
697                content: row.get(3).context("failed to read message content")?,
698                item,
699                created_at: row.get(5).context("failed to read message timestamp")?,
700                parent_entry_id: row.get(6).context("failed to read parent entry id")?,
701            });
702        }
703        Ok(out)
704    }
705
706    /// Update the current leaf message pointer for a thread.
707    ///
708    /// This controls which branch of the conversation tree is considered active
709    /// when listing messages via [`list_messages`](Self::list_messages).
710    pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
711        let conn = self.conn()?;
712        conn.execute(
713            "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
714            params![current_leaf_id, thread_id],
715        )
716        .context("failed to update thread current leaf id")?;
717        Ok(())
718    }
719
720    /// Replace the dynamic tools for a thread.
721    ///
722    /// All existing dynamic tools for the thread are deleted and replaced with the
723    /// provided list. The operation is performed within a transaction.
724    pub fn persist_dynamic_tools(
725        &self,
726        thread_id: &str,
727        tools: &[DynamicToolRecord],
728    ) -> Result<()> {
729        let mut conn = self.conn()?;
730        let tx = conn
731            .transaction()
732            .context("failed to begin dynamic tools transaction")?;
733        tx.execute(
734            "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
735            params![thread_id],
736        )
737        .context("failed to clear dynamic tools")?;
738        for tool in tools {
739            tx.execute(
740                "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
741                params![
742                    thread_id,
743                    tool.position,
744                    tool.name,
745                    tool.description,
746                    tool.input_schema.to_string()
747                ],
748            )
749            .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
750        }
751        tx.commit().context("failed to commit dynamic tools")?;
752        Ok(())
753    }
754
755    /// Retrieve all dynamic tools registered for a thread, ordered by position.
756    pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
757        let conn = self.conn()?;
758        let mut stmt = conn
759            .prepare(
760                "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
761            )
762            .context("failed to prepare get dynamic tools query")?;
763        let mut rows = stmt
764            .query(params![thread_id])
765            .context("failed to query dynamic tools")?;
766        let mut out = Vec::new();
767        while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
768            let input_schema_raw: String =
769                row.get(3).context("failed to read tool input schema")?;
770            let input_schema: Value =
771                serde_json::from_str(&input_schema_raw).with_context(|| {
772                    format!("failed to parse input schema for dynamic tool in thread {thread_id}")
773                })?;
774            out.push(DynamicToolRecord {
775                position: row.get(0).context("failed to read tool position")?,
776                name: row.get(1).context("failed to read tool name")?,
777                description: row.get(2).context("failed to read tool description")?,
778                input_schema,
779            });
780        }
781        Ok(out)
782    }
783
784    /// Append a new message to a thread.
785    ///
786    /// The message is linked to the thread's current leaf as its parent, and the
787    /// thread's `current_leaf_id` is updated to the new message. Returns the ID
788    /// of the newly created message.
789    pub fn append_message(
790        &self,
791        thread_id: &str,
792        role: &str,
793        content: &str,
794        item: Option<Value>,
795    ) -> Result<i64> {
796        let mut conn = self.conn()?;
797        let created_at = Utc::now().timestamp();
798        let item_json = item
799            .as_ref()
800            .map(serde_json::to_string)
801            .transpose()
802            .context("failed to serialize message item payload")?;
803
804        let tx = conn
805            .transaction()
806            .context("failed to begin append message transaction")?;
807
808        let current_leaf_id: Option<i64> = tx
809            .query_row(
810                "SELECT current_leaf_id FROM threads WHERE id = ?1",
811                params![thread_id],
812                |row| row.get(0),
813            )
814            .with_context(|| {
815                format!("failed to query thread current leaf id for thread {thread_id}")
816            })?;
817
818        let next_leaf_id: i64 = tx.query_row(
819            r#"
820                INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
821                SELECT ?1, ?2, ?3, ?4, ?5, ?6
822                RETURNING id
823            "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
824        ).with_context(|| format!("failed to append message for thread {thread_id}"))?;
825
826        tx.execute(
827            r#"
828            UPDATE threads
829            SET current_leaf_id = ?1
830            WHERE id = ?2;
831            "#,
832            params![next_leaf_id, thread_id],
833        )
834        .with_context(|| {
835            format!("failed to update thread current leaf id for thread {thread_id}")
836        })?;
837
838        tx.commit()
839            .context("failed to commit append message transaction")?;
840
841        Ok(next_leaf_id)
842    }
843
844    /// List messages in the current conversation branch, walking backwards from
845    /// the thread's `current_leaf_id`.
846    ///
847    /// Messages are returned in chronological order (oldest first). The `limit`
848    /// parameter caps how many ancestor messages are traversed; it defaults to 500.
849    pub fn list_messages(
850        &self,
851        thread_id: &str,
852        limit: Option<usize>,
853    ) -> Result<Vec<MessageRecord>> {
854        let conn = self.conn()?;
855        let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
856        let mut stmt = conn
857            .prepare(
858                r#"
859                WITH RECURSIVE
860                    leaf_id AS (
861                        SELECT current_leaf_id FROM threads WHERE id = ?1
862                    ),
863                    ancestors AS (
864                        SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
865                        FROM messages
866                        WHERE id = (SELECT current_leaf_id FROM leaf_id)
867
868                        UNION ALL
869
870                        SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
871                        FROM messages m
872                        JOIN ancestors a ON m.id = a.parent_entry_id
873                        WHERE a.depth < ?2
874                    )
875                    SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
876                    ORDER BY depth DESC
877                "#
878            )
879            .context("failed to prepare message listing query")?;
880        let mut rows = stmt
881            .query(params![thread_id, limit - 1])
882            .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
883        let mut out = Vec::new();
884        while let Some(row) = rows.next().context("failed to iterate message rows")? {
885            let item_json: Option<String> = row.get(4).context("failed to read item json")?;
886            let item = item_json
887                .as_deref()
888                .map(serde_json::from_str)
889                .transpose()
890                .with_context(|| {
891                    format!("failed to parse message item json in thread {thread_id}")
892                })?;
893            out.push(MessageRecord {
894                id: row.get(0).context("failed to read message id")?,
895                thread_id: row.get(1).context("failed to read message thread id")?,
896                role: row.get(2).context("failed to read message role")?,
897                content: row.get(3).context("failed to read message content")?,
898                item,
899                created_at: row.get(5).context("failed to read message timestamp")?,
900                parent_entry_id: row.get(6).context("failed to read parent entry id")?,
901            });
902        }
903        Ok(out)
904    }
905
906    /// Fork the conversation at a specific message.
907    ///
908    /// Creates a new message whose parent is `message_id` and updates the thread's
909    /// `current_leaf_id` to the new message. Returns the ID of the new message.
910    /// This enables branching conversations from any point in the history.
911    pub fn fork_at_message(
912        &self,
913        message_id: &str,
914        role: &str,
915        content: &str,
916        item: Option<Value>,
917    ) -> Result<i64> {
918        let mut conn = self.conn()?;
919        let created_at = Utc::now().timestamp();
920        let item_json = item
921            .as_ref()
922            .map(serde_json::to_string)
923            .transpose()
924            .context("failed to serialize message item payload")?;
925
926        let tx = conn
927            .transaction()
928            .context("failed to begin fork message transaction")?;
929
930        let thread_id: String = tx
931            .query_row(
932                "SELECT thread_id FROM messages WHERE id = ?1",
933                params![message_id],
934                |row| row.get(0),
935            )
936            .with_context(|| format!("failed to query thread id for message {message_id}"))?;
937
938        let next_leaf_id: i64 = tx.query_row(
939            r#"
940                INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
941                SELECT ?1, ?2, ?3, ?4, ?5, ?6
942                RETURNING id
943            "#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
944        ).with_context(|| format!("failed to fork at message for thread {:?}", thread_id))?;
945
946        tx.execute(
947            r#"
948            UPDATE threads
949            SET current_leaf_id = ?1
950            WHERE id = ?2;
951            "#,
952            params![next_leaf_id, thread_id],
953        )
954        .with_context(|| {
955            format!(
956                "failed to update thread current leaf id for thread {:?}",
957                thread_id
958            )
959        })?;
960
961        tx.commit()
962            .context("failed to commit fork message transaction")?;
963
964        Ok(next_leaf_id)
965    }
966
967    /// Delete all messages belonging to a thread and reset its `current_leaf_id`.
968    ///
969    /// Returns the number of messages deleted.
970    pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
971        let mut conn = self.conn()?;
972        let tx = conn
973            .transaction()
974            .context("failed to begin clear messages transaction")?;
975
976        tx.execute(
977            r#"
978            UPDATE threads
979            SET current_leaf_id = NULL
980            WHERE id = ?1;
981            "#,
982            params![thread_id],
983        )
984        .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
985        let result = tx
986            .execute(
987                r#"
988                DELETE FROM messages WHERE thread_id = ?1
989                "#,
990                params![thread_id],
991            )
992            .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
993        tx.commit()
994            .context("failed to commit clear messages transaction")?;
995
996        Ok(result)
997    }
998
999    /// Save (or update) a named checkpoint for a thread.
1000    ///
1001    /// If a checkpoint with the same `thread_id` and `checkpoint_id` already exists,
1002    /// its state and timestamp are overwritten.
1003    pub fn save_checkpoint(
1004        &self,
1005        thread_id: &str,
1006        checkpoint_id: &str,
1007        state: &Value,
1008    ) -> Result<()> {
1009        let conn = self.conn()?;
1010        let state_json =
1011            serde_json::to_string(state).context("failed to encode checkpoint state")?;
1012        conn.execute(
1013            r#"
1014            INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
1015            VALUES (?1, ?2, ?3, ?4)
1016            ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
1017                state_json = excluded.state_json,
1018                created_at = excluded.created_at
1019            "#,
1020            params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
1021        )
1022        .with_context(|| {
1023            format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
1024        })?;
1025        Ok(())
1026    }
1027
1028    /// Load a checkpoint for a thread.
1029    ///
1030    /// If `checkpoint_id` is provided, loads that specific checkpoint. Otherwise,
1031    /// loads the most recently created checkpoint for the thread. Returns `None`
1032    /// if no matching checkpoint exists.
1033    pub fn load_checkpoint(
1034        &self,
1035        thread_id: &str,
1036        checkpoint_id: Option<&str>,
1037    ) -> Result<Option<CheckpointRecord>> {
1038        let conn = self.conn()?;
1039        if let Some(checkpoint_id) = checkpoint_id {
1040            let row = conn
1041                .query_row(
1042                    "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1043                    params![thread_id, checkpoint_id],
1044                    |row| {
1045                        let state_json: String = row.get(2)?;
1046                        let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1047                        Ok(CheckpointRecord {
1048                            thread_id: row.get(0)?,
1049                            checkpoint_id: row.get(1)?,
1050                            state,
1051                            created_at: row.get(3)?,
1052                        })
1053                    },
1054                )
1055                .optional()
1056                .with_context(|| {
1057                    format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
1058                })?;
1059            return Ok(row);
1060        }
1061
1062        conn.query_row(
1063            "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
1064            params![thread_id],
1065            |row| {
1066                let state_json: String = row.get(2)?;
1067                let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1068                Ok(CheckpointRecord {
1069                    thread_id: row.get(0)?,
1070                    checkpoint_id: row.get(1)?,
1071                    state,
1072                    created_at: row.get(3)?,
1073                })
1074            },
1075        )
1076        .optional()
1077        .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
1078    }
1079
1080    /// List checkpoints for a thread, ordered by creation time (newest first).
1081    ///
1082    /// The `limit` parameter caps the number of results and defaults to 100.
1083    pub fn list_checkpoints(
1084        &self,
1085        thread_id: &str,
1086        limit: Option<usize>,
1087    ) -> Result<Vec<CheckpointRecord>> {
1088        let conn = self.conn()?;
1089        let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1090        let mut stmt = conn
1091            .prepare(
1092                "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
1093            )
1094            .context("failed to prepare checkpoint list query")?;
1095        let mut rows = stmt
1096            .query(params![thread_id, limit])
1097            .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
1098
1099        let mut out = Vec::new();
1100        while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
1101            let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
1102            let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1103            out.push(CheckpointRecord {
1104                thread_id: row.get(0).context("failed to read checkpoint thread id")?,
1105                checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
1106                state,
1107                created_at: row.get(3).context("failed to read checkpoint timestamp")?,
1108            });
1109        }
1110        Ok(out)
1111    }
1112
1113    /// Delete a specific checkpoint from a thread.
1114    pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
1115        let conn = self.conn()?;
1116        conn.execute(
1117            "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1118            params![thread_id, checkpoint_id],
1119        )
1120        .with_context(|| {
1121            format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
1122        })?;
1123        Ok(())
1124    }
1125
1126    /// Insert or update a background job record.
1127    pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
1128        let conn = self.conn()?;
1129        conn.execute(
1130            r#"
1131            INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
1132            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1133            ON CONFLICT(id) DO UPDATE SET
1134                name = excluded.name,
1135                status = excluded.status,
1136                progress = excluded.progress,
1137                detail = excluded.detail,
1138                created_at = excluded.created_at,
1139                updated_at = excluded.updated_at
1140            "#,
1141            params![
1142                job.id,
1143                job.name,
1144                job_state_status_to_str(&job.status),
1145                job.progress.map(i64::from),
1146                job.detail,
1147                job.created_at,
1148                job.updated_at
1149            ],
1150        )
1151        .with_context(|| format!("failed to upsert job {}", job.id))?;
1152        Ok(())
1153    }
1154
1155    /// Retrieve a single job by its ID.
1156    ///
1157    /// Returns `None` if no job with the given ID exists.
1158    pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
1159        let conn = self.conn()?;
1160        conn.query_row(
1161            "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
1162            params![id],
1163            |row| {
1164                let status_raw: String = row.get(2)?;
1165                let progress: Option<i64> = row.get(3)?;
1166                Ok(JobStateRecord {
1167                    id: row.get(0)?,
1168                    name: row.get(1)?,
1169                    status: job_state_status_from_str(&status_raw),
1170                    progress: progress.and_then(|v| u8::try_from(v).ok()),
1171                    detail: row.get(4)?,
1172                    created_at: row.get(5)?,
1173                    updated_at: row.get(6)?,
1174                })
1175            },
1176        )
1177        .optional()
1178        .with_context(|| format!("failed to read job {id}"))
1179    }
1180
1181    /// List jobs ordered by most recently updated.
1182    ///
1183    /// The `limit` parameter caps the number of results and defaults to 100.
1184    pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
1185        let conn = self.conn()?;
1186        let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1187        let mut stmt = conn
1188            .prepare(
1189                "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
1190            )
1191            .context("failed to prepare job list query")?;
1192        let mut rows = stmt
1193            .query(params![limit])
1194            .context("failed to query persisted jobs")?;
1195        let mut out = Vec::new();
1196        while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
1197            let status_raw: String = row.get(2).context("failed to read job status")?;
1198            let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
1199            out.push(JobStateRecord {
1200                id: row.get(0).context("failed to read job id")?,
1201                name: row.get(1).context("failed to read job name")?,
1202                status: job_state_status_from_str(&status_raw),
1203                progress: progress.and_then(|v| u8::try_from(v).ok()),
1204                detail: row.get(4).context("failed to read job detail")?,
1205                created_at: row.get(5).context("failed to read job created_at")?,
1206                updated_at: row.get(6).context("failed to read job updated_at")?,
1207            });
1208        }
1209        Ok(out)
1210    }
1211
1212    /// Permanently delete a job record.
1213    pub fn delete_job(&self, id: &str) -> Result<()> {
1214        let conn = self.conn()?;
1215        conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
1216            .with_context(|| format!("failed to delete job {id}"))?;
1217        Ok(())
1218    }
1219
1220    /// Look up the rollout file path for a thread by its ID.
1221    pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
1222        let conn = self.conn()?;
1223        conn.query_row(
1224            "SELECT rollout_path FROM threads WHERE id = ?1",
1225            params![id],
1226            |row| row.get::<_, Option<String>>(0),
1227        )
1228        .optional()
1229        .context("failed to lookup rollout path")
1230        .map(|opt| opt.flatten().map(PathBuf::from))
1231    }
1232
1233    /// Append an entry to the JSONL session index file.
1234    ///
1235    /// The session index is an append-only log that maps thread IDs to their names,
1236    /// update timestamps, and rollout paths. It is used for fast name-based lookups
1237    /// without opening the SQLite database.
1238    pub fn append_thread_name(
1239        &self,
1240        thread_id: &str,
1241        thread_name: Option<String>,
1242        updated_at: i64,
1243        rollout_path: Option<PathBuf>,
1244    ) -> Result<()> {
1245        if let Some(parent) = self.session_index_path.parent() {
1246            fs::create_dir_all(parent).with_context(|| {
1247                format!(
1248                    "failed to create session index directory {}",
1249                    parent.display()
1250                )
1251            })?;
1252        }
1253        let entry = SessionIndexEntry {
1254            thread_id: thread_id.to_string(),
1255            thread_name,
1256            updated_at,
1257            rollout_path,
1258        };
1259        let encoded =
1260            serde_json::to_string(&entry).context("failed to serialize session index entry")?;
1261        let mut file = OpenOptions::new()
1262            .create(true)
1263            .append(true)
1264            .open(&self.session_index_path)
1265            .with_context(|| {
1266                format!(
1267                    "failed to open session index {}",
1268                    self.session_index_path.display()
1269                )
1270            })?;
1271        writeln!(file, "{encoded}").context("failed to append session index entry")?;
1272        Ok(())
1273    }
1274
1275    /// Find the display name for a thread by its ID, using the session index.
1276    ///
1277    /// Returns `None` if the thread is not in the index or has no name.
1278    pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
1279        let map = self.session_index_map()?;
1280        Ok(map
1281            .get(thread_id)
1282            .and_then(|entry| entry.thread_name.clone()))
1283    }
1284
1285    /// Look up display names for multiple thread IDs at once.
1286    ///
1287    /// Returns a map from thread ID to its name (which may be `None`).
1288    pub fn find_thread_names_by_ids(
1289        &self,
1290        ids: &[String],
1291    ) -> Result<HashMap<String, Option<String>>> {
1292        let map = self.session_index_map()?;
1293        let mut out = HashMap::new();
1294        for id in ids {
1295            let name = map.get(id).and_then(|entry| entry.thread_name.clone());
1296            out.insert(id.clone(), name);
1297        }
1298        Ok(out)
1299    }
1300
1301    /// Find the rollout path for a thread by its display name (case-insensitive).
1302    ///
1303    /// If multiple threads share the same name, the most recently updated one is returned.
1304    /// Returns `None` if no matching thread is found.
1305    pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
1306        let map = self.session_index_map()?;
1307        let matched = map
1308            .values()
1309            .filter(|entry| {
1310                entry
1311                    .thread_name
1312                    .as_deref()
1313                    .is_some_and(|n| n.eq_ignore_ascii_case(name))
1314            })
1315            .max_by_key(|entry| entry.updated_at);
1316        Ok(matched.and_then(|entry| entry.rollout_path.clone()))
1317    }
1318
1319    fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
1320        if !self.session_index_path.exists() {
1321            return Ok(HashMap::new());
1322        }
1323        let file = OpenOptions::new()
1324            .read(true)
1325            .open(&self.session_index_path)
1326            .with_context(|| {
1327                format!(
1328                    "failed to read session index {}",
1329                    self.session_index_path.display()
1330                )
1331            })?;
1332        let reader = BufReader::new(file);
1333        let mut latest = HashMap::<String, SessionIndexEntry>::new();
1334        for line in reader.lines() {
1335            let line = line.context("failed to read session index line")?;
1336            if line.trim().is_empty() {
1337                continue;
1338            }
1339            let parsed: SessionIndexEntry =
1340                serde_json::from_str(&line).context("failed to parse session index entry")?;
1341            latest.insert(parsed.thread_id.clone(), parsed);
1342        }
1343        Ok(latest)
1344    }
1345}
1346
1347fn default_state_db_path() -> PathBuf {
1348    let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
1349    // Prefer the CodeWhale directory, falling back to legacy DeepSeek path
1350    // so existing installs don't lose their session history.
1351    let primary = home.join(".codewhale").join("state.db");
1352    if primary.exists() || !home.join(".deepseek").join("state.db").exists() {
1353        primary
1354    } else {
1355        home.join(".deepseek").join("state.db")
1356    }
1357}
1358
1359fn bool_to_i64(value: bool) -> i64 {
1360    if value { 1 } else { 0 }
1361}
1362
1363fn i64_to_bool(value: i64) -> bool {
1364    value != 0
1365}
1366
1367fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
1368    match status {
1369        ThreadStatus::Running => "running",
1370        ThreadStatus::Idle => "idle",
1371        ThreadStatus::Completed => "completed",
1372        ThreadStatus::Failed => "failed",
1373        ThreadStatus::Paused => "paused",
1374        ThreadStatus::Archived => "archived",
1375    }
1376}
1377
1378fn thread_status_from_str(value: &str) -> ThreadStatus {
1379    match value {
1380        "running" => ThreadStatus::Running,
1381        "idle" => ThreadStatus::Idle,
1382        "completed" => ThreadStatus::Completed,
1383        "failed" => ThreadStatus::Failed,
1384        "paused" => ThreadStatus::Paused,
1385        "archived" => ThreadStatus::Archived,
1386        _ => ThreadStatus::Idle,
1387    }
1388}
1389
1390fn session_source_to_str(source: &SessionSource) -> &'static str {
1391    match source {
1392        SessionSource::Interactive => "interactive",
1393        SessionSource::Resume => "resume",
1394        SessionSource::Fork => "fork",
1395        SessionSource::Api => "api",
1396        SessionSource::Unknown => "unknown",
1397    }
1398}
1399
1400fn session_source_from_str(value: &str) -> SessionSource {
1401    match value {
1402        "interactive" => SessionSource::Interactive,
1403        "resume" => SessionSource::Resume,
1404        "fork" => SessionSource::Fork,
1405        "api" => SessionSource::Api,
1406        _ => SessionSource::Unknown,
1407    }
1408}
1409
1410fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
1411    path.map(|p| p.display().to_string())
1412}
1413
1414fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
1415    match status {
1416        JobStateStatus::Queued => "queued",
1417        JobStateStatus::Running => "running",
1418        JobStateStatus::Completed => "completed",
1419        JobStateStatus::Failed => "failed",
1420        JobStateStatus::Cancelled => "cancelled",
1421    }
1422}
1423
1424fn job_state_status_from_str(value: &str) -> JobStateStatus {
1425    match value {
1426        "queued" => JobStateStatus::Queued,
1427        "running" => JobStateStatus::Running,
1428        "completed" => JobStateStatus::Completed,
1429        "failed" => JobStateStatus::Failed,
1430        "cancelled" => JobStateStatus::Cancelled,
1431        _ => JobStateStatus::Queued,
1432    }
1433}
1434
1435fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
1436    let status_raw: String = row.get(7)?;
1437    let source_raw: String = row.get(11)?;
1438    let rollout_path: Option<String> = row.get(1)?;
1439    let path: Option<String> = row.get(8)?;
1440    Ok(ThreadMetadata {
1441        id: row.get(0)?,
1442        rollout_path: rollout_path.map(PathBuf::from),
1443        preview: row.get(2)?,
1444        ephemeral: i64_to_bool(row.get(3)?),
1445        model_provider: row.get(4)?,
1446        created_at: row.get(5)?,
1447        updated_at: row.get(6)?,
1448        status: thread_status_from_str(&status_raw),
1449        path: path.map(PathBuf::from),
1450        cwd: PathBuf::from(row.get::<_, String>(9)?),
1451        cli_version: row.get(10)?,
1452        source: session_source_from_str(&source_raw),
1453        name: row.get(12)?,
1454        sandbox_policy: row.get(13)?,
1455        approval_mode: row.get(14)?,
1456        archived: i64_to_bool(row.get(15)?),
1457        archived_at: row.get(16)?,
1458        git_sha: row.get(17)?,
1459        git_branch: row.get(18)?,
1460        git_origin_url: row.get(19)?,
1461        memory_mode: row.get(20)?,
1462        current_leaf_id: row.get(21)?,
1463    })
1464}