1use std::{mem::ManuallyDrop, net::SocketAddr};
2
3use crossbeam_skiplist::SkipMap;
4use dashmap::DashMap;
5use hyper::body::{Bytes, Incoming};
6
7use crate::{Conn, Sender};
8
9#[static_init::dynamic]
10pub static POOL: DashMap<SocketAddr, SkipMap<SocketAddr, ManuallyDrop<Sender>>> = DashMap::new();
11
12pub struct Body {
13 incoming: Incoming,
14 sender: ManuallyDrop<Sender>,
15 addr: SocketAddr,
16 peer_addr: SocketAddr,
17}
18
19impl Body {
20 pub fn new(incoming: Incoming, addr: SocketAddr, conn: Conn) -> Self {
21 Self {
22 incoming,
23 addr,
24 peer_addr: conn.peer_addr,
25 sender: ManuallyDrop::new(conn.sender),
26 }
27 }
28}
29
30impl Drop for Body {
31 fn drop(&mut self) {
32 let sender = unsafe { ManuallyDrop::take(&mut self.sender) };
33 POOL
34 .entry(self.addr)
35 .or_default()
36 .insert(self.peer_addr, ManuallyDrop::new(sender));
37 }
38}
39
40impl http_body::Body for Body {
41 type Data = Bytes;
42 type Error = hyper::Error;
43
44 fn poll_frame(
45 self: std::pin::Pin<&mut Self>,
46 cx: &mut std::task::Context<'_>,
47 ) -> std::task::Poll<Option<std::result::Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
48 http_body::Body::poll_frame(std::pin::Pin::new(&mut self.get_mut().incoming), cx)
49 }
50}