1use std::collections::HashMap;
2use std::fs::{self, OpenOptions};
3use std::io::{BufRead, BufReader, Write};
4use std::path::{Path, PathBuf};
5
6use anyhow::{Context, Result};
7use chrono::Utc;
8use rusqlite::{Connection, OptionalExtension, params};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13#[serde(rename_all = "snake_case")]
14pub enum ThreadStatus {
15 Running,
16 Idle,
17 Completed,
18 Failed,
19 Paused,
20 Archived,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
24#[serde(rename_all = "snake_case")]
25pub enum SessionSource {
26 Interactive,
27 Resume,
28 Fork,
29 Api,
30 Unknown,
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct ThreadMetadata {
35 pub id: String,
36 pub rollout_path: Option<PathBuf>,
37 pub preview: String,
38 pub ephemeral: bool,
39 pub model_provider: String,
40 pub created_at: i64,
41 pub updated_at: i64,
42 pub status: ThreadStatus,
43 pub path: Option<PathBuf>,
44 pub cwd: PathBuf,
45 pub cli_version: String,
46 pub source: SessionSource,
47 pub name: Option<String>,
48 pub sandbox_policy: Option<String>,
49 pub approval_mode: Option<String>,
50 pub archived: bool,
51 pub archived_at: Option<i64>,
52 pub git_sha: Option<String>,
53 pub git_branch: Option<String>,
54 pub git_origin_url: Option<String>,
55 pub memory_mode: Option<String>,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct DynamicToolRecord {
60 pub position: i64,
61 pub name: String,
62 pub description: Option<String>,
63 pub input_schema: Value,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct MessageRecord {
68 pub id: i64,
69 pub thread_id: String,
70 pub role: String,
71 pub content: String,
72 pub item: Option<Value>,
73 pub created_at: i64,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct CheckpointRecord {
78 pub thread_id: String,
79 pub checkpoint_id: String,
80 pub state: Value,
81 pub created_at: i64,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
85#[serde(rename_all = "snake_case")]
86pub enum JobStateStatus {
87 Queued,
88 Running,
89 Completed,
90 Failed,
91 Cancelled,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
95pub struct JobStateRecord {
96 pub id: String,
97 pub name: String,
98 pub status: JobStateStatus,
99 pub progress: Option<u8>,
100 pub detail: Option<String>,
101 pub created_at: i64,
102 pub updated_at: i64,
103}
104
105#[derive(Debug, Clone)]
106pub struct ThreadListFilters {
107 pub include_archived: bool,
108 pub limit: Option<usize>,
109}
110
111impl Default for ThreadListFilters {
112 fn default() -> Self {
113 Self {
114 include_archived: false,
115 limit: Some(50),
116 }
117 }
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize)]
121struct SessionIndexEntry {
122 thread_id: String,
123 thread_name: Option<String>,
124 updated_at: i64,
125 rollout_path: Option<PathBuf>,
126}
127
128#[derive(Debug, Clone)]
129pub struct StateStore {
130 db_path: PathBuf,
131 session_index_path: PathBuf,
132}
133
134impl StateStore {
135 pub fn open(path: Option<PathBuf>) -> Result<Self> {
136 let db_path = path.unwrap_or_else(default_state_db_path);
137 let session_index_path = db_path
138 .parent()
139 .unwrap_or_else(|| Path::new("."))
140 .join("session_index.jsonl");
141 if let Some(parent) = db_path.parent() {
142 fs::create_dir_all(parent).with_context(|| {
143 format!("failed to create state directory {}", parent.display())
144 })?;
145 }
146 let store = Self {
147 db_path,
148 session_index_path,
149 };
150 store.init_schema()?;
151 Ok(store)
152 }
153
154 pub fn db_path(&self) -> &Path {
155 &self.db_path
156 }
157
158 fn conn(&self) -> Result<Connection> {
159 Connection::open(&self.db_path)
160 .with_context(|| format!("failed to open state db {}", self.db_path.display()))
161 }
162
163 fn init_schema(&self) -> Result<()> {
164 let conn = self.conn()?;
165 conn.execute_batch(
166 r#"
167 CREATE TABLE IF NOT EXISTS threads (
168 id TEXT PRIMARY KEY,
169 rollout_path TEXT,
170 preview TEXT NOT NULL,
171 ephemeral INTEGER NOT NULL,
172 model_provider TEXT NOT NULL,
173 created_at INTEGER NOT NULL,
174 updated_at INTEGER NOT NULL,
175 status TEXT NOT NULL,
176 path TEXT,
177 cwd TEXT NOT NULL,
178 cli_version TEXT NOT NULL,
179 source TEXT NOT NULL,
180 title TEXT,
181 sandbox_policy TEXT,
182 approval_mode TEXT,
183 archived INTEGER NOT NULL DEFAULT 0,
184 archived_at INTEGER,
185 git_sha TEXT,
186 git_branch TEXT,
187 git_origin_url TEXT,
188 memory_mode TEXT
189 );
190 CREATE INDEX IF NOT EXISTS idx_threads_updated_at ON threads(updated_at DESC);
191 CREATE INDEX IF NOT EXISTS idx_threads_archived_at ON threads(archived_at DESC);
192 CREATE INDEX IF NOT EXISTS idx_threads_archived_updated ON threads(archived, updated_at DESC);
193
194 CREATE TABLE IF NOT EXISTS thread_dynamic_tools (
195 thread_id TEXT NOT NULL,
196 position INTEGER NOT NULL,
197 name TEXT NOT NULL,
198 description TEXT,
199 input_schema TEXT NOT NULL,
200 PRIMARY KEY (thread_id, position),
201 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
202 );
203
204 CREATE TABLE IF NOT EXISTS messages (
205 id INTEGER PRIMARY KEY AUTOINCREMENT,
206 thread_id TEXT NOT NULL,
207 role TEXT NOT NULL,
208 content TEXT NOT NULL,
209 item_json TEXT,
210 created_at INTEGER NOT NULL,
211 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
212 );
213 CREATE INDEX IF NOT EXISTS idx_messages_thread_created_at ON messages(thread_id, created_at ASC);
214
215 CREATE TABLE IF NOT EXISTS checkpoints (
216 thread_id TEXT NOT NULL,
217 checkpoint_id TEXT NOT NULL,
218 state_json TEXT NOT NULL,
219 created_at INTEGER NOT NULL,
220 PRIMARY KEY(thread_id, checkpoint_id),
221 FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
222 );
223 CREATE INDEX IF NOT EXISTS idx_checkpoints_thread_created_at ON checkpoints(thread_id, created_at DESC);
224
225 CREATE TABLE IF NOT EXISTS jobs (
226 id TEXT PRIMARY KEY,
227 name TEXT NOT NULL,
228 status TEXT NOT NULL,
229 progress INTEGER,
230 detail TEXT,
231 created_at INTEGER NOT NULL,
232 updated_at INTEGER NOT NULL
233 );
234 CREATE INDEX IF NOT EXISTS idx_jobs_updated_at ON jobs(updated_at DESC);
235 "#,
236 )
237 .context("failed to initialize thread schema")?;
238 Ok(())
239 }
240
241 pub fn upsert_thread(&self, thread: &ThreadMetadata) -> Result<()> {
242 let conn = self.conn()?;
243 conn.execute(
244 r#"
245 INSERT INTO threads (
246 id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
247 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
248 git_sha, git_branch, git_origin_url, memory_mode
249 ) VALUES (
250 ?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
251 ?11, ?12, ?13, ?14, ?15, ?16, ?17,
252 ?18, ?19, ?20, ?21
253 )
254 ON CONFLICT(id) DO UPDATE SET
255 rollout_path=excluded.rollout_path,
256 preview=excluded.preview,
257 ephemeral=excluded.ephemeral,
258 model_provider=excluded.model_provider,
259 created_at=excluded.created_at,
260 updated_at=excluded.updated_at,
261 status=excluded.status,
262 path=excluded.path,
263 cwd=excluded.cwd,
264 cli_version=excluded.cli_version,
265 source=excluded.source,
266 title=excluded.title,
267 sandbox_policy=excluded.sandbox_policy,
268 approval_mode=excluded.approval_mode,
269 archived=excluded.archived,
270 archived_at=excluded.archived_at,
271 git_sha=excluded.git_sha,
272 git_branch=excluded.git_branch,
273 git_origin_url=excluded.git_origin_url,
274 memory_mode=excluded.memory_mode
275 "#,
276 params![
277 thread.id,
278 path_to_opt_string(thread.rollout_path.as_deref()),
279 thread.preview,
280 bool_to_i64(thread.ephemeral),
281 thread.model_provider,
282 thread.created_at,
283 thread.updated_at,
284 thread_status_to_str(&thread.status),
285 path_to_opt_string(thread.path.as_deref()),
286 thread.cwd.display().to_string(),
287 thread.cli_version,
288 session_source_to_str(&thread.source),
289 thread.name,
290 thread.sandbox_policy,
291 thread.approval_mode,
292 bool_to_i64(thread.archived),
293 thread.archived_at,
294 thread.git_sha,
295 thread.git_branch,
296 thread.git_origin_url,
297 thread.memory_mode,
298 ],
299 )
300 .context("failed to upsert thread metadata")?;
301
302 self.append_thread_name(
303 &thread.id,
304 thread.name.clone(),
305 thread.updated_at,
306 thread.rollout_path.clone(),
307 )?;
308 Ok(())
309 }
310
311 pub fn get_thread(&self, id: &str) -> Result<Option<ThreadMetadata>> {
312 let conn = self.conn()?;
313 conn.query_row(
314 r#"
315 SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd,
316 cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at,
317 git_sha, git_branch, git_origin_url, memory_mode
318 FROM threads
319 WHERE id = ?1
320 "#,
321 params![id],
322 row_to_thread,
323 )
324 .optional()
325 .context("failed to read thread")
326 }
327
328 pub fn list_threads(&self, filters: ThreadListFilters) -> Result<Vec<ThreadMetadata>> {
329 let conn = self.conn()?;
330 let sql = if filters.include_archived {
331 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode FROM threads ORDER BY updated_at DESC LIMIT ?1"
332 } else {
333 "SELECT id, rollout_path, preview, ephemeral, model_provider, created_at, updated_at, status, path, cwd, cli_version, source, title, sandbox_policy, approval_mode, archived, archived_at, git_sha, git_branch, git_origin_url, memory_mode FROM threads WHERE archived = 0 ORDER BY updated_at DESC LIMIT ?1"
334 };
335
336 let mut stmt = conn.prepare(sql).context("failed to prepare list query")?;
337 let limit = i64::try_from(filters.limit.unwrap_or(50)).unwrap_or(50);
338 let mut rows = stmt
339 .query(params![limit])
340 .context("failed to query threads")?;
341 let mut out = Vec::new();
342 while let Some(row) = rows.next().context("failed to iterate thread rows")? {
343 out.push(row_to_thread(row)?);
344 }
345 Ok(out)
346 }
347
348 pub fn mark_archived(&self, id: &str) -> Result<()> {
349 let conn = self.conn()?;
350 conn.execute(
351 "UPDATE threads SET archived = 1, archived_at = ?2, status = ?3 WHERE id = ?1",
352 params![
353 id,
354 Utc::now().timestamp(),
355 thread_status_to_str(&ThreadStatus::Archived)
356 ],
357 )
358 .context("failed to archive thread")?;
359 Ok(())
360 }
361
362 pub fn mark_unarchived(&self, id: &str) -> Result<()> {
363 let conn = self.conn()?;
364 conn.execute(
365 "UPDATE threads SET archived = 0, archived_at = NULL WHERE id = ?1",
366 params![id],
367 )
368 .context("failed to unarchive thread")?;
369 Ok(())
370 }
371
372 pub fn delete_thread(&self, id: &str) -> Result<()> {
373 let conn = self.conn()?;
374 conn.execute("DELETE FROM threads WHERE id = ?1", params![id])
375 .context("failed to delete thread")?;
376 Ok(())
377 }
378
379 pub fn set_thread_memory_mode(&self, id: &str, mode: Option<&str>) -> Result<()> {
380 let conn = self.conn()?;
381 conn.execute(
382 "UPDATE threads SET memory_mode = ?2 WHERE id = ?1",
383 params![id, mode],
384 )
385 .context("failed to update thread memory mode")?;
386 Ok(())
387 }
388
389 pub fn get_thread_memory_mode(&self, id: &str) -> Result<Option<String>> {
390 let conn = self.conn()?;
391 conn.query_row(
392 "SELECT memory_mode FROM threads WHERE id = ?1",
393 params![id],
394 |row| row.get::<_, Option<String>>(0),
395 )
396 .optional()
397 .context("failed to read thread memory mode")
398 .map(Option::flatten)
399 }
400
401 pub fn persist_dynamic_tools(
402 &self,
403 thread_id: &str,
404 tools: &[DynamicToolRecord],
405 ) -> Result<()> {
406 let mut conn = self.conn()?;
407 let tx = conn
408 .transaction()
409 .context("failed to begin dynamic tools transaction")?;
410 tx.execute(
411 "DELETE FROM thread_dynamic_tools WHERE thread_id = ?1",
412 params![thread_id],
413 )
414 .context("failed to clear dynamic tools")?;
415 for tool in tools {
416 tx.execute(
417 "INSERT INTO thread_dynamic_tools(thread_id, position, name, description, input_schema) VALUES (?1, ?2, ?3, ?4, ?5)",
418 params![
419 thread_id,
420 tool.position,
421 tool.name,
422 tool.description,
423 tool.input_schema.to_string()
424 ],
425 )
426 .with_context(|| format!("failed to persist dynamic tool {}", tool.name))?;
427 }
428 tx.commit().context("failed to commit dynamic tools")?;
429 Ok(())
430 }
431
432 pub fn get_dynamic_tools(&self, thread_id: &str) -> Result<Vec<DynamicToolRecord>> {
433 let conn = self.conn()?;
434 let mut stmt = conn
435 .prepare(
436 "SELECT position, name, description, input_schema FROM thread_dynamic_tools WHERE thread_id = ?1 ORDER BY position ASC",
437 )
438 .context("failed to prepare get dynamic tools query")?;
439 let mut rows = stmt
440 .query(params![thread_id])
441 .context("failed to query dynamic tools")?;
442 let mut out = Vec::new();
443 while let Some(row) = rows.next().context("failed to iterate dynamic tools")? {
444 let input_schema_raw: String =
445 row.get(3).context("failed to read tool input schema")?;
446 let input_schema: Value =
447 serde_json::from_str(&input_schema_raw).with_context(|| {
448 format!("failed to parse input schema for dynamic tool in thread {thread_id}")
449 })?;
450 out.push(DynamicToolRecord {
451 position: row.get(0).context("failed to read tool position")?,
452 name: row.get(1).context("failed to read tool name")?,
453 description: row.get(2).context("failed to read tool description")?,
454 input_schema,
455 });
456 }
457 Ok(out)
458 }
459
460 pub fn append_message(
461 &self,
462 thread_id: &str,
463 role: &str,
464 content: &str,
465 item: Option<Value>,
466 ) -> Result<i64> {
467 let conn = self.conn()?;
468 let created_at = Utc::now().timestamp();
469 let item_json = item
470 .as_ref()
471 .map(serde_json::to_string)
472 .transpose()
473 .context("failed to serialize message item payload")?;
474 conn.execute(
475 "INSERT INTO messages(thread_id, role, content, item_json, created_at) VALUES (?1, ?2, ?3, ?4, ?5)",
476 params![thread_id, role, content, item_json, created_at],
477 )
478 .with_context(|| format!("failed to append message for thread {thread_id}"))?;
479 Ok(conn.last_insert_rowid())
480 }
481
482 pub fn list_messages(
483 &self,
484 thread_id: &str,
485 limit: Option<usize>,
486 ) -> Result<Vec<MessageRecord>> {
487 let conn = self.conn()?;
488 let limit = i64::try_from(limit.unwrap_or(500)).unwrap_or(500);
489 let mut stmt = conn
490 .prepare(
491 "SELECT id, thread_id, role, content, item_json, created_at FROM messages WHERE thread_id = ?1 ORDER BY created_at ASC LIMIT ?2",
492 )
493 .context("failed to prepare message listing query")?;
494 let mut rows = stmt
495 .query(params![thread_id, limit])
496 .with_context(|| format!("failed to list messages for thread {thread_id}"))?;
497 let mut out = Vec::new();
498 while let Some(row) = rows.next().context("failed to iterate message rows")? {
499 let item_json: Option<String> = row.get(4).context("failed to read item json")?;
500 let item = item_json
501 .as_deref()
502 .map(serde_json::from_str)
503 .transpose()
504 .with_context(|| {
505 format!("failed to parse message item json in thread {thread_id}")
506 })?;
507 out.push(MessageRecord {
508 id: row.get(0).context("failed to read message id")?,
509 thread_id: row.get(1).context("failed to read message thread id")?,
510 role: row.get(2).context("failed to read message role")?,
511 content: row.get(3).context("failed to read message content")?,
512 item,
513 created_at: row.get(5).context("failed to read message timestamp")?,
514 });
515 }
516 Ok(out)
517 }
518
519 pub fn clear_messages(&self, thread_id: &str) -> Result<usize> {
520 let conn = self.conn()?;
521 conn.execute(
522 "DELETE FROM messages WHERE thread_id = ?1",
523 params![thread_id],
524 )
525 .with_context(|| format!("failed to clear messages for thread {thread_id}"))
526 }
527
528 pub fn save_checkpoint(
529 &self,
530 thread_id: &str,
531 checkpoint_id: &str,
532 state: &Value,
533 ) -> Result<()> {
534 let conn = self.conn()?;
535 let state_json =
536 serde_json::to_string(state).context("failed to encode checkpoint state")?;
537 conn.execute(
538 r#"
539 INSERT INTO checkpoints(thread_id, checkpoint_id, state_json, created_at)
540 VALUES (?1, ?2, ?3, ?4)
541 ON CONFLICT(thread_id, checkpoint_id) DO UPDATE SET
542 state_json = excluded.state_json,
543 created_at = excluded.created_at
544 "#,
545 params![thread_id, checkpoint_id, state_json, Utc::now().timestamp()],
546 )
547 .with_context(|| {
548 format!("failed to save checkpoint {checkpoint_id} for thread {thread_id}")
549 })?;
550 Ok(())
551 }
552
553 pub fn load_checkpoint(
554 &self,
555 thread_id: &str,
556 checkpoint_id: Option<&str>,
557 ) -> Result<Option<CheckpointRecord>> {
558 let conn = self.conn()?;
559 if let Some(checkpoint_id) = checkpoint_id {
560 let row = conn
561 .query_row(
562 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
563 params![thread_id, checkpoint_id],
564 |row| {
565 let state_json: String = row.get(2)?;
566 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
567 Ok(CheckpointRecord {
568 thread_id: row.get(0)?,
569 checkpoint_id: row.get(1)?,
570 state,
571 created_at: row.get(3)?,
572 })
573 },
574 )
575 .optional()
576 .with_context(|| {
577 format!("failed to load checkpoint {checkpoint_id} for thread {thread_id}")
578 })?;
579 return Ok(row);
580 }
581
582 conn.query_row(
583 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT 1",
584 params![thread_id],
585 |row| {
586 let state_json: String = row.get(2)?;
587 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
588 Ok(CheckpointRecord {
589 thread_id: row.get(0)?,
590 checkpoint_id: row.get(1)?,
591 state,
592 created_at: row.get(3)?,
593 })
594 },
595 )
596 .optional()
597 .with_context(|| format!("failed to load latest checkpoint for thread {thread_id}"))
598 }
599
600 pub fn list_checkpoints(
601 &self,
602 thread_id: &str,
603 limit: Option<usize>,
604 ) -> Result<Vec<CheckpointRecord>> {
605 let conn = self.conn()?;
606 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
607 let mut stmt = conn
608 .prepare(
609 "SELECT thread_id, checkpoint_id, state_json, created_at FROM checkpoints WHERE thread_id = ?1 ORDER BY created_at DESC LIMIT ?2",
610 )
611 .context("failed to prepare checkpoint list query")?;
612 let mut rows = stmt
613 .query(params![thread_id, limit])
614 .with_context(|| format!("failed to list checkpoints for thread {thread_id}"))?;
615
616 let mut out = Vec::new();
617 while let Some(row) = rows.next().context("failed to iterate checkpoint rows")? {
618 let state_json: String = row.get(2).context("failed to read checkpoint state json")?;
619 let state = serde_json::from_str(&state_json).unwrap_or(Value::Null);
620 out.push(CheckpointRecord {
621 thread_id: row.get(0).context("failed to read checkpoint thread id")?,
622 checkpoint_id: row.get(1).context("failed to read checkpoint id")?,
623 state,
624 created_at: row.get(3).context("failed to read checkpoint timestamp")?,
625 });
626 }
627 Ok(out)
628 }
629
630 pub fn delete_checkpoint(&self, thread_id: &str, checkpoint_id: &str) -> Result<()> {
631 let conn = self.conn()?;
632 conn.execute(
633 "DELETE FROM checkpoints WHERE thread_id = ?1 AND checkpoint_id = ?2",
634 params![thread_id, checkpoint_id],
635 )
636 .with_context(|| {
637 format!("failed to delete checkpoint {checkpoint_id} for thread {thread_id}")
638 })?;
639 Ok(())
640 }
641
642 pub fn upsert_job(&self, job: &JobStateRecord) -> Result<()> {
643 let conn = self.conn()?;
644 conn.execute(
645 r#"
646 INSERT INTO jobs(id, name, status, progress, detail, created_at, updated_at)
647 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
648 ON CONFLICT(id) DO UPDATE SET
649 name = excluded.name,
650 status = excluded.status,
651 progress = excluded.progress,
652 detail = excluded.detail,
653 created_at = excluded.created_at,
654 updated_at = excluded.updated_at
655 "#,
656 params![
657 job.id,
658 job.name,
659 job_state_status_to_str(&job.status),
660 job.progress.map(i64::from),
661 job.detail,
662 job.created_at,
663 job.updated_at
664 ],
665 )
666 .with_context(|| format!("failed to upsert job {}", job.id))?;
667 Ok(())
668 }
669
670 pub fn get_job(&self, id: &str) -> Result<Option<JobStateRecord>> {
671 let conn = self.conn()?;
672 conn.query_row(
673 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs WHERE id = ?1",
674 params![id],
675 |row| {
676 let status_raw: String = row.get(2)?;
677 let progress: Option<i64> = row.get(3)?;
678 Ok(JobStateRecord {
679 id: row.get(0)?,
680 name: row.get(1)?,
681 status: job_state_status_from_str(&status_raw),
682 progress: progress.and_then(|v| u8::try_from(v).ok()),
683 detail: row.get(4)?,
684 created_at: row.get(5)?,
685 updated_at: row.get(6)?,
686 })
687 },
688 )
689 .optional()
690 .with_context(|| format!("failed to read job {id}"))
691 }
692
693 pub fn list_jobs(&self, limit: Option<usize>) -> Result<Vec<JobStateRecord>> {
694 let conn = self.conn()?;
695 let limit = i64::try_from(limit.unwrap_or(100)).unwrap_or(100);
696 let mut stmt = conn
697 .prepare(
698 "SELECT id, name, status, progress, detail, created_at, updated_at FROM jobs ORDER BY updated_at DESC LIMIT ?1",
699 )
700 .context("failed to prepare job list query")?;
701 let mut rows = stmt
702 .query(params![limit])
703 .context("failed to query persisted jobs")?;
704 let mut out = Vec::new();
705 while let Some(row) = rows.next().context("failed to iterate persisted jobs")? {
706 let status_raw: String = row.get(2).context("failed to read job status")?;
707 let progress: Option<i64> = row.get(3).context("failed to read job progress")?;
708 out.push(JobStateRecord {
709 id: row.get(0).context("failed to read job id")?,
710 name: row.get(1).context("failed to read job name")?,
711 status: job_state_status_from_str(&status_raw),
712 progress: progress.and_then(|v| u8::try_from(v).ok()),
713 detail: row.get(4).context("failed to read job detail")?,
714 created_at: row.get(5).context("failed to read job created_at")?,
715 updated_at: row.get(6).context("failed to read job updated_at")?,
716 });
717 }
718 Ok(out)
719 }
720
721 pub fn delete_job(&self, id: &str) -> Result<()> {
722 let conn = self.conn()?;
723 conn.execute("DELETE FROM jobs WHERE id = ?1", params![id])
724 .with_context(|| format!("failed to delete job {id}"))?;
725 Ok(())
726 }
727
728 pub fn find_rollout_path_by_id(&self, id: &str) -> Result<Option<PathBuf>> {
729 let conn = self.conn()?;
730 conn.query_row(
731 "SELECT rollout_path FROM threads WHERE id = ?1",
732 params![id],
733 |row| row.get::<_, Option<String>>(0),
734 )
735 .optional()
736 .context("failed to lookup rollout path")
737 .map(|opt| opt.flatten().map(PathBuf::from))
738 }
739
740 pub fn append_thread_name(
741 &self,
742 thread_id: &str,
743 thread_name: Option<String>,
744 updated_at: i64,
745 rollout_path: Option<PathBuf>,
746 ) -> Result<()> {
747 if let Some(parent) = self.session_index_path.parent() {
748 fs::create_dir_all(parent).with_context(|| {
749 format!(
750 "failed to create session index directory {}",
751 parent.display()
752 )
753 })?;
754 }
755 let entry = SessionIndexEntry {
756 thread_id: thread_id.to_string(),
757 thread_name,
758 updated_at,
759 rollout_path,
760 };
761 let encoded =
762 serde_json::to_string(&entry).context("failed to serialize session index entry")?;
763 let mut file = OpenOptions::new()
764 .create(true)
765 .append(true)
766 .open(&self.session_index_path)
767 .with_context(|| {
768 format!(
769 "failed to open session index {}",
770 self.session_index_path.display()
771 )
772 })?;
773 writeln!(file, "{encoded}").context("failed to append session index entry")?;
774 Ok(())
775 }
776
777 pub fn find_thread_name_by_id(&self, thread_id: &str) -> Result<Option<String>> {
778 let map = self.session_index_map()?;
779 Ok(map
780 .get(thread_id)
781 .and_then(|entry| entry.thread_name.clone()))
782 }
783
784 pub fn find_thread_names_by_ids(
785 &self,
786 ids: &[String],
787 ) -> Result<HashMap<String, Option<String>>> {
788 let map = self.session_index_map()?;
789 let mut out = HashMap::new();
790 for id in ids {
791 let name = map.get(id).and_then(|entry| entry.thread_name.clone());
792 out.insert(id.clone(), name);
793 }
794 Ok(out)
795 }
796
797 pub fn find_thread_path_by_name_str(&self, name: &str) -> Result<Option<PathBuf>> {
798 let map = self.session_index_map()?;
799 let matched = map
800 .values()
801 .filter(|entry| {
802 entry
803 .thread_name
804 .as_deref()
805 .is_some_and(|n| n.eq_ignore_ascii_case(name))
806 })
807 .max_by_key(|entry| entry.updated_at);
808 Ok(matched.and_then(|entry| entry.rollout_path.clone()))
809 }
810
811 fn session_index_map(&self) -> Result<HashMap<String, SessionIndexEntry>> {
812 if !self.session_index_path.exists() {
813 return Ok(HashMap::new());
814 }
815 let file = OpenOptions::new()
816 .read(true)
817 .open(&self.session_index_path)
818 .with_context(|| {
819 format!(
820 "failed to read session index {}",
821 self.session_index_path.display()
822 )
823 })?;
824 let reader = BufReader::new(file);
825 let mut latest = HashMap::<String, SessionIndexEntry>::new();
826 for line in reader.lines() {
827 let line = line.context("failed to read session index line")?;
828 if line.trim().is_empty() {
829 continue;
830 }
831 let parsed: SessionIndexEntry =
832 serde_json::from_str(&line).context("failed to parse session index entry")?;
833 latest.insert(parsed.thread_id.clone(), parsed);
834 }
835 Ok(latest)
836 }
837}
838
839fn default_state_db_path() -> PathBuf {
840 dirs::home_dir()
841 .unwrap_or_else(|| PathBuf::from("."))
842 .join(".deepseek")
843 .join("state.db")
844}
845
846fn bool_to_i64(value: bool) -> i64 {
847 if value { 1 } else { 0 }
848}
849
850fn i64_to_bool(value: i64) -> bool {
851 value != 0
852}
853
854fn thread_status_to_str(status: &ThreadStatus) -> &'static str {
855 match status {
856 ThreadStatus::Running => "running",
857 ThreadStatus::Idle => "idle",
858 ThreadStatus::Completed => "completed",
859 ThreadStatus::Failed => "failed",
860 ThreadStatus::Paused => "paused",
861 ThreadStatus::Archived => "archived",
862 }
863}
864
865fn thread_status_from_str(value: &str) -> ThreadStatus {
866 match value {
867 "running" => ThreadStatus::Running,
868 "idle" => ThreadStatus::Idle,
869 "completed" => ThreadStatus::Completed,
870 "failed" => ThreadStatus::Failed,
871 "paused" => ThreadStatus::Paused,
872 "archived" => ThreadStatus::Archived,
873 _ => ThreadStatus::Idle,
874 }
875}
876
877fn session_source_to_str(source: &SessionSource) -> &'static str {
878 match source {
879 SessionSource::Interactive => "interactive",
880 SessionSource::Resume => "resume",
881 SessionSource::Fork => "fork",
882 SessionSource::Api => "api",
883 SessionSource::Unknown => "unknown",
884 }
885}
886
887fn session_source_from_str(value: &str) -> SessionSource {
888 match value {
889 "interactive" => SessionSource::Interactive,
890 "resume" => SessionSource::Resume,
891 "fork" => SessionSource::Fork,
892 "api" => SessionSource::Api,
893 _ => SessionSource::Unknown,
894 }
895}
896
897fn path_to_opt_string(path: Option<&Path>) -> Option<String> {
898 path.map(|p| p.display().to_string())
899}
900
901fn job_state_status_to_str(status: &JobStateStatus) -> &'static str {
902 match status {
903 JobStateStatus::Queued => "queued",
904 JobStateStatus::Running => "running",
905 JobStateStatus::Completed => "completed",
906 JobStateStatus::Failed => "failed",
907 JobStateStatus::Cancelled => "cancelled",
908 }
909}
910
911fn job_state_status_from_str(value: &str) -> JobStateStatus {
912 match value {
913 "queued" => JobStateStatus::Queued,
914 "running" => JobStateStatus::Running,
915 "completed" => JobStateStatus::Completed,
916 "failed" => JobStateStatus::Failed,
917 "cancelled" => JobStateStatus::Cancelled,
918 _ => JobStateStatus::Queued,
919 }
920}
921
922fn row_to_thread(row: &rusqlite::Row<'_>) -> rusqlite::Result<ThreadMetadata> {
923 let status_raw: String = row.get(7)?;
924 let source_raw: String = row.get(11)?;
925 let rollout_path: Option<String> = row.get(1)?;
926 let path: Option<String> = row.get(8)?;
927 Ok(ThreadMetadata {
928 id: row.get(0)?,
929 rollout_path: rollout_path.map(PathBuf::from),
930 preview: row.get(2)?,
931 ephemeral: i64_to_bool(row.get(3)?),
932 model_provider: row.get(4)?,
933 created_at: row.get(5)?,
934 updated_at: row.get(6)?,
935 status: thread_status_from_str(&status_raw),
936 path: path.map(PathBuf::from),
937 cwd: PathBuf::from(row.get::<_, String>(9)?),
938 cli_version: row.get(10)?,
939 source: session_source_from_str(&source_raw),
940 name: row.get(12)?,
941 sandbox_policy: row.get(13)?,
942 approval_mode: row.get(14)?,
943 archived: i64_to_bool(row.get(15)?),
944 archived_at: row.get(16)?,
945 git_sha: row.get(17)?,
946 git_branch: row.get(18)?,
947 git_origin_url: row.get(19)?,
948 memory_mode: row.get(20)?,
949 })
950}