pooled_fetch/
body.rs

1use std::{mem::ManuallyDrop, net::SocketAddr};
2
3use crossbeam_skiplist::SkipMap;
4use dashmap::DashMap;
5use hyper::body::{Bytes, Incoming};
6
7use crate::{Conn, Sender};
8
9#[static_init::dynamic]
10pub static POOL: DashMap<SocketAddr, SkipMap<SocketAddr, ManuallyDrop<Sender>>> = DashMap::new();
11
12pub struct Body {
13  incoming: Incoming,
14  sender: ManuallyDrop<Sender>,
15  addr: SocketAddr,
16  peer_addr: SocketAddr,
17}
18
19impl Body {
20  pub fn new(incoming: Incoming, addr: SocketAddr, conn: Conn) -> Self {
21    Self {
22      incoming,
23      addr,
24      peer_addr: conn.peer_addr,
25      sender: ManuallyDrop::new(conn.sender),
26    }
27  }
28}
29
30impl Drop for Body {
31  fn drop(&mut self) {
32    let sender = unsafe { ManuallyDrop::take(&mut self.sender) };
33    POOL
34      .entry(self.addr)
35      .or_default()
36      .insert(self.peer_addr, ManuallyDrop::new(sender));
37  }
38}
39
40impl http_body::Body for Body {
41  type Data = Bytes;
42  type Error = hyper::Error;
43
44  fn poll_frame(
45    self: std::pin::Pin<&mut Self>,
46    cx: &mut std::task::Context<'_>,
47  ) -> std::task::Poll<Option<std::result::Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
48    http_body::Body::poll_frame(std::pin::Pin::new(&mut self.get_mut().incoming), cx)
49  }
50}