Skip to main content

mofa_foundation/agent/
session.rs

1//! Session management for conversation persistence
2//!
3//! Supports multiple storage backends through a trait abstraction
4
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use tokio::fs;
12use tokio::sync::RwLock;
13
14use mofa_kernel::agent::error::{AgentError, AgentResult};
15
16// ============================================================================
17// Session 数据类型
18// ============================================================================
19
20/// Session message
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SessionMessage {
23    pub role: String,
24    pub content: String,
25    #[serde(default = "Utc::now")]
26    pub timestamp: DateTime<Utc>,
27}
28
29impl SessionMessage {
30    /// Create a new session message
31    pub fn new(role: impl Into<String>, content: impl Into<String>) -> Self {
32        Self {
33            role: role.into(),
34            content: content.into(),
35            timestamp: Utc::now(),
36        }
37    }
38}
39
40/// Conversation session
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct Session {
43    pub key: String,
44    pub messages: Vec<SessionMessage>,
45    #[serde(default = "Utc::now")]
46    pub created_at: DateTime<Utc>,
47    #[serde(default = "Utc::now")]
48    pub updated_at: DateTime<Utc>,
49    #[serde(default)]
50    pub metadata: HashMap<String, Value>,
51}
52
53impl Session {
54    /// Create a new session
55    pub fn new(key: impl Into<String>) -> Self {
56        let key = key.into();
57        Self {
58            key,
59            messages: Vec::new(),
60            created_at: Utc::now(),
61            updated_at: Utc::now(),
62            metadata: HashMap::new(),
63        }
64    }
65
66    /// Add a message to the session
67    pub fn add_message(&mut self, role: impl Into<String>, content: impl Into<String>) {
68        let msg = SessionMessage::new(role, content);
69        self.messages.push(msg);
70        self.updated_at = Utc::now();
71    }
72
73    /// Get message history (limited to recent messages)
74    pub fn get_history(&self, max_messages: usize) -> Vec<SessionMessage> {
75        if self.messages.len() > max_messages {
76            self.messages[self.messages.len() - max_messages..].to_vec()
77        } else {
78            self.messages.clone()
79        }
80    }
81
82    /// Clear all messages
83    pub fn clear(&mut self) {
84        self.messages.clear();
85        self.updated_at = Utc::now();
86    }
87
88    /// Get the number of messages
89    pub fn len(&self) -> usize {
90        self.messages.len()
91    }
92
93    /// Check if empty
94    pub fn is_empty(&self) -> bool {
95        self.messages.is_empty()
96    }
97}
98
99// ============================================================================
100// SessionStorage trait - 多种后端支持
101// ============================================================================
102
103/// Session storage backend trait
104///
105/// This trait provides an abstraction for session storage that can be
106/// implemented by different backends (file-based, in-memory, database, etc.).
107///
108/// Note: A kernel-level storage abstraction exists in mofa_kernel::storage,
109/// but this trait is foundation-specific as it works with the concrete Session type.
110#[async_trait]
111pub trait SessionStorage: Send + Sync {
112    /// Load a session by key
113    async fn load(&self, key: &str) -> AgentResult<Option<Session>>;
114
115    /// Save a session
116    async fn save(&self, session: &Session) -> AgentResult<()>;
117
118    /// Delete a session
119    async fn delete(&self, key: &str) -> AgentResult<bool>;
120
121    /// List all session keys
122    async fn list(&self) -> AgentResult<Vec<String>>;
123}
124
125// ============================================================================
126// JSONL 文件存储实现
127// ============================================================================
128
129/// JSONL file-based session storage
130pub struct JsonlSessionStorage {
131    sessions_dir: PathBuf,
132}
133
134impl JsonlSessionStorage {
135    /// Create a new JSONL session storage
136    pub async fn new(workspace: impl AsRef<Path>) -> AgentResult<Self> {
137        let sessions_dir = workspace.as_ref().join("sessions");
138        fs::create_dir_all(&sessions_dir).await.map_err(|e| {
139            AgentError::IoError(format!("Failed to create sessions directory: {}", e))
140        })?;
141
142        Ok(Self { sessions_dir })
143    }
144
145    /// Get the file path for a session
146    fn session_file(&self, key: &str) -> PathBuf {
147        let safe_key = key.replace(
148            |c: char| !c.is_alphanumeric() && c != '-' && c != ':' && c != '_',
149            "_",
150        );
151        self.sessions_dir.join(format!("{}.jsonl", safe_key))
152    }
153
154    /// Load a session from file
155    async fn load_session(&self, key: &str) -> AgentResult<Option<Session>> {
156        let session_file = self.session_file(key);
157        if !session_file.exists() {
158            return Ok(None);
159        }
160
161        let content = fs::read_to_string(&session_file)
162            .await
163            .map_err(|e| AgentError::IoError(format!("Failed to read session file: {}", e)))?;
164
165        let mut lines = content.lines();
166        let header = lines
167            .next()
168            .ok_or_else(|| AgentError::SerializationError("Empty session file".to_string()))?;
169
170        // Parse header for metadata
171        let header_data: Value = serde_json::from_str(header).map_err(|e| {
172            AgentError::SerializationError(format!("Failed to parse session header: {}", e))
173        })?;
174
175        let key = header_data
176            .get("key")
177            .and_then(|v| v.as_str())
178            .unwrap_or(key)
179            .to_string();
180
181        let created_at = header_data
182            .get("created_at")
183            .and_then(|v| v.as_str())
184            .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
185            .map(|dt| dt.with_timezone(&Utc))
186            .unwrap_or_else(Utc::now);
187
188        let metadata = header_data
189            .get("metadata")
190            .and_then(|v| serde_json::from_value::<HashMap<String, Value>>(v.clone()).ok())
191            .unwrap_or_default();
192
193        let mut messages = Vec::new();
194        for line in lines {
195            if let Ok(msg) = serde_json::from_str::<SessionMessage>(line) {
196                messages.push(msg);
197            }
198        }
199
200        Ok(Some(Session {
201            key,
202            messages,
203            created_at,
204            updated_at: Utc::now(),
205            metadata,
206        }))
207    }
208}
209
210#[async_trait]
211impl SessionStorage for JsonlSessionStorage {
212    async fn load(&self, key: &str) -> AgentResult<Option<Session>> {
213        self.load_session(key).await
214    }
215
216    async fn save(&self, session: &Session) -> AgentResult<()> {
217        let session_file = self.session_file(&session.key);
218
219        // Ensure directory exists
220        if let Some(parent) = session_file.parent() {
221            fs::create_dir_all(parent).await.map_err(|e| {
222                AgentError::IoError(format!("Failed to create sessions directory: {}", e))
223            })?;
224        }
225
226        let mut lines = vec![
227            serde_json::to_string(&serde_json::json!({
228                "key": session.key,
229                "created_at": session.created_at.to_rfc3339(),
230                "updated_at": session.updated_at.to_rfc3339(),
231                "metadata": session.metadata,
232            }))
233            .map_err(|e| {
234                AgentError::SerializationError(format!("Failed to serialize session: {}", e))
235            })?,
236        ];
237
238        for msg in &session.messages {
239            lines.push(serde_json::to_string(msg).map_err(|e| {
240                AgentError::SerializationError(format!("Failed to serialize message: {}", e))
241            })?);
242        }
243
244        fs::write(&session_file, lines.join("\n"))
245            .await
246            .map_err(|e| AgentError::IoError(format!("Failed to write session file: {}", e)))?;
247
248        Ok(())
249    }
250
251    async fn delete(&self, key: &str) -> AgentResult<bool> {
252        let session_file = self.session_file(key);
253        if session_file.exists() {
254            fs::remove_file(&session_file).await.map_err(|e| {
255                AgentError::IoError(format!("Failed to remove session file: {}", e))
256            })?;
257            Ok(true)
258        } else {
259            Ok(false)
260        }
261    }
262
263    async fn list(&self) -> AgentResult<Vec<String>> {
264        let mut entries = fs::read_dir(&self.sessions_dir).await.map_err(|e| {
265            AgentError::IoError(format!("Failed to read sessions directory: {}", e))
266        })?;
267
268        let mut keys = Vec::new();
269        while let Some(entry) = entries
270            .next_entry()
271            .await
272            .map_err(|e| AgentError::IoError(format!("Failed to read entry: {}", e)))?
273        {
274            if let Some(name) = entry.path().file_stem().and_then(|s| s.to_str()) {
275                // Convert back the safe filename to original key
276                let key = name.replace('_', ":");
277                keys.push(key);
278            }
279        }
280
281        Ok(keys)
282    }
283}
284
285// ============================================================================
286// 内存存储实现(用于测试)
287// ============================================================================
288
289/// In-memory session storage (for testing)
290pub struct MemorySessionStorage {
291    sessions: RwLock<HashMap<String, Session>>,
292}
293
294impl MemorySessionStorage {
295    pub fn new() -> Self {
296        Self {
297            sessions: RwLock::new(HashMap::new()),
298        }
299    }
300}
301
302impl Default for MemorySessionStorage {
303    fn default() -> Self {
304        Self::new()
305    }
306}
307
308#[async_trait]
309impl SessionStorage for MemorySessionStorage {
310    async fn load(&self, key: &str) -> AgentResult<Option<Session>> {
311        let sessions = self.sessions.read().await;
312        Ok(sessions.get(key).cloned())
313    }
314
315    async fn save(&self, session: &Session) -> AgentResult<()> {
316        let mut sessions = self.sessions.write().await;
317        sessions.insert(session.key.clone(), session.clone());
318        Ok(())
319    }
320
321    async fn delete(&self, key: &str) -> AgentResult<bool> {
322        let mut sessions = self.sessions.write().await;
323        Ok(sessions.remove(key).is_some())
324    }
325
326    async fn list(&self) -> AgentResult<Vec<String>> {
327        let sessions = self.sessions.read().await;
328        Ok(sessions.keys().cloned().collect())
329    }
330}
331
332// ============================================================================
333// SessionManager - 统一会话管理接口
334// ============================================================================
335
336/// Session manager with pluggable storage backend
337pub struct SessionManager {
338    storage: Box<dyn SessionStorage>,
339    cache: RwLock<HashMap<String, Session>>,
340}
341
342impl SessionManager {
343    /// Create with JSONL file storage
344    pub async fn with_jsonl(workspace: impl AsRef<Path>) -> AgentResult<Self> {
345        let storage = JsonlSessionStorage::new(workspace).await?;
346        Ok(Self {
347            storage: Box::new(storage),
348            cache: RwLock::new(HashMap::new()),
349        })
350    }
351
352    /// Create with custom storage backend
353    pub fn with_storage(storage: Box<dyn SessionStorage>) -> Self {
354        Self {
355            storage,
356            cache: RwLock::new(HashMap::new()),
357        }
358    }
359
360    /// Get or create a session
361    pub async fn get_or_create(&self, key: &str) -> Session {
362        // Try cache first
363        {
364            let cache = self.cache.read().await;
365            if let Some(session) = cache.get(key) {
366                return session.clone();
367            }
368        }
369
370        // Try storage
371        if let Ok(Some(session)) = self.storage.load(key).await {
372            let mut cache = self.cache.write().await;
373            cache.insert(key.to_string(), session.clone());
374            return session;
375        }
376
377        // Create new session
378        let session = Session::new(key);
379        let mut cache = self.cache.write().await;
380        cache.insert(key.to_string(), session.clone());
381        session
382    }
383
384    /// Save a session
385    pub async fn save(&self, session: &Session) -> AgentResult<()> {
386        self.storage.save(session).await?;
387        let mut cache = self.cache.write().await;
388        cache.insert(session.key.clone(), session.clone());
389        Ok(())
390    }
391
392    /// Delete a session
393    pub async fn delete(&self, key: &str) -> AgentResult<bool> {
394        let result = self.storage.delete(key).await?;
395        let mut cache = self.cache.write().await;
396        cache.remove(key);
397        Ok(result)
398    }
399
400    /// List all session keys
401    pub async fn list(&self) -> AgentResult<Vec<String>> {
402        self.storage.list().await
403    }
404}
405
406#[cfg(test)]
407mod tests {
408    use super::*;
409    use tempfile::TempDir;
410
411    #[tokio::test]
412    async fn test_session_creation() {
413        let session = Session::new("test:key");
414        assert_eq!(session.key, "test:key");
415        assert!(session.is_empty());
416    }
417
418    #[tokio::test]
419    async fn test_session_add_message() {
420        let mut session = Session::new("test:key");
421        session.add_message("user", "Hello");
422        session.add_message("assistant", "Hi there!");
423
424        assert_eq!(session.len(), 2);
425        let history = session.get_history(10);
426        assert_eq!(history.len(), 2);
427        assert_eq!(history[0].role, "user");
428    }
429
430    #[tokio::test]
431    async fn test_memory_storage() {
432        let storage = MemorySessionStorage::new();
433        let session = Session::new("test:memory");
434
435        storage.save(&session).await.unwrap();
436        let loaded = storage.load("test:memory").await.unwrap();
437        assert!(loaded.is_some());
438        assert_eq!(loaded.unwrap().key, "test:memory");
439    }
440
441    #[tokio::test]
442    async fn test_jsonl_storage() {
443        let temp_dir = TempDir::new().unwrap();
444        let storage = JsonlSessionStorage::new(temp_dir.path()).await.unwrap();
445
446        let mut session = Session::new("test:jsonl");
447        session.add_message("user", "Hello");
448
449        storage.save(&session).await.unwrap();
450
451        let loaded = storage.load("test:jsonl").await.unwrap();
452        assert!(loaded.is_some());
453        let loaded_session = loaded.unwrap();
454        assert_eq!(loaded_session.key, "test:jsonl");
455        assert_eq!(loaded_session.len(), 1);
456    }
457
458    #[tokio::test]
459    async fn test_session_manager() {
460        let temp_dir = TempDir::new().unwrap();
461        let manager = SessionManager::with_jsonl(temp_dir.path()).await.unwrap();
462
463        let session = manager.get_or_create("test:manager").await;
464        assert_eq!(session.key, "test:manager");
465
466        manager.save(&session).await.unwrap();
467
468        // Reload
469        let loaded = manager.get_or_create("test:manager").await;
470        assert_eq!(loaded.key, "test:manager");
471    }
472}