use std::future::Future;
use std::path::PathBuf;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::StreamExt;
use tokio_util::codec::Framed;
use async_trait::async_trait;
use num::NumCast;
use bytes::{Bytes, BytesMut};
use blather::{codec, KVLines, Params, Telegram};
use crate::err::Error;
/// Identifies the channel named in a subscription request.
///
/// [`subscribe`] writes whichever value is carried here into the `Ch`
/// parameter of the outgoing telegram.
pub enum SubCh {
    /// The channel is given as an 8-bit number.
    Num(u8),
    /// The channel is given as a string.
    Name(String),
}
/// Arguments for a [`subscribe`] call.
pub struct SubInfo {
    /// The channel to put in the subscription request.
    pub ch: SubCh,
}
/// Send a subscription request over `conn`.
///
/// A telegram with the topic `Sub` is built and given a `Ch` parameter that
/// holds the channel from `subinfo` (its number or its name).  The telegram
/// is then passed to `crate::sendrecv`, whose successful result is discarded.
///
/// # Errors
///
/// Failures from setting the topic, adding the parameter, or from
/// `crate::sendrecv` are propagated to the caller.
pub async fn subscribe<C>(
    conn: &mut Framed<C, blather::Codec>,
    subinfo: SubInfo,
) -> Result<(), Error>
where
    C: AsyncRead + AsyncWrite + Unpin,
{
    let mut request = Telegram::new();
    request.set_topic("Sub")?;

    // The parameter value type differs per variant, so each arm makes its
    // own `add_param` call; the outcome is checked once afterwards.
    let added = match subinfo.ch {
        SubCh::Num(number) => request.add_param("Ch", number),
        SubCh::Name(name) => request.add_param("Ch", name),
    };
    added?;

    crate::sendrecv(conn, &request).await?;
    Ok(())
}
/// Chooses how one section (meta or payload) of an inbound message is
/// received.
///
/// The storage callbacks return a pair of these, the first for the meta
/// section and the second for the payload section.  The receive code uses
/// the value to pick which expectation to set on the codec before reading
/// the section.
pub enum StoreType {
    /// The codec's `skip` is called with the section length.
    None,
    /// The codec's `expect_bytes` is called with the section length.
    Bytes,
    /// The codec's `expect_bytesmut` is called with the section length.
    BytesMut,
    /// The codec's `expect_params` is called (the length is not passed on).
    Params,
    /// The codec's `expect_kvlines` is called (the length is not passed on).
    KVLines,
    /// The codec's `expect_file` is called with this path and the section
    /// length.
    File(PathBuf),
}
/// A received message section (meta or payload) in the form it was stored.
///
/// The receive functions build these from the matching `codec::Input`
/// variants returned by the connection.
pub enum Storage {
    /// Content delivered as an immutable byte buffer.
    Bytes(Bytes),
    /// Content delivered as a mutable byte buffer.
    BytesMut(BytesMut),
    /// Content delivered as a `Params` value.
    Params(Params),
    /// Content delivered as a `KVLines` value.
    KVLines(KVLines),
    /// A path reported by the codec for file-stored content.
    File(PathBuf),
    /// A path; this variant is not constructed by any code in this file.
    LocalFile(PathBuf),
}
/// A fully received inbound message.
pub struct Msg {
    /// Command number from the message header (`0` when the header has no
    /// `Cmd` parameter).
    pub cmd: u32,
    /// Stored meta section, if one was received and kept.
    pub meta: Option<Storage>,
    /// Stored payload section, if one was received and kept.
    pub payload: Option<Storage>,
}
/// Header information of an inbound message.
///
/// It is passed to the storage callbacks so they can decide how the meta
/// and payload sections should be stored before those sections are read.
pub struct MsgInfo {
    /// Command number (header parameter `Cmd`).
    pub cmd: u32,
    /// Length of the meta section (header parameter `MetaLen`).
    pub metalen: u32,
    /// Length of the payload section (header parameter `Len`).
    pub payloadlen: u64,
}
pub async fn recv_c<C, S>(
conn: &mut Framed<C, blather::Codec>,
storeq: S
) -> Result<Msg, Error>
where
C: AsyncRead + AsyncWrite + Unpin,
S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>
{
if let Some(o) = conn.next().await {
let o = o?;
match o {
codec::Input::Telegram(tg) => {
if let Some(topic) = tg.get_topic() {
if topic == "Msg" {
let mp = tg.into_params();
return proc_inbound_msg(conn, mp, storeq).await;
} else if topic == "Fail" {
return Err(Error::ServerError(tg.into_params()));
}
}
}
_ => {
return Err(Error::BadState(
"Unexpected codec input type.".to_string()
));
}
}
return Err(Error::bad_state("Unexpected reply from server."));
}
Err(Error::Disconnected)
}
pub async fn recvloop_c<C, S, P>(
conn: &mut Framed<C, blather::Codec>,
kill: Option<killswitch::Shutdown>,
mut storeq: S,
procmsg: P
) -> Result<(), Error>
where
C: AsyncRead + AsyncWrite + Unpin,
S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>,
P: Fn(Msg) -> Result<(), Error>
{
if let Some(kill) = kill {
loop {
tokio::select! {
msg = recv_c(conn, &mut storeq) => {
let msg = msg?;
procmsg(msg)?;
}
_ = kill.wait() => {
break;
}
}
}
} else {
loop {
let msg = recv_c(conn, &mut storeq).await?;
procmsg(msg)?;
}
}
Ok(())
}
/// Receive messages in a loop, handing each one to the asynchronous closure
/// `procmsg`.
///
/// Every message is received with [`recv_c`] using `storeq` to choose
/// storage; the future returned by `procmsg` is awaited before the next
/// message is received.  When `kill` is `Some`, the loop also waits on
/// `kill.wait()` and returns `Ok(())` once that completes.  When `kill` is
/// `None`, the loop only ends through an error.
///
/// # Errors
///
/// The first error returned by [`recv_c`] or by the `procmsg` future ends
/// the loop and is returned.
pub async fn recvloop_ca<C, S, F, P>(
    conn: &mut Framed<C, blather::Codec>,
    kill: Option<killswitch::Shutdown>,
    mut storeq: S,
    procmsg: P,
) -> Result<(), Error>
where
    C: AsyncRead + AsyncWrite + Unpin,
    S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>,
    F: Future<Output = Result<(), Error>>,
    P: Fn(Msg) -> F,
{
    match kill {
        // Race each receive against the shutdown signal.
        Some(shutdown) => loop {
            tokio::select! {
                received = recv_c(conn, &mut storeq) => {
                    procmsg(received?).await?;
                }
                _ = shutdown.wait() => {
                    break;
                }
            }
        },
        // No shutdown signal: run until an error occurs.
        None => loop {
            procmsg(recv_c(conn, &mut storeq).await?).await?;
        },
    }

    Ok(())
}
async fn proc_inbound_msg<C, S>(
conn: &mut Framed<C, blather::Codec>,
mp: Params,
mut storeq: S
) -> Result<Msg, Error>
where
C: AsyncRead + AsyncWrite + Unpin,
S: FnMut(&MsgInfo) -> Result<(StoreType, StoreType), Error>
{
let (cmd, metalen, payloadlen) = parse_header(&mp)?;
let (meta_store, payload_store) = if metalen != 0 || payloadlen != 0 {
let mi = MsgInfo {
cmd,
metalen,
payloadlen
};
let (ms, ps) = storeq(&mi)?;
let ms = if metalen != 0 { Some(ms) } else { None };
let ps = if payloadlen != 0 { Some(ps) } else { None };
(ms, ps)
} else {
(None, None)
};
let meta = get_content_to(conn, metalen, meta_store).await?;
let payload = get_content_to(conn, payloadlen, payload_store).await?;
Ok(Msg { cmd, meta, payload })
}
async fn get_content_to<C, S>(
conn: &mut Framed<C, blather::Codec>,
size: S,
store_type: Option<StoreType>
) -> Result<Option<Storage>, Error>
where
C: AsyncRead + AsyncWrite + Unpin,
S: NumCast
{
if let Some(store_type) = store_type {
match store_type {
StoreType::None => {
conn.codec_mut().skip(num::cast(size).unwrap())?;
}
StoreType::Bytes => {
conn.codec_mut().expect_bytes(num::cast(size).unwrap())?;
}
StoreType::BytesMut => {
conn.codec_mut().expect_bytesmut(num::cast(size).unwrap())?;
}
StoreType::Params => {
conn.codec_mut().expect_params();
}
StoreType::KVLines => {
conn.codec_mut().expect_kvlines();
}
StoreType::File(ref fname) => {
conn
.codec_mut()
.expect_file(fname, num::cast(size).unwrap())?;
}
}
get_content(conn).await
} else {
Ok(None)
}
}
async fn get_content<C>(
conn: &mut Framed<C, blather::Codec>
) -> Result<Option<Storage>, Error>
where
C: AsyncRead + AsyncWrite + Unpin
{
if let Some(o) = conn.next().await {
let o = o?;
match o {
codec::Input::SkipDone => Ok(None),
codec::Input::Bytes(bytes) => Ok(Some(Storage::Bytes(bytes))),
codec::Input::BytesMut(bytes) => Ok(Some(Storage::BytesMut(bytes))),
codec::Input::Params(params) => Ok(Some(Storage::Params(params))),
codec::Input::KVLines(kvlines) => Ok(Some(Storage::KVLines(kvlines))),
codec::Input::File(fname) => Ok(Some(Storage::File(fname))),
_ => Err(Error::bad_state("Unexpected codec input type."))
}
} else {
Err(Error::Disconnected)
}
}
/// Callbacks used by [`recv_h`] and [`recvloop_h`] while receiving messages.
#[async_trait]
pub trait Handler {
    /// Decide how the sections of an inbound message are stored.
    ///
    /// Called with the header information before any section is read, and
    /// only when at least one of the section lengths is non-zero.  The
    /// returned pair is `(meta storage, payload storage)`.
    async fn on_header(
        &mut self,
        mi: &MsgInfo,
    ) -> Result<(StoreType, StoreType), Error>;

