Skip to main content

nexus_async_net/rest/tokio/
pool.rs

1//! Single-threaded async HTTP client pool.
2//!
3//! Uses [`nexus_pool::local::Pool`] for LIFO acquire/release with RAII guards.
4//! Dead connections are healed by background reconnect tasks spawned on acquire.
5//!
6//! For `current_thread` runtime + `LocalSet`.
7
8use std::future::poll_fn;
9use std::pin::Pin;
10
11#[cfg(test)]
12use nexus_net::http::HTTP_HANDSHAKE_BUFFER;
13use nexus_net::http::ResponseReader;
14use nexus_net::rest::{RequestWriter, RestError};
15#[cfg(feature = "tls")]
16use nexus_net::tls::TlsConfig;
17use nexus_pool::local::{Pool, Pooled};
18use tokio::io::AsyncWrite;
19
20use super::connection::{HttpConnection, HttpConnectionBuilder};
21use crate::maybe_tls::MaybeTls;
22
23/// Drive `poll_shutdown` on a healthy connection during pool-build
24/// cleanup when a later slot fails — sends TLS `close_notify` + TCP
25/// FIN gracefully so the peer doesn't see a truncation alert. Errors
26/// are ignored: best-effort cleanup on an already-failing path.
27async fn graceful_shutdown(conn: &mut HttpConnection<MaybeTls>) {
28    let stream = conn.stream_mut();
29    let _ = poll_fn(|cx| Pin::new(&mut *stream).poll_shutdown(cx)).await;
30}
31
32// =============================================================================
33// ClientSlot — the item stored in the pool
34// =============================================================================
35
36/// A complete request/response pipeline: writer + reader + transport.
37///
38/// Each slot in the pool owns its own set of protocol primitives.
39/// Acquired via [`ClientPool::acquire`], auto-returned on drop.
40///
41/// Fields are public for split borrows through `Pooled<T>`'s `DerefMut`.
42/// Deref explicitly to split, then build + send:
43///
44/// ```ignore
45/// let s: &mut ClientSlot = &mut slot;  // explicit deref, enables split borrows
46/// let req = s.writer.post("/order").body(json).finish()?;
47/// let (conn, reader) = (s.conn.as_mut().unwrap(), &mut s.reader);
48///
49/// // With timeout (recommended for production):
50/// let resp = tokio::time::timeout(timeout, conn.send(req, reader)).await??;
51///
52/// // Without timeout (prototyping only):
53/// let resp = conn.send(req, reader).await?;
54/// ```
55pub struct ClientSlot {
56    /// Request encoder (sans-IO). Build requests here.
57    pub writer: RequestWriter,
58    /// Response parser. Fed by the connection during send.
59    pub reader: ResponseReader,
60    /// Transport. `None` if connection died and needs reconnect.
61    pub conn: Option<HttpConnection<MaybeTls>>,
62}
63
64impl ClientSlot {
65    /// Whether the connection is dead and needs reconnect.
66    pub fn needs_reconnect(&self) -> bool {
67        self.conn.as_ref().is_none_or(HttpConnection::is_poisoned)
68    }
69
70    /// Split borrow: get mutable references to conn + reader
71    /// while writer is borrowed by a `Request<'_>`.
72    ///
73    /// This exists because `Pooled<ClientSlot>` goes through `DerefMut`
74    /// which prevents the compiler from seeing disjoint field borrows.
75    pub fn conn_and_reader(
76        &mut self,
77    ) -> Result<(&mut HttpConnection<MaybeTls>, &mut ResponseReader), RestError> {
78        let conn = self.conn.as_mut().ok_or(RestError::ConnectionPoisoned)?;
79        Ok((conn, &mut self.reader))
80    }
81}
82
83// =============================================================================
84// ClientPool
85// =============================================================================
86
87/// Single-threaded async HTTP client pool.
88///
89/// Pre-allocated slots with LIFO acquire for cache locality. Each slot
90/// owns a [`RequestWriter`], [`ResponseReader`], and
91/// [`HttpConnection`].
92///
93/// # Usage
94///
95/// ```ignore
96/// let pool = ClientPool::builder()
97///     .url("https://api.binance.com")
98///     .base_path("/api/v3")
99///     .default_header("X-API-KEY", &key)?
100///     .connections(4)
101///     .tls(&tls)
102///     .build()
103///     .await?;
104///
105/// // Fast path (trading) — no reconnect, no wait
106/// let mut slot = pool.try_acquire().unwrap();
107/// // Patient path (background) — waits, reconnects with backoff
108/// let mut slot = pool.acquire().await?;
109///
110/// let s: &mut ClientSlot = &mut slot;
111/// let req = s.writer.post("/order").body(json).finish()?;
112/// let conn = s.conn.as_mut().unwrap();
113/// let resp = conn.send(req, &mut s.reader).await?;
114/// // drop(slot) returns to pool
115/// ```
116pub struct ClientPool {
117    pool: Pool<ClientSlot>,
118    reconnect_config: ReconnectConfig,
119}
120
121#[derive(Clone)]
122struct ReconnectConfig {
123    url: String,
124    #[cfg(feature = "tls")]
125    tls_config: Option<TlsConfig>,
126    nodelay: bool,
127    #[cfg(feature = "socket-opts")]
128    tcp_keepalive: Option<std::time::Duration>,
129    #[cfg(feature = "socket-opts")]
130    recv_buf_size: Option<usize>,
131    #[cfg(feature = "socket-opts")]
132    send_buf_size: Option<usize>,
133}
134
135#[allow(clippy::future_not_send)] // Intentionally !Send — single-threaded pool for LocalSet.
136impl ClientPool {
137    /// Create a builder.
138    #[must_use]
139    pub fn builder() -> ClientPoolBuilder {
140        ClientPoolBuilder::new()
141    }
142
143    /// Try to acquire a healthy client slot (LIFO).
144    ///
145    /// Checks available slots for a healthy connection. Dead slots are
146    /// ejected from the pool and a reconnect task is spawned for each.
147    /// When reconnection succeeds, the slot returns to the pool
148    /// automatically.
149    ///
150    /// Returns `None` if all slots are in use or currently reconnecting.
151    ///
152    /// This is the trading hot path — O(1) when the top slot is healthy.
153    pub fn try_acquire(&self) -> Option<Pooled<ClientSlot>> {
154        loop {
155            let slot = self.pool.try_acquire()?;
156            if !slot.needs_reconnect() {
157                return Some(slot);
158            }
159            // Spawn a task to heal this slot. The task owns the Pooled
160            // guard — when it reconnects, dropping the guard returns
161            // the healthy slot to the pool.
162            self.spawn_reconnect(slot);
163            // Try next slot.
164        }
165    }
166
167    /// Acquire a client slot, waiting until one is available.
168    ///
169    /// If no healthy slots are available, waits for reconnect tasks
170    /// to finish healing dead connections. Returns error if no slot
171    /// becomes available within the retry limit.
172    ///
173    /// This is the off-hot-path API for background tasks and REST calls.
174    pub async fn acquire(&self) -> Result<Pooled<ClientSlot>, RestError> {
175        const MAX_BACKOFF_MS: u64 = 1_000;
176        const MAX_ATTEMPTS: u32 = 20;
177        let mut backoff_ms = 1u64;
178
179        for _ in 0..MAX_ATTEMPTS {
180            if let Some(slot) = self.try_acquire() {
181                return Ok(slot);
182            }
183            // All slots are either in use or being reconnected.
184            // Wait for a reconnect task to finish and return a slot.
185            tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
186            backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
187        }
188
189        Err(RestError::ConnectionClosed(
190            "pool acquire timed out: no healthy slots available",
191        ))
192    }
193
194    /// Number of slots currently available (not acquired).
195    pub fn available(&self) -> usize {
196        self.pool.available()
197    }
198
199    /// Spawn a local task to reconnect a dead slot.
200    ///
201    /// The task owns the `Pooled` guard. On successful reconnect, the
202    /// guard drops and returns the healthy slot to the pool. On failure,
203    /// retries with exponential backoff.
204    fn spawn_reconnect(&self, mut slot: Pooled<ClientSlot>) {
205        let config = self.reconnect_config.clone();
206        tokio::task::spawn_local(async move {
207            const MAX_BACKOFF_MS: u64 = 5_000;
208            let mut backoff_ms = 100u64;
209
210            loop {
211                if let Ok(conn) = Self::connect_one_with(&config).await {
212                    slot.conn = Some(conn);
213                    slot.reader.reset();
214                    return;
215                }
216                tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
217                backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
218            }
219        });
220    }
221
222    async fn connect_one_with(
223        config: &ReconnectConfig,
224    ) -> Result<HttpConnection<MaybeTls>, RestError> {
225        let mut builder = HttpConnectionBuilder::new();
226        #[cfg(feature = "tls")]
227        if let Some(ref tls) = config.tls_config {
228            builder = builder.tls(tls);
229        }
230        if config.nodelay {
231            builder = builder.disable_nagle();
232        }
233        #[cfg(feature = "socket-opts")]
234        {
235            if let Some(idle) = config.tcp_keepalive {
236                builder = builder.tcp_keepalive(idle);
237            }
238            if let Some(size) = config.recv_buf_size {
239                builder = builder.recv_buffer_size(size);
240            }
241            if let Some(size) = config.send_buf_size {
242                builder = builder.send_buffer_size(size);
243            }
244        }
245        builder.connect(&config.url).await
246    }
247}
248
249// =============================================================================
250// Builder
251// =============================================================================
252
253/// Builder for [`ClientPool`].
254pub struct ClientPoolBuilder {
255    url: String,
256    base_path: String,
257    default_headers: Vec<(String, String)>,
258    connections: usize,
259    #[cfg(feature = "tls")]
260    tls_config: Option<TlsConfig>,
261    nodelay: bool,
262    #[cfg(feature = "socket-opts")]
263    tcp_keepalive: Option<std::time::Duration>,
264    #[cfg(feature = "socket-opts")]
265    recv_buf_size: Option<usize>,
266    #[cfg(feature = "socket-opts")]
267    send_buf_size: Option<usize>,
268    write_buffer_capacity: usize,
269    response_buffer_capacity: usize,
270    max_body_size: usize,
271}
272
273impl ClientPoolBuilder {
274    /// Create a builder with default settings: empty URL/base path, no
275    /// default headers, one connection, TCP nodelay off, default buffer
276    /// capacities.
277    #[must_use]
278    pub fn new() -> Self {
279        Self {
280            url: String::new(),
281            base_path: String::new(),
282            default_headers: Vec::new(),
283            connections: 1,
284            #[cfg(feature = "tls")]
285            tls_config: None,
286            nodelay: false,
287            #[cfg(feature = "socket-opts")]
288            tcp_keepalive: None,
289            #[cfg(feature = "socket-opts")]
290            recv_buf_size: None,
291            #[cfg(feature = "socket-opts")]
292            send_buf_size: None,
293            write_buffer_capacity: 32 * 1024,
294            response_buffer_capacity: 32 * 1024,
295            max_body_size: 0,
296        }
297    }
298
299    /// Target URL (scheme + host + optional port + optional path).
300    #[must_use]
301    pub fn url(mut self, url: &str) -> Self {
302        self.url = url.to_string();
303        self
304    }
305
306    /// Base path prefix for all requests.
307    #[must_use]
308    pub fn base_path(mut self, path: &str) -> Self {
309        self.base_path = path.to_string();
310        self
311    }
312
313    /// Add a default header sent with every request.
314    pub fn default_header(mut self, name: &str, value: &str) -> Result<Self, RestError> {
315        if name.bytes().any(|b| b == b'\r' || b == b'\n')
316            || value.bytes().any(|b| b == b'\r' || b == b'\n')
317        {
318            return Err(RestError::CrlfInjection);
319        }
320        self.default_headers
321            .push((name.to_string(), value.to_string()));
322        Ok(self)
323    }
324
325    /// Number of pre-allocated connections. Default: 1.
326    #[must_use]
327    pub fn connections(mut self, n: usize) -> Self {
328        self.connections = n;
329        self
330    }
331
332    /// Custom TLS configuration.
333    #[must_use]
334    #[cfg(feature = "tls")]
335    pub fn tls(mut self, config: &TlsConfig) -> Self {
336        self.tls_config = Some(config.clone());
337        self
338    }
339
340    /// Disable Nagle's algorithm on each connection.
341    #[must_use]
342    pub fn disable_nagle(mut self) -> Self {
343        self.nodelay = true;
344        self
345    }
346
347    /// Set TCP keepalive idle time on each connection.
348    #[cfg(feature = "socket-opts")]
349    #[must_use]
350    pub fn tcp_keepalive(mut self, idle: std::time::Duration) -> Self {
351        self.tcp_keepalive = Some(idle);
352        self
353    }
354
355    /// Set `SO_RCVBUF` on each connection.
356    #[cfg(feature = "socket-opts")]
357    #[must_use]
358    pub fn recv_buffer_size(mut self, n: usize) -> Self {
359        self.recv_buf_size = Some(n);
360        self
361    }
362
363    /// Set `SO_SNDBUF` on each connection.
364    #[cfg(feature = "socket-opts")]
365    #[must_use]
366    pub fn send_buffer_size(mut self, n: usize) -> Self {
367        self.send_buf_size = Some(n);
368        self
369    }
370
371    /// Write buffer capacity per slot. Default: 32KB.
372    #[must_use]
373    pub fn write_buffer_capacity(mut self, n: usize) -> Self {
374        self.write_buffer_capacity = n;
375        self
376    }
377
378    /// Response buffer capacity per slot. Default: 32KB.
379    #[must_use]
380    pub fn response_buffer_capacity(mut self, n: usize) -> Self {
381        self.response_buffer_capacity = n;
382        self
383    }
384
385    /// Maximum response body size per slot. Default: 0 (no limit).
386    #[must_use]
387    pub fn max_body_size(mut self, n: usize) -> Self {
388        self.max_body_size = n;
389        self
390    }
391
392    /// Build the pool, establishing all connections.
393    pub async fn build(self) -> Result<ClientPool, RestError> {
394        if self.url.is_empty() {
395            return Err(RestError::InvalidUrl("url is required".to_string()));
396        }
397        if self.connections == 0 {
398            return Err(RestError::InvalidUrl("connections must be > 0".to_string()));
399        }
400
401        let parsed = nexus_net::rest::parse_base_url(&self.url)?;
402        let host_header = parsed.host_header();
403
404        let reconnect_config = ReconnectConfig {
405            url: self.url.clone(),
406            #[cfg(feature = "tls")]
407            tls_config: self.tls_config.clone(),
408            nodelay: self.nodelay,
409            #[cfg(feature = "socket-opts")]
410            tcp_keepalive: self.tcp_keepalive,
411            #[cfg(feature = "socket-opts")]
412            recv_buf_size: self.recv_buf_size,
413            #[cfg(feature = "socket-opts")]
414            send_buf_size: self.send_buf_size,
415        };
416
417        // Connect all slots sequentially (cold path — startup only).
418        // If a later slot fails, gracefully shut down the already-built
419        // healthy slots so the peer doesn't see TCP FIN without TLS
420        // close_notify (which rustls peers log as a truncation alert).
421        let mut initial_slots: Vec<ClientSlot> = Vec::with_capacity(self.connections);
422        for _ in 0..self.connections {
423            let slot_result: Result<ClientSlot, RestError> = async {
424                let mut builder = HttpConnectionBuilder::new();
425                #[cfg(feature = "tls")]
426                if let Some(ref tls) = self.tls_config {
427                    builder = builder.tls(tls);
428                }
429                if self.nodelay {
430                    builder = builder.disable_nagle();
431                }
432                #[cfg(feature = "socket-opts")]
433                {
434                    if let Some(idle) = self.tcp_keepalive {
435                        builder = builder.tcp_keepalive(idle);
436                    }
437                    if let Some(size) = self.recv_buf_size {
438                        builder = builder.recv_buffer_size(size);
439                    }
440                    if let Some(size) = self.send_buf_size {
441                        builder = builder.send_buffer_size(size);
442                    }
443                }
444                let conn = builder.connect(&self.url).await?;
445
446                let mut writer = RequestWriter::new(&host_header)?;
447                if !self.base_path.is_empty() {
448                    writer.set_base_path(&self.base_path)?;
449                }
450                writer.set_write_buffer_capacity(self.write_buffer_capacity);
451                for (name, value) in &self.default_headers {
452                    writer.default_header(name, value)?;
453                }
454
455                let reader = ResponseReader::new(self.response_buffer_capacity)
456                    .max_body_size(self.max_body_size);
457
458                Ok(ClientSlot {
459                    writer,
460                    reader,
461                    conn: Some(conn),
462                })
463            }
464            .await;
465
466            match slot_result {
467                Ok(slot) => initial_slots.push(slot),
468                Err(e) => {
469                    for slot in &mut initial_slots {
470                        if let Some(ref mut c) = slot.conn {
471                            graceful_shutdown(c).await;
472                        }
473                    }
474                    return Err(e);
475                }
476            }
477        }
478
479        // Create pool with factory + reset.
480        let host = host_header.clone();
481        let base = self.base_path.clone();
482        let headers = self.default_headers.clone();
483        let wbuf_cap = self.write_buffer_capacity;
484        let rbuf_cap = self.response_buffer_capacity;
485        let max_body = self.max_body_size;
486
487        let pool = Pool::new(
488            move || {
489                let mut writer = RequestWriter::new(&host).expect("host already validated");
490                if !base.is_empty() {
491                    writer
492                        .set_base_path(&base)
493                        .expect("base_path already validated");
494                }
495                writer.set_write_buffer_capacity(wbuf_cap);
496                for (name, value) in &headers {
497                    writer
498                        .default_header(name, value)
499                        .expect("headers already validated");
500                }
501                ClientSlot {
502                    writer,
503                    reader: ResponseReader::new(rbuf_cap).max_body_size(max_body),
504                    conn: None,
505                }
506            },
507            |slot: &mut ClientSlot| {
508                if slot.needs_reconnect() {
509                    slot.conn = None;
510                    // Clear stale response data so the next request
511                    // after reconnect starts with a clean buffer.
512                    slot.reader.reset();
513                }
514            },
515        );
516
517        // Pre-populate with connected slots.
518        for slot in initial_slots {
519            pool.put(slot);
520        }
521
522        Ok(ClientPool {
523            pool,
524            reconnect_config,
525        })
526    }
527}
528
529impl Default for ClientPoolBuilder {
530    fn default() -> Self {
531        Self::new()
532    }
533}
534
535// =============================================================================
536// Tests
537// =============================================================================
538
539#[cfg(test)]
540mod tests {
541    use super::*;
542
543    fn make_disconnected_slot() -> ClientSlot {
544        ClientSlot {
545            writer: RequestWriter::new("host").unwrap(),
546            reader: ResponseReader::new(HTTP_HANDSHAKE_BUFFER),
547            conn: None,
548        }
549    }
550
/// A slot without a transport must report that it needs reconnecting.
#[test]
fn slot_needs_reconnect_when_no_conn() {
    assert!(make_disconnected_slot().needs_reconnect());
}
556
/// Acquiring removes a slot from the available count; dropping the
/// guard puts it back.
#[test]
fn pool_acquire_release_cycle() {
    let pool = Pool::new(make_disconnected_slot, |_| {});
    pool.put(make_disconnected_slot());
    assert_eq!(pool.available(), 1);

    {
        let _guard = pool.acquire();
        assert_eq!(pool.available(), 0);
    } // guard dropped here — slot returns to the pool

    assert_eq!(pool.available(), 1);
}
570
/// Each acquire takes exactly one slot out of the available set.
#[test]
fn pool_acquire_returns_available() {
    let pool: Pool<ClientSlot> = Pool::new(make_disconnected_slot, |_| {});
    for _ in 0..2 {
        pool.put(make_disconnected_slot());
    }
    assert_eq!(pool.available(), 2);

    // Guards are kept alive so the slots stay checked out.
    let _first = pool.acquire();
    assert_eq!(pool.available(), 1);

    let _second = pool.acquire();
    assert_eq!(pool.available(), 0);
}
583
/// A disconnected slot stays disconnected across a return/reset cycle.
#[test]
fn pool_reset_clears_dead_conn() {
    let pool = Pool::new(make_disconnected_slot, |slot| {
        if slot.needs_reconnect() {
            slot.conn = None;
        }
    });
    pool.put(make_disconnected_slot());

    {
        let first = pool.acquire();
        assert!(first.conn.is_none());
        assert!(first.needs_reconnect());
    } // returned to the pool; reset hook runs

    // Second checkout: still no transport.
    let second = pool.acquire();
    assert!(second.conn.is_none());
}
602
/// Available count tracks several outstanding guards independently.
#[test]
fn pool_multiple_slots() {
    let pool = Pool::new(make_disconnected_slot, |_| {});
    (0..4).for_each(|_| {
        pool.put(make_disconnected_slot());
    });
    assert_eq!(pool.available(), 4);

    let first = pool.acquire();
    let second = pool.acquire();
    assert_eq!(pool.available(), 2);

    // Release one at a time and watch the count climb back.
    drop(first);
    assert_eq!(pool.available(), 3);
    drop(second);
    assert_eq!(pool.available(), 4);
}
620
/// `try_acquire` yields `None` while every slot is checked out and
/// succeeds again once they are returned.
#[test]
fn try_acquire_returns_none_when_all_in_use() {
    let pool = Pool::new(make_disconnected_slot, |_| {});
    pool.put(make_disconnected_slot());
    pool.put(make_disconnected_slot());

    // Check out both slots and keep the guards alive.
    let held: Vec<_> = (0..2).map(|_| pool.try_acquire().unwrap()).collect();

    // Nothing left — the slots are in use, not dead.
    assert!(pool.try_acquire().is_none());
    assert_eq!(pool.available(), 0);

    // Return both (guards drop in acquisition order).
    drop(held);
    assert_eq!(pool.available(), 2);
    assert!(pool.try_acquire().is_some());
}
639
/// With a single slot: `None` while it is held, `Some` after release.
#[test]
fn try_acquire_returns_some_after_slot_released() {
    let pool = Pool::new(make_disconnected_slot, |_| {});
    pool.put(make_disconnected_slot());

    {
        let _held = pool.try_acquire().unwrap();
        assert!(pool.try_acquire().is_none());
    } // released here

    assert!(pool.try_acquire().is_some());
}
653
654    /// Tests that 4 connections in the pool all work. Each connection
655    /// is set up and used one at a time (sequential, same TCP listener).
656    #[tokio::test(flavor = "current_thread")]
657    async fn pool_four_connections_all_succeed() {
658        use tokio::io::{AsyncReadExt, AsyncWriteExt};
659        use tokio::net::TcpListener;
660
661        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
662        let addr = listener.local_addr().unwrap();
663
664        // Server: accept 4 sequential connections, each gets one request.
665        tokio::spawn(async move {
666            for _ in 0..4 {
667                let (mut tcp, _) = listener.accept().await.unwrap();
668                let mut buf = [0u8; HTTP_HANDSHAKE_BUFFER];
669                let _ = tcp.read(&mut buf).await.unwrap();
670                let resp = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok";
671                tcp.write_all(resp).await.unwrap();
672            }
673        });
674
675        let pool = Pool::new(make_disconnected_slot, |slot| {
676            if slot.needs_reconnect() {
677                slot.conn = None;
678                slot.reader.reset();
679            }
680        });
681
682        // Connect, send, and verify — 4 times. Each iteration creates
683        // a new connection (simulating pool with 4 slots used sequentially).
684        let mut success_count = 0u8;
685        for _ in 0..4u8 {
686            let tcp = tokio::net::TcpStream::connect(addr).await.unwrap();
687            tcp.set_nodelay(true).unwrap();
688            let stream = MaybeTls::Plain(tcp);
689            let conn = HttpConnection::new(stream);
690            pool.put(ClientSlot {
691                writer: RequestWriter::new(&addr.to_string()).unwrap(),
692                reader: ResponseReader::new(HTTP_HANDSHAKE_BUFFER),
693                conn: Some(conn),
694            });
695
696            let mut slot = pool.try_acquire().unwrap();
697            let s: &mut ClientSlot = &mut slot;
698            let req = s.writer.get("/test").finish().unwrap();
699            let conn = s.conn.as_mut().unwrap();
700            let resp = conn.send(req, &mut s.reader).await.unwrap();
701            assert_eq!(resp.status(), 200);
702            assert_eq!(resp.body_str().unwrap(), "ok");
703            success_count += 1;
704        }
705
706        assert_eq!(success_count, 4);
707    }
708
709    // Integration test with real TCP is in tests/httpbin.rs (ignored, needs network).
710    // The loopback test below uses tokio to verify the full send path.
711    #[tokio::test(flavor = "current_thread")]
712    async fn pool_loopback_send() {
713        use tokio::io::{AsyncReadExt, AsyncWriteExt};
714        use tokio::net::TcpListener;
715
716        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
717        let addr = listener.local_addr().unwrap();
718
719        // Server: read request, send canned response.
720        tokio::spawn(async move {
721            let (mut tcp, _) = listener.accept().await.unwrap();
722            let mut buf = [0u8; HTTP_HANDSHAKE_BUFFER];
723            let _ = tcp.read(&mut buf).await.unwrap();
724            let resp = b"HTTP/1.1 200 OK\r\nContent-Length: 15\r\n\r\n{\"orderId\":123}";
725            tcp.write_all(resp).await.unwrap();
726        });
727
728        // Client: connect, wrap in MaybeTls, create slot, send.
729        let tcp = tokio::net::TcpStream::connect(addr).await.unwrap();
730        let stream = MaybeTls::Plain(tcp);
731        let conn = HttpConnection::new(stream);
732
733        let mut slot = ClientSlot {
734            writer: RequestWriter::new(&addr.to_string()).unwrap(),
735            reader: ResponseReader::new(HTTP_HANDSHAKE_BUFFER),
736            conn: Some(conn),
737        };
738
739        let req = slot.writer.get("/test").finish().unwrap();
740        let conn = slot.conn.as_mut().unwrap();
741        let resp = conn.send(req, &mut slot.reader).await.unwrap();
742        assert_eq!(resp.status(), 200);
743        assert_eq!(resp.body_str().unwrap(), r#"{"orderId":123}"#);
744    }
745
/// A one-slot pool has nothing more to give while its slot is held.
#[test]
fn try_acquire_returns_none_when_exhausted() {
    let pool = Pool::new(make_disconnected_slot, |_| {});
    pool.put(make_disconnected_slot());

    let _only = pool.try_acquire().unwrap();
    let second_attempt = pool.try_acquire();
    assert!(second_attempt.is_none());
}
754
755    #[tokio::test(flavor = "current_thread")]
756    async fn pool_keep_alive_multiple_requests() {
757        use tokio::io::{AsyncReadExt, AsyncWriteExt};
758        use tokio::net::TcpListener;
759
760        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
761        let addr = listener.local_addr().unwrap();
762
763        // Server: handle two sequential requests on the same connection.
764        tokio::spawn(async move {
765            let (mut tcp, _) = listener.accept().await.unwrap();
766            let mut buf = [0u8; HTTP_HANDSHAKE_BUFFER];
767
768            // Request 1
769            let _ = tcp.read(&mut buf).await.unwrap();
770            tcp.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 7\r\n\r\n{\"r\":1}")
771                .await
772                .unwrap();
773
774            // Request 2 — same TCP connection
775            let _ = tcp.read(&mut buf).await.unwrap();
776            tcp.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 7\r\n\r\n{\"r\":2}")
777                .await
778                .unwrap();
779        });
780
781        let tcp = tokio::net::TcpStream::connect(addr).await.unwrap();
782        tcp.set_nodelay(true).unwrap();
783        let stream = MaybeTls::Plain(tcp);
784        let conn = HttpConnection::new(stream);
785
786        let pool = Pool::new(make_disconnected_slot, |slot| {
787            if slot.needs_reconnect() {
788                slot.conn = None;
789                slot.reader.reset();
790            }
791        });
792        pool.put(ClientSlot {
793            writer: RequestWriter::new(&addr.to_string()).unwrap(),
794            reader: ResponseReader::new(HTTP_HANDSHAKE_BUFFER),
795            conn: Some(conn),
796        });
797
798        // First request
799        {
800            let mut slot = pool.acquire();
801            let s: &mut ClientSlot = &mut slot;
802            let req = s.writer.get("/first").finish().unwrap();
803            let conn = s.conn.as_mut().unwrap();
804            let resp = conn.send(req, &mut s.reader).await.unwrap();
805            assert_eq!(resp.body_str().unwrap(), r#"{"r":1}"#);
806        } // slot returned
807
808        // Second request — same slot, same connection (keep-alive)
809        {
810            let mut slot = pool.acquire();
811            let s: &mut ClientSlot = &mut slot;
812            let req = s.writer.get("/second").finish().unwrap();
813            let conn = s.conn.as_mut().unwrap();
814            let resp = conn.send(req, &mut s.reader).await.unwrap();
815            assert_eq!(resp.body_str().unwrap(), r#"{"r":2}"#);
816        }
817    }
818
819    #[tokio::test(flavor = "current_thread")]
820    async fn builder_validates_empty_url() {
821        let result = ClientPool::builder().connections(1).build().await;
822        assert!(result.is_err());
823    }
824
825    #[tokio::test(flavor = "current_thread")]
826    async fn builder_validates_zero_connections() {
827        let result = ClientPool::builder()
828            .url("http://localhost")
829            .connections(0)
830            .build()
831            .await;
832        assert!(result.is_err());
833    }
834
835    /// Reproduces the reqwest connection pool bug: server closes the
836    /// connection while idle, client writes into the dead socket,
837    /// read hangs forever. Our timeout prevents the hang.
838    #[tokio::test(flavor = "current_thread")]
839    async fn stale_connection_timeout_not_hang() {
840        use tokio::io::{AsyncReadExt, AsyncWriteExt};
841        use tokio::net::TcpListener;
842
843        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
844        let addr = listener.local_addr().unwrap();
845
846        // Server: accept, respond to first request, then close the connection.
847        tokio::spawn(async move {
848            let (mut tcp, _) = listener.accept().await.unwrap();
849            let mut buf = [0u8; HTTP_HANDSHAKE_BUFFER];
850
851            // First request — respond normally.
852            let _ = tcp.read(&mut buf).await.unwrap();
853            tcp.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nok")
854                .await
855                .unwrap();
856
857            // Close the connection — simulates server idle timeout.
858            drop(tcp);
859        });
860
861        let tcp = tokio::net::TcpStream::connect(addr).await.unwrap();
862        tcp.set_nodelay(true).unwrap();
863        let stream = MaybeTls::Plain(tcp);
864        let conn = HttpConnection::new(stream);
865
866        let pool = Pool::new(make_disconnected_slot, |slot| {
867            if slot.needs_reconnect() {
868                slot.conn = None;
869                slot.reader.reset();
870            }
871        });
872        pool.put(ClientSlot {
873            writer: RequestWriter::new(&addr.to_string()).unwrap(),
874            reader: ResponseReader::new(HTTP_HANDSHAKE_BUFFER),
875            conn: Some(conn),
876        });
877
878        // First request — succeeds.
879        {
880            let mut slot = pool.acquire();
881            let s: &mut ClientSlot = &mut slot;
882            let req = s.writer.get("/first").finish().unwrap();
883            let conn = s.conn.as_mut().unwrap();
884            let resp = conn.send(req, &mut s.reader).await.unwrap();
885            assert_eq!(resp.status(), 200);
886        }
887
888        // Small delay to let server close.
889        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
890
891        // Second request — server has closed. Without timeout this hangs.
892        // With tokio::time::timeout, it fails fast.
893        {
894            let mut slot = pool.acquire();
895            let s: &mut ClientSlot = &mut slot;
896            let req = s.writer.get("/second").finish().unwrap();
897            let conn = s.conn.as_mut().unwrap();
898
899            let timeout = std::time::Duration::from_millis(500);
900            let result = tokio::time::timeout(timeout, conn.send(req, &mut s.reader)).await;
901
902            match result {
903                Ok(Ok(_)) => panic!("stale connection should not succeed"),
904                Ok(Err(_)) => {} // Connection error — correct (ConnectionClosed)
905                Err(_elapsed) => {
906                    // Timeout — also correct. This is what reqwest hangs on.
907                    // We don't hang — we fail.
908                }
909            }
910
911            // The connection should be dead.
912            assert!(
913                s.needs_reconnect(),
914                "slot should be poisoned after stale connection"
915            );
916        }
917    }
918
919    /// Dead connection heals automatically via spawned reconnect task.
920    ///
921    /// Scenario:
922    /// 1. Server accepts one connection, responds, then closes
923    /// 2. Client sends on the now-dead connection → poisoned
924    /// 3. Slot returns to pool (dead)
925    /// 4. Server starts listening again
926    /// 5. try_acquire() spawns reconnect task for dead slot
927    /// 6. acquire() waits until reconnect task finishes
928    /// 7. Slot is healthy — request succeeds
929    #[tokio::test(flavor = "current_thread")]
930    #[allow(clippy::large_futures)]
931    async fn dead_connection_heals_via_reconnect_task() {
932        use tokio::io::{AsyncReadExt, AsyncWriteExt};
933        use tokio::net::TcpListener;
934
935        let local = tokio::task::LocalSet::new();
936        local
937            .run_until(async {
938                // Phase 1: Start server, build pool with 1 connection.
939                let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
940                let addr = listener.local_addr().unwrap();
941
942                let server_handle = tokio::task::spawn_local({
943                    let listener_fd = listener.into_std().unwrap();
944                    async move {
945                        let listener = TcpListener::from_std(listener_fd).unwrap();
946                        // Accept first connection — respond then close.
947                        let (mut tcp, _) = listener.accept().await.unwrap();
948                        let mut buf = [0u8; HTTP_HANDSHAKE_BUFFER];
949                        let _ = tcp.read(&mut buf).await.unwrap();
950                        tcp.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nfirst")
951                            .await
952                            .unwrap();
953                        drop(tcp); // kill connection
954
955                        // Accept second connection (reconnect) — respond.
956                        let (mut tcp, _) = listener.accept().await.unwrap();
957                        let _ = tcp.read(&mut buf).await.unwrap();
958                        tcp.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 6\r\n\r\nhealed")
959                            .await
960                            .unwrap();
961                    }
962                });
963
964                let pool = ClientPool::builder()
965                    .url(&format!("http://{addr}"))
966                    .connections(1)
967                    .build()
968                    .await
969                    .unwrap();
970
971                // Phase 2: First request succeeds.
972                {
973                    let mut slot = pool.try_acquire().unwrap();
974                    let s: &mut ClientSlot = &mut slot;
975                    let req = s.writer.get("/first").finish().unwrap();
976                    let conn = s.conn.as_mut().unwrap();
977                    let resp = conn.send(req, &mut s.reader).await.unwrap();
978                    assert_eq!(resp.body_str().unwrap(), "first");
979                }
980
981                // Phase 3: Server closed the connection. Next send will fail.
982                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
983                {
984                    let mut slot = pool.try_acquire().unwrap();
985                    let s: &mut ClientSlot = &mut slot;
986                    let req = s.writer.get("/dead").finish().unwrap();
987                    let conn = s.conn.as_mut().unwrap();
988                    let timeout = std::time::Duration::from_millis(500);
989                    let _ = tokio::time::timeout(timeout, conn.send(req, &mut s.reader)).await;
990                    assert!(s.needs_reconnect(), "should be poisoned");
991                }
992                // Slot returned to pool (dead).
993
994                // Phase 4: try_acquire sees dead slot → spawns reconnect task.
995                // Server is still listening (waiting for second accept).
996                // acquire() waits for the reconnect task to finish.
997                let mut slot = pool.acquire().await.unwrap();
998                let s: &mut ClientSlot = &mut slot;
999                let req = s.writer.get("/healed").finish().unwrap();
1000                let conn = s.conn.as_mut().unwrap();
1001                let resp = conn.send(req, &mut s.reader).await.unwrap();
1002                assert_eq!(resp.body_str().unwrap(), "healed");
1003
1004                server_handle.await.unwrap();
1005            })
1006            .await;
1007    }
1008}