1#[cfg(unix)]
2pub mod db;
3pub mod dns;
4pub mod fetch;
5#[cfg(feature = "irc")]
6pub mod irc;
7pub mod peerdb;
8pub mod peering;
9pub mod proto;
10pub mod sync;
11pub mod update_check;
12
13use crate::args::P2p;
14use crate::config::Config;
15use crate::db::{Database, DatabaseServer};
16use crate::errors::*;
17use crate::keyring::Keyring;
18use crate::p2p::peerdb::PeerDb;
19use socket2::{Domain, Socket, Type};
20use std::convert::Infallible;
21use std::net::SocketAddr;
22use std::time::Duration;
23use tokio::net::TcpSocket;
24use tokio::sync::mpsc;
25use tokio::task::JoinSet;
26use tokio::time;
27
// Timing and sizing constants for the p2p subsystem. Only
// `P2P_SYNC_PORT_BACKLOG` is read in this file; the others are presumably
// consumed by the submodules declared above — verify against their call sites.

/// 15 minutes.
const FETCH_INTERVAL: Duration = Duration::from_secs(15 * 60);
/// 45 seconds.
const FETCH_INTERVAL_JITTER: Duration = Duration::from_secs(45);
/// 60 seconds.
const SYNC_IDLE_TIMEOUT: Duration = Duration::from_secs(60);

/// 15 minutes.
const UPDATE_CHECK_INTERVAL: Duration = Duration::from_secs(15 * 60);
/// 5 seconds.
const UPDATE_CHECK_DEBOUNCE: Duration = Duration::from_secs(5);
/// 20 minutes.
const UPDATE_SHUTDOWN_DELAY: Duration = Duration::from_secs(20 * 60);
/// 24 hours.
const GOSSIP_IDLE_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(24 * 3600);
/// Backlog value handed to `listen` when the sync server sockets are opened.
const P2P_SYNC_PORT_BACKLOG: u32 = 1024;
38
39pub async fn random_jitter(jitter: Duration) {
40 let jitter = fastrand::u64(..jitter.as_secs());
41 time::sleep(Duration::from_secs(jitter)).await;
42}
43
44pub async fn spawn(
45 db: Database,
46 keyring: Keyring,
47 config: Config,
48 p2p: P2p,
49 proxy: Option<SocketAddr>,
50) -> Result<Infallible> {
51 let mut set = JoinSet::new();
52
53 let (mut db_server, mut db_client) = DatabaseServer::new(db);
54 set.spawn(async move {
55 db_server.run().await?;
56 bail!("Database server has terminated");
57 });
58
59 #[cfg(unix)]
60 {
61 let db_client = db_client.clone();
62 let db_socket_path = config.db_socket_path()?;
63 set.spawn(async move { db::spawn_unix_db_server(&db_client, db_socket_path).await });
64 }
65
66 let (irc_tx, irc_rx) = mpsc::channel(32);
67 let irc_tx = cfg!(feature = "irc").then(|| irc_tx);
68 let peerdb = PeerDb::read(&config).await?;
69
70 let (peerdb_tx, peerdb_rx) = peerdb::Client::new();
71 set.spawn(async move { peerdb::spawn(peerdb, peerdb_rx).await });
72
73 if !p2p.no_bind {
74 for addr in p2p.bind {
75 let db_client = db_client.clone();
76 let socket = match addr {
77 SocketAddr::V4(_) => Socket::new(Domain::IPV4, Type::STREAM, None)?,
78 SocketAddr::V6(_) => {
79 let socket = Socket::new(Domain::IPV6, Type::STREAM, None)?;
80 socket
81 .set_only_v6(true)
82 .context("Failed to set port to ipv6-only")?;
83 socket
84 }
85 };
86 socket
87 .set_reuse_address(true)
88 .context("Failed to set reuseaddr for port")?;
89 socket
90 .set_nonblocking(true)
91 .context("Failed to set port to non-blocking")?;
92 let socket = TcpSocket::from_std_stream(socket.into());
93
94 socket
95 .bind(addr)
96 .with_context(|| anyhow!("Failed to bind to address: {:?}", addr))?;
97 let listener = socket.listen(P2P_SYNC_PORT_BACKLOG)?;
98
99 debug!("Listening on address: {addr:?}");
100 let peerdb_tx = peerdb_tx.clone();
101 set.spawn(
102 async move { sync::spawn_sync_server(&db_client, peerdb_tx, listener).await },
103 );
104 }
105 }
106
107 if !p2p.no_fetch {
108 let mut db_client = db_client.clone();
109 let keyring = keyring.clone();
110 let repositories = config.data.repositories;
111 set.spawn(async move {
112 fetch::spawn_fetch_timer(
113 &mut db_client,
114 keyring,
115 repositories,
116 proxy,
117 p2p.announce,
118 irc_tx,
119 )
120 .await
121 });
122 }
123
124 if let Some(image) = p2p.check_container_updates {
125 let commit = match p2p.update_assume_commit {
126 Some(s) if s.is_empty() => {
127 bail!("Update checks are configured but current commit is empty string")
128 }
129 Some(commit) => commit,
130 None => bail!("Update checks are configured but current commit is not provided"),
131 };
132 set.spawn(update_check::spawn_update_check(image, commit));
133 }
134
135 let (peering_tx, peering_rx) = mpsc::channel(1024);
136 set.spawn(async move {
137 peering::spawn(&mut db_client, keyring, peerdb_tx, proxy, peering_rx).await
138 });
139
140 #[cfg(feature = "irc")]
141 if !p2p.no_bootstrap && !p2p.irc.no_irc {
142 set.spawn(irc::spawn(irc_rx, p2p.irc.irc_channel, peering_tx.clone()));
143 }
144
145 if !p2p.no_bootstrap && !p2p.dns.no_dns {
146 set.spawn(dns::spawn(p2p.dns.dns, peering_tx));
147 }
148
149 #[cfg(not(feature = "irc"))]
151 let _ = irc_rx;
152
153 info!("Successfully started p2p node...");
154 let result = set
155 .join_next()
156 .await
157 .context("All features have been disabled, nothing to do")?;
158 result.context("Failed to wait for task")?
159}