    /// Process a completely received message.
    ///
    /// [`recvloop_h`] calls this for every message it receives; an error
    /// returned here ends that loop.
    async fn on_data(&mut self, msg: Msg) -> Result<(), Error>;
}
pub async fn recv_h<C>(
conn: &mut Framed<C, blather::Codec>,
handler: &mut Box<dyn Handler + Send + Sync>
) -> Result<Msg, Error>
where
C: AsyncRead + AsyncWrite + Unpin
{
if let Some(o) = conn.next().await {
let o = o?;
match o {
codec::Input::Telegram(tg) => {
if let Some(topic) = tg.get_topic() {
if topic == "Msg" {
let mp = tg.into_params();
return proc_inbound_msg_h(conn, mp, handler).await;
} else if topic == "Fail" {
return Err(Error::ServerError(tg.into_params()));
}
}
}
_ => {
return Err(Error::BadState(
"Unexpected codec input type.".to_string()
));
}
}
return Err(Error::bad_state("Unexpected reply from server."));
}
Err(Error::Disconnected)
}
/// Extract `(cmd, metalen, payloadlen)` from a message header.
///
/// The values are read from the parameters `Cmd`, `MetaLen` and `Len`
/// respectively.  A parameter that is not present counts as `0`.
///
/// # Errors
///
/// An error from `get_param` for a parameter that is present is propagated.
fn parse_header(mp: &Params) -> Result<(u32, u32, u64), Error> {
    let cmd: u32 = if mp.have("Cmd") {
        mp.get_param("Cmd")?
    } else {
        0
    };
    let metalen: u32 = if mp.have("MetaLen") {
        mp.get_param("MetaLen")?
    } else {
        0
    };
    let payloadlen: u64 = if mp.have("Len") {
        mp.get_param("Len")?
    } else {
        0
    };

    Ok((cmd, metalen, payloadlen))
}
/// Read the sections of a `Msg` telegram whose parameters are `mp`, using
/// `handler` to choose storage.
///
/// The header is parsed with `parse_header`.  If either section length is
/// non-zero, `handler.on_header` is awaited once with the header
/// information and returns the storage choices for meta and payload.  The
/// choice for a zero-length section is discarded.  The meta section is read
/// before the payload section.
///
/// # Errors
///
/// Errors from header parsing, from `on_header`, or from reading a section
/// are propagated.
async fn proc_inbound_msg_h<C>(
    conn: &mut Framed<C, blather::Codec>,
    mp: Params,
    handler: &mut Box<dyn Handler + Send + Sync>,
) -> Result<Msg, Error>
where
    C: AsyncRead + AsyncWrite + Unpin,
{
    let (cmd, metalen, payloadlen) = parse_header(&mp)?;
    let has_meta = metalen != 0;
    let has_payload = payloadlen != 0;

    // Both stay `None` unless the handler is consulted below.
    let mut meta_store = None;
    let mut payload_store = None;

    if has_meta || has_payload {
        let info = MsgInfo {
            cmd,
            metalen,
            payloadlen,
        };
        let (for_meta, for_payload) = handler.on_header(&info).await?;
        if has_meta {
            meta_store = Some(for_meta);
        }
        if has_payload {
            payload_store = Some(for_payload);
        }
    }

    let meta = get_content_to(conn, metalen, meta_store).await?;
    let payload = get_content_to(conn, payloadlen, payload_store).await?;

    Ok(Msg { cmd, meta, payload })
}
/// Receive messages in a loop, passing each one to `handler.on_data`.
///
/// Every message is received with [`recv_h`].  When `kill` is `Some`, the
/// loop also waits on `kill.wait()` and returns `Ok(())` once that
/// completes.  When `kill` is `None`, the loop only ends through an error.
///
/// # Errors
///
/// The first error returned by [`recv_h`] or by `on_data` ends the loop and
/// is returned.
pub async fn recvloop_h<C>(
    conn: &mut Framed<C, blather::Codec>,
    kill: Option<killswitch::Shutdown>,
    handler: &mut Box<dyn Handler + Send + Sync>,
) -> Result<(), Error>
where
    C: AsyncRead + AsyncWrite + Unpin,
{
    match kill {
        // Race each receive against the shutdown signal.
        Some(shutdown) => loop {
            tokio::select! {
                received = recv_h(conn, handler) => {
                    handler.on_data(received?).await?;
                }
                _ = shutdown.wait() => {
                    break;
                }
            }
        },
        // No shutdown signal: run until an error occurs.
        None => loop {
            let received = recv_h(conn, handler).await?;
            handler.on_data(received).await?;
        },
    }

    Ok(())
}