1use crate::api::{Model, ToolCall};
2use crate::app::{
3 App, AppCommand, AppConfig, AppEvent, Conversation, Message as ConversationMessage, MessageData,
4};
5use crate::error::{Error, Result};
6use crate::events::StreamEvent;
7use crate::session::{
8 Session, SessionConfig, SessionFilter, SessionInfo, SessionState, SessionStore,
9 SessionStoreError, ToolCallUpdate,
10};
11use std::collections::HashMap;
12use std::sync::Arc;
13use steer_tools::ToolResult;
14use thiserror::Error;
15use tokio::sync::{RwLock, mpsc};
16use tokio::task::JoinHandle;
17use tracing::{debug, error, info, warn};
18
/// Errors produced by [`SessionManager`] operations.
#[derive(Debug, Error)]
pub enum SessionManagerError {
    /// Returned by `create_session` when the number of active sessions has
    /// reached `max_concurrent_sessions`.
    #[error("Maximum session capacity reached ({current}/{max}). Cannot create new session.")]
    CapacityExceeded { current: usize, max: usize },

    /// The session is not in the in-memory active set. `resume_session` also
    /// returns this when the session cannot be found in the store.
    #[error("Session not active: {session_id}")]
    SessionNotActive { session_id: String },

    /// The session's event receiver has already been taken.
    #[error("Session {session_id} already has an active listener")]
    SessionAlreadyHasListener { session_id: String },

    /// Building the managed session, its workspace, or restoring its
    /// conversation failed; `message` carries the underlying cause as text.
    #[error("Failed to create managed session: {message}")]
    CreationFailed { message: String },

