p2panda-net 0.6.0

Data-type-agnostic p2p networking, discovery, gossip and local-first sync
Documentation
// SPDX-License-Identifier: MIT OR Apache-2.0

//! Example chat application using p2panda-net.
//!
//! Here we showcase the sync and gossip features of the p2panda-net API and demonstrate the
//! configuration steps required to successfully run the required sub-systems.
//!
//! Chat messages are sent using the p2panda log sync protocol; this allows catching up on past
//! messages which were published before you came online. The gossip protocol is used to broadcast
//! heartbeat messages which are used to display online / offline messages for other peers in the
//! chat.
//!
//! ## Usage
//!
//! Run the example on the first node:
//!
//! `cargo run --example chat`
//!
//! Run the example on a second node, using the chat topic ID and public key of the first node:
//!
//! `cargo run --example chat -- -c <CHAT_TOPIC_ID> -b <FIRST_NODE_PUBLIC_KEY>`
//!
//! Once several nodes are in the same chat, any node's public key can be supplied as the `-b`
//! parameter (bootstrap node ID).
//!
//! Type into the terminal and press <ENTER> to send messages. Type `/nick <NICKNAME>` and press
//! <ENTER> to set your desired nickname.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use clap::Parser;
use futures_util::StreamExt;
use iroh::EndpointAddr;
use p2panda_core::cbor::{decode_cbor, encode_cbor};
use p2panda_core::{Body, Hash, Header, Operation, SigningKey, Timestamp, Topic, VerifyingKey};
use p2panda_net::addrs::NodeInfo;
use p2panda_net::iroh_mdns::MdnsDiscoveryMode;
use p2panda_net::utils::{ShortFormat, from_verifying_key};
use p2panda_net::{AddressBook, Discovery, Endpoint, Gossip, LogSync, MdnsDiscovery, NodeId};
use p2panda_store::operations::OperationStore;
use p2panda_store::topics::TopicStore;
use p2panda_store::{SqliteStore, Transaction};
use p2panda_sync::protocols::TopicLogSyncEvent as SyncEvent;
use serde::{Deserialize, Serialize};
use tokio::sync::{RwLock, mpsc};
use tokio::time::Instant;
use tracing::info;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;

type LogId = u64;

/// This application maintains only one log per author, this is why we can hard-code it.
const LOG_ID: LogId = 1;

const RELAY_URL: &str = "https://euc1-1.relay.n0.iroh-canary.iroh.link.";

/// Heartbeat message to be sent over gossip (ephemeral messaging).
#[derive(Debug, Serialize, Deserialize)]
struct Heartbeat {
    sender: VerifyingKey,
    online: bool,
    rnd: u64,
}

impl Heartbeat {
    fn new(sender: VerifyingKey, online: bool) -> Self {
        Self {
            sender,
            online,
            rnd: rand::random(),
        }
    }
}

pub fn setup_logging() {
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
        .with(EnvFilter::from_default_env())
        .try_init()
        .ok();
}

#[derive(Parser)]
struct Args {
    /// Bootstrap node identifier.
    #[arg(short = 'b', long, value_name = "BOOTSTRAP_ID")]
    bootstrap_id: Option<NodeId>,

    /// Chat topic identifier.
    #[arg(short = 'c', long, value_name = "CHAT_TOPIC_ID")]
    chat_topic_id: Option<String>,

    /// Enable mDNS discovery
    #[arg(short = 'm', long, action)]
    mdns: bool,
}

