synwire_agent/session/
manager.rs1use std::collections::HashMap;
7use std::sync::Arc;
8
9use tokio::sync::RwLock;
10use uuid::Uuid;
11
12use synwire_core::BoxFuture;
13use synwire_core::agents::error::AgentError;
14use synwire_core::agents::session::{Session, SessionManager, SessionMetadata};
15
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch the result is `0`; a
/// millisecond count that does not fit in an `i64` saturates to `i64::MAX`.
fn now_unix() -> i64 {
    // `elapsed` on the epoch is "now minus epoch"; it only fails when the
    // clock is set before 1970, which we treat as zero elapsed time.
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
    i64::try_from(since_epoch.as_millis()).unwrap_or(i64::MAX)
}
23
24#[derive(Debug, Default)]
26pub struct InMemorySessionManager {
27 sessions: Arc<RwLock<HashMap<String, Session>>>,
28}
29
30impl InMemorySessionManager {
31 #[must_use]
33 pub fn new() -> Self {
34 Self::default()
35 }
36}
37
38impl SessionManager for InMemorySessionManager {
39 fn list(&self) -> BoxFuture<'_, Result<Vec<SessionMetadata>, AgentError>> {
40 Box::pin(async move {
41 let mut metas: Vec<SessionMetadata> = self
42 .sessions
43 .read()
44 .await
45 .values()
46 .map(|s| s.metadata.clone())
47 .collect();
48 metas.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
49 Ok(metas)
50 })
51 }
52
53 fn resume(&self, session_id: &str) -> BoxFuture<'_, Result<Session, AgentError>> {
54 let id = session_id.to_string();
55 Box::pin(async move {
56 self.sessions
57 .read()
58 .await
59 .get(&id)
60 .cloned()
61 .ok_or_else(|| AgentError::Session(format!("Session not found: {id}")))
62 })
63 }
64
65 fn save(&self, session: &Session) -> BoxFuture<'_, Result<(), AgentError>> {
66 let mut session = session.clone();
67 session.metadata.updated_at = now_unix();
68 Box::pin(async move {
69 let _ = self
70 .sessions
71 .write()
72 .await
73 .insert(session.metadata.id.clone(), session);
74 Ok(())
75 })
76 }
77
78 fn delete(&self, session_id: &str) -> BoxFuture<'_, Result<(), AgentError>> {
79 let id = session_id.to_string();
80 Box::pin(async move {
81 let _ = self.sessions.write().await.remove(&id).ok_or_else(|| {
82 AgentError::Session(format!("Session not found for deletion: {id}"))
83 })?;
84 Ok(())
85 })
86 }
87
88 fn fork(
89 &self,
90 session_id: &str,
91 new_name: Option<String>,
92 ) -> BoxFuture<'_, Result<SessionMetadata, AgentError>> {
93 let id = session_id.to_string();
94 Box::pin(async move {
95 let source = self
96 .sessions
97 .read()
98 .await
99 .get(&id)
100 .cloned()
101 .ok_or_else(|| AgentError::Session(format!("Session not found: {id}")))?;
102
103 let new_id = Uuid::new_v4().to_string();
104 let now = now_unix();
105 let mut forked = source.clone();
106 forked.metadata.id = new_id.clone();
107 forked.metadata.name = new_name.or_else(|| {
108 source
109 .metadata
110 .name
111 .as_deref()
112 .map(|n| format!("{n} (fork)"))
113 });
114 forked.metadata.created_at = now;
115 forked.metadata.updated_at = now;
116
117 let meta = forked.metadata.clone();
118 let _ = self.sessions.write().await.insert(new_id, forked);
119 Ok(meta)
120 })
121 }
122
123 fn rewind(
124 &self,
125 session_id: &str,
126 turn_index: u32,
127 ) -> BoxFuture<'_, Result<Session, AgentError>> {
128 let id = session_id.to_string();
129 Box::pin(async move {
130 let mut guard = self.sessions.write().await;
131 let session = guard
132 .get_mut(&id)
133 .ok_or_else(|| AgentError::Session(format!("Session not found: {id}")))?;
134 let max = session.messages.len();
135 let keep = (turn_index as usize).min(max);
136 session.messages.truncate(keep);
137 session.metadata.turn_count = u32::try_from(keep).unwrap_or(u32::MAX);
138 session.metadata.updated_at = now_unix();
139 let result = session.clone();
140 drop(guard);
141 Ok(result)
142 })
143 }
144
145 fn tag(&self, session_id: &str, tags: Vec<String>) -> BoxFuture<'_, Result<(), AgentError>> {
146 let id = session_id.to_string();
147 Box::pin(async move {
148 let mut guard = self.sessions.write().await;
149 let session = guard
150 .get_mut(&id)
151 .ok_or_else(|| AgentError::Session(format!("Session not found: {id}")))?;
152 for tag in tags {
153 if !session.metadata.tags.contains(&tag) {
154 session.metadata.tags.push(tag);
155 }
156 }
157 session.metadata.updated_at = now_unix();
158 drop(guard);
159 Ok(())
160 })
161 }
162
163 fn rename(&self, session_id: &str, new_name: String) -> BoxFuture<'_, Result<(), AgentError>> {
164 let id = session_id.to_string();
165 Box::pin(async move {
166 let mut guard = self.sessions.write().await;
167 let session = guard
168 .get_mut(&id)
169 .ok_or_else(|| AgentError::Session(format!("Session not found: {id}")))?;
170 session.metadata.name = Some(new_name);
171 session.metadata.updated_at = now_unix();
172 drop(guard);
173 Ok(())
174 })
175 }
176}
177
#[cfg(test)]
// Tests unwrap freely: a failed setup step should abort the test with a panic.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use synwire_core::agents::session::Session;

    /// Builds an unnamed, untagged session with no messages and empty state.
    fn make_session(id: &str, agent: &str) -> Session {
        let now = now_unix();
        Session {
            metadata: SessionMetadata {
                id: id.to_string(),
                name: None,
                tags: Vec::new(),
                agent_name: agent.to_string(),
                created_at: now,
                updated_at: now,
                turn_count: 0,
                total_tokens: 0,
            },
            messages: Vec::new(),
            state: serde_json::json!({}),
        }
    }

    #[tokio::test]
    async fn test_save_and_resume() {
        let mgr = InMemorySessionManager::new();
        let session = make_session("s1", "agent-a");
        mgr.save(&session).await.unwrap();

        let loaded = mgr.resume("s1").await.unwrap();
        assert_eq!(loaded.metadata.id, "s1");
        assert_eq!(loaded.metadata.agent_name, "agent-a");

        // Unknown ids are reported as errors rather than empty sessions.
        assert!(mgr.resume("missing").await.is_err());
    }

    #[tokio::test]
    async fn test_list_ordered_by_updated() {
        let mgr = InMemorySessionManager::new();
        mgr.save(&make_session("a", "x")).await.unwrap();
        // Timestamps have millisecond resolution; make sure "b" is strictly newer.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        mgr.save(&make_session("b", "x")).await.unwrap();

        let list = mgr.list().await.unwrap();
        let ids: Vec<&str> = list.iter().map(|m| m.id.as_str()).collect();
        assert_eq!(ids, ["b", "a"]);
    }

    #[tokio::test]
    async fn test_fork() {
        let mgr = InMemorySessionManager::new();
        mgr.save(&make_session("orig", "agent")).await.unwrap();
        let forked = mgr.fork("orig", Some("fork-1".to_string())).await.unwrap();
        assert_ne!(forked.id, "orig");
        assert_eq!(forked.name.as_deref(), Some("fork-1"));
        assert_eq!(forked.agent_name, "agent");

        // Both the fork and the original must be retrievable afterwards.
        let stored = mgr.resume(&forked.id).await.unwrap();
        assert_eq!(stored.metadata.id, forked.id);
        assert!(mgr.resume("orig").await.is_ok());

        assert!(mgr.fork("missing", None).await.is_err());
    }

    #[tokio::test]
    async fn test_fork_default_name() {
        let mgr = InMemorySessionManager::new();
        mgr.save(&make_session("orig", "agent")).await.unwrap();

        // An unnamed source yields an unnamed fork.
        let unnamed = mgr.fork("orig", None).await.unwrap();
        assert_eq!(unnamed.name, None);

        // A named source yields "<name> (fork)".
        mgr.rename("orig", "Base".to_string()).await.unwrap();
        let named = mgr.fork("orig", None).await.unwrap();
        assert_eq!(named.name.as_deref(), Some("Base (fork)"));
    }

    #[tokio::test]
    async fn test_rewind() {
        let mgr = InMemorySessionManager::new();
        let mut s = make_session("r1", "agent");
        s.messages = vec![
            serde_json::json!({"role": "user", "content": "a"}),
            serde_json::json!({"role": "assistant", "content": "b"}),
            serde_json::json!({"role": "user", "content": "c"}),
        ];
        mgr.save(&s).await.unwrap();

        let rewound = mgr.rewind("r1", 1).await.unwrap();
        assert_eq!(rewound.messages.len(), 1);
        assert_eq!(rewound.metadata.turn_count, 1);

        // The truncation is persisted, not just reflected in the return value.
        let stored = mgr.resume("r1").await.unwrap();
        assert_eq!(stored.messages.len(), 1);

        // An index past the end keeps everything that is left.
        let unchanged = mgr.rewind("r1", 10).await.unwrap();
        assert_eq!(unchanged.messages.len(), 1);
        assert_eq!(unchanged.metadata.turn_count, 1);

        assert!(mgr.rewind("missing", 0).await.is_err());
    }

    #[tokio::test]
    async fn test_tag_and_rename() {
        let mgr = InMemorySessionManager::new();
        mgr.save(&make_session("t1", "agent")).await.unwrap();
        mgr.tag("t1", vec!["important".to_string()]).await.unwrap();
        // Re-applying the same tag (even twice in one call) must not duplicate it.
        mgr.tag("t1", vec!["important".to_string(), "important".to_string()])
            .await
            .unwrap();
        mgr.rename("t1", "My Session".to_string()).await.unwrap();

        let s = mgr.resume("t1").await.unwrap();
        assert!(s.metadata.tags.contains(&"important".to_string()));
        assert_eq!(s.metadata.tags.len(), 1);
        assert_eq!(s.metadata.name.as_deref(), Some("My Session"));

        assert!(mgr.tag("missing", Vec::new()).await.is_err());
        assert!(mgr.rename("missing", "x".to_string()).await.is_err());
    }

    #[tokio::test]
    async fn test_delete() {
        let mgr = InMemorySessionManager::new();
        mgr.save(&make_session("del", "agent")).await.unwrap();
        mgr.delete("del").await.unwrap();
        assert!(mgr.resume("del").await.is_err());
        // Deleting an id that is no longer present is an error.
        assert!(mgr.delete("del").await.is_err());
    }
}
266}