use crate::tagvalue::{DecodeError, DecoderBuffered, Message};
use futures::future::Fuse;
use futures::future::Ready;
use futures::select;
use futures::{AsyncRead, AsyncReadExt, FutureExt};
use futures_timer::Delay;
use std::io;
use std::time::Duration;
use std::time::Instant;
#[derive(Debug)]
pub struct LlEventLoop<I> {
decoder: DecoderBuffered,
input: I,
heartbeat: Duration,
heartbeat_soft_tolerance: Duration,
heartbeat_hard_tolerance: Duration,
last_reset: Instant,
last_heartbeat: Instant,
is_alive: bool,
}
impl<I> LlEventLoop<I>
where
I: AsyncRead + std::marker::Unpin,
{
pub fn new(decoder: DecoderBuffered, input: I, heartbeat: Duration) -> Self {
let heartbeat_soft_tolerance = heartbeat * 2;
let heartbeat_hard_tolerance = heartbeat * 3;
Self {
decoder,
input,
heartbeat,
heartbeat_soft_tolerance,
heartbeat_hard_tolerance,
last_reset: Instant::now(),
last_heartbeat: Instant::now(),
is_alive: true,
}
}
pub fn set_soft_tolerance(&mut self, soft_tolerance: Duration) {
self.heartbeat_soft_tolerance = soft_tolerance;
}
pub fn set_hard_tolerance(&mut self, hard_tolerance: Duration) {
self.heartbeat_hard_tolerance = hard_tolerance;
}
pub async fn next_event<'a>(&'a mut self) -> Option<LlEvent<'a>> {
let mut buf_filled_len = 0;
let mut buf = &mut self.decoder.supply_buffer()[buf_filled_len..];
loop {
if !self.is_alive {
return None;
}
let now = Instant::now();
let mut timer_heartbeat = Delay::new(now - self.last_heartbeat + self.heartbeat).fuse();
let mut timer_test_request =
Delay::new(now - self.last_reset + self.heartbeat_soft_tolerance).fuse();
let mut timer_logout =
Delay::new(now - self.last_reset + self.heartbeat_hard_tolerance).fuse();
let mut read_result = self.input.read(buf).fuse();
select! {
read_result = read_result => {
match read_result {
Err(e) => {
return Some(LlEvent::IoError(e));
}
Ok(num_bytes) => {
buf_filled_len += num_bytes;
if buf_filled_len < buf.len() {
continue;
}
let result = self.decoder.parse();
buf_filled_len = 0;
buf = &mut self.decoder.supply_buffer()[buf_filled_len..];
match result {
Ok(Some(())) => {
let msg = self.decoder.message();
return Some(LlEvent::Message(msg));
}
Ok(None) => {
continue;
}
Err(err) => {
self.is_alive = false;
return Some(LlEvent::BadMessage(err))
}
}
}
};
},
() = timer_heartbeat => {
self.last_heartbeat = Instant::now();
return Some(LlEvent::Heartbeat);
},
() = timer_test_request => {
return Some(LlEvent::TestRequest);
},
() = timer_logout => {
self.is_alive = false;
return Some(LlEvent::Logout);
}
}
}
}
pub fn ping_heartbeat(&mut self) {
self.last_reset = Instant::now();
}
}
#[derive(Debug)]
pub enum LlEvent<'a> {
Message(Message<'a, &'a [u8]>),
BadMessage(DecodeError),
IoError(io::Error),
Heartbeat,
TestRequest,
Logout,
}
#[cfg(test)]
mod test {
use super::*;
use crate::tagvalue::{Config, Decoder};
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};
use tokio_util::compat::*;
async fn produce_events(events: Vec<(&'static [u8], Duration)>) -> TcpStream {
let tcp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = tcp_listener.local_addr().unwrap();
tokio::spawn(async move {
let mut stream = TcpStream::connect(local_addr).await.unwrap();
for (event_bytes, delay) in events.iter() {
stream.write(event_bytes).await.unwrap();
tokio::time::sleep(*delay).await;
}
});
tcp_listener.accept().await.unwrap().0
}
async fn new_event_loop(
events: Vec<(&'static [u8], Duration)>,
) -> LlEventLoop<Compat<TcpStream>> {
let input = produce_events(events).await;
LlEventLoop::new(
Decoder::<Config>::new(crate::Dictionary::fix44()).buffered(),
input.compat(),
Duration::from_secs(3),
)
}
#[tokio::test]
async fn dead_input_triggers_logout() {
let mut event_loop = new_event_loop(vec![(b"8", Duration::from_secs(10))]).await;
let event = event_loop.next_event().await;
assert!(matches!(event, Some(LlEvent::Heartbeat)));
let event = event_loop.next_event().await;
assert!(
matches!(event, Some(LlEvent::Heartbeat))
|| matches!(event, Some(LlEvent::TestRequest))
);
}
}