#[tokio::main]
async fn main() -> Result<()> {
    setup_logging();

    let args = Args::parse();

    let signing_key = SigningKey::generate();
    let verifying_key = signing_key.verifying_key();

    // Retrieve the chat topic ID from the provided arguments, otherwise generate a new, random,
    // cryptographically-secure identifier.
    let topic: Topic = if let Some(topic_str) = args.chat_topic_id {
        topic_str.parse().expect("topic id should be 32 bytes")
    } else {
        Topic::random()
    };

    // Create a temporary SQLite database to store topic associations and operations.
    let store = SqliteStore::temporary().await;

    // Associate our local log with the chat topic and commit it to the database.
    let permit = store.begin().await?;
    store.associate(&topic, &verifying_key, &LOG_ID).await?;
    store.commit(permit).await?;

    // Prepare address book.
    let address_book = AddressBook::builder().spawn().await?;

    // Add a bootstrap node to our address book if one was supplied by the user.
    if let Some(id) = args.bootstrap_id {
        let endpoint_addr = EndpointAddr::new(from_verifying_key(id));
        let endpoint_addr = endpoint_addr.with_relay_url(RELAY_URL.parse()?);
        address_book
            .insert_node_info(NodeInfo::from(endpoint_addr).bootstrap())
            .await?;
    }

    let endpoint = Endpoint::builder(address_book.clone())
        .signing_key(signing_key.clone())
        .relay_url(RELAY_URL.parse().unwrap())
        .spawn()
        .await?;

    println!("network id: {}", endpoint.network_id().fmt_short());
    println!("chat topic: {}", topic);
    println!("public key: {}", verifying_key.to_hex());
    println!("relay url: {}", RELAY_URL);

    let _discovery = Discovery::builder(address_book.clone(), endpoint.clone())
        .spawn()
        .await?;

    let mdns_discovery_mode = if args.mdns {
        MdnsDiscoveryMode::Active
    } else {
        MdnsDiscoveryMode::Passive
    };
    let _mdns = MdnsDiscovery::builder(address_book.clone(), endpoint.clone())
        .mode(mdns_discovery_mode)
        .spawn()
        .await?;

    let gossip = Gossip::builder(address_book.clone(), endpoint.clone())
        .spawn()
        .await?;

    // Subscribe to gossip overlay to receive and publish (ephemeral) messages.
    let heartbeat_tx = gossip.stream(topic).await?;
    let mut heartbeat_rx = heartbeat_tx.subscribe();

    let final_heartbeat_tx = heartbeat_tx.clone();

    // Mapping of public key to nickname.
    let nicknames = Arc::new(RwLock::new(HashMap::<VerifyingKey, String>::new()));

    // Mapping of public key to the instant that the last heartbeat message was received.
    let status = Arc::new(RwLock::new(HashMap::new()));

    // Publish (ephemeral) heartbeat messages.
    tokio::task::spawn(async move {
        loop {
            // Create and serialize a heartbeat message.
            let msg = Heartbeat::new(verifying_key, true);
            let encoded_msg = encode_cbor(&msg).unwrap();

            // Publish the message to the gossip topic.
            heartbeat_tx.publish(encoded_msg).await.unwrap();

            tokio::time::sleep(Duration::from_secs(rand::random_range(20..30))).await;
        }
    });

    // Receive and log each (ephemeral) heartbeat message.
    {
        let nicknames = Arc::clone(&nicknames);
        let status = Arc::clone(&status);
        tokio::spawn(async move {
            loop {
                if let Some(Ok(message)) = heartbeat_rx.next().await {
                    let msg: Heartbeat = decode_cbor(&message[..]).expect("valid cbor encoding");

                    info!("received heartbeat: {:?}", msg);

                    // Look up nickname for sender.
                    let name = if let Some(nickname) = nicknames.read().await.get(&msg.sender) {
                        nickname.to_owned()
                    } else {
                        msg.sender.fmt_short()
                    };

                    // Update status hashmap.
                    if status
                        .write()
                        .await
                        .insert(msg.sender, Instant::now())
                        .is_none()
                    {
                        println!("-> {} came online", name)
                    }

                    if !msg.online {
                        status.write().await.remove(&msg.sender);
                        println!("-> {} went offline", name)
                    }
                }
            }
        });
    }

    let sync = LogSync::<_, LogId, _>::builder(store.clone(), endpoint, gossip)
        .spawn()
        .await?;

    let sync_tx = sync.stream(topic, true).await?;
    let mut sync_rx = sync_tx.subscribe().await?;

    // Receive messages from the sync stream.
    {
        let store = store.clone();
        let nicknames = Arc::clone(&nicknames);
        tokio::task::spawn(async move {
            while let Some(Ok(from_sync)) = sync_rx.next().await {
                match from_sync.event {
                    SyncEvent::SessionFinished { metrics } => {
                        info!(
                            "finished sync session with {}, bytes received = {}, bytes sent = {}",
                            from_sync.remote.fmt_short(),
                            metrics.received_bytes(),
                            metrics.sent_bytes()
                        );
                    }
                    SyncEvent::OperationReceived { operation, .. } => {
                        if <SqliteStore as OperationStore<Operation, Hash, LogId>>::has_operation(
                            &store,
                            &operation.hash,
                        )
                        .await
                        .unwrap()
                        {
                            continue;
                        }

                        let remote_verifying_key = operation.header.verifying_key;
                        let remote_id = remote_verifying_key.fmt_short();

                        let text =
                            String::from_utf8(operation.body.as_ref().unwrap().to_bytes()).unwrap();

                        // Check if the text of this operation is setting a nickname.
                        if let Some(nick) = text.strip_prefix("/nick ") {
                            if let Some(previous_nick) =
                                nicknames.read().await.get(&remote_verifying_key)
                            {
                                print!("-> {} is now known as: {}", previous_nick, nick);
                            } else {
                                print!("-> {} is now known as: {}", remote_id, nick);
                            }

                            // Update the nicknames map.
                            nicknames
                                .write()
                                .await
                                .insert(remote_verifying_key, nick.trim().to_owned());
                        } else {
                            // Print a regular chat message.
                            print!(
                                "{}: {}",
                                nicknames
                                    .read()
                                    .await
                                    .get(&remote_verifying_key)
                                    .unwrap_or(&remote_id),
                                text
                            )
                        }

                        let permit = store.begin().await.unwrap();
                        let id = operation.hash;
                        let author = operation.header.verifying_key;
                        store
                            .insert_operation(&id, &operation, &LOG_ID)
                            .await
                            .unwrap();
                        store.associate(&topic, &author, &LOG_ID).await.unwrap();
                        store.commit(permit).await.unwrap();
                    }
                    _ => (),
                }
            }
        });
    }

    // Listen for text input via the terminal.
    let (line_tx, mut line_rx) = mpsc::channel(1);
    std::thread::spawn(move || input_loop(line_tx));

    let mut seq_num = 0;
    let mut backlink = None;

    // Sign and encode each line of text input and broadcast it on the chat topic.
    tokio::task::spawn(async move {
        while let Some(text) = line_rx.recv().await {
            let body = Body::new(text.as_bytes());
            let (hash, operation) = create_operation(&signing_key, &body, seq_num, backlink);
            let permit = store.begin().await.unwrap();
            store
                .insert_operation(&hash, &operation, &LOG_ID)
                .await
                .unwrap();
            store.commit(permit).await.unwrap();

            sync_tx.publish(operation).await.unwrap();

            seq_num += 1;
            backlink = Some(hash);

            // Update the nickname mapping for the local node.
            if let Some(nick) = text.strip_prefix("/nick ") {
                print!("-> changed nick to: {}", nick);
            }
        }
    });

    // Listen for `Ctrl+c` and shutdown the node.
    tokio::signal::ctrl_c().await?;

    // Create and serialize a final heartbeat message.
    //
    // This informs other chatters that we are going offline.
    let msg = Heartbeat::new(verifying_key, false);
    let encoded_msg = encode_cbor(&msg)?;

    final_heartbeat_tx.publish(&encoded_msg[..]).await?;

    // Sleep briefly to allow sending of heartbeat message.
    tokio::time::sleep(Duration::from_millis(100)).await;

    Ok(())
}

fn input_loop(line_tx: mpsc::Sender<String>) -> Result<()> {
    let mut buffer = String::new();
    let stdin = std::io::stdin();
    loop {
        stdin.read_line(&mut buffer)?;
        line_tx.blocking_send(buffer.clone())?;
        buffer.clear();
    }
}

fn create_operation(
    signing_key: &SigningKey,
    body: &Body,
    seq_num: u64,
    backlink: Option<Hash>,
) -> (Hash, Operation) {
    let mut header = Header {
        version: 1,
        verifying_key: signing_key.verifying_key(),
        signature: None,
        payload_size: body.size(),
        payload_hash: Some(body.hash()),
        timestamp: Timestamp::now(),
        seq_num,
        backlink,
        extensions: (),
    };

    header.sign(signing_key);
    let hash = header.hash();

    let operation = Operation {
        hash,
        header: header.clone(),
        body: Some(body.to_owned()),
    };

    (hash, operation)
}