Skip to main content

deepseek_state/
lib.rs

1use std::collections::HashMap;
2use std::fs::{self, OpenOptions};
3use std::io::{BufRead, BufReader, Write};
4use std::path::{Path, PathBuf};
5
6use anyhow::{Context, Result};
7use chrono::Utc;
8use rusqlite::{Connection, OptionalExtension, params};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11
/// Lifecycle state of a thread, persisted as a snake_case string in the
/// `threads.status` column (mapped via `thread_status_to_str` /
/// `thread_status_from_str`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ThreadStatus {
    Running,
    Idle,
    Completed,
    Failed,
    Paused,
    Archived,
}
22
/// How a session was started, persisted as a snake_case string in the
/// `threads.source` column (mapped via `session_source_to_str`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SessionSource {
    Interactive,
    Resume,
    Fork,
    Api,
    Unknown,
}
32
/// One row of the `threads` table: everything the store knows about a thread.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThreadMetadata {
    // Primary key of the `threads` table.
    pub id: String,
    // Path to the rollout/transcript file, if any.
    pub rollout_path: Option<PathBuf>,
    // Preview text for the thread.
    pub preview: String,
    // Stored as 0/1 in the `ephemeral` column.
    pub ephemeral: bool,
    pub model_provider: String,
    // Unix timestamps (seconds).
    pub created_at: i64,
    pub updated_at: i64,
    pub status: ThreadStatus,
    pub path: Option<PathBuf>,
    // Working directory; persisted as its display string.
    pub cwd: PathBuf,
    pub cli_version: String,
    pub source: SessionSource,
    // NOTE: stored in the `title` column, not a `name` column.
    pub name: Option<String>,
    pub sandbox_policy: Option<String>,
    pub approval_mode: Option<String>,
    // Stored as 0/1; `archived_at` is set when archiving.
    pub archived: bool,
    pub archived_at: Option<i64>,
    pub git_sha: Option<String>,
    pub git_branch: Option<String>,
    pub git_origin_url: Option<String>,
    pub memory_mode: Option<String>,
}
57
/// One dynamically-registered tool for a thread, ordered by `position`
/// within the `thread_dynamic_tools` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamicToolRecord {
    // Ordering key; part of the table's primary key with `thread_id`.
    pub position: i64,
    pub name: String,
    pub description: Option<String>,
    // Stored serialized as a JSON string in the `input_schema` column.
    pub input_schema: Value,
}
65
/// One row of the `messages` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageRecord {
    // SQLite AUTOINCREMENT rowid.
    pub id: i64,
    pub thread_id: String,
    pub role: String,
    pub content: String,
    // Optional structured payload, stored as JSON in `item_json`.
    pub item: Option<Value>,
    // Unix timestamp (seconds).
    pub created_at: i64,
}
75
/// One row of the `checkpoints` table, keyed by `(thread_id, checkpoint_id)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckpointRecord {
    pub thread_id: String,
    pub checkpoint_id: String,
    // Stored serialized in the `state_json` column; readers degrade a
    // corrupt payload to `Value::Null` rather than failing.
    pub state: Value,
    // Unix timestamp (seconds).
    pub created_at: i64,
}
83
/// Status of a persisted background job, stored as a snake_case string in
/// the `jobs.status` column (mapped via `job_state_status_to_str` /
/// `job_state_status_from_str`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum JobStateStatus {
    Queued,
    Running,
    Completed,
    Failed,
    Cancelled,
}
93
/// One row of the `jobs` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobStateRecord {
    pub id: String,
    pub name: String,
    pub status: JobStateStatus,
    // Optional progress value; rows whose stored integer does not fit a u8
    // read back as `None` (see `get_job` / `list_jobs`).
    pub progress: Option<u8>,
    pub detail: Option<String>,
    // Unix timestamps (seconds).
    pub created_at: i64,
    pub updated_at: i64,
}
104
105#[derive(Debug, Clone)]
106pub struct ThreadListFilters {
107    pub include_archived: bool,
108    pub limit: Option<usize>,
109}
110
impl Default for ThreadListFilters {
    /// Default view: unarchived threads only, capped at 50 rows.
    fn default() -> Self {
        Self {
            include_archived: false,
            limit: Some(50),
        }
    }
}
119
/// One line of the append-only `session_index.jsonl` file; the entry with
/// the latest file position wins when the index is replayed
/// (see `session_index_map`).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SessionIndexEntry {
    thread_id: String,
    thread_name: Option<String>,
    updated_at: i64,
    rollout_path: Option<PathBuf>,
}
127
/// Handle to the persistent state: a SQLite database plus a JSONL session
/// index living next to it. Holds only paths; a fresh connection is opened
/// per operation.
#[derive(Debug, Clone)]
pub struct StateStore {
    // Path to the SQLite database file.
    db_path: PathBuf,
    // Path to `session_index.jsonl` in the same directory.
    session_index_path: PathBuf,
}
133
134impl StateStore {
135    pub fn open(path: Option<PathBuf>) -> Result<Self> {
136        let db_path = path.unwrap_or_else(default_state_db_path);
137        let session_index_path = db_path
138            .parent()
139            .unwrap_or_else(|| Path::new("."))
140            .join("session_index.jsonl");
141        if let Some(parent) = db_path.parent() {
142            fs::create_dir_all(parent).with_context(|| {
143                format!("failed to create state directory {}", parent.display())
144            })?;
145        }
146        let store = Self {
147            db_path,
148            session_index_path,
149        };
150        store.init_schema()?;
151        Ok(store)
152    }
153
154    pub fn db_path(&self) -> &Path {
155        &self.db_path
156    }
157
158    fn conn(&self) -> Result<Connection> {
159        Connection::open(&self.db_path)
160            .with_context(|| format!("failed to open state db {}", self.db_path.display()))
161    }
162
    /// Creates all tables and indexes if they do not already exist.
    ///
    /// Runs on every `open`, so each statement is idempotent
    /// (`IF NOT EXISTS`). NOTE(review): the `ON DELETE CASCADE` clauses are
    /// only enforced when `PRAGMA foreign_keys` is enabled on the
    /// connection — SQLite leaves it off by default; verify the connection
    /// setup enables it.
    fn init_schema(&self) -> Result<()> {
        let conn = self.conn()?;
        conn.execute_batch(
            r#"
            CREATE TABLE IF NOT EXISTS threads (
                id TEXT PRIMARY KEY,
                rollout_path TEXT,
                preview TEXT NOT NULL,
                ephemeral INTEGER NOT NULL,
                model_provider TEXT NOT NULL,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL,
                status TEXT NOT NULL,
                path TEXT,
                cwd TEXT NOT NULL,
                cli_version TEXT NOT NULL,
                source TEXT NOT NULL,
                title TEXT,
                sandbox_policy TEXT,
                approval_mode TEXT,
                archived INTEGER NOT NULL DEFAULT 0,
                archived_at INTEGER,
                git_sha TEXT,
                git_branch TEXT,
                git_origin_url TEXT,
                memory_mode TEXT
            );
            CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
            CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
            CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);

            CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
                thread_id TEXT NOT NULL,
                position INTEGER NOT NULL,
                name TEXT NOT NULL,
                description TEXT,
                input_schema TEXT NOT NULL,
                PRIMARY KEY (thread_id, position),
                FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                thread_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                item_json TEXT,
                created_at INTEGER NOT NULL,
                FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);

            CREATE TABLE IF NOT EXISTS checkpoints (
                thread_id TEXT NOT NULL,
                checkpoint_id TEXT NOT NULL,
                state_json TEXT NOT NULL,
                created_at INTEGER NOT NULL,
                PRIMARY KEY(thread_id, checkpoint_id),
                FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
            );
            CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);

            CREATE TABLE IF NOT EXISTS jobs (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                status TEXT NOT NULL,
                progress INTEGER,
                detail TEXT,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
            "#,
        )
        .context("failed to initialize thread schema")?;
        Ok(())
    }
