1use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use tokio::fs;
12use tokio::sync::RwLock;
13
14use mofa_kernel::agent::error::{AgentError, AgentResult};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SessionMessage {
23 pub role: String,
24 pub content: String,
25 #[serde(default = "Utc::now")]
26 pub timestamp: DateTime<Utc>,
27}
28
29impl SessionMessage {
30 pub fn new(role: impl Into<String>, content: impl Into<String>) -> Self {
32 Self {
33 role: role.into(),
34 content: content.into(),
35 timestamp: Utc::now(),
36 }
37 }
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct Session {
43 pub key: String,
44 pub messages: Vec<SessionMessage>,
45 #[serde(default = "Utc::now")]
46 pub created_at: DateTime<Utc>,
47 #[serde(default = "Utc::now")]
48 pub updated_at: DateTime<Utc>,
49 #[serde(default)]
50 pub metadata: HashMap<String, Value>,
51}
52
53impl Session {
54 pub fn new(key: impl Into<String>) -> Self {
56 let key = key.into();
57 Self {
58 key,
59 messages: Vec::new(),
60 created_at: Utc::now(),
61 updated_at: Utc::now(),
62 metadata: HashMap::new(),
63 }
64 }
65
66 pub fn add_message(&mut self, role: impl Into<String>, content: impl Into<String>) {
68 let msg = SessionMessage::new(role, content);
69 self.messages.push(msg);
70 self.updated_at = Utc::now();
71 }
72
73 pub fn get_history(&self, max_messages: usize) -> Vec<SessionMessage> {
75 if self.messages.len() > max_messages {
76 self.messages[self.messages.len() - max_messages..].to_vec()
77 } else {
78 self.messages.clone()
79 }
80 }
81
82 pub fn clear(&mut self) {
84 self.messages.clear();
85 self.updated_at = Utc::now();
86 }
87
88 pub fn len(&self) -> usize {
90 self.messages.len()
91 }
92
93 pub fn is_empty(&self) -> bool {
95 self.messages.is_empty()
96 }
97}
98
99#[async_trait]
111pub trait SessionStorage: Send + Sync {
112 async fn load(&self, key: &str) -> AgentResult<Option<Session>>;
114
115 async fn save(&self, session: &Session) -> AgentResult<()>;
117
118 async fn delete(&self, key: &str) -> AgentResult<bool>;
120
121 async fn list(&self) -> AgentResult<Vec<String>>;
123}
124
125pub struct JsonlSessionStorage {
131 sessions_dir: PathBuf,
132}
133
134impl JsonlSessionStorage {
135 pub async fn new(workspace: impl AsRef<Path>) -> AgentResult<Self> {
137 let sessions_dir = workspace.as_ref().join("sessions");
138 fs::create_dir_all(&sessions_dir).await.map_err(|e| {
139 AgentError::IoError(format!("Failed to create sessions directory: {}", e))
140 })?;
141
142 Ok(Self { sessions_dir })
143 }
144
145 fn session_file(&self, key: &str) -> PathBuf {
147 let safe_key = key.replace(
148 |c: char| !c.is_alphanumeric() && c != '-' && c != ':' && c != '_',
149 "_",
150 );
151 self.sessions_dir.join(format!("{}.jsonl", safe_key))
152 }
153
154 async fn load_session(&self, key: &str) -> AgentResult<Option<Session>> {
156 let session_file = self.session_file(key);
157 if !session_file.exists() {
158 return Ok(None);
159 }
160
161 let content = fs::read_to_string(&session_file)
162 .await
163 .map_err(|e| AgentError::IoError(format!("Failed to read session file: {}", e)))?;
164
165 let mut lines = content.lines();
166 let header = lines
167 .next()
168 .ok_or_else(|| AgentError::SerializationError("Empty session file".to_string()))?;
169
170 let header_data: Value = serde_json::from_str(header).map_err(|e| {
172 AgentError::SerializationError(format!("Failed to parse session header: {}", e))
173 })?;
174
175 let key = header_data
176 .get("key")
177 .and_then(|v| v.as_str())
178 .unwrap_or(key)
179 .to_string();
180
181 let created_at = header_data
182 .get("created_at")
183 .and_then(|v| v.as_str())
184 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
185 .map(|dt| dt.with_timezone(&Utc))
186 .unwrap_or_else(Utc::now);
187
188 let metadata = header_data
189 .get("metadata")
190 .and_then(|v| serde_json::from_value::<HashMap<String, Value>>(v.clone()).ok())
191 .unwrap_or_default();
192
193 let mut messages = Vec::new();
194 for line in lines {
195 if let Ok(msg) = serde_json::from_str::<SessionMessage>(line) {
196 messages.push(msg);
197 }
198 }
199
200 Ok(Some(Session {
201 key,
202 messages,
203 created_at,
204 updated_at: Utc::now(),
205 metadata,
206 }))
207 }
208}
209
210#[async_trait]
211impl SessionStorage for JsonlSessionStorage {
212 async fn load(&self, key: &str) -> AgentResult<Option<Session>> {
213 self.load_session(key).await
214 }
215
216 async fn save(&self, session: &Session) -> AgentResult<()> {
217 let session_file = self.session_file(&session.key);
218
219 if let Some(parent) = session_file.parent() {
221 fs::create_dir_all(parent).await.map_err(|e| {
222 AgentError::IoError(format!("Failed to create sessions directory: {}", e))
223 })?;
224 }
225
226 let mut lines = vec![
227 serde_json::to_string(&serde_json::json!({
228 "key": session.key,
229 "created_at": session.created_at.to_rfc3339(),
230 "updated_at": session.updated_at.to_rfc3339(),
231 "metadata": session.metadata,
232 }))
233 .map_err(|e| {
234 AgentError::SerializationError(format!("Failed to serialize session: {}", e))
235 })?,
236 ];
237
238 for msg in &session.messages {
239 lines.push(serde_json::to_string(msg).map_err(|e| {
240 AgentError::SerializationError(format!("Failed to serialize message: {}", e))
241 })?);
242 }
243
244 fs::write(&session_file, lines.join("\n"))
245 .await
246 .map_err(|e| AgentError::IoError(format!("Failed to write session file: {}", e)))?;
247
248 Ok(())
249 }
250
251 async fn delete(&self, key: &str) -> AgentResult<bool> {
252 let session_file = self.session_file(key);
253 if session_file.exists() {
254 fs::remove_file(&session_file).await.map_err(|e| {
255 AgentError::IoError(format!("Failed to remove session file: {}", e))
256 })?;
257 Ok(true)
258 } else {
259 Ok(false)
260 }
261 }
262
263 async fn list(&self) -> AgentResult<Vec<String>> {
264 let mut entries = fs::read_dir(&self.sessions_dir).await.map_err(|e| {
265 AgentError::IoError(format!("Failed to read sessions directory: {}", e))
266 })?;
267
268 let mut keys = Vec::new();
269 while let Some(entry) = entries
270 .next_entry()
271 .await
272 .map_err(|e| AgentError::IoError(format!("Failed to read entry: {}", e)))?
273 {
274 if let Some(name) = entry.path().file_stem().and_then(|s| s.to_str()) {
275 let key = name.replace('_', ":");
277 keys.push(key);
278 }
279 }
280
281 Ok(keys)
282 }
283}
284
285pub struct MemorySessionStorage {
291 sessions: RwLock<HashMap<String, Session>>,
292}
293
294impl MemorySessionStorage {
295 pub fn new() -> Self {
296 Self {
297 sessions: RwLock::new(HashMap::new()),
298 }
299 }
300}
301
302impl Default for MemorySessionStorage {
303 fn default() -> Self {
304 Self::new()
305 }
306}
307
308#[async_trait]
309impl SessionStorage for MemorySessionStorage {
310 async fn load(&self, key: &str) -> AgentResult<Option<Session>> {
311 let sessions = self.sessions.read().await;
312 Ok(sessions.get(key).cloned())
313 }
314
315 async fn save(&self, session: &Session) -> AgentResult<()> {
316 let mut sessions = self.sessions.write().await;
317 sessions.insert(session.key.clone(), session.clone());
318 Ok(())
319 }
320
321 async fn delete(&self, key: &str) -> AgentResult<bool> {
322 let mut sessions = self.sessions.write().await;
323 Ok(sessions.remove(key).is_some())
324 }
325
326 async fn list(&self) -> AgentResult<Vec<String>> {
327 let sessions = self.sessions.read().await;
328 Ok(sessions.keys().cloned().collect())
329 }
330}
331
332pub struct SessionManager {
338 storage: Box<dyn SessionStorage>,
339 cache: RwLock<HashMap<String, Session>>,
340}
341
342impl SessionManager {
343 pub async fn with_jsonl(workspace: impl AsRef<Path>) -> AgentResult<Self> {
345 let storage = JsonlSessionStorage::new(workspace).await?;
346 Ok(Self {
347 storage: Box::new(storage),
348 cache: RwLock::new(HashMap::new()),
349 })
350 }
351
352 pub fn with_storage(storage: Box<dyn SessionStorage>) -> Self {
354 Self {
355 storage,
356 cache: RwLock::new(HashMap::new()),
357 }
358 }
359
360 pub async fn get_or_create(&self, key: &str) -> Session {
362 {
364 let cache = self.cache.read().await;
365 if let Some(session) = cache.get(key) {
366 return session.clone();
367 }
368 }
369
370 if let Ok(Some(session)) = self.storage.load(key).await {
372 let mut cache = self.cache.write().await;
373 cache.insert(key.to_string(), session.clone());
374 return session;
375 }
376
377 let session = Session::new(key);
379 let mut cache = self.cache.write().await;
380 cache.insert(key.to_string(), session.clone());
381 session
382 }
383
384 pub async fn save(&self, session: &Session) -> AgentResult<()> {
386 self.storage.save(session).await?;
387 let mut cache = self.cache.write().await;
388 cache.insert(session.key.clone(), session.clone());
389 Ok(())
390 }
391
392 pub async fn delete(&self, key: &str) -> AgentResult<bool> {
394 let result = self.storage.delete(key).await?;
395 let mut cache = self.cache.write().await;
396 cache.remove(key);
397 Ok(result)
398 }
399
400 pub async fn list(&self) -> AgentResult<Vec<String>> {
402 self.storage.list().await
403 }
404}
405
406#[cfg(test)]
407mod tests {
408 use super::*;
409 use tempfile::TempDir;
410
411 #[tokio::test]
412 async fn test_session_creation() {
413 let session = Session::new("test:key");
414 assert_eq!(session.key, "test:key");
415 assert!(session.is_empty());
416 }
417
418 #[tokio::test]
419 async fn test_session_add_message() {
420 let mut session = Session::new("test:key");
421 session.add_message("user", "Hello");
422 session.add_message("assistant", "Hi there!");
423
424 assert_eq!(session.len(), 2);
425 let history = session.get_history(10);
426 assert_eq!(history.len(), 2);
427 assert_eq!(history[0].role, "user");
428 }
429
430 #[tokio::test]
431 async fn test_memory_storage() {
432 let storage = MemorySessionStorage::new();
433 let session = Session::new("test:memory");
434
435 storage.save(&session).await.unwrap();
436 let loaded = storage.load("test:memory").await.unwrap();
437 assert!(loaded.is_some());
438 assert_eq!(loaded.unwrap().key, "test:memory");
439 }
440
441 #[tokio::test]
442 async fn test_jsonl_storage() {
443 let temp_dir = TempDir::new().unwrap();
444 let storage = JsonlSessionStorage::new(temp_dir.path()).await.unwrap();
445
446 let mut session = Session::new("test:jsonl");
447 session.add_message("user", "Hello");
448
449 storage.save(&session).await.unwrap();
450
451 let loaded = storage.load("test:jsonl").await.unwrap();
452 assert!(loaded.is_some());
453 let loaded_session = loaded.unwrap();
454 assert_eq!(loaded_session.key, "test:jsonl");
455 assert_eq!(loaded_session.len(), 1);
456 }
457
458 #[tokio::test]
459 async fn test_session_manager() {
460 let temp_dir = TempDir::new().unwrap();
461 let manager = SessionManager::with_jsonl(temp_dir.path()).await.unwrap();
462
463 let session = manager.get_or_create("test:manager").await;
464 assert_eq!(session.key, "test:manager");
465
466 manager.save(&session).await.unwrap();
467
468 let loaded = manager.get_or_create("test:manager").await;
470 assert_eq!(loaded.key, "test:manager");
471 }
472}