1use std::{
14 collections::HashMap, io::Write, sync::Arc, sync::Mutex, time::Duration,
15};
16
17use futures_util::stream::StreamExt;
18
19use quinn::Connection;
20
21use correspondent::{
22 CertificateResponse, Event, Events, IdentityCanonicalizer, PeerId, Socket,
23 SocketBuilder,
24};
25
26const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24);
27
28const CA_CERT: &str = include_str!("debug-cert.pem");
31const CA_KEY_PK8: &[u8] = include_bytes!("debug-cert.pk8");
32
33type SharedConnectionSet = Arc<Mutex<HashMap<PeerId<u32>, Connection>>>;
35
36pub struct ProcessIdCanonicalizer;
37
38impl IdentityCanonicalizer for ProcessIdCanonicalizer {
41 type Identity = u32;
42
43 fn to_dns(&self, id: &Self::Identity) -> String {
44 format!("id-{}.example.com", id)
45 }
46
47 fn to_txt(&self, id: &Self::Identity) -> Vec<u8> {
48 id.to_string().into_bytes()
49 }
50
51 fn parse_txt(&self, txt: &[u8]) -> Option<Self::Identity> {
52 std::str::from_utf8(txt).ok()?.parse().ok()
53 }
54}
55
56fn show_prompt(process_id: u32) {
58 print!("{process_id}: ");
59 let _ = std::io::stdout().flush();
60}
61
62#[tokio::main]
63async fn main() {
64 let ca_key = rcgen::KeyPair::from_der(CA_KEY_PK8).unwrap();
66 let params =
67 rcgen::CertificateParams::from_ca_cert_pem(CA_CERT, ca_key).unwrap();
68 let ca_cert = rcgen::Certificate::from_params(params).unwrap();
69 let certificate_signing_callback = |csr: &str| {
70 std::future::ready((|| -> Result<_, Box<rcgen::RcgenError>> {
71 let csr = rcgen::CertificateSigningRequest::from_pem(csr)?;
72 let chain_pem = csr.serialize_pem_with_signer(&ca_cert)?;
73 Ok(CertificateResponse {
74 chain_pem,
75 authority_pem: CA_CERT.to_string(),
76 })
77 })())
78 };
79
80 let process_id = std::process::id();
82
83 let mut builder = SocketBuilder::new()
85 .with_identity(process_id, ProcessIdCanonicalizer)
86 .with_service_name("Correspondent Chat Example".to_string())
87 .with_recommended_socket()
88 .expect("Failed to bind UDP socket")
89 .with_new_certificate(ONE_DAY, certificate_signing_callback)
90 .await
91 .expect("Failed to setup socket certificate");
92
93 Arc::get_mut(&mut builder.client_cfg.transport)
97 .expect("there should not be any other references at this point")
98 .keep_alive_interval(Some(Duration::from_secs(5)));
99
100 let connection_set: Arc<Mutex<HashMap<PeerId<u32>, Connection>>> =
101 Arc::default();
102
103 let (socket, events) = builder.start().expect("Failed to start socket");
104
105 let event_task = tokio::spawn(handle_events(
107 process_id,
108 events,
109 Arc::clone(&connection_set),
110 ));
111 let input_task = tokio::task::spawn_blocking(move || {
113 read_from_stdin(process_id, socket, connection_set)
114 });
115 let _ = tokio::join!(event_task, input_task);
117 println!();
118}
119
120fn read_from_stdin(
121 process_id: u32,
122 socket: Socket<ProcessIdCanonicalizer>,
123 connection_set: SharedConnectionSet,
124) {
125 use std::io::BufRead;
126 let stdin = std::io::stdin();
127 let mut lines = stdin.lock().lines();
128 show_prompt(process_id);
129 while let Some(Ok(line)) = lines.next() {
130 let current_peers: Vec<Connection> = {
131 let current_peers = connection_set.lock().unwrap();
132 current_peers.values().cloned().collect()
135 };
136 for conn in current_peers {
137 let line = line.clone();
138 tokio::spawn(async move {
140 let mut stream = conn.open_uni().await.ok()?;
141 stream.write_all(line.as_bytes()).await.ok()?;
142 stream.finish().await.ok()?;
143 Some(())
144 });
145 }
146 show_prompt(process_id);
147 }
148 socket.endpoint().close(0u8.into(), b"");
150}
151
152async fn handle_events(
153 process_id: u32,
154 mut events: Events<u32>,
155 connection_set: SharedConnectionSet,
156) {
157 while let Some(event) = events.next().await {
158 match event {
159 Event::NewPeer(peer_id, connection) => {
160 {
161 connection_set.lock().unwrap().insert(peer_id, connection);
162 }
163 println!("\r{} joined.", peer_id.identity);
164 show_prompt(process_id);
165 }
166 Event::PeerGone(peer_id) => {
167 {
168 connection_set.lock().unwrap().remove(&peer_id);
169 }
170 println!("\r{} left.", peer_id.identity);
171 show_prompt(process_id);
172 }
173 Event::UniStream(peer_id, stream) => {
174 tokio::spawn(async move {
175 if let Ok(message) = stream.read_to_end(1024).await {
176 let text = String::from_utf8_lossy(&message);
177 println!("\r{}: {}", peer_id.identity, text);
178 show_prompt(process_id);
179 }
180 });
181 }
182 Event::BiStream(..) => {}
185 _ => (),
186 }
187 }
188}