240
    /// Inserts or fully overwrites the `threads` row for `thread.id`, then
    /// appends a name entry to the JSONL session index.
    ///
    /// Note that the struct field `name` is stored in the `title` column,
    /// and bool fields are encoded as 0/1 integers.
    pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
        let conn = self.conn()?;
        conn.execute(
            r#"
            INSERT INTO threads (
                id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
                cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
                git_sha, git_branch, git_origin_url, memory_mode
            ) VALUES (
                ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
                ?11, ?12, ?13, ?14, ?15, ?16, ?17,
                ?18, ?19, ?20, ?21
            )
            ON CONFLICT(id) DO UPDATE SET
                rollout_path=excluded.rollout_path,
                preview=excluded.preview,
                ephemeral=excluded.ephemeral,
                model_provider=excluded.model_provider,
                created_at=excluded.created_at,
                updated_at=excluded.updated_at,
                status=excluded.status,
                path=excluded.path,
                cwd=excluded.cwd,
                cli_version=excluded.cli_version,
                source=excluded.source,
                title=excluded.title,
                sandbox_policy=excluded.sandbox_policy,
                approval_mode=excluded.approval_mode,
                archived=excluded.archived,
                archived_at=excluded.archived_at,
                git_sha=excluded.git_sha,
                git_branch=excluded.git_branch,
                git_origin_url=excluded.git_origin_url,
                memory_mode=excluded.memory_mode
            "#,
            params![
                thread.id,
                path_to_opt_string(thread.rollout_path.as_deref()),
                thread.preview,
                bool_to_i64(thread.ephemeral),
                thread.model_provider,
                thread.created_at,
                thread.updated_at,
                thread_status_to_str(&thread.status),
                path_to_opt_string(thread.path.as_deref()),
                thread.cwd.display().to_string(),
                thread.cli_version,
                session_source_to_str(&thread.source),
                thread.name,
                thread.sandbox_policy,
                thread.approval_mode,
                bool_to_i64(thread.archived),
                thread.archived_at,
                thread.git_sha,
                thread.git_branch,
                thread.git_origin_url,
                thread.memory_mode,
            ],
        )
        .context("failed to upsert thread metadata")?;

        // Keep the session index in sync; every upsert appends one line
        // (the replay in session_index_map keeps only the last per thread).
        self.append_thread_name(
            &thread.id,
            thread.name.clone(),
            thread.updated_at,
            thread.rollout_path.clone(),
        )?;
        Ok(())
    }
