tari_comms 5.3.0

A peer-to-peer messaging system
Documentation
// Copyright 2022 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{
    collections::HashMap,
    convert::identity,
    env,
    net::SocketAddr,
    path::{Path, PathBuf},
    process,
    sync::Arc,
};

use anyhow::anyhow;
use bytes::Bytes;
use chrono::Utc;
use rand::{RngCore, rngs::OsRng, thread_rng};
use tari_common_sqlite::connection::{DbConnection, DbConnectionUrl};
use tari_comms::{
    CommsBuilder,
    CommsNode,
    message::{InboundMessage, OutboundMessage},
    multiaddr::Multiaddr,
    net_address::{MultiaddressesWithStats, PeerAddressSource},
    peer_manager::{
        NodeId,
        NodeIdentity,
        Peer,
        PeerFeatures,
        PeerFlags,
        database::{MIGRATIONS, PeerDatabaseSql},
    },
    pipeline,
    pipeline::SinkService,
    protocol::{ProtocolId, messaging::MessagingProtocolExtension},
    tor,
    types::CommsPublicKey,
};
use tari_utilities::message_format::MessageFormat;
use tempfile::Builder;
use tokio::{
    runtime,
    sync::{broadcast, mpsc},
};
// Tor example for tari_comms.
//
// _Note:_ A running tor proxy with `ControlPort` set is required for this example to work.

/// Protocol identifier under which the example nodes register their messaging protocol extension.
static MSG_PROTOCOL_ID: ProtocolId = ProtocolId::from_static(b"example/tor/msg/1.0");

/// Error type used throughout this example; an alias for `anyhow::Error`.
type Error = anyhow::Error;

/// Program entry point: initialises logging, runs the example and, if it fails, prints the error
/// to stderr and exits with status code 1.
#[tokio::main]
async fn main() {
    env_logger::init();
    match run().await {
        Ok(()) => {},
        Err(err) => {
            eprintln!("{err:?}: {err}");
            process::exit(1);
        },
    }
}

/// Loads a tor identity from the JSON file at `path`.
///
/// # Panics
///
/// Panics if the file cannot be read or if its contents cannot be decoded as a JSON
/// `tor::TorIdentity`. The panic message names the offending file so that a bad command-line
/// argument is easy to diagnose.
fn load_tor_identity<P: AsRef<Path>>(path: P) -> tor::TorIdentity {
    let path = path.as_ref();
    let contents = std::fs::read_to_string(path).unwrap_or_else(|err| {
        panic!("Failed to read tor identity file '{}': {err}", path.display())
    });
    tor::TorIdentity::from_json(&contents).unwrap_or_else(|err| {
        panic!("Failed to parse tor identity file '{}': {err:?}", path.display())
    })
}

