use super::{
commands,
commands::{AddOnionFlag, AddOnionResponse, TorCommand},
error::TorClientError,
response::ResponseLine,
types::{KeyBlob, KeyType, PortMapping},
PrivateKey,
LOG_TARGET,
};
use crate::{
multiaddr::Multiaddr,
tor::control_client::{event::TorControlEvent, monitor::spawn_monitor},
transports::{TcpTransport, Transport},
};
use futures::{channel::mpsc, AsyncRead, AsyncWrite, SinkExt, StreamExt};
use log::*;
use std::{borrow::Cow, fmt, fmt::Display, num::NonZeroU16};
use tokio::sync::broadcast;
pub struct TorControlPortClient {
cmd_tx: mpsc::Sender<String>,
output_stream: mpsc::Receiver<ResponseLine>,
event_tx: broadcast::Sender<TorControlEvent>,
}
impl TorControlPortClient {
pub async fn connect(
addr: Multiaddr,
event_tx: broadcast::Sender<TorControlEvent>,
) -> Result<Self, TorClientError>
{
let mut tcp = TcpTransport::new();
tcp.set_nodelay(true);
let socket = tcp.dial(addr)?.await?;
Ok(Self::new(socket, event_tx))
}
pub fn new<TSocket>(socket: TSocket, event_tx: broadcast::Sender<TorControlEvent>) -> Self
where TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static {
let (cmd_tx, cmd_rx) = mpsc::channel(10);
let output_stream = spawn_monitor(cmd_rx, socket, event_tx.clone());
Self {
cmd_tx,
output_stream,
event_tx,
}
}
pub fn is_connected(&self) -> bool {
!self.cmd_tx.is_closed()
}
pub(in crate::tor) fn event_sender(&self) -> &broadcast::Sender<TorControlEvent> {
&self.event_tx
}
pub fn get_event_stream(&self) -> broadcast::Receiver<TorControlEvent> {
self.event_tx.subscribe()
}
pub async fn authenticate(&mut self, authentication: &Authentication) -> Result<(), TorClientError> {
match authentication {
Authentication::None => {
self.send_line("AUTHENTICATE".to_string()).await?;
},
Authentication::HashedPassword(passwd) => {
self.send_line(format!("AUTHENTICATE \"{}\"", passwd.replace("\"", "\\\"")))
.await?;
},
Authentication::Cookie(cookie) => {
self.send_line(format!("AUTHENTICATE {}", cookie)).await?;
},
}
self.recv_ok().await?;
Ok(())
}
#[allow(clippy::needless_lifetimes)]
pub async fn get_conf<'a>(&mut self, conf_name: &'a str) -> Result<Vec<Cow<'a, str>>, TorClientError> {
let command = commands::get_conf(conf_name);
self.request_response(command).await
}
#[allow(clippy::needless_lifetimes)]
pub async fn get_info<'a>(&mut self, key_name: &'a str) -> Result<Vec<Cow<'a, str>>, TorClientError> {
let command = commands::get_info(key_name);
let response = self.request_response(command).await?;
if response.is_empty() {
return Err(TorClientError::ServerNoResponse);
}
Ok(response)
}
pub async fn set_events(&mut self, events: &[&str]) -> Result<(), TorClientError> {
let command = commands::set_events(events);
let _ = self.request_response(command).await?;
Ok(())
}
pub async fn add_onion_custom<P: Into<PortMapping>>(
&mut self,
key_type: KeyType,
key_blob: KeyBlob<'_>,
flags: Vec<AddOnionFlag>,
port: P,
num_streams: Option<NonZeroU16>,
) -> Result<AddOnionResponse, TorClientError>
{
let command = commands::AddOnion::new(key_type, key_blob, flags, port.into(), num_streams);
self.request_response(command).await
}
pub async fn add_onion_v2<P: Into<PortMapping>>(
&mut self,
flags: Vec<AddOnionFlag>,
port: P,
num_streams: Option<NonZeroU16>,
) -> Result<AddOnionResponse, TorClientError>
{
self.add_onion_custom(KeyType::New, KeyBlob::Rsa1024, flags, port, num_streams)
.await
}
pub async fn add_onion<P: Into<PortMapping>>(
&mut self,
flags: Vec<AddOnionFlag>,
port: P,
num_streams: Option<NonZeroU16>,
) -> Result<AddOnionResponse, TorClientError>
{
self.add_onion_custom(KeyType::New, KeyBlob::Best, flags, port, num_streams)
.await
}
pub async fn add_onion_from_private_key<P: Into<PortMapping>>(
&mut self,
private_key: &PrivateKey,
flags: Vec<AddOnionFlag>,
port: P,
num_streams: Option<NonZeroU16>,
) -> Result<AddOnionResponse, TorClientError>
{
let (key_type, key_blob) = match private_key {
PrivateKey::Rsa1024(key) => (KeyType::Rsa1024, KeyBlob::String(key)),
PrivateKey::Ed25519V3(key) => (KeyType::Ed25519V3, KeyBlob::String(key)),
};
self.add_onion_custom(key_type, key_blob, flags, port, num_streams)
.await
}
pub async fn del_onion(&mut self, service_id: &str) -> Result<(), TorClientError> {
let command = commands::DelOnion::new(service_id);
self.request_response(command).await
}
async fn request_response<T: TorCommand + Display>(&mut self, command: T) -> Result<T::Output, TorClientError>
where T::Error: Into<TorClientError> {
trace!(target: LOG_TARGET, "Sent command: {}", command);
let cmd_str = command.to_command_string().map_err(Into::into)?;
self.send_line(cmd_str).await?;
let responses = self.recv_next_responses().await?;
trace!(target: LOG_TARGET, "Response from tor: {:?}", responses);
if responses.is_empty() {
return Err(TorClientError::ServerNoResponse);
}
let response = command.parse_responses(responses).map_err(Into::into)?;
Ok(response)
}
async fn send_line(&mut self, line: String) -> Result<(), TorClientError> {
self.cmd_tx
.send(line)
.await
.map_err(|_| TorClientError::CommandSenderDisconnected)
}
async fn recv_ok(&mut self) -> Result<(), TorClientError> {
let resp = self.receive_line().await?;
if resp.is_ok() {
Ok(())
} else {
Err(TorClientError::TorCommandFailed(resp.value))
}
}
async fn recv_next_responses(&mut self) -> Result<Vec<ResponseLine>, TorClientError> {
let mut msgs = Vec::new();
loop {
let msg = self.receive_line().await?;
let has_more = msg.has_more();
msgs.push(msg.into_owned());
if !has_more {
break;
}
}
Ok(msgs)
}
async fn receive_line(&mut self) -> Result<ResponseLine, TorClientError> {
let line = self
.output_stream
.next()
.await
.ok_or_else(|| TorClientError::UnexpectedEof)?;
Ok(line)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Authentication {
None,
HashedPassword(String),
Cookie(String),
}
impl Default for Authentication {
fn default() -> Self {
Authentication::None
}
}
impl fmt::Display for Authentication {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use Authentication::*;
match self {
None => write!(f, "None"),
HashedPassword(_) => write!(f, "HashedPassword"),
Cookie(_) => write!(f, "Cookie"),
}
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::{
runtime,
tor::control_client::{test_server, test_server::canned_responses, types::PrivateKey},
};
use futures::future;
use std::net::SocketAddr;
use tari_test_utils::unpack_enum;
async fn setup_test() -> (TorControlPortClient, test_server::State) {
let (_, mock_state, socket) = test_server::spawn().await;
let (event_tx, _) = broadcast::channel(1);
let tor = TorControlPortClient::new(socket, event_tx);
(tor, mock_state)
}
#[runtime::test]
async fn connect() {
let (mut listener, addr) = TcpTransport::default()
.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap()
.await
.unwrap();
let (event_tx, _) = broadcast::channel(1);
let (result_out, result_in) =
future::join(TorControlPortClient::connect(addr, event_tx), listener.next()).await;
result_out.unwrap();
result_in.unwrap().unwrap().0.await.unwrap();
}
#[runtime::test]
async fn authenticate() {
let (mut tor, mock_state) = setup_test().await;
tor.authenticate(&Authentication::None).await.unwrap();
let mut req = mock_state.take_requests().await;
assert_eq!(req.len(), 1);
assert_eq!(req.remove(0), "AUTHENTICATE");
tor.authenticate(&Authentication::HashedPassword("ab\"cde".to_string()))
.await
.unwrap();
let mut req = mock_state.take_requests().await;
assert_eq!(req.len(), 1);
assert_eq!(req.remove(0), "AUTHENTICATE \"ab\\\"cde\"");
tor.authenticate(&Authentication::Cookie("NOTACTUALLYHEXENCODED".to_string()))
.await
.unwrap();
let mut req = mock_state.take_requests().await;
assert_eq!(req.len(), 1);
assert_eq!(req.remove(0), "AUTHENTICATE NOTACTUALLYHEXENCODED");
}
#[runtime::test]
async fn get_conf_ok() {
let (mut tor, mock_state) = setup_test().await;
mock_state
.set_canned_response(canned_responses::GET_CONF_HIDDEN_SERVICE_PORT_OK)
.await;
let results = tor.get_conf("HiddenServicePort").await.unwrap();
assert_eq!(results.len(), 3);
assert_eq!(results[0], "8080");
assert_eq!(results[1], "8081 127.0.0.1:9000");
assert_eq!(results[2], "8082 127.0.0.1:9001");
}
#[runtime::test]
async fn get_conf_err() {
let (mut tor, mock_state) = setup_test().await;
mock_state.set_canned_response(canned_responses::ERR_552).await;
let err = tor.get_conf("HiddenServicePort").await.unwrap_err();
unpack_enum!(TorClientError::TorCommandFailed(_s) = err);
}
#[runtime::test]
async fn get_info_multiline_kv_ok() {
let (mut tor, mock_state) = setup_test().await;
mock_state
.set_canned_response(canned_responses::GET_INFO_NET_LISTENERS_OK)
.await;
let values = tor.get_info("net/listeners/socks").await.unwrap();
assert_eq!(values, &["127.0.0.1:9050", "unix:/run/tor/socks"]);
}
#[runtime::test]
async fn get_info_kv_multiline_value_ok() {
let (mut tor, mock_state) = setup_test().await;
mock_state
.set_canned_response(canned_responses::GET_INFO_ONIONS_DETACHED_OK)
.await;
let values = tor.get_info("onions/detached").await.unwrap();
assert_eq!(values, [
"mochz2xppfziim5olr5f6q27poc4vfob2xxxxxxxxxxxxxxxxxxxxxxx",
"nhqdqym6j35rk7tdou4cdj4gjjqagimutxxxxxxxxxxxxxxxxxxxxxxx"
]);
}
#[runtime::test]
async fn get_info_err() {
let (mut tor, mock_state) = setup_test().await;
mock_state.set_canned_response(canned_responses::ERR_552).await;
let err = tor.get_info("net/listeners/socks").await.unwrap_err();
unpack_enum!(TorClientError::TorCommandFailed(_s) = err);
}
#[runtime::test]
async fn add_onion_from_private_key_ok() {
let (mut tor, mock_state) = setup_test().await;
mock_state
.set_canned_response(canned_responses::ADD_ONION_RSA1024_OK)
.await;
let private_key = PrivateKey::Rsa1024("dummy-key".into());
let response = tor
.add_onion_from_private_key(&private_key, vec![], 8080, None)
.await
.unwrap();
assert_eq!(response.service_id, "62q4tswkxp74dtn7");
assert!(response.private_key.is_none());
let request = mock_state.take_requests().await.pop().unwrap();
assert_eq!(request, "ADD_ONION RSA1024:dummy-key Port=8080,127.0.0.1:8080");
}
#[runtime::test]
async fn add_onion_ok() {
let (mut tor, mock_state) = setup_test().await;
mock_state.set_canned_response(canned_responses::ADD_ONION_OK).await;
let response = tor
.add_onion_custom(
KeyType::New,
KeyBlob::Best,
vec![],
8080,
Some(NonZeroU16::new(10u16).unwrap()),
)
.await
.unwrap();
assert_eq!(
response.service_id,
"qigbgbs4ue3ghbupsotgh73cmmkjrin2aprlyxsrnrvpmcmzy3g4wbid"
);
assert_eq!(
response.private_key,
Some(PrivateKey::Ed25519V3(
"Pg3GEyssauPRW3jP6mHwKOxvl_fMsF0QsZC3DvQ8jZ9AxmfRvSP35m9l0vOYyOxkOqWM6ufjdYuM8Ae6cR2UdreG6".to_string()
))
);
let request = mock_state.take_requests().await.pop().unwrap();
assert_eq!(request, "ADD_ONION NEW:BEST NumStreams=10 Port=8080,127.0.0.1:8080");
}
#[runtime::test]
async fn add_onion_discard_pk_ok() {
let (mut tor, mock_state) = setup_test().await;
mock_state
.set_canned_response(canned_responses::ADD_ONION_DISCARDPK_OK)
.await;
let response = tor
.add_onion_custom(
KeyType::Rsa1024,
KeyBlob::Rsa1024,
vec![
AddOnionFlag::DiscardPK,
AddOnionFlag::Detach,
AddOnionFlag::BasicAuth,
AddOnionFlag::MaxStreamsCloseCircuit,
AddOnionFlag::NonAnonymous,
],
PortMapping::new(8080, SocketAddr::from(([127u8, 0, 0, 1], 8081u16))),
None,
)
.await
.unwrap();
assert_eq!(
response.service_id,
"qigbgbs4ue3ghbupsotgh73cmmkjrin2aprlyxsrnrvpmcmzy3g4wbid"
);
assert_eq!(response.private_key, None);
let request = mock_state.take_requests().await.pop().unwrap();
assert_eq!(
request,
"ADD_ONION RSA1024:RSA1024 Flags=DiscardPK,Detach,BasicAuth,MaxStreamsCloseCircuit,NonAnonymous \
Port=8080,127.0.0.1:8081"
);
}
#[runtime::test]
async fn add_onion_err() {
let (mut tor, mock_state) = setup_test().await;
mock_state.set_canned_response(canned_responses::ERR_552).await;
let err = tor
.add_onion_custom(KeyType::Ed25519V3, KeyBlob::Ed25519V3, vec![], 8080, None)
.await
.unwrap_err();
unpack_enum!(TorClientError::TorCommandFailed(_s) = err);
}
#[runtime::test]
async fn del_onion_ok() {
let (mut tor, mock_state) = setup_test().await;
mock_state.set_canned_response(canned_responses::OK).await;
tor.del_onion("some-fake-id").await.unwrap();
let request = mock_state.take_requests().await.pop().unwrap();
assert_eq!(request, "DEL_ONION some-fake-id");
}
#[runtime::test]
async fn del_onion_err() {
let (mut tor, mock_state) = setup_test().await;
mock_state.set_canned_response(canned_responses::ERR_552).await;
tor.del_onion("some-fake-id").await.unwrap_err();
let request = mock_state.take_requests().await.pop().unwrap();
assert_eq!(request, "DEL_ONION some-fake-id");
}
}