1#[cfg(unix)]
2pub mod db;
3pub mod dns;
4pub mod fetch;
5#[cfg(feature = "irc")]
6pub mod irc;
7pub mod peerdb;
8pub mod peering;
9pub mod proto;
10pub mod sync;
11pub mod update_check;
12
13use crate::args::P2p;
14use crate::config::Config;
15use crate::db::{Database, DatabaseServer};
16use crate::errors::*;
17use crate::keyring::Keyring;
18use crate::p2p::peerdb::PeerDb;
19use socket2::{Domain, Socket, Type};
20use std::convert::Infallible;
21use std::net::SocketAddr;
22use std::time::Duration;
23use tokio::net::TcpSocket;
24use tokio::sync::mpsc;
25use tokio::task::JoinSet;
26use tokio::time;
27
// Timing and socket tuning values for the p2p subsystem. Only
// `P2P_SYNC_PORT_BACKLOG` is referenced in this part of the file (it is passed
// to `listen` in `spawn`); the others are presumably read by the submodules
// declared above — confirm before changing any of them.

/// 15 minutes.
const FETCH_INTERVAL: Duration = Duration::from_secs(15 * 60);
/// 45 seconds.
const FETCH_INTERVAL_JITTER: Duration = Duration::from_secs(45);
/// 60 seconds.
const SYNC_IDLE_TIMEOUT: Duration = Duration::from_secs(60);

/// 15 minutes.
const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(15 * 60);
/// 5 seconds.
const UPDATE_CHECK_DEBOUNCE: Duration = Duration::from_secs(5);
/// 20 minutes.
const UPDATE_SHUTDOWN_DELAY: Duration = Duration::from_secs(20 * 60);
/// 24 hours.
const GOSSIP_IDLE_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(24 * 60 * 60);
/// Backlog handed to `listen` for every bound sync port.
const P2P_SYNC_PORT_BACKLOG: u32 = 1024;
38
39pub async fn random_jitter(jitter: Duration) {
40 let jitter = fastrand::u64(..jitter.as_secs());
41 time::sleep(Duration::from_secs(jitter)).await;
42}
43
44pub async fn spawn(
45 db: Database,
46 keyring: Keyring,
47 config: Config,
48 p2p: P2p,
49 proxy: Option<SocketAddr>,
50) -> Result<Infallible> {
51 let mut set = JoinSet::new();
52
53 let (mut db_server, mut db_client) = DatabaseServer::new(db);
54 set.spawn(async move {
55 db_server.run().await?;
56 bail!("Database server has terminated");
57 });
58
59 #[cfg(unix)]
60 {
61 let db_client = db_client.clone();
62 let db_socket_path = config.db_socket_path()?;
63 set.spawn(async move { db::spawn_unix_db_server(&db_client, db_socket_path).await });
64 }
65
66 let (irc_tx, irc_rx) = mpsc::channel(32);
67 let irc_tx = cfg!(feature = "irc").then(|| irc_tx);
68 let peerdb = PeerDb::read(&config).await?;
69
70 if !p2p.no_bind {
71 for addr in p2p.bind {
72 let db_client = db_client.clone();
73 let socket = match addr {
74 SocketAddr::V4(_) => Socket::new(Domain::IPV4, Type::STREAM, None)?,
75 SocketAddr::V6(_) => {
76 let socket = Socket::new(Domain::IPV6, Type::STREAM, None)?;
77 socket
78 .set_only_v6(true)
79 .context("Failed to set port to ipv6-only")?;
80 socket
81 }
82 };
83 socket
84 .set_reuse_address(true)
85 .context("Failed to set reuseaddr for port")?;
86 socket
87 .set_nonblocking(true)
88 .context("Failed to set port to non-blocking")?;
89 let socket = TcpSocket::from_std_stream(socket.into());
90
91 socket
92 .bind(addr)
93 .with_context(|| anyhow!("Failed to bind to address: {:?}", addr))?;
94 let listener = socket.listen(P2P_SYNC_PORT_BACKLOG)?;
95
96 debug!("Listening on address: {addr:?}");
97 set.spawn(async move { sync::spawn_sync_server(&db_client, listener).await });
98 }
99 }
100
101 if !p2p.no_fetch {
102 let mut db_client = db_client.clone();
103 let keyring = keyring.clone();
104 let repositories = config.data.repositories;
105 set.spawn(async move {
106 fetch::spawn_fetch_timer(
107 &mut db_client,
108 keyring,
109 repositories,
110 proxy,
111 p2p.announce,
112 irc_tx,
113 )
114 .await
115 });
116 }
117
118 if let Some(image) = p2p.check_container_updates {
119 let commit = match p2p.update_assume_commit {
120 Some(s) if s.is_empty() => {
121 bail!("Update checks are configured but current commit is empty string")
122 }
123 Some(commit) => commit,
124 None => bail!("Update checks are configured but current commit is not provided"),
125 };
126 set.spawn(update_check::spawn_update_check(image, commit));
127 }
128
129 let (peering_tx, peering_rx) = mpsc::channel(1024);
130 set.spawn(
131 async move { peering::spawn(&mut db_client, keyring, peerdb, proxy, peering_rx).await },
132 );
133
134 #[cfg(feature = "irc")]
135 if !p2p.no_bootstrap && !p2p.irc.no_irc {
136 set.spawn(irc::spawn(irc_rx, p2p.irc.irc_channel, peering_tx.clone()));
137 }
138
139 if !p2p.no_bootstrap && !p2p.dns.no_dns {
140 set.spawn(dns::spawn(p2p.dns.dns, peering_tx));
141 }
142
143 #[cfg(not(feature = "irc"))]
145 let _ = irc_rx;
146
147 info!("Successfully started p2p node...");
148 let result = set
149 .join_next()
150 .await
151 .context("All features have been disabled, nothing to do")?;
152 result.context("Failed to wait for task")?
153}