use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use clap::Parser;
use futures_util::StreamExt;
use iroh::EndpointAddr;
use p2panda_core::cbor::{decode_cbor, encode_cbor};
use p2panda_core::{Body, Hash, Header, Operation, SigningKey, Timestamp, Topic, VerifyingKey};
use p2panda_net::addrs::NodeInfo;
use p2panda_net::iroh_mdns::MdnsDiscoveryMode;
use p2panda_net::utils::{ShortFormat, from_verifying_key};
use p2panda_net::{AddressBook, Discovery, Endpoint, Gossip, LogSync, MdnsDiscovery, NodeId};
use p2panda_store::operations::OperationStore;
use p2panda_store::topics::TopicStore;
use p2panda_store::{SqliteStore, Transaction};
use p2panda_sync::protocols::TopicLogSyncEvent as SyncEvent;
use serde::{Deserialize, Serialize};
use tokio::sync::{RwLock, mpsc};
use tokio::time::Instant;
use tracing::info;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;
type LogId = u64;
const LOG_ID: LogId = 1;
const RELAY_URL: &str = "https://euc1-1.relay.n0.iroh-canary.iroh.link.";
#[derive(Debug, Serialize, Deserialize)]
struct Heartbeat {
sender: VerifyingKey,
online: bool,
rnd: u64,
}
impl Heartbeat {
fn new(sender: VerifyingKey, online: bool) -> Self {
Self {
sender,
online,
rnd: rand::random(),
}
}
}
pub fn setup_logging() {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
.with(EnvFilter::from_default_env())
.try_init()
.ok();
}
#[derive(Parser)]
struct Args {
#[arg(short = 'b', long, value_name = "BOOTSTRAP_ID")]
bootstrap_id: Option<NodeId>,
#[arg(short = 'c', long, value_name = "CHAT_TOPIC_ID")]
chat_topic_id: Option<String>,
#[arg(short = 'm', long, action)]
mdns: bool,
}
#[tokio::main]
async fn main() -> Result<()> {
setup_logging();
let args = Args::parse();
let signing_key = SigningKey::generate();
let verifying_key = signing_key.verifying_key();
let topic: Topic = if let Some(topic_str) = args.chat_topic_id {
topic_str.parse().expect("topic id should be 32 bytes")
} else {
Topic::random()
};
let store = SqliteStore::temporary().await;
let permit = store.begin().await?;
store.associate(&topic, &verifying_key, &LOG_ID).await?;
store.commit(permit).await?;
let address_book = AddressBook::builder().spawn().await?;
if let Some(id) = args.bootstrap_id {
let endpoint_addr = EndpointAddr::new(from_verifying_key(id));
let endpoint_addr = endpoint_addr.with_relay_url(RELAY_URL.parse()?);
address_book
.insert_node_info(NodeInfo::from(endpoint_addr).bootstrap())
.await?;
}
let endpoint = Endpoint::builder(address_book.clone())
.signing_key(signing_key.clone())
.relay_url(RELAY_URL.parse().unwrap())
.spawn()
.await?;
println!("network id: {}", endpoint.network_id().fmt_short());
println!("chat topic: {}", topic);
println!("public key: {}", verifying_key.to_hex());
println!("relay url: {}", RELAY_URL);
let _discovery = Discovery::builder(address_book.clone(), endpoint.clone())
.spawn()
.await?;
let mdns_discovery_mode = if args.mdns {
MdnsDiscoveryMode::Active
} else {
MdnsDiscoveryMode::Passive
};
let _mdns = MdnsDiscovery::builder(address_book.clone(), endpoint.clone())
.mode(mdns_discovery_mode)
.spawn()
.await?;
let gossip = Gossip::builder(address_book.clone(), endpoint.clone())
.spawn()
.await?;
let heartbeat_tx = gossip.stream(topic).await?;
let mut heartbeat_rx = heartbeat_tx.subscribe();
let final_heartbeat_tx = heartbeat_tx.clone();
let nicknames = Arc::new(RwLock::new(HashMap::<VerifyingKey, String>::new()));
let status = Arc::new(RwLock::new(HashMap::new()));
tokio::task::spawn(async move {
loop {
let msg = Heartbeat::new(verifying_key, true);
let encoded_msg = encode_cbor(&msg).unwrap();
heartbeat_tx.publish(encoded_msg).await.unwrap();
tokio::time::sleep(Duration::from_secs(rand::random_range(20..30))).await;
}
});
{
let nicknames = Arc::clone(&nicknames);
let status = Arc::clone(&status);
tokio::spawn(async move {
loop {
if let Some(Ok(message)) = heartbeat_rx.next().await {
let msg: Heartbeat = decode_cbor(&message[..]).expect("valid cbor encoding");
info!("received heartbeat: {:?}", msg);
let name = if let Some(nickname) = nicknames.read().await.get(&msg.sender) {
nickname.to_owned()
} else {
msg.sender.fmt_short()
};
if status
.write()
.await
.insert(msg.sender, Instant::now())
.is_none()
{
println!("-> {} came online", name)
}
if !msg.online {
status.write().await.remove(&msg.sender);
println!("-> {} went offline", name)
}
}
}
});
}
let sync = LogSync::<_, LogId, _>::builder(store.clone(), endpoint, gossip)
.spawn()
.await?;
let sync_tx = sync.stream(topic, true).await?;
let mut sync_rx = sync_tx.subscribe().await?;
{
let store = store.clone();
let nicknames = Arc::clone(&nicknames);
tokio::task::spawn(async move {
while let Some(Ok(from_sync)) = sync_rx.next().await {
match from_sync.event {
SyncEvent::SessionFinished { metrics } => {
info!(
"finished sync session with {}, bytes received = {}, bytes sent = {}",
from_sync.remote.fmt_short(),
metrics.received_bytes(),
metrics.sent_bytes()
);
}
SyncEvent::OperationReceived { operation, .. } => {
if <SqliteStore as OperationStore<Operation, Hash, LogId>>::has_operation(
&store,
&operation.hash,
)
.await
.unwrap()
{
continue;
}
let remote_verifying_key = operation.header.verifying_key;
let remote_id = remote_verifying_key.fmt_short();
let text =
String::from_utf8(operation.body.as_ref().unwrap().to_bytes()).unwrap();
if let Some(nick) = text.strip_prefix("/nick ") {
if let Some(previous_nick) =
nicknames.read().await.get(&remote_verifying_key)
{
print!("-> {} is now known as: {}", previous_nick, nick);
} else {
print!("-> {} is now known as: {}", remote_id, nick);
}
nicknames
.write()
.await
.insert(remote_verifying_key, nick.trim().to_owned());
} else {
print!(
"{}: {}",
nicknames
.read()
.await
.get(&remote_verifying_key)
.unwrap_or(&remote_id),
text
)
}
let permit = store.begin().await.unwrap();
let id = operation.hash;
let author = operation.header.verifying_key;
store
.insert_operation(&id, &operation, &LOG_ID)
.await
.unwrap();
store.associate(&topic, &author, &LOG_ID).await.unwrap();
store.commit(permit).await.unwrap();
}
_ => (),
}
}
});
}
let (line_tx, mut line_rx) = mpsc::channel(1);
std::thread::spawn(move || input_loop(line_tx));
let mut seq_num = 0;
let mut backlink = None;
tokio::task::spawn(async move {
while let Some(text) = line_rx.recv().await {
let body = Body::new(text.as_bytes());
let (hash, operation) = create_operation(&signing_key, &body, seq_num, backlink);
let permit = store.begin().await.unwrap();
store
.insert_operation(&hash, &operation, &LOG_ID)
.await
.unwrap();
store.commit(permit).await.unwrap();
sync_tx.publish(operation).await.unwrap();
seq_num += 1;
backlink = Some(hash);
if let Some(nick) = text.strip_prefix("/nick ") {
print!("-> changed nick to: {}", nick);
}
}
});
tokio::signal::ctrl_c().await?;
let msg = Heartbeat::new(verifying_key, false);
let encoded_msg = encode_cbor(&msg)?;
final_heartbeat_tx.publish(&encoded_msg[..]).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
fn input_loop(line_tx: mpsc::Sender<String>) -> Result<()> {
let mut buffer = String::new();
let stdin = std::io::stdin();
loop {
stdin.read_line(&mut buffer)?;
line_tx.blocking_send(buffer.clone())?;
buffer.clear();
}
}
fn create_operation(
signing_key: &SigningKey,
body: &Body,
seq_num: u64,
backlink: Option<Hash>,
) -> (Hash, Operation) {
let mut header = Header {
version: 1,
verifying_key: signing_key.verifying_key(),
signature: None,
payload_size: body.size(),
payload_hash: Some(body.hash()),
timestamp: Timestamp::now(),
seq_num,
backlink,
extensions: (),
};
header.sign(signing_key);
let hash = header.hash();
let operation = Operation {
hash,
header: header.clone(),
body: Some(body.to_owned()),
};
(hash, operation)
}