310
    /// Looks up a single thread by id; `Ok(None)` when no row exists.
    pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
        let conn = self.conn()?;
        conn.query_row(
            r#"
            SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
                   cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
                   git_sha, git_branch, git_origin_url, memory_mode
            FROM threads
            WHERE id = ?1
            "#,
            params![id],
            row_to_thread,
        )
        .optional()
        .context("failed to read thread")
    }
327
328    pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
329        let conn = self.conn()?;
330        let sql = if filters.include_archived {
331            "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode FROM threads ORDER BY updated_at DESC LIMIT ?1"
332        } else {
333            "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
334        };
335
336        let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
337        let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
338        let mut rows = stmt
339            .query(params![limit])
340            .context("failed to query threads")?;
341        let mut out = Vec::new();
342        while let Some(row) = rows.next().context("failed to iterate thread rows")? {
343            out.push(row_to_thread(row)?);
344        }
345        Ok(out)
346    }
347
    /// Archives a thread: sets `archived = 1`, stamps `archived_at` with the
    /// current time, and switches `status` to "archived".
    pub fn mark_archived(&self, id: &str) -> Result<()> {
        let conn = self.conn()?;
        conn.execute(
            "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
            params![
                id,
                Utc::now().timestamp(),
                thread_status_to_str(&ThreadStatus::Archived)
            ],
        )
        .context("failed to archive thread")?;
        Ok(())
    }
