use futures::sink::SinkExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::StreamExt;
use tokio_util::codec::Framed;
use blather::{codec, Telegram};
use crate::auth::Auth;
use crate::err::Error;
pub use protwrap::tokio::Stream;
pub use protwrap::ProtAddr;
pub type Frm = Framed<protwrap::tokio::Stream, blather::Codec>;
pub async fn connect(
pa: &ProtAddr,
auth: Option<&Auth>
) -> Result<Framed<protwrap::tokio::Stream, blather::Codec>, Error> {
let strm = protwrap::tokio::connect(pa).await?;
let mut framed = Framed::new(strm, blather::Codec::new());
if let Some(auth) = auth {
auth.authenticate(&mut framed).await?;
}
Ok(framed)
}
pub async fn sendrecv<T>(
conn: &mut Framed<T, blather::Codec>,
tg: &Telegram
) -> Result<blather::Params, Error>
where
T: AsyncRead + AsyncWrite + Unpin
{
conn.send(tg).await?;
expect_okfail(conn).await
}
pub async fn expect_okfail<T>(
conn: &mut Framed<T, blather::Codec>
) -> Result<blather::Params, Error>
where
T: AsyncRead + Unpin
{
if let Some(o) = conn.next().await {
let o = o?;
match o {
codec::Input::Telegram(tg) => {
if let Some(topic) = tg.get_topic() {
if topic == "Ok" {
return Ok(tg.into_params());
} else if topic == "Fail" {
return Err(Error::ServerError(tg.into_params()));
}
}
}
_ => {
println!("unexpected reply");
}
}
return Err(Error::BadState("Unexpected reply from server.".to_string()));
}
Err(Error::Disconnected)
}
#[derive(Debug)]
pub struct WhoAmI {
pub id: i64,
pub name: String
}
pub async fn whoami<T>(
conn: &mut Framed<T, blather::Codec>
) -> Result<WhoAmI, Error>
where
T: AsyncRead + AsyncWrite + Unpin
{
let tg = Telegram::new_topic("WhoAmI")?;
let params = sendrecv(conn, &tg).await?;
let id = params.get_param::<i64>("Id")?;
let name = params.get_param("Name")?;
Ok(WhoAmI { id, name })
}