Skip to main content

construct/channels/
session_sqlite.rs

1//! SQLite-backed session persistence with FTS5 search.
2//!
3//! Stores sessions in `{workspace}/sessions/sessions.db` using WAL mode.
4//! Provides full-text search via FTS5 and automatic TTL-based cleanup.
5//! Designed as the default backend, replacing JSONL for new installations.
6
7use crate::channels::session_backend::{
8    SessionBackend, SessionMetadata, SessionQuery, SessionState,
9};
10use crate::providers::traits::ChatMessage;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Duration, Utc};
13use parking_lot::Mutex;
14use rusqlite::{Connection, params};
15use std::path::{Path, PathBuf};
16
17/// SQLite-backed session store with FTS5 and WAL mode.
18pub struct SqliteSessionBackend {
19    conn: Mutex<Connection>,
20    #[allow(dead_code)]
21    db_path: PathBuf,
22}
23
24impl SqliteSessionBackend {
25    /// Open or create the sessions database.
26    pub fn new(workspace_dir: &Path) -> Result<Self> {
27        let sessions_dir = workspace_dir.join("sessions");
28        std::fs::create_dir_all(&sessions_dir).context("Failed to create sessions directory")?;
29        let db_path = sessions_dir.join("sessions.db");
30
31        let conn = Connection::open(&db_path)
32            .with_context(|| format!("Failed to open session DB: {}", db_path.display()))?;
33
34        conn.execute_batch(
35            "PRAGMA journal_mode = WAL;
36             PRAGMA synchronous = NORMAL;
37             PRAGMA temp_store = MEMORY;
38             PRAGMA mmap_size = 4194304;",
39        )?;
40
41        conn.execute_batch(
42            "CREATE TABLE IF NOT EXISTS sessions (
43                id          INTEGER PRIMARY KEY AUTOINCREMENT,
44                session_key TEXT NOT NULL,
45                role        TEXT NOT NULL,
46                content     TEXT NOT NULL,
47                created_at  TEXT NOT NULL
48             );
49             CREATE INDEX IF NOT EXISTS idx_sessions_key ON sessions(session_key);
50             CREATE INDEX IF NOT EXISTS idx_sessions_key_id ON sessions(session_key, id);
51
52             CREATE TABLE IF NOT EXISTS session_metadata (
53                session_key  TEXT PRIMARY KEY,
54                created_at   TEXT NOT NULL,
55                last_activity TEXT NOT NULL,
56                message_count INTEGER NOT NULL DEFAULT 0,
57                name         TEXT
58             );
59
60             CREATE VIRTUAL TABLE IF NOT EXISTS sessions_fts USING fts5(
61                session_key, content, content=sessions, content_rowid=id
62             );
63
64             CREATE TRIGGER IF NOT EXISTS sessions_ai AFTER INSERT ON sessions BEGIN
65                INSERT INTO sessions_fts(rowid, session_key, content)
66                VALUES (new.id, new.session_key, new.content);
67             END;
68             CREATE TRIGGER IF NOT EXISTS sessions_ad AFTER DELETE ON sessions BEGIN
69                INSERT INTO sessions_fts(sessions_fts, rowid, session_key, content)
70                VALUES ('delete', old.id, old.session_key, old.content);
71             END;",
72        )
73        .context("Failed to initialize session schema")?;
74
75        // Migration: add name column to existing databases
76        let has_name: bool = conn
77            .query_row(
78                "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'name'",
79                [],
80                |row| row.get(0),
81            )
82            .unwrap_or(false);
83        if !has_name {
84            let _ = conn.execute("ALTER TABLE session_metadata ADD COLUMN name TEXT", []);
85        }
86
87        // Migration: add state tracking columns
88        let has_state: bool = conn
89            .query_row(
90                "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'state'",
91                [],
92                |row| row.get(0),
93            )
94            .unwrap_or(false);
95        if !has_state {
96            let _ = conn.execute(
97                "ALTER TABLE session_metadata ADD COLUMN state TEXT NOT NULL DEFAULT 'idle'",
98                [],
99            );
100            let _ = conn.execute("ALTER TABLE session_metadata ADD COLUMN turn_id TEXT", []);
101            let _ = conn.execute(
102                "ALTER TABLE session_metadata ADD COLUMN turn_started_at TEXT",
103                [],
104            );
105        }
106
107        Ok(Self {
108            conn: Mutex::new(conn),
109            db_path,
110        })
111    }
112
113    /// Migrate JSONL session files into SQLite. Renames migrated files to `.jsonl.migrated`.
114    pub fn migrate_from_jsonl(&self, workspace_dir: &Path) -> Result<usize> {
115        let sessions_dir = workspace_dir.join("sessions");
116        let entries = match std::fs::read_dir(&sessions_dir) {
117            Ok(e) => e,
118            Err(_) => return Ok(0),
119        };
120
121        let mut migrated = 0;
122        for entry in entries {
123            let entry = match entry {
124                Ok(e) => e,
125                Err(_) => continue,
126            };
127            let name = match entry.file_name().into_string() {
128                Ok(n) => n,
129                Err(_) => continue,
130            };
131            let Some(key) = name.strip_suffix(".jsonl") else {
132                continue;
133            };
134
135            let path = entry.path();
136            let file = match std::fs::File::open(&path) {
137                Ok(f) => f,
138                Err(_) => continue,
139            };
140
141            let reader = std::io::BufReader::new(file);
142            let mut count = 0;
143            for line in std::io::BufRead::lines(reader) {
144                let Ok(line) = line else { continue };
145                let trimmed = line.trim();
146                if trimmed.is_empty() {
147                    continue;
148                }
149                if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed) {
150                    if self.append(key, &msg).is_ok() {
151                        count += 1;
152                    }
153                }
154            }
155
156            if count > 0 {
157                let migrated_path = path.with_extension("jsonl.migrated");
158                let _ = std::fs::rename(&path, &migrated_path);
159                migrated += 1;
160            }
161        }
162
163        Ok(migrated)
164    }
165}
166
167impl SessionBackend for SqliteSessionBackend {
168    fn load(&self, session_key: &str) -> Vec<ChatMessage> {
169        let conn = self.conn.lock();
170        let mut stmt = match conn
171            .prepare("SELECT role, content FROM sessions WHERE session_key = ?1 ORDER BY id ASC")
172        {
173            Ok(s) => s,
174            Err(_) => return Vec::new(),
175        };
176
177        let rows = match stmt.query_map(params![session_key], |row| {
178            Ok(ChatMessage {
179                role: row.get(0)?,
180                content: row.get(1)?,
181            })
182        }) {
183            Ok(r) => r,
184            Err(_) => return Vec::new(),
185        };
186
187        rows.filter_map(|r| r.ok()).collect()
188    }
189
190    fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
191        let conn = self.conn.lock();
192        let now = Utc::now().to_rfc3339();
193
194        conn.execute(
195            "INSERT INTO sessions (session_key, role, content, created_at)
196             VALUES (?1, ?2, ?3, ?4)",
197            params![session_key, message.role, message.content, now],
198        )
199        .map_err(std::io::Error::other)?;
200
201        // Upsert metadata
202        conn.execute(
203            "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count)
204             VALUES (?1, ?2, ?3, 1)
205             ON CONFLICT(session_key) DO UPDATE SET
206                last_activity = excluded.last_activity,
207                message_count = message_count + 1",
208            params![session_key, now, now],
209        )
210        .map_err(std::io::Error::other)?;
211
212        Ok(())
213    }
214
215    fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
216        let conn = self.conn.lock();
217
218        let last_id: Option<i64> = conn
219            .query_row(
220                "SELECT id FROM sessions WHERE session_key = ?1 ORDER BY id DESC LIMIT 1",
221                params![session_key],
222                |row| row.get(0),
223            )
224            .ok();
225
226        let Some(id) = last_id else {
227            return Ok(false);
228        };
229
230        conn.execute("DELETE FROM sessions WHERE id = ?1", params![id])
231            .map_err(std::io::Error::other)?;
232
233        // Update metadata count
234        conn.execute(
235            "UPDATE session_metadata SET message_count = MAX(0, message_count - 1)
236             WHERE session_key = ?1",
237            params![session_key],
238        )
239        .map_err(std::io::Error::other)?;
240
241        Ok(true)
242    }
243
244    fn list_sessions(&self) -> Vec<String> {
245        let conn = self.conn.lock();
246        let mut stmt = match conn
247            .prepare("SELECT session_key FROM session_metadata ORDER BY last_activity DESC")
248        {
249            Ok(s) => s,
250            Err(_) => return Vec::new(),
251        };
252
253        let rows = match stmt.query_map([], |row| row.get(0)) {
254            Ok(r) => r,
255            Err(_) => return Vec::new(),
256        };
257
258        rows.filter_map(|r| r.ok()).collect()
259    }
260
261    fn list_sessions_with_metadata(&self) -> Vec<SessionMetadata> {
262        let conn = self.conn.lock();
263        let mut stmt = match conn.prepare(
264            "SELECT session_key, created_at, last_activity, message_count, name
265             FROM session_metadata ORDER BY last_activity DESC",
266        ) {
267            Ok(s) => s,
268            Err(_) => return Vec::new(),
269        };
270
271        let rows = match stmt.query_map([], |row| {
272            let key: String = row.get(0)?;
273            let created_str: String = row.get(1)?;
274            let activity_str: String = row.get(2)?;
275            let count: i64 = row.get(3)?;
276            let name: Option<String> = row.get(4)?;
277
278            let created = DateTime::parse_from_rfc3339(&created_str)
279                .map(|dt| dt.with_timezone(&Utc))
280                .unwrap_or_else(|_| Utc::now());
281            let activity = DateTime::parse_from_rfc3339(&activity_str)
282                .map(|dt| dt.with_timezone(&Utc))
283                .unwrap_or_else(|_| Utc::now());
284
285            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
286            Ok(SessionMetadata {
287                key,
288                name,
289                created_at: created,
290                last_activity: activity,
291                message_count: count as usize,
292            })
293        }) {
294            Ok(r) => r,
295            Err(_) => return Vec::new(),
296        };
297
298        rows.filter_map(|r| r.ok()).collect()
299    }
300
301    fn cleanup_stale(&self, ttl_hours: u32) -> std::io::Result<usize> {
302        let conn = self.conn.lock();
303        let cutoff = (Utc::now() - Duration::hours(i64::from(ttl_hours))).to_rfc3339();
304
305        // Find stale sessions
306        let stale_keys: Vec<String> = {
307            let mut stmt = conn
308                .prepare("SELECT session_key FROM session_metadata WHERE last_activity < ?1")
309                .map_err(std::io::Error::other)?;
310            let rows = stmt
311                .query_map(params![cutoff], |row| row.get(0))
312                .map_err(std::io::Error::other)?;
313            rows.filter_map(|r| r.ok()).collect()
314        };
315
316        let count = stale_keys.len();
317        for key in &stale_keys {
318            let _ = conn.execute("DELETE FROM sessions WHERE session_key = ?1", params![key]);
319            let _ = conn.execute(
320                "DELETE FROM session_metadata WHERE session_key = ?1",
321                params![key],
322            );
323        }
324
325        Ok(count)
326    }
327
328    fn delete_session(&self, session_key: &str) -> std::io::Result<bool> {
329        let conn = self.conn.lock();
330
331        // Check if session exists
332        let exists: bool = conn
333            .query_row(
334                "SELECT COUNT(*) > 0 FROM session_metadata WHERE session_key = ?1",
335                params![session_key],
336                |row| row.get(0),
337            )
338            .unwrap_or(false);
339
340        if !exists {
341            return Ok(false);
342        }
343
344        // Delete messages (FTS5 trigger handles sessions_fts cleanup)
345        conn.execute(
346            "DELETE FROM sessions WHERE session_key = ?1",
347            params![session_key],
348        )
349        .map_err(std::io::Error::other)?;
350
351        // Delete metadata
352        conn.execute(
353            "DELETE FROM session_metadata WHERE session_key = ?1",
354            params![session_key],
355        )
356        .map_err(std::io::Error::other)?;
357
358        Ok(true)
359    }
360
361    fn set_session_name(&self, session_key: &str, name: &str) -> std::io::Result<()> {
362        let conn = self.conn.lock();
363        let name_val = if name.is_empty() { None } else { Some(name) };
364        conn.execute(
365            "UPDATE session_metadata SET name = ?1 WHERE session_key = ?2",
366            params![name_val, session_key],
367        )
368        .map_err(std::io::Error::other)?;
369        Ok(())
370    }
371
372    fn get_session_name(&self, session_key: &str) -> std::io::Result<Option<String>> {
373        let conn = self.conn.lock();
374        conn.query_row(
375            "SELECT name FROM session_metadata WHERE session_key = ?1",
376            params![session_key],
377            |row| row.get(0),
378        )
379        .map_err(std::io::Error::other)
380    }
381
382    fn set_session_state(
383        &self,
384        session_key: &str,
385        state: &str,
386        turn_id: Option<&str>,
387    ) -> std::io::Result<()> {
388        let conn = self.conn.lock();
389        let now = Utc::now().to_rfc3339();
390        let started_at = if state == "running" {
391            Some(now.as_str())
392        } else {
393            None
394        };
395        conn.execute(
396            "UPDATE session_metadata SET state = ?1, turn_id = ?2, turn_started_at = ?3
397             WHERE session_key = ?4",
398            params![state, turn_id, started_at, session_key],
399        )
400        .map_err(std::io::Error::other)?;
401        Ok(())
402    }
403
404    fn get_session_state(&self, session_key: &str) -> std::io::Result<Option<SessionState>> {
405        let conn = self.conn.lock();
406        conn.query_row(
407            "SELECT state, turn_id, turn_started_at FROM session_metadata WHERE session_key = ?1",
408            params![session_key],
409            |row| {
410                let state: String = row.get(0)?;
411                let turn_id: Option<String> = row.get(1)?;
412                let started_str: Option<String> = row.get(2)?;
413                let turn_started_at = started_str.and_then(|s| {
414                    chrono::DateTime::parse_from_rfc3339(&s)
415                        .ok()
416                        .map(|dt| dt.with_timezone(&Utc))
417                });
418                Ok(SessionState {
419                    state,
420                    turn_id,
421                    turn_started_at,
422                })
423            },
424        )
425        .map(Some)
426        .or_else(|e| match e {
427            rusqlite::Error::QueryReturnedNoRows => Ok(None),
428            other => Err(std::io::Error::other(other)),
429        })
430    }
431
432    fn list_running_sessions(&self) -> Vec<SessionMetadata> {
433        let conn = self.conn.lock();
434        let mut stmt = match conn.prepare(
435            "SELECT session_key, created_at, last_activity, message_count, name
436             FROM session_metadata WHERE state = 'running' ORDER BY turn_started_at DESC",
437        ) {
438            Ok(s) => s,
439            Err(_) => return Vec::new(),
440        };
441
442        let rows = match stmt.query_map([], |row| {
443            let key: String = row.get(0)?;
444            let created_str: String = row.get(1)?;
445            let activity_str: String = row.get(2)?;
446            let count: i64 = row.get(3)?;
447            let name: Option<String> = row.get(4)?;
448            let created = DateTime::parse_from_rfc3339(&created_str)
449                .map(|dt| dt.with_timezone(&Utc))
450                .unwrap_or_else(|_| Utc::now());
451            let activity = DateTime::parse_from_rfc3339(&activity_str)
452                .map(|dt| dt.with_timezone(&Utc))
453                .unwrap_or_else(|_| Utc::now());
454            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
455            Ok(SessionMetadata {
456                key,
457                name,
458                created_at: created,
459                last_activity: activity,
460                message_count: count as usize,
461            })
462        }) {
463            Ok(r) => r,
464            Err(_) => return Vec::new(),
465        };
466
467        rows.filter_map(|r| r.ok()).collect()
468    }
469
470    fn list_stuck_sessions(&self, threshold_secs: u64) -> Vec<SessionMetadata> {
471        let conn = self.conn.lock();
472        #[allow(clippy::cast_possible_wrap)]
473        let cutoff = (Utc::now() - chrono::Duration::seconds(threshold_secs as i64)).to_rfc3339();
474        let mut stmt = match conn.prepare(
475            "SELECT session_key, created_at, last_activity, message_count, name
476             FROM session_metadata
477             WHERE state = 'running' AND turn_started_at < ?1
478             ORDER BY turn_started_at ASC",
479        ) {
480            Ok(s) => s,
481            Err(_) => return Vec::new(),
482        };
483
484        let rows = match stmt.query_map(params![cutoff], |row| {
485            let key: String = row.get(0)?;
486            let created_str: String = row.get(1)?;
487            let activity_str: String = row.get(2)?;
488            let count: i64 = row.get(3)?;
489            let name: Option<String> = row.get(4)?;
490            let created = DateTime::parse_from_rfc3339(&created_str)
491                .map(|dt| dt.with_timezone(&Utc))
492                .unwrap_or_else(|_| Utc::now());
493            let activity = DateTime::parse_from_rfc3339(&activity_str)
494                .map(|dt| dt.with_timezone(&Utc))
495                .unwrap_or_else(|_| Utc::now());
496            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
497            Ok(SessionMetadata {
498                key,
499                name,
500                created_at: created,
501                last_activity: activity,
502                message_count: count as usize,
503            })
504        }) {
505            Ok(r) => r,
506            Err(_) => return Vec::new(),
507        };
508
509        rows.filter_map(|r| r.ok()).collect()
510    }
511
512    fn search(&self, query: &SessionQuery) -> Vec<SessionMetadata> {
513        let Some(keyword) = &query.keyword else {
514            return self.list_sessions_with_metadata();
515        };
516
517        let conn = self.conn.lock();
518        #[allow(clippy::cast_possible_wrap)]
519        let limit = query.limit.unwrap_or(50) as i64;
520
521        // FTS5 search
522        let mut stmt = match conn.prepare(
523            "SELECT DISTINCT f.session_key
524             FROM sessions_fts f
525             WHERE sessions_fts MATCH ?1
526             LIMIT ?2",
527        ) {
528            Ok(s) => s,
529            Err(_) => return Vec::new(),
530        };
531
532        // Quote each word for FTS5
533        let fts_query: String = keyword
534            .split_whitespace()
535            .map(|w| format!("\"{w}\""))
536            .collect::<Vec<_>>()
537            .join(" OR ");
538
539        let keys: Vec<String> = match stmt.query_map(params![fts_query, limit], |row| row.get(0)) {
540            Ok(r) => r.filter_map(|r| r.ok()).collect(),
541            Err(_) => return Vec::new(),
542        };
543
544        // Look up metadata for matched sessions
545        keys.iter()
546            .filter_map(|key| {
547                conn.query_row(
548                    "SELECT created_at, last_activity, message_count, name FROM session_metadata WHERE session_key = ?1",
549                    params![key],
550                    |row| {
551                        let created_str: String = row.get(0)?;
552                        let activity_str: String = row.get(1)?;
553                        let count: i64 = row.get(2)?;
554                        let name: Option<String> = row.get(3)?;
555                        Ok(SessionMetadata {
556                            key: key.clone(),
557                            name,
558                            created_at: DateTime::parse_from_rfc3339(&created_str)
559                                .map(|dt| dt.with_timezone(&Utc))
560                                .unwrap_or_else(|_| Utc::now()),
561                            last_activity: DateTime::parse_from_rfc3339(&activity_str)
562                                .map(|dt| dt.with_timezone(&Utc))
563                                .unwrap_or_else(|_| Utc::now()),
564                            #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
565                            message_count: count as usize,
566                        })
567                    },
568                )
569                .ok()
570            })
571            .collect()
572    }
573}
574
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a backend inside a fresh temporary workspace.
    ///
    /// The `TempDir` is handed back so the directory stays alive for as long
    /// as the test holds on to it.
    fn scratch_backend() -> (TempDir, SqliteSessionBackend) {
        let dir = TempDir::new().unwrap();
        let store = SqliteSessionBackend::new(dir.path()).unwrap();
        (dir, store)
    }

    #[test]
    fn round_trip_sqlite() {
        let (_dir, store) = scratch_backend();

        store.append("user1", &ChatMessage::user("hello")).unwrap();
        store.append("user1", &ChatMessage::assistant("hi")).unwrap();

        let loaded = store.load("user1");
        assert_eq!(loaded.len(), 2);
        assert_eq!(loaded[0].role, "user");
        assert_eq!(loaded[1].role, "assistant");
    }

    #[test]
    fn remove_last_sqlite() {
        let (_dir, store) = scratch_backend();

        for text in ["a", "b"] {
            store.append("u", &ChatMessage::user(text)).unwrap();
        }

        assert!(store.remove_last("u").unwrap());
        let remaining = store.load("u");
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].content, "a");
    }

    #[test]
    fn remove_last_empty_sqlite() {
        let (_dir, store) = scratch_backend();
        assert!(!store.remove_last("nonexistent").unwrap());
    }

    #[test]
    fn list_sessions_sqlite() {
        let (_dir, store) = scratch_backend();

        store.append("a", &ChatMessage::user("hi")).unwrap();
        store.append("b", &ChatMessage::user("hey")).unwrap();

        assert_eq!(store.list_sessions().len(), 2);
    }

    #[test]
    fn metadata_tracks_counts() {
        let (_dir, store) = scratch_backend();

        for text in ["a", "b", "c"] {
            store.append("s1", &ChatMessage::user(text)).unwrap();
        }

        let listed = store.list_sessions_with_metadata();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].message_count, 3);
    }

    #[test]
    fn fts5_search_finds_content() {
        let (_dir, store) = scratch_backend();

        store
            .append(
                "code_chat",
                &ChatMessage::user("How do I parse JSON in Rust?"),
            )
            .unwrap();
        store
            .append("weather", &ChatMessage::user("What's the weather today?"))
            .unwrap();

        let request = SessionQuery {
            keyword: Some("Rust".into()),
            limit: Some(10),
        };
        let hits = store.search(&request);
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].key, "code_chat");
    }

    #[test]
    fn cleanup_stale_removes_old_sessions() {
        let (_dir, store) = scratch_backend();

        // Plant a session whose timestamps lie 100 hours in the past.
        {
            let raw = store.conn.lock();
            let stamp = (Utc::now() - Duration::hours(100)).to_rfc3339();
            raw.execute(
                "INSERT INTO sessions (session_key, role, content, created_at) VALUES (?1, ?2, ?3, ?4)",
                params!["old_session", "user", "ancient", stamp],
            )
            .unwrap();
            raw.execute(
                "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count) VALUES (?1, ?2, ?3, 1)",
                params!["old_session", stamp, stamp],
            )
            .unwrap();
        }

        store
            .append("new_session", &ChatMessage::user("fresh"))
            .unwrap();

        // 48h TTL: only the planted session is old enough.
        let removed = store.cleanup_stale(48).unwrap();
        assert_eq!(removed, 1);

        let survivors = store.list_sessions();
        assert_eq!(survivors.len(), 1);
        assert_eq!(survivors[0], "new_session");
    }

    #[test]
    fn delete_session_removes_all_data() {
        let (_dir, store) = scratch_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        store.append("s1", &ChatMessage::assistant("hi")).unwrap();
        store.append("s2", &ChatMessage::user("other")).unwrap();

        assert!(store.delete_session("s1").unwrap());
        assert!(store.load("s1").is_empty());
        assert_eq!(store.list_sessions().len(), 1);
        assert_eq!(store.list_sessions()[0], "s2");
    }

    #[test]
    fn delete_session_returns_false_for_missing() {
        let (_dir, store) = scratch_backend();
        assert!(!store.delete_session("nonexistent").unwrap());
    }

    #[test]
    fn migrate_from_jsonl_imports_and_renames() {
        let workspace = TempDir::new().unwrap();
        let sessions_dir = workspace.path().join("sessions");
        std::fs::create_dir_all(&sessions_dir).unwrap();

        // Seed a legacy JSONL file before the backend is opened.
        let legacy_file = sessions_dir.join("test_user.jsonl");
        std::fs::write(
            &legacy_file,
            "{\"role\":\"user\",\"content\":\"hello\"}\n{\"role\":\"assistant\",\"content\":\"hi\"}\n",
        )
        .unwrap();

        let store = SqliteSessionBackend::new(workspace.path()).unwrap();
        let file_count = store.migrate_from_jsonl(workspace.path()).unwrap();
        assert_eq!(file_count, 1);

        // The source file is renamed out of the way.
        assert!(!legacy_file.exists());
        assert!(sessions_dir.join("test_user.jsonl.migrated").exists());

        // Its messages are now served from SQLite.
        let imported = store.load("test_user");
        assert_eq!(imported.len(), 2);
        assert_eq!(imported[0].content, "hello");
    }

    #[test]
    fn set_session_name_persists() {
        let (_dir, store) = scratch_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        store.set_session_name("s1", "My Session").unwrap();

        let listed = store.list_sessions_with_metadata();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].name.as_deref(), Some("My Session"));
    }

    #[test]
    fn set_session_name_updates_existing() {
        let (_dir, store) = scratch_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        for label in ["First", "Second"] {
            store.set_session_name("s1", label).unwrap();
        }

        let listed = store.list_sessions_with_metadata();
        assert_eq!(listed[0].name.as_deref(), Some("Second"));
    }

    #[test]
    fn sessions_without_name_return_none() {
        let (_dir, store) = scratch_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();

        let listed = store.list_sessions_with_metadata();
        assert_eq!(listed.len(), 1);
        assert!(listed[0].name.is_none());
    }

    // ── session state tests ─────────────────────────────────────────

    #[test]
    fn session_state_idle_to_running() {
        let (_dir, store) = scratch_backend();
        store.append("s1", &ChatMessage::user("hello")).unwrap();

        store
            .set_session_state("s1", "running", Some("turn-1"))
            .unwrap();

        let current = store.get_session_state("s1").unwrap().unwrap();
        assert_eq!(current.state, "running");
        assert_eq!(current.turn_id.as_deref(), Some("turn-1"));
        assert!(current.turn_started_at.is_some());
    }

    #[test]
    fn session_state_running_to_idle() {
        let (_dir, store) = scratch_backend();
        store.append("s1", &ChatMessage::user("hello")).unwrap();

        store
            .set_session_state("s1", "running", Some("turn-1"))
            .unwrap();
        store.set_session_state("s1", "idle", None).unwrap();

        let current = store.get_session_state("s1").unwrap().unwrap();
        assert_eq!(current.state, "idle");
        assert!(current.turn_id.is_none());
        assert!(current.turn_started_at.is_none());
    }

    #[test]
    fn session_state_running_to_error() {
        let (_dir, store) = scratch_backend();
        store.append("s1", &ChatMessage::user("hello")).unwrap();

        for phase in ["running", "error"] {
            store
                .set_session_state("s1", phase, Some("turn-1"))
                .unwrap();
        }

        let current = store.get_session_state("s1").unwrap().unwrap();
        assert_eq!(current.state, "error");
        assert_eq!(current.turn_id.as_deref(), Some("turn-1"));
    }

    #[test]
    fn list_running_sessions_returns_running_only() {
        let (_dir, store) = scratch_backend();

        for (key, text) in [("s1", "a"), ("s2", "b"), ("s3", "c")] {
            store.append(key, &ChatMessage::user(text)).unwrap();
        }

        // s3 is left in its default idle state.
        store
            .set_session_state("s1", "running", Some("t1"))
            .unwrap();
        store
            .set_session_state("s2", "running", Some("t2"))
            .unwrap();

        let active = store.list_running_sessions();
        assert_eq!(active.len(), 2);
        let active_keys: Vec<&str> = active.iter().map(|m| m.key.as_str()).collect();
        assert!(active_keys.contains(&"s1"));
        assert!(active_keys.contains(&"s2"));
    }

    #[test]
    fn list_stuck_sessions_detects_old_running() {
        let (_dir, store) = scratch_backend();
        store.append("s1", &ChatMessage::user("a")).unwrap();

        // Backdate the turn start by ten minutes directly in the database.
        {
            let raw = store.conn.lock();
            let stamp = (Utc::now() - Duration::seconds(600)).to_rfc3339();
            raw.execute(
                "UPDATE session_metadata SET state = 'running', turn_id = 'old', turn_started_at = ?1 WHERE session_key = 's1'",
                params![stamp],
            )
            .unwrap();
        }

        // 5 minute threshold: the session counts as stuck.
        let flagged = store.list_stuck_sessions(300);
        assert_eq!(flagged.len(), 1);
        assert_eq!(flagged[0].key, "s1");

        // 15 minute threshold: it does not.
        let unflagged = store.list_stuck_sessions(900);
        assert_eq!(unflagged.len(), 0);
    }

    #[test]
    fn get_session_state_nonexistent() {
        let (_dir, store) = scratch_backend();
        assert!(store.get_session_state("nonexistent").unwrap().is_none());
    }

    #[test]
    fn session_state_migration_preserves_data() {
        let workspace = TempDir::new().unwrap();

        // First open creates the schema and runs the migrations.
        let first = SqliteSessionBackend::new(workspace.path()).unwrap();
        first.append("s1", &ChatMessage::user("hello")).unwrap();
        drop(first);

        // Second open must run the migrations again without harm.
        let second = SqliteSessionBackend::new(workspace.path()).unwrap();
        let loaded = second.load("s1");
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].content, "hello");

        // State defaults to idle.
        let current = second.get_session_state("s1").unwrap().unwrap();
        assert_eq!(current.state, "idle");
    }

    #[test]
    fn empty_name_clears_to_none() {
        let (_dir, store) = scratch_backend();

        store.append("s1", &ChatMessage::user("hello")).unwrap();
        store.set_session_name("s1", "Named").unwrap();
        store.set_session_name("s1", "").unwrap();

        let listed = store.list_sessions_with_metadata();
        assert!(listed[0].name.is_none());
    }
}