use async_trait::async_trait;
use mio::net::TcpStream as MIOTcpStream;
use parking_lot::Mutex;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream as TokioTcpStream};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use super::pool::{AddressedStreamFactory, NonBlockingStream};
use super::TransportError;
use crate::hub::event::IOSource;
use crate::hub::utils::error;
use bytes::Bytes;
/// `MessageStream` specialised to a mio TCP socket with `SimpleTokenizer` as its tokenizer parameter.
/// This is the concrete stream type produced by this module for both outbound and accepted connections.
type MIOMessageStream = super::stream::MessageStream<MIOTcpStream, super::stream::SimpleTokenizer>;
/// Adapts `MIOMessageStream` to the pool's `NonBlockingStream` interface.
///
/// Every method is a thin forwarder to the wrapped stream or to its inner
/// mio socket; no extra buffering or state is added here.
impl NonBlockingStream for MIOMessageStream {
    /// Forwards to the stream's own `try_recv`.
    fn try_recv(&mut self) -> Result<Bytes, TransportError> {
        // NOTE(review): presumably resolves to an inherent `try_recv` on
        // `MessageStream` (inherent methods win over trait methods); if no
        // such inherent method existed this call would recurse — confirm.
        self.try_recv()
    }

    /// Forwards to the stream's own `try_send`, passing `data` through untouched.
    fn try_send(&mut self, data: Option<Bytes>) -> Result<bool, TransportError> {
        // NOTE(review): same resolution assumption as in `try_recv`.
        self.try_send(data)
    }

    /// Exposes the inner mio socket as an `IOSource::MIO` event source.
    fn source(&mut self) -> IOSource {
        IOSource::MIO(self.as_inner_mut())
    }

    /// Shuts down the read half, write half, or both halves of the inner socket.
    fn shutdown(&mut self, how: std::net::Shutdown) -> io::Result<()> {
        let socket = self.as_inner();
        socket.shutdown(how)
    }
}
/// State shared (through an `Arc`) by every clone of `Factory`.
struct FactoryInner {
/// Handle of the spawned accept task; `None` when `Factory::new` was given no listen address.
listen_handle: Option<JoinHandle<()>>,
/// Receiving end of the channel that the accept task pushes accepted streams into.
/// Wrapped in a mutex because receiving needs `&mut` access while the factory is shared.
accepted_stream: Mutex<mpsc::Receiver<MIOMessageStream>>,
}
/// Stops the background accept task once the shared state is dropped,
/// i.e. after the last `Factory` clone holding the `Arc` goes away.
impl Drop for FactoryInner {
    fn drop(&mut self) {
        // Take the handle out so the task is aborted at most once; there is
        // nothing to do when the factory was created without a listener.
        if let Some(accept_task) = self.listen_handle.take() {
            accept_task.abort();
        }
    }
}
/// TCP stream factory handle.
///
/// Cloning is cheap: all clones share one `FactoryInner` through an `Arc`,
/// so they share the same accept task and the same queue of accepted streams.
#[derive(Clone)]
pub struct Factory(Arc<FactoryInner>);
impl Factory {
pub fn new(listen_addr: Option<SocketAddr>) -> Self {
let (tx, accepted_stream) = mpsc::channel(1);
let listen_handle = listen_addr.map(|listen_addr| {
tokio::spawn(async move {
let listener = match TcpListener::bind(listen_addr).await {
Ok(l) => l,
Err(e) => {
error!("[Tcp] failed to bind to address {}: {}", listen_addr, e);
return
}
};
loop {
if let Ok((stream, _)) = listener.accept().await {
tx.send(MIOMessageStream::new(tokio_to_mio_stream(stream), 65536))
.await
.ok();
}
}
})
});
Self(Arc::new(FactoryInner {
accepted_stream: Mutex::new(accepted_stream),
listen_handle,
}))
}
}
#[async_trait]
impl AddressedStreamFactory for Factory {
    /// Opens an outbound TCP connection to `addr` and wraps it as a message stream.
    ///
    /// `addr` must be a literal socket address (`ip:port`); host names are not
    /// resolved. Returns `None` when `addr` does not parse or the connection
    /// attempt fails.
    async fn create_stream(&self, addr: &str) -> Option<Box<dyn NonBlockingStream>> {
        let addr: SocketAddr = addr.parse().ok()?;
        let tcp_stream = TokioTcpStream::connect(addr).await.ok()?;
        Some(Box::new(MIOMessageStream::new(tokio_to_mio_stream(tcp_stream), 65536)))
    }

    /// Waits for the next inbound connection accepted by the background listener.
    ///
    /// If the queue is closed (no listener was configured, or the accept task
    /// has ended) this future never resolves.
    ///
    /// The receiver keeps only the waker of the most recent poll, so this is
    /// meant to be awaited by one task at a time.
    async fn discover_stream(&self) -> Box<dyn NonBlockingStream> {
        // Lock only for the duration of each non-blocking poll. Holding the
        // blocking `parking_lot` guard across an `.await` would make a second
        // caller block its executor thread inside `lock()` for as long as the
        // first caller stays suspended — forever once the queue is closed.
        let accepted =
            futures::future::poll_fn(|cx| self.0.accepted_stream.lock().poll_recv(cx)).await;
        match accepted {
            None => futures::future::pending().await,
            Some(s) => Box::new(s),
        }
    }
}
/// Converts a Tokio TCP stream into a mio TCP stream.
///
/// The socket is first turned back into a `std::net::TcpStream` via
/// `into_std` and then handed to mio via `from_std`.
///
/// # Panics
///
/// Panics if `into_std` reports an I/O error while releasing the socket
/// from Tokio.
pub(crate) fn tokio_to_mio_stream(src: TokioTcpStream) -> MIOTcpStream {
    let stream = src
        .into_std()
        .expect("failed to convert tokio TcpStream into std TcpStream");
    MIOTcpStream::from_std(stream)
}