1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
//! This crate provides a trait for taking ownership of a [`Stream`] of incoming connections. //! //! # Why? //! The types tokio provides, [`tokio::net::tcp::Incoming`] and [`tokio::net::unix::Incoming`], are //! both tied to the lifetime of their respective Listeners [1]. //! The provided `.incoming()` used to consume self, but [`this was changed`](https://github.com/tokio-rs/tokio/commit/8a7e57786a5dca139f5b4261685e22991ded0859#diff-a0e934bf38b64e9a75c741bff132d8adL245). //! //! # Example //! ```rust //! # use futures_util::stream::{Stream, StreamExt}; //! # use tokio::io::{AsyncRead, AsyncWrite}; //! # use std::error::Error; //! # fn handle_this_conn<S>(_: S) {} //! async fn use_owned_stream<S, C, E>(s: S) //! where //! S: Stream<Item = Result<C, E>> + Send + 'static, //! C: AsyncRead + AsyncWrite + Unpin + Send + 'static, //! E: Into<Box<dyn Error + Sync + Send + 'static>> + std::fmt::Debug + Unpin + Send + 'static, //! { //! tokio::spawn(s.for_each_concurrent(None, |st| async move { handle_this_conn(st) })); //! } //! //! fn main() -> Result<(), Box<dyn Error>> { //! let mut rt = tokio::runtime::Runtime::new()?; //! //! use incoming::IntoIncoming; //! rt.block_on(async move { //! let addr: std::net::SocketAddr = "0.0.0.0:4242".parse()?; //! let st = tokio::net::TcpListener::bind(addr).await?; //! use_owned_stream(st.into_incoming()).await; //! Ok(()) //! }) //! } //! ``` #![deny(missing_docs)] use futures_util::stream::Stream; use std::error::Error; use std::{boxed::Box, pin::Pin}; use tokio::io::{AsyncRead, AsyncWrite}; /// Provide a version of `.incoming()` that takes ownership of `self.` pub trait IntoIncoming<S, C, E> where S: Stream<Item = Result<C, E>> + Send + 'static, C: AsyncRead + AsyncWrite + Unpin + Send, E: Into<Box<dyn Error + Sync + Send + 'static>> + std::fmt::Debug + Unpin + Send, { /// Consume `self`, returning a `Stream` which yields connections (`impl AsyncRead + AsyncWrite`). /// /// The underlying implementation uses [`futures_util::stream::poll_fn`]. fn into_incoming(self) -> S; } impl IntoIncoming< Pin<Box<dyn Stream<Item = Result<tokio::net::TcpStream, tokio::io::Error>> + Send>>, tokio::net::TcpStream, tokio::io::Error, > for tokio::net::TcpListener { fn into_incoming( mut self, ) -> Pin<Box<dyn Stream<Item = Result<tokio::net::TcpStream, tokio::io::Error>> + Send>> { Box::pin(futures_util::stream::poll_fn(move |cx| { self.poll_accept(cx).map(|st| Some(st.map(|(st, _)| st))) })) } } impl IntoIncoming< Pin<Box<dyn Stream<Item = Result<tokio::net::UnixStream, tokio::io::Error>> + Send>>, tokio::net::UnixStream, tokio::io::Error, > for tokio::net::UnixListener { fn into_incoming( mut self, ) -> Pin<Box<dyn Stream<Item = Result<tokio::net::UnixStream, tokio::io::Error>> + Send>> { Box::pin(futures_util::stream::poll_fn(move |cx| { let mut inc = self.incoming(); Pin::new(&mut inc).poll_accept(cx).map(|st| Some(st)) })) } }