a2a_protocol_server/handler/
shutdown.rs1use std::time::Duration;
9
10#[cfg(test)]
11use std::time::Instant;
12
13use super::RequestHandler;
14
15impl RequestHandler {
16 pub async fn shutdown(&self) {
26 {
28 let tokens = self.cancellation_tokens.read().await;
29 for entry in tokens.values() {
30 entry.token.cancel();
31 }
32 }
33
34 self.event_queue_manager.destroy_all().await;
36
37 {
39 let mut tokens = self.cancellation_tokens.write().await;
40 tokens.clear();
41 }
42
43 let _ = tokio::time::timeout(Duration::from_secs(10), self.executor.on_shutdown()).await;
45 }
46
47 pub async fn shutdown_with_timeout(&self, timeout: Duration) {
53 {
55 let tokens = self.cancellation_tokens.read().await;
56 for entry in tokens.values() {
57 entry.token.cancel();
58 }
59 }
60
61 let drain_deadline = tokio::time::Instant::now() + timeout;
63 loop {
64 let active = self.event_queue_manager.active_count().await;
65 if active == 0 {
66 break;
67 }
68 if tokio::time::Instant::now() >= drain_deadline {
69 trace_warn!(
70 active_queues = active,
71 "shutdown timeout reached, force-destroying remaining queues"
72 );
73 break;
74 }
75 let remaining = drain_deadline - tokio::time::Instant::now();
77 tokio::time::sleep(remaining.min(tokio::time::Duration::from_millis(10))).await;
78 }
79
80 self.event_queue_manager.destroy_all().await;
82
83 {
85 let mut tokens = self.cancellation_tokens.write().await;
86 tokens.clear();
87 }
88
89 let _ = tokio::time::timeout(timeout, self.executor.on_shutdown()).await;
92 }
93}
94
#[cfg(test)]
mod tests {
    use super::*;

    use std::future::Future;
    use std::pin::Pin;

    use a2a_protocol_types::error::A2aResult;
    use a2a_protocol_types::task::TaskId;
    use tokio_util::sync::CancellationToken;

    use crate::builder::RequestHandlerBuilder;
    use crate::executor::AgentExecutor;
    use crate::request_context::RequestContext;
    use crate::streaming::EventQueueWriter;

    use super::super::CancellationEntry;

    /// Executor whose `execute` resolves to `Ok(())` without doing any work.
    struct NoopExecutor;

