steer_core/session/
manager.rs

1use crate::app::{
2    App, AppCommand, AppConfig, AppEvent, Conversation, Message as ConversationMessage, MessageData,
3};
4use crate::config::model::ModelId;
5use crate::error::{Error, Result};
6use crate::events::StreamEvent;
7use crate::session::{
8    Session, SessionConfig, SessionFilter, SessionInfo, SessionState, SessionStore,
9    SessionStoreError, ToolCallUpdate,
10};
11use std::collections::HashMap;
12use std::sync::Arc;
13use steer_tools::{ToolCall, ToolResult};
14use thiserror::Error;
15use tokio::sync::{RwLock, mpsc};
16use tokio::task::JoinHandle;
17use tracing::{debug, error, info, warn};
18
/// Session manager specific errors
///
/// Converted into the crate-wide error type with `.into()` / `?` by the
/// `SessionManager` methods.
#[derive(Debug, Error)]
pub enum SessionManagerError {
    /// Returned by `SessionManager::create_session` when the number of active
    /// sessions is already at `max_concurrent_sessions`.
    #[error("Maximum session capacity reached ({current}/{max}). Cannot create new session.")]
    CapacityExceeded { current: usize, max: usize },

    /// The session is not in the active-session map. `resume_session` and
    /// `get_mcp_statuses` also return this when the store has no record of the id.
    #[error("Session not active: {session_id}")]
    SessionNotActive { session_id: String },

    /// The session's event receiver has already been taken
    /// (see `SessionManager::take_event_receiver`).
    #[error("Session {session_id} already has an active listener")]
    SessionAlreadyHasListener { session_id: String },

    /// Building the managed session, building its workspace, or sending the
    /// restore command failed; `message` carries the formatted cause.
    #[error("Failed to create managed session: {message}")]
    CreationFailed { message: String },

