matrixcode_core/agent/session/
manager.rs1use anyhow::Result;
15use tokio::sync::mpsc;
16
17use crate::cancel::CancellationToken;
18use crate::event::AgentEvent;
19
20pub struct SessionManager {
24 cancel_token: Option<CancellationToken>,
26
27 event_tx: mpsc::Sender<AgentEvent>,
29
30 ask_rx: Option<mpsc::Receiver<String>>,
32
33 pending_input_rx: Option<mpsc::Receiver<String>>,
35}
36
37impl SessionManager {
38 pub fn new(event_tx: mpsc::Sender<AgentEvent>) -> Self {
40 Self {
41 cancel_token: None,
42 event_tx,
43 ask_rx: None,
44 pending_input_rx: None,
45 }
46 }
47
48 pub fn with_channels(
50 event_tx: mpsc::Sender<AgentEvent>,
51 ask_rx: Option<mpsc::Receiver<String>>,
52 ) -> Self {
53 Self {
54 cancel_token: None,
55 event_tx,
56 ask_rx,
57 pending_input_rx: None,
58 }
59 }
60
61 pub fn with_pending_input(
63 event_tx: mpsc::Sender<AgentEvent>,
64 pending_input_rx: Option<mpsc::Receiver<String>>,
65 ) -> Self {
66 Self {
67 cancel_token: None,
68 event_tx,
69 ask_rx: None,
70 pending_input_rx,
71 }
72 }
73
74 pub fn with_all_channels(
76 event_tx: mpsc::Sender<AgentEvent>,
77 ask_rx: Option<mpsc::Receiver<String>>,
78 pending_input_rx: Option<mpsc::Receiver<String>>,
79 ) -> Self {
80 Self {
81 cancel_token: None,
82 event_tx,
83 ask_rx,
84 pending_input_rx,
85 }
86 }
87
88 pub fn emit(&self, event: AgentEvent) -> Result<()> {
90 self.event_tx
91 .try_send(event)
92 .map_err(|e| anyhow::anyhow!("Failed to emit event: {}", e))?;
93 Ok(())
94 }
95
96 pub fn is_cancelled(&self) -> bool {
98 self.cancel_token
99 .as_ref()
100 .map(|t| t.is_cancelled())
101 .unwrap_or(false)
102 }
103
104 pub fn set_cancel_token(&mut self, token: CancellationToken) {
106 self.cancel_token = Some(token);
107 }
108
109 pub fn cancel_token(&self) -> Option<&CancellationToken> {
111 self.cancel_token.as_ref()
112 }
113
114 pub fn set_ask_channel(&mut self, rx: mpsc::Receiver<String>) {
116 self.ask_rx = Some(rx);
117 }
118
119 pub fn has_ask_channel(&self) -> bool {
121 self.ask_rx.is_some()
122 }
123
124 pub fn ask_rx(&mut self) -> Option<&mut mpsc::Receiver<String>> {
126 self.ask_rx.as_mut()
127 }
128
129 pub fn set_pending_input_channel(&mut self, rx: mpsc::Receiver<String>) {
131 self.pending_input_rx = Some(rx);
132 }
133
134 pub fn pending_input_rx(&mut self) -> Option<&mut mpsc::Receiver<String>> {
136 self.pending_input_rx.as_mut()
137 }
138
139 pub fn drain_pending_inputs(&mut self) -> Vec<String> {
141 let mut inputs = Vec::new();
142 if let Some(rx) = &mut self.pending_input_rx {
143 while let Ok(msg) = rx.try_recv() {
144 inputs.push(msg);
145 }
146 }
147 inputs
148 }
149
150 pub fn event_sender(&self) -> mpsc::Sender<AgentEvent> {
152 self.event_tx.clone()
153 }
154
155 pub fn clear(&mut self) {
157 self.cancel_token = None;
158 }
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165
166 #[test]
167 fn test_session_new_creates_empty_session() {
168 let (tx, _rx) = mpsc::channel(100);
169 let session = SessionManager::new(tx);
170
171 assert!(session.cancel_token.is_none());
172 assert!(session.ask_rx.is_none());
173 assert!(!session.is_cancelled());
174 }
175
176 #[test]
177 fn test_session_emit_sends_event() {
178 let (tx, mut rx) = mpsc::channel(100);
179 let session = SessionManager::new(tx);
180
181 let event = AgentEvent::session_started();
182 session.emit(event).unwrap();
183
184 let received = rx.try_recv();
185 assert!(received.is_ok(), "should receive emitted event");
186 }
187
188 #[test]
189 fn test_session_set_cancel_token() {
190 let (tx, _rx) = mpsc::channel(100);
191 let mut session = SessionManager::new(tx);
192
193 let token = CancellationToken::new();
194 session.set_cancel_token(token.clone());
195
196 assert!(session.cancel_token().is_some());
197 assert!(!session.is_cancelled());
198
199 token.cancel();
201 assert!(session.is_cancelled());
202 }
203
204 #[test]
205 fn test_session_set_ask_channel() {
206 let (tx, _rx) = mpsc::channel(100);
207 let (ask_tx, ask_rx) = mpsc::channel(10);
208 let mut session = SessionManager::new(tx);
209
210 session.set_ask_channel(ask_rx);
211
212 assert!(session.ask_rx().is_some());
213
214 ask_tx.try_send("test response".to_string()).unwrap();
216
217 let response = session.ask_rx().unwrap().try_recv();
218 assert!(response.is_ok(), "should receive ask response");
219 }
220
221 #[test]
222 fn test_session_event_sender_can_clone() {
223 let (tx, _rx) = mpsc::channel(100);
224 let session = SessionManager::new(tx);
225
226 let sender = session.event_sender();
227 sender.try_send(AgentEvent::session_started()).unwrap();
229 }
230
231 #[test]
232 fn test_session_clear_reset_cancellation() {
233 let (tx, _rx) = mpsc::channel(100);
234 let mut session = SessionManager::new(tx);
235
236 let token = CancellationToken::new();
238 session.set_cancel_token(token);
239
240 session.clear();
242
243 assert!(session.cancel_token().is_none());
245 assert!(!session.is_cancelled());
246 }
247}