1use crate::app::{
2 App, AppCommand, AppConfig, AppEvent, Conversation, Message as ConversationMessage, MessageData,
3};
4use crate::config::model::ModelId;
5use crate::error::{Error, Result};
6use crate::events::StreamEvent;
7use crate::session::{
8 Session, SessionConfig, SessionFilter, SessionInfo, SessionState, SessionStore,
9 SessionStoreError, ToolCallUpdate,
10};
11use std::collections::HashMap;
12use std::sync::Arc;
13use steer_tools::{ToolCall, ToolResult};
14use thiserror::Error;
15use tokio::sync::{RwLock, mpsc};
16use tokio::task::JoinHandle;
17use tracing::{debug, error, info, warn};
18
/// Errors produced by [`SessionManager`] operations.
#[derive(Debug, Error)]
pub enum SessionManagerError {
    /// The number of active sessions (`current`) has reached the configured limit (`max`).
    #[error("Maximum session capacity reached ({current}/{max}). Cannot create new session.")]
    CapacityExceeded { current: usize, max: usize },

    /// The session is not in the manager's in-memory set of active sessions.
    ///
    /// Some lookups in this file also return it when the session is absent from the store.
    #[error("Session not active: {session_id}")]
    SessionNotActive { session_id: String },

    /// The session's event receiver has already been taken.
    #[error("Session {session_id} already has an active listener")]
    SessionAlreadyHasListener { session_id: String },

    /// Building the managed session or its workspace failed; `message` describes the cause.
    #[error("Failed to create managed session: {message}")]
    CreationFailed { message: String },

