1use crate::filter::SessionFilter;
7use crate::session::{
8 EntryType, Session, SessionEntry, SessionId, SessionMetadata, SessionStatus, SessionType,
9 Timestamp,
10};
11use crate::{Result, Workspace, WorkspaceError};
12use serde_json::Value;
13use std::collections::HashMap;
14use std::path::PathBuf;
15use std::sync::Arc;
16use tokio::fs;
17use tokio::sync::RwLock;
18use tracing::{debug, info, warn};
19
20pub struct SessionManager {
40 sessions_dir: PathBuf,
42 active_sessions: Arc<RwLock<HashMap<SessionId, Session>>>,
44}
45
46impl SessionManager {
47 pub async fn new(workspace: &Workspace) -> Result<Self> {
51 let sessions_dir = workspace.root.join(".thulp").join("sessions");
52 fs::create_dir_all(&sessions_dir).await?;
53
54 debug!(?sessions_dir, "Initialized session manager");
55
56 Ok(Self {
57 sessions_dir,
58 active_sessions: Arc::new(RwLock::new(HashMap::new())),
59 })
60 }
61
62 pub async fn with_sessions_dir(sessions_dir: PathBuf) -> Result<Self> {
64 fs::create_dir_all(&sessions_dir).await?;
65
66 Ok(Self {
67 sessions_dir,
68 active_sessions: Arc::new(RwLock::new(HashMap::new())),
69 })
70 }
71
72 fn session_path(&self, id: &SessionId) -> PathBuf {
74 self.sessions_dir.join(format!("{}.json", id))
75 }
76
77 pub async fn create_session(
81 &self,
82 name: impl Into<String>,
83 session_type: SessionType,
84 ) -> Result<Session> {
85 let session = Session::new(name, session_type);
86 let id = session.id().clone();
87
88 self.save_session_internal(&session).await?;
90
91 {
93 let mut sessions = self.active_sessions.write().await;
94 sessions.insert(id.clone(), session.clone());
95 }
96
97 info!(session_id = %id, "Created new session");
98 Ok(session)
99 }
100
101 pub async fn load_session(&self, id: &SessionId) -> Result<Session> {
105 {
107 let sessions = self.active_sessions.read().await;
108 if let Some(session) = sessions.get(id) {
109 return Ok(session.clone());
110 }
111 }
112
113 let path = self.session_path(id);
115 let content = fs::read_to_string(&path).await.map_err(|e| {
116 if e.kind() == std::io::ErrorKind::NotFound {
117 WorkspaceError::NotFound(format!("Session {} not found", id))
118 } else {
119 WorkspaceError::Io(e)
120 }
121 })?;
122
123 let session: Session = serde_json::from_str(&content)
124 .map_err(|e| WorkspaceError::Serialization(e.to_string()))?;
125
126 {
128 let mut sessions = self.active_sessions.write().await;
129 sessions.insert(id.clone(), session.clone());
130 }
131
132 debug!(session_id = %id, "Loaded session from disk");
133 Ok(session)
134 }
135
136 pub async fn save_session(&self, session: &Session) -> Result<()> {
138 self.save_session_internal(session).await?;
139
140 {
142 let mut sessions = self.active_sessions.write().await;
143 sessions.insert(session.id().clone(), session.clone());
144 }
145
146 Ok(())
147 }
148
149 async fn save_session_internal(&self, session: &Session) -> Result<()> {
151 let path = self.session_path(session.id());
152 let content = serde_json::to_string_pretty(session)
153 .map_err(|e| WorkspaceError::Serialization(e.to_string()))?;
154
155 fs::write(&path, content).await?;
156 debug!(session_id = %session.id(), "Saved session to disk");
157 Ok(())
158 }
159
160 pub async fn add_entry(
164 &self,
165 session_id: &SessionId,
166 entry_type: EntryType,
167 content: Value,
168 ) -> Result<SessionEntry> {
169 let entry = SessionEntry::new(entry_type, content);
170
171 {
172 let mut sessions = self.active_sessions.write().await;
173 if let Some(session) = sessions.get_mut(session_id) {
174 session.add_entry(entry.clone());
175 self.save_session_internal(session).await?;
177 } else {
178 drop(sessions); let mut session = self.load_session(session_id).await?;
181 session.add_entry(entry.clone());
182 self.save_session(&session).await?;
183 }
184 }
185
186 debug!(session_id = %session_id, entry_id = %entry.id, "Added entry to session");
187 Ok(entry)
188 }
189
190 pub async fn complete_session(&self, session_id: &SessionId) -> Result<()> {
194 self.update_status(session_id, SessionStatus::Completed)
195 .await
196 }
197
198 pub async fn fail_session(&self, session_id: &SessionId) -> Result<()> {
202 self.update_status(session_id, SessionStatus::Failed).await
203 }
204
205 pub async fn cancel_session(&self, session_id: &SessionId) -> Result<()> {
209 self.update_status(session_id, SessionStatus::Cancelled)
210 .await
211 }
212
213 pub async fn pause_session(&self, session_id: &SessionId) -> Result<()> {
217 self.update_status(session_id, SessionStatus::Paused).await
218 }
219
220 pub async fn resume_session(&self, session_id: &SessionId) -> Result<()> {
224 self.update_status(session_id, SessionStatus::Active).await
225 }
226
227 async fn update_status(&self, session_id: &SessionId, status: SessionStatus) -> Result<()> {
229 let mut session = self.load_session(session_id).await?;
230 session.set_status(status);
231 self.save_session(&session).await?;
232
233 info!(session_id = %session_id, ?status, "Updated session status");
234 Ok(())
235 }
236
237 pub async fn list_sessions(
239 &self,
240 filter: Option<&SessionFilter>,
241 ) -> Result<Vec<SessionMetadata>> {
242 let mut metadata_list = Vec::new();
243
244 let mut entries = fs::read_dir(&self.sessions_dir).await?;
245 while let Some(entry) = entries.next_entry().await? {
246 let path = entry.path();
247 if path.extension().and_then(|s| s.to_str()) != Some("json") {
248 continue;
249 }
250
251 match fs::read_to_string(&path).await {
252 Ok(content) => {
253 match serde_json::from_str::<Session>(&content) {
254 Ok(session) => {
255 let metadata = session.metadata.clone();
256
257 if let Some(filter) = filter {
259 if filter.matches(&session) {
260 metadata_list.push(metadata);
261 }
262 } else {
263 metadata_list.push(metadata);
264 }
265 }
266 Err(e) => {
267 warn!(path = ?path, error = %e, "Failed to parse session file");
268 }
269 }
270 }
271 Err(e) => {
272 warn!(path = ?path, error = %e, "Failed to read session file");
273 }
274 }
275 }
276
277 metadata_list.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
279
280 Ok(metadata_list)
281 }
282
283 pub async fn delete_session(&self, session_id: &SessionId) -> Result<()> {
287 {
289 let mut sessions = self.active_sessions.write().await;
290 sessions.remove(session_id);
291 }
292
293 let path = self.session_path(session_id);
295 if path.exists() {
296 fs::remove_file(&path).await?;
297 info!(session_id = %session_id, "Deleted session");
298 }
299
300 Ok(())
301 }
302
303 pub async fn session_exists(&self, session_id: &SessionId) -> bool {
305 {
307 let sessions = self.active_sessions.read().await;
308 if sessions.contains_key(session_id) {
309 return true;
310 }
311 }
312
313 self.session_path(session_id).exists()
315 }
316
317 pub async fn peek_session(&self, session_id: &SessionId) -> Result<Session> {
321 let path = self.session_path(session_id);
322 let content = fs::read_to_string(&path).await.map_err(|e| {
323 if e.kind() == std::io::ErrorKind::NotFound {
324 WorkspaceError::NotFound(format!("Session {} not found", session_id))
325 } else {
326 WorkspaceError::Io(e)
327 }
328 })?;
329
330 serde_json::from_str(&content).map_err(|e| WorkspaceError::Serialization(e.to_string()))
331 }
332
333 pub async fn evict_from_cache(&self, session_id: &SessionId) {
337 let mut sessions = self.active_sessions.write().await;
338 sessions.remove(session_id);
339 }
340
341 pub async fn clear_cache(&self) {
343 let mut sessions = self.active_sessions.write().await;
344 sessions.clear();
345 }
346
347 pub async fn cached_session_count(&self) -> usize {
349 let sessions = self.active_sessions.read().await;
350 sessions.len()
351 }
352
353 pub async fn active_sessions(&self) -> Vec<Session> {
355 let sessions = self.active_sessions.read().await;
356 sessions.values().cloned().collect()
357 }
358
359 pub async fn find_by_tag(&self, tag: &str) -> Result<Vec<SessionMetadata>> {
361 self.list_sessions(Some(&SessionFilter::HasTag(tag.to_string())))
362 .await
363 }
364
365 pub async fn find_by_type(&self, session_type_name: &str) -> Result<Vec<SessionMetadata>> {
367 self.list_sessions(Some(&SessionFilter::ByTypeName(
368 session_type_name.to_string(),
369 )))
370 .await
371 }
372
373 pub async fn find_by_status(&self, status: SessionStatus) -> Result<Vec<SessionMetadata>> {
375 self.list_sessions(Some(&SessionFilter::ByStatus(status)))
376 .await
377 }
378
379 pub async fn find_created_after(&self, timestamp: Timestamp) -> Result<Vec<SessionMetadata>> {
381 self.list_sessions(Some(&SessionFilter::CreatedAfter(timestamp)))
382 .await
383 }
384
385 pub async fn find_updated_after(&self, timestamp: Timestamp) -> Result<Vec<SessionMetadata>> {
387 self.list_sessions(Some(&SessionFilter::UpdatedAfter(timestamp)))
388 .await
389 }
390
391 pub async fn session_count(&self) -> Result<usize> {
393 let mut count = 0;
394 let mut entries = fs::read_dir(&self.sessions_dir).await?;
395 while let Some(entry) = entries.next_entry().await? {
396 if entry.path().extension().and_then(|s| s.to_str()) == Some("json") {
397 count += 1;
398 }
399 }
400 Ok(count)
401 }
402}
403
404impl std::fmt::Debug for SessionManager {
405 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
406 f.debug_struct("SessionManager")
407 .field("sessions_dir", &self.sessions_dir)
408 .finish_non_exhaustive()
409 }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a manager over a `sessions` directory inside a fresh temp dir.
    ///
    /// The `TempDir` is returned so the caller keeps it alive for the test.
    async fn create_test_manager() -> (SessionManager, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let manager = SessionManager::with_sessions_dir(temp_dir.path().join("sessions"))
            .await
            .unwrap();
        (manager, temp_dir)
    }

    /// Creates a `SessionType::Conversation` session with the given name and
    /// purpose, panicking on failure.
    async fn conversation(manager: &SessionManager, name: &str, purpose: &str) -> Session {
        manager
            .create_session(
                name,
                SessionType::Conversation {
                    purpose: purpose.to_string(),
                },
            )
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_create_session() {
        let (manager, _temp) = create_test_manager().await;

        let session = conversation(&manager, "Test Session", "Testing").await;

        assert_eq!(session.name(), "Test Session");
        assert_eq!(session.status(), SessionStatus::Active);
    }

    #[tokio::test]
    async fn test_load_session() {
        let (manager, _temp) = create_test_manager().await;

        let created = conversation(&manager, "Test Session", "Testing").await;

        // Empty the cache so the load has to go to disk.
        manager.clear_cache().await;

        let loaded = manager.load_session(created.id()).await.unwrap();
        assert_eq!(loaded.name(), created.name());
        assert_eq!(loaded.id(), created.id());
    }

    #[tokio::test]
    async fn test_add_entry() {
        let (manager, _temp) = create_test_manager().await;

        let session = conversation(&manager, "Test Session", "Testing").await;

        let entry = manager
            .add_entry(
                session.id(),
                EntryType::UserMessage,
                serde_json::json!({"text": "Hello"}),
            )
            .await
            .unwrap();

        assert!(matches!(entry.entry_type, EntryType::UserMessage));

        // Reload from disk to check the entry was persisted.
        manager.clear_cache().await;
        let loaded = manager.load_session(session.id()).await.unwrap();
        assert_eq!(loaded.entries.len(), 1);
    }

    #[tokio::test]
    async fn test_complete_session() {
        let (manager, _temp) = create_test_manager().await;

        let session = conversation(&manager, "Test Session", "Testing").await;

        manager.complete_session(session.id()).await.unwrap();

        let loaded = manager.load_session(session.id()).await.unwrap();
        assert_eq!(loaded.status(), SessionStatus::Completed);
    }

    #[tokio::test]
    async fn test_list_sessions() {
        let (manager, _temp) = create_test_manager().await;

        conversation(&manager, "Session 1", "Test 1").await;
        conversation(&manager, "Session 2", "Test 2").await;

        let sessions = manager.list_sessions(None).await.unwrap();
        assert_eq!(sessions.len(), 2);
    }

    #[tokio::test]
    async fn test_delete_session() {
        let (manager, _temp) = create_test_manager().await;

        let session = conversation(&manager, "Test Session", "Testing").await;

        manager.delete_session(session.id()).await.unwrap();

        assert!(!manager.session_exists(session.id()).await);
    }

    #[tokio::test]
    async fn test_session_exists() {
        let (manager, _temp) = create_test_manager().await;

        let session = conversation(&manager, "Test Session", "Testing").await;

        assert!(manager.session_exists(session.id()).await);

        let fake_id = SessionId::new();
        assert!(!manager.session_exists(&fake_id).await);
    }

    #[tokio::test]
    async fn test_filter_by_status() {
        let (manager, _temp) = create_test_manager().await;

        let session1 = conversation(&manager, "Active Session", "Test").await;
        let session2 = conversation(&manager, "Completed Session", "Test").await;

        manager.complete_session(session2.id()).await.unwrap();

        let active = manager.find_by_status(SessionStatus::Active).await.unwrap();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].id, session1.metadata.id);

        let completed = manager
            .find_by_status(SessionStatus::Completed)
            .await
            .unwrap();
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].id, session2.metadata.id);
    }

    #[tokio::test]
    async fn test_cache_operations() {
        let (manager, _temp) = create_test_manager().await;

        let session = conversation(&manager, "Test Session", "Testing").await;

        assert_eq!(manager.cached_session_count().await, 1);

        manager.evict_from_cache(session.id()).await;
        assert_eq!(manager.cached_session_count().await, 0);

        // Eviction only drops the cached copy; the file is still there.
        assert!(manager.session_exists(session.id()).await);
    }

    #[tokio::test]
    async fn test_session_count() {
        let (manager, _temp) = create_test_manager().await;

        assert_eq!(manager.session_count().await.unwrap(), 0);

        conversation(&manager, "Session 1", "Test").await;
        conversation(&manager, "Session 2", "Test").await;

        assert_eq!(manager.session_count().await.unwrap(), 2);
    }
}