use {
bytes::Bytes,
heapless::HistoryBuf,
serde::Deserialize,
std::io::Result as IOResult,
tokio::sync::{broadcast, mpsc, oneshot},
tokio_stream::wrappers::UnboundedReceiverStream,
warp::ws::{Message, WebSocket},
};
use crate::{cli::Config, envvars::Env};
/// Identifier of a room (an owned string).
pub type RoomID = String;
/// Identifier of a single connection; used by `Header::to` and `Event::Disconnect`.
pub type ConnID = u32;
/// Identifier of a port; carried by `Event::ProcessExit`.
pub type PortID = u16;
/// Routing header that accompanies a message.
///
/// Deserialized from the keys `_to`, `_meta` and `_cache`. The two flag keys
/// are optional and fall back to `false` when absent.
#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
pub struct Header {
    /// Target connection, read from `_to`. `None` means no single connection
    /// is addressed (this is what `Header::broadcast` produces).
    #[serde(rename = "_to")]
    pub to: Option<ConnID>,
    /// Value of the `_meta` key.
    // NOTE(review): presumably marks the payload as metadata rather than
    // regular traffic; confirm against the code that reads this flag.
    #[serde(rename = "_meta", default)]
    pub is_meta: bool,
    /// Value of the `_cache` key; consulted by `Caching::matches` when the
    /// caching mode is `Caching::Tagged`.
    #[serde(rename = "_cache", default)]
    pub is_cache: bool,
}
impl Header {
pub fn to(to: ConnID) -> Self {
Header {
to: Some(to),
is_meta: false,
is_cache: false,
}
}
pub fn broadcast() -> Self {
Header {
to: None,
is_meta: false,
is_cache: false,
}
}
}
/// Events exchanged over the `EventTx` / `EventRx` channel.
#[derive(Debug)]
pub enum Event {
/// A websocket is being attached to `room`.
Connect {
// Environment associated with the connection (project type `Env`).
env: Env,
// Room the websocket belongs to.
room: RoomID,
// The websocket itself, boxed.
ws: Box<WebSocket>,
},
/// Connection `conn` is leaving `room`.
Disconnect {
// Environment associated with the connection (project type `Env`).
env: Env,
// Room the connection belonged to.
room: RoomID,
// Identifier of the connection that went away.
conn: ConnID,
},
/// The process associated with `room` has exited.
ProcessExit {
// Room whose process exited.
room: RoomID,
// Exit code, if one is available.
code: Option<i32>,
// Port associated with the process, if any.
// NOTE(review): which port this refers to is not visible here; confirm at the emit site.
port: Option<PortID>,
},
/// A JSON metadata value associated with `room`.
// NOTE(review): presumably produced by the room's process; confirm at the emit site.
ProcessMeta {
// Room the metadata belongs to.
room: RoomID,
// Arbitrary JSON payload.
value: serde_json::Value,
},
/// Shutdown request; carries no payload.
Shutdown,
}
/// Framing applied to each direction of traffic between socket and process.
#[derive(Debug, Clone, Copy)]
pub enum Framing {
/// No framing in either direction.
None,
/// The same frame in both directions.
Symmetric(Frame),
/// Independent, optional frames per direction:
/// first the socket-to-process frame, then the process-to-socket frame.
Asymmetric(Option<Frame>, Option<Frame>),
}
impl Framing {
pub fn socket_to_process(&self) -> Option<Frame> {
match self {
Framing::None => None,
Framing::Symmetric(f) => Some(*f),
Framing::Asymmetric(f, _) => *f,
}
}
pub fn process_to_socket(&self) -> Option<Frame> {
match self {
Framing::None => None,
Framing::Symmetric(f) => Some(*f),
Framing::Asymmetric(_, f) => *f,
}
}
}
impl From<&Config> for Framing {
    /// Derives the framing mode from the configuration.
    ///
    /// If either `client_frame` or `server_frame` is set, the result is
    /// `Asymmetric(client_frame, server_frame)` and `frame` is ignored.
    /// Otherwise `frame`, when set, yields `Symmetric`; with nothing set the
    /// result is `Framing::None`.
    fn from(cfg: &Config) -> Self {
        match (cfg.client_frame, cfg.server_frame, cfg.frame) {
            // No per-direction override: fall back to the shared frame.
            (None, None, Some(shared)) => Self::Symmetric(shared),
            (None, None, None) => Self::None,
            // At least one per-direction frame is present.
            (client, server, _) => Self::Asymmetric(client, server),
        }
    }
}
/// Frame format for a traffic direction; usable as a command-line value via `clap::ValueEnum`.
#[derive(Debug, clap::ValueEnum, Clone, Copy)]
// Variant names deliberately keep their acronym casing, hence the lint exemption.
#[allow(clippy::upper_case_acronyms)]
pub enum Frame {
/// JSON framing.
JSON,
/// GWSocket framing; spelled `gwsocket` on the command line.
#[clap(name = "gwsocket")]
GWSocket,
}
/// Log format choice; usable as a command-line value via `clap::ValueEnum`.
// NOTE(review): presumably selects the output format of the logger; confirm where `Log` is consumed.
#[derive(Debug, clap::ValueEnum, Clone, Copy)]
// Variant names deliberately keep their acronym casing, hence the lint exemption.
#[allow(clippy::upper_case_acronyms)]
pub enum Log {
/// JSON output.
JSON,
/// Plain-text output.
Text,
}
/// Cache configuration: which caching mode to use, plus the buffer size.
///
/// The `usize` payload is the capacity handed to `CacheBuffer::new`, which
/// only accepts 1, 8 or 64.
#[derive(Debug, Clone)]
pub enum Cache {
/// Maps to `Caching::All`.
All(usize),
/// Maps to `Caching::Tagged`.
Tagged(usize),
}
/// Caching mode without the size; decides which headers `Caching::matches` accepts.
#[derive(Debug, Clone)]
pub enum Caching {
/// Matches no header.
None,
/// Matches every header.
All,
/// Matches only headers whose `is_cache` flag is set.
Tagged,
}
impl Caching {
    /// Reports whether a message carrying header `h` matches this caching
    /// mode.
    ///
    /// `All` matches everything, `Tagged` matches only headers with
    /// `is_cache` set, and `None` matches nothing.
    pub fn matches(&self, h: &Header) -> bool {
        match self {
            Self::None => false,
            Self::All => true,
            Self::Tagged => h.is_cache,
        }
    }
}
impl From<&Config> for Caching {
    /// Derives the caching mode from `cfg.cache`, discarding the configured
    /// size. An unset cache yields `Caching::None`.
    fn from(cfg: &Config) -> Self {
        let Some(cache) = &cfg.cache else {
            return Self::None;
        };
        match cache {
            Cache::All(_) => Self::All,
            Cache::Tagged(_) => Self::Tagged,
        }
    }
}
/// A `HistoryBuf` of `N` elements stored behind a `Box`.
pub type BoxedHistoryBuf<T, const N: usize> = Box<HistoryBuf<T, N>>;
/// Fixed-capacity history of cached messages, available in three capacities.
#[derive(Debug, Clone)]
// NOTE(review): lint suppressed, presumably because the inline `Tiny` variant is
// noticeably larger than the other two — confirm this is intentional.
#[allow(clippy::large_enum_variant)]
pub enum CacheBuffer {
/// Capacity 1, stored inline.
Single(HistoryBuf<Message, 1>),
/// Capacity 8, stored inline.
Tiny(HistoryBuf<Message, 8>),
/// Capacity 64, stored behind a `Box`.
Small(BoxedHistoryBuf<Message, 64>),
}
impl CacheBuffer {
pub fn new(cache: &Cache) -> Self {
use Cache::*;
match cache {
All(1) | Tagged(1) => Self::Single(HistoryBuf::<_, 1>::new()),
All(8) | Tagged(8) => Self::Tiny(HistoryBuf::<_, 8>::new()),
All(64) | Tagged(64) => Self::Small(Box::new(HistoryBuf::<_, 64>::new())),
_ => panic!("invalid cache size"),
}
}
pub fn write(&mut self, msg: Message) {
match self {
Self::Single(h) => h.write(msg),
Self::Tiny(h) => h.write(msg),
Self::Small(h) => h.write(msg),
}
}
pub fn to_vec(&self) -> Vec<(Header, Message)> {
match self {
Self::Single(h) => h
.oldest_ordered()
.cloned()
.map(|msg| (Header::broadcast(), msg))
.collect(),
Self::Tiny(h) => h
.oldest_ordered()
.cloned()
.map(|msg| (Header::broadcast(), msg))
.collect(),
Self::Small(h) => h
.oldest_ordered()
.cloned()
.map(|msg| (Header::broadcast(), msg))
.collect(),
}
}
}
/// Sending half of the unbounded `Event` channel.
pub type EventTx = mpsc::UnboundedSender<Event>;
/// Receiving half of the unbounded `Event` channel.
pub type EventRx = mpsc::UnboundedReceiver<Event>;
/// Unbounded sender of websocket `Message`s (going by the name, those headed to the process).
pub type ToProcessTx = mpsc::UnboundedSender<Message>;
/// Receiving half matching `ToProcessTx`.
pub type ToProcessRx = mpsc::UnboundedReceiver<Message>;
/// `ToProcessRx` wrapped in `UnboundedReceiverStream`.
pub type ToProcessRxStream = UnboundedReceiverStream<Message>;
/// One-shot sender of the unit shutdown signal.
pub type ShutdownTx = oneshot::Sender<()>;
/// One-shot receiver of the unit shutdown signal.
pub type ShutdownRx = oneshot::Receiver<()>;
/// `ShutdownRx` converted into a stream (`futures::future::IntoStream`).
pub type ShutdownRxStream = futures::future::IntoStream<ShutdownRx>;
/// Broadcast sender of `(Header, Message)` pairs (going by the name, those coming from the process).
pub type FromProcessTx = broadcast::Sender<(Header, Message)>;
/// Broadcast receiver matching `FromProcessTx`.
pub type FromProcessRx = broadcast::Receiver<(Header, Message)>;
/// Boxed, type-erased asynchronous writer (`Unpin + Send`).
pub type FromProcessTxAny = Box<dyn tokio::io::AsyncWrite + Unpin + Send>;
/// Boxed, type-erased stream of `IOResult<Bytes>` chunks (`Unpin + Send`).
pub type FromProcessRxAny = Box<dyn futures::Stream<Item = IOResult<Bytes>> + Unpin + Send>;
/// Bundle of the sending handles: `(FromProcessTx, ToProcessTx, ShutdownTx)`.
pub type ProcessSenders = (FromProcessTx, ToProcessTx, ShutdownTx);