use crate::network::Network;
use bytes::{BufMut, BytesMut};
use flume::{bounded, Receiver, Sender};
use open_protocol::messages::communication::MID0001rev7;
use open_protocol::messages::keep_alive::MID9999rev1;
use open_protocol::{Header, Message};
use open_protocol::{decode, encode::{self, Encode, Encoder}};
use std::collections::VecDeque;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::str::FromStr;
use std::time::Duration;
use thiserror;
use tokio::net::TcpStream;
use tokio::time::{Instant, Sleep};
use tokio::{select, time};
#[derive(Debug, thiserror::Error)]
pub enum ConnectionError {
#[error("Requests are done")]
RequestsDone,
#[error("Decode error: {0}")]
DecodeError(#[from] decode::Error),
#[error("Encode error: {0}")]
EncodeError(#[from] encode::Error),
#[error("IO error: {0}")]
IoError(#[from] io::Error),
}
#[derive(Debug)]
pub enum Event {
Incoming(Message),
Outgoing(Message),
}
pub struct EventLoop {
network: Option<Network>,
requests_rx: Receiver<Message>,
pub requests_tx: Sender<Message>,
pending: VecDeque<Message>, events: VecDeque<Event>,
write_buf: BytesMut,
keepalive_timeout: Option<Pin<Box<Sleep>>>,
}
impl EventLoop {
pub fn new(socket: TcpStream) -> Self {
let (requests_tx, requests_rx) = bounded(1000);
let pending = VecDeque::with_capacity(1000);
let events = VecDeque::with_capacity(1000);
Self {
requests_tx,
requests_rx,
pending,
events,
write_buf: BytesMut::with_capacity(10 * 1024),
network: Some(Network::new(socket)),
keepalive_timeout: None,
}
}
async fn select(&mut self) -> Result<Event, ConnectionError> {
let inflight_full = false; let collision = false;
if let Some(event) = self.events.pop_front() {
return Ok(event);
}
select! {
o = next_request(
&mut self.pending,
&self.requests_rx,
Duration::ZERO
), if !self.pending.is_empty() || (!inflight_full && !collision) => match o {
Ok(request) => {
self.handle_outgoing_packet(request)?;
self.network.as_mut().unwrap().flush(&mut self.write_buf).await?;
Ok(self.events.pop_front().unwrap())
}
Err(_) => Err(ConnectionError::RequestsDone),
},
o = self.network.as_mut().unwrap().read(&mut self.events) => {
o?;
self.network.as_mut().unwrap().flush(&mut self.write_buf).await?;
Ok(self.events.pop_front().unwrap())
},
_ = self.keepalive_timeout.as_mut().unwrap() => {
let timeout = self.keepalive_timeout.as_mut().unwrap();
timeout.as_mut().reset(Instant::now() + Duration::from_secs(5));
self.handle_outgoing_packet(Message::MID9999rev1(MID9999rev1 {}))?;
self.network.as_mut().unwrap().flush(&mut self.write_buf).await?;
Ok(self.events.pop_front().unwrap())
}
}
}
fn handle_outgoing_packet(&mut self, request: Message) -> Result<(), ConnectionError> {
let mut payload_encoder = Encoder::new();
request.encode_payload(&mut payload_encoder)?;
let (mid, revision) = request.mid_revision();
let header = Header {
mid,
revision: Some(revision),
length: (payload_encoder.len() as u16) + 20,
..Default::default()
};
let mut header_encoder = Encoder::new();
header.encode(&mut header_encoder)?;
self.write_buf.extend(header_encoder.as_slice());
self.write_buf.extend(payload_encoder.as_slice());
self.write_buf.put_u8(0x0);
self.events.push_back(Event::Outgoing(request));
Ok(())
}
pub async fn poll(&mut self) -> Result<Event, ConnectionError> {
if self.keepalive_timeout.is_none() {
self.keepalive_timeout = Some(Box::pin(time::sleep(Duration::from_secs(5))));
}
match self.select().await {
Ok(v) => Ok(v),
Err(e) => {
Err(e)
}
}
}
}
async fn next_request(
pending: &mut VecDeque<Message>,
rx: &Receiver<Message>,
pending_throttle: Duration,
) -> Result<Message, ConnectionError> {
if !pending.is_empty() {
time::sleep(pending_throttle).await;
Ok(pending.pop_front().unwrap())
} else {
match rx.recv_async().await {
Ok(r) => Ok(r),
Err(_) => Err(ConnectionError::RequestsDone),
}
}
}
pub async fn connect() -> (Sender<Message>, EventLoop) {
let socket = TcpStream::connect(SocketAddr::from_str("127.0.0.1:4545").unwrap())
.await
.unwrap();
let event_loop = EventLoop::new(socket);
let sender = event_loop.requests_tx.clone();
sender.send_async(Message::MID0001rev1(MID0001rev7 { keep_alive: None })).await.unwrap();
(sender, event_loop)
}