nexus_async_net/rest/tokio/
pool.rs1use std::future::poll_fn;
9use std::pin::Pin;
10
11#[cfg(test)]
12use nexus_net::http::HTTP_HANDSHAKE_BUFFER;
13use nexus_net::http::ResponseReader;
14use nexus_net::rest::{RequestWriter, RestError};
15#[cfg(feature = "tls")]
16use nexus_net::tls::TlsConfig;
17use nexus_pool::local::{Pool, Pooled};
18use tokio::io::AsyncWrite;
19
20use super::connection::{HttpConnection, HttpConnectionBuilder};
21use crate::maybe_tls::MaybeTls;
22
23async fn graceful_shutdown(conn: &mut HttpConnection<MaybeTls>) {
28 let stream = conn.stream_mut();
29 let _ = poll_fn(|cx| Pin::new(&mut *stream).poll_shutdown(cx)).await;
30}
31
32pub struct ClientSlot {
56 pub writer: RequestWriter,
58 pub reader: ResponseReader,
60 pub conn: Option<HttpConnection<MaybeTls>>,
62}
63
64impl ClientSlot {
65 pub fn needs_reconnect(&self) -> bool {
67 self.conn.as_ref().is_none_or(HttpConnection::is_poisoned)
68 }
69
70 pub fn conn_and_reader(
76 &mut self,
77 ) -> Result<(&mut HttpConnection<MaybeTls>, &mut ResponseReader), RestError> {
78 let conn = self.conn.as_mut().ok_or(RestError::ConnectionPoisoned)?;
79 Ok((conn, &mut self.reader))
80 }
81}
82
83pub struct ClientPool {
117 pool: Pool<ClientSlot>,
118 reconnect_config: ReconnectConfig,
119}
120
/// Snapshot of the builder's connection settings, kept by the pool so that a
/// reconnect uses the same options as the initial connections.
///
/// It is `Clone` because every reconnect task takes its own copy.
#[derive(Clone)]
struct ReconnectConfig {
    /// Target URL handed to the connection builder.
    url: String,
    /// TLS settings, when the builder was given any.
    #[cfg(feature = "tls")]
    tls_config: Option<TlsConfig>,
    /// Whether `disable_nagle()` is applied to new connections.
    nodelay: bool,
    /// Keepalive idle time, when configured.
    #[cfg(feature = "socket-opts")]
    tcp_keepalive: Option<std::time::Duration>,
    /// Receive buffer size, when configured.
    #[cfg(feature = "socket-opts")]
    recv_buf_size: Option<usize>,
    /// Send buffer size, when configured.
    #[cfg(feature = "socket-opts")]
    send_buf_size: Option<usize>,
}
134
135#[allow(clippy::future_not_send)] impl ClientPool {
137 #[must_use]
139 pub fn builder() -> ClientPoolBuilder {
140 ClientPoolBuilder::new()
141 }
142
143 pub fn try_acquire(&self) -> Option<Pooled<ClientSlot>> {
154 loop {
155 let slot = self.pool.try_acquire()?;
156 if !slot.needs_reconnect() {
157 return Some(slot);
158 }
159 self.spawn_reconnect(slot);
163 }
165 }
166
167 pub async fn acquire(&self) -> Result<Pooled<ClientSlot>, RestError> {
175 const MAX_BACKOFF_MS: u64 = 1_000;
176 const MAX_ATTEMPTS: u32 = 20;
177 let mut backoff_ms = 1u64;
178
179 for _ in 0..MAX_ATTEMPTS {
180 if let Some(slot) = self.try_acquire() {
181 return Ok(slot);
182 }
183 tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
186 backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
187 }
188
189 Err(RestError::ConnectionClosed(
190 "pool acquire timed out: no healthy slots available",
191 ))
192 }
193
194 pub fn available(&self) -> usize {
196 self.pool.available()
197 }
198
199 fn spawn_reconnect(&self, mut slot: Pooled<ClientSlot>) {
205 let config = self.reconnect_config.clone();
206 tokio::task::spawn_local(async move {
207 const MAX_BACKOFF_MS: u64 = 5_000;
208 let mut backoff_ms = 100u64;
209
210 loop {
211 if let Ok(conn) = Self::connect_one_with(&config).await {
212 slot.conn = Some(conn);
213 slot.reader.reset();
214 return;
215 }
216 tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
217 backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
218 }
219 });
220 }
221
222 async fn connect_one_with(
223 config: &ReconnectConfig,
224 ) -> Result<HttpConnection<MaybeTls>, RestError> {
225 let mut builder = HttpConnectionBuilder::new();
226 #[cfg(feature = "tls")]
227 if let Some(ref tls) = config.tls_config {
228 builder = builder.tls(tls);
229 }
230 if config.nodelay {
231 builder = builder.disable_nagle();
232 }
233 #[cfg(feature = "socket-opts")]
234 {
235 if let Some(idle) = config.tcp_keepalive {
236 builder = builder.tcp_keepalive(idle);
237 }
238 if let Some(size) = config.recv_buf_size {
239 builder = builder.recv_buffer_size(size);
240 }
241 if let Some(size) = config.send_buf_size {
242 builder = builder.send_buffer_size(size);
243 }
244 }
245 builder.connect(&config.url).await
246 }
247}
248
/// Collects the settings for a [`ClientPool`] before any connection is
/// opened. Obtain one via [`ClientPool::builder`] or
/// [`ClientPoolBuilder::new`].
pub struct ClientPoolBuilder {
    /// Server URL; required, empty by default.
    url: String,
    /// Base path applied to each slot's request writer when non-empty.
    base_path: String,
    /// `(name, value)` pairs installed on each slot's request writer.
    default_headers: Vec<(String, String)>,
    /// How many connections to open up front; defaults to 1.
    connections: usize,
    /// TLS settings for the connections, if any.
    #[cfg(feature = "tls")]
    tls_config: Option<TlsConfig>,
    /// Whether `disable_nagle()` is applied to connections; off by default.
    nodelay: bool,
    /// Keepalive idle time, if configured.
    #[cfg(feature = "socket-opts")]
    tcp_keepalive: Option<std::time::Duration>,
    /// Receive buffer size, if configured.
    #[cfg(feature = "socket-opts")]
    recv_buf_size: Option<usize>,
    /// Send buffer size, if configured.
    #[cfg(feature = "socket-opts")]
    send_buf_size: Option<usize>,
    /// Write buffer capacity of each request writer; defaults to 32 KiB.
    write_buffer_capacity: usize,
    /// Buffer capacity of each response reader; defaults to 32 KiB.
    response_buffer_capacity: usize,
    /// Value passed to `ResponseReader::max_body_size`; defaults to 0.
    // NOTE(review): what 0 means is defined by `ResponseReader`, not here.
    max_body_size: usize,
}
272
273impl ClientPoolBuilder {
274 #[must_use]
278 pub fn new() -> Self {
279 Self {
280 url: String::new(),
281 base_path: String::new(),
282 default_headers: Vec::new(),
283 connections: 1,
284 #[cfg(feature = "tls")]
285 tls_config: None,
286 nodelay: false,
287 #[cfg(feature = "socket-opts")]
288 tcp_keepalive: None,
289 #[cfg(feature = "socket-opts")]
290 recv_buf_size: None,
291 #[cfg(feature = "socket-opts")]
292 send_buf_size: None,
293 write_buffer_capacity: 32 * 1024,
294 response_buffer_capacity: 32 * 1024,
295 max_body_size: 0,
296 }
297 }
298
299 #[must_use]
301 pub fn url(mut self, url: &str) -> Self {
302 self.url = url.to_string();
303 self
304 }
305
306 #[must_use]
308 pub fn base_path(mut self, path: &str) -> Self {
309 self.base_path = path.to_string();
310 self
311 }
312
313 pub fn default_header(mut self, name: &str, value: &str) -> Result<Self, RestError> {
315 if name.bytes().any(|b| b == b'\r' || b == b'\n')
316 || value.bytes().any(|b| b == b'\r' || b == b'\n')
317 {
318 return Err(RestError::CrlfInjection);
319 }
320 self.default_headers
321 .push((name.to_string(), value.to_string()));
322 Ok(self)
323 }
324
325 #[must_use]
327 pub fn connections(mut self, n: usize) -> Self {
328 self.connections = n;
329 self
330 }
331
332 #[must_use]
334 #[cfg(feature = "tls")]
335 pub fn tls(mut self, config: &TlsConfig) -> Self {
336 self.tls_config = Some(config.clone());
337 self
338 }
339
340 #[must_use]
342 pub fn disable_nagle(mut self) -> Self {
343 self.nodelay = true;
344 self
345 }
346
347 #[cfg(feature = "socket-opts")]
349 #[must_use]
350 pub fn tcp_keepalive(mut self, idle: std::time::Duration) -> Self {
351 self.tcp_keepalive = Some(idle);
352 self
353 }
354
355 #[cfg(feature = "socket-opts")]
357 #[must_use]
358 pub fn recv_buffer_size(mut self, n: usize) -> Self {
359 self.recv_buf_size = Some(n);
360 self
361 }
362
363 #[cfg(feature = "socket-opts")]
365 #[must_use]
366 pub fn send_buffer_size(mut self, n: usize) -> Self {
367 self.send_buf_size = Some(n);
368 self
369 }
370
371 #[must_use]
373 pub fn write_buffer_capacity(mut self, n: usize) -> Self {
374 self.write_buffer_capacity = n;
375 self
376 }
377
378 #[must_use]
380 pub fn response_buffer_capacity(mut self, n: usize) -> Self {
381 self.response_buffer_capacity = n;
382 self
383 }
384
385 #[must_use]
387 pub fn max_body_size(mut self, n: usize) -> Self {
388 self.max_body_size = n;
389 self
390 }
391
392 pub async fn build(self) -> Result<ClientPool, RestError> {
394 if self.url.is_empty() {
395 return Err(RestError::InvalidUrl("url is required".to_string()));
396 }
397 if self.connections == 0 {
398 return Err(RestError::InvalidUrl("connections must be > 0".to_string()));
399 }
400
401 let parsed = nexus_net::rest::parse_base_url(&self.url)?;
402 let host_header = parsed.host_header();
403
404 let reconnect_config = ReconnectConfig {
405 url: self.url.clone(),
406 #[cfg(feature = "tls")]
407 tls_config: self.tls_config.clone(),
408 nodelay: self.nodelay,
409 #[cfg(feature = "socket-opts")]
410 tcp_keepalive: self.tcp_keepalive,
411 #[cfg(feature = "socket-opts")]
412 recv_buf_size: self.recv_buf_size,
413 #[cfg(feature = "socket-opts")]
414 send_buf_size: self.send_buf_size,
415 };
416
417 let mut initial_slots: Vec<ClientSlot> = Vec::with_capacity(self.connections);
422 for _ in 0..self.connections {
423 let slot_result: Result<ClientSlot, RestError> = async {
424 let mut builder = HttpConnectionBuilder::new();
425 #[cfg(feature = "tls")]
426 if let Some(ref tls) = self.tls_config {
427 builder = builder.tls(tls);
428 }
429 if self.nodelay {
430 builder = builder.disable_nagle();
431 }
432 #[cfg(feature = "socket-opts")]
433 {
434 if let Some(idle) = self.tcp_keepalive {
435 builder = builder.tcp_keepalive(idle);
436 }
437 if let Some(size) = self.recv_buf_size {
438 builder = builder.recv_buffer_size(size);
439 }
440 if let Some(size) = self.send_buf_size {
441 builder = builder.send_buffer_size(size);
442 }
443 }
444 let conn = builder.connect(&self.url).await?;
445
446 let mut writer = RequestWriter::new(&host_header)?;
447 if !self.base_path.is_empty() {
448 writer.set_base_path(&self.base_path)?;
449 }
450 writer.set_write_buffer_capacity(self.write_buffer_capacity);
451 for (name, value) in &self.default_headers {
452 writer.default_header(name, value)?;
453 }
454
455 let reader = ResponseReader::new(self.response_buffer_capacity)
456 .max_body_size(self.max_body_size);
457
458 Ok(ClientSlot {
459 writer,
460 reader,
461 conn: Some(conn),
462 })
463 }
464 .await;
465
466 match slot_result {
467 Ok(slot) => initial_slots.push(slot),
468 Err(e) => {
469 for slot in &mut initial_slots {
470 if let Some(ref mut c) = slot.conn {
471 graceful_shutdown(c).await;
472 }
473 }
474 return Err(e);
475 }
476 }
477 }
478
479 let host = host_header.clone();
481 let base = self.base_path.clone();
482 let headers = self.default_headers.clone();
483 let wbuf_cap = self.write_buffer_capacity;
484 let rbuf_cap = self.response_buffer_capacity;
485 let max_body = self.max_body_size;
486
487 let pool = Pool::new(
488 move || {
489 let mut writer = RequestWriter::new(&host).expect("host already validated");
490 if !base.is_empty() {
491 writer
492 .set_base_path(&base)
493 .expect("base_path already validated");
494 }
495 writer.set_write_buffer_capacity(wbuf_cap);
496 for (name, value) in &headers {
497 writer
498 .default_header(name, value)
499 .expect("headers already validated");
500 }
501 ClientSlot {
502 writer,
503 reader: ResponseReader::new(rbuf_cap).max_body_size(max_body),
504 conn: None,
505 }
506 },
507 |slot: &mut ClientSlot| {
508 if slot.needs_reconnect() {
509 slot.conn = None;
510 slot.reader.reset();
513 }
514 },
515 );
516
517 for slot in initial_slots {
519 pool.put(slot);
520 }
521
522 Ok(ClientPool {
523 pool,
524 reconnect_config,
525 })
526 }
527}
528
529impl Default for ClientPoolBuilder {
530 fn default() -> Self {
531 Self::new()
532 }
533}
534
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;
    use std::time::Duration;

    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::{TcpListener, TcpStream};

    use super::*;

    // ---- fixtures -------------------------------------------------------

    /// Slot with a writer and reader but no connection.
    fn make_disconnected_slot() -> ClientSlot {
        let writer = RequestWriter::new("host").unwrap();
        let reader = ResponseReader::new(HTTP_HANDSHAKE_BUFFER);
        ClientSlot {
            writer,
            reader,
            conn: None,
        }
    }

    /// Reset hook equivalent to the one `ClientPoolBuilder::build` installs.
    fn clear_dead_conn(slot: &mut ClientSlot) {
        if slot.needs_reconnect() {
            slot.conn = None;
            slot.reader.reset();
        }
    }

    /// Opens a plain-TCP connection to `addr`, optionally with `TCP_NODELAY`.
    async fn connect_plain(addr: SocketAddr, nodelay: bool) -> HttpConnection<MaybeTls> {
        let tcp = TcpStream::connect(addr).await.unwrap();
        if nodelay {
            tcp.set_nodelay(true).unwrap();
        }
        HttpConnection::new(MaybeTls::Plain(tcp))
    }

    /// Wraps `conn` in a slot whose writer targets `addr`.
    fn connected_slot(addr: SocketAddr, conn: HttpConnection<MaybeTls>) -> ClientSlot {
        ClientSlot {
            writer: RequestWriter::new(&addr.to_string()).unwrap(),
            reader: ResponseReader::new(HTTP_HANDSHAKE_BUFFER),
            conn: Some(conn),
        }
    }

    /// Server side of one exchange: read once, then write `response`.
    async fn answer_one(peer: &mut TcpStream, response: &[u8]) {
        let mut scratch = [0u8; HTTP_HANDSHAKE_BUFFER];
        let _ = peer.read(&mut scratch).await.unwrap();
        peer.write_all(response).await.unwrap();
    }

    // ---- slot / pool bookkeeping ----------------------------------------

    #[test]
    fn slot_needs_reconnect_when_no_conn() {
        assert!(make_disconnected_slot().needs_reconnect());
    }

    #[test]
    fn pool_acquire_release_cycle() {
        let pool = Pool::new(make_disconnected_slot, |_| {});
        pool.put(make_disconnected_slot());
        assert_eq!(pool.available(), 1);

        let guard = pool.acquire();
        assert_eq!(pool.available(), 0);

        drop(guard);
        assert_eq!(pool.available(), 1);
    }

    #[test]
    fn pool_acquire_returns_available() {
        let pool: Pool<ClientSlot> = Pool::new(make_disconnected_slot, |_| {});
        pool.put(make_disconnected_slot());
        pool.put(make_disconnected_slot());
        assert_eq!(pool.available(), 2);

        // Both guards stay alive until the end of the test.
        let _first = pool.acquire();
        assert_eq!(pool.available(), 1);
        let _second = pool.acquire();
        assert_eq!(pool.available(), 0);
    }

    #[test]
    fn pool_reset_clears_dead_conn() {
        let pool = Pool::new(make_disconnected_slot, |slot: &mut ClientSlot| {
            if slot.needs_reconnect() {
                slot.conn = None;
            }
        });
        pool.put(make_disconnected_slot());

        let guard = pool.acquire();
        assert!(guard.conn.is_none());
        assert!(guard.needs_reconnect());
        drop(guard);

        let guard = pool.acquire();
        assert!(guard.conn.is_none());
    }

    #[test]
    fn pool_multiple_slots() {
        let pool = Pool::new(make_disconnected_slot, |_| {});
        for _ in 0..4 {
            pool.put(make_disconnected_slot());
        }
        assert_eq!(pool.available(), 4);

        let first = pool.acquire();
        let second = pool.acquire();
        assert_eq!(pool.available(), 2);

        drop(first);
        assert_eq!(pool.available(), 3);
        drop(second);
        assert_eq!(pool.available(), 4);
    }

    #[test]
    fn try_acquire_returns_none_when_all_in_use() {
        let pool = Pool::new(make_disconnected_slot, |_| {});
        pool.put(make_disconnected_slot());
        pool.put(make_disconnected_slot());

        let first = pool.try_acquire().unwrap();
        let second = pool.try_acquire().unwrap();
        assert!(pool.try_acquire().is_none());
        assert_eq!(pool.available(), 0);

        drop(first);
        drop(second);
        assert_eq!(pool.available(), 2);
        assert!(pool.try_acquire().is_some());
    }

    #[test]
    fn try_acquire_returns_some_after_slot_released() {
        let pool = Pool::new(make_disconnected_slot, |_| {});
        pool.put(make_disconnected_slot());

        let held = pool.try_acquire().unwrap();
        assert!(pool.try_acquire().is_none());

        drop(held);
        assert!(pool.try_acquire().is_some());
    }

    // ---- loopback exchanges ---------------------------------------------

    #[tokio::test(flavor = "current_thread")]
    async fn pool_four_connections_all_succeed() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Server: four connections, one canned reply each.
        tokio::spawn(async move {
            for _ in 0..4 {
                let (mut peer, _) = listener.accept().await.unwrap();
                answer_one(
                    &mut peer,
                    b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok",
                )
                .await;
            }
        });

        let pool = Pool::new(make_disconnected_slot, clear_dead_conn);

        let mut success_count = 0u8;
        for _ in 0..4u8 {
            let conn = connect_plain(addr, true).await;
            pool.put(connected_slot(addr, conn));

            let mut guard = pool.try_acquire().unwrap();
            let client: &mut ClientSlot = &mut guard;
            let req = client.writer.get("/test").finish().unwrap();
            let conn = client.conn.as_mut().unwrap();
            let resp = conn.send(req, &mut client.reader).await.unwrap();
            assert_eq!(resp.status(), 200);
            assert_eq!(resp.body_str().unwrap(), "ok");
            success_count += 1;
        }

        assert_eq!(success_count, 4);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn pool_loopback_send() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            let (mut peer, _) = listener.accept().await.unwrap();
            answer_one(
                &mut peer,
                b"HTTP/1.1 200 OK\r\nContent-Length: 15\r\n\r\n{\"orderId\":123}",
            )
            .await;
        });

        // This test does not go through a pool and leaves Nagle untouched.
        let conn = connect_plain(addr, false).await;
        let mut client = connected_slot(addr, conn);

        let req = client.writer.get("/test").finish().unwrap();
        let conn = client.conn.as_mut().unwrap();
        let resp = conn.send(req, &mut client.reader).await.unwrap();
        assert_eq!(resp.status(), 200);
        assert_eq!(resp.body_str().unwrap(), r#"{"orderId":123}"#);
    }

    #[test]
    fn try_acquire_returns_none_when_exhausted() {
        let pool = Pool::new(make_disconnected_slot, |_| {});
        pool.put(make_disconnected_slot());

        let _held = pool.try_acquire().unwrap();
        assert!(pool.try_acquire().is_none());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn pool_keep_alive_multiple_requests() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Server: a single connection answers two requests in a row.
        tokio::spawn(async move {
            let (mut peer, _) = listener.accept().await.unwrap();
            answer_one(
                &mut peer,
                b"HTTP/1.1 200 OK\r\nContent-Length: 7\r\n\r\n{\"r\":1}",
            )
            .await;
            answer_one(
                &mut peer,
                b"HTTP/1.1 200 OK\r\nContent-Length: 7\r\n\r\n{\"r\":2}",
            )
            .await;
        });

        let conn = connect_plain(addr, true).await;

        let pool = Pool::new(make_disconnected_slot, clear_dead_conn);
        pool.put(connected_slot(addr, conn));

        {
            let mut guard = pool.acquire();
            let client: &mut ClientSlot = &mut guard;
            let req = client.writer.get("/first").finish().unwrap();
            let conn = client.conn.as_mut().unwrap();
            let resp = conn.send(req, &mut client.reader).await.unwrap();
            assert_eq!(resp.body_str().unwrap(), r#"{"r":1}"#);
        }

        {
            let mut guard = pool.acquire();
            let client: &mut ClientSlot = &mut guard;
            let req = client.writer.get("/second").finish().unwrap();
            let conn = client.conn.as_mut().unwrap();
            let resp = conn.send(req, &mut client.reader).await.unwrap();
            assert_eq!(resp.body_str().unwrap(), r#"{"r":2}"#);
        }
    }

    // ---- builder validation ---------------------------------------------

    #[tokio::test(flavor = "current_thread")]
    async fn builder_validates_empty_url() {
        let outcome = ClientPool::builder().connections(1).build().await;
        assert!(outcome.is_err());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn builder_validates_zero_connections() {
        let outcome = ClientPool::builder()
            .url("http://localhost")
            .connections(0)
            .build()
            .await;
        assert!(outcome.is_err());
    }

    // ---- failure handling -----------------------------------------------

    #[tokio::test(flavor = "current_thread")]
    async fn stale_connection_timeout_not_hang() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Server: answer once, then close the connection.
        tokio::spawn(async move {
            let (mut peer, _) = listener.accept().await.unwrap();
            answer_one(
                &mut peer,
                b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok",
            )
            .await;
            drop(peer);
        });

        let conn = connect_plain(addr, true).await;

        let pool = Pool::new(make_disconnected_slot, clear_dead_conn);
        pool.put(connected_slot(addr, conn));

        {
            let mut guard = pool.acquire();
            let client: &mut ClientSlot = &mut guard;
            let req = client.writer.get("/first").finish().unwrap();
            let conn = client.conn.as_mut().unwrap();
            let resp = conn.send(req, &mut client.reader).await.unwrap();
            assert_eq!(resp.status(), 200);
        }

        // Give the server task time to close its end.
        tokio::time::sleep(Duration::from_millis(50)).await;

        {
            let mut guard = pool.acquire();
            let client: &mut ClientSlot = &mut guard;
            let req = client.writer.get("/second").finish().unwrap();
            let conn = client.conn.as_mut().unwrap();

            let limit = Duration::from_millis(500);
            let result = tokio::time::timeout(limit, conn.send(req, &mut client.reader)).await;

            // Either an error or an elapsed timeout is acceptable; only a
            // successful response is not.
            match result {
                Ok(Ok(_)) => panic!("stale connection should not succeed"),
                Ok(Err(_)) | Err(_) => {}
            }

            assert!(
                client.needs_reconnect(),
                "slot should be poisoned after stale connection"
            );
        }
    }

    #[tokio::test(flavor = "current_thread")]
    #[allow(clippy::large_futures)]
    async fn dead_connection_heals_via_reconnect_task() {
        // `ClientPool` reconnects via `spawn_local`, so run inside a LocalSet.
        let local = tokio::task::LocalSet::new();
        local
            .run_until(async {
                let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
                let addr = listener.local_addr().unwrap();

                // Server: first connection is answered and closed, the second
                // one serves the request sent after the reconnect.
                let server_handle = tokio::task::spawn_local({
                    let std_listener = listener.into_std().unwrap();
                    async move {
                        let listener = TcpListener::from_std(std_listener).unwrap();

                        let (mut peer, _) = listener.accept().await.unwrap();
                        answer_one(
                            &mut peer,
                            b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nfirst",
                        )
                        .await;
                        drop(peer);

                        let (mut peer, _) = listener.accept().await.unwrap();
                        answer_one(
                            &mut peer,
                            b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nhealed",
                        )
                        .await;
                    }
                });

                let pool = ClientPool::builder()
                    .url(&format!("http://{addr}"))
                    .connections(1)
                    .build()
                    .await
                    .unwrap();

                {
                    let mut guard = pool.try_acquire().unwrap();
                    let client: &mut ClientSlot = &mut guard;
                    let req = client.writer.get("/first").finish().unwrap();
                    let conn = client.conn.as_mut().unwrap();
                    let resp = conn.send(req, &mut client.reader).await.unwrap();
                    assert_eq!(resp.body_str().unwrap(), "first");
                }

                // Let the server close the first connection.
                tokio::time::sleep(Duration::from_millis(50)).await;

                {
                    let mut guard = pool.try_acquire().unwrap();
                    let client: &mut ClientSlot = &mut guard;
                    let req = client.writer.get("/dead").finish().unwrap();
                    let conn = client.conn.as_mut().unwrap();
                    let limit = Duration::from_millis(500);
                    let _ = tokio::time::timeout(limit, conn.send(req, &mut client.reader)).await;
                    assert!(client.needs_reconnect(), "should be poisoned");
                }

                // `acquire` retries until the background task has reconnected.
                let mut guard = pool.acquire().await.unwrap();
                let client: &mut ClientSlot = &mut guard;
                let req = client.writer.get("/healed").finish().unwrap();
                let conn = client.conn.as_mut().unwrap();
                let resp = conn.send(req, &mut client.reader).await.unwrap();
                assert_eq!(resp.body_str().unwrap(), "healed");

                server_handle.await.unwrap();
            })
            .await;
    }
}