use std::fmt;
use futures::{SinkExt, Stream, StreamExt, future::Either};
use log::*;
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{broadcast, mpsc},
};
use tokio_util::codec::{Framed, LinesCodec};
use super::{LOG_TARGET, event::TorControlEvent, parsers, response::ResponseLine};
pub fn spawn_monitor<TSocket>(
mut cmd_rx: mpsc::Receiver<String>,
socket: TSocket,
event_tx: broadcast::Sender<TorControlEvent>,
) -> mpsc::Receiver<ResponseLine>
where
TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let (responses_tx, responses_rx) = mpsc::channel(100);
tokio::spawn(async move {
let framed = Framed::new(socket, LinesCodec::new());
let (mut sink, mut stream) = framed.split();
loop {
let either = tokio::select! {
next = cmd_rx.recv() => Either::Left(next),
next = stream.next() => Either::Right(next),
};
match either {
Either::Left(Some(line)) => {
trace!(target: LOG_TARGET, "Tor send: {line}");
if let Err(err) = sink.send(line).await {
error!(
target: LOG_TARGET,
"Error when sending to Tor control server: {err:?}. Monitor is shutting down."
);
break;
}
},
Either::Left(None) => {
warn!(
target: LOG_TARGET,
"Tor control server command receiver closed. Monitor is exiting."
);
break;
},
Either::Right(Some(Ok(line))) => {
trace!(target: LOG_TARGET, "Tor recv: {line}");
match parsers::response_line(&line) {
Ok(mut line) => {
if line.is_multiline {
let lines = read_until(&mut stream, ".").await;
line.value = format!("{}\n{}", line.value, lines.join("\n"));
}
if line.is_event() {
match TorControlEvent::try_from_response(&line) {
Ok(event) => {
let _result = event_tx.send(event);
},
Err(err) => {
log_server_response_error(err);
},
}
} else if let Err(err) = responses_tx.send(line).await {
warn!(
target: LOG_TARGET,
"Failed to send response on internal channel: {err:?}"
);
} else {
}
},
Err(err) => log_server_response_error(err),
}
},
Either::Right(Some(Err(err))) => {
cmd_rx.close();
error!(
target: LOG_TARGET,
"Line framing error when reading from tor control server: '{err:?}'. Monitor is exiting."
);
let _result = event_tx.send(TorControlEvent::TorControlDisconnected);
break;
},
Either::Right(None) => {
cmd_rx.close();
warn!(
target: LOG_TARGET,
"Connection to tor control port closed. Monitor is exiting."
);
let _result = event_tx.send(TorControlEvent::TorControlDisconnected);
break;
},
}
}
});
responses_rx
}
async fn read_until<E: fmt::Debug, S: Stream<Item = Result<String, E>> + Unpin>(
stream: &mut S,
pat: &str,
) -> Vec<String> {
let mut items = Vec::new();
loop {
match stream.next().await {
Some(Ok(item)) => {
if item.trim() == pat {
break items;
}
items.push(item);
},
Some(Err(err)) => {
error!(target: LOG_TARGET, "read_until: {err:?}");
},
None => {
break items;
},
}
}
}
fn log_server_response_error<E: fmt::Debug>(err: E) {
error!(
target: LOG_TARGET,
"Error processing response from tor control server: '{err:?}'"
);
}