Skip to main content

codewhale_state/
lib.rs

1//! Persistent state management for conversation threads, messages, and jobs.
2//!
3//! The [`StateStore`] is the primary entry point, backed by a SQLite database and an
4//! append-only JSONL session index file. It provides CRUD operations for:
5//!
6//! - **Threads** — conversation metadata, archival, and session indexing.
7//! - **Messages** — append-only message storage with tree-structured branching.
8//! - **Checkpoints** — named state snapshots for restoring conversation progress.
9//! - **Jobs** — background task tracking with status and progress.
10//! - **Dynamic tools** — per-thread tool registrations.
11
12use std::collections::HashMap;
13use std::fs::{self, OpenOptions};
14use std::io::{BufRead, BufReader, Write};
15use std::path::{Path, PathBuf};
16
17use anyhow::{Context, Result};
18use chrono::Utc;
19use rusqlite::{Connection, OptionalExtension, params};
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22
/// Lifecycle status of a conversation thread.
///
/// Serde renders each variant in snake_case (e.g. `"running"`, `"archived"`),
/// as requested by the `rename_all` attribute on this enum.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ThreadStatus {
    /// Thread is actively being worked on.
    Running,
    /// Thread exists but has no active work in progress.
    Idle,
    /// Thread has finished its task successfully.
    Completed,
    /// Thread encountered an unrecoverable error.
    Failed,
    /// Thread has been temporarily paused by the user.
    Paused,
    /// Thread has been archived.
    ///
    /// [`StateStore::mark_archived`] writes this status together with the
    /// separate `archived` flag and `archived_at` timestamp.
    Archived,
}
42
/// Indicates how a session was initiated.
///
/// Serde renders each variant in snake_case (e.g. `"interactive"`, `"fork"`),
/// as requested by the `rename_all` attribute on this enum.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SessionSource {
    /// Started by a user interacting with the CLI.
    Interactive,
    /// Resumed from a previously persisted session.
    Resume,
    /// Created by forking an existing conversation at a specific message.
    Fork,
    /// Initiated programmatically via the API.
    Api,
    /// Source is unknown or unspecified.
    Unknown,
}
60
/// Metadata for a persisted conversation thread.
///
/// Each thread represents a single conversation session and stores its
/// configuration, git context, and current status. One value maps to one row
/// of the `threads` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThreadMetadata {
    /// Unique identifier for this thread (primary key of the `threads` table).
    pub id: String,
    /// Optional filesystem path to the rollout (JSONL transcript) file.
    pub rollout_path: Option<PathBuf>,
    /// Short preview or summary of the thread content.
    pub preview: String,
    /// Whether this thread is ephemeral (not persisted long-term).
    pub ephemeral: bool,
    /// Identifier of the model provider used for this thread (e.g. `"openai"`).
    pub model_provider: String,
    /// Unix timestamp (seconds) when the thread was created.
    pub created_at: i64,
    /// Unix timestamp (seconds) of the most recent update to the thread.
    ///
    /// Thread listings are ordered by this value, newest first.
    pub updated_at: i64,
    /// Current lifecycle status of the thread.
    pub status: ThreadStatus,
    /// Optional filesystem path associated with the thread working context.
    pub path: Option<PathBuf>,
    /// Working directory that was active when the thread was created.
    ///
    /// Persisted through `Path::display`, so non-UTF-8 paths are stored lossily.
    pub cwd: PathBuf,
    /// Version of the CLI that created this thread.
    pub cli_version: String,
    /// How this session was initiated.
    pub source: SessionSource,
    /// User-assigned display name for the thread.
    ///
    /// Persisted in the `title` column of the `threads` table.
    pub name: Option<String>,
    /// Serialized sandbox policy applied to this thread, if any.
    pub sandbox_policy: Option<String>,
    /// Approval mode configured for tool calls in this thread.
    pub approval_mode: Option<String>,
    /// Whether the thread has been archived.
    ///
    /// This flag (not [`status`](Self::status)) is what the default thread
    /// listing filters on.
    pub archived: bool,
    /// Unix timestamp (seconds) when the thread was archived, or `None` if not archived.
    pub archived_at: Option<i64>,
    /// Git commit SHA of the working tree when the thread was created.
    pub git_sha: Option<String>,
    /// Git branch checked out when the thread was created.
    pub git_branch: Option<String>,
    /// URL of the git remote origin, if available.
    pub git_origin_url: Option<String>,
    /// Memory mode configured for this thread (e.g. `"local"`, `"remote"`).
    pub memory_mode: Option<String>,
    /// ID of the current leaf message in the conversation tree.
    ///
    /// Read back from the database but not written by
    /// [`StateStore::upsert_thread`]; it is maintained separately.
    pub current_leaf_id: Option<i64>,
}
112
/// A dynamically registered tool associated with a thread.
///
/// Rows live in the `thread_dynamic_tools` table, keyed by
/// `(thread_id, position)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamicToolRecord {
    /// Ordinal position of this tool in the thread tool list.
    ///
    /// Must be unique within a thread because it is part of the primary key.
    pub position: i64,
    /// Unique name identifying the tool.
    pub name: String,
    /// Human-readable description of what the tool does.
    pub description: Option<String>,
    /// JSON Schema describing the tool input parameters.
    ///
    /// Persisted as its serialized JSON text.
    pub input_schema: Value,
}
125
/// A single message entry in a conversation thread.
///
/// Messages form a tree structure via [`parent_entry_id`](Self::parent_entry_id),
/// enabling conversation branching and forking. One value maps to one row of
/// the `messages` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageRecord {
    /// Auto-incremented unique identifier for this message
    /// (`INTEGER PRIMARY KEY AUTOINCREMENT` in the schema).
    pub id: i64,
    /// ID of the thread this message belongs to.
    pub thread_id: String,
    /// Role of the message sender (e.g. `"user"`, `"assistant"`, `"system"`).
    pub role: String,
    /// Text content of the message.
    pub content: String,
    /// Optional structured item payload (tool calls, tool results, etc.).
    ///
    /// Stored as JSON text in the `item_json` column; `None` when that column is NULL.
    pub item: Option<Value>,
    /// Unix timestamp (seconds) when the message was created.
    pub created_at: i64,
    /// ID of the parent message, forming a tree structure. `None` for root messages.
    pub parent_entry_id: Option<i64>,
}
147
/// A named checkpoint capturing the state of a thread at a point in time.
///
/// Rows live in the `checkpoints` table, keyed by `(thread_id, checkpoint_id)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointRecord {
    /// ID of the thread this checkpoint belongs to.
    pub thread_id: String,
    /// Unique identifier for this checkpoint within its thread.
    pub checkpoint_id: String,
    /// Serialized state snapshot stored as a JSON value
    /// (the `state_json` column holds its text form).
    pub state: Value,
    /// Unix timestamp (seconds) when the checkpoint was created or last updated.
    pub created_at: i64,
}
160
/// Status of a background job.
///
/// Serde renders each variant in snake_case (e.g. `"queued"`, `"cancelled"`),
/// as requested by the `rename_all` attribute on this enum.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum JobStateStatus {
    /// Job is waiting to be executed.
    Queued,
    /// Job is currently executing.
    Running,
    /// Job has finished successfully.
    Completed,
    /// Job has failed with an error.
    Failed,
    /// Job was cancelled before completion.
    Cancelled,
}
178
/// Persisted state of a background job.
///
/// One value maps to one row of the `jobs` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobStateRecord {
    /// Unique identifier for the job (primary key of the `jobs` table).
    pub id: String,
    /// Human-readable name describing the job.
    pub name: String,
    /// Current lifecycle status of the job.
    pub status: JobStateStatus,
    /// Completion progress as a percentage (0--100), if available.
    ///
    /// NOTE(review): the `u8` type admits values up to 255 and no range check
    /// is visible here — confirm that writers clamp to 100.
    pub progress: Option<u8>,
    /// Optional detail message providing additional status information.
    pub detail: Option<String>,
    /// Unix timestamp (seconds) when the job was created.
    pub created_at: i64,
    /// Unix timestamp (seconds) of the most recent status update.
    pub updated_at: i64,
}
197
/// Options that narrow the result of a thread listing.
#[derive(Debug, Clone)]
pub struct ThreadListFilters {
    /// When `true`, archived threads are returned alongside active ones.
    pub include_archived: bool,
    /// Upper bound on the number of threads returned. Defaults to 50.
    pub limit: Option<usize>,
}