361
    /// Clears the archived flag and timestamp for a thread.
    ///
    /// NOTE(review): `status` is left untouched, so a thread archived via
    /// `mark_archived` keeps status "archived" after unarchiving — confirm
    /// whether it should be reset here.
    pub fn mark_unarchived(&self, id: &str) -> Result<()> {
        let conn = self.conn()?;
        conn.execute(
            "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
            params![id],
        )
        .context("failed to unarchive thread")?;
        Ok(())
    }
371
    /// Deletes the thread row with the given id (no-op if absent).
    ///
    /// NOTE(review): messages/checkpoints/tools are declared
    /// `ON DELETE CASCADE`, but SQLite only honors that when
    /// `PRAGMA foreign_keys` is enabled on the connection — verify the
    /// child rows are actually removed.
    pub fn delete_thread(&self, id: &str) -> Result<()> {
        let conn = self.conn()?;
        conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
            .context("failed to delete thread")?;
        Ok(())
    }
378
    /// Sets (or clears, with `None`) the `memory_mode` column for a thread.
    pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
        let conn = self.conn()?;
        conn.execute(
            "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
            params![id, mode],
        )
        .context("failed to update thread memory mode")?;
        Ok(())
    }
388
    /// Reads the `memory_mode` column for a thread.
    ///
    /// Returns `None` both when the thread does not exist and when the
    /// column is NULL — the two cases are flattened together deliberately.
    pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
        let conn = self.conn()?;
        conn.query_row(
            "SELECT memory_mode FROM threads WHERE id = ?1",
            params![id],
            |row| row.get::<_, Option<String>>(0),
        )
        .optional()
        .context("failed to read thread memory mode")
        .map(Option::flatten)
    }
400
401    pub fn persist_dynamic_tools(
402        &self,
403        thread_id: &str,
404        tools: &[DynamicToolRecord],
405    ) -> Result<()> {
406        let mut conn = self.conn()?;
407        let tx = conn
408            .transaction()
409            .context("failed to begin dynamic tools transaction")?;
410        tx.execute(
411            "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
412            params![thread_id],
413        )
414        .context("failed to clear dynamic tools")?;
415        for tool in tools {
416            tx.execute(
417                "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
418                params![
419                    thread_id,
420                    tool.position,
421                    tool.name,
422                    tool.description,
423                    tool.input_schema.to_string()
424                ],
425            )
426            .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
427        }
428        tx.commit().context("failed to commit dynamic tools")?;
429        Ok(())
430    }
431
432    pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
433        let conn = self.conn()?;
434        let mut stmt = conn
435            .prepare(
436                "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
437            )
438            .context("failed to prepare get dynamic tools query")?;
439        let mut rows = stmt
440            .query(params![thread_id])
441            .context("failed to query dynamic tools")?;
442        let mut out = Vec::new();
443        while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
444            let input_schema_raw: String =
445                row.get(3).context("failed to read tool input schema")?;
446            let input_schema: Value =
447                serde_json::from_str(&input_schema_raw).with_context(|| {
448                    format!("failed to parse input schema for dynamic tool in thread {thread_id}")
449                })?;
450            out.push(DynamicToolRecord {
451                position: row.get(0).context("failed to read tool position")?,
452                name: row.get(1).context("failed to read tool name")?,
453                description: row.get(2).context("failed to read tool description")?,
454                input_schema,
455            });
456        }
457        Ok(out)
458    }
459
460    pub fn append_message(
461        &self,
462        thread_id: &str,
463        role: &str,
464        content: &str,
465        item: Option<Value>,
466    ) -> Result<i64> {
467        let conn = self.conn()?;
468        let created_at = Utc::now().timestamp();
469        let item_json = item
470            .as_ref()
471            .map(serde_json::to_string)
472            .transpose()
473            .context("failed to serialize message item payload")?;
474        conn.execute(
475            "INSERT INTO messages(thread_id, role, content, item_json, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
476            params![thread_id, role, content, item_json, created_at],
477        )
478        .with_context(|| format!("failed to append message for thread {thread_id}"))?;
479        Ok(conn.last_insert_rowid())
480    }
481
482    pub fn list_messages(
483        &self,
484        thread_id: &str,
485        limit: Option<usize>,
486    ) -> Result<Vec<MessageRecord>> {
487        let conn = self.conn()?;
488        let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
489        let mut stmt = conn
490            .prepare(
491                "SELECT id, thread_id, role, content, item_json, created_at FROM messages WHERE thread_id = ?1 ORDER BY created_at ASC LIMIT ?2",
492            )
493            .context("failed to prepare message listing query")?;
494        let mut rows = stmt
495            .query(params![thread_id, limit])
496            .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
497        let mut out = Vec::new();
498        while let Some(row) = rows.next().context("failed to iterate message rows")? {
499            let item_json: Option<String> = row.get(4).context("failed to read item json")?;
500            let item = item_json
501                .as_deref()
502                .map(serde_json::from_str)
503                .transpose()
504                .with_context(|| {
505                    format!("failed to parse message item json in thread {thread_id}")
506                })?;
507            out.push(MessageRecord {
508                id: row.get(0).context("failed to read message id")?,
509                thread_id: row.get(1).context("failed to read message thread id")?,
510                role: row.get(2).context("failed to read message role")?,
511                content: row.get(3).context("failed to read message content")?,
512                item,
513                created_at: row.get(5).context("failed to read message timestamp")?,
514            });
515        }
516        Ok(out)
517    }
518
    /// Deletes all messages for a thread; returns the number of rows removed.
    pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
        let conn = self.conn()?;
        conn.execute(
            "DELETE FROM messages WHERE thread_id = ?1",
            params![thread_id],
        )
        .with_context(|| format!("failed to clear messages for thread {thread_id}"))
    }
