steer_core/session/
manager.rs

1use crate::api::{Model, ToolCall};
2use crate::app::{
3    App, AppCommand, AppConfig, AppEvent, Conversation, Message as ConversationMessage, MessageData,
4};
5use crate::error::{Error, Result};
6use crate::events::StreamEvent;
7use crate::session::{
8    Session, SessionConfig, SessionFilter, SessionInfo, SessionState, SessionStore,
9    SessionStoreError, ToolCallUpdate,
10};
11use std::collections::HashMap;
12use std::sync::Arc;
13use steer_tools::ToolResult;
14use thiserror::Error;
15use tokio::sync::{RwLock, mpsc};
16use tokio::task::JoinHandle;
17use tracing::{debug, error, info, warn};
18
19/// Session manager specific errors
20#[derive(Debug, Error)]
21pub enum SessionManagerError {
22    #[error("Maximum session capacity reached ({current}/{max}). Cannot create new session.")]
23    CapacityExceeded { current: usize, max: usize },
24
25    #[error("Session not active: {session_id}")]
26    SessionNotActive { session_id: String },
27
28    #[error("Session {session_id} already has an active listener")]
29    SessionAlreadyHasListener { session_id: String },
30
31    #[error("Failed to create managed session: {message}")]
32    CreationFailed { message: String },
33
34    #[error(transparent)]
35    Storage(#[from] SessionStoreError),
36}
37
38/// Configuration for the SessionManager
39#[derive(Debug, Clone)]
40pub struct SessionManagerConfig {
41    /// Maximum number of concurrent active sessions
42    pub max_concurrent_sessions: usize,
43    /// Default model for new sessions
44    pub default_model: Model,
45    /// Whether to automatically persist sessions
46    pub auto_persist: bool,
47}
48
49/// A managed session contains both the session state and the App instance
50pub struct ManagedSession {
51    /// The session data
52    pub session: Session,
53    /// Command sender for the App
54    pub command_tx: mpsc::Sender<AppCommand>,
55    /// Event receiver from the App (for external consumers like TUI)
56    pub event_rx: Option<mpsc::Receiver<AppEvent>>,
57    /// Event subscriber count
58    pub subscriber_count: usize,
59    /// Last activity timestamp for cleanup
60    pub last_activity: chrono::DateTime<chrono::Utc>,
61    /// Handle to the app actor loop task
62    pub app_task_handle: JoinHandle<()>,
63    /// Handle to the event translation task
64    pub event_task_handle: JoinHandle<()>,
65}
66
67impl ManagedSession {
68    /// Create a new managed session
69    pub async fn new(
70        session: Session,
71        app_config: AppConfig,
72        store: Arc<dyn SessionStore>,
73        default_model: Model,
74        conversation: Option<Conversation>,
75    ) -> Result<Self> {
76        // Create channels for the App
77        let (app_event_tx, mut app_event_rx) = mpsc::channel(100);
78        let (app_command_tx, app_command_rx) = mpsc::channel::<AppCommand>(32);
79
80        // Always create external event channel
81        let (external_event_tx, external_event_rx) = mpsc::channel(100);
82
83        // Initialize the global command sender for tool approval requests
84        crate::app::OpContext::init_command_tx(app_command_tx.clone());
85
86        // Build workspace from session config first
87        let workspace = session.build_workspace().await?;
88
89        // Build backend registry from session tool config, passing workspace
90        let backend_registry = session
91            .config
92            .build_registry(
93                Arc::new(app_config.llm_config_provider.clone()),
94                workspace.clone(),
95            )
96            .await?;
97
98        let tool_executor = Arc::new(crate::app::ToolExecutor::with_all_components(
99            workspace.clone(),
100            Arc::new(backend_registry),
101            Arc::new(crate::app::validation::ValidatorRegistry::new()),
102            app_config.llm_config_provider.clone(),
103        ));
104
105        // Create the App instance with the configured tool executor and session config
106        let mut app = if let Some(conv) = conversation {
107            App::new_with_conversation(
108                app_config,
109                app_event_tx,
110                default_model,
111                workspace.clone(),
112                tool_executor,
113                Some(session.config.clone()),
114                conv,
115            )
116            .await?
117        } else {
118            App::new(
119                app_config,
120                app_event_tx,
121                default_model,
122                workspace.clone(),
123                tool_executor,
124                Some(session.config.clone()),
125            )
126            .await?
127        };
128
129        // Set the initial model if specified in session metadata
130        if let Some(model_str) = session.config.metadata.get("initial_model") {
131            if let Ok(model) = model_str.parse::<crate::api::Model>() {
132                let _ = app.set_model(model).await;
133            }
134        }
135
136        // Spawn the app actor loop
137        let app_task_handle = tokio::spawn(crate::app::app_actor_loop(app, app_command_rx));
138
139        // Spawn the event translation/duplication task
140        let session_id = session.id.clone();
141        let store_clone = store.clone();
142
143        let event_task_handle = tokio::spawn(async move {
144            while let Some(app_event) = app_event_rx.recv().await {
145                // Always duplicate to external consumer
146                if let Err(e) = external_event_tx.try_send(app_event.clone()) {
147                    warn!(session_id = %session_id, "Failed to send event to external consumer: {}", e);
148                }
149
150                // Handle ActiveMessageIdChanged event specially
151                if let AppEvent::ActiveMessageIdChanged { message_id } = &app_event {
152                    if let Err(e) = store_clone
153                        .update_active_message_id(&session_id, message_id.as_deref())
154                        .await
155                    {
156                        error!(session_id = %session_id, error = %e, "Failed to update active message ID");
157                    }
158                }
159
160                // Translate and persist
161                if let Some(stream_event) = translate_app_event(app_event) {
162                    // Persist event
163                    if let Ok(_sequence_num) =
164                        store_clone.append_event(&session_id, &stream_event).await
165                    {
166                        // Update session state in store
167                        if let Err(e) =
168                            update_session_state_for_event(&store_clone, &session_id, &stream_event)
169                                .await
170                        {
171                            error!(session_id = %session_id, error = %e, "Failed to update session state");
172                        }
173                    }
174                }
175            }
176            info!(session_id = %session_id, "Event translation loop ended");
177        });
178
179        Ok(Self {
180            session,
181            command_tx: app_command_tx,
182            event_rx: Some(external_event_rx),
183            subscriber_count: 0,
184            last_activity: chrono::Utc::now(),
185            app_task_handle,
186            event_task_handle,
187        })
188    }
189
190    /// Take the event receiver (can only be called once)
191    pub fn take_event_rx(&mut self) -> Option<mpsc::Receiver<AppEvent>> {
192        self.event_rx.take()
193    }
194
195    /// Update last activity timestamp
196    pub fn touch(&mut self) {
197        self.last_activity = chrono::Utc::now();
198    }
199
200    /// Check if session is inactive (no subscribers and old)
201    pub fn is_inactive(&self, max_idle_time: chrono::Duration) -> bool {
202        self.subscriber_count == 0 && chrono::Utc::now() - self.last_activity > max_idle_time
203    }
204
205    /// Shutdown the session gracefully
206    pub async fn shutdown(self) {
207        // Send shutdown command to app
208        let _ = self.command_tx.send(AppCommand::Shutdown).await;
209
210        // Wait for tasks to complete
211        let _ = self.app_task_handle.await;
212        let _ = self.event_task_handle.await;
213    }
214}
215
216/// Manages multiple concurrent sessions
217pub struct SessionManager {
218    /// Active sessions with their App instances
219    active_sessions: Arc<RwLock<HashMap<String, ManagedSession>>>,
220    /// Session store for persistence
221    store: Arc<dyn SessionStore>,
222    /// Configuration
223    config: SessionManagerConfig,
224}
225
226impl SessionManager {
227    /// Create a new SessionManager
228    pub fn new(store: Arc<dyn SessionStore>, config: SessionManagerConfig) -> Self {
229        Self {
230            active_sessions: Arc::new(RwLock::new(HashMap::new())),
231            store,
232            config,
233        }
234    }
235
236    /// Create a new session
237    pub async fn create_session(
238        &self,
239        config: SessionConfig,
240        app_config: AppConfig,
241    ) -> Result<(String, mpsc::Sender<AppCommand>)> {
242        let session_config = config;
243
244        // Create session in store
245        let session = self.store.create_session(session_config).await?;
246        let session_id = session.id.clone();
247
248        info!(session_id = %session_id, "Creating new session");
249
250        // Check if we're at max capacity
251        {
252            let sessions = self.active_sessions.read().await;
253            if sessions.len() >= self.config.max_concurrent_sessions {
254                error!(
255                    session_id = %session_id,
256                    active_count = sessions.len(),
257                    max_capacity = self.config.max_concurrent_sessions,
258                    "Session creation rejected: at maximum capacity"
259                );
260                return Err(SessionManagerError::CapacityExceeded {
261                    current: sessions.len(),
262                    max: self.config.max_concurrent_sessions,
263                }
264                .into());
265            }
266        }
267
268        // Create managed session with event translation
269        let managed_session = ManagedSession::new(
270            session.clone(),
271            app_config,
272            self.store.clone(),
273            self.config.default_model,
274            None,
275        )
276        .await
277        .map_err(|e| SessionManagerError::CreationFailed {
278            message: format!("Failed to create managed session: {e}"),
279        })?;
280
281        // Get command sender before moving into sessions map
282        let command_tx = managed_session.command_tx.clone();
283
284        // Add to active sessions
285        {
286            let mut sessions = self.active_sessions.write().await;
287            sessions.insert(session_id.clone(), managed_session);
288        }
289
290        // Emit session created event
291        let metadata = crate::events::SessionMetadata::from(&SessionInfo::from(&session));
292        let event = StreamEvent::SessionCreated {
293            session_id: session_id.clone(),
294            metadata,
295        };
296        self.emit_event(session_id.clone(), event).await;
297
298        info!(session_id = %session_id, "Session created and activated");
299        Ok((session_id, command_tx))
300    }
301
302    /// Take the event receiver for a session (can only be called once per session)
303    pub async fn take_event_receiver(&self, session_id: &str) -> Result<mpsc::Receiver<AppEvent>> {
304        let mut sessions = self.active_sessions.write().await;
305        match sessions.get_mut(session_id) {
306            Some(managed_session) => {
307                if let Some(receiver) = managed_session.take_event_rx() {
308                    Ok(receiver)
309                } else {
310                    Err(SessionManagerError::SessionAlreadyHasListener {
311                        session_id: session_id.to_string(),
312                    }
313                    .into())
314                }
315            }
316            None => Err(SessionManagerError::SessionNotActive {
317                session_id: session_id.to_string(),
318            }
319            .into()),
320        }
321    }
322
323    /// Get session information
324    pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionInfo>> {
325        // First check if it's active
326        {
327            let sessions = self.active_sessions.read().await;
328            if let Some(managed_session) = sessions.get(session_id) {
329                return Ok(Some(SessionInfo::from(&managed_session.session)));
330            }
331        }
332
333        // If not active, load from store
334        if let Some(session) = self.store.get_session(session_id).await? {
335            Ok(Some(SessionInfo::from(&session)))
336        } else {
337            Ok(None)
338        }
339    }
340
341    /// Get the workspace for a session
342    pub async fn get_session_workspace(
343        &self,
344        session_id: &str,
345    ) -> Result<Option<Arc<dyn crate::workspace::Workspace>>> {
346        // First check if session is active
347        {
348            let active_sessions = self.active_sessions.read().await;
349            if let Some(managed_session) = active_sessions.get(session_id) {
350                // Session is active, rebuild workspace from config
351                return Ok(Some(
352                    managed_session
353                        .session
354                        .build_workspace()
355                        .await
356                        .map_err(|e| SessionManagerError::CreationFailed {
357                            message: format!("Failed to build workspace: {e}"),
358                        })?,
359                ));
360            }
361        }
362
363        // Session not active, try to load from store
364        if let Some(session_info) = self.store.get_session(session_id).await? {
365            let session = session_info;
366            Ok(Some(session.build_workspace().await.map_err(|e| {
367                SessionManagerError::CreationFailed {
368                    message: format!("Failed to build workspace: {e}"),
369                }
370            })?))
371        } else {
372            Ok(None)
373        }
374    }
375
376    /// Resume a session (load from storage and activate)
377    pub async fn resume_session(
378        &self,
379        session_id: &str,
380        app_config: AppConfig,
381    ) -> Result<mpsc::Sender<AppCommand>> {
382        // Check if already active
383        {
384            let sessions = self.active_sessions.read().await;
385            if let Some(managed_session) = sessions.get(session_id) {
386                debug!(session_id = %session_id, "Session already active");
387                return Ok(managed_session.command_tx.clone());
388            }
389        }
390
391        // Load from store
392        let session = match self
393            .store
394            .get_session(session_id)
395            .await
396            .map_err(SessionManagerError::Storage)?
397        {
398            Some(session) => session,
399            None => {
400                debug!(session_id = %session_id, "Session not found in store");
401                return Err(SessionManagerError::SessionNotActive {
402                    session_id: session_id.to_string(),
403                }
404                .into());
405            }
406        };
407
408        info!(session_id = %session_id, "Resuming session from storage");
409
410        // Check capacity
411        {
412            let sessions = self.active_sessions.read().await;
413            if sessions.len() >= self.config.max_concurrent_sessions {
414                warn!(
415                    session_id = %session_id,
416                    active_count = sessions.len(),
417                    max_capacity = self.config.max_concurrent_sessions,
418                    "At maximum session capacity for resume"
419                );
420                // TODO: Implement eviction strategy
421            }
422        }
423
424        // Create a conversation from the session state
425        let conversation = Conversation {
426            messages: session.state.messages.clone(),
427            working_directory: session
428                .config
429                .workspace
430                .get_path()
431                .unwrap_or_default()
432                .into(),
433            active_message_id: session.state.active_message_id.clone(),
434        };
435
436        // Create managed session with event translation
437        let managed_session = ManagedSession::new(
438            session.clone(),
439            app_config,
440            self.store.clone(),
441            self.config.default_model,
442            Some(conversation),
443        )
444        .await
445        .map_err(|e| SessionManagerError::CreationFailed {
446            message: format!("Failed to create managed session: {e}"),
447        })?;
448
449        // Get command sender before restoration
450        let command_tx = managed_session.command_tx.clone();
451
452        // Restore conversation history and approved tools atomically
453        if !session.state.messages.is_empty() || !session.state.approved_tools.is_empty() {
454            info!(
455                session_id = %session_id,
456                message_count = session.state.messages.len(),
457                tool_count = session.state.approved_tools.len(),
458                "Restoring conversation state"
459            );
460
461            command_tx
462                .send(AppCommand::RestoreConversation {
463                    messages: session.state.messages.clone(),
464                    approved_tools: session.state.approved_tools.clone(),
465                    approved_bash_patterns: session.state.approved_bash_patterns.clone(),
466                    active_message_id: session.state.active_message_id.clone(),
467                })
468                .await
469                .map_err(|_| SessionManagerError::CreationFailed {
470                    message: "Failed to send restore command to App".to_string(),
471                })?;
472        }
473
474        // Add to active sessions
475        {
476            let mut sessions = self.active_sessions.write().await;
477            sessions.insert(session_id.to_string(), managed_session);
478        }
479
480        // Get the last event sequence for resume
481        let last_sequence = session.state.last_event_sequence;
482
483        // Emit session resumed event
484        let event = StreamEvent::SessionResumed {
485            session_id: session_id.to_string(),
486            event_offset: last_sequence,
487        };
488        self.emit_event(session_id.to_string(), event).await;
489
490        info!(session_id = %session_id, last_sequence = last_sequence, "Session resumed");
491        Ok(command_tx)
492    }
493
494    /// Suspend a session (save to storage and deactivate)
495    pub async fn suspend_session(&self, session_id: &str) -> Result<bool> {
496        let managed_session = {
497            let mut sessions = self.active_sessions.write().await;
498            sessions.remove(session_id)
499        };
500
501        let managed_session = match managed_session {
502            Some(session) => session,
503            None => {
504                debug!(session_id = %session_id, "Session not active, cannot suspend");
505                return Ok(false);
506            }
507        };
508
509        info!(session_id = %session_id, "Suspending session");
510
511        // Save current state to store
512        self.store.update_session(&managed_session.session).await?;
513
514        // Emit session saved event
515        let event = StreamEvent::SessionSaved {
516            session_id: session_id.to_string(),
517        };
518        self.emit_event(session_id.to_string(), event).await;
519
520        info!(session_id = %session_id, "Session suspended and saved");
521        Ok(true)
522    }
523
524    /// Delete a session permanently
525    pub async fn delete_session(&self, session_id: &str) -> Result<bool> {
526        // Remove from active sessions first
527        {
528            let mut sessions = self.active_sessions.write().await;
529            sessions.remove(session_id);
530        }
531
532        // Delete from store
533        self.store.delete_session(session_id).await?;
534
535        info!(session_id = %session_id, "Session deleted");
536        Ok(true)
537    }
538
539    /// List sessions with filtering
540    pub async fn list_sessions(&self, filter: SessionFilter) -> Result<Vec<SessionInfo>> {
541        Ok(self.store.list_sessions(filter).await?)
542    }
543
544    /// Get active session IDs
545    pub async fn get_active_sessions(&self) -> Vec<String> {
546        let sessions = self.active_sessions.read().await;
547        sessions.keys().cloned().collect()
548    }
549
550    /// Check if a session is active
551    pub async fn is_session_active(&self, session_id: &str) -> bool {
552        let sessions = self.active_sessions.read().await;
553        sessions.contains_key(session_id)
554    }
555
556    /// Send a command to an active session
557    pub async fn send_command(&self, session_id: &str, command: AppCommand) -> Result<()> {
558        let sessions = self.active_sessions.read().await;
559        if let Some(managed_session) = sessions.get(session_id) {
560            managed_session.command_tx.send(command).await.map_err(|_| {
561                Error::SessionManager(SessionManagerError::SessionNotActive {
562                    session_id: session_id.to_string(),
563                })
564            })
565        } else {
566            Err(Error::SessionManager(
567                SessionManagerError::SessionNotActive {
568                    session_id: session_id.to_string(),
569                },
570            ))
571        }
572    }
573
574    /// Update session state and persist if auto-persist is enabled
575    pub async fn update_session_state(
576        &self,
577        session_id: &str,
578        update_fn: impl FnOnce(&mut SessionState),
579    ) -> Result<()> {
580        {
581            let mut sessions = self.active_sessions.write().await;
582            if let Some(managed_session) = sessions.get_mut(session_id) {
583                managed_session.touch();
584                update_fn(&mut managed_session.session.state);
585                managed_session.session.update_timestamp();
586
587                // Auto-persist if enabled
588                if self.config.auto_persist {
589                    self.store.update_session(&managed_session.session).await?;
590                }
591            } else {
592                return Err(SessionManagerError::SessionNotActive {
593                    session_id: session_id.to_string(),
594                }
595                .into());
596            }
597        }
598
599        Ok(())
600    }
601
602    /// Emit an event for a session
603    pub async fn emit_event(&self, session_id: String, event: StreamEvent) {
604        // Get next sequence number and store event
605        let sequence_num = match self.store.append_event(&session_id, &event).await {
606            Ok(seq) => seq,
607            Err(e) => {
608                error!(session_id = %session_id, error = %e, "Failed to persist event");
609                return;
610            }
611        };
612
613        // Update session state with new sequence number
614        if let Err(e) = self
615            .update_session_state(&session_id, |state| {
616                state.last_event_sequence = sequence_num;
617            })
618            .await
619        {
620            error!(session_id = %session_id, error = %e, "Failed to update session sequence number");
621        }
622    }
623
624    /// Cleanup inactive sessions
625    pub async fn cleanup_inactive_sessions(&self, max_idle_time: chrono::Duration) -> usize {
626        let mut to_suspend = Vec::new();
627
628        {
629            let sessions = self.active_sessions.read().await;
630            for (session_id, managed_session) in sessions.iter() {
631                if managed_session.is_inactive(max_idle_time) {
632                    to_suspend.push(session_id.clone());
633                }
634            }
635        }
636
637        let mut suspended_count = 0;
638        for session_id in to_suspend {
639            if let Ok(true) = self.suspend_session(&session_id).await {
640                suspended_count += 1;
641            }
642        }
643
644        if suspended_count > 0 {
645            info!(
646                suspended_count = suspended_count,
647                "Cleaned up inactive sessions"
648            );
649        }
650
651        suspended_count
652    }
653
654    /// Get session store reference
655    pub fn store(&self) -> &Arc<dyn SessionStore> {
656        &self.store
657    }
658
659    /// Increment the subscriber count for a session
660    pub async fn increment_subscriber_count(&self, session_id: &str) -> Result<()> {
661        let mut sessions = self.active_sessions.write().await;
662        if let Some(managed_session) = sessions.get_mut(session_id) {
663            managed_session.subscriber_count += 1;
664            managed_session.touch();
665            debug!(
666                session_id = %session_id,
667                subscriber_count = managed_session.subscriber_count,
668                "Incremented subscriber count"
669            );
670            Ok(())
671        } else {
672            Err(SessionManagerError::SessionNotActive {
673                session_id: session_id.to_string(),
674            }
675            .into())
676        }
677    }
678
679    /// Decrement the subscriber count for a session
680    pub async fn decrement_subscriber_count(&self, session_id: &str) -> Result<()> {
681        let mut sessions = self.active_sessions.write().await;
682        if let Some(managed_session) = sessions.get_mut(session_id) {
683            managed_session.subscriber_count = managed_session.subscriber_count.saturating_sub(1);
684            managed_session.touch();
685            debug!(
686                session_id = %session_id,
687                subscriber_count = managed_session.subscriber_count,
688                "Decremented subscriber count"
689            );
690            Ok(())
691        } else {
692            // Session might have already been cleaned up
693            debug!(session_id = %session_id, "Session not active when decrementing subscriber count");
694            Ok(())
695        }
696    }
697
698    /// Touch a session to update its last activity timestamp
699    pub async fn touch_session(&self, session_id: &str) -> Result<()> {
700        let mut sessions = self.active_sessions.write().await;
701        if let Some(managed_session) = sessions.get_mut(session_id) {
702            managed_session.touch();
703            Ok(())
704        } else {
705            // Session might have been suspended, which is okay
706            Ok(())
707        }
708    }
709
710    /// Check if a session should be suspended due to no subscribers
711    pub async fn maybe_suspend_idle_session(&self, session_id: &str) -> Result<()> {
712        // Check if session has no subscribers
713        let should_suspend = {
714            let sessions = self.active_sessions.read().await;
715            if let Some(managed_session) = sessions.get(session_id) {
716                managed_session.subscriber_count == 0
717            } else {
718                false // Already suspended or deleted
719            }
720        };
721
722        if should_suspend {
723            info!(session_id = %session_id, "No active subscribers, suspending session");
724            self.suspend_session(session_id).await?;
725        }
726
727        Ok(())
728    }
729
730    /// Get session state for gRPC responses
731    pub async fn get_session_state(
732        &self,
733        session_id: &str,
734    ) -> Result<Option<crate::session::SessionState>> {
735        info!("get_session_state called for session: {}", session_id);
736
737        // Always load from store to get the most up-to-date state
738        // The in-memory state in ManagedSession may be stale
739        match self.store.get_session(session_id).await {
740            Ok(Some(session)) => {
741                info!(
742                    "Loaded session from store with {} messages",
743                    session.state.messages.len()
744                );
745                Ok(Some(session.state))
746            }
747            Ok(None) => {
748                info!("Session not found in store: {}", session_id);
749                Ok(None)
750            }
751            Err(e) => {
752                error!("Error loading session from store: {}", e);
753                Err(SessionManagerError::Storage(e).into())
754            }
755        }
756    }
757}
758
759/// Convert AppEvent to StreamEvent, returning None for events that shouldn't be streamed
760fn translate_app_event(app_event: AppEvent) -> Option<StreamEvent> {
761    match app_event {
762        AppEvent::MessageAdded { message, model } => Some(StreamEvent::MessageComplete {
763            message,
764            usage: None,
765            metadata: std::collections::HashMap::new(),
766            model,
767        }),
768
769        AppEvent::MessagePart { id, delta } => Some(StreamEvent::MessagePart {
770            content: delta,
771            message_id: id,
772        }),
773
774        AppEvent::ToolCallStarted {
775            name,
776            id,
777            parameters,
778            model,
779        } => {
780            let tool_call = ToolCall {
781                id: id.clone(),
782                name: name.clone(),
783                parameters,
784            };
785            Some(StreamEvent::ToolCallStarted {
786                tool_call,
787                metadata: std::collections::HashMap::new(),
788                model,
789            })
790        }
791
792        AppEvent::ToolCallCompleted {
793            name: _,
794            result,
795            id,
796            model,
797        } => Some(StreamEvent::ToolCallCompleted {
798            tool_call_id: id,
799            result,
800            metadata: std::collections::HashMap::new(),
801            model,
802        }),
803
804        AppEvent::ToolCallFailed {
805            name: _,
806            error,
807            id,
808            model,
809        } => Some(StreamEvent::ToolCallFailed {
810            tool_call_id: id,
811            error,
812            metadata: std::collections::HashMap::new(),
813            model,
814        }),
815
816        AppEvent::WorkspaceChanged => Some(StreamEvent::WorkspaceChanged),
817
818        AppEvent::WorkspaceFiles { files } => Some(StreamEvent::WorkspaceFiles {
819            files: files.clone(),
820        }),
821
822        AppEvent::Started { id, op } => Some(StreamEvent::OperationStarted {
823            operation_id: id,
824            operation: op,
825        }),
826        AppEvent::Finished { id, outcome } => Some(StreamEvent::OperationCompleted {
827            operation_id: id,
828            outcome,
829        }),
830        AppEvent::OperationCancelled { op_id, info } => {
831            // Use the provided operation ID, or generate a new one if not available
832            let operation_id = op_id.unwrap_or_else(uuid::Uuid::new_v4);
833            Some(StreamEvent::OperationCancelled {
834                operation_id,
835                reason: info.to_string(), // Use Display implementation for reason
836            })
837        }
838
839        // These events don't need to be streamed
840        _ => None,
841    }
842}
843/// Update session state based on a StreamEvent
844async fn update_session_state_for_event(
845    store: &Arc<dyn SessionStore>,
846    session_id: &str,
847    event: &StreamEvent,
848) -> Result<()> {
849    match event {
850        StreamEvent::MessageComplete { message, .. } => {
851            store.append_message(session_id, message).await?;
852
853            // Update tool call if this is a tool result
854            if let crate::app::conversation::MessageData::Tool {
855                tool_use_id,
856                result,
857                ..
858            } = &message.data
859            {
860                let stats = crate::session::ToolExecutionStats::success_typed(
861                    serde_json::to_value(result).unwrap_or(serde_json::Value::Null),
862                    result.variant_name().to_string(),
863                    0, // Duration tracked elsewhere via Started/Finished events
864                );
865                let update = ToolCallUpdate::set_result(stats);
866                store.update_tool_call(tool_use_id, update).await?;
867            }
868        }
869        StreamEvent::ToolCallStarted { tool_call, .. } => {
870            store.create_tool_call(session_id, tool_call).await?;
871        }
872        StreamEvent::ToolCallCompleted {
873            tool_call_id,
874            result,
875            ..
876        } => {
877            let stats = crate::session::ToolExecutionStats::success_typed(
878                serde_json::to_value(result).unwrap_or(serde_json::Value::Null),
879                result.variant_name().to_string(),
880                0,
881            );
882            let update = ToolCallUpdate::set_result(stats);
883            store.update_tool_call(tool_call_id, update).await?;
884
885            // Also add a Tool message with the result
886            // Get parent info from the latest message in the session
887            let messages = store.get_messages(session_id, None).await?;
888            let parent_id = messages.last().map(|m| m.id().to_string());
889
890            let tool_message = ConversationMessage {
891                data: crate::app::conversation::MessageData::Tool {
892                    tool_use_id: tool_call_id.clone(),
893                    result: result.clone(),
894                },
895                timestamp: std::time::SystemTime::now()
896                    .duration_since(std::time::UNIX_EPOCH)
897                    .expect("Time went backwards")
898                    .as_secs(),
899                id: format!("tool_result_{tool_call_id}"),
900                parent_message_id: parent_id,
901            };
902            store.append_message(session_id, &tool_message).await?;
903        }
904        StreamEvent::ToolCallFailed {
905            tool_call_id,
906            error,
907            ..
908        } => {
909            let update = ToolCallUpdate::set_error(error.clone());
910            store.update_tool_call(tool_call_id, update).await?;
911
912            // Also add a Tool message with the error
913            // Get parent info from the latest message in the session
914            let messages = store.get_messages(session_id, None).await?;
915            let parent_id = messages.last().map(|m| m.id().to_string());
916
917            let tool_error = steer_tools::error::ToolError::Execution {
918                tool_name: "unknown".to_string(), // We don't have the tool name here
919                message: error.clone(),
920            };
921            let tool_message = ConversationMessage {
922                data: MessageData::Tool {
923                    tool_use_id: tool_call_id.clone(),
924                    result: ToolResult::Error(tool_error),
925                },
926                timestamp: std::time::SystemTime::now()
927                    .duration_since(std::time::UNIX_EPOCH)
928                    .expect("Time went backwards")
929                    .as_secs(),
930                id: format!("tool_result_{tool_call_id}"),
931                parent_message_id: parent_id,
932            };
933            store.append_message(session_id, &tool_message).await?;
934        }
935        // Other events don't directly modify stored state
936        _ => {}
937    }
938    Ok(())
939}
940
941#[cfg(test)]
942mod tests {
943    use super::*;
944    use crate::api::ToolCall;
945    use crate::app::MessageData;
946    use crate::app::conversation::{AssistantContent, Role, UserContent};
947    use crate::session::stores::sqlite::SqliteSessionStore;
948    use tempfile::TempDir;
949
950    async fn create_test_manager() -> (SessionManager, TempDir) {
951        let temp_dir = TempDir::new().unwrap();
952        let db_path = temp_dir.path().join("test.db");
953        let store = Arc::new(SqliteSessionStore::new(&db_path).await.unwrap());
954
955        let config = SessionManagerConfig {
956            max_concurrent_sessions: 100,
957            default_model: Model::default(),
958            auto_persist: true,
959        };
960        let manager = SessionManager::new(store, config);
961
962        (manager, temp_dir)
963    }
964
965    fn create_test_app_config() -> AppConfig {
966        crate::test_utils::test_app_config()
967    }
968
969    #[tokio::test]
970    async fn test_create_and_resume_session() {
971        let (manager, temp) = create_test_manager().await;
972        let app_config = create_test_app_config();
973
974        // Create session
975        let session_config = SessionConfig {
976            workspace: crate::session::state::WorkspaceConfig::Local {
977                path: temp.path().to_path_buf(),
978            },
979            tool_config: crate::session::SessionToolConfig::default(),
980            system_prompt: None,
981            metadata: std::collections::HashMap::new(),
982        };
983        let (session_id, _command_tx) = manager
984            .create_session(session_config, app_config.clone())
985            .await
986            .unwrap();
987        assert!(!session_id.is_empty());
988        assert!(manager.is_session_active(&session_id).await);
989
990        // Suspend session
991        assert!(manager.suspend_session(&session_id).await.unwrap());
992        assert!(!manager.is_session_active(&session_id).await);
993
994        // Resume Session
995        let _command_tx = manager
996            .resume_session(&session_id, app_config)
997            .await
998            .unwrap();
999        assert!(manager.is_session_active(&session_id).await);
1000    }
1001
1002    #[tokio::test]
1003    async fn test_session_cleanup() {
1004        let (manager, temp) = create_test_manager().await;
1005        let app_config = create_test_app_config();
1006
1007        // Create session
1008        let session_config = SessionConfig {
1009            workspace: crate::session::state::WorkspaceConfig::Local {
1010                path: temp.path().to_path_buf(),
1011            },
1012            tool_config: crate::session::SessionToolConfig::default(),
1013            system_prompt: None,
1014            metadata: std::collections::HashMap::new(),
1015        };
1016        let (session_id, _command_tx) = manager
1017            .create_session(session_config, app_config)
1018            .await
1019            .unwrap();
1020
1021        // Make session appear inactive by manipulating last_activity
1022        {
1023            let mut sessions = manager.active_sessions.write().await;
1024            if let Some(session) = sessions.get_mut(&session_id) {
1025                session.last_activity = chrono::Utc::now() - chrono::Duration::hours(2);
1026                session.subscriber_count = 0;
1027            }
1028        }
1029
1030        // Cleanup should suspend the session
1031        let cleaned = manager
1032            .cleanup_inactive_sessions(chrono::Duration::hours(1))
1033            .await;
1034        assert_eq!(cleaned, 1);
1035        assert!(!manager.is_session_active(&session_id).await);
1036    }
1037
1038    #[tokio::test]
1039    async fn test_capacity_rejection() {
1040        let temp_dir = TempDir::new().unwrap();
1041        let temp = tempfile::TempDir::new().unwrap();
1042        let db_path = temp_dir.path().join("test.db");
1043        let store = Arc::new(SqliteSessionStore::new(&db_path).await.unwrap());
1044
1045        let config = SessionManagerConfig {
1046            max_concurrent_sessions: 1, // Set to 1 for testing
1047            default_model: Model::default(),
1048            auto_persist: true,
1049        };
1050        let manager = SessionManager::new(store, config);
1051        let app_config = create_test_app_config();
1052
1053        // Create first session - should succeed
1054        let tool_config = crate::session::SessionToolConfig {
1055            approval_policy: crate::session::ToolApprovalPolicy::AlwaysAsk,
1056            ..Default::default()
1057        };
1058
1059        let session_config = SessionConfig {
1060            workspace: crate::session::state::WorkspaceConfig::Local {
1061                path: temp.path().to_path_buf(),
1062            },
1063            tool_config,
1064            system_prompt: None,
1065            metadata: std::collections::HashMap::new(),
1066        };
1067        let (session_id1, _command_tx) = manager
1068            .create_session(session_config.clone(), app_config.clone())
1069            .await
1070            .unwrap();
1071        assert!(!session_id1.is_empty());
1072
1073        // Create second session - should fail with capacity error
1074        let result = manager.create_session(session_config, app_config).await;
1075
1076        assert!(result.is_err());
1077        assert!(matches!(
1078            result,
1079            Err(crate::error::Error::SessionManager(
1080                SessionManagerError::CapacityExceeded { .. }
1081            ))
1082        ));
1083        match result.unwrap_err() {
1084            crate::error::Error::SessionManager(SessionManagerError::CapacityExceeded {
1085                current,
1086                max,
1087            }) => {
1088                assert_eq!(current, 1);
1089                assert_eq!(max, 1);
1090            }
1091            _ => unreachable!(),
1092        }
1093    }
1094
1095    #[tokio::test]
1096    async fn test_tool_result_persistence_on_resume() {
1097        let (manager, temp) = create_test_manager().await;
1098        let app_config = create_test_app_config();
1099
1100        // Create session
1101        let session_config = SessionConfig {
1102            workspace: crate::session::state::WorkspaceConfig::Local {
1103                path: temp.path().to_path_buf(),
1104            },
1105            tool_config: crate::session::SessionToolConfig::default(),
1106            system_prompt: None,
1107            metadata: std::collections::HashMap::new(),
1108        };
1109        let (session_id, _command_tx) = manager
1110            .create_session(session_config, app_config.clone())
1111            .await
1112            .unwrap();
1113
1114        // Simulate adding messages including tool calls and results
1115        // First, add a user message
1116        let user_message = ConversationMessage {
1117            data: crate::app::conversation::MessageData::User {
1118                content: vec![UserContent::Text {
1119                    text: "Read the file test.txt".to_string(),
1120                }],
1121            },
1122            timestamp: 123456789,
1123            id: "user_1".to_string(),
1124            parent_message_id: None,
1125        };
1126        manager
1127            .store
1128            .append_message(&session_id, &user_message)
1129            .await
1130            .unwrap();
1131
1132        // Add an assistant message with a tool call
1133        let assistant_message = ConversationMessage {
1134            data: crate::app::conversation::MessageData::Assistant {
1135                content: vec![
1136                    AssistantContent::Text {
1137                        text: "I'll read that file for you.".to_string(),
1138                    },
1139                    AssistantContent::ToolCall {
1140                        tool_call: ToolCall {
1141                            id: "tool_call_1".to_string(),
1142                            name: "read_file".to_string(),
1143                            parameters: serde_json::json!({"path": "test.txt"}),
1144                        },
1145                    },
1146                ],
1147            },
1148            timestamp: 123456790,
1149            id: "assistant_1".to_string(),
1150            parent_message_id: Some("user_1".to_string()),
1151        };
1152        manager
1153            .store
1154            .append_message(&session_id, &assistant_message)
1155            .await
1156            .unwrap();
1157
1158        // Simulate tool result directly by creating and storing the tool call manually
1159        // This mimics what would happen during actual tool execution
1160        let tool_call = ToolCall {
1161            id: "tool_call_1".to_string(),
1162            name: "read_file".to_string(),
1163            parameters: serde_json::json!({"path": "test.txt"}),
1164        };
1165        manager
1166            .store
1167            .create_tool_call(&session_id, &tool_call)
1168            .await
1169            .unwrap();
1170
1171        // Now update with the result
1172        let stats = crate::session::ToolExecutionStats::success_typed(
1173            serde_json::json!({
1174                "content": "File contents: Hello, world!",
1175                "file_path": "test.txt",
1176                "line_count": 1,
1177                "truncated": false
1178            }),
1179            "FileContent".to_string(),
1180            0,
1181        );
1182        let update = ToolCallUpdate::set_result(stats);
1183        manager
1184            .store
1185            .update_tool_call("tool_call_1", update)
1186            .await
1187            .unwrap();
1188
1189        // Also add a Tool message with the result
1190        let tool_message = ConversationMessage {
1191            data: MessageData::Tool {
1192                tool_use_id: "tool_call_1".to_string(),
1193                result: ToolResult::FileContent(steer_tools::result::FileContentResult {
1194                    content: "File contents: Hello, world!".to_string(),
1195                    file_path: "test.txt".to_string(),
1196                    line_count: 1,
1197                    truncated: false,
1198                }),
1199            },
1200            timestamp: 123456790,
1201            id: "tool_result_tool_call_1".to_string(),
1202            parent_message_id: Some("assistant_1".to_string()),
1203        };
1204        manager
1205            .store
1206            .append_message(&session_id, &tool_message)
1207            .await
1208            .unwrap();
1209
1210        // Add a follow-up assistant message
1211        let followup_message = ConversationMessage {
1212            data: crate::app::conversation::MessageData::Assistant {
1213                content: vec![AssistantContent::Text {
1214                    text: "The file contains: Hello, world!".to_string(),
1215                }],
1216            },
1217            timestamp: 123456791,
1218            id: "assistant_2".to_string(),
1219            parent_message_id: Some("assistant_1".to_string()),
1220        };
1221        manager
1222            .store
1223            .append_message(&session_id, &followup_message)
1224            .await
1225            .unwrap();
1226
1227        // Suspend the session
1228        manager.suspend_session(&session_id).await.unwrap();
1229
1230        // Load the session from storage and verify tool result message exists
1231        let loaded_session = manager
1232            .store
1233            .get_session(&session_id)
1234            .await
1235            .unwrap()
1236            .unwrap();
1237
1238        // Should have 4 messages: user, assistant with tool call, tool result, followup
1239        assert_eq!(loaded_session.state.messages.len(), 4);
1240
1241        // Verify the tool result message exists and has correct content
1242        let tool_result_msg = &loaded_session.state.messages[2];
1243        assert_eq!(tool_result_msg.role(), Role::Tool);
1244
1245        // Verify the content structure
1246        assert!(matches!(
1247            &tool_result_msg.data,
1248            crate::app::conversation::MessageData::Tool { .. }
1249        ));
1250        if let crate::app::conversation::MessageData::Tool {
1251            tool_use_id,
1252            result,
1253            ..
1254        } = &tool_result_msg.data
1255        {
1256            assert_eq!(tool_use_id, "tool_call_1");
1257            assert!(matches!(
1258                result,
1259                crate::app::conversation::ToolResult::FileContent(_)
1260            ));
1261            match result {
1262                crate::app::conversation::ToolResult::FileContent(content) => {
1263                    assert!(content.content.contains("Hello, world!"));
1264                }
1265                _ => unreachable!(),
1266            }
1267        } else {
1268            panic!("Expected Tool message");
1269        }
1270
1271        // Now test resuming the session - it should work without API errors
1272        let _command_tx = manager
1273            .resume_session(&session_id, app_config)
1274            .await
1275            .unwrap();
1276
1277        // The session should be properly restored with all messages including tool results
1278        // If the bug were still present, trying to send a new message would fail with the
1279        // "tool_use ids were found without tool_result blocks" error from the API
1280    }
1281
1282    #[tokio::test]
1283    async fn test_active_message_id_persistence() {
1284        let (manager, temp) = create_test_manager().await;
1285        let app_config = create_test_app_config();
1286
1287        // Create session
1288        let session_config = SessionConfig {
1289            workspace: crate::session::state::WorkspaceConfig::Local {
1290                path: temp.path().to_path_buf(),
1291            },
1292            tool_config: crate::session::SessionToolConfig::default(),
1293            system_prompt: None,
1294            metadata: std::collections::HashMap::new(),
1295        };
1296        let (session_id, _command_tx) = manager
1297            .create_session(session_config, app_config.clone())
1298            .await
1299            .unwrap();
1300
1301        // Add some messages directly to the store (simulating a conversation with branches)
1302        let msg1 = ConversationMessage {
1303            data: crate::app::conversation::MessageData::User {
1304                content: vec![UserContent::Text {
1305                    text: "Hello".to_string(),
1306                }],
1307            },
1308            timestamp: 1000,
1309            id: "msg1".to_string(),
1310            parent_message_id: None,
1311        };
1312
1313        let msg2 = ConversationMessage {
1314            data: crate::app::conversation::MessageData::Assistant {
1315                content: vec![AssistantContent::Text {
1316                    text: "Hi there!".to_string(),
1317                }],
1318            },
1319            timestamp: 2000,
1320            id: "msg2".to_string(),
1321            parent_message_id: Some("msg1".to_string()),
1322        };
1323
1324        // Add a branch - edited version of msg1
1325        let msg1_edited = ConversationMessage {
1326            data: crate::app::conversation::MessageData::User {
1327                content: vec![UserContent::Text {
1328                    text: "Goodbye".to_string(),
1329                }],
1330            },
1331            timestamp: 3000,
1332            id: "msg1_edited".to_string(),
1333            parent_message_id: None, // Same parent as original msg1
1334        };
1335
1336        // Store messages
1337        manager
1338            .store
1339            .append_message(&session_id, &msg1)
1340            .await
1341            .unwrap();
1342        manager
1343            .store
1344            .append_message(&session_id, &msg2)
1345            .await
1346            .unwrap();
1347        manager
1348            .store
1349            .append_message(&session_id, &msg1_edited)
1350            .await
1351            .unwrap();
1352
1353        // Update active message ID to the edited branch
1354        manager
1355            .store
1356            .update_active_message_id(&session_id, Some("msg1_edited"))
1357            .await
1358            .unwrap();
1359
1360        // Suspend the session
1361        manager.suspend_session(&session_id).await.unwrap();
1362
1363        // Load the session back and verify active_message_id was persisted
1364        let loaded_session = manager
1365            .store
1366            .get_session(&session_id)
1367            .await
1368            .unwrap()
1369            .unwrap();
1370
1371        // The active_message_id should be set to the edited message
1372        assert_eq!(
1373            loaded_session.state.active_message_id,
1374            Some("msg1_edited".to_string())
1375        );
1376
1377        // Should have 3 messages total (original 2 + edited version)
1378        assert_eq!(loaded_session.state.messages.len(), 3);
1379
1380        // Verify the edited message exists and has correct content
1381        let edited_msg = loaded_session
1382            .state
1383            .messages
1384            .iter()
1385            .find(|m| m.id() == "msg1_edited")
1386            .expect("Edited message should exist");
1387
1388        match &edited_msg.data {
1389            crate::app::conversation::MessageData::User { content, .. } => {
1390                if let Some(UserContent::Text { text }) = content.first() {
1391                    assert_eq!(text, "Goodbye");
1392                } else {
1393                    panic!("Expected text content");
1394                }
1395            }
1396            _ => panic!("Expected user message"),
1397        }
1398
1399        // Resume the session and verify the active_message_id is still correct
1400        let _ = manager
1401            .resume_session(&session_id, app_config)
1402            .await
1403            .unwrap();
1404
1405        // Get the state through the manager's API
1406        let state = manager
1407            .get_session_state(&session_id)
1408            .await
1409            .unwrap()
1410            .unwrap();
1411        assert_eq!(state.active_message_id, Some("msg1_edited".to_string()));
1412    }
1413}