sharedstate 0.4.4

Sync heavily read state across many servers
Documentation
use std::time::Duration;

use message_encoding::MessageEncoding;
use tokio::sync::mpsc::{Receiver, Sender};

use super::{io::SyncIO, message_io::{read_message_opt, send_message, send_zero_message}};

#[derive(Clone, Debug)]
pub struct NetIoSettings {
    pub process_timeout: Duration,
    pub message_timeout: Duration,
}

impl Default for NetIoSettings {
    fn default() -> Self {
        Self {
            process_timeout: Duration::from_secs(2),
            message_timeout: Duration::from_secs(12),
        }
    }
}

pub struct ReadChannel<I: SyncIO, M: MessageEncoding> {
    pub input: I::Read,
    pub output: Sender<M>,
    pub settings: NetIoSettings,
}

pub struct WriteChannel<I: SyncIO, M: MessageEncoding> {
    pub input: Receiver<M>,
    pub output: I::Write,
    pub settings: NetIoSettings,
}

impl<I: SyncIO, M: MessageEncoding + Send + Sync + 'static> ReadChannel<I, M> {
    pub async fn start(mut self) {
        let mut buffer = vec![0u8; 2048];

        loop {
            tokio::task::yield_now().await;

            let read_opt_res = tokio::select! {
                read_opt_res = read_message_opt::<M, _>(
                    &mut buffer,
                    &mut self.input,
                    self.settings.process_timeout,
                    Some(self.settings.message_timeout),
                ) => read_opt_res,
                _ = self.output.closed() => {
                    tracing::info!("output closed, stopping read");
                    break;
                }
            };

            match read_opt_res {
                Ok(Some(msg)) => {
                    if self.output.send(msg).await.is_err() {
                        tracing::error!("failed to send message to output, stopping read");
                        break;
                    }
                }
                Ok(None) => {
                    continue;
                }
                Err(error) => {
                    tracing::error!(?error, "failed to read from network");
                    break;
                }
            }
        }
    }
}

impl<I: SyncIO, M: MessageEncoding + Send + Sync + 'static> WriteChannel<I, M> {
    pub async fn start(mut self) {
        let mut buffer = vec![0u8; 2048];
        let zero_msg_timeout = self.settings.process_timeout / 3;

        loop {
            tokio::task::yield_now().await;

            let msg = tokio::select! {
                msg_opt = self.input.recv() => {
                    match msg_opt {
                        Some(v) => Some(v),
                        None => {
                            tracing::info!("input closed, ending write");
                            break;
                        }
                    }
                }
                _ = tokio::time::sleep(zero_msg_timeout) => None,
            };

            let send_res = match msg {
                Some(msg) => {
                    tokio::time::timeout(
                        self.settings.process_timeout,
                        send_message(&mut buffer, &msg, &mut self.output, self.settings.process_timeout)
                    ).await
                }
                None => {
                    tokio::time::timeout(
                        self.settings.process_timeout,
                        send_zero_message(&mut self.output)
                    ).await
                }
            };

            if let Err(error) = send_res {
                tracing::error!(?error, "failed to send message, closing write");
                break;
            }
        }
    }
}