527
    /// Inserts or overwrites a checkpoint, serializing `state` to JSON and
    /// stamping it with the current time. Re-saving an existing
    /// `(thread_id, checkpoint_id)` pair replaces both state and timestamp.
    pub fn save_checkpoint(
        &self,
        thread_id: &str,
        checkpoint_id: &str,
        state: &Value,
    ) -> Result<()> {
        let conn = self.conn()?;
        let state_json =
            serde_json::to_string(state).context("failed to encode checkpoint state")?;
        conn.execute(
            r#"
            INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
            VALUES (?1, ?2, ?3, ?4)
            ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
                state_json = excluded.state_json,
                created_at = excluded.created_at
            "#,
            params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
        )
        .with_context(|| {
            format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
        })?;
        Ok(())
    }
552
553    pub fn load_checkpoint(
554        &self,
555        thread_id: &str,
556        checkpoint_id: Option<&str>,
557    ) -> Result<Option<CheckpointRecord>> {
558        let conn = self.conn()?;
559        if let Some(checkpoint_id) = checkpoint_id {
560            let row = conn
561                .query_row(
562                    "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
563                    params![thread_id, checkpoint_id],
564                    |row| {
565                        let state_json: String = row.get(2)?;
566                        let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
567                        Ok(CheckpointRecord {
568                            thread_id: row.get(0)?,
569                            checkpoint_id: row.get(1)?,
570                            state,
571                            created_at: row.get(3)?,
572                        })
573                    },
574                )
575                .optional()
576                .with_context(|| {
577                    format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
578                })?;
579            return Ok(row);
580        }
581
582        conn.query_row(
583            "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
584            params![thread_id],
585            |row| {
586                let state_json: String = row.get(2)?;
587                let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
588                Ok(CheckpointRecord {
589                    thread_id: row.get(0)?,
590                    checkpoint_id: row.get(1)?,
591                    state,
592                    created_at: row.get(3)?,
593                })
594            },
595        )
596        .optional()
597        .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
598    }
599
600    pub fn list_checkpoints(
601        &self,
602        thread_id: &str,
603        limit: Option<usize>,
604    ) -> Result<Vec<CheckpointRecord>> {
605        let conn = self.conn()?;
606        let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
607        let mut stmt = conn
608            .prepare(
609                "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
610            )
611            .context("failed to prepare checkpoint list query")?;
612        let mut rows = stmt
613            .query(params![thread_id, limit])
614            .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
615
616        let mut out = Vec::new();
617        while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
618            let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
619            let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
620            out.push(CheckpointRecord {
621                thread_id: row.get(0).context("failed to read checkpoint thread id")?,
622                checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
623                state,
624                created_at: row.get(3).context("failed to read checkpoint timestamp")?,
625            });
626        }
627        Ok(out)
628    }
629
    /// Deletes one checkpoint by its composite key (no-op if absent).
    pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
        let conn = self.conn()?;
        conn.execute(
            "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
            params![thread_id, checkpoint_id],
        )
        .with_context(|| {
            format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
        })?;
        Ok(())
    }