impl Default for ThreadListFilters {
    /// Hides archived threads and caps the listing at 50 entries.
    fn default() -> Self {
        let include_archived = false;
        let limit = Some(50);
        Self {
            include_archived,
            limit,
        }
    }
}
215
/// One serializable record of the session index.
///
/// NOTE(review): presumably written as a single line of the append-only
/// `session_index.jsonl` file; the writer is outside this view — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SessionIndexEntry {
    /// ID of the thread this entry describes.
    thread_id: String,
    /// Display name of the thread at the time of the entry, if any.
    thread_name: Option<String>,
    /// Timestamp associated with the entry (presumably Unix seconds, matching
    /// `ThreadMetadata::updated_at` — TODO confirm).
    updated_at: i64,
    /// Path to the thread's rollout file, if any.
    rollout_path: Option<PathBuf>,
}
223
/// Persistent storage for conversation threads, messages, checkpoints, and jobs.
///
/// Backed by a SQLite database and an append-only JSONL session index file.
/// The database schema is automatically initialized and migrated on [`open`](Self::open).
///
/// The store holds only filesystem paths, so cloning it is cheap; the methods
/// visible in this file open a fresh database connection per call.
#[derive(Debug, Clone)]
pub struct StateStore {
    /// Location of the SQLite database file.
    db_path: PathBuf,
    /// Location of `session_index.jsonl`, placed next to the database file.
    session_index_path: PathBuf,
}
233
234impl StateStore {
235    /// Open (or create) a state store at the given database path.
236    ///
237    /// If `path` is `None`, the default location (`~/.deepseek/state.db`) is used.
238    /// The database schema is created automatically if it does not exist.
239    pub fn open(path: Option<PathBuf>) -> Result<Self> {
240        let db_path = path.unwrap_or_else(default_state_db_path);
241        let session_index_path = db_path
242            .parent()
243            .unwrap_or_else(|| Path::new("."))
244            .join("session_index.jsonl");
245        if let Some(parent) = db_path.parent() {
246            fs::create_dir_all(parent).with_context(|| {
247                format!("failed to create state directory {}", parent.display())
248            })?;
249        }
250        let store = Self {
251            db_path,
252            session_index_path,
253        };
254        store.init_schema()?;
255        Ok(store)
256    }
257
258    /// Returns the filesystem path of the underlying SQLite database.
259    pub fn db_path(&self) -> &Path {
260        &self.db_path
261    }
262
263    fn conn(&self) -> Result<Connection> {
264        Connection::open(&self.db_path)
265            .with_context(|| format!("failed to open state db {}", self.db_path.display()))
266    }
267
    /// Creates or migrates the database schema, driven by `PRAGMA user_version`.
    ///
    /// - Version 0 (fresh database): creates the `threads`,
    ///   `thread_dynamic_tools`, `messages`, `checkpoints` and `jobs` tables,
    ///   adds and backfills `messages.parent_entry_id` and
    ///   `threads.current_leaf_id`, then records version 1.
    /// - Version below 2: creates the workflow trace tables (`workflow_runs`,
    ///   `branch_runs`, `leaf_runs`, `control_node_runs`, `teacher_candidates`)
    ///   and records version 2.
    ///
    /// Each step runs as a single `BEGIN` … `COMMIT` batch.
    ///
    /// # Errors
    ///
    /// Fails when the connection cannot be opened, the version cannot be read,
    /// or either migration batch fails.
    fn init_schema(&self) -> Result<()> {
        let conn = self.conn()?;
        // `user_version` is 0 for a database that has never been migrated.
        let mut user_version: u32 = conn.query_row("PRAGMA user_version;", [], |row| row.get(0))?;
        if user_version == 0 {
            // Migration 0 -> 1: core thread/message/checkpoint/job schema plus the
            // message-tree columns, backfilled as a linear chain per thread.
            conn.execute_batch(
                r#"
                BEGIN;
                CREATE TABLE IF NOT EXISTS threads (
                    id TEXT PRIMARY KEY,
                    rollout_path TEXT,
                    preview TEXT NOT NULL,
                    ephemeral INTEGER NOT NULL,
                    model_provider TEXT NOT NULL,
                    created_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL,
                    status TEXT NOT NULL,
                    path TEXT,
                    cwd TEXT NOT NULL,
                    cli_version TEXT NOT NULL,
                    source TEXT NOT NULL,
                    title TEXT,
                    sandbox_policy TEXT,
                    approval_mode TEXT,
                    archived INTEGER NOT NULL DEFAULT 0,
                    archived_at INTEGER,
                    git_sha TEXT,
                    git_branch TEXT,
                    git_origin_url TEXT,
                    memory_mode TEXT
                );
                CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
                CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
                CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);

                CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
                    thread_id TEXT NOT NULL,
                    position INTEGER NOT NULL,
                    name TEXT NOT NULL,
                    description TEXT,
                    input_schema TEXT NOT NULL,
                    PRIMARY KEY (thread_id, position),
                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
                );

                CREATE TABLE IF NOT EXISTS messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    thread_id TEXT NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    item_json TEXT,
                    created_at INTEGER NOT NULL,
                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);

                CREATE TABLE IF NOT EXISTS checkpoints (
                    thread_id TEXT NOT NULL,
                    checkpoint_id TEXT NOT NULL,
                    state_json TEXT NOT NULL,
                    created_at INTEGER NOT NULL,
                    PRIMARY KEY(thread_id, checkpoint_id),
                    FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);

                CREATE TABLE IF NOT EXISTS jobs (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    status TEXT NOT NULL,
                    progress INTEGER,
                    detail TEXT,
                    created_at INTEGER NOT NULL,
                    updated_at INTEGER NOT NULL
                );
                CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);

                -- Add parent_entry_id column, and set to last message before current message
                ALTER TABLE messages ADD COLUMN parent_entry_id INTEGER NULL;
                UPDATE messages
                    SET parent_entry_id = (
                        SELECT m2.id
                        FROM messages m2
                        WHERE m2.thread_id = messages.thread_id
                            AND (
                                m2.created_at < messages.created_at
                                OR (
                                    m2.created_at = messages.created_at
                                    AND m2.id < messages.id
                                )
                            )
                        ORDER BY m2.created_at DESC, m2.id DESC
                        LIMIT 1
                    );
                CREATE INDEX idx_messages_parent_entry_id ON messages(parent_entry_id);

                -- Add current_leaf_id column, and set to last message in thread
                ALTER TABLE threads ADD COLUMN current_leaf_id INTEGER NULL;
                UPDATE threads
                    SET current_leaf_id = (
                        SELECT m.id
                        FROM messages m
                        WHERE m.thread_id = threads.id
                        ORDER BY m.id DESC
                        LIMIT 1
                    );

                PRAGMA user_version = 1;
                COMMIT;
                "#,
            )
            .context("failed to initialize thread schema")?;
            // Keep the local copy in sync so the next migration check sees version 1.
            user_version = 1;
        }
        if user_version < 2 {
            // Migration 1 -> 2: workflow trace tables and their lookup indexes.
            conn.execute_batch(
                r#"
                BEGIN;
                CREATE TABLE IF NOT EXISTS workflow_runs (
                    id TEXT PRIMARY KEY,
                    workflow_id TEXT NOT NULL,
                    goal TEXT NOT NULL,
                    status TEXT NOT NULL,
                    input_hash TEXT,
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    metadata_json TEXT NOT NULL DEFAULT '{}'
                );
                CREATE INDEX IF NOT EXISTS idx_workflow_runs_status_started_at
                    ON workflow_runs(status, started_at DESC);
                CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_started_at
                    ON workflow_runs(workflow_id, started_at DESC);

                CREATE TABLE IF NOT EXISTS branch_runs (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    branch_id TEXT NOT NULL,
                    node_id TEXT NOT NULL,
                    status TEXT NOT NULL,
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    result_json TEXT NOT NULL DEFAULT '{}',
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_branch_runs_workflow_run_id
                    ON branch_runs(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_branch_runs_branch_id
                    ON branch_runs(branch_id);

                CREATE TABLE IF NOT EXISTS leaf_runs (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    branch_run_id TEXT,
                    leaf_id TEXT NOT NULL,
                    task_id TEXT NOT NULL,
                    input_hash TEXT,
                    status TEXT NOT NULL,
                    output_json TEXT NOT NULL DEFAULT '{}',
                    artifacts_json TEXT NOT NULL DEFAULT '[]',
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
                );
                CREATE INDEX IF NOT EXISTS idx_leaf_runs_workflow_run_id
                    ON leaf_runs(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_leaf_runs_replay_lookup
                    ON leaf_runs(workflow_run_id, leaf_id, input_hash);

                CREATE TABLE IF NOT EXISTS control_node_runs (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    node_id TEXT NOT NULL,
                    kind TEXT NOT NULL,
                    status TEXT NOT NULL,
                    selected_children_json TEXT NOT NULL DEFAULT '[]',
                    result_json TEXT NOT NULL DEFAULT '{}',
                    started_at INTEGER NOT NULL,
                    completed_at INTEGER,
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_control_node_runs_workflow_run_id
                    ON control_node_runs(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_control_node_runs_node_id
                    ON control_node_runs(node_id);

                CREATE TABLE IF NOT EXISTS teacher_candidates (
                    id TEXT PRIMARY KEY,
                    workflow_run_id TEXT NOT NULL,
                    control_node_run_id TEXT NOT NULL,
                    candidate_id TEXT NOT NULL,
                    branch_run_id TEXT,
                    score REAL,
                    passed INTEGER,
                    rationale_json TEXT NOT NULL DEFAULT '{}',
                    created_at INTEGER NOT NULL,
                    FOREIGN KEY(workflow_run_id) REFERENCES workflow_runs(id) ON DELETE CASCADE,
                    FOREIGN KEY(control_node_run_id) REFERENCES control_node_runs(id) ON DELETE CASCADE,
                    FOREIGN KEY(branch_run_id) REFERENCES branch_runs(id) ON DELETE SET NULL
                );
                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_workflow_run_id
                    ON teacher_candidates(workflow_run_id);
                CREATE INDEX IF NOT EXISTS idx_teacher_candidates_control_node_run_id
                    ON teacher_candidates(control_node_run_id);

                PRAGMA user_version = 2;
                COMMIT;
                "#,
            )
            .context("failed to initialize workflow trace schema")?;
        }
        Ok(())
    }
480
481    /// Insert or update thread metadata.
482    ///
483    /// This does **not** update `current_leaf_id`; use [`append_message`](Self::append_message)
484    /// or [`set_current_leaf_id`](Self::set_current_leaf_id) for that.
485    pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
486        let conn = self.conn()?;
487        conn.execute(
488            r#"
489            INSERT INTO threads (
490                id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
491                cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
492                git_sha, git_branch, git_origin_url, memory_mode
493            ) VALUES (
494                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
495                ?11, ?12, ?13, ?14, ?15, ?16, ?17,
496                ?18, ?19, ?20, ?21
497            )
498            ON CONFLICT(id) DO UPDATE SET
499                rollout_path=excluded.rollout_path,
500                preview=excluded.preview,
501                ephemeral=excluded.ephemeral,
502                model_provider=excluded.model_provider,
503                created_at=excluded.created_at,
504                updated_at=excluded.updated_at,
505                status=excluded.status,
506                path=excluded.path,
507                cwd=excluded.cwd,
508                cli_version=excluded.cli_version,
509                source=excluded.source,
510                title=excluded.title,
511                sandbox_policy=excluded.sandbox_policy,
512                approval_mode=excluded.approval_mode,
513                archived=excluded.archived,
514                archived_at=excluded.archived_at,
515                git_sha=excluded.git_sha,
516                git_branch=excluded.git_branch,
517                git_origin_url=excluded.git_origin_url,
518                memory_mode=excluded.memory_mode
519            "#,
520            params![
521                thread.id,
522                path_to_opt_string(thread.rollout_path.as_deref()),
523                thread.preview,
524                bool_to_i64(thread.ephemeral),
525                thread.model_provider,
526                thread.created_at,
527                thread.updated_at,
528                thread_status_to_str(&thread.status),
529                path_to_opt_string(thread.path.as_deref()),
530                thread.cwd.display().to_string(),
531                thread.cli_version,
532                session_source_to_str(&thread.source),
533                thread.name,
534                thread.sandbox_policy,
535                thread.approval_mode,
536                bool_to_i64(thread.archived),
537                thread.archived_at,
538                thread.git_sha,
539                thread.git_branch,
540                thread.git_origin_url,
541                thread.memory_mode,
542            ],
543        )
544        .context("failed to upsert thread metadata")?;
545
546        self.append_thread_name(
547            &thread.id,
548            thread.name.clone(),
549            thread.updated_at,
550            thread.rollout_path.clone(),
551        )?;
552        Ok(())
553    }
554
555    /// Retrieve a single thread by its ID.
556    ///
557    /// Returns `None` if no thread with the given ID exists.
558    pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
559        let conn = self.conn()?;
560        conn.query_row(
561            r#"
562            SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
563                   cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
564                   git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id
565            FROM threads
566            WHERE id = ?1
567            "#,
568            params![id],
569            row_to_thread,
570        )
571        .optional()
572        .context("failed to read thread")
573    }
574
575    /// List threads ordered by most recently updated.
576    ///
577    /// Use [`ThreadListFilters`] to control whether archived threads are included
578    /// and the maximum number of results returned.
579    pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
580        let conn = self.conn()?;
581        let sql = if filters.include_archived {
582            "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads ORDER BY updated_at DESC LIMIT ?1"
583        } else {
584            "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode, current_leaf_id FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
585        };
586
587        let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
588        let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
589        let mut rows = stmt
590            .query(params![limit])
591            .context("failed to query threads")?;
592        let mut out = Vec::new();
593        while let Some(row) = rows.next().context("failed to iterate thread rows")? {
594            out.push(row_to_thread(row)?);
595        }
596        Ok(out)
597    }
598
599    /// Archive a thread, setting its status to [`ThreadStatus::Archived`] and
600    /// recording the current timestamp.
601    pub fn mark_archived(&self, id: &str) -> Result<()> {
602        let conn = self.conn()?;
603        conn.execute(
604            "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
605            params![
606                id,
607                Utc::now().timestamp(),
608                thread_status_to_str(&ThreadStatus::Archived)
609            ],
610        )
611        .context("failed to archive thread")?;
612        Ok(())
613    }
614
615    /// Unarchive a thread, removing the archived flag and clearing `archived_at`.
616    pub fn mark_unarchived(&self, id: &str) -> Result<()> {
617        let conn = self.conn()?;
618        conn.execute(
619            "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
620            params![id],
621        )
622        .context("failed to unarchive thread")?;
623        Ok(())
624    }
625
626    /// Permanently delete a thread and all of its associated data
627    /// (messages, checkpoints, dynamic tools) via cascading foreign keys.
628    pub fn delete_thread(&self, id: &str) -> Result<()> {
629        let conn = self.conn()?;
630        conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
631            .context("failed to delete thread")?;
632        Ok(())
633    }
634
635    /// Set the memory mode for a thread.
636    ///
637    /// Pass `None` to clear the memory mode.
638    pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
639        let conn = self.conn()?;
640        conn.execute(
641            "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
642            params![id, mode],
643        )
644        .context("failed to update thread memory mode")?;
645        Ok(())
646    }
647
648    /// Get the memory mode configured for a thread.
649    ///
650    /// Returns `None` if the thread does not exist or has no memory mode set.
651    pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
652        let conn = self.conn()?;
653        conn.query_row(
654            "SELECT memory_mode FROM threads WHERE id = ?1",
655            params![id],
656            |row| row.get::<_, Option<String>>(0),
657        )
658        .optional()
659        .context("failed to read thread memory mode")
660        .map(Option::flatten)
661    }
662
663    /// List all leaf messages in a thread.
664    ///
665    /// A leaf message is one that has no other message referencing it as a parent.
666    /// In a branching conversation tree, there may be multiple leaf messages.
667    pub fn list_leaf_messages(&self, thread_id: &str) -> Result<Vec<MessageRecord>> {
668        let conn = self.conn()?;
669        let mut stmt = conn
670            .prepare(
671                r#"
672                SELECT m1.id, m1.thread_id, m1.role, m1.content, m1.item_json, m1.created_at, m1.parent_entry_id
673                FROM messages m1
674                LEFT JOIN messages m2 ON m1.id = m2.parent_entry_id
675                WHERE m1.thread_id = ?1 AND m2.id IS NULL
676                "#,
677            )
678            .context("failed to prepare message listing query")?;
679        let mut rows = stmt
680            .query(params![thread_id])
681            .with_context(|| format!("failed to list leaf messages for thread {thread_id}"))?;
682        let mut out = Vec::new();
683        while let Some(row) = rows.next().context("failed to iterate message rows")? {
684            let item_json: Option<String> = row.get(4).context("failed to read item json")?;
685            let item = item_json
686                .as_deref()
687                .map(serde_json::from_str)
688                .transpose()
689                .with_context(|| {
690                    format!("failed to parse message item json in thread {thread_id}")
691                })?;
692            out.push(MessageRecord {
693                id: row.get(0).context("failed to read message id")?,
694                thread_id: row.get(1).context("failed to read message thread id")?,
695                role: row.get(2).context("failed to read message role")?,
696                content: row.get(3).context("failed to read message content")?,
697                item,
698                created_at: row.get(5).context("failed to read message timestamp")?,
699                parent_entry_id: row.get(6).context("failed to read parent entry id")?,
700            });
701        }
702        Ok(out)
703    }
704
705    /// Update the current leaf message pointer for a thread.
706    ///
707    /// This controls which branch of the conversation tree is considered active
708    /// when listing messages via [`list_messages`](Self::list_messages).
709    pub fn set_current_leaf_id(&self, thread_id: &str, current_leaf_id: &str) -> Result<()> {
710        let conn = self.conn()?;
711        conn.execute(
712            "UPDATE threads SET current_leaf_id = ?1 WHERE id = ?2",
713            params![current_leaf_id, thread_id],
714        )
715        .context("failed to update thread current leaf id")?;
716        Ok(())
717    }
718
719    /// Replace the dynamic tools for a thread.
720    ///
721    /// All existing dynamic tools for the thread are deleted and replaced with the
722    /// provided list. The operation is performed within a transaction.
723    pub fn persist_dynamic_tools(
724        &self,
725        thread_id: &str,
726        tools: &[DynamicToolRecord],
727    ) -> Result<()> {
728        let mut conn = self.conn()?;
729        let tx = conn
730            .transaction()
731            .context("failed to begin dynamic tools transaction")?;
732        tx.execute(
733            "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
734            params![thread_id],
735        )
736        .context("failed to clear dynamic tools")?;
737        for tool in tools {
738            tx.execute(
739                "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
740                params![
741                    thread_id,
742                    tool.position,
743                    tool.name,
744                    tool.description,
745                    tool.input_schema.to_string()
746                ],
747            )
748            .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
749        }
750        tx.commit().context("failed to commit dynamic tools")?;
751        Ok(())
752    }
753
754    /// Retrieve all dynamic tools registered for a thread, ordered by position.
755    pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
756        let conn = self.conn()?;
757        let mut stmt = conn
758            .prepare(
759                "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
760            )
761            .context("failed to prepare get dynamic tools query")?;
762        let mut rows = stmt
763            .query(params![thread_id])
764            .context("failed to query dynamic tools")?;
765        let mut out = Vec::new();
766        while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
767            let input_schema_raw: String =
768                row.get(3).context("failed to read tool input schema")?;
769            let input_schema: Value =
770                serde_json::from_str(&input_schema_raw).with_context(|| {
771                    format!("failed to parse input schema for dynamic tool in thread {thread_id}")
772                })?;
773            out.push(DynamicToolRecord {
774                position: row.get(0).context("failed to read tool position")?,
775                name: row.get(1).context("failed to read tool name")?,
776                description: row.get(2).context("failed to read tool description")?,
777                input_schema,
778            });
779        }
780        Ok(out)
781    }
782
783    /// Append a new message to a thread.
784    ///
785    /// The message is linked to the thread's current leaf as its parent, and the
786    /// thread's `current_leaf_id` is updated to the new message. Returns the ID
787    /// of the newly created message.
788    pub fn append_message(
789        &self,
790        thread_id: &str,
791        role: &str,
792        content: &str,
793        item: Option<Value>,
794    ) -> Result<i64> {
795        let mut conn = self.conn()?;
796        let created_at = Utc::now().timestamp();
797        let item_json = item
798            .as_ref()
799            .map(serde_json::to_string)
800            .transpose()
801            .context("failed to serialize message item payload")?;
802
803        let tx = conn
804            .transaction()
805            .context("failed to begin append message transaction")?;
806
807        let current_leaf_id: Option<i64> = tx
808            .query_row(
809                "SELECT current_leaf_id FROM threads WHERE id = ?1",
810                params![thread_id],
811                |row| row.get(0),
812            )
813            .with_context(|| {
814                format!("failed to query thread current leaf id for thread {thread_id}")
815            })?;
816
817        let next_leaf_id: i64 = tx.query_row(
818            r#"
819                INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
820                SELECT ?1, ?2, ?3, ?4, ?5, ?6
821                RETURNING id
822            "#, params![thread_id, role, content, item_json, created_at, current_leaf_id], |row| row.get(0)
823        ).with_context(|| format!("failed to append message for thread {thread_id}"))?;
824
825        tx.execute(
826            r#"
827            UPDATE threads
828            SET current_leaf_id = ?1
829            WHERE id = ?2;
830            "#,
831            params![next_leaf_id, thread_id],
832        )
833        .with_context(|| {
834            format!("failed to update thread current leaf id for thread {thread_id}")
835        })?;
836
837        tx.commit()
838            .context("failed to commit append message transaction")?;
839
840        Ok(next_leaf_id)
841    }
842
843    /// List messages in the current conversation branch, walking backwards from
844    /// the thread's `current_leaf_id`.
845    ///
846    /// Messages are returned in chronological order (oldest first). The `limit`
847    /// parameter caps how many ancestor messages are traversed; it defaults to 500.
848    pub fn list_messages(
849        &self,
850        thread_id: &str,
851        limit: Option<usize>,
852    ) -> Result<Vec<MessageRecord>> {
853        let conn = self.conn()?;
854        let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
855        let mut stmt = conn
856            .prepare(
857                r#"
858                WITH RECURSIVE
859                    leaf_id AS (
860                        SELECT current_leaf_id FROM threads WHERE id = ?1
861                    ),
862                    ancestors AS (
863                        SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id, 0 AS depth
864                        FROM messages
865                        WHERE id = (SELECT current_leaf_id FROM leaf_id)
866
867                        UNION ALL
868
869                        SELECT m.id, m.thread_id, m.role, m.content, m.item_json, m.created_at, m.parent_entry_id, a.depth + 1
870                        FROM messages m
871                        JOIN ancestors a ON m.id = a.parent_entry_id
872                        WHERE a.depth < ?2
873                    )
874                    SELECT id, thread_id, role, content, item_json, created_at, parent_entry_id FROM ancestors
875                    ORDER BY depth DESC
876                "#
877            )
878            .context("failed to prepare message listing query")?;
879        let mut rows = stmt
880            .query(params![thread_id, limit - 1])
881            .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
882        let mut out = Vec::new();
883        while let Some(row) = rows.next().context("failed to iterate message rows")? {
884            let item_json: Option<String> = row.get(4).context("failed to read item json")?;
885            let item = item_json
886                .as_deref()
887                .map(serde_json::from_str)
888                .transpose()
889                .with_context(|| {
890                    format!("failed to parse message item json in thread {thread_id}")
891                })?;
892            out.push(MessageRecord {
893                id: row.get(0).context("failed to read message id")?,
894                thread_id: row.get(1).context("failed to read message thread id")?,
895                role: row.get(2).context("failed to read message role")?,
896                content: row.get(3).context("failed to read message content")?,
897                item,
898                created_at: row.get(5).context("failed to read message timestamp")?,
899                parent_entry_id: row.get(6).context("failed to read parent entry id")?,
900            });
901        }
902        Ok(out)
903    }
904
905    /// Fork the conversation at a specific message.
906    ///
907    /// Creates a new message whose parent is `message_id` and updates the thread's
908    /// `current_leaf_id` to the new message. Returns the ID of the new message.
909    /// This enables branching conversations from any point in the history.
910    pub fn fork_at_message(
911        &self,
912        message_id: &str,
913        role: &str,
914        content: &str,
915        item: Option<Value>,
916    ) -> Result<i64> {
917        let mut conn = self.conn()?;
918        let created_at = Utc::now().timestamp();
919        let item_json = item
920            .as_ref()
921            .map(serde_json::to_string)
922            .transpose()
923            .context("failed to serialize message item payload")?;
924
925        let tx = conn
926            .transaction()
927            .context("failed to begin fork message transaction")?;
928
929        let thread_id: String = tx
930            .query_row(
931                "SELECT thread_id FROM messages WHERE id = ?1",
932                params![message_id],
933                |row| row.get(0),
934            )
935            .with_context(|| format!("failed to query thread id for message {message_id}"))?;
936
937        let next_leaf_id: i64 = tx.query_row(
938            r#"
939                INSERT INTO messages(thread_id, role, content, item_json, created_at, parent_entry_id)
940                SELECT ?1, ?2, ?3, ?4, ?5, ?6
941                RETURNING id
942            "#, params![thread_id, role, content, item_json, created_at, message_id], |row| row.get(0)
943        ).with_context(|| format!("failed to fork at message for thread {:?}", thread_id))?;
944
945        tx.execute(
946            r#"
947            UPDATE threads
948            SET current_leaf_id = ?1
949            WHERE id = ?2;
950            "#,
951            params![next_leaf_id, thread_id],
952        )
953        .with_context(|| {
954            format!(
955                "failed to update thread current leaf id for thread {:?}",
956                thread_id
957            )
958        })?;
959
960        tx.commit()
961            .context("failed to commit fork message transaction")?;
962
963        Ok(next_leaf_id)
964    }
965
966    /// Delete all messages belonging to a thread and reset its `current_leaf_id`.
967    ///
968    /// Returns the number of messages deleted.
969    pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
970        let mut conn = self.conn()?;
971        let tx = conn
972            .transaction()
973            .context("failed to begin clear messages transaction")?;
974
975        tx.execute(
976            r#"
977            UPDATE threads
978            SET current_leaf_id = NULL
979            WHERE id = ?1;
980            "#,
981            params![thread_id],
982        )
983        .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
984        let result = tx
985            .execute(
986                r#"
987                DELETE FROM messages WHERE thread_id = ?1
988                "#,
989                params![thread_id],
990            )
991            .with_context(|| format!("failed to clear messages for thread {thread_id}"))?;
992        tx.commit()
993            .context("failed to commit clear messages transaction")?;
994
995        Ok(result)
996    }
997
998    /// Save (or update) a named checkpoint for a thread.
999    ///
1000    /// If a checkpoint with the same `thread_id` and `checkpoint_id` already exists,
1001    /// its state and timestamp are overwritten.
1002    pub fn save_checkpoint(
1003        &self,
1004        thread_id: &str,
1005        checkpoint_id: &str,
1006        state: &Value,
1007    ) -> Result<()> {
1008        let conn = self.conn()?;
1009        let state_json =
1010            serde_json::to_string(state).context("failed to encode checkpoint state")?;
1011        conn.execute(
1012            r#"
1013            INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
1014            VALUES (?1, ?2, ?3, ?4)
1015            ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
1016                state_json = excluded.state_json,
1017                created_at = excluded.created_at
1018            "#,
1019            params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
1020        )
1021        .with_context(|| {
1022            format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
1023        })?;
1024        Ok(())
1025    }
1026
1027    /// Load a checkpoint for a thread.
1028    ///
1029    /// If `checkpoint_id` is provided, loads that specific checkpoint. Otherwise,
1030    /// loads the most recently created checkpoint for the thread. Returns `None`
1031    /// if no matching checkpoint exists.
1032    pub fn load_checkpoint(
1033        &self,
1034        thread_id: &str,
1035        checkpoint_id: Option<&str>,
1036    ) -> Result<Option<CheckpointRecord>> {
1037        let conn = self.conn()?;
1038        if let Some(checkpoint_id) = checkpoint_id {
1039            let row = conn
1040                .query_row(
1041                    "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1042                    params![thread_id, checkpoint_id],
1043                    |row| {
1044                        let state_json: String = row.get(2)?;
1045                        let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1046                        Ok(CheckpointRecord {
1047                            thread_id: row.get(0)?,
1048                            checkpoint_id: row.get(1)?,
1049                            state,
1050                            created_at: row.get(3)?,
1051                        })
1052                    },
1053                )
1054                .optional()
1055                .with_context(|| {
1056                    format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
1057                })?;
1058            return Ok(row);
1059        }
1060
1061        conn.query_row(
1062            "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
1063            params![thread_id],
1064            |row| {
1065                let state_json: String = row.get(2)?;
1066                let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1067                Ok(CheckpointRecord {
1068                    thread_id: row.get(0)?,
1069                    checkpoint_id: row.get(1)?,
1070                    state,
1071                    created_at: row.get(3)?,
1072                })
1073            },
1074        )
1075        .optional()
1076        .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
1077    }
1078
1079    /// List checkpoints for a thread, ordered by creation time (newest first).
1080    ///
1081    /// The `limit` parameter caps the number of results and defaults to 100.
1082    pub fn list_checkpoints(
1083        &self,
1084        thread_id: &str,
1085        limit: Option<usize>,
1086    ) -> Result<Vec<CheckpointRecord>> {
1087        let conn = self.conn()?;
1088        let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1089        let mut stmt = conn
1090            .prepare(
1091                "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
1092            )
1093            .context("failed to prepare checkpoint list query")?;
1094        let mut rows = stmt
1095            .query(params![thread_id, limit])
1096            .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
1097
1098        let mut out = Vec::new();
1099        while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
1100            let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
1101            let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
1102            out.push(CheckpointRecord {
1103                thread_id: row.get(0).context("failed to read checkpoint thread id")?,
1104                checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
1105                state,
1106                created_at: row.get(3).context("failed to read checkpoint timestamp")?,
1107            });
1108        }
1109        Ok(out)
1110    }
1111
1112    /// Delete a specific checkpoint from a thread.
1113    pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
1114        let conn = self.conn()?;
1115        conn.execute(
1116            "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
1117            params![thread_id, checkpoint_id],
1118        )
1119        .with_context(|| {
1120            format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
1121        })?;
1122        Ok(())
1123    }
1124
1125    /// Insert or update a background job record.
1126    pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
1127        let conn = self.conn()?;
1128        conn.execute(
1129            r#"
1130            INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
1131            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
1132            ON CONFLICT(id) DO UPDATE SET
1133                name = excluded.name,
1134                status = excluded.status,
1135                progress = excluded.progress,
1136                detail = excluded.detail,
1137                created_at = excluded.created_at,
1138                updated_at = excluded.updated_at
1139            "#,
1140            params![
1141                job.id,
1142                job.name,
1143                job_state_status_to_str(&job.status),
1144                job.progress.map(i64::from),
1145                job.detail,
1146                job.created_at,
1147                job.updated_at
1148            ],
1149        )
1150        .with_context(|| format!("failed to upsert job {}", job.id))?;
1151        Ok(())
1152    }
1153
1154    /// Retrieve a single job by its ID.
1155    ///
1156    /// Returns `None` if no job with the given ID exists.
1157    pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
1158        let conn = self.conn()?;
1159        conn.query_row(
1160            "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
1161            params![id],
1162            |row| {
1163                let status_raw: String = row.get(2)?;
1164                let progress: Option<i64> = row.get(3)?;
1165                Ok(JobStateRecord {
1166                    id: row.get(0)?,
1167                    name: row.get(1)?,
1168                    status: job_state_status_from_str(&status_raw),
1169                    progress: progress.and_then(|v| u8::try_from(v).ok()),
1170                    detail: row.get(4)?,
1171                    created_at: row.get(5)?,
1172                    updated_at: row.get(6)?,
1173                })
1174            },
1175        )
1176        .optional()
1177        .with_context(|| format!("failed to read job {id}"))
1178    }
1179
1180    /// List jobs ordered by most recently updated.
1181    ///
1182    /// The `limit` parameter caps the number of results and defaults to 100.
1183    pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
1184        let conn = self.conn()?;
1185        let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
1186        let mut stmt = conn
1187            .prepare(
1188                "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
1189            )
1190            .context("failed to prepare job list query")?;
1191        let mut rows = stmt
1192            .query(params![limit])
1193            .context("failed to query persisted jobs")?;
1194        let mut out = Vec::new();
1195        while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
1196            let status_raw: String = row.get(2).context("failed to read job status")?;
1197            let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
1198            out.push(JobStateRecord {
1199                id: row.get(0).context("failed to read job id")?,
1200                name: row.get(1).context("failed to read job name")?,
1201                status: job_state_status_from_str(&status_raw),
1202                progress: progress.and_then(|v| u8::try_from(v).ok()),
1203                detail: row.get(4).context("failed to read job detail")?,
1204                created_at: row.get(5).context("failed to read job created_at")?,
1205                updated_at: row.get(6).context("failed to read job updated_at")?,
1206            });
1207        }
1208        Ok(out)
1209    }
1210
1211    /// Permanently delete a job record.
1212    pub fn delete_job(&self, id: &str) -> Result<()> {
1213        let conn = self.conn()?;
1214        conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
1215            .with_context(|| format!("failed to delete job {id}"))?;
1216        Ok(())
1217    }
1218
1219    /// Look up the rollout file path for a thread by its ID.
1220    pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
1221        let conn = self.conn()?;
1222        conn.query_row(
1223            "SELECT rollout_path FROM threads WHERE id = ?1",
1224            params![id],
1225            |row| row.get::<_, Option<String>>(0),
1226        )
1227        .optional()
1228        .context("failed to lookup rollout path")
1229        .map(|opt| opt.flatten().map(PathBuf::from))
1230    }
1231
1232    /// Append an entry to the JSONL session index file.
1233    ///
1234    /// The session index is an append-only log that maps thread IDs to their names,
1235    /// update timestamps, and rollout paths. It is used for fast name-based lookups
1236    /// without opening the SQLite database.
1237    pub fn append_thread_name(
1238        &self,
1239        thread_id: &str,
1240        thread_name: Option<String>,
1241        updated_at: i64,
1242        rollout_path: Option<PathBuf>,
1243    ) -> Result<()> {
1244        if let Some(parent) = self.session_index_path.parent() {
1245            fs::create_dir_all(parent).with_context(|| {
1246                format!(
1247                    "failed to create session index directory {}",
1248                    parent.display()
1249                )
1250            })?;
1251        }
1252        let entry = SessionIndexEntry {
1253            thread_id: thread_id.to_string(),
1254            thread_name,
1255            updated_at,
1256            rollout_path,
1257        };
1258        let encoded =
1259            serde_json::to_string(&entry).context("failed to serialize session index entry")?;
1260        let mut file = OpenOptions::new()
1261            .create(true)
1262            .append(true)
1263            .open(&self.session_index_path)
1264            .with_context(|| {
1265                format!(
1266                    "failed to open session index {}",
1267                    self.session_index_path.display()
1268                )
1269            })?;
1270        writeln!(file, "{encoded}").context("failed to append session index entry")?;
1271        Ok(())
1272    }
1273
1274    /// Find the display name for a thread by its ID, using the session index.
1275    ///
1276    /// Returns `None` if the thread is not in the index or has no name.
1277    pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
1278        let map = self.session_index_map()?;
1279        Ok(map
1280            .get(thread_id)
1281            .and_then(|entry| entry.thread_name.clone()))
1282    }
1283
1284    /// Look up display names for multiple thread IDs at once.
1285    ///
1286    /// Returns a map from thread ID to its name (which may be `None`).
1287    pub fn find_thread_names_by_ids(
1288        &self,
1289        ids: &[String],
1290    ) -> Result<HashMap<String, Option<String>>> {
1291        let map = self.session_index_map()?;
1292        let mut out = HashMap::new();
1293        for id in ids {
1294            let name = map.get(id).and_then(|entry| entry.thread_name.clone());
1295            out.insert(id.clone(), name);
1296        }
1297        Ok(out)
1298    }
1299
1300    /// Find the rollout path for a thread by its display name (case-insensitive).
1301    ///
1302    /// If multiple threads share the same name, the most recently updated one is returned.
1303    /// Returns `None` if no matching thread is found.
1304    pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
1305        let map = self.session_index_map()?;
1306        let matched = map
1307            .values()
1308            .filter(|entry| {
1309                entry
1310                    .thread_name
1311                    .as_deref()
1312                    .is_some_and(|n| n.eq_ignore_ascii_case(name))
1313            })
1314            .max_by_key(|entry| entry.updated_at);
1315        Ok(matched.and_then(|entry| entry.rollout_path.clone()))
1316    }
1317
1318    fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
1319        if !self.session_index_path.exists() {
1320            return Ok(HashMap::new());
1321        }
1322        let file = OpenOptions::new()
1323            .read(true)
1324            .open(&self.session_index_path)
1325            .with_context(|| {
1326                format!(
1327                    "failed to read session index {}",
1328                    self.session_index_path.display()
1329                )
1330            })?;
1331        let reader = BufReader::new(file);
1332        let mut latest = HashMap::<String, SessionIndexEntry>::new();
1333        for line in reader.lines() {
1334            let line = line.context("failed to read session index line")?;
1335            if line.trim().is_empty() {
1336                continue;
1337            }
1338            let parsed: SessionIndexEntry =
1339                serde_json::from_str(&line).context("failed to parse session index entry")?;
1340            latest.insert(parsed.thread_id.clone(), parsed);
1341        }
1342        Ok(latest)
1343    }
1344}
1345
1346fn default_state_db_path() -> PathBuf {
1347    dirs::home_dir()
1348        .unwrap_or_else(|| PathBuf::from("."))
1349        .join(".deepseek")
1350        .join("state.db")
1351}
1352
/// Encode a boolean as an integer: `true` becomes `1`, `false` becomes `0`.
fn bool_to_i64(value: bool) -> i64 {
    i64::from(value)
}
1356
/// Decode an integer as a boolean: `0` is `false`, every other value is `true`.
fn i64_to_bool(value: i64) -> bool {
    match value {
        0 => false,
        _ => true,
    }
}
1360
1361fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
1362    match status {
1363        ThreadStatus::Running => "running",
1364        ThreadStatus::Idle => "idle",
1365        ThreadStatus::Completed => "completed",
1366        ThreadStatus::Failed => "failed",
1367        ThreadStatus::Paused => "paused",
1368        ThreadStatus::Archived => "archived",
1369    }
1370}
1371
1372fn thread_status_from_str(value: &str) -> ThreadStatus {
1373    match value {
1374        "running" => ThreadStatus::Running,
1375        "idle" => ThreadStatus::Idle,
1376        "completed" => ThreadStatus::Completed,
1377        "failed" => ThreadStatus::Failed,
1378        "paused" => ThreadStatus::Paused,
1379        "archived" => ThreadStatus::Archived,
1380        _ => ThreadStatus::Idle,
1381    }
1382}
1383
1384fn session_source_to_str(source: &SessionSource) -> &'static str {
1385    match source {
1386        SessionSource::Interactive => "interactive",
1387        SessionSource::Resume => "resume",
1388        SessionSource::Fork => "fork",
1389        SessionSource::Api => "api",
1390        SessionSource::Unknown => "unknown",
1391    }
1392}
1393
1394fn session_source_from_str(value: &str) -> SessionSource {
1395    match value {
1396        "interactive" => SessionSource::Interactive,
1397        "resume" => SessionSource::Resume,
1398        "fork" => SessionSource::Fork,
1399        "api" => SessionSource::Api,
1400        _ => SessionSource::Unknown,
1401    }
1402}
1403
/// Render an optional path as an optional string via `Path::display`.
///
/// `None` stays `None`. Because `display` is used, any part of the path that
/// is not valid Unicode is rendered lossily.
fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
    match path {
        Some(p) => Some(p.display().to_string()),
        None => None,
    }
}
1407
1408fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
1409    match status {
1410        JobStateStatus::Queued => "queued",
1411        JobStateStatus::Running => "running",
1412        JobStateStatus::Completed => "completed",
1413        JobStateStatus::Failed => "failed",
1414        JobStateStatus::Cancelled => "cancelled",
1415    }
1416}
1417
1418fn job_state_status_from_str(value: &str) -> JobStateStatus {
1419    match value {
1420        "queued" => JobStateStatus::Queued,
1421        "running" => JobStateStatus::Running,
1422        "completed" => JobStateStatus::Completed,
1423        "failed" => JobStateStatus::Failed,
1424        "cancelled" => JobStateStatus::Cancelled,
1425        _ => JobStateStatus::Queued,
1426    }
1427}
1428
1429fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
1430    let status_raw: String = row.get(7)?;
1431    let source_raw: String = row.get(11)?;
1432    let rollout_path: Option<String> = row.get(1)?;
1433    let path: Option<String> = row.get(8)?;
1434    Ok(ThreadMetadata {
1435        id: row.get(0)?,
1436        rollout_path: rollout_path.map(PathBuf::from),
1437        preview: row.get(2)?,
1438        ephemeral: i64_to_bool(row.get(3)?),
1439        model_provider: row.get(4)?,
1440        created_at: row.get(5)?,
1441        updated_at: row.get(6)?,
1442        status: thread_status_from_str(&status_raw),
1443        path: path.map(PathBuf::from),
1444        cwd: PathBuf::from(row.get::<_, String>(9)?),
1445        cli_version: row.get(10)?,
1446        source: session_source_from_str(&source_raw),
1447        name: row.get(12)?,
1448        sandbox_policy: row.get(13)?,
1449        approval_mode: row.get(14)?,
1450        archived: i64_to_bool(row.get(15)?),
1451        archived_at: row.get(16)?,
1452        git_sha: row.get(17)?,
1453        git_branch: row.get(18)?,
1454        git_origin_url: row.get(19)?,
1455        memory_mode: row.get(20)?,
1456        current_leaf_id: row.get(21)?,
1457    })
1458}