Skip to main content

matrixcode_core/agent/session/
manager.rs

1//! Agent session management.
2//!
3//! This module manages the session lifecycle, including:
4//! - Event emission
5//! - Cancellation tokens
6//! - Ask response channel
7//! - Pending input channel (for real-time message appending)
8//!
9//! By extracting session management into a dedicated struct, we enable:
10//! - Clear separation between session and state
11//! - Easier testing of event handling
12//! - Better control over session lifecycle
13
14use anyhow::Result;
15use tokio::sync::mpsc;
16
17use crate::cancel::CancellationToken;
18use crate::event::AgentEvent;
19
20/// Agent session manager.
21///
22/// Handles session lifecycle and communication channels.
23pub struct SessionManager {
24    /// Cancellation token for stopping the session.
25    cancel_token: Option<CancellationToken>,
26
27    /// Event sender for emitting agent events.
28    event_tx: mpsc::Sender<AgentEvent>,
29
30    /// Ask response receiver (for TUI mode).
31    ask_rx: Option<mpsc::Receiver<String>>,
32
33    /// Pending input receiver (for real-time message appending during streaming).
34    pending_input_rx: Option<mpsc::Receiver<String>>,
35}
36
37impl SessionManager {
38    /// Create a new session manager with event sender.
39    pub fn new(event_tx: mpsc::Sender<AgentEvent>) -> Self {
40        Self {
41            cancel_token: None,
42            event_tx,
43            ask_rx: None,
44            pending_input_rx: None,
45        }
46    }
47
48    /// Create a session manager with all channels.
49    pub fn with_channels(
50        event_tx: mpsc::Sender<AgentEvent>,
51        ask_rx: Option<mpsc::Receiver<String>>,
52    ) -> Self {
53        Self {
54            cancel_token: None,
55            event_tx,
56            ask_rx,
57            pending_input_rx: None,
58        }
59    }
60
61    /// Create a session manager with pending input channel.
62    pub fn with_pending_input(
63        event_tx: mpsc::Sender<AgentEvent>,
64        pending_input_rx: Option<mpsc::Receiver<String>>,
65    ) -> Self {
66        Self {
67            cancel_token: None,
68            event_tx,
69            ask_rx: None,
70            pending_input_rx,
71        }
72    }
73
74    /// Create a session manager with all channels including pending input.
75    pub fn with_all_channels(
76        event_tx: mpsc::Sender<AgentEvent>,
77        ask_rx: Option<mpsc::Receiver<String>>,
78        pending_input_rx: Option<mpsc::Receiver<String>>,
79    ) -> Self {
80        Self {
81            cancel_token: None,
82            event_tx,
83            ask_rx,
84            pending_input_rx,
85        }
86    }
87
88    /// Emit an agent event.
89    pub fn emit(&self, event: AgentEvent) -> Result<()> {
90        self.event_tx
91            .try_send(event)
92            .map_err(|e| anyhow::anyhow!("Failed to emit event: {}", e))?;
93        Ok(())
94    }
95
96    /// Check if session is cancelled.
97    pub fn is_cancelled(&self) -> bool {
98        self.cancel_token
99            .as_ref()
100            .map(|t| t.is_cancelled())
101            .unwrap_or(false)
102    }
103
104    /// Set cancellation token.
105    pub fn set_cancel_token(&mut self, token: CancellationToken) {
106        self.cancel_token = Some(token);
107    }
108
109    /// Get cancellation token.
110    pub fn cancel_token(&self) -> Option<&CancellationToken> {
111        self.cancel_token.as_ref()
112    }
113
114    /// Set ask response channel.
115    pub fn set_ask_channel(&mut self, rx: mpsc::Receiver<String>) {
116        self.ask_rx = Some(rx);
117    }
118
119    /// Check if ask response channel is available (no mutable borrow needed).
120    pub fn has_ask_channel(&self) -> bool {
121        self.ask_rx.is_some()
122    }
123
124    /// Get ask response channel.
125    pub fn ask_rx(&mut self) -> Option<&mut mpsc::Receiver<String>> {
126        self.ask_rx.as_mut()
127    }
128
129    /// Set pending input channel.
130    pub fn set_pending_input_channel(&mut self, rx: mpsc::Receiver<String>) {
131        self.pending_input_rx = Some(rx);
132    }
133
134    /// Get pending input channel.
135    pub fn pending_input_rx(&mut self) -> Option<&mut mpsc::Receiver<String>> {
136        self.pending_input_rx.as_mut()
137    }
138
139    /// Drain pending inputs from the channel.
140    pub fn drain_pending_inputs(&mut self) -> Vec<String> {
141        let mut inputs = Vec::new();
142        if let Some(rx) = &mut self.pending_input_rx {
143            while let Ok(msg) = rx.try_recv() {
144                inputs.push(msg);
145            }
146        }
147        inputs
148    }
149
150    /// Get event sender (for cloning).
151    pub fn event_sender(&self) -> mpsc::Sender<AgentEvent> {
152        self.event_tx.clone()
153    }
154
155    /// Clear session state (reset cancellation).
156    pub fn clear(&mut self) {
157        self.cancel_token = None;
158        // Note: ask_rx, pending_input_rx and event_tx are not cleared as they are channels
159    }
160}
161
162#[cfg(test)]
163mod tests {
164    use super::*;
165
166    #[test]
167    fn test_session_new_creates_empty_session() {
168        let (tx, _rx) = mpsc::channel(100);
169        let session = SessionManager::new(tx);
170
171        assert!(session.cancel_token.is_none());
172        assert!(session.ask_rx.is_none());
173        assert!(!session.is_cancelled());
174    }
175
176    #[test]
177    fn test_session_emit_sends_event() {
178        let (tx, mut rx) = mpsc::channel(100);
179        let session = SessionManager::new(tx);
180
181        let event = AgentEvent::session_started();
182        session.emit(event).unwrap();
183
184        let received = rx.try_recv();
185        assert!(received.is_ok(), "should receive emitted event");
186    }
187
188    #[test]
189    fn test_session_set_cancel_token() {
190        let (tx, _rx) = mpsc::channel(100);
191        let mut session = SessionManager::new(tx);
192
193        let token = CancellationToken::new();
194        session.set_cancel_token(token.clone());
195
196        assert!(session.cancel_token().is_some());
197        assert!(!session.is_cancelled());
198
199        // Cancel the token
200        token.cancel();
201        assert!(session.is_cancelled());
202    }
203
204    #[test]
205    fn test_session_set_ask_channel() {
206        let (tx, _rx) = mpsc::channel(100);
207        let (ask_tx, ask_rx) = mpsc::channel(10);
208        let mut session = SessionManager::new(tx);
209
210        session.set_ask_channel(ask_rx);
211
212        assert!(session.ask_rx().is_some());
213
214        // Send a message through ask channel
215        ask_tx.try_send("test response".to_string()).unwrap();
216
217        let response = session.ask_rx().unwrap().try_recv();
218        assert!(response.is_ok(), "should receive ask response");
219    }
220
221    #[test]
222    fn test_session_event_sender_can_clone() {
223        let (tx, _rx) = mpsc::channel(100);
224        let session = SessionManager::new(tx);
225
226        let sender = session.event_sender();
227        // Should be able to send through cloned sender
228        sender.try_send(AgentEvent::session_started()).unwrap();
229    }
230
231    #[test]
232    fn test_session_clear_reset_cancellation() {
233        let (tx, _rx) = mpsc::channel(100);
234        let mut session = SessionManager::new(tx);
235
236        // Set cancellation token
237        let token = CancellationToken::new();
238        session.set_cancel_token(token);
239
240        // Clear
241        session.clear();
242
243        // Token should be None
244        assert!(session.cancel_token().is_none());
245        assert!(!session.is_cancelled());
246    }
247}