/// Runs the example: starts two comms nodes over tor, makes node 2 known to node 1 and lets the
/// two nodes exchange PING/PONG messages until ctrl-c is received.
///
/// Optional positional command-line arguments:
/// 1. multiaddr of the tor control port (defaults to `/ip4/127.0.0.1/tcp/9051`),
/// 2. path to a JSON tor identity file for node 1,
/// 3. path to a JSON tor identity file for node 2.
///
/// # Errors
///
/// Returns an error if the control port address cannot be parsed, a temporary directory cannot
/// be created, either node fails to start, node 2 cannot be added as a peer of node 1, the
/// initial message cannot be queued, or one of the ping-pong tasks fails.
///
/// # Panics
///
/// Panics if a supplied tor identity file cannot be loaded or if waiting for ctrl-c fails.
async fn run() -> Result<(), Error> {
    let mut args_iter = env::args().skip(1);
    let control_port_addr = args_iter
        .next()
        .unwrap_or_else(|| "/ip4/127.0.0.1/tcp/9051".to_string())
        .parse::<Multiaddr>()?;

    let tor_identity1 = args_iter.next().map(load_tor_identity);
    let tor_identity2 = args_iter.next().map(load_tor_identity);

    println!("Starting comms nodes...");

    // Both port mappings target port 0 on the loopback interface.
    let loopback_addr = SocketAddr::from(([127, 0, 0, 1], 0));

    // The temp dir guards are kept alive until the end of this function so that the peer
    // databases created inside them remain available while the nodes run.
    let temp_dir1 = Builder::new().prefix("tor-example1").tempdir()?;
    let (comms_node1, inbound_rx1, outbound_tx1) = setup_node_with_tor(
        control_port_addr.clone(),
        temp_dir1.as_ref(),
        (9098u16, loopback_addr),
        tor_identity1,
    )
    .await?;

    let temp_dir2 = Builder::new().prefix("tor-example2").tempdir()?;
    let (comms_node2, inbound_rx2, outbound_tx2) = setup_node_with_tor(
        control_port_addr,
        temp_dir2.as_ref(),
        (9099u16, loopback_addr),
        tor_identity2,
    )
    .await?;

    let node_identity1 = comms_node1.node_identity();
    let node_identity2 = comms_node2.node_identity();

    println!("Comms nodes started!");
    println!(
        "Node 1 is '{}' with address '{:?}'",
        node_identity1.node_id().short_str(),
        node_identity1.public_addresses(),
    );
    println!(
        "Node 2 is '{}' with address '{:?}'",
        node_identity2.node_id().short_str(),
        node_identity2.public_addresses(),
    );

    // Let's add node 2 as a peer to node 1
    comms_node1
        .peer_manager()
        .add_or_update_peer(Peer::new(
            node_identity2.public_key().clone(),
            node_identity2.node_id().clone(),
            MultiaddressesWithStats::from_addresses_with_source(
                node_identity2.public_addresses(),
                &PeerAddressSource::Config,
            ),
            Default::default(),
            PeerFeatures::COMMUNICATION_CLIENT,
            Default::default(),
            Default::default(),
        ))
        .await?;

    // This kicks things off: node 1 sends START to node 2, which answers with the first PING.
    outbound_tx1.send(OutboundMessage::new(
        node_identity2.node_id().clone(),
        Bytes::from_static(b"START"),
    ))?;

    let executor = runtime::Handle::current();

    println!("Starting ping pong between nodes over tor. This may take a few moments to begin.");
    let handle1 = executor.spawn(start_ping_ponger(
        node_identity2.node_id().clone(),
        inbound_rx1,
        outbound_tx1,
    ));
    let handle2 = executor.spawn(start_ping_ponger(
        node_identity1.node_id().clone(),
        inbound_rx2,
        outbound_tx2,
    ));

    tokio::signal::ctrl_c().await.expect("ctrl-c failed");

    println!("Tor example is shutting down...");
    comms_node1.wait_until_shutdown().await;
    comms_node2.wait_until_shutdown().await;

    // The first `?` surfaces a panicked/cancelled task, the second the task's own error.
    handle1.await??;
    handle2.await??;

    Ok(())
}

/// Builds a peer with a freshly generated random public key, the node id derived from that key
/// and a single hard-coded address, using default flags and no features.
pub fn create_test_peer() -> Peer {
    let (_secret_key, public_key) = CommsPublicKey::random_keypair(&mut rand::rngs::OsRng);
    // Derive the id before `public_key` is moved into the peer below.
    let peer_id = NodeId::from_key(&public_key);
    let address = "/ip4/123.0.0.123/tcp/8000".parse::<Multiaddr>().unwrap();

    Peer::new(
        public_key,
        peer_id,
        MultiaddressesWithStats::from_addresses_with_source(vec![address], &PeerAddressSource::Config),
        PeerFlags::default(),
        PeerFeatures::empty(),
        Default::default(),
        Default::default(),
    )
}

/// Creates and spawns a comms node that uses a tor hidden service as its transport.
///
/// * `control_port_addr` - address of the tor control server.
/// * `database_path` - directory in which the node's `peers.db` peer database is created.
/// * `port_mapping` - port mapping handed to the hidden service builder.
/// * `tor_identity` - optional existing tor identity to hand to the hidden service builder.
///
/// Returns the running node together with the receiver for inbound messages and the sender for
/// outbound messages.
///
/// # Errors
///
/// Returns an error if the peer database cannot be opened or migrated, the hidden service
/// controller or the comms node cannot be built, the tor transport cannot be initialised, or the
/// node fails to spawn.
async fn setup_node_with_tor<P: Into<tor::PortMapping>>(
    control_port_addr: Multiaddr,
    database_path: &Path,
    port_mapping: P,
    tor_identity: Option<tor::TorIdentity>,
) -> Result<
    (
        CommsNode,
        mpsc::UnboundedReceiver<InboundMessage>,
        mpsc::UnboundedSender<OutboundMessage>,
    ),
    Error,
