apt_swarm/p2p/
mod.rs

1#[cfg(unix)]
2pub mod db;
3pub mod dns;
4pub mod fetch;
5#[cfg(feature = "irc")]
6pub mod irc;
7pub mod peerdb;
8pub mod peering;
9pub mod proto;
10pub mod sync;
11pub mod update_check;
12
13use crate::args::P2p;
14use crate::config::Config;
15use crate::db::{Database, DatabaseServer};
16use crate::errors::*;
17use crate::keyring::Keyring;
18use crate::p2p::peerdb::PeerDb;
19use socket2::{Domain, Socket, Type};
20use std::convert::Infallible;
21use std::net::SocketAddr;
22use std::time::Duration;
23use tokio::net::TcpSocket;
24use tokio::sync::mpsc;
25use tokio::task::JoinSet;
26use tokio::time;
27
28const FETCH_INTERVAL: Duration = Duration::from_secs(60 * 15); // 15min
29const FETCH_INTERVAL_JITTER: Duration = Duration::from_secs(45);
30const SYNC_IDLE_TIMEOUT: Duration = Duration::from_secs(60);
31
32const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(60 * 15); // 15min
33const UPDATE_CHECK_DEBOUNCE: Duration = Duration::from_secs(5);
34const UPDATE_SHUTDOWN_DELAY: Duration = Duration::from_secs(60 * 20); // 20min
35
36const GOSSIP_IDLE_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(3600 * 24); // 1h, set this to 24h later
37const P2P_SYNC_PORT_BACKLOG: u32 = 1024;
38
39pub async fn random_jitter(jitter: Duration) {
40    let jitter = fastrand::u64(..jitter.as_secs());
41    time::sleep(Duration::from_secs(jitter)).await;
42}
43
44pub async fn spawn(
45    db: Database,
46    keyring: Keyring,
47    config: Config,
48    p2p: P2p,
49    proxy: Option<SocketAddr>,
50) -> Result<Infallible> {
51    let mut set = JoinSet::new();
52
53    let (mut db_server, mut db_client) = DatabaseServer::new(db);
54    set.spawn(async move {
55        db_server.run().await?;
56        bail!("Database server has terminated");
57    });
58
59    #[cfg(unix)]
60    {
61        let db_client = db_client.clone();
62        let db_socket_path = config.db_socket_path()?;
63        set.spawn(async move { db::spawn_unix_db_server(&db_client, db_socket_path).await });
64    }
65
66    let (irc_tx, irc_rx) = mpsc::channel(32);
67    let irc_tx = cfg!(feature = "irc").then(|| irc_tx);
68    let peerdb = PeerDb::read(&config).await?;
69
70    let (peerdb_tx, peerdb_rx) = peerdb::Client::new();
71    set.spawn(async move { peerdb::spawn(peerdb, peerdb_rx).await });
72
73    if !p2p.no_bind {
74        for addr in p2p.bind {
75            let db_client = db_client.clone();
76            let socket = match addr {
77                SocketAddr::V4(_) => Socket::new(Domain::IPV4, Type::STREAM, None)?,
78                SocketAddr::V6(_) => {
79                    let socket = Socket::new(Domain::IPV6, Type::STREAM, None)?;
80                    socket
81                        .set_only_v6(true)
82                        .context("Failed to set port to ipv6-only")?;
83                    socket
84                }
85            };
86            socket
87                .set_reuse_address(true)
88                .context("Failed to set reuseaddr for port")?;
89            socket
90                .set_nonblocking(true)
91                .context("Failed to set port to non-blocking")?;
92            let socket = TcpSocket::from_std_stream(socket.into());
93
94            socket
95                .bind(addr)
96                .with_context(|| anyhow!("Failed to bind to address: {:?}", addr))?;
97            let listener = socket.listen(P2P_SYNC_PORT_BACKLOG)?;
98
99            debug!("Listening on address: {addr:?}");
100            let peerdb_tx = peerdb_tx.clone();
101            set.spawn(
102                async move { sync::spawn_sync_server(&db_client, peerdb_tx, listener).await },
103            );
104        }
105    }
106
107    if !p2p.no_fetch {
108        let mut db_client = db_client.clone();
109        let keyring = keyring.clone();
110        let repositories = config.data.repositories;
111        set.spawn(async move {
112            fetch::spawn_fetch_timer(
113                &mut db_client,
114                keyring,
115                repositories,
116                proxy,
117                p2p.announce,
118                irc_tx,
119            )
120            .await
121        });
122    }
123
124    if let Some(image) = p2p.check_container_updates {
125        let commit = match p2p.update_assume_commit {
126            Some(s) if s.is_empty() => {
127                bail!("Update checks are configured but current commit is empty string")
128            }
129            Some(commit) => commit,
130            None => bail!("Update checks are configured but current commit is not provided"),
131        };
132        set.spawn(update_check::spawn_update_check(image, commit));
133    }
134
135    let (peering_tx, peering_rx) = mpsc::channel(1024);
136    set.spawn(async move {
137        peering::spawn(&mut db_client, keyring, peerdb_tx, proxy, peering_rx).await
138    });
139
140    #[cfg(feature = "irc")]
141    if !p2p.no_bootstrap && !p2p.irc.no_irc {
142        set.spawn(irc::spawn(irc_rx, p2p.irc.irc_channel, peering_tx.clone()));
143    }
144
145    if !p2p.no_bootstrap && !p2p.dns.no_dns {
146        set.spawn(dns::spawn(p2p.dns.dns, peering_tx));
147    }
148
149    // if irc is not enabled, supress an unused variable warning
150    #[cfg(not(feature = "irc"))]
151    let _ = irc_rx;
152
153    info!("Successfully started p2p node...");
154    let result = set
155        .join_next()
156        .await
157        .context("All features have been disabled, nothing to do")?;
158    result.context("Failed to wait for task")?
159}