use std::{collections::HashMap, fmt, str::FromStr};
use anyhow::{bail, Context};
use bytes::Bytes;
use clap::Parser;
use ed25519_dalek::Signature;
use iroh_base::base32;
use iroh_gossip::{
net::{Gossip, GOSSIP_ALPN},
proto::{Event, TopicId},
};
use iroh_net::{
key::{PublicKey, SecretKey},
magic_endpoint::accept_conn,
relay::{RelayMap, RelayMode, RelayUrl},
MagicEndpoint, NodeAddr,
};
use serde::{Deserialize, Serialize};
/// Chat over iroh-gossip
///
/// Opens or joins a chat room identified by a topic and broadcasts every line
/// typed on stdin to the other peers in that room.
#[derive(Parser, Debug)]
struct Args {
    /// Secret key for this node, base32-encoded; a new key is generated when omitted.
    #[clap(long)]
    secret_key: Option<String>,
    /// Use a custom relay server instead of the default relay servers.
    #[clap(short, long)]
    relay: Option<RelayUrl>,
    /// Disable relay servers completely (cannot be combined with --relay).
    #[clap(long)]
    no_relay: bool,
    /// Nickname to announce to the other peers once connected.
    #[clap(short, long)]
    name: Option<String>,
    /// Port to bind the endpoint to.
    #[clap(short, long, default_value = "0")]
    bind_port: u16,
    // Which action to perform; see `Command`.
    #[clap(subcommand)]
    command: Command,
}
// Subcommands of the chat CLI. The doc comments below are picked up by clap
// and shown as help text for each subcommand and argument.
#[derive(Parser, Debug)]
enum Command {
    /// Open a chat room for a topic and print a ticket for others to join.
    Open {
        /// Topic to use; a random topic is generated when omitted.
        topic: Option<TopicId>,
    },
    /// Join a chat room from a ticket.
    Join {
        /// The ticket, as a base32 string.
        ticket: String,
    },
}
/// Runs the chat: parses the command line, binds an endpoint, joins the gossip
/// topic and then broadcasts every line read from stdin until the input
/// channel closes.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();
    let args = Args::parse();

    // Determine the topic and the peers (if any) to bootstrap from.
    let (topic, peers) = match &args.command {
        Command::Join { ticket } => {
            let Ticket { topic, peers } = Ticket::from_str(ticket)?;
            println!("> joining chat room for topic {topic}");
            (topic, peers)
        }
        Command::Open { topic } => {
            let topic = match topic {
                Some(existing) => *existing,
                None => TopicId::from_bytes(rand::random()),
            };
            println!("> opening chat room for topic {topic}");
            (topic, vec![])
        }
    };

    // Use the supplied secret key, or generate a fresh one.
    let secret_key = if let Some(encoded) = args.secret_key {
        parse_secret_key(&encoded)?
    } else {
        SecretKey::generate()
    };
    println!("> our secret key: {}", base32::fmt(secret_key.to_bytes()));

    // `--relay` and `--no-relay` are mutually exclusive.
    let relay_mode = match (args.relay, args.no_relay) {
        (Some(_), true) => bail!("You cannot set --no-relay and --relay at the same time"),
        (Some(url), false) => RelayMode::Custom(RelayMap::from_url(url)),
        (None, true) => RelayMode::Disabled,
        (None, false) => RelayMode::Default,
    };
    println!("> using relay servers: {}", fmt_relay_mode(&relay_mode));

    // Build the endpoint, accepting only the gossip protocol.
    let builder = MagicEndpoint::builder()
        .secret_key(secret_key)
        .alpns(vec![GOSSIP_ALPN.to_vec()])
        .relay_mode(relay_mode);
    let endpoint = builder.bind(args.bind_port).await?;
    println!("> our node id: {}", endpoint.node_id());

    let my_addr = endpoint.my_addr().await?;
    let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &my_addr.info);

    // The ticket we print contains the bootstrap peers plus our own address.
    let me = endpoint.my_addr().await?;
    let mut ticket_peers = peers.clone();
    ticket_peers.push(me);
    let ticket = Ticket {
        topic,
        peers: ticket_peers,
    };
    println!("> ticket to join us: {ticket}");

    // Accept incoming connections in the background.
    tokio::spawn(endpoint_loop(endpoint.clone(), gossip.clone()));

    // Register the bootstrap peers' addresses before joining the topic.
    let peer_ids = peers.iter().map(|peer| peer.node_id).collect();
    match peers.len() {
        0 => println!("> waiting for peers to join us..."),
        count => {
            println!("> trying to connect to {count} peers...");
            for peer in peers {
                endpoint.add_node_addr(peer)?;
            }
        }
    }
    gossip.join(topic, peer_ids).await?.await?;
    println!("> connected!");

    // Announce our nickname, if one was given.
    if let Some(name) = args.name {
        let about_me = Message::AboutMe { name };
        let encoded = SignedMessage::sign_and_encode(endpoint.secret_key(), &about_me)?;
        gossip.broadcast(topic, encoded).await?;
    }

    // Print incoming messages in the background.
    tokio::spawn(subscribe_loop(gossip.clone(), topic));

    // Stdin is read on a dedicated thread and forwarded through a channel.
    let (line_tx, mut line_rx) = tokio::sync::mpsc::channel(1);
    std::thread::spawn(move || input_loop(line_tx));

    println!("> type a message and hit enter to broadcast...");
    loop {
        let text = match line_rx.recv().await {
            Some(line) => line,
            None => break,
        };
        let chat = Message::Message { text: text.clone() };
        let encoded = SignedMessage::sign_and_encode(endpoint.secret_key(), &chat)?;
        gossip.broadcast(topic, encoded).await?;
        println!("> sent: {text}");
    }
    Ok(())
}
/// Subscribes to `topic` and prints every chat message received on it.
///
/// Keeps a map from sender key to announced nickname: `AboutMe` messages
/// update the map, and chat messages are printed with the nickname when one is
/// known, or the short node id otherwise.
///
/// Messages that fail to decode or whose signature does not verify are
/// reported and skipped, so one bad message does not end the loop.
///
/// # Errors
///
/// Returns an error if subscribing to the topic fails or if receiving from the
/// event stream fails.
async fn subscribe_loop(gossip: Gossip, topic: TopicId) -> anyhow::Result<()> {
    let mut names = HashMap::new();
    let mut stream = gossip.subscribe(topic).await?;
    loop {
        let event = stream.recv().await?;
        if let Event::Received(msg) = event {
            // The payload comes from the network, so a decode or signature
            // failure is reported and skipped rather than propagated.
            let (from, message) = match SignedMessage::verify_and_decode(&msg.content) {
                Ok(decoded) => decoded,
                Err(err) => {
                    println!("> ignoring invalid message: {err}");
                    continue;
                }
            };
            match message {
                Message::AboutMe { name } => {
                    // Print first so the name can be moved into the map
                    // without cloning it.
                    println!("> {} is now known as {}", fmt_node_id(&from), name);
                    names.insert(from, name);
                }
                Message::Message { text } => {
                    let name = names
                        .get(&from)
                        .map_or_else(|| fmt_node_id(&from), String::to_string);
                    println!("{}: {}", name, text);
                }
            }
        }
    }
}
/// Accepts incoming connections on `endpoint` until it stops yielding them,
/// handling each connection on its own spawned task.
async fn endpoint_loop(endpoint: MagicEndpoint, gossip: Gossip) {
    loop {
        let conn = match endpoint.accept().await {
            Some(conn) => conn,
            None => break,
        };
        // Each task gets its own handle to the gossip instance.
        let gossip = gossip.clone();
        tokio::spawn(async move {
            let outcome = handle_connection(conn, gossip).await;
            if let Err(err) = outcome {
                println!("> connection closed: {err}");
            }
        });
    }
}
/// Completes an incoming connection and dispatches it by ALPN.
///
/// Connections using `GOSSIP_ALPN` are handed to `gossip`; connections with
/// any other ALPN are reported and ignored.
///
/// # Errors
///
/// Returns an error if accepting the connection fails or if the gossip
/// handler fails.
async fn handle_connection(conn: quinn::Connecting, gossip: Gossip) -> anyhow::Result<()> {
    let (peer_id, alpn, conn) = accept_conn(conn).await?;
    match alpn.as_bytes() {
        GOSSIP_ALPN => gossip
            .handle_connection(conn)
            .await
            // Build the context message lazily, only when the handler fails.
            .with_context(|| format!("connection to {peer_id} with ALPN {alpn} failed"))?,
        _ => println!("> ignoring connection from {peer_id}: unsupported ALPN protocol"),
    }
    Ok(())
}
/// Reads lines from stdin and forwards each one through `line_tx`.
///
/// This function blocks and uses `blocking_send`; the caller in this file runs
/// it on a dedicated thread. Each forwarded line keeps its line terminator, as
/// returned by `read_line`.
///
/// Returns `Ok(())` when stdin reaches end of file. Returning drops `line_tx`,
/// which lets the receiver see that the channel has closed.
///
/// # Errors
///
/// Returns an error if reading from stdin fails or if the receiving side of
/// the channel has been closed.
fn input_loop(line_tx: tokio::sync::mpsc::Sender<String>) -> anyhow::Result<()> {
    let stdin = std::io::stdin();
    let mut buffer = String::new();
    loop {
        buffer.clear();
        // `read_line` returns 0 at end of file; without this check the loop
        // would spin forever sending empty strings.
        if stdin.read_line(&mut buffer)? == 0 {
            return Ok(());
        }
        line_tx.blocking_send(buffer.clone())?;
    }
}
/// A chat message together with its sender's public key and a signature.
///
/// This is the value that is serialized with postcard and broadcast on the
/// gossip topic. Field order is part of the serialized format.
#[derive(Debug, Serialize, Deserialize)]
struct SignedMessage {
/// Public key of the sender; used to verify `signature`.
from: PublicKey,
/// The postcard-encoded `Message`.
data: Bytes,
/// Signature over `data`, made with the sender's secret key.
signature: Signature,
}
impl SignedMessage {
pub fn verify_and_decode(bytes: &[u8]) -> anyhow::Result<(PublicKey, Message)> {
let signed_message: Self = postcard::from_bytes(bytes)?;
let key: PublicKey = signed_message.from;
key.verify(&signed_message.data, &signed_message.signature)?;
let message: Message = postcard::from_bytes(&signed_message.data)?;
Ok((signed_message.from, message))
}
pub fn sign_and_encode(secret_key: &SecretKey, message: &Message) -> anyhow::Result<Bytes> {
let data: Bytes = postcard::to_stdvec(&message)?.into();
let signature = secret_key.sign(&data);
let from: PublicKey = secret_key.public();
let signed_message = Self {
from,
data,
signature,
};
let encoded = postcard::to_stdvec(&signed_message)?;
Ok(encoded.into())
}
}
/// The chat messages exchanged between peers.
///
/// Values are serialized with postcard, so variant order is part of the
/// serialized format.
#[derive(Debug, Serialize, Deserialize)]
enum Message {
/// Announces the sender's nickname.
AboutMe { name: String },
/// A line of chat text.
Message { text: String },
}
/// What another node needs to join a chat room: the topic and the addresses
/// of peers to connect to.
///
/// Values are serialized with postcard, so field order is part of the
/// serialized format.
#[derive(Debug, Serialize, Deserialize)]
struct Ticket {
/// The gossip topic of the chat room.
topic: TopicId,
/// Addresses of peers to bootstrap from.
peers: Vec<NodeAddr>,
}
impl Ticket {
    /// Deserializes a ticket from its postcard byte representation.
    fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
        let ticket = postcard::from_bytes(bytes)?;
        Ok(ticket)
    }

    /// Serializes this ticket to its postcard byte representation.
    ///
    /// # Panics
    ///
    /// Panics if postcard serialization fails.
    pub fn to_bytes(&self) -> Vec<u8> {
        let encoded = postcard::to_stdvec(self);
        encoded.expect("postcard::to_stdvec is infallible")
    }
}
/// Formats the ticket as the base32 encoding of its serialized bytes.
impl fmt::Display for Ticket {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let encoded = base32::fmt(self.to_bytes());
        f.write_str(&encoded)
    }
}
/// Parses a ticket from the base32 string produced by its `Display` impl.
impl FromStr for Ticket {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let decoded = base32::parse_vec(s)?;
        Self::from_bytes(&decoded)
    }
}
/// Formats a node's public key in the short base32 form used in chat output.
fn fmt_node_id(input: &PublicKey) -> String {
    let key_bytes = input.as_bytes();
    base32::fmt_short(key_bytes)
}
/// Parses a base32-encoded 32-byte secret key.
///
/// # Errors
///
/// Fails if `secret` cannot be parsed as base32 into exactly 32 bytes.
fn parse_secret_key(secret: &str) -> anyhow::Result<SecretKey> {
    let decoded: [u8; 32] = base32::parse_array(secret)?;
    let key = SecretKey::from(decoded);
    Ok(key)
}
/// Describes the relay configuration for display: a fixed label for the
/// disabled and default modes, or the comma-separated URLs of a custom map.
fn fmt_relay_mode(relay_mode: &RelayMode) -> String {
    match relay_mode {
        RelayMode::Custom(map) => {
            let urls: Vec<String> = map.urls().map(|url| url.to_string()).collect();
            urls.join(", ")
        }
        RelayMode::Default => String::from("Default Relay servers"),
        RelayMode::Disabled => String::from("None"),
    }
}