use std::io;
use std::net::SocketAddr;
use std::time::Duration;
use futures_legacy::future;
use futures_legacy::stream::SplitSink;
use futures_legacy::sync::mpsc::{self, UnboundedSender};
use futures_legacy::sync::oneshot::{self, Sender};
use futures_legacy::{Future, Sink, Stream};
use protobuf::RepeatedField;
use tokio::codec::Framed;
use tokio::net::TcpStream;
use tokio::prelude::*;
use crate::codec::MsgCodec;
use crate::protos::riemann::{Event, Msg};
/// One framed TCP connection carrying `Msg` frames encoded by `MsgCodec`.
///
/// Values are created by `Connection::connect`, which splits the framed
/// socket: the read half is moved into a spawned receiver loop and the write
/// half is kept here.
#[derive(Debug)]
pub struct Connection {
/// Sending side of the callback queue. Each entry is the one-shot channel
/// on which the receiver loop delivers one incoming frame; entries are
/// consumed in the order they were queued.
sender_queue: UnboundedSender<Sender<Msg>>,
/// Write half of the framed socket, used to push outgoing messages.
socket_sender: SplitSink<Framed<TcpStream, MsgCodec>>,
}
impl Connection {
    /// Opens a TCP connection to `addr` and wraps it in a [`Connection`].
    ///
    /// On success the socket has `TCP_NODELAY` enabled and is framed with
    /// `MsgCodec`. The read half is handed to a background task (started with
    /// `tokio::spawn`) that pairs every incoming frame with the next queued
    /// callback, in FIFO order. That task stops, logging to stderr, on the
    /// first socket error, callback-queue error or failed delivery.
    ///
    /// `connect_timeout_ms` is the maximum time, in milliseconds, allowed for
    /// the TCP connect to complete.
    ///
    /// # Errors
    ///
    /// * The underlying `io::Error` if the connect itself or `set_nodelay`
    ///   fails.
    /// * An `io::ErrorKind::TimedOut` error if the deadline elapses or the
    ///   timer backing the deadline fails.
    pub(crate) fn connect(
        addr: &SocketAddr,
        connect_timeout_ms: u64,
    ) -> impl Future<Item = Connection, Error = io::Error> {
        TcpStream::connect(addr)
            .timeout(Duration::from_millis(connect_timeout_ms))
            .map_err(|e| {
                // A timeout error is one of: inner (the connect failed),
                // elapsed (deadline hit) or timer (the timer broke). Only the
                // first carries an `io::Error`; unwrapping `into_inner()` for
                // the elapsed case would panic instead of reporting a timeout.
                if e.is_inner() {
                    e.into_inner()
                        .expect("timeout error reported an inner error but held none")
                } else {
                    io::Error::new(io::ErrorKind::TimedOut, e)
                }
            })
            .and_then(|socket| {
                socket.set_nodelay(true)?;
                let framed = Framed::new(socket, MsgCodec);
                let (conn_sender, conn_receiver) = framed.split();
                let (cb_queue_tx, cb_queue_rx) = mpsc::unbounded::<Sender<Msg>>();

                // Zip the frame stream with the callback queue so that the
                // n-th frame read from the socket is delivered to the n-th
                // callback registered by `send_events`.
                let receiver_loop = conn_receiver
                    .zip(cb_queue_rx.map_err(|_| {
                        io::Error::new(io::ErrorKind::Other, "callback queue error")
                    }))
                    .for_each(move |(frame, cb)| {
                        // `send` fails when the waiting side has already been
                        // dropped; that ends the loop.
                        cb.send(frame).map_err(|e| {
                            io::Error::new(
                                io::ErrorKind::Other,
                                format!("failed to deliver msg to callback {:?}", e),
                            )
                        })
                    })
                    .map_err(|e| eprintln!("{:?}", e));
                tokio::spawn(receiver_loop);

                Ok(Connection {
                    sender_queue: cb_queue_tx,
                    socket_sender: conn_sender,
                })
            })
    }

    /// Sends `events` in a single `Msg` and returns a future for the reply.
    ///
    /// The work of queuing the reply callback and pushing the message into
    /// the socket sink happens synchronously inside this call; the returned
    /// future only waits for the reply. `socket_timeout` is the maximum time,
    /// in milliseconds, to wait for that reply.
    ///
    /// # Errors
    ///
    /// * An `io::ErrorKind::UnexpectedEof` error wrapping the cause when the
    ///   callback cannot be queued, the sink rejects or refuses the message,
    ///   or flushing fails.
    /// * An `io::ErrorKind::TimedOut` error when no reply is delivered within
    ///   `socket_timeout` milliseconds (this also covers the reply channel
    ///   being dropped and timer failures).
    pub(crate) fn send_events(
        &mut self,
        events: &[Event],
        socket_timeout: u64,
    ) -> impl Future<Item = Msg, Error = io::Error> {
        let mut msg = Msg::new();
        msg.set_events(RepeatedField::from_slice(events));
        let (tx, rx) = oneshot::channel::<Msg>();

        // Register the callback before touching the socket so that nothing is
        // written when the receiver loop is already gone.
        let submitted = self
            .sender_queue
            .unbounded_send(tx)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
            .and_then(|_| self.socket_sender.start_send(msg))
            .and_then(|accepted| {
                if accepted.is_not_ready() {
                    // The sink handed the message back without buffering it.
                    // Waiting for a reply would only end in a timeout, so
                    // report the failure right away.
                    Err(io::Error::new(
                        io::ErrorKind::WouldBlock,
                        "socket sink is not ready to accept the message",
                    ))
                } else {
                    // NOTE(review): a `NotReady` result here means the flush
                    // is incomplete and nothing visible in this file drives
                    // it further until the next send — confirm this is
                    // acceptable for the callers.
                    self.socket_sender.poll_complete()
                }
            });

        future::result(submitted)
            .map_err(|e| io::Error::new(io::ErrorKind::UnexpectedEof, e))
            .and_then(move |_| {
                rx.timeout(Duration::from_millis(socket_timeout))
                    .map_err(|e| io::Error::new(io::ErrorKind::TimedOut, e))
            })
    }
}