641
    /// Inserts or fully overwrites the `jobs` row for `job.id`; every
    /// column, including `created_at`, is replaced on conflict.
    pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
        let conn = self.conn()?;
        conn.execute(
            r#"
            INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
            ON CONFLICT(id) DO UPDATE SET
                name = excluded.name,
                status = excluded.status,
                progress = excluded.progress,
                detail = excluded.detail,
                created_at = excluded.created_at,
                updated_at = excluded.updated_at
            "#,
            params![
                job.id,
                job.name,
                job_state_status_to_str(&job.status),
                job.progress.map(i64::from),
                job.detail,
                job.created_at,
                job.updated_at
            ],
        )
        .with_context(|| format!("failed to upsert job {}", job.id))?;
        Ok(())
    }
669
    /// Looks up one job by id; `Ok(None)` when no row exists.
    ///
    /// The stored status string is decoded via `job_state_status_from_str`,
    /// and a `progress` value that does not fit in a u8 reads back as `None`.
    pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
        let conn = self.conn()?;
        conn.query_row(
            "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
            params![id],
            |row| {
                let status_raw: String = row.get(2)?;
                let progress: Option<i64> = row.get(3)?;
                Ok(JobStateRecord {
                    id: row.get(0)?,
                    name: row.get(1)?,
                    status: job_state_status_from_str(&status_raw),
                    progress: progress.and_then(|v| u8::try_from(v).ok()),
                    detail: row.get(4)?,
                    created_at: row.get(5)?,
                    updated_at: row.get(6)?,
                })
            },
        )
        .optional()
        .with_context(|| format!("failed to read job {id}"))
    }
692
693    pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
694        let conn = self.conn()?;
695        let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
696        let mut stmt = conn
697            .prepare(
698                "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
699            )
700            .context("failed to prepare job list query")?;
701        let mut rows = stmt
702            .query(params![limit])
703            .context("failed to query persisted jobs")?;
704        let mut out = Vec::new();
705        while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
706            let status_raw: String = row.get(2).context("failed to read job status")?;
707            let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
708            out.push(JobStateRecord {
709                id: row.get(0).context("failed to read job id")?,
710                name: row.get(1).context("failed to read job name")?,
711                status: job_state_status_from_str(&status_raw),
712                progress: progress.and_then(|v| u8::try_from(v).ok()),
713                detail: row.get(4).context("failed to read job detail")?,
714                created_at: row.get(5).context("failed to read job created_at")?,
715                updated_at: row.get(6).context("failed to read job updated_at")?,
716            });
717        }
718        Ok(out)
719    }
720
    /// Deletes a job row by id (no-op if absent).
    pub fn delete_job(&self, id: &str) -> Result<()> {
        let conn = self.conn()?;
        conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
            .with_context(|| format!("failed to delete job {id}"))?;
        Ok(())
    }
727
    /// Looks up the stored rollout path for a thread.
    ///
    /// Returns `None` both when the thread row is missing and when its
    /// `rollout_path` column is NULL (the two cases are flattened).
    pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
        let conn = self.conn()?;
        conn.query_row(
            "SELECT rollout_path FROM threads WHERE id = ?1",
            params![id],
            |row| row.get::<_, Option<String>>(0),
        )
        .optional()
        .context("failed to lookup rollout path")
        .map(|opt| opt.flatten().map(PathBuf::from))
    }
