// vls_proxy/grpc/incoming.rs

1use hyper::server::accept::Accept;
2use hyper::server::conn::{AddrIncoming, AddrStream};
3use std::net::{SocketAddr, TcpListener as StdTcpListener};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use std::time::Duration;
7use tokio::net::TcpListener;
8use tonic::codegen::futures_core::Stream;
9
10/// A copy of the tonic TcpIncoming, but initialized from either an address or an std listener
11pub struct TcpIncoming {
12    inner: AddrIncoming,
13}
14
15impl TcpIncoming {
16    pub fn new(addr: SocketAddr, nodelay: bool, keepalive: Option<Duration>) -> Result<Self, ()> {
17        let mut inner = AddrIncoming::bind(&addr).expect("");
18        inner.set_nodelay(nodelay);
19        inner.set_keepalive(keepalive);
20        Ok(TcpIncoming { inner })
21    }
22
23    pub fn new_from_std(
24        std_listener: StdTcpListener,
25        nodelay: bool,
26        keepalive: Option<Duration>,
27    ) -> Result<Self, ()> {
28        std_listener.set_nonblocking(true).expect("set_nonblocking"); // should be infallible on a new socket
29        let listener = TcpListener::from_std(std_listener).expect("tokio TcpListener"); // should only fail due to a fatal error in tokio runtime
30
31        let mut inner = AddrIncoming::from_listener(listener).expect("from_listener"); // should be infallible
32        inner.set_nodelay(nodelay);
33        inner.set_keepalive(keepalive);
34        Ok(TcpIncoming { inner })
35    }
36}
37
38impl Stream for TcpIncoming {
39    type Item = Result<AddrStream, std::io::Error>;
40
41    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42        Pin::new(&mut self.inner).poll_accept(cx)
43    }
44}