apt_swarm/p2p/
irc.rs

1use crate::errors::*;
2use crate::p2p;
3use crate::p2p::proto::{PeerGossip, SyncRequest};
4use futures::prelude::*;
5use irc::client::prelude::{Client, Command, Config, Response};
6use std::convert::Infallible;
7use std::time::Duration;
8use tokio::sync::mpsc;
9use tokio::sync::mpsc::error::TrySendError;
10use tokio::time;
11use url::Url;
12
13const IRC_DEBOUNCE: Duration = Duration::from_millis(250);
14pub const IRC_RECONNECT_COOLDOWN: Duration = Duration::from_secs(60); // 1min
15pub const IRC_RECONNECT_JITTER: Duration = Duration::from_secs(60 * 3); // 3min
16
17const IRC_CLIENT_ID: &str = concat!(
18    "p2p bootstrap - v",
19    env!("CARGO_PKG_VERSION"),
20    " https://github.com/kpcyrd/apt-swarm"
21);
22
23fn random_nickname() -> String {
24    let mut buf = [0u8; 3];
25    getrandom::fill(&mut buf).expect("Failed to use getrandom");
26    let name = format!("apt-swarm-{}", hex::encode(buf));
27    name
28}
29
30fn parse_channel(url: &str) -> Result<(String, String)> {
31    let url = Url::parse(url).with_context(|| anyhow!("Failed to parse url: {url:?}"))?;
32
33    if url.scheme() != "ircs" {
34        bail!("Only secure ircs:// links are supported: {url:?}");
35    }
36
37    let mut host = url
38        .host_str()
39        .with_context(|| anyhow!("Could not found host in irc url: {url:?}"))?
40        .to_string();
41
42    if let Some(port) = url.port() {
43        host += &format!(":{port}");
44    }
45
46    if !["", "/"].contains(&url.path()) {
47        bail!("Found unexpected path in irc url: {url:?}");
48    }
49
50    let fragment = url
51        .fragment()
52        .with_context(|| anyhow!("Could not find channel in irc url: {url:?}"))?;
53    Ok((host, format!("#{fragment}")))
54}
55
56pub async fn connect_irc(
57    rx: &mut mpsc::Receiver<String>,
58    irc: &(String, String),
59    peering_tx: &mpsc::Sender<SyncRequest>,
60) -> Result<Infallible> {
61    let (server, channel) = irc;
62    info!("Connecting to irc for peer discovery (server={server:?}, channel={channel:?})...");
63    let nickname = random_nickname();
64
65    let config = Config {
66        nickname: Some(nickname),
67        server: Some("irc.hackint.org".to_string()),
68        channels: vec![channel.to_string()],
69        realname: Some(IRC_CLIENT_ID.to_string()),
70        use_tls: Some(true),
71        ..Default::default()
72    };
73
74    let mut client = Client::from_config(config).await?;
75
76    client
77        .identify()
78        .context("Failed to identify with irc server")?;
79
80    let mut stream = client.stream().context("Failed to setup irc stream")?;
81
82    loop {
83        tokio::select! {
84            msg = stream.next() => {
85                if let Some(message) = msg.transpose().context("Failed to read from irc stream")? {
86                    trace!("Received msg from irc server: {message:?}");
87                    match message.command {
88                        Command::PRIVMSG(target, msg) => {
89                            debug!("Received irc privmsg: {:?}: {:?}", target, msg);
90                            if target != *channel {
91                                continue;
92                            }
93
94                            if !msg.starts_with("[sync] ") {
95                                continue;
96                            }
97
98                            match msg.parse::<PeerGossip>() {
99                                Ok(info) => {
100                                    info!("Discovered peer: {info:?}");
101                                    let info = SyncRequest::from(info);
102                                    if let Err(TrySendError::Full(info)) = peering_tx.try_send(info) {
103                                        warn!("Discarding peer gossip because peering backlog is full: {info:?}");
104                                    }
105                                }
106                                Err(err) => {
107                                    warn!("Malformed irc message: {err:#}");
108                                }
109                            }
110                        }
111                        Command::Response(Response::RPL_ISUPPORT, _) => {
112                            // client.send_quit("QUIT")?;
113                        }
114                        _ => (),
115                    }
116                } else {
117                    bail!("irc client has been shutdown");
118                }
119            }
120            msg = rx.recv() => {
121                if let Some(msg) = msg {
122                    debug!("Sending message to irc: {msg:?}");
123                    client.send_privmsg(channel, &msg)
124                        .context("Failed to send to irc")?;
125                    // slowing this down slightly, just in case
126                    time::sleep(Duration::from_millis(250)).await;
127                }
128            }
129        }
130    }
131}
132
133pub async fn spawn(
134    mut rx: mpsc::Receiver<String>,
135    url: String,
136    peering_tx: mpsc::Sender<SyncRequest>,
137) -> Result<Infallible> {
138    // briefly delay the connection, so we don't spam irc in case something crashes immediately
139    tokio::time::sleep(IRC_DEBOUNCE).await;
140
141    let irc = parse_channel(&url)?;
142    loop {
143        let Err(err) = connect_irc(&mut rx, &irc, &peering_tx).await;
144        error!("irc connection has crashed: {err:#}");
145
146        time::sleep(IRC_RECONNECT_COOLDOWN).await;
147        p2p::random_jitter(IRC_RECONNECT_JITTER).await;
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154
155    #[test]
156    fn test_parse_irc_url() {
157        let url = "ircs://irc.hackint.org/##apt-swarm-p2p";
158        let (server, channel) = parse_channel(url).unwrap();
159        assert_eq!(server, "irc.hackint.org");
160        assert_eq!(channel, "##apt-swarm-p2p");
161
162        let url = "ircs://irc.hackint.org##apt-swarm-p2p";
163        let (server, channel) = parse_channel(url).unwrap();
164        assert_eq!(server, "irc.hackint.org");
165        assert_eq!(channel, "##apt-swarm-p2p");
166
167        let url = "ircs://irc.hackint.org:1337/##apt-swarm-p2p";
168        let (server, channel) = parse_channel(url).unwrap();
169        assert_eq!(server, "irc.hackint.org:1337");
170        assert_eq!(channel, "##apt-swarm-p2p");
171    }
172
173    #[test]
174    fn test_parse_irc_invalid() {
175        assert!(parse_channel("irc://irc.hackint.org/##apt-swarm-p2p").is_err());
176        assert!(parse_channel("https://irc.hackint.org/##apt-swarm-p2p").is_err());
177        assert!(parse_channel("ircs://irc.hackint.org/").is_err());
178        assert!(parse_channel("ircs://irc.hackint.org/abc").is_err());
179        assert!(parse_channel("ircs://irc.hackint.org/abc##apt-swarm-p2p").is_err());
180    }
181}