739
740    pub fn append_thread_name(
741        &self,
742        thread_id: &str,
743        thread_name: Option<String>,
744        updated_at: i64,
745        rollout_path: Option<PathBuf>,
746    ) -> Result<()> {
747        if let Some(parent) = self.session_index_path.parent() {
748            fs::create_dir_all(parent).with_context(|| {
749                format!(
750                    "failed to create session index directory {}",
751                    parent.display()
752                )
753            })?;
754        }
755        let entry = SessionIndexEntry {
756            thread_id: thread_id.to_string(),
757            thread_name,
758            updated_at,
759            rollout_path,
760        };
761        let encoded =
762            serde_json::to_string(&entry).context("failed to serialize session index entry")?;
763        let mut file = OpenOptions::new()
764            .create(true)
765            .append(true)
766            .open(&self.session_index_path)
767            .with_context(|| {
768                format!(
769                    "failed to open session index {}",
770                    self.session_index_path.display()
771                )
772            })?;
773        writeln!(file, "{encoded}").context("failed to append session index entry")?;
774        Ok(())
775    }
776
777    pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
778        let map = self.session_index_map()?;
779        Ok(map
780            .get(thread_id)
781            .and_then(|entry| entry.thread_name.clone()))
782    }
783
784    pub fn find_thread_names_by_ids(
785        &self,
786        ids: &[String],
787    ) -> Result<HashMap<String, Option<String>>> {
788        let map = self.session_index_map()?;
789        let mut out = HashMap::new();
790        for id in ids {
791            let name = map.get(id).and_then(|entry| entry.thread_name.clone());
792            out.insert(id.clone(), name);
793        }
794        Ok(out)
795    }
796
797    pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
798        let map = self.session_index_map()?;
799        let matched = map
800            .values()
801            .filter(|entry| {
802                entry
803                    .thread_name
804                    .as_deref()
805                    .is_some_and(|n| n.eq_ignore_ascii_case(name))
806            })
807            .max_by_key(|entry| entry.updated_at);
808        Ok(matched.and_then(|entry| entry.rollout_path.clone()))
809    }
810
811    fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
812        if !self.session_index_path.exists() {
813            return Ok(HashMap::new());
814        }
815        let file = OpenOptions::new()
816            .read(true)
817            .open(&self.session_index_path)
818            .with_context(|| {
819                format!(
820                    "failed to read session index {}",
821                    self.session_index_path.display()
822                )
823            })?;
824        let reader = BufReader::new(file);
825        let mut latest = HashMap::<String, SessionIndexEntry>::new();
826        for line in reader.lines() {
827            let line = line.context("failed to read session index line")?;
828            if line.trim().is_empty() {
829                continue;
830            }
831            let parsed: SessionIndexEntry =
832                serde_json::from_str(&line).context("failed to parse session index entry")?;
833            latest.insert(parsed.thread_id.clone(), parsed);
834        }
835        Ok(latest)
836    }
837}
838
839fn default_state_db_path() -> PathBuf {
840    dirs::home_dir()
841        .unwrap_or_else(|| PathBuf::from("."))
842        .join(".deepseek")
843        .join("state.db")
844}
845
/// Encode a boolean for SQLite storage: `true` → 1, `false` → 0.
fn bool_to_i64(value: bool) -> i64 {
    // `From<bool>` is the idiomatic, branch-free form of the manual
    // `if value { 1 } else { 0 }`.
    i64::from(value)
}
849
/// Decode a SQLite integer flag: zero is `false`, any other value `true`.
fn i64_to_bool(value: i64) -> bool {
    match value {
        0 => false,
        _ => true,
    }
}
853
854fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
855    match status {
856        ThreadStatus::Running => "running",
857        ThreadStatus::Idle => "idle",
858        ThreadStatus::Completed => "completed",
859        ThreadStatus::Failed => "failed",
860        ThreadStatus::Paused => "paused",
861        ThreadStatus::Archived => "archived",
862    }
863}
864
865fn thread_status_from_str(value: &str) -> ThreadStatus {
866    match value {
867        "running" => ThreadStatus::Running,
868        "idle" => ThreadStatus::Idle,
869        "completed" => ThreadStatus::Completed,
870        "failed" => ThreadStatus::Failed,
871        "paused" => ThreadStatus::Paused,
872        "archived" => ThreadStatus::Archived,
873        _ => ThreadStatus::Idle,
874    }
875}
876
877fn session_source_to_str(source: &SessionSource) -> &'static str {
878    match source {
879        SessionSource::Interactive => "interactive",
880        SessionSource::Resume => "resume",
881        SessionSource::Fork => "fork",
882        SessionSource::Api => "api",
883        SessionSource::Unknown => "unknown",
884    }
885}
886
887fn session_source_from_str(value: &str) -> SessionSource {
888    match value {
889        "interactive" => SessionSource::Interactive,
890        "resume" => SessionSource::Resume,
891        "fork" => SessionSource::Fork,
892        "api" => SessionSource::Api,
893        _ => SessionSource::Unknown,
894    }
895}
896
/// Render an optional path as an owned `String` for storage.
///
/// Uses `Display`, which is lossy for non-UTF-8 path components.
fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
    let p = path?;
    Some(p.display().to_string())
}
900
901fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
902    match status {
903        JobStateStatus::Queued => "queued",
904        JobStateStatus::Running => "running",
905        JobStateStatus::Completed => "completed",
906        JobStateStatus::Failed => "failed",
907        JobStateStatus::Cancelled => "cancelled",
908    }
909}
910
911fn job_state_status_from_str(value: &str) -> JobStateStatus {
912    match value {
913        "queued" => JobStateStatus::Queued,
914        "running" => JobStateStatus::Running,
915        "completed" => JobStateStatus::Completed,
916        "failed" => JobStateStatus::Failed,
917        "cancelled" => JobStateStatus::Cancelled,
918        _ => JobStateStatus::Queued,
919    }
920}
921
/// Map one SQLite row into a `ThreadMetadata`.
///
/// NOTE(review): this relies on *positional* column indices (0 = id …
/// 20 = memory_mode), so every SELECT that feeds this function must list
/// columns in exactly the order below — keep query sites in sync.
fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
    // Raw TEXT columns that need decoding before they fit the struct.
    let status_raw: String = row.get(7)?;
    let source_raw: String = row.get(11)?;
    // Nullable TEXT columns converted to PathBuf below.
    let rollout_path: Option<String> = row.get(1)?;
    let path: Option<String> = row.get(8)?;
    Ok(ThreadMetadata {
        id: row.get(0)?,                              // 0: thread id
        rollout_path: rollout_path.map(PathBuf::from), // 1: nullable path
        preview: row.get(2)?,                         // 2: preview text
        ephemeral: i64_to_bool(row.get(3)?),          // 3: INTEGER 0/1 flag
        model_provider: row.get(4)?,                  // 4
        created_at: row.get(5)?,                      // 5: i64 timestamp
        updated_at: row.get(6)?,                      // 6: i64 timestamp
        // 7: status TEXT; unknown strings fall back to Idle in the parser.
        status: thread_status_from_str(&status_raw),
        path: path.map(PathBuf::from),                // 8: nullable path
        cwd: PathBuf::from(row.get::<_, String>(9)?), // 9: NOT NULL TEXT
        cli_version: row.get(10)?,                    // 10
        // 11: source TEXT; unknown strings fall back to Unknown.
        source: session_source_from_str(&source_raw),
        name: row.get(12)?,                           // 12: nullable
        sandbox_policy: row.get(13)?,                 // 13: nullable
        approval_mode: row.get(14)?,                  // 14: nullable
        archived: i64_to_bool(row.get(15)?),          // 15: INTEGER 0/1 flag
        archived_at: row.get(16)?,                    // 16: nullable i64
        git_sha: row.get(17)?,                        // 17: nullable
        git_branch: row.get(18)?,                     // 18: nullable
        git_origin_url: row.get(19)?,                 // 19: nullable
        memory_mode: row.get(20)?,                    // 20: nullable
    })
}