use std::{
collections::HashMap,
convert::identity,
env,
net::SocketAddr,
path::{Path, PathBuf},
process,
sync::Arc,
};
use anyhow::anyhow;
use bytes::Bytes;
use chrono::Utc;
use rand::{RngCore, rngs::OsRng, thread_rng};
use tari_common_sqlite::connection::{DbConnection, DbConnectionUrl};
use tari_comms::{
CommsBuilder,
CommsNode,
message::{InboundMessage, OutboundMessage},
multiaddr::Multiaddr,
net_address::{MultiaddressesWithStats, PeerAddressSource},
peer_manager::{
NodeId,
NodeIdentity,
Peer,
PeerFeatures,
PeerFlags,
database::{MIGRATIONS, PeerDatabaseSql},
},
pipeline,
pipeline::SinkService,
protocol::{ProtocolId, messaging::MessagingProtocolExtension},
tor,
types::CommsPublicKey,
};
use tari_utilities::message_format::MessageFormat;
use tempfile::Builder;
use tokio::{
runtime,
sync::{broadcast, mpsc},
};
static MSG_PROTOCOL_ID: ProtocolId = ProtocolId::from_static(b"example/tor/msg/1.0");
type Error = anyhow::Error;
#[tokio::main]
async fn main() {
env_logger::init();
if let Err(err) = run().await {
eprintln!("{err:?}: {err}");
process::exit(1);
}
}
fn load_tor_identity<P: AsRef<Path>>(path: P) -> tor::TorIdentity {
let contents = std::fs::read_to_string(path).unwrap();
tor::TorIdentity::from_json(&contents).unwrap()
}
async fn run() -> Result<(), Error> {
let mut args_iter = env::args().skip(1);
let control_port_addr = args_iter
.next()
.unwrap_or_else(|| "/ip4/127.0.0.1/tcp/9051".to_string())
.parse::<Multiaddr>()?;
let tor_identity1 = args_iter.next().map(load_tor_identity);
let tor_identity2 = args_iter.next().map(load_tor_identity);
println!("Starting comms nodes...",);
let temp_dir1 = Builder::new().prefix("tor-example1").tempdir().unwrap();
let (comms_node1, inbound_rx1, outbound_tx1) = setup_node_with_tor(
control_port_addr.clone(),
temp_dir1.as_ref(),
(9098u16, "127.0.0.1:0".parse::<SocketAddr>().unwrap()),
tor_identity1,
)
.await?;
let temp_dir2 = Builder::new().prefix("tor-example2").tempdir().unwrap();
let (comms_node2, inbound_rx2, outbound_tx2) = setup_node_with_tor(
control_port_addr,
temp_dir2.as_ref(),
(9099u16, "127.0.0.1:0".parse::<SocketAddr>().unwrap()),
tor_identity2,
)
.await?;
let node_identity1 = comms_node1.node_identity();
let node_identity2 = comms_node2.node_identity();
println!("Comms nodes started!");
println!(
"Node 1 is '{}' with address '{:?}')",
node_identity1.node_id().short_str(),
node_identity1.public_addresses(),
);
println!(
"Node 2 is '{}' with address '{:?}')",
node_identity2.node_id().short_str(),
node_identity2.public_addresses(),
);
comms_node1
.peer_manager()
.add_or_update_peer(Peer::new(
node_identity2.public_key().clone(),
node_identity2.node_id().clone(),
MultiaddressesWithStats::from_addresses_with_source(
node_identity2.public_addresses(),
&PeerAddressSource::Config,
),
Default::default(),
PeerFeatures::COMMUNICATION_CLIENT,
Default::default(),
Default::default(),
))
.await?;
outbound_tx1.send(OutboundMessage::new(
comms_node2.node_identity().node_id().clone(),
Bytes::from_static(b"START"),
))?;
let executor = runtime::Handle::current();
println!("Starting ping pong between nodes over tor. This may take a few moments to begin.");
let handle1 = executor.spawn(start_ping_ponger(
comms_node2.node_identity().node_id().clone(),
inbound_rx1,
outbound_tx1,
));
let handle2 = executor.spawn(start_ping_ponger(
comms_node1.node_identity().node_id().clone(),
inbound_rx2,
outbound_tx2,
));
tokio::signal::ctrl_c().await.expect("ctrl-c failed");
println!("Tor example is shutting down...");
comms_node1.wait_until_shutdown().await;
comms_node2.wait_until_shutdown().await;
handle1.await??;
handle2.await??;
Ok(())
}
pub fn create_test_peer() -> Peer {
let mut rng = rand::rngs::OsRng;
let (_sk, pk) = CommsPublicKey::random_keypair(&mut rng);
let node_id = NodeId::from_key(&pk);
let addresses = MultiaddressesWithStats::from_addresses_with_source(
vec!["/ip4/123.0.0.123/tcp/8000".parse::<Multiaddr>().unwrap()],
&PeerAddressSource::Config,
);
Peer::new(
pk,
node_id,
addresses,
PeerFlags::default(),
PeerFeatures::empty(),
Default::default(),
Default::default(),
)
}
async fn setup_node_with_tor<P: Into<tor::PortMapping>>(
control_port_addr: Multiaddr,
database_path: &Path,
port_mapping: P,
tor_identity: Option<tor::TorIdentity>,
) -> Result<
(
CommsNode,
mpsc::UnboundedReceiver<InboundMessage>,
mpsc::UnboundedSender<OutboundMessage>,
),
Error,
> {
let database_url = DbConnectionUrl::File(PathBuf::from(database_path).join("peers.db"));
let db_connection = DbConnection::connect_and_migrate(&database_url, MIGRATIONS, Some(5))?;
let peer_database = PeerDatabaseSql::new(db_connection, &create_test_peer())?;
let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
let (outbound_tx, outbound_rx) = mpsc::unbounded_channel();
let mut hs_builder = tor::HiddenServiceBuilder::new()
.with_port_mapping(port_mapping)
.with_control_server_address(control_port_addr);
if let Some(ident) = tor_identity {
hs_builder = hs_builder.with_tor_identity(ident);
}
let mut hs_controller = hs_builder.build()?;
let node_identity = Arc::new(NodeIdentity::random(
&mut OsRng,
"/ip4/127.0.0.1/tcp/0".parse().unwrap(),
PeerFeatures::COMMUNICATION_CLIENT,
));
let comms_node = CommsBuilder::new()
.with_node_identity(node_identity)
.with_listener_address(hs_controller.proxied_address())
.with_peer_storage(peer_database)
.build()
.unwrap();
let transport = hs_controller.initialize_transport().await?;
let (event_tx, _) = broadcast::channel(1);
let comms_node = comms_node
.add_protocol_extension(MessagingProtocolExtension::new(
MSG_PROTOCOL_ID.clone(),
event_tx,
pipeline::Builder::new()
.with_outbound_pipeline(outbound_rx, identity)
.max_concurrent_inbound_tasks(1)
.with_inbound_pipeline(SinkService::new(inbound_tx))
.build(),
))
.spawn_with_transport(transport)
.await?;
println!(
"Tor hidden service created with address '{:?}'",
comms_node.node_identity().public_addresses()
);
Ok((comms_node, inbound_rx, outbound_tx))
}
async fn start_ping_ponger(
dest_node_id: NodeId,
mut inbound_rx: mpsc::UnboundedReceiver<InboundMessage>,
outbound_tx: mpsc::UnboundedSender<OutboundMessage>,
) -> Result<usize, Error> {
let mut inflight_pings = HashMap::new();
let mut counter = 0;
while let Some(msg) = inbound_rx.recv().await {
counter += 1;
let msg_str = String::from_utf8_lossy(&msg.body);
println!("Received '{}' from '{}'", msg_str, msg.source_peer.short_str());
let mut msg_parts = msg_str.split(' ');
match msg_parts.next() {
Some("START") => {
println!("\n-----------------------------------");
let id = thread_rng().next_u64();
inflight_pings.insert(id, Utc::now().naive_utc());
let msg = make_msg(&dest_node_id, &format!("PING {id}"));
outbound_tx.send(msg)?;
},
Some("PING") => {
let id = msg_parts.next().ok_or_else(|| anyhow!("Received PING without id"))?;
let msg_str = format!("PONG {id}");
let msg = make_msg(&dest_node_id, &msg_str);
outbound_tx.send(msg)?;
},
Some("PONG") => {
let id = msg_parts.next().ok_or_else(|| anyhow!("Received PING without id"))?;
id.parse::<u64>()
.ok()
.and_then(|id_num| inflight_pings.remove(&id_num))
.inspect(|&latency| {
println!("Latency: {latency}ms");
});
println!("-----------------------------------");
let new_id = thread_rng().next_u64();
inflight_pings.insert(new_id, Utc::now().naive_utc());
let msg = make_msg(&dest_node_id, &format!("PING {new_id}"));
outbound_tx.send(msg)?;
},
msg => {
return Err(anyhow!("Received invalid message '{msg:?}'"));
},
}
}
Ok(counter)
}
fn make_msg(node_id: &NodeId, msg: &str) -> OutboundMessage {
let msg = Bytes::copy_from_slice(msg.as_bytes());
OutboundMessage::new(node_id.clone(), msg)
}