use std::io::{self, Write};
use std::sync::{Arc, Mutex};
use std::thread;
use log::*;
use serde::de::DeserializeOwned;
use serde::Serialize;
use tokio::sync::mpsc;
type SendFn = Arc<Mutex<Box<dyn FnMut(&[u8]) -> io::Result<()> + Send>>>;
type RecvFn = Arc<Mutex<Box<dyn FnMut(&mut String) -> io::Result<()> + Send>>>;
#[derive(Clone)]
pub struct MsgSender {
send: SendFn,
}
impl<F> From<F> for MsgSender
where
F: FnMut(&[u8]) -> io::Result<()> + Send + 'static,
{
fn from(f: F) -> Self {
Self {
send: Arc::new(Mutex::new(Box::new(f))),
}
}
}
impl MsgSender {
pub fn from_stdout() -> Self {
let mut writer = std::io::stdout();
Self::from(Box::new(move |output: &'_ [u8]| {
writer.write_all(output)?;
writer.flush()?;
Ok(())
}))
}
pub fn send_blocking<T>(&self, ser: &T) -> io::Result<()>
where
T: Serialize,
{
let msg = format!("{}\n", serde_json::to_string(ser)?);
self.send.lock().unwrap()(msg.as_bytes())
}
}
#[derive(Clone)]
pub struct MsgReceiver {
recv: RecvFn,
}
impl<F> From<F> for MsgReceiver
where
F: FnMut(&mut String) -> io::Result<()> + Send + 'static,
{
fn from(f: F) -> Self {
Self {
recv: Arc::new(Mutex::new(Box::new(f))),
}
}
}
impl MsgReceiver {
pub fn from_stdin() -> Self {
let reader = std::io::stdin();
Self::from(move |input: &'_ mut String| {
let _ = reader.read_line(input)?;
Ok(())
})
}
pub fn into_rx<T>(self) -> mpsc::Receiver<io::Result<T>>
where
T: DeserializeOwned + Send + 'static,
{
let (tx, rx) = mpsc::channel(1);
thread::spawn(move || {
loop {
let res = self.recv_blocking();
let is_eof = match res.as_ref() {
Err(x) => x.kind() == io::ErrorKind::UnexpectedEof,
Ok(_) => false,
};
if tx.blocking_send(res).is_err() {
break;
}
if is_eof {
break;
}
}
});
rx
}
pub fn recv_blocking<T>(&self) -> io::Result<T>
where
T: DeserializeOwned,
{
let mut input = String::new();
let data: T = loop {
self.recv.lock().unwrap()(&mut input)?;
trace!(
"Parsing into {} for {:?}",
std::any::type_name::<T>(),
input,
);
match serde_json::from_str(&input) {
Ok(data) => break data,
Err(x) if x.is_eof() => {
trace!(
"Not ready to parse as {}, so trying again with next update",
std::any::type_name::<T>(),
);
continue;
}
Err(x) => return Err(x.into()),
}
};
Ok(data)
}
}