> {
    let database_url = DbConnectionUrl::File(PathBuf::from(database_path).join("peers.db"));
    let db_connection = DbConnection::connect_and_migrate(&database_url, MIGRATIONS, Some(5))?;
    let peer_database = PeerDatabaseSql::new(db_connection, &create_test_peer())?;

    // The caller keeps `inbound_rx` and `outbound_tx`; the other halves are wired into the
    // messaging pipeline below.
    let (inbound_tx, inbound_rx) = mpsc::unbounded_channel();
    let (outbound_tx, outbound_rx) = mpsc::unbounded_channel();

    let mut hs_builder = tor::HiddenServiceBuilder::new()
        .with_port_mapping(port_mapping)
        .with_control_server_address(control_port_addr);

    if let Some(ident) = tor_identity {
        hs_builder = hs_builder.with_tor_identity(ident);
    }

    let mut hs_controller = hs_builder.build()?;

    let node_identity = Arc::new(NodeIdentity::random(
        &mut OsRng,
        "/ip4/127.0.0.1/tcp/0".parse().unwrap(),
        PeerFeatures::COMMUNICATION_CLIENT,
    ));

    // Propagate builder failures to the caller instead of panicking; this is the same error
    // type that `spawn_with_transport` yields further down.
    let comms_node = CommsBuilder::new()
        .with_node_identity(node_identity)
        .with_listener_address(hs_controller.proxied_address())
        .with_peer_storage(peer_database)
        .build()?;

    let transport = hs_controller.initialize_transport().await?;

    // Messaging events are not consumed by this example, so the receiver half is dropped.
    let (event_tx, _) = broadcast::channel(1);
    let comms_node = comms_node
        .add_protocol_extension(MessagingProtocolExtension::new(
            MSG_PROTOCOL_ID.clone(),
            event_tx,
            pipeline::Builder::new()
            // Outbound messages will be forwarded "as is" to outbound messaging
            .with_outbound_pipeline(outbound_rx, identity)
            .max_concurrent_inbound_tasks(1)
            // Inbound messages will be forwarded "as is" to inbound_tx
            .with_inbound_pipeline(SinkService::new(inbound_tx))
            .build(),
        ))
        .spawn_with_transport(transport)
        .await?;

    println!(
        "Tor hidden service created with address '{:?}'",
        comms_node.node_identity().public_addresses()
    );

    Ok((comms_node, inbound_rx, outbound_tx))
}

/// Drives one side of the ping-pong exchange until the inbound channel is closed.
///
/// Each inbound message body is read as (lossy) UTF-8 text of the form `<COMMAND> [id]`:
/// * `START` - sends `PING <id>` with a fresh random id to `dest_node_id`.
/// * `PING <id>` - replies with `PONG <id>`.
/// * `PONG <id>` - prints the round-trip latency of the matching in-flight ping, if there is
///   one, and then sends a new `PING`.
///
/// Returns the number of inbound messages that were received.
///
/// # Errors
///
/// Returns an error if a `PING` or `PONG` arrives without an id, if an unrecognised message is
/// received, or if an outbound message cannot be sent on `outbound_tx`.
async fn start_ping_ponger(
    dest_node_id: NodeId,
    mut inbound_rx: mpsc::UnboundedReceiver<InboundMessage>,
    outbound_tx: mpsc::UnboundedSender<OutboundMessage>,
) -> Result<usize, Error> {
    // Ping id -> time at which that ping was sent.
    let mut inflight_pings = HashMap::new();
    let mut counter = 0;
    while let Some(msg) = inbound_rx.recv().await {
        counter += 1;

        let msg_str = String::from_utf8_lossy(&msg.body);
        println!("Received '{}' from '{}'", msg_str, msg.source_peer.short_str());

        let mut msg_parts = msg_str.split(' ');
        match msg_parts.next() {
            Some("START") => {
                println!("\n-----------------------------------");
                let id = thread_rng().next_u64();
                inflight_pings.insert(id, Utc::now().naive_utc());
                let msg = make_msg(&dest_node_id, &format!("PING {id}"));
                outbound_tx.send(msg)?;
            },
            Some("PING") => {
                let id = msg_parts.next().ok_or_else(|| anyhow!("Received PING without id"))?;

                let msg_str = format!("PONG {id}");
                let msg = make_msg(&dest_node_id, &msg_str);

                outbound_tx.send(msg)?;
            },

            Some("PONG") => {
                let id = msg_parts.next().ok_or_else(|| anyhow!("Received PONG without id"))?;

                // Best effort: a PONG whose id does not parse or is not in flight prints no latency.
                let sent_at = id
                    .parse::<u64>()
                    .ok()
                    .and_then(|id_num| inflight_pings.remove(&id_num));
                if let Some(sent_at) = sent_at {
                    // The map stores the send time, so the latency is the time elapsed since then.
                    let latency = Utc::now().naive_utc().signed_duration_since(sent_at).num_milliseconds();
                    println!("Latency: {latency}ms");
                }

                println!("-----------------------------------");
                let new_id = thread_rng().next_u64();
                inflight_pings.insert(new_id, Utc::now().naive_utc());
                let msg = make_msg(&dest_node_id, &format!("PING {new_id}"));
                outbound_tx.send(msg)?;
            },
            msg => {
                return Err(anyhow!("Received invalid message '{msg:?}'"));
            },
        }
    }

    Ok(counter)
}

/// Creates an outbound message addressed to `node_id` whose body is a copy of the bytes of `msg`.
fn make_msg(node_id: &NodeId, msg: &str) -> OutboundMessage {
    OutboundMessage::new(node_id.clone(), Bytes::copy_from_slice(msg.as_bytes()))
}