    impl AgentExecutor for NoopExecutor {
        fn execute<'a>(
            &'a self,
            _ctx: &'a RequestContext,
            _queue: &'a dyn EventQueueWriter,
        ) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
            Box::pin(async { Ok(()) })
        }
    }

    /// Builds a handler around [`NoopExecutor`] using the builder's defaults.
    fn make_handler() -> RequestHandler {
        let builder = RequestHandlerBuilder::new(NoopExecutor);
        builder
            .build()
            .expect("builder should succeed with defaults")
    }

    /// Stores `token` in the handler's cancellation-token map under `task_id`.
    async fn register_token(handler: &RequestHandler, task_id: &str, token: CancellationToken) {
        handler.cancellation_tokens.write().await.insert(
            TaskId::new(task_id),
            CancellationEntry {
                token,
                created_at: Instant::now(),
            },
        );
    }

    /// A plain shutdown on a fresh handler must simply run to completion.
    #[tokio::test]
    async fn shutdown_completes_without_panic() {
        make_handler().shutdown().await;
    }

    /// Calling shutdown a second time on the same handler must also complete.
    #[tokio::test]
    async fn shutdown_is_idempotent() {
        let handler = make_handler();
        for _ in 0..2 {
            handler.shutdown().await;
        }
    }

    /// A registered token is removed from the map by `shutdown`.
    #[tokio::test]
    async fn shutdown_clears_cancellation_tokens() {
        let handler = make_handler();
        register_token(&handler, "t-1", CancellationToken::new()).await;

        let registered = handler.cancellation_tokens.read().await.len();
        assert_eq!(registered, 1, "should have 1 token before shutdown");

        handler.shutdown().await;

        let map_is_empty = handler.cancellation_tokens.read().await.is_empty();
        assert!(
            map_is_empty,
            "cancellation tokens should be cleared after shutdown"
        );
    }

    /// With no active queues the graceful shutdown must return before its timeout.
    #[tokio::test]
    async fn shutdown_with_timeout_completes_within_timeout() {
        let limit = Duration::from_secs(5);
        let handler = make_handler();

        let started = Instant::now();
        handler.shutdown_with_timeout(limit).await;
        let took = started.elapsed();

        assert!(
            took < limit,
            "shutdown with no active queues should complete well before the timeout"
        );
    }

    /// A registered token is removed from the map by `shutdown_with_timeout`.
    #[tokio::test]
    async fn shutdown_with_timeout_clears_cancellation_tokens() {
        let handler = make_handler();
        register_token(&handler, "t-2", CancellationToken::new()).await;

        handler
            .shutdown_with_timeout(Duration::from_millis(200))
            .await;

        let map_is_empty = handler.cancellation_tokens.read().await.is_empty();
        assert!(
            map_is_empty,
            "cancellation tokens should be cleared after shutdown_with_timeout"
        );
    }

    /// A clone of the registered token observes the cancellation issued during shutdown.
    #[tokio::test]
    async fn shutdown_with_timeout_cancels_tokens() {
        let handler = make_handler();
        let observer = CancellationToken::new();
        register_token(&handler, "t-3", observer.clone()).await;

        handler
            .shutdown_with_timeout(Duration::from_millis(200))
            .await;

        assert!(
            observer.is_cancelled(),
            "cancellation token should be cancelled after shutdown"
        );
    }

    /// A zero timeout must not prevent shutdown from completing.
    #[tokio::test]
    async fn shutdown_with_zero_timeout_still_completes() {
        make_handler()
            .shutdown_with_timeout(Duration::from_millis(0))
            .await;
    }

    /// When the only queue is destroyed shortly after shutdown starts, shutdown returns
    /// soon afterwards instead of waiting for the full timeout.
    #[tokio::test]
    async fn shutdown_with_timeout_drains_active_queues() {
        let handler = make_handler();
        let task_id = TaskId::new("t-drain");

        // Keep both halves bound so they stay alive while shutdown runs.
        let (_writer, _reader) = handler.event_queue_manager.get_or_create(&task_id).await;
        let active_before = handler.event_queue_manager.active_count().await;
        assert_eq!(active_before, 1, "should have 1 active queue before shutdown");

        // Destroy the queue from a background task after a short delay.
        let manager = handler.event_queue_manager.clone();
        let drained_id = task_id.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            manager.destroy(&drained_id).await;
        });

        let started = Instant::now();
        handler.shutdown_with_timeout(Duration::from_secs(5)).await;
        let took = started.elapsed();

        assert!(
            took < Duration::from_secs(2),
            "shutdown should complete quickly once queues drain"
        );
    }

    /// When the queue is never drained, shutdown waits out the timeout and then destroys
    /// the queue itself.
    #[tokio::test]
    async fn shutdown_with_timeout_force_destroys_on_timeout() {
        let limit = Duration::from_millis(100);
        let handler = make_handler();
        let task_id = TaskId::new("t-force");

        // Keep both halves bound so they stay alive while shutdown runs.
        let (_writer, _reader) = handler.event_queue_manager.get_or_create(&task_id).await;
        let active_before = handler.event_queue_manager.active_count().await;
        assert_eq!(active_before, 1, "should have 1 active queue before shutdown");

        let started = Instant::now();
        handler.shutdown_with_timeout(limit).await;
        let took = started.elapsed();

        assert!(
            took >= limit,
            "shutdown should wait at least the timeout duration"
        );

        let active_after = handler.event_queue_manager.active_count().await;
        assert_eq!(
            active_after, 0,
            "all queues should be destroyed after shutdown timeout"
        );
    }
}