apt_swarm/p2p/
mod.rs

1#[cfg(unix)]
2pub mod db;
3pub mod dns;
4pub mod fetch;
5#[cfg(feature = "irc")]
6pub mod irc;
7pub mod peerdb;
8pub mod peering;
9pub mod proto;
10pub mod sync;
11pub mod update_check;
12
13use crate::args::P2p;
14use crate::config::Config;
15use crate::db::{Database, DatabaseServer};
16use crate::errors::*;
17use crate::keyring::Keyring;
18use crate::p2p::peerdb::PeerDb;
19use socket2::{Domain, Socket, Type};
20use std::convert::Infallible;
21use std::net::SocketAddr;
22use std::time::Duration;
23use tokio::net::TcpSocket;
24use tokio::sync::mpsc;
25use tokio::task::JoinSet;
26use tokio::time;
27
// Timing constants for the p2p subsystem. They are not referenced in this
// file's visible code, so the usage notes are limited to what the values are.
28const FETCH_INTERVAL: Duration = Duration::from_secs(60 * 15); // 15min
29const FETCH_INTERVAL_JITTER: Duration = Duration::from_secs(45); // 45s
30const SYNC_IDLE_TIMEOUT: Duration = Duration::from_secs(60); // 1min
31
32const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 15); // 15min
33const UPDATE_CHECK_DEBOUNCE: Duration = Duration::from_secs(5); // 5s
34const UPDATE_SHUTDOWN_DELAY: Duration = Duration::from_secs(60 * 20); // 20min
35
36const GOSSIP_IDLE_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(3600 * 24); // 24h
// Backlog passed to `TcpSocket::listen` for every sync listener created in `spawn`.
37const P2P_SYNC_PORT_BACKLOG: u32 = 1024;
38
39pub async fn random_jitter(jitter: Duration) {
40    let jitter = fastrand::u64(..jitter.as_secs());
41    time::sleep(Duration::from_secs(jitter)).await;
42}
43
44pub async fn spawn(
45    db: Database,
46    keyring: Keyring,
47    config: Config,
48    p2p: P2p,
49    proxy: Option<SocketAddr>,
50) -> Result<Infallible> {
51    let mut set = JoinSet::new();
52
53    let (mut db_server, mut db_client) = DatabaseServer::new(db);
54    set.spawn(async move {
55        db_server.run().await?;
56        bail!("Database server has terminated");
57    });
58
59    #[cfg(unix)]
60    {
61        let db_client = db_client.clone();
62        let db_socket_path = config.db_socket_path()?;
63        set.spawn(async move { db::spawn_unix_db_server(&db_client, db_socket_path).await });
64    }
65
66    let (irc_tx, irc_rx) = mpsc::channel(32);
67    let irc_tx = cfg!(feature = "irc").then(|| irc_tx);
68    let peerdb = PeerDb::read(&config).await?;
69
70    if !p2p.no_bind {
71        for addr in p2p.bind {
72            let db_client = db_client.clone();
73            let socket = match addr {
74                SocketAddr::V4(_) => Socket::new(Domain::IPV4, Type::STREAM, None)?,
75                SocketAddr::V6(_) => {
76                    let socket = Socket::new(Domain::IPV6, Type::STREAM, None)?;
77                    socket
78                        .set_only_v6(true)
79                        .context("Failed to set port to ipv6-only")?;
80                    socket
81                }
82            };
83            socket
84                .set_reuse_address(true)
85                .context("Failed to set reuseaddr for port")?;
86            socket
87                .set_nonblocking(true)
88                .context("Failed to set port to non-blocking")?;
89            let socket = TcpSocket::from_std_stream(socket.into());
90
91            socket
92                .bind(addr)
93                .with_context(|| anyhow!("Failed to bind to address: {:?}", addr))?;
94            let listener = socket.listen(P2P_SYNC_PORT_BACKLOG)?;
95
96            debug!("Listening on address: {addr:?}");
97            set.spawn(async move { sync::spawn_sync_server(&db_client, listener).await });
98        }
99    }
100
101    if !p2p.no_fetch {
102        let mut db_client = db_client.clone();
103        let keyring = keyring.clone();
104        let repositories = config.data.repositories;
105        set.spawn(async move {
106            fetch::spawn_fetch_timer(
107                &mut db_client,
108                keyring,
109                repositories,
110                proxy,
111                p2p.announce,
112                irc_tx,
113            )
114            .await
115        });
116    }
117
118    if let Some(image) = p2p.check_container_updates {
119        let commit = match p2p.update_assume_commit {
120            Some(s) if s.is_empty() => {
121                bail!("Update checks are configured but current commit is empty string")
122            }
123            Some(commit) => commit,
124            None => bail!("Update checks are configured but current commit is not provided"),
125        };
126        set.spawn(update_check::spawn_update_check(image, commit));
127    }
128
129    let (peering_tx, peering_rx) = mpsc::channel(1024);
130    set.spawn(
131        async move { peering::spawn(&mut db_client, keyring, peerdb, proxy, peering_rx).await },
132    );
133
134    #[cfg(feature = "irc")]
135    if !p2p.no_bootstrap && !p2p.irc.no_irc {
136        set.spawn(irc::spawn(irc_rx, p2p.irc.irc_channel, peering_tx.clone()));
137    }
138
139    if !p2p.no_bootstrap && !p2p.dns.no_dns {
140        set.spawn(dns::spawn(p2p.dns.dns, peering_tx));
141    }
142
143    // if irc is not enabled, supress an unused variable warning
144    #[cfg(not(feature = "irc"))]
145    let _ = irc_rx;
146
147    info!("Successfully started p2p node...");
148    let result = set
149        .join_next()
150        .await
151        .context("All features have been disabled, nothing to do")?;
152    result.context("Failed to wait for task")?
153}