chat/
chat.rs

1/* SPDX-License-Identifier: (Apache-2.0 OR MIT OR Zlib) */
2/* Copyright © 2021 Violet Leonard */
3
4//! This example demonstrates using correspondent to create a basic LAN chat
5//! application.
6//!
7//! To try out it out, run multiple instances of this example.
8//! The instances should automatically connect, and entering a message on one
9//! will cause it to appear on the other(s).
10//!
11//! The example uses process ids as identity values.
12
13use std::{
14    collections::HashMap, io::Write, sync::Arc, sync::Mutex, time::Duration,
15};
16
17use futures_util::stream::StreamExt;
18
19use quinn::Connection;
20
21use correspondent::{
22    CertificateResponse, Event, Events, IdentityCanonicalizer, PeerId, Socket,
23    SocketBuilder,
24};
25
26const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24);
27
28// These certificates are publicly available, and should not be used for
29// real applications
30const CA_CERT: &str = include_str!("debug-cert.pem");
31const CA_KEY_PK8: &[u8] = include_bytes!("debug-cert.pk8");
32
33// A type alias for the set of active connections shared between tasks
34type SharedConnectionSet = Arc<Mutex<HashMap<PeerId<u32>, Connection>>>;
35
36pub struct ProcessIdCanonicalizer;
37
38/// IdentityCanonicalizer specifies some essential conversions for the
39/// protocols that correspondent uses.
40impl IdentityCanonicalizer for ProcessIdCanonicalizer {
41    type Identity = u32;
42
43    fn to_dns(&self, id: &Self::Identity) -> String {
44        format!("id-{}.example.com", id)
45    }
46
47    fn to_txt(&self, id: &Self::Identity) -> Vec<u8> {
48        id.to_string().into_bytes()
49    }
50
51    fn parse_txt(&self, txt: &[u8]) -> Option<Self::Identity> {
52        std::str::from_utf8(txt).ok()?.parse().ok()
53    }
54}
55
56// Utility to re-show the prompt after printing
57fn show_prompt(process_id: u32) {
58    print!("{process_id}: ");
59    let _ = std::io::stdout().flush();
60}
61
62#[tokio::main]
63async fn main() {
64    // Create the certificate signing callback from the debug-cert files
65    let ca_key = rcgen::KeyPair::from_der(CA_KEY_PK8).unwrap();
66    let params =
67        rcgen::CertificateParams::from_ca_cert_pem(CA_CERT, ca_key).unwrap();
68    let ca_cert = rcgen::Certificate::from_params(params).unwrap();
69    let certificate_signing_callback = |csr: &str| {
70        std::future::ready((|| -> Result<_, Box<rcgen::RcgenError>> {
71            let csr = rcgen::CertificateSigningRequest::from_pem(csr)?;
72            let chain_pem = csr.serialize_pem_with_signer(&ca_cert)?;
73            Ok(CertificateResponse {
74                chain_pem,
75                authority_pem: CA_CERT.to_string(),
76            })
77        })())
78    };
79
80    // Get the current process id to use as our socket identity
81    let process_id = std::process::id();
82
83    // Configure correspondent socket
84    let mut builder = SocketBuilder::new()
85        .with_identity(process_id, ProcessIdCanonicalizer)
86        .with_service_name("Correspondent Chat Example".to_string())
87        .with_recommended_socket()
88        .expect("Failed to bind UDP socket")
89        .with_new_certificate(ONE_DAY, certificate_signing_callback)
90        .await
91        .expect("Failed to setup socket certificate");
92
93    // For applications that are not constantly sending data (like this
94    // chat app, which may idle when messages are not being typed) setting a
95    // keep-alive value will prevent connections from closing due to timeout
96    Arc::get_mut(&mut builder.client_cfg.transport)
97        .expect("there should not be any other references at this point")
98        .keep_alive_interval(Some(Duration::from_secs(5)));
99
100    let connection_set: Arc<Mutex<HashMap<PeerId<u32>, Connection>>> =
101        Arc::default();
102
103    let (socket, events) = builder.start().expect("Failed to start socket");
104
105    // Run the network event handling on tokio's async workers
106    let event_task = tokio::spawn(handle_events(
107        process_id,
108        events,
109        Arc::clone(&connection_set),
110    ));
111    // Run the input handling on tokio's blocking thread pool
112    let input_task = tokio::task::spawn_blocking(move || {
113        read_from_stdin(process_id, socket, connection_set)
114    });
115    // Wait for both tasks to complete
116    let _ = tokio::join!(event_task, input_task);
117    println!();
118}
119
120fn read_from_stdin(
121    process_id: u32,
122    socket: Socket<ProcessIdCanonicalizer>,
123    connection_set: SharedConnectionSet,
124) {
125    use std::io::BufRead;
126    let stdin = std::io::stdin();
127    let mut lines = stdin.lock().lines();
128    show_prompt(process_id);
129    while let Some(Ok(line)) = lines.next() {
130        let current_peers: Vec<Connection> = {
131            let current_peers = connection_set.lock().unwrap();
132            // Clone the connections in the set so the mutex can
133            // be released early
134            current_peers.values().cloned().collect()
135        };
136        for conn in current_peers {
137            let line = line.clone();
138            // Use tokio::spawn so messages are sent to peers concurrently
139            tokio::spawn(async move {
140                let mut stream = conn.open_uni().await.ok()?;
141                stream.write_all(line.as_bytes()).await.ok()?;
142                stream.finish().await.ok()?;
143                Some(())
144            });
145        }
146        show_prompt(process_id);
147    }
148    // We're done with input, so shutdown the socket
149    socket.endpoint().close(0u8.into(), b"");
150}
151
152async fn handle_events(
153    process_id: u32,
154    mut events: Events<u32>,
155    connection_set: SharedConnectionSet,
156) {
157    while let Some(event) = events.next().await {
158        match event {
159            Event::NewPeer(peer_id, connection) => {
160                {
161                    connection_set.lock().unwrap().insert(peer_id, connection);
162                }
163                println!("\r{} joined.", peer_id.identity);
164                show_prompt(process_id);
165            }
166            Event::PeerGone(peer_id) => {
167                {
168                    connection_set.lock().unwrap().remove(&peer_id);
169                }
170                println!("\r{} left.", peer_id.identity);
171                show_prompt(process_id);
172            }
173            Event::UniStream(peer_id, stream) => {
174                tokio::spawn(async move {
175                    if let Ok(message) = stream.read_to_end(1024).await {
176                        let text = String::from_utf8_lossy(&message);
177                        println!("\r{}: {}", peer_id.identity, text);
178                        show_prompt(process_id);
179                    }
180                });
181            }
182            // This example does not use bidirectional streams, or handle
183            // any events which may be added in a future version
184            Event::BiStream(..) => {}
185            _ => (),
186        }
187    }
188}