use std::{
collections::HashMap, io::Write, sync::Arc, sync::Mutex, time::Duration,
};
use futures_util::stream::StreamExt;
use quinn::Connection;
use correspondent::{
CertificateResponse, Event, Events, IdentityCanonicalizer, PeerId, Socket,
SocketBuilder,
};
const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24);
const CA_CERT: &str = include_str!("debug-cert.pem");
const CA_KEY_PK8: &[u8] = include_bytes!("debug-cert.pk8");
type SharedConnectionSet = Arc<Mutex<HashMap<PeerId<u32>, Connection>>>;
pub struct ProcessIdCanonicalizer;
impl IdentityCanonicalizer for ProcessIdCanonicalizer {
type Identity = u32;
fn to_dns(&self, id: &Self::Identity) -> String {
format!("id-{}.example.com", id)
}
fn to_txt(&self, id: &Self::Identity) -> Vec<u8> {
id.to_string().into_bytes()
}
fn parse_txt(&self, txt: &[u8]) -> Option<Self::Identity> {
std::str::from_utf8(txt).ok()?.parse().ok()
}
}
fn show_prompt(process_id: u32) {
print!("{process_id}: ");
let _ = std::io::stdout().flush();
}
#[tokio::main]
async fn main() {
let ca_key = rcgen::KeyPair::from_der(CA_KEY_PK8).unwrap();
let params =
rcgen::CertificateParams::from_ca_cert_pem(CA_CERT, ca_key).unwrap();
let ca_cert = rcgen::Certificate::from_params(params).unwrap();
let certificate_signing_callback = |csr: &str| {
std::future::ready((|| -> Result<_, Box<rcgen::RcgenError>> {
let csr = rcgen::CertificateSigningRequest::from_pem(csr)?;
let chain_pem = csr.serialize_pem_with_signer(&ca_cert)?;
Ok(CertificateResponse {
chain_pem,
authority_pem: CA_CERT.to_string(),
})
})())
};
let process_id = std::process::id();
let mut builder = SocketBuilder::new()
.with_identity(process_id, ProcessIdCanonicalizer)
.with_service_name("Correspondent Chat Example".to_string())
.with_recommended_socket()
.expect("Failed to bind UDP socket")
.with_new_certificate(ONE_DAY, certificate_signing_callback)
.await
.expect("Failed to setup socket certificate");
Arc::get_mut(&mut builder.client_cfg.transport)
.expect("there should not be any other references at this point")
.keep_alive_interval(Some(Duration::from_secs(5)));
let connection_set: Arc<Mutex<HashMap<PeerId<u32>, Connection>>> =
Arc::default();
let (socket, events) = builder.start().expect("Failed to start socket");
let event_task = tokio::spawn(handle_events(
process_id,
events,
Arc::clone(&connection_set),
));
let input_task = tokio::task::spawn_blocking(move || {
read_from_stdin(process_id, socket, connection_set)
});
let _ = tokio::join!(event_task, input_task);
println!();
}
fn read_from_stdin(
process_id: u32,
socket: Socket<ProcessIdCanonicalizer>,
connection_set: SharedConnectionSet,
) {
use std::io::BufRead;
let stdin = std::io::stdin();
let mut lines = stdin.lock().lines();
show_prompt(process_id);
while let Some(Ok(line)) = lines.next() {
let current_peers: Vec<Connection> = {
let current_peers = connection_set.lock().unwrap();
current_peers.values().cloned().collect()
};
for conn in current_peers {
let line = line.clone();
tokio::spawn(async move {
let mut stream = conn.open_uni().await.ok()?;
stream.write_all(line.as_bytes()).await.ok()?;
stream.finish().await.ok()?;
Some(())
});
}
show_prompt(process_id);
}
socket.endpoint().close(0u8.into(), b"");
}
async fn handle_events(
process_id: u32,
mut events: Events<u32>,
connection_set: SharedConnectionSet,
) {
while let Some(event) = events.next().await {
match event {
Event::NewPeer(peer_id, connection) => {
{
connection_set.lock().unwrap().insert(peer_id, connection);
}
println!("\r{} joined.", peer_id.identity);
show_prompt(process_id);
}
Event::PeerGone(peer_id) => {
{
connection_set.lock().unwrap().remove(&peer_id);
}
println!("\r{} left.", peer_id.identity);
show_prompt(process_id);
}
Event::UniStream(peer_id, stream) => {
tokio::spawn(async move {
if let Ok(message) = stream.read_to_end(1024).await {
let text = String::from_utf8_lossy(&message);
println!("\r{}: {}", peer_id.identity, text);
show_prompt(process_id);
}
});
}
Event::BiStream(..) => {}
_ => (),
}
}
}