    /// An error passed through unchanged from the session store.
    #[error(transparent)]
    Storage(#[from] SessionStoreError),
}
37
/// Configuration for a [`SessionManager`].
#[derive(Debug, Clone)]
pub struct SessionManagerConfig {
    /// Limit on simultaneously active sessions; `create_session` rejects requests once the
    /// active count reaches this value.
    pub max_concurrent_sessions: usize,
    /// Model passed to each session's `App` when it is constructed.
    pub default_model: ModelId,
    /// When `true`, `update_session_state` writes the session to the store after each update.
    pub auto_persist: bool,
}
48
/// An active session together with the channels and background tasks that drive it.
pub struct ManagedSession {
    /// In-memory copy of the session record.
    pub session: Session,
    /// Sender for commands consumed by the session's app actor task.
    pub command_tx: mpsc::Sender<AppCommand>,
    /// Receiver for app events forwarded to an external consumer; `None` once it has been taken.
    pub event_rx: Option<mpsc::Receiver<AppEvent>>,
    /// Number of subscribers currently counted against this session.
    pub subscriber_count: usize,
    /// Timestamp of the most recent `touch`, used for idle detection.
    pub last_activity: chrono::DateTime<chrono::Utc>,
    /// Handle of the spawned app actor task.
    pub app_task_handle: JoinHandle<()>,
    /// Handle of the spawned task that forwards and persists app events.
    pub event_task_handle: JoinHandle<()>,
}
66
67impl ManagedSession {
68 pub async fn new(
70 mut session: Session,
71 app_config: AppConfig,
72 store: Arc<dyn SessionStore>,
73 default_model: ModelId,
74 conversation: Option<Conversation>,
75 ) -> Result<Self> {
76 let (app_event_tx, mut app_event_rx) = mpsc::channel(100);
78 let (app_command_tx, app_command_rx) = mpsc::channel::<AppCommand>(32);
79
80 let (external_event_tx, external_event_rx) = mpsc::channel(100);
82
83 crate::app::OpContext::init_command_tx(app_command_tx.clone());
85
86 let workspace = session.build_workspace().await?;
88
89 let (backend_registry, mcp_servers) = session
91 .config
92 .build_registry(
93 Arc::new(app_config.llm_config_provider.clone()),
94 workspace.clone(),
95 )
96 .await?;
97
98 session.state.mcp_servers = mcp_servers;
100
101 let tool_executor = Arc::new(crate::tools::ToolExecutor::with_all_components(
102 workspace.clone(),
103 Arc::new(backend_registry),
104 Arc::new(crate::app::validation::ValidatorRegistry::new()),
105 app_config.llm_config_provider.clone(),
106 ));
107
108 let mut app = if let Some(conv) = conversation {
110 App::new_with_conversation(
111 app_config.clone(),
112 app_event_tx,
113 default_model.clone(),
114 workspace.clone(),
115 tool_executor,
116 Some(session.config.clone()),
117 conv,
118 )
119 .await?
120 } else {
121 App::new(
122 app_config.clone(),
123 app_event_tx,
124 default_model,
125 workspace.clone(),
126 tool_executor,
127 Some(session.config.clone()),
128 )
129 .await?
130 };
131
132 if let Some(model_str) = session.config.metadata.get("initial_model") {
134 if let Ok(model_id) = app_config.model_registry.resolve(model_str) {
136 let _ = app.set_model(model_id).await;
137 }
138 }
139
140 let app_task_handle = tokio::spawn(crate::app::app_actor_loop(app, app_command_rx));
142
143 let session_id = session.id.clone();
145 let store_clone = store.clone();
146
147 let event_task_handle = tokio::spawn(async move {
148 while let Some(app_event) = app_event_rx.recv().await {
149 if let Err(e) = external_event_tx.try_send(app_event.clone()) {
151 warn!(session_id = %session_id, "Failed to send event to external consumer: {}", e);
152 }
153
154 if let AppEvent::ActiveMessageIdChanged { message_id } = &app_event {
156 if let Err(e) = store_clone
157 .update_active_message_id(&session_id, message_id.as_deref())
158 .await
159 {
160 error!(session_id = %session_id, error = %e, "Failed to update active message ID");
161 }
162 }
163
164 if let Some(stream_event) = translate_app_event(app_event) {
166 if let Ok(_sequence_num) =
168 store_clone.append_event(&session_id, &stream_event).await
169 {
170 if let Err(e) =
172 update_session_state_for_event(&store_clone, &session_id, &stream_event)
173 .await
174 {
175 error!(session_id = %session_id, error = %e, "Failed to update session state");
176 }
177 }
178 }
179 }
180 info!(session_id = %session_id, "Event translation loop ended");
181 });
182
183 Ok(Self {
184 session,
185 command_tx: app_command_tx,
186 event_rx: Some(external_event_rx),
187 subscriber_count: 0,
188 last_activity: chrono::Utc::now(),
189 app_task_handle,
190 event_task_handle,
191 })
192 }
193
194 pub fn take_event_rx(&mut self) -> Option<mpsc::Receiver<AppEvent>> {
196 self.event_rx.take()
197 }
198
199 pub fn touch(&mut self) {
201 self.last_activity = chrono::Utc::now();
202 }
203
204 pub fn is_inactive(&self, max_idle_time: chrono::Duration) -> bool {
206 self.subscriber_count == 0 && chrono::Utc::now() - self.last_activity > max_idle_time
207 }
208
209 pub async fn shutdown(self) {
211 let _ = self.command_tx.send(AppCommand::Shutdown).await;
213
214 let _ = self.app_task_handle.await;
216 let _ = self.event_task_handle.await;
217 }
218}
219
/// Keeps active sessions in memory and persists them through a [`SessionStore`].
pub struct SessionManager {
    /// Active sessions keyed by session id.
    active_sessions: Arc<RwLock<HashMap<String, ManagedSession>>>,
    /// Backing store for session records, events and messages.
    store: Arc<dyn SessionStore>,
    /// Limits and defaults applied by this manager.
    config: SessionManagerConfig,
}
229
230impl SessionManager {
231 pub fn new(store: Arc<dyn SessionStore>, config: SessionManagerConfig) -> Self {
233 Self {
234 active_sessions: Arc::new(RwLock::new(HashMap::new())),
235 store,
236 config,
237 }
238 }
239
240 pub async fn create_session(
242 &self,
243 config: SessionConfig,
244 app_config: AppConfig,
245 ) -> Result<(String, mpsc::Sender<AppCommand>)> {
246 let session_config = config;
247
248 let session = self.store.create_session(session_config).await?;
250 let session_id = session.id.clone();
251
252 info!(session_id = %session_id, "Creating new session");
253
254 {
256 let sessions = self.active_sessions.read().await;
257 if sessions.len() >= self.config.max_concurrent_sessions {
258 error!(
259 session_id = %session_id,
260 active_count = sessions.len(),
261 max_capacity = self.config.max_concurrent_sessions,
262 "Session creation rejected: at maximum capacity"
263 );
264 return Err(SessionManagerError::CapacityExceeded {
265 current: sessions.len(),
266 max: self.config.max_concurrent_sessions,
267 }
268 .into());
269 }
270 }
271
272 let managed_session = ManagedSession::new(
274 session.clone(),
275 app_config,
276 self.store.clone(),
277 self.config.default_model.clone(),
278 None,
279 )
280 .await
281 .map_err(|e| SessionManagerError::CreationFailed {
282 message: format!("Failed to create managed session: {e}"),
283 })?;
284
285 let command_tx = managed_session.command_tx.clone();
287
288 {
290 let mut sessions = self.active_sessions.write().await;
291 sessions.insert(session_id.clone(), managed_session);
292 }
293
294 let metadata = crate::events::SessionMetadata::from(&SessionInfo::from(&session));
296 let event = StreamEvent::SessionCreated {
297 session_id: session_id.clone(),
298 metadata,
299 };
300 self.emit_event(session_id.clone(), event).await;
301
302 info!(session_id = %session_id, "Session created and activated");
303 Ok((session_id, command_tx))
304 }
305
306 pub async fn take_event_receiver(&self, session_id: &str) -> Result<mpsc::Receiver<AppEvent>> {
308 let mut sessions = self.active_sessions.write().await;
309 match sessions.get_mut(session_id) {
310 Some(managed_session) => {
311 if let Some(receiver) = managed_session.take_event_rx() {
312 Ok(receiver)
313 } else {
314 Err(SessionManagerError::SessionAlreadyHasListener {
315 session_id: session_id.to_string(),
316 }
317 .into())
318 }
319 }
320 None => Err(SessionManagerError::SessionNotActive {
321 session_id: session_id.to_string(),
322 }
323 .into()),
324 }
325 }
326
327 pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionInfo>> {
329 {
331 let sessions = self.active_sessions.read().await;
332 if let Some(managed_session) = sessions.get(session_id) {
333 return Ok(Some(SessionInfo::from(&managed_session.session)));
334 }
335 }
336
337 if let Some(session) = self.store.get_session(session_id).await? {
339 Ok(Some(SessionInfo::from(&session)))
340 } else {
341 Ok(None)
342 }
343 }
344
345 pub async fn get_session_workspace(
347 &self,
348 session_id: &str,
349 ) -> Result<Option<Arc<dyn crate::workspace::Workspace>>> {
350 {
352 let active_sessions = self.active_sessions.read().await;
353 if let Some(managed_session) = active_sessions.get(session_id) {
354 return Ok(Some(
356 managed_session
357 .session
358 .build_workspace()
359 .await
360 .map_err(|e| SessionManagerError::CreationFailed {
361 message: format!("Failed to build workspace: {e}"),
362 })?,
363 ));
364 }
365 }
366
367 if let Some(session_info) = self.store.get_session(session_id).await? {
369 let session = session_info;
370 Ok(Some(session.build_workspace().await.map_err(|e| {
371 SessionManagerError::CreationFailed {
372 message: format!("Failed to build workspace: {e}"),
373 }
374 })?))
375 } else {
376 Ok(None)
377 }
378 }
379
380 pub async fn resume_session(
382 &self,
383 session_id: &str,
384 app_config: AppConfig,
385 ) -> Result<mpsc::Sender<AppCommand>> {
386 {
388 let sessions = self.active_sessions.read().await;
389 if let Some(managed_session) = sessions.get(session_id) {
390 debug!(session_id = %session_id, "Session already active");
391 return Ok(managed_session.command_tx.clone());
392 }
393 }
394
395 let session = match self
397 .store
398 .get_session(session_id)
399 .await
400 .map_err(SessionManagerError::Storage)?
401 {
402 Some(session) => session,
403 None => {
404 debug!(session_id = %session_id, "Session not found in store");
405 return Err(SessionManagerError::SessionNotActive {
406 session_id: session_id.to_string(),
407 }
408 .into());
409 }
410 };
411
412 info!(session_id = %session_id, "Resuming session from storage");
413
414 {
416 let sessions = self.active_sessions.read().await;
417 if sessions.len() >= self.config.max_concurrent_sessions {
418 warn!(
419 session_id = %session_id,
420 active_count = sessions.len(),
421 max_capacity = self.config.max_concurrent_sessions,
422 "At maximum session capacity for resume"
423 );
424 }
426 }
427
428 let conversation = Conversation {
430 messages: session.state.messages.clone(),
431 working_directory: session
432 .config
433 .workspace
434 .get_path()
435 .unwrap_or_default()
436 .into(),
437 active_message_id: session.state.active_message_id.clone(),
438 };
439
440 let managed_session = ManagedSession::new(
442 session.clone(),
443 app_config,
444 self.store.clone(),
445 self.config.default_model.clone(),
446 Some(conversation),
447 )
448 .await
449 .map_err(|e| SessionManagerError::CreationFailed {
450 message: format!("Failed to create managed session: {e}"),
451 })?;
452
453 let command_tx = managed_session.command_tx.clone();
455
456 if !session.state.messages.is_empty() || !session.state.approved_tools.is_empty() {
458 info!(
459 session_id = %session_id,
460 message_count = session.state.messages.len(),
461 tool_count = session.state.approved_tools.len(),
462 "Restoring conversation state"
463 );
464
465 command_tx
466 .send(AppCommand::RestoreConversation {
467 messages: session.state.messages.clone(),
468 approved_tools: session.state.approved_tools.clone(),
469 approved_bash_patterns: session.state.approved_bash_patterns.clone(),
470 active_message_id: session.state.active_message_id.clone(),
471 })
472 .await
473 .map_err(|_| SessionManagerError::CreationFailed {
474 message: "Failed to send restore command to App".to_string(),
475 })?;
476 }
477
478 {
480 let mut sessions = self.active_sessions.write().await;
481 sessions.insert(session_id.to_string(), managed_session);
482 }
483
484 let last_sequence = session.state.last_event_sequence;
486
487 let event = StreamEvent::SessionResumed {
489 session_id: session_id.to_string(),
490 event_offset: last_sequence,
491 };
492 self.emit_event(session_id.to_string(), event).await;
493
494 info!(session_id = %session_id, last_sequence = last_sequence, "Session resumed");
495 Ok(command_tx)
496 }
497
498 pub async fn suspend_session(&self, session_id: &str) -> Result<bool> {
500 let managed_session = {
501 let mut sessions = self.active_sessions.write().await;
502 sessions.remove(session_id)
503 };
504
505 let managed_session = match managed_session {
506 Some(session) => session,
507 None => {
508 debug!(session_id = %session_id, "Session not active, cannot suspend");
509 return Ok(false);
510 }
511 };
512
513 info!(session_id = %session_id, "Suspending session");
514
515 self.store.update_session(&managed_session.session).await?;
517
518 let event = StreamEvent::SessionSaved {
520 session_id: session_id.to_string(),
521 };
522 self.emit_event(session_id.to_string(), event).await;
523
524 info!(session_id = %session_id, "Session suspended and saved");
525 Ok(true)
526 }
527
528 pub async fn delete_session(&self, session_id: &str) -> Result<bool> {
530 {
532 let mut sessions = self.active_sessions.write().await;
533 sessions.remove(session_id);
534 }
535
536 self.store.delete_session(session_id).await?;
538
539 info!(session_id = %session_id, "Session deleted");
540 Ok(true)
541 }
542
543 pub async fn list_sessions(&self, filter: SessionFilter) -> Result<Vec<SessionInfo>> {
545 Ok(self.store.list_sessions(filter).await?)
546 }
547
548 pub async fn get_active_sessions(&self) -> Vec<String> {
550 let sessions = self.active_sessions.read().await;
551 sessions.keys().cloned().collect()
552 }
553
554 pub async fn is_session_active(&self, session_id: &str) -> bool {
556 let sessions = self.active_sessions.read().await;
557 sessions.contains_key(session_id)
558 }
559
560 pub async fn send_command(&self, session_id: &str, command: AppCommand) -> Result<()> {
562 let sessions = self.active_sessions.read().await;
563 if let Some(managed_session) = sessions.get(session_id) {
564 managed_session.command_tx.send(command).await.map_err(|_| {
565 Error::SessionManager(SessionManagerError::SessionNotActive {
566 session_id: session_id.to_string(),
567 })
568 })
569 } else {
570 Err(Error::SessionManager(
571 SessionManagerError::SessionNotActive {
572 session_id: session_id.to_string(),
573 },
574 ))
575 }
576 }
577
578 pub async fn update_session_state(
580 &self,
581 session_id: &str,
582 update_fn: impl FnOnce(&mut SessionState),
583 ) -> Result<()> {
584 {
585 let mut sessions = self.active_sessions.write().await;
586 if let Some(managed_session) = sessions.get_mut(session_id) {
587 managed_session.touch();
588 update_fn(&mut managed_session.session.state);
589 managed_session.session.update_timestamp();
590
591 if self.config.auto_persist {
593 self.store.update_session(&managed_session.session).await?;
594 }
595 } else {
596 return Err(SessionManagerError::SessionNotActive {
597 session_id: session_id.to_string(),
598 }
599 .into());
600 }
601 }
602
603 Ok(())
604 }
605
606 pub async fn emit_event(&self, session_id: String, event: StreamEvent) {
608 let sequence_num = match self.store.append_event(&session_id, &event).await {
610 Ok(seq) => seq,
611 Err(e) => {
612 error!(session_id = %session_id, error = %e, "Failed to persist event");
613 return;
614 }
615 };
616
617 if let Err(e) = self
619 .update_session_state(&session_id, |state| {
620 state.last_event_sequence = sequence_num;
621 })
622 .await
623 {
624 error!(session_id = %session_id, error = %e, "Failed to update session sequence number");
625 }
626 }
627
628 pub async fn cleanup_inactive_sessions(&self, max_idle_time: chrono::Duration) -> usize {
630 let mut to_suspend = Vec::new();
631
632 {
633 let sessions = self.active_sessions.read().await;
634 for (session_id, managed_session) in sessions.iter() {
635 if managed_session.is_inactive(max_idle_time) {
636 to_suspend.push(session_id.clone());
637 }
638 }
639 }
640
641 let mut suspended_count = 0;
642 for session_id in to_suspend {
643 if let Ok(true) = self.suspend_session(&session_id).await {
644 suspended_count += 1;
645 }
646 }
647
648 if suspended_count > 0 {
649 info!(
650 suspended_count = suspended_count,
651 "Cleaned up inactive sessions"
652 );
653 }
654
655 suspended_count
656 }
657
658 pub fn store(&self) -> &Arc<dyn SessionStore> {
660 &self.store
661 }
662
663 pub async fn increment_subscriber_count(&self, session_id: &str) -> Result<()> {
665 let mut sessions = self.active_sessions.write().await;
666 if let Some(managed_session) = sessions.get_mut(session_id) {
667 managed_session.subscriber_count += 1;
668 managed_session.touch();
669 debug!(
670 session_id = %session_id,
671 subscriber_count = managed_session.subscriber_count,
672 "Incremented subscriber count"
673 );
674 Ok(())
675 } else {
676 Err(SessionManagerError::SessionNotActive {
677 session_id: session_id.to_string(),
678 }
679 .into())
680 }
681 }
682
683 pub async fn decrement_subscriber_count(&self, session_id: &str) -> Result<()> {
685 let mut sessions = self.active_sessions.write().await;
686 if let Some(managed_session) = sessions.get_mut(session_id) {
687 managed_session.subscriber_count = managed_session.subscriber_count.saturating_sub(1);
688 managed_session.touch();
689 debug!(
690 session_id = %session_id,
691 subscriber_count = managed_session.subscriber_count,
692 "Decremented subscriber count"
693 );
694 Ok(())
695 } else {
696 debug!(session_id = %session_id, "Session not active when decrementing subscriber count");
698 Ok(())
699 }
700 }
701
702 pub async fn touch_session(&self, session_id: &str) -> Result<()> {
704 let mut sessions = self.active_sessions.write().await;
705 if let Some(managed_session) = sessions.get_mut(session_id) {
706 managed_session.touch();
707 Ok(())
708 } else {
709 Ok(())
711 }
712 }
713
714 pub async fn maybe_suspend_idle_session(&self, session_id: &str) -> Result<()> {
716 let should_suspend = {
718 let sessions = self.active_sessions.read().await;
719 if let Some(managed_session) = sessions.get(session_id) {
720 managed_session.subscriber_count == 0
721 } else {
722 false }
724 };
725
726 if should_suspend {
727 info!(session_id = %session_id, "No active subscribers, suspending session");
728 self.suspend_session(session_id).await?;
729 }
730
731 Ok(())
732 }
733
734 pub async fn get_session_state(
736 &self,
737 session_id: &str,
738 ) -> Result<Option<crate::session::SessionState>> {
739 info!("get_session_state called for session: {}", session_id);
740
741 match self.store.get_session(session_id).await {
744 Ok(Some(session)) => {
745 info!(
746 "Loaded session from store with {} messages",
747 session.state.messages.len()
748 );
749 Ok(Some(session.state))
750 }
751 Ok(None) => {
752 info!("Session not found in store: {}", session_id);
753 Ok(None)
754 }
755 Err(e) => {
756 error!("Error loading session from store: {}", e);
757 Err(SessionManagerError::Storage(e).into())
758 }
759 }
760 }
761
762 pub async fn get_mcp_statuses(
764 &self,
765 session_id: &str,
766 ) -> Result<Vec<crate::session::McpServerInfo>> {
767 {
769 let sessions = self.active_sessions.read().await;
770 if let Some(managed_session) = sessions.get(session_id) {
771 let servers: Vec<_> = managed_session
772 .session
773 .state
774 .mcp_servers
775 .values()
776 .cloned()
777 .collect();
778 return Ok(servers);
779 }
780 }
781
782 match self.store.get_session(session_id).await? {
784 Some(session) => {
785 let servers: Vec<_> = session.state.mcp_servers.values().cloned().collect();
786 Ok(servers)
787 }
788 None => Err(SessionManagerError::SessionNotActive {
789 session_id: session_id.to_string(),
790 }
791 .into()),
792 }
793 }
794}
795
796fn translate_app_event(app_event: AppEvent) -> Option<StreamEvent> {
798 match app_event {
799 AppEvent::MessageAdded { message, model } => Some(StreamEvent::MessageComplete {
800 message,
801 usage: None,
802 metadata: std::collections::HashMap::new(),
803 model,
804 }),
805
806 AppEvent::MessagePart { id, delta } => Some(StreamEvent::MessagePart {
807 content: delta,
808 message_id: id,
809 }),
810
811 AppEvent::ToolCallStarted {
812 name,
813 id,
814 parameters,
815 model,
816 } => {
817 let tool_call = ToolCall {
818 id: id.clone(),
819 name: name.clone(),
820 parameters,
821 };
822 Some(StreamEvent::ToolCallStarted {
823 tool_call,
824 metadata: std::collections::HashMap::new(),
825 model,
826 })
827 }
828
829 AppEvent::ToolCallCompleted {
830 name: _,
831 result,
832 id,
833 model,
834 } => Some(StreamEvent::ToolCallCompleted {
835 tool_call_id: id,
836 result,
837 metadata: std::collections::HashMap::new(),
838 model,
839 }),
840
841 AppEvent::ToolCallFailed {
842 name: _,
843 error,
844 id,
845 model,
846 } => Some(StreamEvent::ToolCallFailed {
847 tool_call_id: id,
848 error,
849 metadata: std::collections::HashMap::new(),
850 model,
851 }),
852
853 AppEvent::WorkspaceChanged => Some(StreamEvent::WorkspaceChanged),
854
855 AppEvent::WorkspaceFiles { files } => Some(StreamEvent::WorkspaceFiles {
856 files: files.clone(),
857 }),
858
859 AppEvent::Started { id, op } => Some(StreamEvent::OperationStarted {
860 operation_id: id,
861 operation: op,
862 }),
863 AppEvent::Finished { id, outcome } => Some(StreamEvent::OperationCompleted {
864 operation_id: id,
865 outcome,
866 }),
867 AppEvent::OperationCancelled { op_id, info } => {
868 let operation_id = op_id.unwrap_or_else(uuid::Uuid::new_v4);
870 Some(StreamEvent::OperationCancelled {
871 operation_id,
872 reason: info.to_string(), })
874 }
875
876 _ => None,
878 }
879}
880async fn update_session_state_for_event(
882 store: &Arc<dyn SessionStore>,
883 session_id: &str,
884 event: &StreamEvent,
885) -> Result<()> {
886 match event {
887 StreamEvent::MessageComplete { message, .. } => {
888 store.append_message(session_id, message).await?;
889
890 if let crate::app::conversation::MessageData::Tool {
892 tool_use_id,
893 result,
894 ..
895 } = &message.data
896 {
897 let stats = crate::session::ToolExecutionStats::success_typed(
898 serde_json::to_value(result).unwrap_or(serde_json::Value::Null),
899 result.variant_name().to_string(),
900 0, );
902 let update = ToolCallUpdate::set_result(stats);
903 store.update_tool_call(tool_use_id, update).await?;
904 }
905 }
906 StreamEvent::ToolCallStarted { tool_call, .. } => {
907 store.create_tool_call(session_id, tool_call).await?;
908 }
909 StreamEvent::ToolCallCompleted {
910 tool_call_id,
911 result,
912 ..
913 } => {
914 let stats = crate::session::ToolExecutionStats::success_typed(
915 serde_json::to_value(result).unwrap_or(serde_json::Value::Null),
916 result.variant_name().to_string(),
917 0,
918 );
919 let update = ToolCallUpdate::set_result(stats);
920 store.update_tool_call(tool_call_id, update).await?;
921
922 let messages = store.get_messages(session_id, None).await?;
925 let parent_id = messages.last().map(|m| m.id().to_string());
926
927 let tool_message = ConversationMessage {
928 data: crate::app::conversation::MessageData::Tool {
929 tool_use_id: tool_call_id.clone(),
930 result: result.clone(),
931 },
932 timestamp: std::time::SystemTime::now()
933 .duration_since(std::time::UNIX_EPOCH)
934 .expect("Time went backwards")
935 .as_secs(),
936 id: format!("tool_result_{tool_call_id}"),
937 parent_message_id: parent_id,
938 };
939 store.append_message(session_id, &tool_message).await?;
940 }
941 StreamEvent::ToolCallFailed {
942 tool_call_id,
943 error,
944 ..
945 } => {
946 let update = ToolCallUpdate::set_error(error.clone());
947 store.update_tool_call(tool_call_id, update).await?;
948
949 let messages = store.get_messages(session_id, None).await?;
952 let parent_id = messages.last().map(|m| m.id().to_string());
953
954 let tool_error = steer_tools::error::ToolError::Execution {
955 tool_name: "unknown".to_string(), message: error.clone(),
957 };
958 let tool_message = ConversationMessage {
959 data: MessageData::Tool {
960 tool_use_id: tool_call_id.clone(),
961 result: ToolResult::Error(tool_error),
962 },
963 timestamp: std::time::SystemTime::now()
964 .duration_since(std::time::UNIX_EPOCH)
965 .expect("Time went backwards")
966 .as_secs(),
967 id: format!("tool_result_{tool_call_id}"),
968 parent_message_id: parent_id,
969 };
970 store.append_message(session_id, &tool_message).await?;
971 }
972 _ => {}
974 }
975 Ok(())
976}
977
978#[cfg(test)]
979mod tests {
980 use super::*;
981 use crate::app::MessageData;
982 use crate::app::conversation::{AssistantContent, Role, UserContent};
983 use crate::session::stores::sqlite::SqliteSessionStore;
984 use tempfile::TempDir;
985
986 async fn create_test_manager() -> (SessionManager, TempDir) {
987 let temp_dir = TempDir::new().unwrap();
988 let db_path = temp_dir.path().join("test.db");
989 let store = Arc::new(SqliteSessionStore::new(&db_path).await.unwrap());
990
991 let config = SessionManagerConfig {
992 max_concurrent_sessions: 100,
993 default_model: crate::config::model::builtin::claude_sonnet_4_20250514(),
994 auto_persist: true,
995 };
996 let manager = SessionManager::new(store, config);
997
998 (manager, temp_dir)
999 }
1000
    /// Returns the shared test `AppConfig` from `crate::test_utils`.
    fn create_test_app_config() -> AppConfig {
        crate::test_utils::test_app_config()
    }
1004
1005 #[tokio::test]
1006 async fn test_create_and_resume_session() {
1007 let (manager, temp) = create_test_manager().await;
1008 let app_config = create_test_app_config();
1009
1010 let session_config = SessionConfig {
1012 workspace: crate::session::state::WorkspaceConfig::Local {
1013 path: temp.path().to_path_buf(),
1014 },
1015 tool_config: crate::session::SessionToolConfig::default(),
1016 system_prompt: None,
1017 metadata: std::collections::HashMap::new(),
1018 };
1019 let (session_id, _command_tx) = manager
1020 .create_session(session_config, app_config.clone())
1021 .await
1022 .unwrap();
1023 assert!(!session_id.is_empty());
1024 assert!(manager.is_session_active(&session_id).await);
1025
1026 assert!(manager.suspend_session(&session_id).await.unwrap());
1028 assert!(!manager.is_session_active(&session_id).await);
1029
1030 let _command_tx = manager
1032 .resume_session(&session_id, app_config)
1033 .await
1034 .unwrap();
1035 assert!(manager.is_session_active(&session_id).await);
1036 }
1037
1038 #[tokio::test]
1039 async fn test_session_cleanup() {
1040 let (manager, temp) = create_test_manager().await;
1041 let app_config = create_test_app_config();
1042
1043 let session_config = SessionConfig {
1045 workspace: crate::session::state::WorkspaceConfig::Local {
1046 path: temp.path().to_path_buf(),
1047 },
1048 tool_config: crate::session::SessionToolConfig::default(),
1049 system_prompt: None,
1050 metadata: std::collections::HashMap::new(),
1051 };
1052 let (session_id, _command_tx) = manager
1053 .create_session(session_config, app_config)
1054 .await
1055 .unwrap();
1056
1057 {
1059 let mut sessions = manager.active_sessions.write().await;
1060 if let Some(session) = sessions.get_mut(&session_id) {
1061 session.last_activity = chrono::Utc::now() - chrono::Duration::hours(2);
1062 session.subscriber_count = 0;
1063 }
1064 }
1065
1066 let cleaned = manager
1068 .cleanup_inactive_sessions(chrono::Duration::hours(1))
1069 .await;
1070 assert_eq!(cleaned, 1);
1071 assert!(!manager.is_session_active(&session_id).await);
1072 }
1073
1074 #[tokio::test]
1075 async fn test_capacity_rejection() {
1076 let temp_dir = TempDir::new().unwrap();
1077 let temp = tempfile::TempDir::new().unwrap();
1078 let db_path = temp_dir.path().join("test.db");
1079 let store = Arc::new(SqliteSessionStore::new(&db_path).await.unwrap());
1080
1081 let config = SessionManagerConfig {
1082 max_concurrent_sessions: 1, default_model: crate::config::model::builtin::claude_sonnet_4_20250514(),
1084 auto_persist: true,
1085 };
1086 let manager = SessionManager::new(store, config);
1087 let app_config = create_test_app_config();
1088
1089 let tool_config = crate::session::SessionToolConfig {
1091 approval_policy: crate::session::ToolApprovalPolicy::AlwaysAsk,
1092 ..Default::default()
1093 };
1094
1095 let session_config = SessionConfig {
1096 workspace: crate::session::state::WorkspaceConfig::Local {
1097 path: temp.path().to_path_buf(),
1098 },
1099 tool_config,
1100 system_prompt: None,
1101 metadata: std::collections::HashMap::new(),
1102 };
1103 let (session_id1, _command_tx) = manager
1104 .create_session(session_config.clone(), app_config.clone())
1105 .await
1106 .unwrap();
1107 assert!(!session_id1.is_empty());
1108
1109 let result = manager.create_session(session_config, app_config).await;
1111
1112 assert!(result.is_err());
1113 assert!(matches!(
1114 result,
1115 Err(crate::error::Error::SessionManager(
1116 SessionManagerError::CapacityExceeded { .. }
1117 ))
1118 ));
1119 match result.unwrap_err() {
1120 crate::error::Error::SessionManager(SessionManagerError::CapacityExceeded {
1121 current,
1122 max,
1123 }) => {
1124 assert_eq!(current, 1);
1125 assert_eq!(max, 1);
1126 }
1127 _ => unreachable!(),
1128 }
1129 }
1130
1131 #[tokio::test]
1132 async fn test_tool_result_persistence_on_resume() {
1133 let (manager, temp) = create_test_manager().await;
1134 let app_config = create_test_app_config();
1135
1136 let session_config = SessionConfig {
1138 workspace: crate::session::state::WorkspaceConfig::Local {
1139 path: temp.path().to_path_buf(),
1140 },
1141 tool_config: crate::session::SessionToolConfig::default(),
1142 system_prompt: None,
1143 metadata: std::collections::HashMap::new(),
1144 };
1145 let (session_id, _command_tx) = manager
1146 .create_session(session_config, app_config.clone())
1147 .await
1148 .unwrap();
1149
1150 let user_message = ConversationMessage {
1153 data: crate::app::conversation::MessageData::User {
1154 content: vec![UserContent::Text {
1155 text: "Read the file test.txt".to_string(),
1156 }],
1157 },
1158 timestamp: 123456789,
1159 id: "user_1".to_string(),
1160 parent_message_id: None,
1161 };
1162 manager
1163 .store
1164 .append_message(&session_id, &user_message)
1165 .await
1166 .unwrap();
1167
1168 let assistant_message = ConversationMessage {
1170 data: crate::app::conversation::MessageData::Assistant {
1171 content: vec![
1172 AssistantContent::Text {
1173 text: "I'll read that file for you.".to_string(),
1174 },
1175 AssistantContent::ToolCall {
1176 tool_call: ToolCall {
1177 id: "tool_call_1".to_string(),
1178 name: "read_file".to_string(),
1179 parameters: serde_json::json!({"path": "test.txt"}),
1180 },
1181 },
1182 ],
1183 },
1184 timestamp: 123456790,
1185 id: "assistant_1".to_string(),
1186 parent_message_id: Some("user_1".to_string()),
1187 };
1188 manager
1189 .store
1190 .append_message(&session_id, &assistant_message)
1191 .await
1192 .unwrap();
1193
1194 let tool_call = ToolCall {
1197 id: "tool_call_1".to_string(),
1198 name: "read_file".to_string(),
1199 parameters: serde_json::json!({"path": "test.txt"}),
1200 };
1201 manager
1202 .store
1203 .create_tool_call(&session_id, &tool_call)
1204 .await
1205 .unwrap();
1206
1207 let stats = crate::session::ToolExecutionStats::success_typed(
1209 serde_json::json!({
1210 "content": "File contents: Hello, world!",
1211 "file_path": "test.txt",
1212 "line_count": 1,
1213 "truncated": false
1214 }),
1215 "FileContent".to_string(),
1216 0,
1217 );
1218 let update = ToolCallUpdate::set_result(stats);
1219 manager
1220 .store
1221 .update_tool_call("tool_call_1", update)
1222 .await
1223 .unwrap();
1224
1225 let tool_message = ConversationMessage {
1227 data: MessageData::Tool {
1228 tool_use_id: "tool_call_1".to_string(),
1229 result: ToolResult::FileContent(steer_tools::result::FileContentResult {
1230 content: "File contents: Hello, world!".to_string(),
1231 file_path: "test.txt".to_string(),
1232 line_count: 1,
1233 truncated: false,
1234 }),
1235 },
1236 timestamp: 123456790,
1237 id: "tool_result_tool_call_1".to_string(),
1238 parent_message_id: Some("assistant_1".to_string()),
1239 };
1240 manager
1241 .store
1242 .append_message(&session_id, &tool_message)
1243 .await
1244 .unwrap();
1245
1246 let followup_message = ConversationMessage {
1248 data: crate::app::conversation::MessageData::Assistant {
1249 content: vec![AssistantContent::Text {
1250 text: "The file contains: Hello, world!".to_string(),
1251 }],
1252 },
1253 timestamp: 123456791,
1254 id: "assistant_2".to_string(),
1255 parent_message_id: Some("assistant_1".to_string()),
1256 };
1257 manager
1258 .store
1259 .append_message(&session_id, &followup_message)
1260 .await
1261 .unwrap();
1262
1263 manager.suspend_session(&session_id).await.unwrap();
1265
1266 let loaded_session = manager
1268 .store
1269 .get_session(&session_id)
1270 .await
1271 .unwrap()
1272 .unwrap();
1273
1274 assert_eq!(loaded_session.state.messages.len(), 4);
1276
1277 let tool_result_msg = &loaded_session.state.messages[2];
1279 assert_eq!(tool_result_msg.role(), Role::Tool);
1280
1281 assert!(matches!(
1283 &tool_result_msg.data,
1284 crate::app::conversation::MessageData::Tool { .. }
1285 ));
1286 if let crate::app::conversation::MessageData::Tool {
1287 tool_use_id,
1288 result,
1289 ..
1290 } = &tool_result_msg.data
1291 {
1292 assert_eq!(tool_use_id, "tool_call_1");
1293 assert!(matches!(
1294 result,
1295 crate::app::conversation::ToolResult::FileContent(_)
1296 ));
1297 match result {
1298 crate::app::conversation::ToolResult::FileContent(content) => {
1299 assert!(content.content.contains("Hello, world!"));
1300 }
1301 _ => unreachable!(),
1302 }
1303 } else {
1304 panic!("Expected Tool message");
1305 }
1306
1307 let _command_tx = manager
1309 .resume_session(&session_id, app_config)
1310 .await
1311 .unwrap();
1312
1313 }
1317
1318 #[tokio::test]
1319 async fn test_active_message_id_persistence() {
1320 let (manager, temp) = create_test_manager().await;
1321 let app_config = create_test_app_config();
1322
1323 let session_config = SessionConfig {
1325 workspace: crate::session::state::WorkspaceConfig::Local {
1326 path: temp.path().to_path_buf(),
1327 },
1328 tool_config: crate::session::SessionToolConfig::default(),
1329 system_prompt: None,
1330 metadata: std::collections::HashMap::new(),
1331 };
1332 let (session_id, _command_tx) = manager
1333 .create_session(session_config, app_config.clone())
1334 .await
1335 .unwrap();
1336
1337 let msg1 = ConversationMessage {
1339 data: crate::app::conversation::MessageData::User {
1340 content: vec![UserContent::Text {
1341 text: "Hello".to_string(),
1342 }],
1343 },
1344 timestamp: 1000,
1345 id: "msg1".to_string(),
1346 parent_message_id: None,
1347 };
1348
1349 let msg2 = ConversationMessage {
1350 data: crate::app::conversation::MessageData::Assistant {
1351 content: vec![AssistantContent::Text {
1352 text: "Hi there!".to_string(),
1353 }],
1354 },
1355 timestamp: 2000,
1356 id: "msg2".to_string(),
1357 parent_message_id: Some("msg1".to_string()),
1358 };
1359
1360 let msg1_edited = ConversationMessage {
1362 data: crate::app::conversation::MessageData::User {
1363 content: vec![UserContent::Text {
1364 text: "Goodbye".to_string(),
1365 }],
1366 },
1367 timestamp: 3000,
1368 id: "msg1_edited".to_string(),
1369 parent_message_id: None, };
1371
1372 manager
1374 .store
1375 .append_message(&session_id, &msg1)
1376 .await
1377 .unwrap();
1378 manager
1379 .store
1380 .append_message(&session_id, &msg2)
1381 .await
1382 .unwrap();
1383 manager
1384 .store
1385 .append_message(&session_id, &msg1_edited)
1386 .await
1387 .unwrap();
1388
1389 manager
1391 .store
1392 .update_active_message_id(&session_id, Some("msg1_edited"))
1393 .await
1394 .unwrap();
1395
1396 manager.suspend_session(&session_id).await.unwrap();
1398
1399 let loaded_session = manager
1401 .store
1402 .get_session(&session_id)
1403 .await
1404 .unwrap()
1405 .unwrap();
1406
1407 assert_eq!(
1409 loaded_session.state.active_message_id,
1410 Some("msg1_edited".to_string())
1411 );
1412
1413 assert_eq!(loaded_session.state.messages.len(), 3);
1415
1416 let edited_msg = loaded_session
1418 .state
1419 .messages
1420 .iter()
1421 .find(|m| m.id() == "msg1_edited")
1422 .expect("Edited message should exist");
1423
1424 match &edited_msg.data {
1425 crate::app::conversation::MessageData::User { content, .. } => {
1426 if let Some(UserContent::Text { text }) = content.first() {
1427 assert_eq!(text, "Goodbye");
1428 } else {
1429 panic!("Expected text content");
1430 }
1431 }
1432 _ => panic!("Expected user message"),
1433 }
1434
1435 let _ = manager
1437 .resume_session(&session_id, app_config)
1438 .await
1439 .unwrap();
1440
1441 let state = manager
1443 .get_session_state(&session_id)
1444 .await
1445 .unwrap()
1446 .unwrap();
1447 assert_eq!(state.active_message_id, Some("msg1_edited".to_string()));
1448 }
1449}