1use crate::channels::session_backend::{
8 SessionBackend, SessionMetadata, SessionQuery, SessionState,
9};
10use crate::providers::traits::ChatMessage;
11use anyhow::{Context, Result};
12use chrono::{DateTime, Duration, Utc};
13use parking_lot::Mutex;
14use rusqlite::{Connection, params};
15use std::path::{Path, PathBuf};
16
17pub struct SqliteSessionBackend {
19 conn: Mutex<Connection>,
20 #[allow(dead_code)]
21 db_path: PathBuf,
22}
23
24impl SqliteSessionBackend {
25 pub fn new(workspace_dir: &Path) -> Result<Self> {
27 let sessions_dir = workspace_dir.join("sessions");
28 std::fs::create_dir_all(&sessions_dir).context("Failed to create sessions directory")?;
29 let db_path = sessions_dir.join("sessions.db");
30
31 let conn = Connection::open(&db_path)
32 .with_context(|| format!("Failed to open session DB: {}", db_path.display()))?;
33
34 conn.execute_batch(
35 "PRAGMA journal_mode = WAL;
36 PRAGMA synchronous = NORMAL;
37 PRAGMA temp_store = MEMORY;
38 PRAGMA mmap_size = 4194304;",
39 )?;
40
41 conn.execute_batch(
42 "CREATE TABLE IF NOT EXISTS sessions (
43 id INTEGER PRIMARY KEY AUTOINCREMENT,
44 session_key TEXT NOT NULL,
45 role TEXT NOT NULL,
46 content TEXT NOT NULL,
47 created_at TEXT NOT NULL
48 );
49 CREATE INDEX IF NOT EXISTS idx_sessions_key ON sessions(session_key);
50 CREATE INDEX IF NOT EXISTS idx_sessions_key_id ON sessions(session_key, id);
51
52 CREATE TABLE IF NOT EXISTS session_metadata (
53 session_key TEXT PRIMARY KEY,
54 created_at TEXT NOT NULL,
55 last_activity TEXT NOT NULL,
56 message_count INTEGER NOT NULL DEFAULT 0,
57 name TEXT
58 );
59
60 CREATE VIRTUAL TABLE IF NOT EXISTS sessions_fts USING fts5(
61 session_key, content, content=sessions, content_rowid=id
62 );
63
64 CREATE TRIGGER IF NOT EXISTS sessions_ai AFTER INSERT ON sessions BEGIN
65 INSERT INTO sessions_fts(rowid, session_key, content)
66 VALUES (new.id, new.session_key, new.content);
67 END;
68 CREATE TRIGGER IF NOT EXISTS sessions_ad AFTER DELETE ON sessions BEGIN
69 INSERT INTO sessions_fts(sessions_fts, rowid, session_key, content)
70 VALUES ('delete', old.id, old.session_key, old.content);
71 END;",
72 )
73 .context("Failed to initialize session schema")?;
74
75 let has_name: bool = conn
77 .query_row(
78 "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'name'",
79 [],
80 |row| row.get(0),
81 )
82 .unwrap_or(false);
83 if !has_name {
84 let _ = conn.execute("ALTER TABLE session_metadata ADD COLUMN name TEXT", []);
85 }
86
87 let has_state: bool = conn
89 .query_row(
90 "SELECT COUNT(*) > 0 FROM pragma_table_info('session_metadata') WHERE name = 'state'",
91 [],
92 |row| row.get(0),
93 )
94 .unwrap_or(false);
95 if !has_state {
96 let _ = conn.execute(
97 "ALTER TABLE session_metadata ADD COLUMN state TEXT NOT NULL DEFAULT 'idle'",
98 [],
99 );
100 let _ = conn.execute("ALTER TABLE session_metadata ADD COLUMN turn_id TEXT", []);
101 let _ = conn.execute(
102 "ALTER TABLE session_metadata ADD COLUMN turn_started_at TEXT",
103 [],
104 );
105 }
106
107 Ok(Self {
108 conn: Mutex::new(conn),
109 db_path,
110 })
111 }
112
113 pub fn migrate_from_jsonl(&self, workspace_dir: &Path) -> Result<usize> {
115 let sessions_dir = workspace_dir.join("sessions");
116 let entries = match std::fs::read_dir(&sessions_dir) {
117 Ok(e) => e,
118 Err(_) => return Ok(0),
119 };
120
121 let mut migrated = 0;
122 for entry in entries {
123 let entry = match entry {
124 Ok(e) => e,
125 Err(_) => continue,
126 };
127 let name = match entry.file_name().into_string() {
128 Ok(n) => n,
129 Err(_) => continue,
130 };
131 let Some(key) = name.strip_suffix(".jsonl") else {
132 continue;
133 };
134
135 let path = entry.path();
136 let file = match std::fs::File::open(&path) {
137 Ok(f) => f,
138 Err(_) => continue,
139 };
140
141 let reader = std::io::BufReader::new(file);
142 let mut count = 0;
143 for line in std::io::BufRead::lines(reader) {
144 let Ok(line) = line else { continue };
145 let trimmed = line.trim();
146 if trimmed.is_empty() {
147 continue;
148 }
149 if let Ok(msg) = serde_json::from_str::<ChatMessage>(trimmed) {
150 if self.append(key, &msg).is_ok() {
151 count += 1;
152 }
153 }
154 }
155
156 if count > 0 {
157 let migrated_path = path.with_extension("jsonl.migrated");
158 let _ = std::fs::rename(&path, &migrated_path);
159 migrated += 1;
160 }
161 }
162
163 Ok(migrated)
164 }
165}
166
167impl SessionBackend for SqliteSessionBackend {
168 fn load(&self, session_key: &str) -> Vec<ChatMessage> {
169 let conn = self.conn.lock();
170 let mut stmt = match conn
171 .prepare("SELECT role, content FROM sessions WHERE session_key = ?1 ORDER BY id ASC")
172 {
173 Ok(s) => s,
174 Err(_) => return Vec::new(),
175 };
176
177 let rows = match stmt.query_map(params![session_key], |row| {
178 Ok(ChatMessage {
179 role: row.get(0)?,
180 content: row.get(1)?,
181 })
182 }) {
183 Ok(r) => r,
184 Err(_) => return Vec::new(),
185 };
186
187 rows.filter_map(|r| r.ok()).collect()
188 }
189
190 fn append(&self, session_key: &str, message: &ChatMessage) -> std::io::Result<()> {
191 let conn = self.conn.lock();
192 let now = Utc::now().to_rfc3339();
193
194 conn.execute(
195 "INSERT INTO sessions (session_key, role, content, created_at)
196 VALUES (?1, ?2, ?3, ?4)",
197 params![session_key, message.role, message.content, now],
198 )
199 .map_err(std::io::Error::other)?;
200
201 conn.execute(
203 "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count)
204 VALUES (?1, ?2, ?3, 1)
205 ON CONFLICT(session_key) DO UPDATE SET
206 last_activity = excluded.last_activity,
207 message_count = message_count + 1",
208 params![session_key, now, now],
209 )
210 .map_err(std::io::Error::other)?;
211
212 Ok(())
213 }
214
215 fn remove_last(&self, session_key: &str) -> std::io::Result<bool> {
216 let conn = self.conn.lock();
217
218 let last_id: Option<i64> = conn
219 .query_row(
220 "SELECT id FROM sessions WHERE session_key = ?1 ORDER BY id DESC LIMIT 1",
221 params![session_key],
222 |row| row.get(0),
223 )
224 .ok();
225
226 let Some(id) = last_id else {
227 return Ok(false);
228 };
229
230 conn.execute("DELETE FROM sessions WHERE id = ?1", params![id])
231 .map_err(std::io::Error::other)?;
232
233 conn.execute(
235 "UPDATE session_metadata SET message_count = MAX(0, message_count - 1)
236 WHERE session_key = ?1",
237 params![session_key],
238 )
239 .map_err(std::io::Error::other)?;
240
241 Ok(true)
242 }
243
244 fn list_sessions(&self) -> Vec<String> {
245 let conn = self.conn.lock();
246 let mut stmt = match conn
247 .prepare("SELECT session_key FROM session_metadata ORDER BY last_activity DESC")
248 {
249 Ok(s) => s,
250 Err(_) => return Vec::new(),
251 };
252
253 let rows = match stmt.query_map([], |row| row.get(0)) {
254 Ok(r) => r,
255 Err(_) => return Vec::new(),
256 };
257
258 rows.filter_map(|r| r.ok()).collect()
259 }
260
261 fn list_sessions_with_metadata(&self) -> Vec<SessionMetadata> {
262 let conn = self.conn.lock();
263 let mut stmt = match conn.prepare(
264 "SELECT session_key, created_at, last_activity, message_count, name
265 FROM session_metadata ORDER BY last_activity DESC",
266 ) {
267 Ok(s) => s,
268 Err(_) => return Vec::new(),
269 };
270
271 let rows = match stmt.query_map([], |row| {
272 let key: String = row.get(0)?;
273 let created_str: String = row.get(1)?;
274 let activity_str: String = row.get(2)?;
275 let count: i64 = row.get(3)?;
276 let name: Option<String> = row.get(4)?;
277
278 let created = DateTime::parse_from_rfc3339(&created_str)
279 .map(|dt| dt.with_timezone(&Utc))
280 .unwrap_or_else(|_| Utc::now());
281 let activity = DateTime::parse_from_rfc3339(&activity_str)
282 .map(|dt| dt.with_timezone(&Utc))
283 .unwrap_or_else(|_| Utc::now());
284
285 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
286 Ok(SessionMetadata {
287 key,
288 name,
289 created_at: created,
290 last_activity: activity,
291 message_count: count as usize,
292 })
293 }) {
294 Ok(r) => r,
295 Err(_) => return Vec::new(),
296 };
297
298 rows.filter_map(|r| r.ok()).collect()
299 }
300
301 fn cleanup_stale(&self, ttl_hours: u32) -> std::io::Result<usize> {
302 let conn = self.conn.lock();
303 let cutoff = (Utc::now() - Duration::hours(i64::from(ttl_hours))).to_rfc3339();
304
305 let stale_keys: Vec<String> = {
307 let mut stmt = conn
308 .prepare("SELECT session_key FROM session_metadata WHERE last_activity < ?1")
309 .map_err(std::io::Error::other)?;
310 let rows = stmt
311 .query_map(params![cutoff], |row| row.get(0))
312 .map_err(std::io::Error::other)?;
313 rows.filter_map(|r| r.ok()).collect()
314 };
315
316 let count = stale_keys.len();
317 for key in &stale_keys {
318 let _ = conn.execute("DELETE FROM sessions WHERE session_key = ?1", params![key]);
319 let _ = conn.execute(
320 "DELETE FROM session_metadata WHERE session_key = ?1",
321 params![key],
322 );
323 }
324
325 Ok(count)
326 }
327
328 fn delete_session(&self, session_key: &str) -> std::io::Result<bool> {
329 let conn = self.conn.lock();
330
331 let exists: bool = conn
333 .query_row(
334 "SELECT COUNT(*) > 0 FROM session_metadata WHERE session_key = ?1",
335 params![session_key],
336 |row| row.get(0),
337 )
338 .unwrap_or(false);
339
340 if !exists {
341 return Ok(false);
342 }
343
344 conn.execute(
346 "DELETE FROM sessions WHERE session_key = ?1",
347 params![session_key],
348 )
349 .map_err(std::io::Error::other)?;
350
351 conn.execute(
353 "DELETE FROM session_metadata WHERE session_key = ?1",
354 params![session_key],
355 )
356 .map_err(std::io::Error::other)?;
357
358 Ok(true)
359 }
360
361 fn set_session_name(&self, session_key: &str, name: &str) -> std::io::Result<()> {
362 let conn = self.conn.lock();
363 let name_val = if name.is_empty() { None } else { Some(name) };
364 conn.execute(
365 "UPDATE session_metadata SET name = ?1 WHERE session_key = ?2",
366 params![name_val, session_key],
367 )
368 .map_err(std::io::Error::other)?;
369 Ok(())
370 }
371
372 fn get_session_name(&self, session_key: &str) -> std::io::Result<Option<String>> {
373 let conn = self.conn.lock();
374 conn.query_row(
375 "SELECT name FROM session_metadata WHERE session_key = ?1",
376 params![session_key],
377 |row| row.get(0),
378 )
379 .map_err(std::io::Error::other)
380 }
381
382 fn set_session_state(
383 &self,
384 session_key: &str,
385 state: &str,
386 turn_id: Option<&str>,
387 ) -> std::io::Result<()> {
388 let conn = self.conn.lock();
389 let now = Utc::now().to_rfc3339();
390 let started_at = if state == "running" {
391 Some(now.as_str())
392 } else {
393 None
394 };
395 conn.execute(
396 "UPDATE session_metadata SET state = ?1, turn_id = ?2, turn_started_at = ?3
397 WHERE session_key = ?4",
398 params![state, turn_id, started_at, session_key],
399 )
400 .map_err(std::io::Error::other)?;
401 Ok(())
402 }
403
404 fn get_session_state(&self, session_key: &str) -> std::io::Result<Option<SessionState>> {
405 let conn = self.conn.lock();
406 conn.query_row(
407 "SELECT state, turn_id, turn_started_at FROM session_metadata WHERE session_key = ?1",
408 params![session_key],
409 |row| {
410 let state: String = row.get(0)?;
411 let turn_id: Option<String> = row.get(1)?;
412 let started_str: Option<String> = row.get(2)?;
413 let turn_started_at = started_str.and_then(|s| {
414 chrono::DateTime::parse_from_rfc3339(&s)
415 .ok()
416 .map(|dt| dt.with_timezone(&Utc))
417 });
418 Ok(SessionState {
419 state,
420 turn_id,
421 turn_started_at,
422 })
423 },
424 )
425 .map(Some)
426 .or_else(|e| match e {
427 rusqlite::Error::QueryReturnedNoRows => Ok(None),
428 other => Err(std::io::Error::other(other)),
429 })
430 }
431
432 fn list_running_sessions(&self) -> Vec<SessionMetadata> {
433 let conn = self.conn.lock();
434 let mut stmt = match conn.prepare(
435 "SELECT session_key, created_at, last_activity, message_count, name
436 FROM session_metadata WHERE state = 'running' ORDER BY turn_started_at DESC",
437 ) {
438 Ok(s) => s,
439 Err(_) => return Vec::new(),
440 };
441
442 let rows = match stmt.query_map([], |row| {
443 let key: String = row.get(0)?;
444 let created_str: String = row.get(1)?;
445 let activity_str: String = row.get(2)?;
446 let count: i64 = row.get(3)?;
447 let name: Option<String> = row.get(4)?;
448 let created = DateTime::parse_from_rfc3339(&created_str)
449 .map(|dt| dt.with_timezone(&Utc))
450 .unwrap_or_else(|_| Utc::now());
451 let activity = DateTime::parse_from_rfc3339(&activity_str)
452 .map(|dt| dt.with_timezone(&Utc))
453 .unwrap_or_else(|_| Utc::now());
454 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
455 Ok(SessionMetadata {
456 key,
457 name,
458 created_at: created,
459 last_activity: activity,
460 message_count: count as usize,
461 })
462 }) {
463 Ok(r) => r,
464 Err(_) => return Vec::new(),
465 };
466
467 rows.filter_map(|r| r.ok()).collect()
468 }
469
470 fn list_stuck_sessions(&self, threshold_secs: u64) -> Vec<SessionMetadata> {
471 let conn = self.conn.lock();
472 #[allow(clippy::cast_possible_wrap)]
473 let cutoff = (Utc::now() - chrono::Duration::seconds(threshold_secs as i64)).to_rfc3339();
474 let mut stmt = match conn.prepare(
475 "SELECT session_key, created_at, last_activity, message_count, name
476 FROM session_metadata
477 WHERE state = 'running' AND turn_started_at < ?1
478 ORDER BY turn_started_at ASC",
479 ) {
480 Ok(s) => s,
481 Err(_) => return Vec::new(),
482 };
483
484 let rows = match stmt.query_map(params![cutoff], |row| {
485 let key: String = row.get(0)?;
486 let created_str: String = row.get(1)?;
487 let activity_str: String = row.get(2)?;
488 let count: i64 = row.get(3)?;
489 let name: Option<String> = row.get(4)?;
490 let created = DateTime::parse_from_rfc3339(&created_str)
491 .map(|dt| dt.with_timezone(&Utc))
492 .unwrap_or_else(|_| Utc::now());
493 let activity = DateTime::parse_from_rfc3339(&activity_str)
494 .map(|dt| dt.with_timezone(&Utc))
495 .unwrap_or_else(|_| Utc::now());
496 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
497 Ok(SessionMetadata {
498 key,
499 name,
500 created_at: created,
501 last_activity: activity,
502 message_count: count as usize,
503 })
504 }) {
505 Ok(r) => r,
506 Err(_) => return Vec::new(),
507 };
508
509 rows.filter_map(|r| r.ok()).collect()
510 }
511
512 fn search(&self, query: &SessionQuery) -> Vec<SessionMetadata> {
513 let Some(keyword) = &query.keyword else {
514 return self.list_sessions_with_metadata();
515 };
516
517 let conn = self.conn.lock();
518 #[allow(clippy::cast_possible_wrap)]
519 let limit = query.limit.unwrap_or(50) as i64;
520
521 let mut stmt = match conn.prepare(
523 "SELECT DISTINCT f.session_key
524 FROM sessions_fts f
525 WHERE sessions_fts MATCH ?1
526 LIMIT ?2",
527 ) {
528 Ok(s) => s,
529 Err(_) => return Vec::new(),
530 };
531
532 let fts_query: String = keyword
534 .split_whitespace()
535 .map(|w| format!("\"{w}\""))
536 .collect::<Vec<_>>()
537 .join(" OR ");
538
539 let keys: Vec<String> = match stmt.query_map(params![fts_query, limit], |row| row.get(0)) {
540 Ok(r) => r.filter_map(|r| r.ok()).collect(),
541 Err(_) => return Vec::new(),
542 };
543
544 keys.iter()
546 .filter_map(|key| {
547 conn.query_row(
548 "SELECT created_at, last_activity, message_count, name FROM session_metadata WHERE session_key = ?1",
549 params![key],
550 |row| {
551 let created_str: String = row.get(0)?;
552 let activity_str: String = row.get(1)?;
553 let count: i64 = row.get(2)?;
554 let name: Option<String> = row.get(3)?;
555 Ok(SessionMetadata {
556 key: key.clone(),
557 name,
558 created_at: DateTime::parse_from_rfc3339(&created_str)
559 .map(|dt| dt.with_timezone(&Utc))
560 .unwrap_or_else(|_| Utc::now()),
561 last_activity: DateTime::parse_from_rfc3339(&activity_str)
562 .map(|dt| dt.with_timezone(&Utc))
563 .unwrap_or_else(|_| Utc::now()),
564 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
565 message_count: count as usize,
566 })
567 },
568 )
569 .ok()
570 })
571 .collect()
572 }
573}
574
575#[cfg(test)]
576mod tests {
577 use super::*;
578 use tempfile::TempDir;
579
580 #[test]
581 fn round_trip_sqlite() {
582 let tmp = TempDir::new().unwrap();
583 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
584
585 backend
586 .append("user1", &ChatMessage::user("hello"))
587 .unwrap();
588 backend
589 .append("user1", &ChatMessage::assistant("hi"))
590 .unwrap();
591
592 let msgs = backend.load("user1");
593 assert_eq!(msgs.len(), 2);
594 assert_eq!(msgs[0].role, "user");
595 assert_eq!(msgs[1].role, "assistant");
596 }
597
598 #[test]
599 fn remove_last_sqlite() {
600 let tmp = TempDir::new().unwrap();
601 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
602
603 backend.append("u", &ChatMessage::user("a")).unwrap();
604 backend.append("u", &ChatMessage::user("b")).unwrap();
605
606 assert!(backend.remove_last("u").unwrap());
607 let msgs = backend.load("u");
608 assert_eq!(msgs.len(), 1);
609 assert_eq!(msgs[0].content, "a");
610 }
611
612 #[test]
613 fn remove_last_empty_sqlite() {
614 let tmp = TempDir::new().unwrap();
615 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
616 assert!(!backend.remove_last("nonexistent").unwrap());
617 }
618
619 #[test]
620 fn list_sessions_sqlite() {
621 let tmp = TempDir::new().unwrap();
622 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
623
624 backend.append("a", &ChatMessage::user("hi")).unwrap();
625 backend.append("b", &ChatMessage::user("hey")).unwrap();
626
627 let sessions = backend.list_sessions();
628 assert_eq!(sessions.len(), 2);
629 }
630
631 #[test]
632 fn metadata_tracks_counts() {
633 let tmp = TempDir::new().unwrap();
634 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
635
636 backend.append("s1", &ChatMessage::user("a")).unwrap();
637 backend.append("s1", &ChatMessage::user("b")).unwrap();
638 backend.append("s1", &ChatMessage::user("c")).unwrap();
639
640 let meta = backend.list_sessions_with_metadata();
641 assert_eq!(meta.len(), 1);
642 assert_eq!(meta[0].message_count, 3);
643 }
644
645 #[test]
646 fn fts5_search_finds_content() {
647 let tmp = TempDir::new().unwrap();
648 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
649
650 backend
651 .append(
652 "code_chat",
653 &ChatMessage::user("How do I parse JSON in Rust?"),
654 )
655 .unwrap();
656 backend
657 .append("weather", &ChatMessage::user("What's the weather today?"))
658 .unwrap();
659
660 let results = backend.search(&SessionQuery {
661 keyword: Some("Rust".into()),
662 limit: Some(10),
663 });
664 assert_eq!(results.len(), 1);
665 assert_eq!(results[0].key, "code_chat");
666 }
667
668 #[test]
669 fn cleanup_stale_removes_old_sessions() {
670 let tmp = TempDir::new().unwrap();
671 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
672
673 {
675 let conn = backend.conn.lock();
676 let old_time = (Utc::now() - Duration::hours(100)).to_rfc3339();
677 conn.execute(
678 "INSERT INTO sessions (session_key, role, content, created_at) VALUES (?1, ?2, ?3, ?4)",
679 params!["old_session", "user", "ancient", old_time],
680 ).unwrap();
681 conn.execute(
682 "INSERT INTO session_metadata (session_key, created_at, last_activity, message_count) VALUES (?1, ?2, ?3, 1)",
683 params!["old_session", old_time, old_time],
684 ).unwrap();
685 }
686
687 backend
688 .append("new_session", &ChatMessage::user("fresh"))
689 .unwrap();
690
691 let cleaned = backend.cleanup_stale(48).unwrap(); assert_eq!(cleaned, 1);
693
694 let sessions = backend.list_sessions();
695 assert_eq!(sessions.len(), 1);
696 assert_eq!(sessions[0], "new_session");
697 }
698
699 #[test]
700 fn delete_session_removes_all_data() {
701 let tmp = TempDir::new().unwrap();
702 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
703
704 backend.append("s1", &ChatMessage::user("hello")).unwrap();
705 backend.append("s1", &ChatMessage::assistant("hi")).unwrap();
706 backend.append("s2", &ChatMessage::user("other")).unwrap();
707
708 assert!(backend.delete_session("s1").unwrap());
709 assert!(backend.load("s1").is_empty());
710 assert_eq!(backend.list_sessions().len(), 1);
711 assert_eq!(backend.list_sessions()[0], "s2");
712 }
713
714 #[test]
715 fn delete_session_returns_false_for_missing() {
716 let tmp = TempDir::new().unwrap();
717 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
718 assert!(!backend.delete_session("nonexistent").unwrap());
719 }
720
721 #[test]
722 fn migrate_from_jsonl_imports_and_renames() {
723 let tmp = TempDir::new().unwrap();
724 let sessions_dir = tmp.path().join("sessions");
725 std::fs::create_dir_all(&sessions_dir).unwrap();
726
727 let jsonl_path = sessions_dir.join("test_user.jsonl");
729 std::fs::write(
730 &jsonl_path,
731 "{\"role\":\"user\",\"content\":\"hello\"}\n{\"role\":\"assistant\",\"content\":\"hi\"}\n",
732 )
733 .unwrap();
734
735 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
736 let migrated = backend.migrate_from_jsonl(tmp.path()).unwrap();
737 assert_eq!(migrated, 1);
738
739 assert!(!jsonl_path.exists());
741 assert!(sessions_dir.join("test_user.jsonl.migrated").exists());
742
743 let msgs = backend.load("test_user");
745 assert_eq!(msgs.len(), 2);
746 assert_eq!(msgs[0].content, "hello");
747 }
748
749 #[test]
750 fn set_session_name_persists() {
751 let tmp = TempDir::new().unwrap();
752 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
753
754 backend.append("s1", &ChatMessage::user("hello")).unwrap();
755 backend.set_session_name("s1", "My Session").unwrap();
756
757 let meta = backend.list_sessions_with_metadata();
758 assert_eq!(meta.len(), 1);
759 assert_eq!(meta[0].name.as_deref(), Some("My Session"));
760 }
761
762 #[test]
763 fn set_session_name_updates_existing() {
764 let tmp = TempDir::new().unwrap();
765 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
766
767 backend.append("s1", &ChatMessage::user("hello")).unwrap();
768 backend.set_session_name("s1", "First").unwrap();
769 backend.set_session_name("s1", "Second").unwrap();
770
771 let meta = backend.list_sessions_with_metadata();
772 assert_eq!(meta[0].name.as_deref(), Some("Second"));
773 }
774
775 #[test]
776 fn sessions_without_name_return_none() {
777 let tmp = TempDir::new().unwrap();
778 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
779
780 backend.append("s1", &ChatMessage::user("hello")).unwrap();
781
782 let meta = backend.list_sessions_with_metadata();
783 assert_eq!(meta.len(), 1);
784 assert!(meta[0].name.is_none());
785 }
786
787 #[test]
790 fn session_state_idle_to_running() {
791 let tmp = TempDir::new().unwrap();
792 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
793 backend.append("s1", &ChatMessage::user("hello")).unwrap();
794
795 backend
796 .set_session_state("s1", "running", Some("turn-1"))
797 .unwrap();
798 let state = backend.get_session_state("s1").unwrap().unwrap();
799 assert_eq!(state.state, "running");
800 assert_eq!(state.turn_id.as_deref(), Some("turn-1"));
801 assert!(state.turn_started_at.is_some());
802 }
803
804 #[test]
805 fn session_state_running_to_idle() {
806 let tmp = TempDir::new().unwrap();
807 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
808 backend.append("s1", &ChatMessage::user("hello")).unwrap();
809
810 backend
811 .set_session_state("s1", "running", Some("turn-1"))
812 .unwrap();
813 backend.set_session_state("s1", "idle", None).unwrap();
814
815 let state = backend.get_session_state("s1").unwrap().unwrap();
816 assert_eq!(state.state, "idle");
817 assert!(state.turn_id.is_none());
818 assert!(state.turn_started_at.is_none());
819 }
820
821 #[test]
822 fn session_state_running_to_error() {
823 let tmp = TempDir::new().unwrap();
824 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
825 backend.append("s1", &ChatMessage::user("hello")).unwrap();
826
827 backend
828 .set_session_state("s1", "running", Some("turn-1"))
829 .unwrap();
830 backend
831 .set_session_state("s1", "error", Some("turn-1"))
832 .unwrap();
833
834 let state = backend.get_session_state("s1").unwrap().unwrap();
835 assert_eq!(state.state, "error");
836 assert_eq!(state.turn_id.as_deref(), Some("turn-1"));
837 }
838
839 #[test]
840 fn list_running_sessions_returns_running_only() {
841 let tmp = TempDir::new().unwrap();
842 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
843
844 backend.append("s1", &ChatMessage::user("a")).unwrap();
845 backend.append("s2", &ChatMessage::user("b")).unwrap();
846 backend.append("s3", &ChatMessage::user("c")).unwrap();
847
848 backend
849 .set_session_state("s1", "running", Some("t1"))
850 .unwrap();
851 backend
852 .set_session_state("s2", "running", Some("t2"))
853 .unwrap();
854 let running = backend.list_running_sessions();
857 assert_eq!(running.len(), 2);
858 let keys: Vec<&str> = running.iter().map(|m| m.key.as_str()).collect();
859 assert!(keys.contains(&"s1"));
860 assert!(keys.contains(&"s2"));
861 }
862
863 #[test]
864 fn list_stuck_sessions_detects_old_running() {
865 let tmp = TempDir::new().unwrap();
866 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
867 backend.append("s1", &ChatMessage::user("a")).unwrap();
868
869 {
871 let conn = backend.conn.lock();
872 let old_time = (Utc::now() - Duration::seconds(600)).to_rfc3339();
873 conn.execute(
874 "UPDATE session_metadata SET state = 'running', turn_id = 'old', turn_started_at = ?1 WHERE session_key = 's1'",
875 params![old_time],
876 ).unwrap();
877 }
878
879 let stuck = backend.list_stuck_sessions(300); assert_eq!(stuck.len(), 1);
881 assert_eq!(stuck[0].key, "s1");
882
883 let not_stuck = backend.list_stuck_sessions(900); assert_eq!(not_stuck.len(), 0);
886 }
887
888 #[test]
889 fn get_session_state_nonexistent() {
890 let tmp = TempDir::new().unwrap();
891 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
892 let state = backend.get_session_state("nonexistent").unwrap();
893 assert!(state.is_none());
894 }
895
896 #[test]
897 fn session_state_migration_preserves_data() {
898 let tmp = TempDir::new().unwrap();
899 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
901 backend.append("s1", &ChatMessage::user("hello")).unwrap();
902
903 drop(backend);
905 let backend2 = SqliteSessionBackend::new(tmp.path()).unwrap();
906 let msgs = backend2.load("s1");
907 assert_eq!(msgs.len(), 1);
908 assert_eq!(msgs[0].content, "hello");
909
910 let state = backend2.get_session_state("s1").unwrap().unwrap();
912 assert_eq!(state.state, "idle");
913 }
914
915 #[test]
916 fn empty_name_clears_to_none() {
917 let tmp = TempDir::new().unwrap();
918 let backend = SqliteSessionBackend::new(tmp.path()).unwrap();
919
920 backend.append("s1", &ChatMessage::user("hello")).unwrap();
921 backend.set_session_name("s1", "Named").unwrap();
922 backend.set_session_name("s1", "").unwrap();
923
924 let meta = backend.list_sessions_with_metadata();
925 assert!(meta[0].name.is_none());
926 }
927}