use std::{
convert::TryFrom,
sync::{
Arc,
Mutex,
},
};
use bytes::BytesMut;
use futures::prelude::*;
use tellem::{
Cmd,
Event,
KnownOpt,
Opt,
};
use tokio::select;
use tracing::debug;
use crate::{
text::{
Line,
Text,
},
widget::ansi::AnsiProcessor,
};
/// Raw flag values stored behind the mutex owned by [`Flags`].
#[derive(Default, Debug, Clone, Copy)]
struct FlagsInner {
    /// When `true`, typed input is not echoed back into the output buffer.
    noecho: bool,
    /// Becomes `true` once [`Flags::exit`] has been called; never reset here.
    exit: bool,
}

/// Cloneable handle to a set of flags shared between tasks.
///
/// Every clone refers to the same underlying state, so a change made through
/// one handle is visible through all others.
#[derive(Default, Debug, Clone)]
pub struct Flags {
    inner: Arc<Mutex<FlagsInner>>,
}

impl Flags {
    /// Locks the shared state, recovering the guard if the mutex is poisoned.
    ///
    /// The guarded data is two independent booleans, so there is no invariant
    /// a panicking holder could have left half-updated. Recovering keeps one
    /// panic from cascading into every task that merely reads a flag.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, FlagsInner> {
        self.inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Returns `true` when echoing of user input is currently suppressed.
    pub fn noecho(&self) -> bool {
        self.lock_inner().noecho
    }

    /// Enables (`true`) or disables (`false`) suppression of input echo.
    pub fn set_noecho(&self, noecho: bool) {
        self.lock_inner().noecho = noecho;
    }

    /// Returns `true` once [`Flags::exit`] has been called on any clone.
    pub fn should_exit(&self) -> bool {
        self.lock_inner().exit
    }

    /// Marks the shared state as "exit requested".
    pub fn exit(&self) {
        self.lock_inner().exit = true;
    }
}
/// An engine that is started with a stream of user input, a sink for buffer
/// output and a shared [`Flags`] handle.
pub trait ScriptEngine {
/// Error type associated with the engine.
///
/// NOTE(review): `start` reports failures as `anyhow::Error` and does not
/// reference this associated type — confirm whether it is still needed.
type Error;
/// Starts the engine, consuming it.
///
/// * `user_input` — stream yielding one `String` per item.
/// * `buffer_output` — cloneable sink accepting `(Vec<Line>, Text)` pairs;
///   presumably finished lines plus a trailing partial line — TODO confirm
///   against the sink's consumer.
/// * `flags` — shared flag handle the engine may read and update.
///
/// Returns `Ok(())` on a successful start, or an `anyhow::Error` describing
/// why the engine could not be started.
fn start<I, O, E>(
self,
user_input: I,
buffer_output: O,
flags: Flags,
) -> Result<(), anyhow::Error>
where
I: Stream<Item = String> + Send + Unpin + 'static,
O: Sink<(Vec<Line>, Text), Error = E> + Clone + Send + Unpin + 'static,
E: std::error::Error + Send + Sync + 'static;
}
/// Script engine that relays between the user and a telnet connection.
///
/// The wrapped `String` is the address handed to
/// `tokio::net::TcpStream::connect` when the engine is started.
pub struct NopTelnetEngine(pub String);
impl ScriptEngine for NopTelnetEngine {
type Error = anyhow::Error;
fn start<I, O, E>(
self,
mut user_input: I,
buffer_output: O,
flags: Flags,
) -> Result<(), anyhow::Error>
where
I: Stream<Item = String> + Send + Unpin + 'static,
O: Sink<(Vec<Line>, Text), Error = E> + Clone + Send + Unpin + 'static,
E: std::error::Error + Send + Sync + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
let conn = match tokio::net::TcpStream::connect(self.0).await {
Ok(conn) => {
tx.send(Ok(())).unwrap();
conn
}
Err(e) => {
tx.send(Err(e)).unwrap();
return;
}
};
let (t_tx, mut t_rx) = tellem::TnConn::start(conn).split();
let (mut tc_tx, tc_rx) = futures::channel::mpsc::channel(64);
tokio::spawn(async { tc_rx.map(Ok).forward(t_tx).await });
let ga = Arc::new(Mutex::new(false));
let input_handler = {
let mut buffer_output = buffer_output.clone();
let flags = flags.clone();
let mut tc_tx = tc_tx.clone();
let ga = ga.clone();
async move {
while let Some(input) = user_input.next().await {
let mut data: BytesMut = input.as_bytes().into();
data.extend_from_slice(b"\r\n");
tc_tx.send(Event::Data(data)).await?;
let input_line = if !flags.noecho() {
Line::from(Text::try_from(input).unwrap())
} else {
Line::from(Text::try_from(String::new()).unwrap())
};
buffer_output.send((vec![input_line], Text::default())).await?;
*ga.lock().unwrap() = false;
}
let res: Result<(), anyhow::Error> = Ok(());
res
}
};
let telnet_handler = {
let mut buffer_output = buffer_output.clone();
async move {
let mut ansi_processor = AnsiProcessor::new(Default::default());
while let Some(event) = t_rx.try_next().await? {
match event {
Event::Negotiation(Cmd::WONT, Opt::Known(KnownOpt::ECHO)) => {
flags.set_noecho(false);
}
Event::Negotiation(Cmd::WILL, Opt::Known(KnownOpt::ECHO)) => {
flags.set_noecho(true);
tc_tx
.send(Event::Negotiation(Cmd::DO, Opt::Known(KnownOpt::ECHO)))
.await?;
}
Event::Cmd(Cmd::GA) => {
*ga.lock().unwrap() = true;
debug!("Received go ahead");
}
Event::Data(bytes) => {
if *ga.lock().unwrap() {
*ga.lock().unwrap() = false;
ansi_processor.append(b"\r\n");
}
ansi_processor.append(&bytes);
buffer_output.send(ansi_processor.take()).await?;
}
event => {
debug!(?event, "unhandled telnet event");
}
}
}
let res: Result<(), anyhow::Error> = Ok(());
res
}
};
select! {
_ = telnet_handler => {},
_ = input_handler => {},
}
});
debug!("waiting for 'connected' response");
Ok(futures::executor::block_on(rx)??)
}
}