1use crate::Incoming;
2use futures::{AsyncRead, AsyncWrite, Stream};
3use futures_rustls::server::TlsStream;
4use std::fmt::Debug;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio_util::compat::FuturesAsyncReadCompatExt;
8use tokio_util::compat::TokioAsyncReadCompatExt;
9
10pub struct TokioIncomingTcpWrapper<
11 TokioTCP: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
12 ETCP,
13 TokioITCP: Stream<Item = Result<TokioTCP, ETCP>> + Unpin,
14> {
15 incoming_tcp: TokioITCP,
16}
17
18impl<TokioTCP: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, ETCP, TokioITCP: Stream<Item = Result<TokioTCP, ETCP>> + Unpin>
19 TokioIncomingTcpWrapper<TokioTCP, ETCP, TokioITCP>
20{
21 pub fn into_inner(self) -> TokioITCP {
22 self.incoming_tcp
23 }
24}
25
26impl<TokioTCP: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, ETCP, TokioITCP: Stream<Item = Result<TokioTCP, ETCP>> + Unpin> Stream
27 for TokioIncomingTcpWrapper<TokioTCP, ETCP, TokioITCP>
28{
29 type Item = Result<tokio_util::compat::Compat<TokioTCP>, ETCP>;
30
31 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32 match Pin::new(&mut self.incoming_tcp).poll_next(cx) {
33 Poll::Ready(Some(Ok(tcp))) => Poll::Ready(Some(Ok(tcp.compat()))),
34 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
35 Poll::Ready(None) => Poll::Ready(None),
36 Poll::Pending => Poll::Pending,
37 }
38 }
39}
40
41impl<TokioTCP: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, ETCP, TokioITCP: Stream<Item = Result<TokioTCP, ETCP>> + Unpin> From<TokioITCP>
42 for TokioIncomingTcpWrapper<TokioTCP, ETCP, TokioITCP>
43{
44 fn from(incoming_tcp: TokioITCP) -> Self {
45 Self { incoming_tcp }
46 }
47}
48
49pub struct TokioIncoming<
50 TCP: AsyncRead + AsyncWrite + Unpin,
51 ETCP,
52 ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin,
53 EC: Debug + 'static,
54 EA: Debug + 'static,
55> {
56 incoming: Incoming<TCP, ETCP, ITCP, EC, EA>,
57}
58
59impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static> Stream
60 for TokioIncoming<TCP, ETCP, ITCP, EC, EA>
61{
62 type Item = Result<tokio_util::compat::Compat<TlsStream<TCP>>, ETCP>;
63
64 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65 match Pin::new(&mut self.incoming).poll_next(cx) {
66 Poll::Ready(Some(Ok(tls))) => Poll::Ready(Some(Ok(tls.compat()))),
67 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
68 Poll::Ready(None) => Poll::Ready(None),
69 Poll::Pending => Poll::Pending,
70 }
71 }
72}
73
74impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static>
75 From<Incoming<TCP, ETCP, ITCP, EC, EA>> for TokioIncoming<TCP, ETCP, ITCP, EC, EA>
76{
77 fn from(incoming: Incoming<TCP, ETCP, ITCP, EC, EA>) -> Self {
78 Self { incoming }
79 }
80}
81
82impl<TCP: AsyncRead + AsyncWrite + Unpin, ETCP, ITCP: Stream<Item = Result<TCP, ETCP>> + Unpin, EC: Debug + 'static, EA: Debug + 'static>
83 From<TokioIncoming<TCP, ETCP, ITCP, EC, EA>> for Incoming<TCP, ETCP, ITCP, EC, EA>
84{
85 fn from(tokio_incoming: TokioIncoming<TCP, ETCP, ITCP, EC, EA>) -> Self {
86 tokio_incoming.incoming
87 }
88}