Skip to main content

synwire_agent/session/
manager.rs

//! In-memory `SessionManager` implementation.
//!
//! Suitable for ephemeral use and testing. A checkpoint-backed implementation
//! lives in `synwire-checkpoint`.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8
9use tokio::sync::RwLock;
10use uuid::Uuid;
11
12use synwire_core::BoxFuture;
13use synwire_core::agents::error::AgentError;
14use synwire_core::agents::session::{Session, SessionManager, SessionMetadata};
15
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch and
/// saturates at `i64::MAX` if the millisecond count does not fit in an `i64`.
fn now_unix() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_millis()).unwrap_or(i64::MAX)
        })
}
23
24/// In-memory session manager.  All data is lost when the process exits.
25#[derive(Debug, Default)]
26pub struct InMemorySessionManager {
27    sessions: Arc<RwLock<HashMap<String, Session>>>,
28}
29
30impl InMemorySessionManager {
31    /// Create an empty manager.
32    #[must_use]
33    pub fn new() -> Self {
34        Self::default()
35    }
36}
37
38impl SessionManager for InMemorySessionManager {
39    fn list(&self) -> BoxFuture<'_, Result<Vec<SessionMetadata>, AgentError>> {
40        Box::pin(async move {
41            let mut metas: Vec<SessionMetadata> = self
42                .sessions
43                .read()
44                .await
45                .values()
46                .map(|s| s.metadata.clone())
47                .collect();
48            metas.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
49            Ok(metas)
50        })
51    }
52
53    fn resume(&self, session_id: &str) -> BoxFuture<'_, Result<Session, AgentError>> {
54        let id = session_id.to_string();
55        Box::pin(async move {
56            self.sessions
57                .read()
58                .await
59                .get(&id)
60                .cloned()
61                .ok_or_else(|| AgentError::Session(format!("Session not found: {id}")))
62        })
63    }
64
65    fn save(&self, session: &Session) -> BoxFuture<'_, Result<(), AgentError>> {
66        let mut session = session.clone();
67        session.metadata.updated_at = now_unix();
68        Box::pin(async move {
69            let _ = self
70                .sessions
71                .write()
72                .await
73                .insert(session.metadata.id.clone(), session);
74            Ok(())
75        })
76    }
77
78    fn delete(&self, session_id: &str) -> BoxFuture<'_, Result<(), AgentError>> {
79        let id = session_id.to_string();
80        Box::pin(async move {
81            let _ = self.sessions.write().await.remove(&id).ok_or_else(|| {
82                AgentError::Session(format!("Session not found for deletion: {id}"))
83            })?;
84            Ok(())
85        })
86    }
87
88    fn fork(
89        &self,
90        session_id: &str,
91        new_name: Option<String>,
92    ) -> BoxFuture<'_, Result<SessionMetadata, AgentError>> {
93        let id = session_id.to_string();
94        Box::pin(async move {
95            let source = self
96                .sessions
97                .read()
98                .await
99                .get(&id)
100                .cloned()
101                .ok_or_else(|| AgentError::Session(format!("Session not found: {id}")))?;
102
103            let new_id = Uuid::new_v4().to_string();
104            let now = now_unix();
105            let mut forked = source.clone();
106            forked.metadata.id = new_id.clone();
107            forked.metadata.name = new_name.or_else(|| {
108                source
109                    .metadata
110                    .name
111                    .as_deref()
112                    .map(|n| format!("{n} (fork)"))
113            });
114            forked.metadata.created_at = now;
115            forked.metadata.updated_at = now;
116
117            let meta = forked.metadata.clone();
118            let _ = self.sessions.write().await.insert(new_id, forked);
119            Ok(meta)
120        })
121    }
122
123    fn rewind(
124        &self,
125        session_id: &str,
126        turn_index: u32,
127    ) -> BoxFuture<'_, Result<Session, AgentError>> {
128        let id = session_id.to_string();
129        Box::pin(async move {
130            let mut guard = self.sessions.write().await;
131            let session = guard
132                .get_mut(&id)
133                .ok_or_else(|| AgentError::Session(format!("Session not found: {id}")))?;
134            let max = session.messages.len();
135            let keep = (turn_index as usize).min(max);
136            session.messages.truncate(keep);
137            session.metadata.turn_count = u32::try_from(keep).unwrap_or(u32::MAX);
138            session.metadata.updated_at = now_unix();
139            let result = session.clone();
140            drop(guard);
141            Ok(result)
142        })
143    }
144
145    fn tag(&self, session_id: &str, tags: Vec<String>) -> BoxFuture<'_, Result<(), AgentError>> {
146        let id = session_id.to_string();
147        Box::pin(async move {
148            let mut guard = self.sessions.write().await;
149            let session = guard
150                .get_mut(&id)
151                .ok_or_else(|| AgentError::Session(format!("Session not found: {id}")))?;
152            for tag in tags {
153                if !session.metadata.tags.contains(&tag) {
154                    session.metadata.tags.push(tag);
155                }
156            }
157            session.metadata.updated_at = now_unix();
158            drop(guard);
159            Ok(())
160        })
161    }
162
163    fn rename(&self, session_id: &str, new_name: String) -> BoxFuture<'_, Result<(), AgentError>> {
164        let id = session_id.to_string();
165        Box::pin(async move {
166            let mut guard = self.sessions.write().await;
167            let session = guard
168                .get_mut(&id)
169                .ok_or_else(|| AgentError::Session(format!("Session not found: {id}")))?;
170            session.metadata.name = Some(new_name);
171            session.metadata.updated_at = now_unix();
172            drop(guard);
173            Ok(())
174        })
175    }
176}
177
#[cfg(test)]
// Panicking on an unexpected error is the intended failure mode in tests.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use synwire_core::agents::session::Session;

    /// Builds an empty, unnamed session with the given id and agent name.
    fn make_session(id: &str, agent: &str) -> Session {
        Session {
            metadata: SessionMetadata {
                id: id.to_string(),
                name: None,
                tags: Vec::new(),
                agent_name: agent.to_string(),
                created_at: now_unix(),
                updated_at: now_unix(),
                turn_count: 0,
                total_tokens: 0,
            },
            messages: Vec::new(),
            state: serde_json::json!({}),
        }
    }

    #[tokio::test]
    async fn test_save_and_resume() {
        let mgr = InMemorySessionManager::new();
        let session = make_session("s1", "agent-a");
        mgr.save(&session).await.unwrap();

        let loaded = mgr.resume("s1").await.unwrap();
        assert_eq!(loaded.metadata.id, "s1");

        // Unknown ids are reported as errors rather than empty sessions.
        assert!(mgr.resume("missing").await.is_err());
    }

    #[tokio::test]
    async fn test_save_refreshes_updated_at() {
        let mgr = InMemorySessionManager::new();
        let mut session = make_session("stamp", "agent");
        session.metadata.updated_at = 0;
        mgr.save(&session).await.unwrap();

        let loaded = mgr.resume("stamp").await.unwrap();
        assert!(loaded.metadata.updated_at > 0);
    }

    #[tokio::test]
    async fn test_list_ordered_by_updated() {
        let mgr = InMemorySessionManager::new();
        mgr.save(&make_session("a", "x")).await.unwrap();
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        mgr.save(&make_session("b", "x")).await.unwrap();

        let list = mgr.list().await.unwrap();
        assert_eq!(list.len(), 2);
        // "b" has a larger updated_at.
        assert_eq!(list[0].id, "b");
        assert_eq!(list[1].id, "a");
    }

    #[tokio::test]
    async fn test_fork() {
        let mgr = InMemorySessionManager::new();
        mgr.save(&make_session("orig", "agent")).await.unwrap();
        let forked = mgr.fork("orig", Some("fork-1".to_string())).await.unwrap();
        assert_ne!(forked.id, "orig");
        assert_eq!(forked.name.as_deref(), Some("fork-1"));

        // The fork is stored alongside the untouched original.
        let stored = mgr.resume(&forked.id).await.unwrap();
        assert_eq!(stored.metadata.id, forked.id);
        assert!(mgr.resume("orig").await.is_ok());

        assert!(mgr.fork("missing", None).await.is_err());
    }

    #[tokio::test]
    async fn test_fork_default_name() {
        let mgr = InMemorySessionManager::new();
        mgr.save(&make_session("orig", "agent")).await.unwrap();

        // An unnamed source yields an unnamed fork.
        let unnamed = mgr.fork("orig", None).await.unwrap();
        assert_eq!(unnamed.name, None);

        // A named source lends its name, suffixed with " (fork)".
        mgr.rename("orig", "Original".to_string()).await.unwrap();
        let named = mgr.fork("orig", None).await.unwrap();
        assert_eq!(named.name.as_deref(), Some("Original (fork)"));
    }

    #[tokio::test]
    async fn test_rewind() {
        let mgr = InMemorySessionManager::new();
        let mut s = make_session("r1", "agent");
        s.messages = vec![
            serde_json::json!({"role": "user", "content": "a"}),
            serde_json::json!({"role": "assistant", "content": "b"}),
            serde_json::json!({"role": "user", "content": "c"}),
        ];
        mgr.save(&s).await.unwrap();

        let rewound = mgr.rewind("r1", 1).await.unwrap();
        assert_eq!(rewound.messages.len(), 1);
        assert_eq!(rewound.metadata.turn_count, 1);

        // The truncation is applied to the stored session, not just the copy.
        let stored = mgr.resume("r1").await.unwrap();
        assert_eq!(stored.messages.len(), 1);

        // An index past the end leaves the messages untouched.
        let clamped = mgr.rewind("r1", 10).await.unwrap();
        assert_eq!(clamped.messages.len(), 1);
        assert_eq!(clamped.metadata.turn_count, 1);

        assert!(mgr.rewind("missing", 0).await.is_err());
    }

    #[tokio::test]
    async fn test_tag_and_rename() {
        let mgr = InMemorySessionManager::new();
        mgr.save(&make_session("t1", "agent")).await.unwrap();
        mgr.tag("t1", vec!["important".to_string()]).await.unwrap();
        // Re-applying the same tag must not create a duplicate.
        mgr.tag("t1", vec!["important".to_string()]).await.unwrap();
        mgr.rename("t1", "My Session".to_string()).await.unwrap();

        let s = mgr.resume("t1").await.unwrap();
        assert_eq!(s.metadata.tags, vec!["important".to_string()]);
        assert_eq!(s.metadata.name.as_deref(), Some("My Session"));

        assert!(mgr.tag("missing", Vec::new()).await.is_err());
        assert!(mgr.rename("missing", "x".to_string()).await.is_err());
    }

    #[tokio::test]
    async fn test_delete() {
        let mgr = InMemorySessionManager::new();
        mgr.save(&make_session("del", "agent")).await.unwrap();
        mgr.delete("del").await.unwrap();
        assert!(mgr.resume("del").await.is_err());

        // Deleting an id that is no longer present is an error.
        assert!(mgr.delete("del").await.is_err());
    }
}
266}