1mod s3_session_manager;
4
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use tracing::debug;
14
15use crate::types::content::Message;
16use crate::types::errors::{Result, StrandsError};
17
18#[cfg(feature = "s3-session")]
19pub use s3_session_manager::S3SessionManager;
20
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
24pub enum SessionType {
25 Agent,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(rename_all = "camelCase")]
31pub struct SessionMessage {
32 pub message: Message,
33 pub message_id: usize,
34 pub redact_message: Option<Message>,
35 pub created_at: String,
36 pub updated_at: String,
37}
38
39impl SessionMessage {
40 pub fn from_message(message: Message, index: usize) -> Self {
41 let now = Utc::now().to_rfc3339();
42 Self {
43 message,
44 message_id: index,
45 redact_message: None,
46 created_at: now.clone(),
47 updated_at: now,
48 }
49 }
50
51 pub fn to_message(&self) -> Message {
52 self.redact_message.clone().unwrap_or_else(|| self.message.clone())
53 }
54}
55
56pub fn encode_bytes_values(value: serde_json::Value) -> serde_json::Value {
58 match value {
59 serde_json::Value::Object(map) => {
60 let encoded: serde_json::Map<String, serde_json::Value> = map
61 .into_iter()
62 .map(|(k, v)| (k, encode_bytes_values(v)))
63 .collect();
64 serde_json::Value::Object(encoded)
65 }
66 serde_json::Value::Array(arr) => {
67 serde_json::Value::Array(arr.into_iter().map(encode_bytes_values).collect())
68 }
69 other => other,
70 }
71}
72
73pub fn decode_bytes_values(value: serde_json::Value) -> serde_json::Value {
75 match value {
76 serde_json::Value::Object(map) => {
77 if map.get("__bytes_encoded__") == Some(&serde_json::Value::Bool(true)) {
78 if let Some(serde_json::Value::String(data)) = map.get("data") {
79 if let Ok(decoded) = BASE64.decode(data) {
80 return serde_json::json!({
81 "__decoded_bytes__": decoded
82 });
83 }
84 }
85 }
86 let decoded: serde_json::Map<String, serde_json::Value> = map
87 .into_iter()
88 .map(|(k, v)| (k, decode_bytes_values(v)))
89 .collect();
90 serde_json::Value::Object(decoded)
91 }
92 serde_json::Value::Array(arr) => {
93 serde_json::Value::Array(arr.into_iter().map(decode_bytes_values).collect())
94 }
95 other => other,
96 }
97}
98
99pub fn encode_bytes(data: &[u8]) -> serde_json::Value {
101 serde_json::json!({
102 "__bytes_encoded__": true,
103 "data": BASE64.encode(data)
104 })
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(rename_all = "camelCase")]
110pub struct SessionAgent {
111 pub agent_id: String,
112 pub state: HashMap<String, serde_json::Value>,
113 pub conversation_manager_state: HashMap<String, serde_json::Value>,
114 #[serde(default)]
115 pub internal_state: HashMap<String, serde_json::Value>,
116 pub created_at: String,
117 pub updated_at: String,
118}
119
120impl SessionAgent {
121 pub fn new(agent_id: impl Into<String>) -> Self {
122 let now = Utc::now().to_rfc3339();
123 Self {
124 agent_id: agent_id.into(),
125 state: HashMap::new(),
126 conversation_manager_state: HashMap::new(),
127 internal_state: HashMap::new(),
128 created_at: now.clone(),
129 updated_at: now,
130 }
131 }
132
133 pub fn from_agent(agent: &crate::agent::Agent) -> Result<Self> {
145 let now = Utc::now().to_rfc3339();
146
147
148 let mut internal_state = HashMap::new();
149 internal_state.insert(
150 "interrupt_state".to_string(),
151 serde_json::to_value(agent.interrupt_state().to_dict())
152 .unwrap_or_default(),
153 );
154
155 Ok(Self {
156 agent_id: agent.agent_id.clone(),
157 state: agent.state().get_all(),
158 conversation_manager_state: agent.conversation_manager().get_state(),
159 internal_state,
160 created_at: now.clone(),
161 updated_at: now,
162 })
163 }
164
165 pub fn initialize_internal_state(&self, agent: &mut crate::agent::Agent) {
173 if let Some(interrupt_state_value) = self.internal_state.get("interrupt_state") {
174 if let Some(interrupt_data) = interrupt_state_value.as_object() {
175 let interrupt_state = crate::types::interrupt::InterruptState::from_dict(
176 interrupt_data
177 .iter()
178 .map(|(k, v)| (k.clone(), v.clone()))
179 .collect(),
180 );
181 agent.set_interrupt_state(interrupt_state);
182 }
183 }
184 }
185
186 pub fn restore_state(&self, agent: &mut crate::agent::Agent) {
194 for (key, value) in &self.state {
195 agent.state_mut().set(key.clone(), value.clone());
196 }
197 }
198
199 pub fn restore_conversation_manager_state(
209 &self,
210 agent: &mut crate::agent::Agent,
211 ) -> Option<Vec<Message>> {
212 agent
213 .conversation_manager_mut()
214 .restore_from_session(self.conversation_manager_state.clone())
215 }
216
217 pub fn update_from_agent(&mut self, agent: &crate::agent::Agent) {
221 self.updated_at = Utc::now().to_rfc3339();
222 self.state = agent.state().get_all();
223 self.conversation_manager_state = agent.conversation_manager().get_state();
224
225
226 self.internal_state.insert(
227 "interrupt_state".to_string(),
228 serde_json::to_value(agent.interrupt_state().to_dict())
229 .unwrap_or_default(),
230 );
231 }
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize)]
236#[serde(rename_all = "camelCase")]
237pub struct Session {
238 pub session_id: String,
239 pub session_type: SessionType,
240 pub created_at: String,
241 pub updated_at: String,
242}
243
244impl Session {
245 pub fn new(session_id: impl Into<String>) -> Self {
246 let now = Utc::now().to_rfc3339();
247 Self {
248 session_id: session_id.into(),
249 session_type: SessionType::Agent,
250 created_at: now.clone(),
251 updated_at: now,
252 }
253 }
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize)]
258pub struct LegacySession {
259 pub session_id: String,
260 pub agent_id: String,
261 pub messages: Vec<Message>,
262 pub state: HashMap<String, serde_json::Value>,
263 pub created_at: DateTime<Utc>,
264 pub updated_at: DateTime<Utc>,
265}
266
267impl LegacySession {
268 pub fn new(session_id: impl Into<String>, agent_id: impl Into<String>) -> Self {
269 let now = Utc::now();
270 Self {
271 session_id: session_id.into(),
272 agent_id: agent_id.into(),
273 messages: Vec::new(),
274 state: HashMap::new(),
275 created_at: now,
276 updated_at: now,
277 }
278 }
279}
280
281#[derive(Debug, Clone, Serialize, Deserialize)]
283pub struct SessionSummary {
284 pub session_id: String,
285 pub session_type: SessionType,
286 pub created_at: String,
287 pub updated_at: String,
288}
289
290#[async_trait]
292pub trait SessionRepository: Send + Sync {
293 async fn create_session(&self, session: &Session) -> Result<()>;
294 async fn read_session(&self, session_id: &str) -> Result<Option<Session>>;
295 async fn delete_session(&self, session_id: &str) -> Result<()>;
296
297 async fn create_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()>;
298 async fn read_agent(&self, session_id: &str, agent_id: &str) -> Result<Option<SessionAgent>>;
299 async fn update_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()>;
300
301 async fn create_message(
302 &self,
303 session_id: &str,
304 agent_id: &str,
305 message: &SessionMessage,
306 ) -> Result<()>;
307 async fn read_message(
308 &self,
309 session_id: &str,
310 agent_id: &str,
311 message_id: usize,
312 ) -> Result<Option<SessionMessage>>;
313 async fn update_message(
314 &self,
315 session_id: &str,
316 agent_id: &str,
317 message: &SessionMessage,
318 ) -> Result<()>;
319 async fn list_messages(
320 &self,
321 session_id: &str,
322 agent_id: &str,
323 limit: Option<usize>,
324 offset: usize,
325 ) -> Result<Vec<SessionMessage>>;
326
327 async fn create_multi_agent(
328 &self,
329 session_id: &str,
330 multi_agent_id: &str,
331 state: &serde_json::Value,
332 ) -> Result<()>;
333 async fn read_multi_agent(
334 &self,
335 session_id: &str,
336 multi_agent_id: &str,
337 ) -> Result<Option<serde_json::Value>>;
338 async fn update_multi_agent(
339 &self,
340 session_id: &str,
341 multi_agent_id: &str,
342 state: &serde_json::Value,
343 ) -> Result<()>;
344}
345
346#[async_trait]
348pub trait SessionManager: Send + Sync {
349 async fn read_session(&self, session_id: &str) -> Result<Option<Session>>;
350 async fn write_session(&self, session: &Session) -> Result<()>;
351 async fn delete_session(&self, session_id: &str) -> Result<()>;
352 async fn list_sessions(&self) -> Result<Vec<SessionSummary>>;
353}
354
355#[derive(Default)]
357pub struct InMemorySessionManager {
358 sessions: std::sync::RwLock<HashMap<String, Session>>,
359 agents: std::sync::RwLock<HashMap<String, SessionAgent>>,
360 messages: std::sync::RwLock<HashMap<String, Vec<SessionMessage>>>,
361 multi_agents: std::sync::RwLock<HashMap<String, serde_json::Value>>,
362}
363
364impl InMemorySessionManager {
365 pub fn new() -> Self {
366 Self::default()
367 }
368
369 fn agent_key(session_id: &str, agent_id: &str) -> String {
370 format!("{}:{}", session_id, agent_id)
371 }
372
373 fn multi_agent_key(session_id: &str, multi_agent_id: &str) -> String {
374 format!("{}:ma:{}", session_id, multi_agent_id)
375 }
376}
377
378#[async_trait]
379impl SessionManager for InMemorySessionManager {
380 async fn read_session(&self, session_id: &str) -> Result<Option<Session>> {
381 let sessions = self.sessions.read().unwrap();
382 Ok(sessions.get(session_id).cloned())
383 }
384
385 async fn write_session(&self, session: &Session) -> Result<()> {
386 let mut sessions = self.sessions.write().unwrap();
387 sessions.insert(session.session_id.clone(), session.clone());
388 Ok(())
389 }
390
391 async fn delete_session(&self, session_id: &str) -> Result<()> {
392 let mut sessions = self.sessions.write().unwrap();
393 sessions.remove(session_id);
394 Ok(())
395 }
396
397 async fn list_sessions(&self) -> Result<Vec<SessionSummary>> {
398 let sessions = self.sessions.read().unwrap();
399 Ok(sessions
400 .values()
401 .map(|s| SessionSummary {
402 session_id: s.session_id.clone(),
403 session_type: s.session_type,
404 created_at: s.created_at.clone(),
405 updated_at: s.updated_at.clone(),
406 })
407 .collect())
408 }
409}
410
411#[async_trait]
412impl SessionRepository for InMemorySessionManager {
413 async fn create_session(&self, session: &Session) -> Result<()> {
414 self.write_session(session).await
415 }
416
417 async fn read_session(&self, session_id: &str) -> Result<Option<Session>> {
418 SessionManager::read_session(self, session_id).await
419 }
420
421 async fn delete_session(&self, session_id: &str) -> Result<()> {
422 SessionManager::delete_session(self, session_id).await
423 }
424
425 async fn create_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()> {
426 let key = Self::agent_key(session_id, &agent.agent_id);
427 let mut agents = self.agents.write().unwrap();
428 agents.insert(key, agent.clone());
429 Ok(())
430 }
431
432 async fn read_agent(&self, session_id: &str, agent_id: &str) -> Result<Option<SessionAgent>> {
433 let key = Self::agent_key(session_id, agent_id);
434 let agents = self.agents.read().unwrap();
435 Ok(agents.get(&key).cloned())
436 }
437
438 async fn update_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()> {
439 self.create_agent(session_id, agent).await
440 }
441
442 async fn create_message(
443 &self,
444 session_id: &str,
445 agent_id: &str,
446 message: &SessionMessage,
447 ) -> Result<()> {
448 let key = Self::agent_key(session_id, agent_id);
449 let mut messages = self.messages.write().unwrap();
450 messages
451 .entry(key)
452 .or_default()
453 .push(message.clone());
454 Ok(())
455 }
456
457 async fn read_message(
458 &self,
459 session_id: &str,
460 agent_id: &str,
461 message_id: usize,
462 ) -> Result<Option<SessionMessage>> {
463 let key = Self::agent_key(session_id, agent_id);
464 let messages = self.messages.read().unwrap();
465 Ok(messages
466 .get(&key)
467 .and_then(|msgs| msgs.iter().find(|m| m.message_id == message_id).cloned()))
468 }
469
470 async fn update_message(
471 &self,
472 session_id: &str,
473 agent_id: &str,
474 message: &SessionMessage,
475 ) -> Result<()> {
476 let key = Self::agent_key(session_id, agent_id);
477 let mut messages = self.messages.write().unwrap();
478 if let Some(msgs) = messages.get_mut(&key) {
479 if let Some(m) = msgs.iter_mut().find(|m| m.message_id == message.message_id) {
480 *m = message.clone();
481 }
482 }
483 Ok(())
484 }
485
486 async fn list_messages(
487 &self,
488 session_id: &str,
489 agent_id: &str,
490 limit: Option<usize>,
491 offset: usize,
492 ) -> Result<Vec<SessionMessage>> {
493 let key = Self::agent_key(session_id, agent_id);
494 let messages = self.messages.read().unwrap();
495 Ok(messages
496 .get(&key)
497 .map(|msgs| {
498 let end = limit.map(|l| offset + l).unwrap_or(msgs.len());
499 msgs.iter()
500 .skip(offset)
501 .take(end - offset)
502 .cloned()
503 .collect()
504 })
505 .unwrap_or_default())
506 }
507
508 async fn create_multi_agent(
509 &self,
510 session_id: &str,
511 multi_agent_id: &str,
512 state: &serde_json::Value,
513 ) -> Result<()> {
514 let key = Self::multi_agent_key(session_id, multi_agent_id);
515 let mut multi_agents = self.multi_agents.write().unwrap();
516 multi_agents.insert(key, state.clone());
517 Ok(())
518 }
519
520 async fn read_multi_agent(
521 &self,
522 session_id: &str,
523 multi_agent_id: &str,
524 ) -> Result<Option<serde_json::Value>> {
525 let key = Self::multi_agent_key(session_id, multi_agent_id);
526 let multi_agents = self.multi_agents.read().unwrap();
527 Ok(multi_agents.get(&key).cloned())
528 }
529
530 async fn update_multi_agent(
531 &self,
532 session_id: &str,
533 multi_agent_id: &str,
534 state: &serde_json::Value,
535 ) -> Result<()> {
536 self.create_multi_agent(session_id, multi_agent_id, state).await
537 }
538}
539
540pub struct RepositorySessionManager {
545 session_repository: Arc<dyn SessionRepository>,
546 session_id: String,
547 session: Session,
548 latest_agent_message: std::sync::RwLock<HashMap<String, Option<SessionMessage>>>,
549}
550
551impl RepositorySessionManager {
552 pub async fn new(
556 session_id: impl Into<String>,
557 session_repository: Arc<dyn SessionRepository>,
558 ) -> Result<Self> {
559 let session_id = session_id.into();
560 let session = match session_repository.read_session(&session_id).await? {
561 Some(s) => s,
562 None => {
563 debug!("session_id=<{}> | session not found, creating new session", session_id);
564 let new_session = Session::new(&session_id);
565 session_repository.create_session(&new_session).await?;
566 new_session
567 }
568 };
569
570 Ok(Self {
571 session_repository,
572 session_id,
573 session,
574 latest_agent_message: std::sync::RwLock::new(HashMap::new()),
575 })
576 }
577
578 pub fn session_id(&self) -> &str {
580 &self.session_id
581 }
582
583 pub fn session(&self) -> &Session {
585 &self.session
586 }
587
588 pub async fn append_message(&self, message: Message, agent_id: &str) -> Result<()> {
590 let next_index = {
591 let latest = self.latest_agent_message.read().unwrap();
592 latest
593 .get(agent_id)
594 .and_then(|m| m.as_ref())
595 .map(|m| m.message_id + 1)
596 .unwrap_or(0)
597 };
598
599 let session_message = SessionMessage::from_message(message, next_index);
600
601 {
602 let mut latest = self.latest_agent_message.write().unwrap();
603 latest.insert(agent_id.to_string(), Some(session_message.clone()));
604 }
605
606 self.session_repository
607 .create_message(&self.session_id, agent_id, &session_message)
608 .await
609 }
610
611 pub async fn redact_latest_message(
613 &self,
614 redact_message: Message,
615 agent_id: &str,
616 ) -> Result<()> {
617 let mut latest = self.latest_agent_message.write().unwrap();
618 let latest_message = latest
619 .get_mut(agent_id)
620 .and_then(|m| m.as_mut())
621 .ok_or_else(|| StrandsError::SessionError {
622 message: "No message to redact.".to_string(),
623 })?;
624
625 latest_message.redact_message = Some(redact_message);
626 let message_to_update = latest_message.clone();
627 drop(latest);
628
629 self.session_repository
630 .update_message(&self.session_id, agent_id, &message_to_update)
631 .await
632 }
633
634 pub async fn sync_agent(&self, agent: &SessionAgent) -> Result<()> {
636 self.session_repository
637 .update_agent(&self.session_id, agent)
638 .await
639 }
640
641 pub async fn initialize_agent(&self, agent_id: &str) -> Result<Option<SessionAgent>> {
646 {
647 let latest = self.latest_agent_message.read().unwrap();
648 if latest.contains_key(agent_id) {
649 return Err(StrandsError::SessionError {
650 message: "The agent_id must be unique in a session.".to_string(),
651 });
652 }
653 }
654
655 {
656 let mut latest = self.latest_agent_message.write().unwrap();
657 latest.insert(agent_id.to_string(), None);
658 }
659
660 let session_agent = self
661 .session_repository
662 .read_agent(&self.session_id, agent_id)
663 .await?;
664
665 if session_agent.is_none() {
666 debug!(
667 "agent_id=<{}> | session_id=<{}> | creating agent",
668 agent_id, self.session_id
669 );
670 let new_agent = SessionAgent::new(agent_id);
671 self.session_repository
672 .create_agent(&self.session_id, &new_agent)
673 .await?;
674 return Ok(None);
675 }
676
677 debug!(
678 "agent_id=<{}> | session_id=<{}> | restoring agent",
679 agent_id, self.session_id
680 );
681
682 Ok(session_agent)
683 }
684
685 pub async fn restore_messages(
687 &self,
688 agent_id: &str,
689 offset: usize,
690 ) -> Result<Vec<Message>> {
691 let session_messages = self
692 .session_repository
693 .list_messages(&self.session_id, agent_id, None, offset)
694 .await?;
695
696 if !session_messages.is_empty() {
697 let mut latest = self.latest_agent_message.write().unwrap();
698 latest.insert(
699 agent_id.to_string(),
700 Some(session_messages.last().unwrap().clone()),
701 );
702 }
703
704 Ok(session_messages.into_iter().map(|m| m.to_message()).collect())
705 }
706
707 pub async fn sync_multi_agent(
709 &self,
710 multi_agent_id: &str,
711 state: &serde_json::Value,
712 ) -> Result<()> {
713 self.session_repository
714 .update_multi_agent(&self.session_id, multi_agent_id, state)
715 .await
716 }
717
718 pub async fn initialize_multi_agent(
720 &self,
721 multi_agent_id: &str,
722 ) -> Result<Option<serde_json::Value>> {
723 let state = self
724 .session_repository
725 .read_multi_agent(&self.session_id, multi_agent_id)
726 .await?;
727
728 if state.is_none() {
729 self.session_repository
730 .create_multi_agent(&self.session_id, multi_agent_id, &serde_json::json!({}))
731 .await?;
732 }
733
734 Ok(state)
735 }
736
737 pub fn fix_broken_tool_use(&self, messages: Vec<Message>) -> Vec<Message> {
739 let mut result = messages;
740
741 if !result.is_empty() {
742 let first = &result[0];
743 if first.role == crate::types::content::Role::User {
744 let has_tool_result = first.content.iter().any(|c| c.tool_result.is_some());
745 if has_tool_result {
746 tracing::warn!(
747 "Session message history starts with orphaned toolResult. Removing."
748 );
749 result.remove(0);
750 }
751 }
752 }
753
754 result
755 }
756}
757
758pub struct FileSessionManager {
760 storage_dir: PathBuf,
761 session_id: String,
762}
763
764impl FileSessionManager {
765 const SESSION_PREFIX: &'static str = "session_";
766 const AGENT_PREFIX: &'static str = "agent_";
767 const MESSAGE_PREFIX: &'static str = "message_";
768
769 pub fn new(session_id: impl Into<String>, storage_dir: Option<PathBuf>) -> Result<Self> {
770 let session_id = session_id.into();
771 let storage_dir = storage_dir.unwrap_or_else(|| {
772 std::env::temp_dir().join("strands").join("sessions")
773 });
774
775 std::fs::create_dir_all(&storage_dir).map_err(|e| StrandsError::SessionError {
776 message: format!("Failed to create storage directory: {}", e),
777 })?;
778
779 Ok(Self {
780 storage_dir,
781 session_id,
782 })
783 }
784
785 fn get_session_path(&self, session_id: &str) -> PathBuf {
786 self.storage_dir
787 .join(format!("{}{}", Self::SESSION_PREFIX, session_id))
788 }
789
790 fn get_agent_path(&self, session_id: &str, agent_id: &str) -> PathBuf {
791 self.get_session_path(session_id)
792 .join("agents")
793 .join(format!("{}{}", Self::AGENT_PREFIX, agent_id))
794 }
795
796 fn get_message_path(&self, session_id: &str, agent_id: &str, message_id: usize) -> PathBuf {
797 self.get_agent_path(session_id, agent_id)
798 .join("messages")
799 .join(format!("{}{}.json", Self::MESSAGE_PREFIX, message_id))
800 }
801
802 fn read_json<T: serde::de::DeserializeOwned>(&self, path: &Path) -> Result<T> {
803 let content = std::fs::read_to_string(path).map_err(|e| StrandsError::SessionError {
804 message: format!("Failed to read file {}: {}", path.display(), e),
805 })?;
806
807 serde_json::from_str(&content).map_err(|e| StrandsError::SessionError {
808 message: format!("Invalid JSON in {}: {}", path.display(), e),
809 })
810 }
811
812 fn write_json<T: serde::Serialize>(&self, path: &Path, data: &T) -> Result<()> {
813 if let Some(parent) = path.parent() {
814 std::fs::create_dir_all(parent).map_err(|e| StrandsError::SessionError {
815 message: format!("Failed to create directory: {}", e),
816 })?;
817 }
818
819 let tmp_path = path.with_extension("tmp");
820 let content = serde_json::to_string_pretty(data).map_err(|e| StrandsError::SessionError {
821 message: format!("Failed to serialize: {}", e),
822 })?;
823
824 std::fs::write(&tmp_path, &content).map_err(|e| StrandsError::SessionError {
825 message: format!("Failed to write file: {}", e),
826 })?;
827
828 std::fs::rename(&tmp_path, path).map_err(|e| StrandsError::SessionError {
829 message: format!("Failed to rename file: {}", e),
830 })?;
831
832 Ok(())
833 }
834
835 pub fn session_id(&self) -> &str {
836 &self.session_id
837 }
838}
839
840#[async_trait]
841impl SessionManager for FileSessionManager {
842 async fn read_session(&self, session_id: &str) -> Result<Option<Session>> {
843 let path = self.get_session_path(session_id).join("session.json");
844 if !path.exists() {
845 return Ok(None);
846 }
847 self.read_json(&path).map(Some)
848 }
849
850 async fn write_session(&self, session: &Session) -> Result<()> {
851 let session_dir = self.get_session_path(&session.session_id);
852 std::fs::create_dir_all(&session_dir).map_err(|e| StrandsError::SessionError {
853 message: format!("Failed to create session directory: {}", e),
854 })?;
855 std::fs::create_dir_all(session_dir.join("agents")).ok();
856 std::fs::create_dir_all(session_dir.join("multi_agents")).ok();
857
858 let path = session_dir.join("session.json");
859 self.write_json(&path, session)
860 }
861
862 async fn delete_session(&self, session_id: &str) -> Result<()> {
863 let session_dir = self.get_session_path(session_id);
864 if session_dir.exists() {
865 std::fs::remove_dir_all(&session_dir).map_err(|e| StrandsError::SessionError {
866 message: format!("Failed to delete session: {}", e),
867 })?;
868 }
869 Ok(())
870 }
871
872 async fn list_sessions(&self) -> Result<Vec<SessionSummary>> {
873 let mut summaries = Vec::new();
874
875 if let Ok(entries) = std::fs::read_dir(&self.storage_dir) {
876 for entry in entries.flatten() {
877 let name = entry.file_name();
878 let name_str = name.to_string_lossy();
879 if name_str.starts_with(Self::SESSION_PREFIX) {
880 let session_id = name_str.trim_start_matches(Self::SESSION_PREFIX);
881 if let Some(session) = SessionManager::read_session(self, session_id).await? {
882 summaries.push(SessionSummary {
883 session_id: session.session_id,
884 session_type: session.session_type,
885 created_at: session.created_at,
886 updated_at: session.updated_at,
887 });
888 }
889 }
890 }
891 }
892
893 Ok(summaries)
894 }
895}
896
897#[async_trait]
898impl SessionRepository for FileSessionManager {
899 async fn create_session(&self, session: &Session) -> Result<()> {
900 let session_dir = self.get_session_path(&session.session_id);
901 if session_dir.exists() {
902 return Err(StrandsError::SessionError {
903 message: format!("Session {} already exists", session.session_id),
904 });
905 }
906 self.write_session(session).await
907 }
908
909 async fn read_session(&self, session_id: &str) -> Result<Option<Session>> {
910 SessionManager::read_session(self, session_id).await
911 }
912
913 async fn delete_session(&self, session_id: &str) -> Result<()> {
914 SessionManager::delete_session(self, session_id).await
915 }
916
917 async fn create_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()> {
918 let agent_dir = self.get_agent_path(session_id, &agent.agent_id);
919 std::fs::create_dir_all(&agent_dir).map_err(|e| StrandsError::SessionError {
920 message: format!("Failed to create agent directory: {}", e),
921 })?;
922 std::fs::create_dir_all(agent_dir.join("messages")).ok();
923
924 let path = agent_dir.join("agent.json");
925 self.write_json(&path, agent)
926 }
927
928 async fn read_agent(&self, session_id: &str, agent_id: &str) -> Result<Option<SessionAgent>> {
929 let path = self.get_agent_path(session_id, agent_id).join("agent.json");
930 if !path.exists() {
931 return Ok(None);
932 }
933 self.read_json(&path).map(Some)
934 }
935
936 async fn update_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()> {
937 let existing = self.read_agent(session_id, &agent.agent_id).await?;
938 if existing.is_none() {
939 return Err(StrandsError::SessionError {
940 message: format!("Agent {} does not exist", agent.agent_id),
941 });
942 }
943
944 let path = self.get_agent_path(session_id, &agent.agent_id).join("agent.json");
945 self.write_json(&path, agent)
946 }
947
948 async fn create_message(
949 &self,
950 session_id: &str,
951 agent_id: &str,
952 message: &SessionMessage,
953 ) -> Result<()> {
954 let path = self.get_message_path(session_id, agent_id, message.message_id);
955 self.write_json(&path, message)
956 }
957
958 async fn read_message(
959 &self,
960 session_id: &str,
961 agent_id: &str,
962 message_id: usize,
963 ) -> Result<Option<SessionMessage>> {
964 let path = self.get_message_path(session_id, agent_id, message_id);
965 if !path.exists() {
966 return Ok(None);
967 }
968 self.read_json(&path).map(Some)
969 }
970
971 async fn update_message(
972 &self,
973 session_id: &str,
974 agent_id: &str,
975 message: &SessionMessage,
976 ) -> Result<()> {
977 let existing = self.read_message(session_id, agent_id, message.message_id).await?;
978 if existing.is_none() {
979 return Err(StrandsError::SessionError {
980 message: format!("Message {} does not exist", message.message_id),
981 });
982 }
983
984 let path = self.get_message_path(session_id, agent_id, message.message_id);
985 self.write_json(&path, message)
986 }
987
988 async fn list_messages(
989 &self,
990 session_id: &str,
991 agent_id: &str,
992 limit: Option<usize>,
993 offset: usize,
994 ) -> Result<Vec<SessionMessage>> {
995 let messages_dir = self.get_agent_path(session_id, agent_id).join("messages");
996 if !messages_dir.exists() {
997 return Ok(Vec::new());
998 }
999
1000 let mut message_files: Vec<(usize, PathBuf)> = Vec::new();
1001
1002 for entry in std::fs::read_dir(&messages_dir).map_err(|e| StrandsError::SessionError {
1003 message: format!("Failed to read messages directory: {}", e),
1004 })? {
1005 let entry = entry.map_err(|e| StrandsError::SessionError {
1006 message: format!("Failed to read directory entry: {}", e),
1007 })?;
1008
1009 let name = entry.file_name();
1010 let name_str = name.to_string_lossy();
1011
1012 if name_str.starts_with(Self::MESSAGE_PREFIX) && name_str.ends_with(".json") {
1013 let id_str = name_str
1014 .trim_start_matches(Self::MESSAGE_PREFIX)
1015 .trim_end_matches(".json");
1016 if let Ok(id) = id_str.parse::<usize>() {
1017 message_files.push((id, entry.path()));
1018 }
1019 }
1020 }
1021
1022 message_files.sort_by_key(|(id, _)| *id);
1023
1024 let end = limit.map(|l| offset + l).unwrap_or(message_files.len());
1025 let mut messages = Vec::new();
1026
1027 for (_, path) in message_files.into_iter().skip(offset).take(end - offset) {
1028 let message: SessionMessage = self.read_json(&path)?;
1029 messages.push(message);
1030 }
1031
1032 Ok(messages)
1033 }
1034
1035 async fn create_multi_agent(
1036 &self,
1037 session_id: &str,
1038 multi_agent_id: &str,
1039 state: &serde_json::Value,
1040 ) -> Result<()> {
1041 let path = self
1042 .get_session_path(session_id)
1043 .join("multi_agents")
1044 .join(format!("{}.json", multi_agent_id));
1045 self.write_json(&path, state)
1046 }
1047
1048 async fn read_multi_agent(
1049 &self,
1050 session_id: &str,
1051 multi_agent_id: &str,
1052 ) -> Result<Option<serde_json::Value>> {
1053 let path = self
1054 .get_session_path(session_id)
1055 .join("multi_agents")
1056 .join(format!("{}.json", multi_agent_id));
1057 if !path.exists() {
1058 return Ok(None);
1059 }
1060 self.read_json(&path).map(Some)
1061 }
1062
1063 async fn update_multi_agent(
1064 &self,
1065 session_id: &str,
1066 multi_agent_id: &str,
1067 state: &serde_json::Value,
1068 ) -> Result<()> {
1069 self.create_multi_agent(session_id, multi_agent_id, state).await
1070 }
1071}