    /// Error reported by the underlying `SessionStore`, forwarded transparently.
    #[error(transparent)]
    Storage(#[from] SessionStoreError),
}
37
/// Configuration for the SessionManager
#[derive(Debug, Clone)]
pub struct SessionManagerConfig {
    /// Maximum number of concurrent active sessions
    ///
    /// Enforced by `create_session`; `resume_session` currently only logs a
    /// warning when the limit is reached.
    pub max_concurrent_sessions: usize,
    /// Default model for new sessions
    ///
    /// Passed to every `ManagedSession::new` call made by the manager.
    pub default_model: ModelId,
    /// Whether to automatically persist sessions
    ///
    /// When set, `update_session_state` writes the session to the store after
    /// every in-memory update.
    pub auto_persist: bool,
}
48
/// A managed session contains both the session state and the App instance
///
/// The App itself runs inside the task behind `app_task_handle`; this struct
/// holds the channels and handles used to talk to it.
pub struct ManagedSession {
    /// The session data
    pub session: Session,
    /// Command sender for the App
    pub command_tx: mpsc::Sender<AppCommand>,
    /// Event receiver from the App (for external consumers like TUI)
    ///
    /// Wrapped in `Option` so it can be handed out exactly once via
    /// `take_event_rx`; it is `None` afterwards.
    pub event_rx: Option<mpsc::Receiver<AppEvent>>,
    /// Event subscriber count
    ///
    /// Adjusted by `SessionManager::{increment,decrement}_subscriber_count`.
    pub subscriber_count: usize,
    /// Last activity timestamp for cleanup
    pub last_activity: chrono::DateTime<chrono::Utc>,
    /// Handle to the app actor loop task
    pub app_task_handle: JoinHandle<()>,
    /// Handle to the event translation task
    pub event_task_handle: JoinHandle<()>,
}
66
67impl ManagedSession {
68    /// Create a new managed session
69    pub async fn new(
70        mut session: Session,
71        app_config: AppConfig,
72        store: Arc<dyn SessionStore>,
73        default_model: ModelId,
74        conversation: Option<Conversation>,
75    ) -> Result<Self> {
76        // Create channels for the App
77        let (app_event_tx, mut app_event_rx) = mpsc::channel(100);
78        let (app_command_tx, app_command_rx) = mpsc::channel::<AppCommand>(32);
79
80        // Always create external event channel
81        let (external_event_tx, external_event_rx) = mpsc::channel(100);
82
83        // Initialize the global command sender for tool approval requests
84        crate::app::OpContext::init_command_tx(app_command_tx.clone());
85
86        // Build workspace from session config first
87        let workspace = session.build_workspace().await?;
88
89        // Build backend registry from session tool config, passing workspace
90        let (backend_registry, mcp_servers) = session
91            .config
92            .build_registry(
93                Arc::new(app_config.llm_config_provider.clone()),
94                workspace.clone(),
95            )
96            .await?;
97
98        // Update session state with MCP server info
99        session.state.mcp_servers = mcp_servers;
100
101        let tool_executor = Arc::new(crate::tools::ToolExecutor::with_all_components(
102            workspace.clone(),
103            Arc::new(backend_registry),
104            Arc::new(crate::app::validation::ValidatorRegistry::new()),
105            app_config.llm_config_provider.clone(),
106        ));
107
108        // Create the App instance with the configured tool executor and session config
109        let mut app = if let Some(conv) = conversation {
110            App::new_with_conversation(
111                app_config.clone(),
112                app_event_tx,
113                default_model.clone(),
114                workspace.clone(),
115                tool_executor,
116                Some(session.config.clone()),
117                conv,
118            )
119            .await?
120        } else {
121            App::new(
122                app_config.clone(),
123                app_event_tx,
124                default_model,
125                workspace.clone(),
126                tool_executor,
127                Some(session.config.clone()),
128            )
129            .await?
130        };
131
132        // Set the initial model if specified in session metadata
133        if let Some(model_str) = session.config.metadata.get("initial_model") {
134            // Use the model registry from app_config to resolve the model
135            if let Ok(model_id) = app_config.model_registry.resolve(model_str) {
136                let _ = app.set_model(model_id).await;
137            }
138        }
139
140        // Spawn the app actor loop
141        let app_task_handle = tokio::spawn(crate::app::app_actor_loop(app, app_command_rx));
142
143        // Spawn the event translation/duplication task
144        let session_id = session.id.clone();
145        let store_clone = store.clone();
146
147        let event_task_handle = tokio::spawn(async move {
148            while let Some(app_event) = app_event_rx.recv().await {
149                // Always duplicate to external consumer
150                if let Err(e) = external_event_tx.try_send(app_event.clone()) {
151                    warn!(session_id = %session_id, "Failed to send event to external consumer: {}", e);
152                }
153
154                // Handle ActiveMessageIdChanged event specially
155                if let AppEvent::ActiveMessageIdChanged { message_id } = &app_event {
156                    if let Err(e) = store_clone
157                        .update_active_message_id(&session_id, message_id.as_deref())
158                        .await
159                    {
160                        error!(session_id = %session_id, error = %e, "Failed to update active message ID");
161                    }
162                }
163
164                // Translate and persist
165                if let Some(stream_event) = translate_app_event(app_event) {
166                    // Persist event
167                    if let Ok(_sequence_num) =
168                        store_clone.append_event(&session_id, &stream_event).await
169                    {
170                        // Update session state in store
171                        if let Err(e) =
172                            update_session_state_for_event(&store_clone, &session_id, &stream_event)
173                                .await
174                        {
175                            error!(session_id = %session_id, error = %e, "Failed to update session state");
176                        }
177                    }
178                }
179            }
180            info!(session_id = %session_id, "Event translation loop ended");
181        });
182
183        Ok(Self {
184            session,
185            command_tx: app_command_tx,
186            event_rx: Some(external_event_rx),
187            subscriber_count: 0,
188            last_activity: chrono::Utc::now(),
189            app_task_handle,
190            event_task_handle,
191        })
192    }
193
194    /// Take the event receiver (can only be called once)
195    pub fn take_event_rx(&mut self) -> Option<mpsc::Receiver<AppEvent>> {
196        self.event_rx.take()
197    }
198
199    /// Update last activity timestamp
200    pub fn touch(&mut self) {
201        self.last_activity = chrono::Utc::now();
202    }
203
204    /// Check if session is inactive (no subscribers and old)
205    pub fn is_inactive(&self, max_idle_time: chrono::Duration) -> bool {
206        self.subscriber_count == 0 && chrono::Utc::now() - self.last_activity > max_idle_time
207    }
208
209    /// Shutdown the session gracefully
210    pub async fn shutdown(self) {
211        // Send shutdown command to app
212        let _ = self.command_tx.send(AppCommand::Shutdown).await;
213
214        // Wait for tasks to complete
215        let _ = self.app_task_handle.await;
216        let _ = self.event_task_handle.await;
217    }
218}
219
/// Manages multiple concurrent sessions
///
/// Active sessions live in an in-memory map keyed by session id; everything
/// else is read from and written to the `SessionStore`.
pub struct SessionManager {
    /// Active sessions with their App instances
    ///
    /// Guarded by an async `RwLock`; several methods hold the guard across
    /// `.await` points.
    active_sessions: Arc<RwLock<HashMap<String, ManagedSession>>>,
    /// Session store for persistence
    store: Arc<dyn SessionStore>,
    /// Configuration
    config: SessionManagerConfig,
}
229
230impl SessionManager {
231    /// Create a new SessionManager
232    pub fn new(store: Arc<dyn SessionStore>, config: SessionManagerConfig) -> Self {
233        Self {
234            active_sessions: Arc::new(RwLock::new(HashMap::new())),
235            store,
236            config,
237        }
238    }
239
240    /// Create a new session
241    pub async fn create_session(
242        &self,
243        config: SessionConfig,
244        app_config: AppConfig,
245    ) -> Result<(String, mpsc::Sender<AppCommand>)> {
246        let session_config = config;
247
248        // Create session in store
249        let session = self.store.create_session(session_config).await?;
250        let session_id = session.id.clone();
251
252        info!(session_id = %session_id, "Creating new session");
253
254        // Check if we're at max capacity
255        {
256            let sessions = self.active_sessions.read().await;
257            if sessions.len() >= self.config.max_concurrent_sessions {
258                error!(
259                    session_id = %session_id,
260                    active_count = sessions.len(),
261                    max_capacity = self.config.max_concurrent_sessions,
262                    "Session creation rejected: at maximum capacity"
263                );
264                return Err(SessionManagerError::CapacityExceeded {
265                    current: sessions.len(),
266                    max: self.config.max_concurrent_sessions,
267                }
268                .into());
269            }
270        }
271
272        // Create managed session with event translation
273        let managed_session = ManagedSession::new(
274            session.clone(),
275            app_config,
276            self.store.clone(),
277            self.config.default_model.clone(),
278            None,
279        )
280        .await
281        .map_err(|e| SessionManagerError::CreationFailed {
282            message: format!("Failed to create managed session: {e}"),
283        })?;
284
285        // Get command sender before moving into sessions map
286        let command_tx = managed_session.command_tx.clone();
287
288        // Add to active sessions
289        {
290            let mut sessions = self.active_sessions.write().await;
291            sessions.insert(session_id.clone(), managed_session);
292        }
293
294        // Emit session created event
295        let metadata = crate::events::SessionMetadata::from(&SessionInfo::from(&session));
296        let event = StreamEvent::SessionCreated {
297            session_id: session_id.clone(),
298            metadata,
299        };
300        self.emit_event(session_id.clone(), event).await;
301
302        info!(session_id = %session_id, "Session created and activated");
303        Ok((session_id, command_tx))
304    }
305
306    /// Take the event receiver for a session (can only be called once per session)
307    pub async fn take_event_receiver(&self, session_id: &str) -> Result<mpsc::Receiver<AppEvent>> {
308        let mut sessions = self.active_sessions.write().await;
309        match sessions.get_mut(session_id) {
310            Some(managed_session) => {
311                if let Some(receiver) = managed_session.take_event_rx() {
312                    Ok(receiver)
313                } else {
314                    Err(SessionManagerError::SessionAlreadyHasListener {
315                        session_id: session_id.to_string(),
316                    }
317                    .into())
318                }
319            }
320            None => Err(SessionManagerError::SessionNotActive {
321                session_id: session_id.to_string(),
322            }
323            .into()),
324        }
325    }
326
327    /// Get session information
328    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionInfo>> {
329        // First check if it's active
330        {
331            let sessions = self.active_sessions.read().await;
332            if let Some(managed_session) = sessions.get(session_id) {
333                return Ok(Some(SessionInfo::from(&managed_session.session)));
334            }
335        }
336
337        // If not active, load from store
338        if let Some(session) = self.store.get_session(session_id).await? {
339            Ok(Some(SessionInfo::from(&session)))
340        } else {
341            Ok(None)
342        }
343    }
344
345    /// Get the workspace for a session
346    pub async fn get_session_workspace(
347        &self,
348        session_id: &str,
349    ) -> Result<Option<Arc<dyn crate::workspace::Workspace>>> {
350        // First check if session is active
351        {
352            let active_sessions = self.active_sessions.read().await;
353            if let Some(managed_session) = active_sessions.get(session_id) {
354                // Session is active, rebuild workspace from config
355                return Ok(Some(
356                    managed_session
357                        .session
358                        .build_workspace()
359                        .await
360                        .map_err(|e| SessionManagerError::CreationFailed {
361                            message: format!("Failed to build workspace: {e}"),
362                        })?,
363                ));
364            }
365        }
366
367        // Session not active, try to load from store
368        if let Some(session_info) = self.store.get_session(session_id).await? {
369            let session = session_info;
370            Ok(Some(session.build_workspace().await.map_err(|e| {
371                SessionManagerError::CreationFailed {
372                    message: format!("Failed to build workspace: {e}"),
373                }
374            })?))
375        } else {
376            Ok(None)
377        }
378    }
379
380    /// Resume a session (load from storage and activate)
381    pub async fn resume_session(
382        &self,
383        session_id: &str,
384        app_config: AppConfig,
385    ) -> Result<mpsc::Sender<AppCommand>> {
386        // Check if already active
387        {
388            let sessions = self.active_sessions.read().await;
389            if let Some(managed_session) = sessions.get(session_id) {
390                debug!(session_id = %session_id, "Session already active");
391                return Ok(managed_session.command_tx.clone());
392            }
393        }
394
395        // Load from store
396        let session = match self
397            .store
398            .get_session(session_id)
399            .await
400            .map_err(SessionManagerError::Storage)?
401        {
402            Some(session) => session,
403            None => {
404                debug!(session_id = %session_id, "Session not found in store");
405                return Err(SessionManagerError::SessionNotActive {
406                    session_id: session_id.to_string(),
407                }
408                .into());
409            }
410        };
411
412        info!(session_id = %session_id, "Resuming session from storage");
413
414        // Check capacity
415        {
416            let sessions = self.active_sessions.read().await;
417            if sessions.len() >= self.config.max_concurrent_sessions {
418                warn!(
419                    session_id = %session_id,
420                    active_count = sessions.len(),
421                    max_capacity = self.config.max_concurrent_sessions,
422                    "At maximum session capacity for resume"
423                );
424                // TODO: Implement eviction strategy
425            }
426        }
427
428        // Create a conversation from the session state
429        let conversation = Conversation {
430            messages: session.state.messages.clone(),
431            working_directory: session
432                .config
433                .workspace
434                .get_path()
435                .unwrap_or_default()
436                .into(),
437            active_message_id: session.state.active_message_id.clone(),
438        };
439
440        // Create managed session with event translation
441        let managed_session = ManagedSession::new(
442            session.clone(),
443            app_config,
444            self.store.clone(),
445            self.config.default_model.clone(),
446            Some(conversation),
447        )
448        .await
449        .map_err(|e| SessionManagerError::CreationFailed {
450            message: format!("Failed to create managed session: {e}"),
451        })?;
452
453        // Get command sender before restoration
454        let command_tx = managed_session.command_tx.clone();
455
456        // Restore conversation history and approved tools atomically
457        if !session.state.messages.is_empty() || !session.state.approved_tools.is_empty() {
458            info!(
459                session_id = %session_id,
460                message_count = session.state.messages.len(),
461                tool_count = session.state.approved_tools.len(),
462                "Restoring conversation state"
463            );
464
465            command_tx
466                .send(AppCommand::RestoreConversation {
467                    messages: session.state.messages.clone(),
468                    approved_tools: session.state.approved_tools.clone(),
469                    approved_bash_patterns: session.state.approved_bash_patterns.clone(),
470                    active_message_id: session.state.active_message_id.clone(),
471                })
472                .await
473                .map_err(|_| SessionManagerError::CreationFailed {
474                    message: "Failed to send restore command to App".to_string(),
475                })?;
476        }
477
478        // Add to active sessions
479        {
480            let mut sessions = self.active_sessions.write().await;
481            sessions.insert(session_id.to_string(), managed_session);
482        }
483
484        // Get the last event sequence for resume
485        let last_sequence = session.state.last_event_sequence;
486
487        // Emit session resumed event
488        let event = StreamEvent::SessionResumed {
489            session_id: session_id.to_string(),
490            event_offset: last_sequence,
491        };
492        self.emit_event(session_id.to_string(), event).await;
493
494        info!(session_id = %session_id, last_sequence = last_sequence, "Session resumed");
495        Ok(command_tx)
496    }
497
498    /// Suspend a session (save to storage and deactivate)
499    pub async fn suspend_session(&self, session_id: &str) -> Result<bool> {
500        let managed_session = {
501            let mut sessions = self.active_sessions.write().await;
502            sessions.remove(session_id)
503        };
504
505        let managed_session = match managed_session {
506            Some(session) => session,
507            None => {
508                debug!(session_id = %session_id, "Session not active, cannot suspend");
509                return Ok(false);
510            }
511        };
512
513        info!(session_id = %session_id, "Suspending session");
514
515        // Save current state to store
516        self.store.update_session(&managed_session.session).await?;
517
518        // Emit session saved event
519        let event = StreamEvent::SessionSaved {
520            session_id: session_id.to_string(),
521        };
522        self.emit_event(session_id.to_string(), event).await;
523
524        info!(session_id = %session_id, "Session suspended and saved");
525        Ok(true)
526    }
527
528    /// Delete a session permanently
529    pub async fn delete_session(&self, session_id: &str) -> Result<bool> {
530        // Remove from active sessions first
531        {
532            let mut sessions = self.active_sessions.write().await;
533            sessions.remove(session_id);
534        }
535
536        // Delete from store
537        self.store.delete_session(session_id).await?;
538
539        info!(session_id = %session_id, "Session deleted");
540        Ok(true)
541    }
542
543    /// List sessions with filtering
544    pub async fn list_sessions(&self, filter: SessionFilter) -> Result<Vec<SessionInfo>> {
545        Ok(self.store.list_sessions(filter).await?)
546    }
547
548    /// Get active session IDs
549    pub async fn get_active_sessions(&self) -> Vec<String> {
550        let sessions = self.active_sessions.read().await;
551        sessions.keys().cloned().collect()
552    }
553
554    /// Check if a session is active
555    pub async fn is_session_active(&self, session_id: &str) -> bool {
556        let sessions = self.active_sessions.read().await;
557        sessions.contains_key(session_id)
558    }
559
560    /// Send a command to an active session
561    pub async fn send_command(&self, session_id: &str, command: AppCommand) -> Result<()> {
562        let sessions = self.active_sessions.read().await;
563        if let Some(managed_session) = sessions.get(session_id) {
564            managed_session.command_tx.send(command).await.map_err(|_| {
565                Error::SessionManager(SessionManagerError::SessionNotActive {
566                    session_id: session_id.to_string(),
567                })
568            })
569        } else {
570            Err(Error::SessionManager(
571                SessionManagerError::SessionNotActive {
572                    session_id: session_id.to_string(),
573                },
574            ))
575        }
576    }
577
578    /// Update session state and persist if auto-persist is enabled
579    pub async fn update_session_state(
580        &self,
581        session_id: &str,
582        update_fn: impl FnOnce(&mut SessionState),
583    ) -> Result<()> {
584        {
585            let mut sessions = self.active_sessions.write().await;
586            if let Some(managed_session) = sessions.get_mut(session_id) {
587                managed_session.touch();
588                update_fn(&mut managed_session.session.state);
589                managed_session.session.update_timestamp();
590
591                // Auto-persist if enabled
592                if self.config.auto_persist {
593                    self.store.update_session(&managed_session.session).await?;
594                }
595            } else {
596                return Err(SessionManagerError::SessionNotActive {
597                    session_id: session_id.to_string(),
598                }
599                .into());
600            }
601        }
602
603        Ok(())
604    }
605
606    /// Emit an event for a session
607    pub async fn emit_event(&self, session_id: String, event: StreamEvent) {
608        // Get next sequence number and store event
609        let sequence_num = match self.store.append_event(&session_id, &event).await {
610            Ok(seq) => seq,
611            Err(e) => {
612                error!(session_id = %session_id, error = %e, "Failed to persist event");
613                return;
614            }
615        };
616
617        // Update session state with new sequence number
618        if let Err(e) = self
619            .update_session_state(&session_id, |state| {
620                state.last_event_sequence = sequence_num;
621            })
622            .await
623        {
624            error!(session_id = %session_id, error = %e, "Failed to update session sequence number");
625        }
626    }
627
628    /// Cleanup inactive sessions
629    pub async fn cleanup_inactive_sessions(&self, max_idle_time: chrono::Duration) -> usize {
630        let mut to_suspend = Vec::new();
631
632        {
633            let sessions = self.active_sessions.read().await;
634            for (session_id, managed_session) in sessions.iter() {
635                if managed_session.is_inactive(max_idle_time) {
636                    to_suspend.push(session_id.clone());
637                }
638            }
639        }
640
641        let mut suspended_count = 0;
642        for session_id in to_suspend {
643            if let Ok(true) = self.suspend_session(&session_id).await {
644                suspended_count += 1;
645            }
646        }
647
648        if suspended_count > 0 {
649            info!(
650                suspended_count = suspended_count,
651                "Cleaned up inactive sessions"
652            );
653        }
654
655        suspended_count
656    }
657
658    /// Get session store reference
659    pub fn store(&self) -> &Arc<dyn SessionStore> {
660        &self.store
661    }
662
663    /// Increment the subscriber count for a session
664    pub async fn increment_subscriber_count(&self, session_id: &str) -> Result<()> {
665        let mut sessions = self.active_sessions.write().await;
666        if let Some(managed_session) = sessions.get_mut(session_id) {
667            managed_session.subscriber_count += 1;
668            managed_session.touch();
669            debug!(
670                session_id = %session_id,
671                subscriber_count = managed_session.subscriber_count,
672                "Incremented subscriber count"
673            );
674            Ok(())
675        } else {
676            Err(SessionManagerError::SessionNotActive {
677                session_id: session_id.to_string(),
678            }
679            .into())
680        }
681    }
682
683    /// Decrement the subscriber count for a session
684    pub async fn decrement_subscriber_count(&self, session_id: &str) -> Result<()> {
685        let mut sessions = self.active_sessions.write().await;
686        if let Some(managed_session) = sessions.get_mut(session_id) {
687            managed_session.subscriber_count = managed_session.subscriber_count.saturating_sub(1);
688            managed_session.touch();
689            debug!(
690                session_id = %session_id,
691                subscriber_count = managed_session.subscriber_count,
692                "Decremented subscriber count"
693            );
694            Ok(())
695        } else {
696            // Session might have already been cleaned up
697            debug!(session_id = %session_id, "Session not active when decrementing subscriber count");
698            Ok(())
699        }
700    }
701
702    /// Touch a session to update its last activity timestamp
703    pub async fn touch_session(&self, session_id: &str) -> Result<()> {
704        let mut sessions = self.active_sessions.write().await;
705        if let Some(managed_session) = sessions.get_mut(session_id) {
706            managed_session.touch();
707            Ok(())
708        } else {
709            // Session might have been suspended, which is okay
710            Ok(())
711        }
712    }
713
714    /// Check if a session should be suspended due to no subscribers
715    pub async fn maybe_suspend_idle_session(&self, session_id: &str) -> Result<()> {
716        // Check if session has no subscribers
717        let should_suspend = {
718            let sessions = self.active_sessions.read().await;
719            if let Some(managed_session) = sessions.get(session_id) {
720                managed_session.subscriber_count == 0
721            } else {
722                false // Already suspended or deleted
723            }
724        };
725
726        if should_suspend {
727            info!(session_id = %session_id, "No active subscribers, suspending session");
728            self.suspend_session(session_id).await?;
729        }
730
731        Ok(())
732    }
733
734    /// Get session state for gRPC responses
735    pub async fn get_session_state(
736        &self,
737        session_id: &str,
738    ) -> Result<Option<crate::session::SessionState>> {
739        info!("get_session_state called for session: {}", session_id);
740
741        // Always load from store to get the most up-to-date state
742        // The in-memory state in ManagedSession may be stale
743        match self.store.get_session(session_id).await {
744            Ok(Some(session)) => {
745                info!(
746                    "Loaded session from store with {} messages",
747                    session.state.messages.len()
748                );
749                Ok(Some(session.state))
750            }
751            Ok(None) => {
752                info!("Session not found in store: {}", session_id);
753                Ok(None)
754            }
755            Err(e) => {
756                error!("Error loading session from store: {}", e);
757                Err(SessionManagerError::Storage(e).into())
758            }
759        }
760    }
761
762    /// Get MCP server connection statuses for a session
763    pub async fn get_mcp_statuses(
764        &self,
765        session_id: &str,
766    ) -> Result<Vec<crate::session::McpServerInfo>> {
767        // First check if session is active (has most up-to-date info)
768        {
769            let sessions = self.active_sessions.read().await;
770            if let Some(managed_session) = sessions.get(session_id) {
771                let servers: Vec<_> = managed_session
772                    .session
773                    .state
774                    .mcp_servers
775                    .values()
776                    .cloned()
777                    .collect();
778                return Ok(servers);
779            }
780        }
781
782        // If not active, load from store
783        match self.store.get_session(session_id).await? {
784            Some(session) => {
785                let servers: Vec<_> = session.state.mcp_servers.values().cloned().collect();
786                Ok(servers)
787            }
788            None => Err(SessionManagerError::SessionNotActive {
789                session_id: session_id.to_string(),
790            }
791            .into()),
792        }
793    }
794}
795
796/// Convert AppEvent to StreamEvent, returning None for events that shouldn't be streamed
797fn translate_app_event(app_event: AppEvent) -> Option<StreamEvent> {
798    match app_event {
799        AppEvent::MessageAdded { message, model } => Some(StreamEvent::MessageComplete {
800            message,
801            usage: None,
802            metadata: std::collections::HashMap::new(),
803            model,
804        }),
805
806        AppEvent::MessagePart { id, delta } => Some(StreamEvent::MessagePart {
807            content: delta,
808            message_id: id,
809        }),
810
811        AppEvent::ToolCallStarted {
812            name,
813            id,
814            parameters,
815            model,
816        } => {
817            let tool_call = ToolCall {
818                id: id.clone(),
819                name: name.clone(),
820                parameters,
821            };
822            Some(StreamEvent::ToolCallStarted {
823                tool_call,
824                metadata: std::collections::HashMap::new(),
825                model,
826            })
827        }
828
829        AppEvent::ToolCallCompleted {
830            name: _,
831            result,
832            id,
833            model,
834        } => Some(StreamEvent::ToolCallCompleted {
835            tool_call_id: id,
836            result,
837            metadata: std::collections::HashMap::new(),
838            model,
839        }),
840
841        AppEvent::ToolCallFailed {
842            name: _,
843            error,
844            id,
845            model,
846        } => Some(StreamEvent::ToolCallFailed {
847            tool_call_id: id,
848            error,
849            metadata: std::collections::HashMap::new(),
850            model,
851        }),
852
853        AppEvent::WorkspaceChanged => Some(StreamEvent::WorkspaceChanged),
854
855        AppEvent::WorkspaceFiles { files } => Some(StreamEvent::WorkspaceFiles {
856            files: files.clone(),
857        }),
858
859        AppEvent::Started { id, op } => Some(StreamEvent::OperationStarted {
860            operation_id: id,
861            operation: op,
862        }),
863        AppEvent::Finished { id, outcome } => Some(StreamEvent::OperationCompleted {
864            operation_id: id,
865            outcome,
866        }),
867        AppEvent::OperationCancelled { op_id, info } => {
868            // Use the provided operation ID, or generate a new one if not available
869            let operation_id = op_id.unwrap_or_else(uuid::Uuid::new_v4);
870            Some(StreamEvent::OperationCancelled {
871                operation_id,
872                reason: info.to_string(), // Use Display implementation for reason
873            })
874        }
875
876        // These events don't need to be streamed
877        _ => None,
878    }
879}
880/// Update session state based on a StreamEvent
881async fn update_session_state_for_event(
882    store: &Arc<dyn SessionStore>,
883    session_id: &str,
884    event: &StreamEvent,
885) -> Result<()> {
886    match event {
887        StreamEvent::MessageComplete { message, .. } => {
888            store.append_message(session_id, message).await?;
889
890            // Update tool call if this is a tool result
891            if let crate::app::conversation::MessageData::Tool {
892                tool_use_id,
893                result,
894                ..
895            } = &message.data
896            {
897                let stats = crate::session::ToolExecutionStats::success_typed(
898                    serde_json::to_value(result).unwrap_or(serde_json::Value::Null),
899                    result.variant_name().to_string(),
900                    0, // Duration tracked elsewhere via Started/Finished events
901                );
902                let update = ToolCallUpdate::set_result(stats);
903                store.update_tool_call(tool_use_id, update).await?;
904            }
905        }
906        StreamEvent::ToolCallStarted { tool_call, .. } => {
907            store.create_tool_call(session_id, tool_call).await?;
908        }
909        StreamEvent::ToolCallCompleted {
910            tool_call_id,
911            result,
912            ..
913        } => {
914            let stats = crate::session::ToolExecutionStats::success_typed(
915                serde_json::to_value(result).unwrap_or(serde_json::Value::Null),
916                result.variant_name().to_string(),
917                0,
918            );
919            let update = ToolCallUpdate::set_result(stats);
920            store.update_tool_call(tool_call_id, update).await?;
921
922            // Also add a Tool message with the result
923            // Get parent info from the latest message in the session
924            let messages = store.get_messages(session_id, None).await?;
925            let parent_id = messages.last().map(|m| m.id().to_string());
926
927            let tool_message = ConversationMessage {
928                data: crate::app::conversation::MessageData::Tool {
929                    tool_use_id: tool_call_id.clone(),
930                    result: result.clone(),
931                },
932                timestamp: std::time::SystemTime::now()
933                    .duration_since(std::time::UNIX_EPOCH)
934                    .expect("Time went backwards")
935                    .as_secs(),
936                id: format!("tool_result_{tool_call_id}"),
937                parent_message_id: parent_id,
938            };
939            store.append_message(session_id, &tool_message).await?;
940        }
941        StreamEvent::ToolCallFailed {
942            tool_call_id,
943            error,
944            ..
945        } => {
946            let update = ToolCallUpdate::set_error(error.clone());
947            store.update_tool_call(tool_call_id, update).await?;
948
949            // Also add a Tool message with the error
950            // Get parent info from the latest message in the session
951            let messages = store.get_messages(session_id, None).await?;
952            let parent_id = messages.last().map(|m| m.id().to_string());
953
954            let tool_error = steer_tools::error::ToolError::Execution {
955                tool_name: "unknown".to_string(), // We don't have the tool name here
956                message: error.clone(),
957            };
958            let tool_message = ConversationMessage {
959                data: MessageData::Tool {
960                    tool_use_id: tool_call_id.clone(),
961                    result: ToolResult::Error(tool_error),
962                },
963                timestamp: std::time::SystemTime::now()
964                    .duration_since(std::time::UNIX_EPOCH)
965                    .expect("Time went backwards")
966                    .as_secs(),
967                id: format!("tool_result_{tool_call_id}"),
968                parent_message_id: parent_id,
969            };
970            store.append_message(session_id, &tool_message).await?;
971        }
972        // Other events don't directly modify stored state
973        _ => {}
974    }
975    Ok(())
976}
977
978#[cfg(test)]
979mod tests {
980    use super::*;
981    use crate::app::MessageData;
982    use crate::app::conversation::{AssistantContent, Role, UserContent};
983    use crate::session::stores::sqlite::SqliteSessionStore;
984    use tempfile::TempDir;
985
986    async fn create_test_manager() -> (SessionManager, TempDir) {
987        let temp_dir = TempDir::new().unwrap();
988        let db_path = temp_dir.path().join("test.db");
989        let store = Arc::new(SqliteSessionStore::new(&db_path).await.unwrap());
990
991        let config = SessionManagerConfig {
992            max_concurrent_sessions: 100,
993            default_model: crate::config::model::builtin::claude_sonnet_4_20250514(),
994            auto_persist: true,
995        };
996        let manager = SessionManager::new(store, config);
997
998        (manager, temp_dir)
999    }
1000
1001    fn create_test_app_config() -> AppConfig {
1002        crate::test_utils::test_app_config()
1003    }
1004
1005    #[tokio::test]
1006    async fn test_create_and_resume_session() {
1007        let (manager, temp) = create_test_manager().await;
1008        let app_config = create_test_app_config();
1009
1010        // Create session
1011        let session_config = SessionConfig {
1012            workspace: crate::session::state::WorkspaceConfig::Local {
1013                path: temp.path().to_path_buf(),
1014            },
1015            tool_config: crate::session::SessionToolConfig::default(),
1016            system_prompt: None,
1017            metadata: std::collections::HashMap::new(),
1018        };
1019        let (session_id, _command_tx) = manager
1020            .create_session(session_config, app_config.clone())
1021            .await
1022            .unwrap();
1023        assert!(!session_id.is_empty());
1024        assert!(manager.is_session_active(&session_id).await);
1025
1026        // Suspend session
1027        assert!(manager.suspend_session(&session_id).await.unwrap());
1028        assert!(!manager.is_session_active(&session_id).await);
1029
1030        // Resume Session
1031        let _command_tx = manager
1032            .resume_session(&session_id, app_config)
1033            .await
1034            .unwrap();
1035        assert!(manager.is_session_active(&session_id).await);
1036    }
1037
1038    #[tokio::test]
1039    async fn test_session_cleanup() {
1040        let (manager, temp) = create_test_manager().await;
1041        let app_config = create_test_app_config();
1042
1043        // Create session
1044        let session_config = SessionConfig {
1045            workspace: crate::session::state::WorkspaceConfig::Local {
1046                path: temp.path().to_path_buf(),
1047            },
1048            tool_config: crate::session::SessionToolConfig::default(),
1049            system_prompt: None,
1050            metadata: std::collections::HashMap::new(),
1051        };
1052        let (session_id, _command_tx) = manager
1053            .create_session(session_config, app_config)
1054            .await
1055            .unwrap();
1056
1057        // Make session appear inactive by manipulating last_activity
1058        {
1059            let mut sessions = manager.active_sessions.write().await;
1060            if let Some(session) = sessions.get_mut(&session_id) {
1061                session.last_activity = chrono::Utc::now() - chrono::Duration::hours(2);
1062                session.subscriber_count = 0;
1063            }
1064        }
1065
1066        // Cleanup should suspend the session
1067        let cleaned = manager
1068            .cleanup_inactive_sessions(chrono::Duration::hours(1))
1069            .await;
1070        assert_eq!(cleaned, 1);
1071        assert!(!manager.is_session_active(&session_id).await);
1072    }
1073
1074    #[tokio::test]
1075    async fn test_capacity_rejection() {
1076        let temp_dir = TempDir::new().unwrap();
1077        let temp = tempfile::TempDir::new().unwrap();
1078        let db_path = temp_dir.path().join("test.db");
1079        let store = Arc::new(SqliteSessionStore::new(&db_path).await.unwrap());
1080
1081        let config = SessionManagerConfig {
1082            max_concurrent_sessions: 1, // Set to 1 for testing
1083            default_model: crate::config::model::builtin::claude_sonnet_4_20250514(),
1084            auto_persist: true,
1085        };
1086        let manager = SessionManager::new(store, config);
1087        let app_config = create_test_app_config();
1088
1089        // Create first session - should succeed
1090        let tool_config = crate::session::SessionToolConfig {
1091            approval_policy: crate::session::ToolApprovalPolicy::AlwaysAsk,
1092            ..Default::default()
1093        };
1094
1095        let session_config = SessionConfig {
1096            workspace: crate::session::state::WorkspaceConfig::Local {
1097                path: temp.path().to_path_buf(),
1098            },
1099            tool_config,
1100            system_prompt: None,
1101            metadata: std::collections::HashMap::new(),
1102        };
1103        let (session_id1, _command_tx) = manager
1104            .create_session(session_config.clone(), app_config.clone())
1105            .await
1106            .unwrap();
1107        assert!(!session_id1.is_empty());
1108
1109        // Create second session - should fail with capacity error
1110        let result = manager.create_session(session_config, app_config).await;
1111
1112        assert!(result.is_err());
1113        assert!(matches!(
1114            result,
1115            Err(crate::error::Error::SessionManager(
1116                SessionManagerError::CapacityExceeded { .. }
1117            ))
1118        ));
1119        match result.unwrap_err() {
1120            crate::error::Error::SessionManager(SessionManagerError::CapacityExceeded {
1121                current,
1122                max,
1123            }) => {
1124                assert_eq!(current, 1);
1125                assert_eq!(max, 1);
1126            }
1127            _ => unreachable!(),
1128        }
1129    }
1130
1131    #[tokio::test]
1132    async fn test_tool_result_persistence_on_resume() {
1133        let (manager, temp) = create_test_manager().await;
1134        let app_config = create_test_app_config();
1135
1136        // Create session
1137        let session_config = SessionConfig {
1138            workspace: crate::session::state::WorkspaceConfig::Local {
1139                path: temp.path().to_path_buf(),
1140            },
1141            tool_config: crate::session::SessionToolConfig::default(),
1142            system_prompt: None,
1143            metadata: std::collections::HashMap::new(),
1144        };
1145        let (session_id, _command_tx) = manager
1146            .create_session(session_config, app_config.clone())
1147            .await
1148            .unwrap();
1149
1150        // Simulate adding messages including tool calls and results
1151        // First, add a user message
1152        let user_message = ConversationMessage {
1153            data: crate::app::conversation::MessageData::User {
1154                content: vec![UserContent::Text {
1155                    text: "Read the file test.txt".to_string(),
1156                }],
1157            },
1158            timestamp: 123456789,
1159            id: "user_1".to_string(),
1160            parent_message_id: None,
1161        };
1162        manager
1163            .store
1164            .append_message(&session_id, &user_message)
1165            .await
1166            .unwrap();
1167
1168        // Add an assistant message with a tool call
1169        let assistant_message = ConversationMessage {
1170            data: crate::app::conversation::MessageData::Assistant {
1171                content: vec![
1172                    AssistantContent::Text {
1173                        text: "I'll read that file for you.".to_string(),
1174                    },
1175                    AssistantContent::ToolCall {
1176                        tool_call: ToolCall {
1177                            id: "tool_call_1".to_string(),
1178                            name: "read_file".to_string(),
1179                            parameters: serde_json::json!({"path": "test.txt"}),
1180                        },
1181                    },
1182                ],
1183            },
1184            timestamp: 123456790,
1185            id: "assistant_1".to_string(),
1186            parent_message_id: Some("user_1".to_string()),
1187        };
1188        manager
1189            .store
1190            .append_message(&session_id, &assistant_message)
1191            .await
1192            .unwrap();
1193
1194        // Simulate tool result directly by creating and storing the tool call manually
1195        // This mimics what would happen during actual tool execution
1196        let tool_call = ToolCall {
1197            id: "tool_call_1".to_string(),
1198            name: "read_file".to_string(),
1199            parameters: serde_json::json!({"path": "test.txt"}),
1200        };
1201        manager
1202            .store
1203            .create_tool_call(&session_id, &tool_call)
1204            .await
1205            .unwrap();
1206
1207        // Now update with the result
1208        let stats = crate::session::ToolExecutionStats::success_typed(
1209            serde_json::json!({
1210                "content": "File contents: Hello, world!",
1211                "file_path": "test.txt",
1212                "line_count": 1,
1213                "truncated": false
1214            }),
1215            "FileContent".to_string(),
1216            0,
1217        );
1218        let update = ToolCallUpdate::set_result(stats);
1219        manager
1220            .store
1221            .update_tool_call("tool_call_1", update)
1222            .await
1223            .unwrap();
1224
1225        // Also add a Tool message with the result
1226        let tool_message = ConversationMessage {
1227            data: MessageData::Tool {
1228                tool_use_id: "tool_call_1".to_string(),
1229                result: ToolResult::FileContent(steer_tools::result::FileContentResult {
1230                    content: "File contents: Hello, world!".to_string(),
1231                    file_path: "test.txt".to_string(),
1232                    line_count: 1,
1233                    truncated: false,
1234                }),
1235            },
1236            timestamp: 123456790,
1237            id: "tool_result_tool_call_1".to_string(),
1238            parent_message_id: Some("assistant_1".to_string()),
1239        };
1240        manager
1241            .store
1242            .append_message(&session_id, &tool_message)
1243            .await
1244            .unwrap();
1245
1246        // Add a follow-up assistant message
1247        let followup_message = ConversationMessage {
1248            data: crate::app::conversation::MessageData::Assistant {
1249                content: vec![AssistantContent::Text {
1250                    text: "The file contains: Hello, world!".to_string(),
1251                }],
1252            },
1253            timestamp: 123456791,
1254            id: "assistant_2".to_string(),
1255            parent_message_id: Some("assistant_1".to_string()),
1256        };
1257        manager
1258            .store
1259            .append_message(&session_id, &followup_message)
1260            .await
1261            .unwrap();
1262
1263        // Suspend the session
1264        manager.suspend_session(&session_id).await.unwrap();
1265
1266        // Load the session from storage and verify tool result message exists
1267        let loaded_session = manager
1268            .store
1269            .get_session(&session_id)
1270            .await
1271            .unwrap()
1272            .unwrap();
1273
1274        // Should have 4 messages: user, assistant with tool call, tool result, followup
1275        assert_eq!(loaded_session.state.messages.len(), 4);
1276
1277        // Verify the tool result message exists and has correct content
1278        let tool_result_msg = &loaded_session.state.messages[2];
1279        assert_eq!(tool_result_msg.role(), Role::Tool);
1280
1281        // Verify the content structure
1282        assert!(matches!(
1283            &tool_result_msg.data,
1284            crate::app::conversation::MessageData::Tool { .. }
1285        ));
1286        if let crate::app::conversation::MessageData::Tool {
1287            tool_use_id,
1288            result,
1289            ..
1290        } = &tool_result_msg.data
1291        {
1292            assert_eq!(tool_use_id, "tool_call_1");
1293            assert!(matches!(
1294                result,
1295                crate::app::conversation::ToolResult::FileContent(_)
1296            ));
1297            match result {
1298                crate::app::conversation::ToolResult::FileContent(content) => {
1299                    assert!(content.content.contains("Hello, world!"));
1300                }
1301                _ => unreachable!(),
1302            }
1303        } else {
1304            panic!("Expected Tool message");
1305        }
1306
1307        // Now test resuming the session - it should work without API errors
1308        let _command_tx = manager
1309            .resume_session(&session_id, app_config)
1310            .await
1311            .unwrap();
1312
1313        // The session should be properly restored with all messages including tool results
1314        // If the bug were still present, trying to send a new message would fail with the
1315        // "tool_use ids were found without tool_result blocks" error from the API
1316    }
1317
1318    #[tokio::test]
1319    async fn test_active_message_id_persistence() {
1320        let (manager, temp) = create_test_manager().await;
1321        let app_config = create_test_app_config();
1322
1323        // Create session
1324        let session_config = SessionConfig {
1325            workspace: crate::session::state::WorkspaceConfig::Local {
1326                path: temp.path().to_path_buf(),
1327            },
1328            tool_config: crate::session::SessionToolConfig::default(),
1329            system_prompt: None,
1330            metadata: std::collections::HashMap::new(),
1331        };
1332        let (session_id, _command_tx) = manager
1333            .create_session(session_config, app_config.clone())
1334            .await
1335            .unwrap();
1336
1337        // Add some messages directly to the store (simulating a conversation with branches)
1338        let msg1 = ConversationMessage {
1339            data: crate::app::conversation::MessageData::User {
1340                content: vec![UserContent::Text {
1341                    text: "Hello".to_string(),
1342                }],
1343            },
1344            timestamp: 1000,
1345            id: "msg1".to_string(),
1346            parent_message_id: None,
1347        };
1348
1349        let msg2 = ConversationMessage {
1350            data: crate::app::conversation::MessageData::Assistant {
1351                content: vec![AssistantContent::Text {
1352                    text: "Hi there!".to_string(),
1353                }],
1354            },
1355            timestamp: 2000,
1356            id: "msg2".to_string(),
1357            parent_message_id: Some("msg1".to_string()),
1358        };
1359
1360        // Add a branch - edited version of msg1
1361        let msg1_edited = ConversationMessage {
1362            data: crate::app::conversation::MessageData::User {
1363                content: vec![UserContent::Text {
1364                    text: "Goodbye".to_string(),
1365                }],
1366            },
1367            timestamp: 3000,
1368            id: "msg1_edited".to_string(),
1369            parent_message_id: None, // Same parent as original msg1
1370        };
1371
1372        // Store messages
1373        manager
1374            .store
1375            .append_message(&session_id, &msg1)
1376            .await
1377            .unwrap();
1378        manager
1379            .store
1380            .append_message(&session_id, &msg2)
1381            .await
1382            .unwrap();
1383        manager
1384            .store
1385            .append_message(&session_id, &msg1_edited)
1386            .await
1387            .unwrap();
1388
1389        // Update active message ID to the edited branch
1390        manager
1391            .store
1392            .update_active_message_id(&session_id, Some("msg1_edited"))
1393            .await
1394            .unwrap();
1395
1396        // Suspend the session
1397        manager.suspend_session(&session_id).await.unwrap();
1398
1399        // Load the session back and verify active_message_id was persisted
1400        let loaded_session = manager
1401            .store
1402            .get_session(&session_id)
1403            .await
1404            .unwrap()
1405            .unwrap();
1406
1407        // The active_message_id should be set to the edited message
1408        assert_eq!(
1409            loaded_session.state.active_message_id,
1410            Some("msg1_edited".to_string())
1411        );
1412
1413        // Should have 3 messages total (original 2 + edited version)
1414        assert_eq!(loaded_session.state.messages.len(), 3);
1415
1416        // Verify the edited message exists and has correct content
1417        let edited_msg = loaded_session
1418            .state
1419            .messages
1420            .iter()
1421            .find(|m| m.id() == "msg1_edited")
1422            .expect("Edited message should exist");
1423
1424        match &edited_msg.data {
1425            crate::app::conversation::MessageData::User { content, .. } => {
1426                if let Some(UserContent::Text { text }) = content.first() {
1427                    assert_eq!(text, "Goodbye");
1428                } else {
1429                    panic!("Expected text content");
1430                }
1431            }
1432            _ => panic!("Expected user message"),
1433        }
1434
1435        // Resume the session and verify the active_message_id is still correct
1436        let _ = manager
1437            .resume_session(&session_id, app_config)
1438            .await
1439            .unwrap();
1440
1441        // Get the state through the manager's API
1442        let state = manager
1443            .get_session_state(&session_id)
1444            .await
1445            .unwrap()
1446            .unwrap();
1447        assert_eq!(state.active_message_id, Some("msg1_edited".to_string()));
1448    }
1449}