    /// An error from the underlying [`SessionStore`], passed through unchanged.
    #[error(transparent)]
    Storage(#[from] SessionStoreError),
}
37
/// Settings for a [`SessionManager`].
#[derive(Debug, Clone)]
pub struct SessionManagerConfig {
    /// Upper bound on active sessions; enforced by `create_session`, only
    /// warned about by `resume_session`.
    pub max_concurrent_sessions: usize,
    /// Model handed to every `App` the manager constructs.
    pub default_model: Model,
    /// When true, `update_session_state` writes the session to the store after
    /// each in-memory change.
    pub auto_persist: bool,
}
48
/// An active session together with the channels and background tasks that
/// drive its `App`.
pub struct ManagedSession {
    /// The session record this instance was built from.
    pub session: Session,
    /// Sender for commands consumed by the app actor loop.
    pub command_tx: mpsc::Sender<AppCommand>,
    /// Receiver of forwarded `AppEvent`s; `None` once `take_event_rx` has been called.
    pub event_rx: Option<mpsc::Receiver<AppEvent>>,
    /// Subscriber counter maintained by the manager's increment/decrement methods.
    pub subscriber_count: usize,
    /// Timestamp of the most recent `touch`.
    pub last_activity: chrono::DateTime<chrono::Utc>,
    /// Handle of the spawned `app_actor_loop` task.
    pub app_task_handle: JoinHandle<()>,
    /// Handle of the spawned task that forwards and persists app events.
    pub event_task_handle: JoinHandle<()>,
}
66
67impl ManagedSession {
68 pub async fn new(
70 session: Session,
71 app_config: AppConfig,
72 store: Arc<dyn SessionStore>,
73 default_model: Model,
74 conversation: Option<Conversation>,
75 ) -> Result<Self> {
76 let (app_event_tx, mut app_event_rx) = mpsc::channel(100);
78 let (app_command_tx, app_command_rx) = mpsc::channel::<AppCommand>(32);
79
80 let (external_event_tx, external_event_rx) = mpsc::channel(100);
82
83 crate::app::OpContext::init_command_tx(app_command_tx.clone());
85
86 let workspace = session.build_workspace().await?;
88
89 let backend_registry = session
91 .config
92 .build_registry(
93 Arc::new(app_config.llm_config_provider.clone()),
94 workspace.clone(),
95 )
96 .await?;
97
98 let tool_executor = Arc::new(crate::app::ToolExecutor::with_all_components(
99 workspace.clone(),
100 Arc::new(backend_registry),
101 Arc::new(crate::app::validation::ValidatorRegistry::new()),
102 app_config.llm_config_provider.clone(),
103 ));
104
105 let mut app = if let Some(conv) = conversation {
107 App::new_with_conversation(
108 app_config,
109 app_event_tx,
110 default_model,
111 workspace.clone(),
112 tool_executor,
113 Some(session.config.clone()),
114 conv,
115 )
116 .await?
117 } else {
118 App::new(
119 app_config,
120 app_event_tx,
121 default_model,
122 workspace.clone(),
123 tool_executor,
124 Some(session.config.clone()),
125 )
126 .await?
127 };
128
129 if let Some(model_str) = session.config.metadata.get("initial_model") {
131 if let Ok(model) = model_str.parse::<crate::api::Model>() {
132 let _ = app.set_model(model).await;
133 }
134 }
135
136 let app_task_handle = tokio::spawn(crate::app::app_actor_loop(app, app_command_rx));
138
139 let session_id = session.id.clone();
141 let store_clone = store.clone();
142
143 let event_task_handle = tokio::spawn(async move {
144 while let Some(app_event) = app_event_rx.recv().await {
145 if let Err(e) = external_event_tx.try_send(app_event.clone()) {
147 warn!(session_id = %session_id, "Failed to send event to external consumer: {}", e);
148 }
149
150 if let AppEvent::ActiveMessageIdChanged { message_id } = &app_event {
152 if let Err(e) = store_clone
153 .update_active_message_id(&session_id, message_id.as_deref())
154 .await
155 {
156 error!(session_id = %session_id, error = %e, "Failed to update active message ID");
157 }
158 }
159
160 if let Some(stream_event) = translate_app_event(app_event) {
162 if let Ok(_sequence_num) =
164 store_clone.append_event(&session_id, &stream_event).await
165 {
166 if let Err(e) =
168 update_session_state_for_event(&store_clone, &session_id, &stream_event)
169 .await
170 {
171 error!(session_id = %session_id, error = %e, "Failed to update session state");
172 }
173 }
174 }
175 }
176 info!(session_id = %session_id, "Event translation loop ended");
177 });
178
179 Ok(Self {
180 session,
181 command_tx: app_command_tx,
182 event_rx: Some(external_event_rx),
183 subscriber_count: 0,
184 last_activity: chrono::Utc::now(),
185 app_task_handle,
186 event_task_handle,
187 })
188 }
189
190 pub fn take_event_rx(&mut self) -> Option<mpsc::Receiver<AppEvent>> {
192 self.event_rx.take()
193 }
194
195 pub fn touch(&mut self) {
197 self.last_activity = chrono::Utc::now();
198 }
199
200 pub fn is_inactive(&self, max_idle_time: chrono::Duration) -> bool {
202 self.subscriber_count == 0 && chrono::Utc::now() - self.last_activity > max_idle_time
203 }
204
205 pub async fn shutdown(self) {
207 let _ = self.command_tx.send(AppCommand::Shutdown).await;
209
210 let _ = self.app_task_handle.await;
212 let _ = self.event_task_handle.await;
213 }
214}
215
/// Owns the set of in-memory sessions and mediates between them and the
/// persistent [`SessionStore`].
pub struct SessionManager {
    /// Active sessions keyed by session id.
    active_sessions: Arc<RwLock<HashMap<String, ManagedSession>>>,
    /// Persistent storage for sessions and their events.
    store: Arc<dyn SessionStore>,
    /// Manager settings (capacity, default model, auto-persist).
    config: SessionManagerConfig,
}
225
226impl SessionManager {
227 pub fn new(store: Arc<dyn SessionStore>, config: SessionManagerConfig) -> Self {
229 Self {
230 active_sessions: Arc::new(RwLock::new(HashMap::new())),
231 store,
232 config,
233 }
234 }
235
236 pub async fn create_session(
238 &self,
239 config: SessionConfig,
240 app_config: AppConfig,
241 ) -> Result<(String, mpsc::Sender<AppCommand>)> {
242 let session_config = config;
243
244 let session = self.store.create_session(session_config).await?;
246 let session_id = session.id.clone();
247
248 info!(session_id = %session_id, "Creating new session");
249
250 {
252 let sessions = self.active_sessions.read().await;
253 if sessions.len() >= self.config.max_concurrent_sessions {
254 error!(
255 session_id = %session_id,
256 active_count = sessions.len(),
257 max_capacity = self.config.max_concurrent_sessions,
258 "Session creation rejected: at maximum capacity"
259 );
260 return Err(SessionManagerError::CapacityExceeded {
261 current: sessions.len(),
262 max: self.config.max_concurrent_sessions,
263 }
264 .into());
265 }
266 }
267
268 let managed_session = ManagedSession::new(
270 session.clone(),
271 app_config,
272 self.store.clone(),
273 self.config.default_model,
274 None,
275 )
276 .await
277 .map_err(|e| SessionManagerError::CreationFailed {
278 message: format!("Failed to create managed session: {e}"),
279 })?;
280
281 let command_tx = managed_session.command_tx.clone();
283
284 {
286 let mut sessions = self.active_sessions.write().await;
287 sessions.insert(session_id.clone(), managed_session);
288 }
289
290 let metadata = crate::events::SessionMetadata::from(&SessionInfo::from(&session));
292 let event = StreamEvent::SessionCreated {
293 session_id: session_id.clone(),
294 metadata,
295 };
296 self.emit_event(session_id.clone(), event).await;
297
298 info!(session_id = %session_id, "Session created and activated");
299 Ok((session_id, command_tx))
300 }
301
302 pub async fn take_event_receiver(&self, session_id: &str) -> Result<mpsc::Receiver<AppEvent>> {
304 let mut sessions = self.active_sessions.write().await;
305 match sessions.get_mut(session_id) {
306 Some(managed_session) => {
307 if let Some(receiver) = managed_session.take_event_rx() {
308 Ok(receiver)
309 } else {
310 Err(SessionManagerError::SessionAlreadyHasListener {
311 session_id: session_id.to_string(),
312 }
313 .into())
314 }
315 }
316 None => Err(SessionManagerError::SessionNotActive {
317 session_id: session_id.to_string(),
318 }
319 .into()),
320 }
321 }
322
323 pub async fn get_session(&self, session_id: &str) -> Result<Option<SessionInfo>> {
325 {
327 let sessions = self.active_sessions.read().await;
328 if let Some(managed_session) = sessions.get(session_id) {
329 return Ok(Some(SessionInfo::from(&managed_session.session)));
330 }
331 }
332
333 if let Some(session) = self.store.get_session(session_id).await? {
335 Ok(Some(SessionInfo::from(&session)))
336 } else {
337 Ok(None)
338 }
339 }
340
341 pub async fn get_session_workspace(
343 &self,
344 session_id: &str,
345 ) -> Result<Option<Arc<dyn crate::workspace::Workspace>>> {
346 {
348 let active_sessions = self.active_sessions.read().await;
349 if let Some(managed_session) = active_sessions.get(session_id) {
350 return Ok(Some(
352 managed_session
353 .session
354 .build_workspace()
355 .await
356 .map_err(|e| SessionManagerError::CreationFailed {
357 message: format!("Failed to build workspace: {e}"),
358 })?,
359 ));
360 }
361 }
362
363 if let Some(session_info) = self.store.get_session(session_id).await? {
365 let session = session_info;
366 Ok(Some(session.build_workspace().await.map_err(|e| {
367 SessionManagerError::CreationFailed {
368 message: format!("Failed to build workspace: {e}"),
369 }
370 })?))
371 } else {
372 Ok(None)
373 }
374 }
375
    /// Activates a stored session and returns a sender for commands to its `App`.
    ///
    /// If the session is already active, its existing command sender is
    /// returned and nothing else happens. Otherwise the session is loaded from
    /// the store, a `ManagedSession` is built from its saved conversation, a
    /// `RestoreConversation` command is sent when there is state to restore,
    /// the session is registered as active, and a `SessionResumed` event is
    /// emitted.
    ///
    /// # Errors
    ///
    /// * `Storage` if loading from the store fails.
    /// * `SessionNotActive` if the session does not exist in the store.
    /// * `CreationFailed` if the managed session cannot be built or the
    ///   restore command cannot be sent.
    pub async fn resume_session(
        &self,
        session_id: &str,
        app_config: AppConfig,
    ) -> Result<mpsc::Sender<AppCommand>> {
        // Fast path: already active.
        {
            let sessions = self.active_sessions.read().await;
            if let Some(managed_session) = sessions.get(session_id) {
                debug!(session_id = %session_id, "Session already active");
                return Ok(managed_session.command_tx.clone());
            }
        }

        let session = match self
            .store
            .get_session(session_id)
            .await
            .map_err(SessionManagerError::Storage)?
        {
            Some(session) => session,
            None => {
                debug!(session_id = %session_id, "Session not found in store");
                return Err(SessionManagerError::SessionNotActive {
                    session_id: session_id.to_string(),
                }
                .into());
            }
        };

        info!(session_id = %session_id, "Resuming session from storage");

        // Capacity is only reported here, not enforced: the resume proceeds
        // even when the limit has been reached.
        {
            let sessions = self.active_sessions.read().await;
            if sessions.len() >= self.config.max_concurrent_sessions {
                warn!(
                    session_id = %session_id,
                    active_count = sessions.len(),
                    max_capacity = self.config.max_concurrent_sessions,
                    "At maximum session capacity for resume"
                );
            }
        }

        // Rebuild the conversation from the persisted state; a workspace
        // without a path falls back to the default (empty) path.
        let conversation = Conversation {
            messages: session.state.messages.clone(),
            working_directory: session
                .config
                .workspace
                .get_path()
                .unwrap_or_default()
                .into(),
            active_message_id: session.state.active_message_id.clone(),
        };

        let managed_session = ManagedSession::new(
            session.clone(),
            app_config,
            self.store.clone(),
            self.config.default_model,
            Some(conversation),
        )
        .await
        .map_err(|e| SessionManagerError::CreationFailed {
            message: format!("Failed to create managed session: {e}"),
        })?;

        let command_tx = managed_session.command_tx.clone();

        // Messages were already supplied through `conversation`; this command
        // additionally carries the approved tools and bash patterns.
        if !session.state.messages.is_empty() || !session.state.approved_tools.is_empty() {
            info!(
                session_id = %session_id,
                message_count = session.state.messages.len(),
                tool_count = session.state.approved_tools.len(),
                "Restoring conversation state"
            );

            command_tx
                .send(AppCommand::RestoreConversation {
                    messages: session.state.messages.clone(),
                    approved_tools: session.state.approved_tools.clone(),
                    approved_bash_patterns: session.state.approved_bash_patterns.clone(),
                    active_message_id: session.state.active_message_id.clone(),
                })
                .await
                .map_err(|_| SessionManagerError::CreationFailed {
                    message: "Failed to send restore command to App".to_string(),
                })?;
        }

        // NOTE(review): the active check at the top and this insert use
        // separate lock acquisitions, so two concurrent resumes of the same id
        // would each build a ManagedSession and the later insert would replace
        // the earlier one — confirm whether callers serialize resumes.
        {
            let mut sessions = self.active_sessions.write().await;
            sessions.insert(session_id.to_string(), managed_session);
        }

        let last_sequence = session.state.last_event_sequence;

        let event = StreamEvent::SessionResumed {
            session_id: session_id.to_string(),
            event_offset: last_sequence,
        };
        self.emit_event(session_id.to_string(), event).await;

        info!(session_id = %session_id, last_sequence = last_sequence, "Session resumed");
        Ok(command_tx)
    }
493
494 pub async fn suspend_session(&self, session_id: &str) -> Result<bool> {
496 let managed_session = {
497 let mut sessions = self.active_sessions.write().await;
498 sessions.remove(session_id)
499 };
500
501 let managed_session = match managed_session {
502 Some(session) => session,
503 None => {
504 debug!(session_id = %session_id, "Session not active, cannot suspend");
505 return Ok(false);
506 }
507 };
508
509 info!(session_id = %session_id, "Suspending session");
510
511 self.store.update_session(&managed_session.session).await?;
513
514 let event = StreamEvent::SessionSaved {
516 session_id: session_id.to_string(),
517 };
518 self.emit_event(session_id.to_string(), event).await;
519
520 info!(session_id = %session_id, "Session suspended and saved");
521 Ok(true)
522 }
523
524 pub async fn delete_session(&self, session_id: &str) -> Result<bool> {
526 {
528 let mut sessions = self.active_sessions.write().await;
529 sessions.remove(session_id);
530 }
531
532 self.store.delete_session(session_id).await?;
534
535 info!(session_id = %session_id, "Session deleted");
536 Ok(true)
537 }
538
539 pub async fn list_sessions(&self, filter: SessionFilter) -> Result<Vec<SessionInfo>> {
541 Ok(self.store.list_sessions(filter).await?)
542 }
543
544 pub async fn get_active_sessions(&self) -> Vec<String> {
546 let sessions = self.active_sessions.read().await;
547 sessions.keys().cloned().collect()
548 }
549
550 pub async fn is_session_active(&self, session_id: &str) -> bool {
552 let sessions = self.active_sessions.read().await;
553 sessions.contains_key(session_id)
554 }
555
556 pub async fn send_command(&self, session_id: &str, command: AppCommand) -> Result<()> {
558 let sessions = self.active_sessions.read().await;
559 if let Some(managed_session) = sessions.get(session_id) {
560 managed_session.command_tx.send(command).await.map_err(|_| {
561 Error::SessionManager(SessionManagerError::SessionNotActive {
562 session_id: session_id.to_string(),
563 })
564 })
565 } else {
566 Err(Error::SessionManager(
567 SessionManagerError::SessionNotActive {
568 session_id: session_id.to_string(),
569 },
570 ))
571 }
572 }
573
574 pub async fn update_session_state(
576 &self,
577 session_id: &str,
578 update_fn: impl FnOnce(&mut SessionState),
579 ) -> Result<()> {
580 {
581 let mut sessions = self.active_sessions.write().await;
582 if let Some(managed_session) = sessions.get_mut(session_id) {
583 managed_session.touch();
584 update_fn(&mut managed_session.session.state);
585 managed_session.session.update_timestamp();
586
587 if self.config.auto_persist {
589 self.store.update_session(&managed_session.session).await?;
590 }
591 } else {
592 return Err(SessionManagerError::SessionNotActive {
593 session_id: session_id.to_string(),
594 }
595 .into());
596 }
597 }
598
599 Ok(())
600 }
601
602 pub async fn emit_event(&self, session_id: String, event: StreamEvent) {
604 let sequence_num = match self.store.append_event(&session_id, &event).await {
606 Ok(seq) => seq,
607 Err(e) => {
608 error!(session_id = %session_id, error = %e, "Failed to persist event");
609 return;
610 }
611 };
612
613 if let Err(e) = self
615 .update_session_state(&session_id, |state| {
616 state.last_event_sequence = sequence_num;
617 })
618 .await
619 {
620 error!(session_id = %session_id, error = %e, "Failed to update session sequence number");
621 }
622 }
623
624 pub async fn cleanup_inactive_sessions(&self, max_idle_time: chrono::Duration) -> usize {
626 let mut to_suspend = Vec::new();
627
628 {
629 let sessions = self.active_sessions.read().await;
630 for (session_id, managed_session) in sessions.iter() {
631 if managed_session.is_inactive(max_idle_time) {
632 to_suspend.push(session_id.clone());
633 }
634 }
635 }
636
637 let mut suspended_count = 0;
638 for session_id in to_suspend {
639 if let Ok(true) = self.suspend_session(&session_id).await {
640 suspended_count += 1;
641 }
642 }
643
644 if suspended_count > 0 {
645 info!(
646 suspended_count = suspended_count,
647 "Cleaned up inactive sessions"
648 );
649 }
650
651 suspended_count
652 }
653
    /// Returns a reference to the underlying session store.
    pub fn store(&self) -> &Arc<dyn SessionStore> {
        &self.store
    }
658
659 pub async fn increment_subscriber_count(&self, session_id: &str) -> Result<()> {
661 let mut sessions = self.active_sessions.write().await;
662 if let Some(managed_session) = sessions.get_mut(session_id) {
663 managed_session.subscriber_count += 1;
664 managed_session.touch();
665 debug!(
666 session_id = %session_id,
667 subscriber_count = managed_session.subscriber_count,
668 "Incremented subscriber count"
669 );
670 Ok(())
671 } else {
672 Err(SessionManagerError::SessionNotActive {
673 session_id: session_id.to_string(),
674 }
675 .into())
676 }
677 }
678
679 pub async fn decrement_subscriber_count(&self, session_id: &str) -> Result<()> {
681 let mut sessions = self.active_sessions.write().await;
682 if let Some(managed_session) = sessions.get_mut(session_id) {
683 managed_session.subscriber_count = managed_session.subscriber_count.saturating_sub(1);
684 managed_session.touch();
685 debug!(
686 session_id = %session_id,
687 subscriber_count = managed_session.subscriber_count,
688 "Decremented subscriber count"
689 );
690 Ok(())
691 } else {
692 debug!(session_id = %session_id, "Session not active when decrementing subscriber count");
694 Ok(())
695 }
696 }
697
698 pub async fn touch_session(&self, session_id: &str) -> Result<()> {
700 let mut sessions = self.active_sessions.write().await;
701 if let Some(managed_session) = sessions.get_mut(session_id) {
702 managed_session.touch();
703 Ok(())
704 } else {
705 Ok(())
707 }
708 }
709
710 pub async fn maybe_suspend_idle_session(&self, session_id: &str) -> Result<()> {
712 let should_suspend = {
714 let sessions = self.active_sessions.read().await;
715 if let Some(managed_session) = sessions.get(session_id) {
716 managed_session.subscriber_count == 0
717 } else {
718 false }
720 };
721
722 if should_suspend {
723 info!(session_id = %session_id, "No active subscribers, suspending session");
724 self.suspend_session(session_id).await?;
725 }
726
727 Ok(())
728 }
729
730 pub async fn get_session_state(
732 &self,
733 session_id: &str,
734 ) -> Result<Option<crate::session::SessionState>> {
735 info!("get_session_state called for session: {}", session_id);
736
737 match self.store.get_session(session_id).await {
740 Ok(Some(session)) => {
741 info!(
742 "Loaded session from store with {} messages",
743 session.state.messages.len()
744 );
745 Ok(Some(session.state))
746 }
747 Ok(None) => {
748 info!("Session not found in store: {}", session_id);
749 Ok(None)
750 }
751 Err(e) => {
752 error!("Error loading session from store: {}", e);
753 Err(SessionManagerError::Storage(e).into())
754 }
755 }
756 }
757}
758
759fn translate_app_event(app_event: AppEvent) -> Option<StreamEvent> {
761 match app_event {
762 AppEvent::MessageAdded { message, model } => Some(StreamEvent::MessageComplete {
763 message,
764 usage: None,
765 metadata: std::collections::HashMap::new(),
766 model,
767 }),
768
769 AppEvent::MessagePart { id, delta } => Some(StreamEvent::MessagePart {
770 content: delta,
771 message_id: id,
772 }),
773
774 AppEvent::ToolCallStarted {
775 name,
776 id,
777 parameters,
778 model,
779 } => {
780 let tool_call = ToolCall {
781 id: id.clone(),
782 name: name.clone(),
783 parameters,
784 };
785 Some(StreamEvent::ToolCallStarted {
786 tool_call,
787 metadata: std::collections::HashMap::new(),
788 model,
789 })
790 }
791
792 AppEvent::ToolCallCompleted {
793 name: _,
794 result,
795 id,
796 model,
797 } => Some(StreamEvent::ToolCallCompleted {
798 tool_call_id: id,
799 result,
800 metadata: std::collections::HashMap::new(),
801 model,
802 }),
803
804 AppEvent::ToolCallFailed {
805 name: _,
806 error,
807 id,
808 model,
809 } => Some(StreamEvent::ToolCallFailed {
810 tool_call_id: id,
811 error,
812 metadata: std::collections::HashMap::new(),
813 model,
814 }),
815
816 AppEvent::WorkspaceChanged => Some(StreamEvent::WorkspaceChanged),
817
818 AppEvent::WorkspaceFiles { files } => Some(StreamEvent::WorkspaceFiles {
819 files: files.clone(),
820 }),
821
822 AppEvent::Started { id, op } => Some(StreamEvent::OperationStarted {
823 operation_id: id,
824 operation: op,
825 }),
826 AppEvent::Finished { id, outcome } => Some(StreamEvent::OperationCompleted {
827 operation_id: id,
828 outcome,
829 }),
830 AppEvent::OperationCancelled { op_id, info } => {
831 let operation_id = op_id.unwrap_or_else(uuid::Uuid::new_v4);
833 Some(StreamEvent::OperationCancelled {
834 operation_id,
835 reason: info.to_string(), })
837 }
838
839 _ => None,
841 }
842}
843async fn update_session_state_for_event(
845 store: &Arc<dyn SessionStore>,
846 session_id: &str,
847 event: &StreamEvent,
848) -> Result<()> {
849 match event {
850 StreamEvent::MessageComplete { message, .. } => {
851 store.append_message(session_id, message).await?;
852
853 if let crate::app::conversation::MessageData::Tool {
855 tool_use_id,
856 result,
857 ..
858 } = &message.data
859 {
860 let stats = crate::session::ToolExecutionStats::success_typed(
861 serde_json::to_value(result).unwrap_or(serde_json::Value::Null),
862 result.variant_name().to_string(),
863 0, );
865 let update = ToolCallUpdate::set_result(stats);
866 store.update_tool_call(tool_use_id, update).await?;
867 }
868 }
869 StreamEvent::ToolCallStarted { tool_call, .. } => {
870 store.create_tool_call(session_id, tool_call).await?;
871 }
872 StreamEvent::ToolCallCompleted {
873 tool_call_id,
874 result,
875 ..
876 } => {
877 let stats = crate::session::ToolExecutionStats::success_typed(
878 serde_json::to_value(result).unwrap_or(serde_json::Value::Null),
879 result.variant_name().to_string(),
880 0,
881 );
882 let update = ToolCallUpdate::set_result(stats);
883 store.update_tool_call(tool_call_id, update).await?;
884
885 let messages = store.get_messages(session_id, None).await?;
888 let parent_id = messages.last().map(|m| m.id().to_string());
889
890 let tool_message = ConversationMessage {
891 data: crate::app::conversation::MessageData::Tool {
892 tool_use_id: tool_call_id.clone(),
893 result: result.clone(),
894 },
895 timestamp: std::time::SystemTime::now()
896 .duration_since(std::time::UNIX_EPOCH)
897 .expect("Time went backwards")
898 .as_secs(),
899 id: format!("tool_result_{tool_call_id}"),
900 parent_message_id: parent_id,
901 };
902 store.append_message(session_id, &tool_message).await?;
903 }
904 StreamEvent::ToolCallFailed {
905 tool_call_id,
906 error,
907 ..
908 } => {
909 let update = ToolCallUpdate::set_error(error.clone());
910 store.update_tool_call(tool_call_id, update).await?;
911
912 let messages = store.get_messages(session_id, None).await?;
915 let parent_id = messages.last().map(|m| m.id().to_string());
916
917 let tool_error = steer_tools::error::ToolError::Execution {
918 tool_name: "unknown".to_string(), message: error.clone(),
920 };
921 let tool_message = ConversationMessage {
922 data: MessageData::Tool {
923 tool_use_id: tool_call_id.clone(),
924 result: ToolResult::Error(tool_error),
925 },
926 timestamp: std::time::SystemTime::now()
927 .duration_since(std::time::UNIX_EPOCH)
928 .expect("Time went backwards")
929 .as_secs(),
930 id: format!("tool_result_{tool_call_id}"),
931 parent_message_id: parent_id,
932 };
933 store.append_message(session_id, &tool_message).await?;
934 }
935 _ => {}
937 }
938 Ok(())
939}
940
941#[cfg(test)]
942mod tests {
943 use super::*;
944 use crate::api::ToolCall;
945 use crate::app::MessageData;
946 use crate::app::conversation::{AssistantContent, Role, UserContent};
947 use crate::session::stores::sqlite::SqliteSessionStore;
948 use tempfile::TempDir;
949
950 async fn create_test_manager() -> (SessionManager, TempDir) {
951 let temp_dir = TempDir::new().unwrap();
952 let db_path = temp_dir.path().join("test.db");
953 let store = Arc::new(SqliteSessionStore::new(&db_path).await.unwrap());
954
955 let config = SessionManagerConfig {
956 max_concurrent_sessions: 100,
957 default_model: Model::default(),
958 auto_persist: true,
959 };
960 let manager = SessionManager::new(store, config);
961
962 (manager, temp_dir)
963 }
964
    /// Returns the shared test `AppConfig` from `crate::test_utils`.
    fn create_test_app_config() -> AppConfig {
        crate::test_utils::test_app_config()
    }
968
969 #[tokio::test]
970 async fn test_create_and_resume_session() {
971 let (manager, temp) = create_test_manager().await;
972 let app_config = create_test_app_config();
973
974 let session_config = SessionConfig {
976 workspace: crate::session::state::WorkspaceConfig::Local {
977 path: temp.path().to_path_buf(),
978 },
979 tool_config: crate::session::SessionToolConfig::default(),
980 system_prompt: None,
981 metadata: std::collections::HashMap::new(),
982 };
983 let (session_id, _command_tx) = manager
984 .create_session(session_config, app_config.clone())
985 .await
986 .unwrap();
987 assert!(!session_id.is_empty());
988 assert!(manager.is_session_active(&session_id).await);
989
990 assert!(manager.suspend_session(&session_id).await.unwrap());
992 assert!(!manager.is_session_active(&session_id).await);
993
994 let _command_tx = manager
996 .resume_session(&session_id, app_config)
997 .await
998 .unwrap();
999 assert!(manager.is_session_active(&session_id).await);
1000 }
1001
1002 #[tokio::test]
1003 async fn test_session_cleanup() {
1004 let (manager, temp) = create_test_manager().await;
1005 let app_config = create_test_app_config();
1006
1007 let session_config = SessionConfig {
1009 workspace: crate::session::state::WorkspaceConfig::Local {
1010 path: temp.path().to_path_buf(),
1011 },
1012 tool_config: crate::session::SessionToolConfig::default(),
1013 system_prompt: None,
1014 metadata: std::collections::HashMap::new(),
1015 };
1016 let (session_id, _command_tx) = manager
1017 .create_session(session_config, app_config)
1018 .await
1019 .unwrap();
1020
1021 {
1023 let mut sessions = manager.active_sessions.write().await;
1024 if let Some(session) = sessions.get_mut(&session_id) {
1025 session.last_activity = chrono::Utc::now() - chrono::Duration::hours(2);
1026 session.subscriber_count = 0;
1027 }
1028 }
1029
1030 let cleaned = manager
1032 .cleanup_inactive_sessions(chrono::Duration::hours(1))
1033 .await;
1034 assert_eq!(cleaned, 1);
1035 assert!(!manager.is_session_active(&session_id).await);
1036 }
1037
1038 #[tokio::test]
1039 async fn test_capacity_rejection() {
1040 let temp_dir = TempDir::new().unwrap();
1041 let temp = tempfile::TempDir::new().unwrap();
1042 let db_path = temp_dir.path().join("test.db");
1043 let store = Arc::new(SqliteSessionStore::new(&db_path).await.unwrap());
1044
1045 let config = SessionManagerConfig {
1046 max_concurrent_sessions: 1, default_model: Model::default(),
1048 auto_persist: true,
1049 };
1050 let manager = SessionManager::new(store, config);
1051 let app_config = create_test_app_config();
1052
1053 let tool_config = crate::session::SessionToolConfig {
1055 approval_policy: crate::session::ToolApprovalPolicy::AlwaysAsk,
1056 ..Default::default()
1057 };
1058
1059 let session_config = SessionConfig {
1060 workspace: crate::session::state::WorkspaceConfig::Local {
1061 path: temp.path().to_path_buf(),
1062 },
1063 tool_config,
1064 system_prompt: None,
1065 metadata: std::collections::HashMap::new(),
1066 };
1067 let (session_id1, _command_tx) = manager
1068 .create_session(session_config.clone(), app_config.clone())
1069 .await
1070 .unwrap();
1071 assert!(!session_id1.is_empty());
1072
1073 let result = manager.create_session(session_config, app_config).await;
1075
1076 assert!(result.is_err());
1077 assert!(matches!(
1078 result,
1079 Err(crate::error::Error::SessionManager(
1080 SessionManagerError::CapacityExceeded { .. }
1081 ))
1082 ));
1083 match result.unwrap_err() {
1084 crate::error::Error::SessionManager(SessionManagerError::CapacityExceeded {
1085 current,
1086 max,
1087 }) => {
1088 assert_eq!(current, 1);
1089 assert_eq!(max, 1);
1090 }
1091 _ => unreachable!(),
1092 }
1093 }
1094
    /// Writes a user message, an assistant tool call, its tool result, and a
    /// follow-up directly to the store, suspends the session, and checks that
    /// the tool result is loaded back intact before resuming.
    #[tokio::test]
    async fn test_tool_result_persistence_on_resume() {
        let (manager, temp) = create_test_manager().await;
        let app_config = create_test_app_config();

        let session_config = SessionConfig {
            workspace: crate::session::state::WorkspaceConfig::Local {
                path: temp.path().to_path_buf(),
            },
            tool_config: crate::session::SessionToolConfig::default(),
            system_prompt: None,
            metadata: std::collections::HashMap::new(),
        };
        let (session_id, _command_tx) = manager
            .create_session(session_config, app_config.clone())
            .await
            .unwrap();

        // 1. User request.
        let user_message = ConversationMessage {
            data: crate::app::conversation::MessageData::User {
                content: vec![UserContent::Text {
                    text: "Read the file test.txt".to_string(),
                }],
            },
            timestamp: 123456789,
            id: "user_1".to_string(),
            parent_message_id: None,
        };
        manager
            .store
            .append_message(&session_id, &user_message)
            .await
            .unwrap();

        // 2. Assistant reply containing a tool call.
        let assistant_message = ConversationMessage {
            data: crate::app::conversation::MessageData::Assistant {
                content: vec![
                    AssistantContent::Text {
                        text: "I'll read that file for you.".to_string(),
                    },
                    AssistantContent::ToolCall {
                        tool_call: ToolCall {
                            id: "tool_call_1".to_string(),
                            name: "read_file".to_string(),
                            parameters: serde_json::json!({"path": "test.txt"}),
                        },
                    },
                ],
            },
            timestamp: 123456790,
            id: "assistant_1".to_string(),
            parent_message_id: Some("user_1".to_string()),
        };
        manager
            .store
            .append_message(&session_id, &assistant_message)
            .await
            .unwrap();

        // 3. Tool call record matching the assistant's call.
        let tool_call = ToolCall {
            id: "tool_call_1".to_string(),
            name: "read_file".to_string(),
            parameters: serde_json::json!({"path": "test.txt"}),
        };
        manager
            .store
            .create_tool_call(&session_id, &tool_call)
            .await
            .unwrap();

        // 4. Mark the tool call as completed with a result.
        let stats = crate::session::ToolExecutionStats::success_typed(
            serde_json::json!({
                "content": "File contents: Hello, world!",
                "file_path": "test.txt",
                "line_count": 1,
                "truncated": false
            }),
            "FileContent".to_string(),
            0,
        );
        let update = ToolCallUpdate::set_result(stats);
        manager
            .store
            .update_tool_call("tool_call_1", update)
            .await
            .unwrap();

        // 5. Tool-result message (third message in the conversation).
        let tool_message = ConversationMessage {
            data: MessageData::Tool {
                tool_use_id: "tool_call_1".to_string(),
                result: ToolResult::FileContent(steer_tools::result::FileContentResult {
                    content: "File contents: Hello, world!".to_string(),
                    file_path: "test.txt".to_string(),
                    line_count: 1,
                    truncated: false,
                }),
            },
            timestamp: 123456790,
            id: "tool_result_tool_call_1".to_string(),
            parent_message_id: Some("assistant_1".to_string()),
        };
        manager
            .store
            .append_message(&session_id, &tool_message)
            .await
            .unwrap();

        // 6. Assistant follow-up.
        let followup_message = ConversationMessage {
            data: crate::app::conversation::MessageData::Assistant {
                content: vec![AssistantContent::Text {
                    text: "The file contains: Hello, world!".to_string(),
                }],
            },
            timestamp: 123456791,
            id: "assistant_2".to_string(),
            parent_message_id: Some("assistant_1".to_string()),
        };
        manager
            .store
            .append_message(&session_id, &followup_message)
            .await
            .unwrap();

        manager.suspend_session(&session_id).await.unwrap();

        // Reload from the store and verify what was persisted.
        let loaded_session = manager
            .store
            .get_session(&session_id)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(loaded_session.state.messages.len(), 4);

        // Index 2 is the tool-result message appended in step 5.
        let tool_result_msg = &loaded_session.state.messages[2];
        assert_eq!(tool_result_msg.role(), Role::Tool);

        assert!(matches!(
            &tool_result_msg.data,
            crate::app::conversation::MessageData::Tool { .. }
        ));
        if let crate::app::conversation::MessageData::Tool {
            tool_use_id,
            result,
            ..
        } = &tool_result_msg.data
        {
            assert_eq!(tool_use_id, "tool_call_1");
            assert!(matches!(
                result,
                crate::app::conversation::ToolResult::FileContent(_)
            ));
            match result {
                crate::app::conversation::ToolResult::FileContent(content) => {
                    assert!(content.content.contains("Hello, world!"));
                }
                _ => unreachable!(),
            }
        } else {
            panic!("Expected Tool message");
        }

        // Resuming must succeed with the tool result in the history.
        let _command_tx = manager
            .resume_session(&session_id, app_config)
            .await
            .unwrap();

    }
1281
    /// Stores three messages, sets the active message id to the edited one,
    /// and checks that the id and messages survive suspend and resume.
    #[tokio::test]
    async fn test_active_message_id_persistence() {
        let (manager, temp) = create_test_manager().await;
        let app_config = create_test_app_config();

        let session_config = SessionConfig {
            workspace: crate::session::state::WorkspaceConfig::Local {
                path: temp.path().to_path_buf(),
            },
            tool_config: crate::session::SessionToolConfig::default(),
            system_prompt: None,
            metadata: std::collections::HashMap::new(),
        };
        let (session_id, _command_tx) = manager
            .create_session(session_config, app_config.clone())
            .await
            .unwrap();

        // Original user message.
        let msg1 = ConversationMessage {
            data: crate::app::conversation::MessageData::User {
                content: vec![UserContent::Text {
                    text: "Hello".to_string(),
                }],
            },
            timestamp: 1000,
            id: "msg1".to_string(),
            parent_message_id: None,
        };

        let msg2 = ConversationMessage {
            data: crate::app::conversation::MessageData::Assistant {
                content: vec![AssistantContent::Text {
                    text: "Hi there!".to_string(),
                }],
            },
            timestamp: 2000,
            id: "msg2".to_string(),
            parent_message_id: Some("msg1".to_string()),
        };

        // A second root-level user message (no parent), standing in for an
        // edit of `msg1`.
        let msg1_edited = ConversationMessage {
            data: crate::app::conversation::MessageData::User {
                content: vec![UserContent::Text {
                    text: "Goodbye".to_string(),
                }],
            },
            timestamp: 3000,
            id: "msg1_edited".to_string(),
            parent_message_id: None,
        };

        manager
            .store
            .append_message(&session_id, &msg1)
            .await
            .unwrap();
        manager
            .store
            .append_message(&session_id, &msg2)
            .await
            .unwrap();
        manager
            .store
            .append_message(&session_id, &msg1_edited)
            .await
            .unwrap();

        // Point the session at the edited message.
        manager
            .store
            .update_active_message_id(&session_id, Some("msg1_edited"))
            .await
            .unwrap();

        manager.suspend_session(&session_id).await.unwrap();

        // Reload from the store and verify what was persisted.
        let loaded_session = manager
            .store
            .get_session(&session_id)
            .await
            .unwrap()
            .unwrap();

        assert_eq!(
            loaded_session.state.active_message_id,
            Some("msg1_edited".to_string())
        );

        assert_eq!(loaded_session.state.messages.len(), 3);

        let edited_msg = loaded_session
            .state
            .messages
            .iter()
            .find(|m| m.id() == "msg1_edited")
            .expect("Edited message should exist");

        match &edited_msg.data {
            crate::app::conversation::MessageData::User { content, .. } => {
                if let Some(UserContent::Text { text }) = content.first() {
                    assert_eq!(text, "Goodbye");
                } else {
                    panic!("Expected text content");
                }
            }
            _ => panic!("Expected user message"),
        }

        let _ = manager
            .resume_session(&session_id, app_config)
            .await
            .unwrap();

        // The active message id is still the edited one after resuming.
        let state = manager
            .get_session_state(&session_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(state.active_message_id, Some("msg1_edited".to_string()));
    }
1413}