use std::{
pin::Pin,
sync::{Arc, RwLock, RwLockReadGuard},
task::{Context, Poll},
time::Duration,
};
use chrono::prelude::*;
use futures_util::{future::Future, ready, sink::Sink, stream::Stream};
use pin_project::pin_project;
use tokio::sync::mpsc::UnboundedSender;
use tokio::{
io::{AsyncRead, AsyncWrite},
time::{self, Interval, Sleep},
};
use tokio_util::codec::Framed;
use crate::{
client::data::Config,
error,
proto::{Command, IrcCodec, Message, Response},
};
#[pin_project]
struct Pinger {
tx: UnboundedSender<Message>,
enabled: bool,
ping_timeout: Duration,
#[pin]
ping_deadline: Option<Sleep>,
#[pin]
ping_interval: Interval,
}
impl Pinger {
pub fn new(tx: UnboundedSender<Message>, config: &Config) -> Pinger {
let ping_time = Duration::from_secs(u64::from(config.ping_time()));
let ping_timeout = Duration::from_secs(u64::from(config.ping_timeout()));
Self {
tx,
enabled: false,
ping_timeout,
ping_deadline: None,
ping_interval: time::interval(ping_time),
}
}
fn handle_message(self: Pin<&mut Self>, message: &Message) -> error::Result<()> {
match message.command {
Command::Response(Response::RPL_ENDOFMOTD, _)
| Command::Response(Response::ERR_NOMOTD, _) => {
*self.project().enabled = true;
}
Command::PING(ref data, _) => {
self.send_pong(data)?;
}
Command::PONG(_, None) | Command::PONG(_, Some(_)) => {
log::trace!("Received PONG");
self.project().ping_deadline.set(None);
}
_ => (),
}
Ok(())
}
fn send_pong(self: Pin<&mut Self>, data: &str) -> error::Result<()> {
self.project()
.tx
.send(Command::PONG(data.to_owned(), None).into())?;
Ok(())
}
fn send_ping(self: Pin<&mut Self>) -> error::Result<()> {
log::trace!("Sending PING");
let data = format!("{}", Local::now().timestamp());
let mut this = self.project();
this.tx.send(Command::PING(data.clone(), None).into())?;
if this.ping_deadline.is_none() {
let ping_deadline = time::sleep(*this.ping_timeout);
this.ping_deadline.set(Some(ping_deadline));
}
Ok(())
}
}
impl Future for Pinger {
type Output = Result<(), error::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(ping_deadline) = self.as_mut().project().ping_deadline.as_pin_mut() {
match ping_deadline.poll(cx) {
Poll::Ready(()) => return Poll::Ready(Err(error::Error::PingTimeout)),
Poll::Pending => (),
}
}
if let Poll::Ready(_) = self.as_mut().project().ping_interval.poll_tick(cx) {
if *self.as_mut().project().enabled {
self.as_mut().send_ping()?;
}
}
Poll::Pending
}
}
#[pin_project]
pub struct Transport<T> {
#[pin]
inner: Framed<T, IrcCodec>,
#[pin]
pinger: Option<Pinger>,
}
impl<T> Transport<T>
where
T: Unpin + AsyncRead + AsyncWrite,
{
pub fn new(
config: &Config,
inner: Framed<T, IrcCodec>,
tx: UnboundedSender<Message>,
) -> Transport<T> {
let pinger = Some(Pinger::new(tx, config));
Transport { inner, pinger }
}
pub fn into_inner(self) -> Framed<T, IrcCodec> {
self.inner
}
}
impl<T> Stream for Transport<T>
where
T: Unpin + AsyncRead + AsyncWrite,
{
type Item = Result<Message, error::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(pinger) = self.as_mut().project().pinger.as_pin_mut() {
match pinger.poll(cx) {
Poll::Ready(result) => result?,
Poll::Pending => (),
}
}
let result = ready!(self.as_mut().project().inner.poll_next(cx));
let message = match result {
None => return Poll::Ready(None),
Some(message) => message?,
};
if let Some(pinger) = self.as_mut().project().pinger.as_pin_mut() {
pinger.handle_message(&message)?;
}
Poll::Ready(Some(Ok(message)))
}
}
impl<T> Sink<Message> for Transport<T>
where
T: Unpin + AsyncRead + AsyncWrite,
{
type Error = error::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.project().inner.poll_ready(cx))?;
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
log::trace!("[SEND] {}", item);
self.project().inner.start_send(item)?;
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.project().inner.poll_flush(cx))?;
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
ready!(self.project().inner.poll_close(cx))?;
Poll::Ready(Ok(()))
}
}
#[derive(Clone, Debug)]
pub struct LogView {
sent: Arc<RwLock<Vec<Message>>>,
received: Arc<RwLock<Vec<Message>>>,
}
impl LogView {
pub fn sent(&self) -> error::Result<RwLockReadGuard<Vec<Message>>> {
self.sent.read().map_err(|_| error::Error::PoisonedLog)
}
pub fn received(&self) -> error::Result<RwLockReadGuard<Vec<Message>>> {
self.received.read().map_err(|_| error::Error::PoisonedLog)
}
}
#[pin_project]
pub struct Logged<T> {
#[pin]
inner: Transport<T>,
view: LogView,
}
impl<T> Logged<T>
where
T: AsyncRead + AsyncWrite,
{
pub fn wrap(inner: Transport<T>) -> Logged<T> {
Logged {
inner,
view: LogView {
sent: Arc::new(RwLock::new(vec![])),
received: Arc::new(RwLock::new(vec![])),
},
}
}
pub fn view(&self) -> LogView {
self.view.clone()
}
}
impl<T> Stream for Logged<T>
where
T: Unpin + AsyncRead + AsyncWrite,
{
type Item = Result<Message, error::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match ready!(this.inner.poll_next(cx)) {
Some(msg) => {
let msg = msg?;
this.view
.received
.write()
.map_err(|_| error::Error::PoisonedLog)?
.push(msg.clone());
Poll::Ready(Some(Ok(msg)))
}
None => Poll::Ready(None),
}
}
}
impl<T> Sink<Message> for Logged<T>
where
T: Unpin + AsyncRead + AsyncWrite,
{
type Error = error::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_ready(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx)
}
fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
let this = self.project();
this.inner.start_send(item.clone())?;
this.view
.sent
.write()
.map_err(|_| error::Error::PoisonedLog)?
.push(item);
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_flush(cx)
}
}