use std::borrow::Borrow;
use std::fs;
use std::path::PathBuf;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::Framed;
use futures::sink::SinkExt;
use bytes::Bytes;
use blather::{Params, Telegram};
use protwrap::ProtAddr;
use crate::auth::Auth;
use crate::conn;
use crate::types::AppChannel;
use crate::err::Error;
pub enum InputType {
Params(Params),
File(PathBuf),
VecBuf(Vec<u8>),
Bytes(Bytes)
}
impl InputType {
fn get_size(&self) -> Result<usize, Error> {
match self {
InputType::Params(params) => Ok(params.calc_buf_size()),
InputType::File(f) => {
let metadata = fs::metadata(f)?;
Ok(metadata.len() as usize)
}
InputType::VecBuf(v) => Ok(v.len()),
InputType::Bytes(b) => Ok(b.len())
}
}
}
pub struct Transport {
pub ch: AppChannel
}
pub struct MsgInfo {
pub cmd: u32,
pub meta: Option<InputType>,
pub payload: Option<InputType>
}
impl MsgInfo {
fn get_meta_size(&self) -> Result<u32, Error> {
let sz = match &self.meta {
Some(meta) => meta.get_size()?,
None => 0
};
if sz > u32::MAX as usize {
}
Ok(sz as u32)
}
fn get_payload_size(&self) -> Result<u64, Error> {
let sz = match &self.payload {
Some(payload) => payload.get_size()?,
None => 0
};
Ok(sz as u64)
}
}
pub async fn connsend<P, X, M>(
pa: &ProtAddr,
auth: Option<&Auth>,
xfer: X,
mi: M
) -> Result<String, Error>
where
X: Borrow<Transport>,
M: Borrow<MsgInfo>
{
let mut conn = conn::connect(pa, auth).await?;
send(&mut conn, xfer, mi).await
}
pub async fn send<T, X, M>(
conn: &mut Framed<T, blather::Codec>,
xfer: X,
mi: M
) -> Result<String, Error>
where
T: AsyncRead + AsyncWrite + Unpin,
X: Borrow<Transport>,
M: Borrow<MsgInfo>
{
let xfer = xfer.borrow();
let mi = mi.borrow();
let metalen = mi.get_meta_size()?;
let payloadlen = mi.get_payload_size()?;
let mut tg = Telegram::new_topic("Msg")?;
tg.add_param("_Ch", xfer.ch.to_string())?;
if mi.cmd != 0 {
tg.add_param("Cmd", mi.cmd)?;
}
if metalen != 0 {
tg.add_param("MetaLen", metalen)?;
}
if payloadlen != 0 {
tg.add_param("Len", payloadlen)?;
}
let params = crate::sendrecv(conn, &tg).await?;
let xferid = match params.get_str("XferId") {
Some(xferid) => xferid.to_string(),
None => {
let e = "Missing expected transfer identifier from server reply";
return Err(Error::MissingData(String::from(e)));
}
};
if let Some(meta) = &mi.meta {
send_content(conn, meta).await?;
crate::expect_okfail(conn).await?;
}
if let Some(payload) = &mi.payload {
send_content(conn, payload).await?;
crate::expect_okfail(conn).await?;
}
Ok(xferid)
}
async fn send_content<T>(
conn: &mut Framed<T, blather::Codec>,
data: &InputType
) -> Result<(), Error>
where
T: AsyncRead + AsyncWrite + Unpin
{
match data {
InputType::Params(params) => Ok(conn.send(params).await?),
InputType::File(fname) => {
let mut f = tokio::fs::File::open(fname).await?;
let _ = tokio::io::copy(&mut f, conn.get_mut()).await?;
Ok(())
}
InputType::VecBuf(v) => Ok(conn.send(v.as_slice()).await?),
InputType::Bytes(b) => Ok(conn.send(b.as_ref()).await?)
}
}