strands_agents/session/
mod.rs

1//! Session management for persistent conversations.
2
3mod s3_session_manager;
4
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
11use chrono::{DateTime, Utc};
12use serde::{Deserialize, Serialize};
13use tracing::debug;
14
15use crate::types::content::Message;
16use crate::types::errors::{Result, StrandsError};
17
18#[cfg(feature = "s3-session")]
19pub use s3_session_manager::S3SessionManager;
20
21/// Session type enumeration.
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
24pub enum SessionType {
25    Agent,
26}
27
28/// A message within a session agent.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(rename_all = "camelCase")]
31pub struct SessionMessage {
32    pub message: Message,
33    pub message_id: usize,
34    pub redact_message: Option<Message>,
35    pub created_at: String,
36    pub updated_at: String,
37}
38
39impl SessionMessage {
40    pub fn from_message(message: Message, index: usize) -> Self {
41        let now = Utc::now().to_rfc3339();
42        Self {
43            message,
44            message_id: index,
45            redact_message: None,
46            created_at: now.clone(),
47            updated_at: now,
48        }
49    }
50
51    pub fn to_message(&self) -> Message {
52        self.redact_message.clone().unwrap_or_else(|| self.message.clone())
53    }
54}
55
56/// Encode bytes values to base64 for JSON serialization.
57pub fn encode_bytes_values(value: serde_json::Value) -> serde_json::Value {
58    match value {
59        serde_json::Value::Object(map) => {
60            let encoded: serde_json::Map<String, serde_json::Value> = map
61                .into_iter()
62                .map(|(k, v)| (k, encode_bytes_values(v)))
63                .collect();
64            serde_json::Value::Object(encoded)
65        }
66        serde_json::Value::Array(arr) => {
67            serde_json::Value::Array(arr.into_iter().map(encode_bytes_values).collect())
68        }
69        other => other,
70    }
71}
72
73/// Decode base64-encoded bytes values from JSON.
74pub fn decode_bytes_values(value: serde_json::Value) -> serde_json::Value {
75    match value {
76        serde_json::Value::Object(map) => {
77            if map.get("__bytes_encoded__") == Some(&serde_json::Value::Bool(true)) {
78                if let Some(serde_json::Value::String(data)) = map.get("data") {
79                    if let Ok(decoded) = BASE64.decode(data) {
80                        return serde_json::json!({
81                            "__decoded_bytes__": decoded
82                        });
83                    }
84                }
85            }
86            let decoded: serde_json::Map<String, serde_json::Value> = map
87                .into_iter()
88                .map(|(k, v)| (k, decode_bytes_values(v)))
89                .collect();
90            serde_json::Value::Object(decoded)
91        }
92        serde_json::Value::Array(arr) => {
93            serde_json::Value::Array(arr.into_iter().map(decode_bytes_values).collect())
94        }
95        other => other,
96    }
97}
98
99/// Encode bytes to base64 string with marker.
100pub fn encode_bytes(data: &[u8]) -> serde_json::Value {
101    serde_json::json!({
102        "__bytes_encoded__": true,
103        "data": BASE64.encode(data)
104    })
105}
106
107/// Agent state within a session.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109#[serde(rename_all = "camelCase")]
110pub struct SessionAgent {
111    pub agent_id: String,
112    pub state: HashMap<String, serde_json::Value>,
113    pub conversation_manager_state: HashMap<String, serde_json::Value>,
114    #[serde(default)]
115    pub internal_state: HashMap<String, serde_json::Value>,
116    pub created_at: String,
117    pub updated_at: String,
118}
119
120impl SessionAgent {
121    pub fn new(agent_id: impl Into<String>) -> Self {
122        let now = Utc::now().to_rfc3339();
123        Self {
124            agent_id: agent_id.into(),
125            state: HashMap::new(),
126            conversation_manager_state: HashMap::new(),
127            internal_state: HashMap::new(),
128            created_at: now.clone(),
129            updated_at: now,
130        }
131    }
132
133    /// Create a SessionAgent from an Agent instance.
134    ///
135    /// This captures the agent's current state for session persistence.
136    ///
137    /// # Arguments
138    ///
139    /// * `agent` - The agent to create a session state from
140    ///
141    /// # Returns
142    ///
143    /// A SessionAgent with the captured state.
144    pub fn from_agent(agent: &crate::agent::Agent) -> Result<Self> {
145        let now = Utc::now().to_rfc3339();
146
147
148        let mut internal_state = HashMap::new();
149        internal_state.insert(
150            "interrupt_state".to_string(),
151            serde_json::to_value(agent.interrupt_state().to_dict())
152                .unwrap_or_default(),
153        );
154
155        Ok(Self {
156            agent_id: agent.agent_id.clone(),
157            state: agent.state().get_all(),
158            conversation_manager_state: agent.conversation_manager().get_state(),
159            internal_state,
160            created_at: now.clone(),
161            updated_at: now,
162        })
163    }
164
165    /// Initialize an agent's internal state from this session.
166    ///
167    /// This restores internal state like interrupt state to the agent.
168    ///
169    /// # Arguments
170    ///
171    /// * `agent` - The agent to initialize internal state for
172    pub fn initialize_internal_state(&self, agent: &mut crate::agent::Agent) {
173        if let Some(interrupt_state_value) = self.internal_state.get("interrupt_state") {
174            if let Some(interrupt_data) = interrupt_state_value.as_object() {
175                let interrupt_state = crate::types::interrupt::InterruptState::from_dict(
176                    interrupt_data
177                        .iter()
178                        .map(|(k, v)| (k.clone(), v.clone()))
179                        .collect(),
180                );
181                agent.set_interrupt_state(interrupt_state);
182            }
183        }
184    }
185
186    /// Restore agent state from this session.
187    ///
188    /// This restores the user-managed state to the agent.
189    ///
190    /// # Arguments
191    ///
192    /// * `agent` - The agent to restore state to
193    pub fn restore_state(&self, agent: &mut crate::agent::Agent) {
194        for (key, value) in &self.state {
195            agent.state_mut().set(key.clone(), value.clone());
196        }
197    }
198
199    /// Restore conversation manager state from this session.
200    ///
201    /// # Arguments
202    ///
203    /// * `agent` - The agent to restore conversation manager state to
204    ///
205    /// # Returns
206    ///
207    /// Optional prepend messages if the conversation manager provides them.
208    pub fn restore_conversation_manager_state(
209        &self,
210        agent: &mut crate::agent::Agent,
211    ) -> Option<Vec<Message>> {
212        agent
213            .conversation_manager_mut()
214            .restore_from_session(self.conversation_manager_state.clone())
215    }
216
217    /// Update with current agent state.
218    ///
219    /// Updates timestamps and captures latest state from agent.
220    pub fn update_from_agent(&mut self, agent: &crate::agent::Agent) {
221        self.updated_at = Utc::now().to_rfc3339();
222        self.state = agent.state().get_all();
223        self.conversation_manager_state = agent.conversation_manager().get_state();
224
225
226        self.internal_state.insert(
227            "interrupt_state".to_string(),
228            serde_json::to_value(agent.interrupt_state().to_dict())
229                .unwrap_or_default(),
230        );
231    }
232}
233
234/// Session data model.
235#[derive(Debug, Clone, Serialize, Deserialize)]
236#[serde(rename_all = "camelCase")]
237pub struct Session {
238    pub session_id: String,
239    pub session_type: SessionType,
240    pub created_at: String,
241    pub updated_at: String,
242}
243
244impl Session {
245    pub fn new(session_id: impl Into<String>) -> Self {
246        let now = Utc::now().to_rfc3339();
247        Self {
248            session_id: session_id.into(),
249            session_type: SessionType::Agent,
250            created_at: now.clone(),
251            updated_at: now,
252        }
253    }
254}
255
256/// Legacy session format for backward compatibility.
257#[derive(Debug, Clone, Serialize, Deserialize)]
258pub struct LegacySession {
259    pub session_id: String,
260    pub agent_id: String,
261    pub messages: Vec<Message>,
262    pub state: HashMap<String, serde_json::Value>,
263    pub created_at: DateTime<Utc>,
264    pub updated_at: DateTime<Utc>,
265}
266
267impl LegacySession {
268    pub fn new(session_id: impl Into<String>, agent_id: impl Into<String>) -> Self {
269        let now = Utc::now();
270        Self {
271            session_id: session_id.into(),
272            agent_id: agent_id.into(),
273            messages: Vec::new(),
274            state: HashMap::new(),
275            created_at: now,
276            updated_at: now,
277        }
278    }
279}
280
281/// Summary of a session.
282#[derive(Debug, Clone, Serialize, Deserialize)]
283pub struct SessionSummary {
284    pub session_id: String,
285    pub session_type: SessionType,
286    pub created_at: String,
287    pub updated_at: String,
288}
289
290/// Session repository trait for storage backends.
291#[async_trait]
292pub trait SessionRepository: Send + Sync {
293    async fn create_session(&self, session: &Session) -> Result<()>;
294    async fn read_session(&self, session_id: &str) -> Result<Option<Session>>;
295    async fn delete_session(&self, session_id: &str) -> Result<()>;
296
297    async fn create_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()>;
298    async fn read_agent(&self, session_id: &str, agent_id: &str) -> Result<Option<SessionAgent>>;
299    async fn update_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()>;
300
301    async fn create_message(
302        &self,
303        session_id: &str,
304        agent_id: &str,
305        message: &SessionMessage,
306    ) -> Result<()>;
307    async fn read_message(
308        &self,
309        session_id: &str,
310        agent_id: &str,
311        message_id: usize,
312    ) -> Result<Option<SessionMessage>>;
313    async fn update_message(
314        &self,
315        session_id: &str,
316        agent_id: &str,
317        message: &SessionMessage,
318    ) -> Result<()>;
319    async fn list_messages(
320        &self,
321        session_id: &str,
322        agent_id: &str,
323        limit: Option<usize>,
324        offset: usize,
325    ) -> Result<Vec<SessionMessage>>;
326
327    async fn create_multi_agent(
328        &self,
329        session_id: &str,
330        multi_agent_id: &str,
331        state: &serde_json::Value,
332    ) -> Result<()>;
333    async fn read_multi_agent(
334        &self,
335        session_id: &str,
336        multi_agent_id: &str,
337    ) -> Result<Option<serde_json::Value>>;
338    async fn update_multi_agent(
339        &self,
340        session_id: &str,
341        multi_agent_id: &str,
342        state: &serde_json::Value,
343    ) -> Result<()>;
344}
345
346/// Trait for implementing session managers.
347#[async_trait]
348pub trait SessionManager: Send + Sync {
349    async fn read_session(&self, session_id: &str) -> Result<Option<Session>>;
350    async fn write_session(&self, session: &Session) -> Result<()>;
351    async fn delete_session(&self, session_id: &str) -> Result<()>;
352    async fn list_sessions(&self) -> Result<Vec<SessionSummary>>;
353}
354
355/// In-memory session manager for testing.
356#[derive(Default)]
357pub struct InMemorySessionManager {
358    sessions: std::sync::RwLock<HashMap<String, Session>>,
359    agents: std::sync::RwLock<HashMap<String, SessionAgent>>,
360    messages: std::sync::RwLock<HashMap<String, Vec<SessionMessage>>>,
361    multi_agents: std::sync::RwLock<HashMap<String, serde_json::Value>>,
362}
363
364impl InMemorySessionManager {
365    pub fn new() -> Self {
366        Self::default()
367    }
368
369    fn agent_key(session_id: &str, agent_id: &str) -> String {
370        format!("{}:{}", session_id, agent_id)
371    }
372
373    fn multi_agent_key(session_id: &str, multi_agent_id: &str) -> String {
374        format!("{}:ma:{}", session_id, multi_agent_id)
375    }
376}
377
378#[async_trait]
379impl SessionManager for InMemorySessionManager {
380    async fn read_session(&self, session_id: &str) -> Result<Option<Session>> {
381        let sessions = self.sessions.read().unwrap();
382        Ok(sessions.get(session_id).cloned())
383    }
384
385    async fn write_session(&self, session: &Session) -> Result<()> {
386        let mut sessions = self.sessions.write().unwrap();
387        sessions.insert(session.session_id.clone(), session.clone());
388        Ok(())
389    }
390
391    async fn delete_session(&self, session_id: &str) -> Result<()> {
392        let mut sessions = self.sessions.write().unwrap();
393        sessions.remove(session_id);
394        Ok(())
395    }
396
397    async fn list_sessions(&self) -> Result<Vec<SessionSummary>> {
398        let sessions = self.sessions.read().unwrap();
399        Ok(sessions
400            .values()
401            .map(|s| SessionSummary {
402                session_id: s.session_id.clone(),
403                session_type: s.session_type,
404                created_at: s.created_at.clone(),
405                updated_at: s.updated_at.clone(),
406            })
407            .collect())
408    }
409}
410
411#[async_trait]
412impl SessionRepository for InMemorySessionManager {
413    async fn create_session(&self, session: &Session) -> Result<()> {
414        self.write_session(session).await
415    }
416
417    async fn read_session(&self, session_id: &str) -> Result<Option<Session>> {
418        SessionManager::read_session(self, session_id).await
419    }
420
421    async fn delete_session(&self, session_id: &str) -> Result<()> {
422        SessionManager::delete_session(self, session_id).await
423    }
424
425    async fn create_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()> {
426        let key = Self::agent_key(session_id, &agent.agent_id);
427        let mut agents = self.agents.write().unwrap();
428        agents.insert(key, agent.clone());
429        Ok(())
430    }
431
432    async fn read_agent(&self, session_id: &str, agent_id: &str) -> Result<Option<SessionAgent>> {
433        let key = Self::agent_key(session_id, agent_id);
434        let agents = self.agents.read().unwrap();
435        Ok(agents.get(&key).cloned())
436    }
437
438    async fn update_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()> {
439        self.create_agent(session_id, agent).await
440    }
441
442    async fn create_message(
443        &self,
444        session_id: &str,
445        agent_id: &str,
446        message: &SessionMessage,
447    ) -> Result<()> {
448        let key = Self::agent_key(session_id, agent_id);
449        let mut messages = self.messages.write().unwrap();
450        messages
451            .entry(key)
452            .or_default()
453            .push(message.clone());
454        Ok(())
455    }
456
457    async fn read_message(
458        &self,
459        session_id: &str,
460        agent_id: &str,
461        message_id: usize,
462    ) -> Result<Option<SessionMessage>> {
463        let key = Self::agent_key(session_id, agent_id);
464        let messages = self.messages.read().unwrap();
465        Ok(messages
466            .get(&key)
467            .and_then(|msgs| msgs.iter().find(|m| m.message_id == message_id).cloned()))
468    }
469
470    async fn update_message(
471        &self,
472        session_id: &str,
473        agent_id: &str,
474        message: &SessionMessage,
475    ) -> Result<()> {
476        let key = Self::agent_key(session_id, agent_id);
477        let mut messages = self.messages.write().unwrap();
478        if let Some(msgs) = messages.get_mut(&key) {
479            if let Some(m) = msgs.iter_mut().find(|m| m.message_id == message.message_id) {
480                *m = message.clone();
481            }
482        }
483        Ok(())
484    }
485
486    async fn list_messages(
487        &self,
488        session_id: &str,
489        agent_id: &str,
490        limit: Option<usize>,
491        offset: usize,
492    ) -> Result<Vec<SessionMessage>> {
493        let key = Self::agent_key(session_id, agent_id);
494        let messages = self.messages.read().unwrap();
495        Ok(messages
496            .get(&key)
497            .map(|msgs| {
498                let end = limit.map(|l| offset + l).unwrap_or(msgs.len());
499                msgs.iter()
500                    .skip(offset)
501                    .take(end - offset)
502                    .cloned()
503                    .collect()
504            })
505            .unwrap_or_default())
506    }
507
508    async fn create_multi_agent(
509        &self,
510        session_id: &str,
511        multi_agent_id: &str,
512        state: &serde_json::Value,
513    ) -> Result<()> {
514        let key = Self::multi_agent_key(session_id, multi_agent_id);
515        let mut multi_agents = self.multi_agents.write().unwrap();
516        multi_agents.insert(key, state.clone());
517        Ok(())
518    }
519
520    async fn read_multi_agent(
521        &self,
522        session_id: &str,
523        multi_agent_id: &str,
524    ) -> Result<Option<serde_json::Value>> {
525        let key = Self::multi_agent_key(session_id, multi_agent_id);
526        let multi_agents = self.multi_agents.read().unwrap();
527        Ok(multi_agents.get(&key).cloned())
528    }
529
530    async fn update_multi_agent(
531        &self,
532        session_id: &str,
533        multi_agent_id: &str,
534        state: &serde_json::Value,
535    ) -> Result<()> {
536        self.create_multi_agent(session_id, multi_agent_id, state).await
537    }
538}
539
540/// Session manager that uses a repository for persistence.
541///
542/// This is the primary session manager for production use, providing
543/// agent initialization, message appending, and state synchronization.
544pub struct RepositorySessionManager {
545    session_repository: Arc<dyn SessionRepository>,
546    session_id: String,
547    session: Session,
548    latest_agent_message: std::sync::RwLock<HashMap<String, Option<SessionMessage>>>,
549}
550
551impl RepositorySessionManager {
552    /// Create a new RepositorySessionManager.
553    ///
554    /// If no session with the specified session_id exists, it will be created.
555    pub async fn new(
556        session_id: impl Into<String>,
557        session_repository: Arc<dyn SessionRepository>,
558    ) -> Result<Self> {
559        let session_id = session_id.into();
560        let session = match session_repository.read_session(&session_id).await? {
561            Some(s) => s,
562            None => {
563                debug!("session_id=<{}> | session not found, creating new session", session_id);
564                let new_session = Session::new(&session_id);
565                session_repository.create_session(&new_session).await?;
566                new_session
567            }
568        };
569
570        Ok(Self {
571            session_repository,
572            session_id,
573            session,
574            latest_agent_message: std::sync::RwLock::new(HashMap::new()),
575        })
576    }
577
578    /// Get the session ID.
579    pub fn session_id(&self) -> &str {
580        &self.session_id
581    }
582
583    /// Get the session.
584    pub fn session(&self) -> &Session {
585        &self.session
586    }
587
588    /// Append a message to the agent's session.
589    pub async fn append_message(&self, message: Message, agent_id: &str) -> Result<()> {
590        let next_index = {
591            let latest = self.latest_agent_message.read().unwrap();
592            latest
593                .get(agent_id)
594                .and_then(|m| m.as_ref())
595                .map(|m| m.message_id + 1)
596                .unwrap_or(0)
597        };
598
599        let session_message = SessionMessage::from_message(message, next_index);
600        
601        {
602            let mut latest = self.latest_agent_message.write().unwrap();
603            latest.insert(agent_id.to_string(), Some(session_message.clone()));
604        }
605
606        self.session_repository
607            .create_message(&self.session_id, agent_id, &session_message)
608            .await
609    }
610
611    /// Redact the latest message appended to the session.
612    pub async fn redact_latest_message(
613        &self,
614        redact_message: Message,
615        agent_id: &str,
616    ) -> Result<()> {
617        let mut latest = self.latest_agent_message.write().unwrap();
618        let latest_message = latest
619            .get_mut(agent_id)
620            .and_then(|m| m.as_mut())
621            .ok_or_else(|| StrandsError::SessionError {
622                message: "No message to redact.".to_string(),
623            })?;
624
625        latest_message.redact_message = Some(redact_message);
626        let message_to_update = latest_message.clone();
627        drop(latest);
628
629        self.session_repository
630            .update_message(&self.session_id, agent_id, &message_to_update)
631            .await
632    }
633
634    /// Sync an agent's state to the session repository.
635    pub async fn sync_agent(&self, agent: &SessionAgent) -> Result<()> {
636        self.session_repository
637            .update_agent(&self.session_id, agent)
638            .await
639    }
640
641    /// Initialize an agent with a session.
642    ///
643    /// If the agent already exists in the session, its state will be restored.
644    /// Otherwise, a new agent entry will be created.
645    pub async fn initialize_agent(&self, agent_id: &str) -> Result<Option<SessionAgent>> {
646        {
647            let latest = self.latest_agent_message.read().unwrap();
648            if latest.contains_key(agent_id) {
649                return Err(StrandsError::SessionError {
650                    message: "The agent_id must be unique in a session.".to_string(),
651                });
652            }
653        }
654
655        {
656            let mut latest = self.latest_agent_message.write().unwrap();
657            latest.insert(agent_id.to_string(), None);
658        }
659
660        let session_agent = self
661            .session_repository
662            .read_agent(&self.session_id, agent_id)
663            .await?;
664
665        if session_agent.is_none() {
666            debug!(
667                "agent_id=<{}> | session_id=<{}> | creating agent",
668                agent_id, self.session_id
669            );
670            let new_agent = SessionAgent::new(agent_id);
671            self.session_repository
672                .create_agent(&self.session_id, &new_agent)
673                .await?;
674            return Ok(None);
675        }
676
677        debug!(
678            "agent_id=<{}> | session_id=<{}> | restoring agent",
679            agent_id, self.session_id
680        );
681
682        Ok(session_agent)
683    }
684
685    /// Restore messages for an agent.
686    pub async fn restore_messages(
687        &self,
688        agent_id: &str,
689        offset: usize,
690    ) -> Result<Vec<Message>> {
691        let session_messages = self
692            .session_repository
693            .list_messages(&self.session_id, agent_id, None, offset)
694            .await?;
695
696        if !session_messages.is_empty() {
697            let mut latest = self.latest_agent_message.write().unwrap();
698            latest.insert(
699                agent_id.to_string(),
700                Some(session_messages.last().unwrap().clone()),
701            );
702        }
703
704        Ok(session_messages.into_iter().map(|m| m.to_message()).collect())
705    }
706
707    /// Sync multi-agent state to the session repository.
708    pub async fn sync_multi_agent(
709        &self,
710        multi_agent_id: &str,
711        state: &serde_json::Value,
712    ) -> Result<()> {
713        self.session_repository
714            .update_multi_agent(&self.session_id, multi_agent_id, state)
715            .await
716    }
717
718    /// Initialize multi-agent state from the session repository.
719    pub async fn initialize_multi_agent(
720        &self,
721        multi_agent_id: &str,
722    ) -> Result<Option<serde_json::Value>> {
723        let state = self
724            .session_repository
725            .read_multi_agent(&self.session_id, multi_agent_id)
726            .await?;
727
728        if state.is_none() {
729            self.session_repository
730                .create_multi_agent(&self.session_id, multi_agent_id, &serde_json::json!({}))
731                .await?;
732        }
733
734        Ok(state)
735    }
736
737    /// Fix broken tool use/result pairs in message history.
738    pub fn fix_broken_tool_use(&self, messages: Vec<Message>) -> Vec<Message> {
739        let mut result = messages;
740
741        if !result.is_empty() {
742            let first = &result[0];
743            if first.role == crate::types::content::Role::User {
744                let has_tool_result = first.content.iter().any(|c| c.tool_result.is_some());
745                if has_tool_result {
746                    tracing::warn!(
747                        "Session message history starts with orphaned toolResult. Removing."
748                    );
749                    result.remove(0);
750                }
751            }
752        }
753
754        result
755    }
756}
757
758/// File-based session manager for local filesystem storage.
759pub struct FileSessionManager {
760    storage_dir: PathBuf,
761    session_id: String,
762}
763
764impl FileSessionManager {
765    const SESSION_PREFIX: &'static str = "session_";
766    const AGENT_PREFIX: &'static str = "agent_";
767    const MESSAGE_PREFIX: &'static str = "message_";
768
769    pub fn new(session_id: impl Into<String>, storage_dir: Option<PathBuf>) -> Result<Self> {
770        let session_id = session_id.into();
771        let storage_dir = storage_dir.unwrap_or_else(|| {
772            std::env::temp_dir().join("strands").join("sessions")
773        });
774
775        std::fs::create_dir_all(&storage_dir).map_err(|e| StrandsError::SessionError {
776            message: format!("Failed to create storage directory: {}", e),
777        })?;
778
779        Ok(Self {
780            storage_dir,
781            session_id,
782        })
783    }
784
785    fn get_session_path(&self, session_id: &str) -> PathBuf {
786        self.storage_dir
787            .join(format!("{}{}", Self::SESSION_PREFIX, session_id))
788    }
789
790    fn get_agent_path(&self, session_id: &str, agent_id: &str) -> PathBuf {
791        self.get_session_path(session_id)
792            .join("agents")
793            .join(format!("{}{}", Self::AGENT_PREFIX, agent_id))
794    }
795
796    fn get_message_path(&self, session_id: &str, agent_id: &str, message_id: usize) -> PathBuf {
797        self.get_agent_path(session_id, agent_id)
798            .join("messages")
799            .join(format!("{}{}.json", Self::MESSAGE_PREFIX, message_id))
800    }
801
802    fn read_json<T: serde::de::DeserializeOwned>(&self, path: &Path) -> Result<T> {
803        let content = std::fs::read_to_string(path).map_err(|e| StrandsError::SessionError {
804            message: format!("Failed to read file {}: {}", path.display(), e),
805        })?;
806
807        serde_json::from_str(&content).map_err(|e| StrandsError::SessionError {
808            message: format!("Invalid JSON in {}: {}", path.display(), e),
809        })
810    }
811
812    fn write_json<T: serde::Serialize>(&self, path: &Path, data: &T) -> Result<()> {
813        if let Some(parent) = path.parent() {
814            std::fs::create_dir_all(parent).map_err(|e| StrandsError::SessionError {
815                message: format!("Failed to create directory: {}", e),
816            })?;
817        }
818
819        let tmp_path = path.with_extension("tmp");
820        let content = serde_json::to_string_pretty(data).map_err(|e| StrandsError::SessionError {
821            message: format!("Failed to serialize: {}", e),
822        })?;
823
824        std::fs::write(&tmp_path, &content).map_err(|e| StrandsError::SessionError {
825            message: format!("Failed to write file: {}", e),
826        })?;
827
828        std::fs::rename(&tmp_path, path).map_err(|e| StrandsError::SessionError {
829            message: format!("Failed to rename file: {}", e),
830        })?;
831
832        Ok(())
833    }
834
835    pub fn session_id(&self) -> &str {
836        &self.session_id
837    }
838}
839
840#[async_trait]
841impl SessionManager for FileSessionManager {
842    async fn read_session(&self, session_id: &str) -> Result<Option<Session>> {
843        let path = self.get_session_path(session_id).join("session.json");
844        if !path.exists() {
845            return Ok(None);
846        }
847        self.read_json(&path).map(Some)
848    }
849
850    async fn write_session(&self, session: &Session) -> Result<()> {
851        let session_dir = self.get_session_path(&session.session_id);
852        std::fs::create_dir_all(&session_dir).map_err(|e| StrandsError::SessionError {
853            message: format!("Failed to create session directory: {}", e),
854        })?;
855        std::fs::create_dir_all(session_dir.join("agents")).ok();
856        std::fs::create_dir_all(session_dir.join("multi_agents")).ok();
857
858        let path = session_dir.join("session.json");
859        self.write_json(&path, session)
860    }
861
862    async fn delete_session(&self, session_id: &str) -> Result<()> {
863        let session_dir = self.get_session_path(session_id);
864        if session_dir.exists() {
865            std::fs::remove_dir_all(&session_dir).map_err(|e| StrandsError::SessionError {
866                message: format!("Failed to delete session: {}", e),
867            })?;
868        }
869        Ok(())
870    }
871
872    async fn list_sessions(&self) -> Result<Vec<SessionSummary>> {
873        let mut summaries = Vec::new();
874
875        if let Ok(entries) = std::fs::read_dir(&self.storage_dir) {
876            for entry in entries.flatten() {
877                let name = entry.file_name();
878                let name_str = name.to_string_lossy();
879                if name_str.starts_with(Self::SESSION_PREFIX) {
880                    let session_id = name_str.trim_start_matches(Self::SESSION_PREFIX);
881                    if let Some(session) = SessionManager::read_session(self, session_id).await? {
882                        summaries.push(SessionSummary {
883                            session_id: session.session_id,
884                            session_type: session.session_type,
885                            created_at: session.created_at,
886                            updated_at: session.updated_at,
887                        });
888                    }
889                }
890            }
891        }
892
893        Ok(summaries)
894    }
895}
896
897#[async_trait]
898impl SessionRepository for FileSessionManager {
899    async fn create_session(&self, session: &Session) -> Result<()> {
900        let session_dir = self.get_session_path(&session.session_id);
901        if session_dir.exists() {
902            return Err(StrandsError::SessionError {
903                message: format!("Session {} already exists", session.session_id),
904            });
905        }
906        self.write_session(session).await
907    }
908
909    async fn read_session(&self, session_id: &str) -> Result<Option<Session>> {
910        SessionManager::read_session(self, session_id).await
911    }
912
913    async fn delete_session(&self, session_id: &str) -> Result<()> {
914        SessionManager::delete_session(self, session_id).await
915    }
916
917    async fn create_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()> {
918        let agent_dir = self.get_agent_path(session_id, &agent.agent_id);
919        std::fs::create_dir_all(&agent_dir).map_err(|e| StrandsError::SessionError {
920            message: format!("Failed to create agent directory: {}", e),
921        })?;
922        std::fs::create_dir_all(agent_dir.join("messages")).ok();
923
924        let path = agent_dir.join("agent.json");
925        self.write_json(&path, agent)
926    }
927
928    async fn read_agent(&self, session_id: &str, agent_id: &str) -> Result<Option<SessionAgent>> {
929        let path = self.get_agent_path(session_id, agent_id).join("agent.json");
930        if !path.exists() {
931            return Ok(None);
932        }
933        self.read_json(&path).map(Some)
934    }
935
936    async fn update_agent(&self, session_id: &str, agent: &SessionAgent) -> Result<()> {
937        let existing = self.read_agent(session_id, &agent.agent_id).await?;
938        if existing.is_none() {
939            return Err(StrandsError::SessionError {
940                message: format!("Agent {} does not exist", agent.agent_id),
941            });
942        }
943
944        let path = self.get_agent_path(session_id, &agent.agent_id).join("agent.json");
945        self.write_json(&path, agent)
946    }
947
948    async fn create_message(
949        &self,
950        session_id: &str,
951        agent_id: &str,
952        message: &SessionMessage,
953    ) -> Result<()> {
954        let path = self.get_message_path(session_id, agent_id, message.message_id);
955        self.write_json(&path, message)
956    }
957
958    async fn read_message(
959        &self,
960        session_id: &str,
961        agent_id: &str,
962        message_id: usize,
963    ) -> Result<Option<SessionMessage>> {
964        let path = self.get_message_path(session_id, agent_id, message_id);
965        if !path.exists() {
966            return Ok(None);
967        }
968        self.read_json(&path).map(Some)
969    }
970
971    async fn update_message(
972        &self,
973        session_id: &str,
974        agent_id: &str,
975        message: &SessionMessage,
976    ) -> Result<()> {
977        let existing = self.read_message(session_id, agent_id, message.message_id).await?;
978        if existing.is_none() {
979            return Err(StrandsError::SessionError {
980                message: format!("Message {} does not exist", message.message_id),
981            });
982        }
983
984        let path = self.get_message_path(session_id, agent_id, message.message_id);
985        self.write_json(&path, message)
986    }
987
988    async fn list_messages(
989        &self,
990        session_id: &str,
991        agent_id: &str,
992        limit: Option<usize>,
993        offset: usize,
994    ) -> Result<Vec<SessionMessage>> {
995        let messages_dir = self.get_agent_path(session_id, agent_id).join("messages");
996        if !messages_dir.exists() {
997            return Ok(Vec::new());
998        }
999
1000        let mut message_files: Vec<(usize, PathBuf)> = Vec::new();
1001
1002        for entry in std::fs::read_dir(&messages_dir).map_err(|e| StrandsError::SessionError {
1003            message: format!("Failed to read messages directory: {}", e),
1004        })? {
1005            let entry = entry.map_err(|e| StrandsError::SessionError {
1006                message: format!("Failed to read directory entry: {}", e),
1007            })?;
1008
1009            let name = entry.file_name();
1010            let name_str = name.to_string_lossy();
1011
1012            if name_str.starts_with(Self::MESSAGE_PREFIX) && name_str.ends_with(".json") {
1013                let id_str = name_str
1014                    .trim_start_matches(Self::MESSAGE_PREFIX)
1015                    .trim_end_matches(".json");
1016                if let Ok(id) = id_str.parse::<usize>() {
1017                    message_files.push((id, entry.path()));
1018                }
1019            }
1020        }
1021
1022        message_files.sort_by_key(|(id, _)| *id);
1023
1024        let end = limit.map(|l| offset + l).unwrap_or(message_files.len());
1025        let mut messages = Vec::new();
1026
1027        for (_, path) in message_files.into_iter().skip(offset).take(end - offset) {
1028            let message: SessionMessage = self.read_json(&path)?;
1029            messages.push(message);
1030        }
1031
1032        Ok(messages)
1033    }
1034
1035    async fn create_multi_agent(
1036        &self,
1037        session_id: &str,
1038        multi_agent_id: &str,
1039        state: &serde_json::Value,
1040    ) -> Result<()> {
1041        let path = self
1042            .get_session_path(session_id)
1043            .join("multi_agents")
1044            .join(format!("{}.json", multi_agent_id));
1045        self.write_json(&path, state)
1046    }
1047
1048    async fn read_multi_agent(
1049        &self,
1050        session_id: &str,
1051        multi_agent_id: &str,
1052    ) -> Result<Option<serde_json::Value>> {
1053        let path = self
1054            .get_session_path(session_id)
1055            .join("multi_agents")
1056            .join(format!("{}.json", multi_agent_id));
1057        if !path.exists() {
1058            return Ok(None);
1059        }
1060        self.read_json(&path).map(Some)
1061    }
1062
1063    async fn update_multi_agent(
1064        &self,
1065        session_id: &str,
1066        multi_agent_id: &str,
1067        state: &serde_json::Value,
1068    ) -> Result<()> {
1069        self.create_multi_agent(session_id, multi_agent_id, state).await
1070    }
1071}