1#![allow(
2 clippy::cast_possible_truncation,
3 clippy::cast_precision_loss,
4 clippy::cast_possible_wrap,
5 clippy::cast_sign_loss,
6 clippy::unchecked_time_subtraction,
7 reason = "M175: DHT actor — KRPC field widths fixed by spec; time deltas use post-bootstrap Instants captured well after process start"
8)]
9
10use std::collections::HashMap;
14use std::net::SocketAddr;
15use std::path::PathBuf;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU16, Ordering};
18use std::time::{Duration, Instant};
19
20use dashmap::DashMap;
21use tokio::net::UdpSocket;
22use tokio::sync::{mpsc, oneshot};
23use tracing::{debug, trace, warn};
24
25use irontide_core::{AddressFamily, Id20};
26
27use crate::bep44::{self, ImmutableItem, MAX_SALT_SIZE, MAX_VALUE_SIZE, MutableItem};
28use crate::compact::CompactNodeInfo;
29use crate::error::{Error, Result};
30use crate::krpc::{
31 GetPeersResponse, KrpcBody, KrpcMessage, KrpcQuery, KrpcResponse, SampleInfohashesResponse,
32 TransactionId,
33};
34use crate::lookup::{FindNodeCallbacks, IterativeLookup};
35use crate::node_id::{self, ExternalIpVoter, IpVoteSource};
36use crate::peer_store::PeerStore;
37use crate::routing_table::{K, RoutingTable};
38use crate::storage::{DhtStorage, InMemoryDhtStorage};
39
40#[allow(unused_imports)]
41use ed25519_dalek::SigningKey;
42
/// Token-bucket limiter for outgoing DHT queries.
///
/// The bucket starts full, holds at most `max_permits` permits and regains
/// `refill_rate` permits per second of wall-clock time.
struct QueryRateLimiter {
    /// Permits currently available.
    permits: u32,
    /// Bucket capacity (burst size).
    max_permits: u32,
    /// Point in time up to which elapsed time has already been converted
    /// into permits.
    last_refill: Instant,
    /// Permits regained per second.
    refill_rate: u32,
}

impl QueryRateLimiter {
    /// Creates a limiter allowing `rate` queries per second with a burst of
    /// `rate`.
    ///
    /// Rates above `u32::MAX` saturate instead of wrapping, so a huge
    /// configured value can never silently become a tiny (or zero) rate.
    fn new(rate: usize) -> Self {
        let rate = u32::try_from(rate).unwrap_or(u32::MAX);
        Self {
            permits: rate,
            max_permits: rate,
            last_refill: Instant::now(),
            refill_rate: rate,
        }
    }

    /// Takes one permit if available; returns `false` when the bucket is empty.
    fn try_acquire(&mut self) -> bool {
        self.refill();
        if self.permits > 0 {
            self.permits -= 1;
            true
        } else {
            false
        }
    }

    /// Converts the time elapsed since `last_refill` into whole permits.
    ///
    /// Only the time that corresponds to the granted permits is consumed;
    /// the sub-permit remainder stays on the clock so that frequent polling
    /// does not systematically under-deliver the configured rate.
    fn refill(&mut self) {
        if self.refill_rate == 0 {
            // A zero rate never regains permits.
            return;
        }

        let now = Instant::now();
        let rate = f64::from(self.refill_rate);
        let elapsed_secs = now
            .saturating_duration_since(self.last_refill)
            .as_secs_f64();
        // Float-to-int `as` saturates, so an enormous product yields u32::MAX.
        let new_permits = (elapsed_secs * rate) as u32;
        if new_permits == 0 {
            return;
        }

        // Saturate: `permits + new_permits` may exceed u32::MAX after a long
        // idle period or with a very high rate.
        let refilled = self.permits.saturating_add(new_permits);
        if refilled >= self.max_permits {
            // Bucket is full; surplus time is not banked beyond the cap.
            self.permits = self.max_permits;
            self.last_refill = now;
        } else {
            self.permits = refilled;
            let consumed = Duration::from_secs_f64(f64::from(new_permits) / rate);
            self.last_refill = self
                .last_refill
                .checked_add(consumed)
                .map_or(now, |advanced| advanced.min(now));
        }
    }
}
91
/// A [`QueryRateLimiter`] behind a `parking_lot::Mutex` so it can be used
/// through a shared reference (the actor stores it in an `Arc`).
pub(crate) struct SharedRateLimiter {
    /// The guarded token bucket.
    inner: parking_lot::Mutex<QueryRateLimiter>,
}
98
99impl SharedRateLimiter {
100 pub fn new(rate: usize) -> Self {
102 Self {
103 inner: parking_lot::Mutex::new(QueryRateLimiter::new(rate)),
104 }
105 }
106
107 pub fn try_acquire(&self) -> bool {
109 self.inner.lock().try_acquire()
110 }
111
112 pub async fn acquire(&self) {
115 loop {
116 if self.try_acquire() {
117 return;
118 }
119 tokio::time::sleep(Duration::from_millis(4)).await;
120 }
121 }
122}
123
/// Configuration for a DHT node started via [`DhtHandle::start`].
#[derive(Debug, Clone)]
pub struct DhtConfig {
    /// Local address the UDP socket is bound to.
    pub bind_addr: SocketAddr,
    /// Bootstrap entries. Entries that parse as a `SocketAddr` are pinged
    /// directly; all others are treated as hostnames and resolved via DNS.
    pub bootstrap_nodes: Vec<String>,
    /// Fixed node ID; when `None` a node ID is generated at startup.
    pub own_id: Option<Id20>,
    /// Rate passed to the shared outgoing-query rate limiter.
    pub queries_per_second: usize,
    /// Period of the actor's query-timeout timer.
    pub query_timeout: Duration,
    /// Address family this DHT instance serves; incoming queries from the
    /// other family are dropped.
    pub address_family: AddressFamily,
    /// NOTE(review): presumably enables BEP 42 node-ID enforcement — not used
    /// in the visible part of this file; confirm against the routing code.
    pub enforce_node_id: bool,
    /// Forwarded to `RoutingTable::with_config` as its IP-restriction flag.
    pub restrict_routing_ips: bool,
    /// Capacity handed to the in-memory BEP 44 item store.
    pub dht_max_items: usize,
    /// Lifetime, in seconds, used when expiring stored BEP 44 items.
    pub dht_item_lifetime_secs: u64,
    /// Forwarded to `RoutingTable::with_config` as the routing-node limit.
    pub max_routing_nodes: usize,
    /// NOTE(review): presumably the directory used to persist the routing
    /// table — the load/save code is outside this view; confirm.
    pub state_dir: Option<PathBuf>,
    /// NOTE(review): presumably BEP 43 read-only mode — not used in the
    /// visible part of this file; confirm.
    pub read_only_mode: bool,
    /// When `true`, outgoing queries carry a `want` list of both `n4` and `n6`.
    pub enable_multi_address: bool,
}
163
164impl Default for DhtConfig {
165 fn default() -> Self {
166 Self {
167 bind_addr: "0.0.0.0:0".parse().unwrap(),
168 bootstrap_nodes: vec![
169 "router.bittorrent.com:6881".into(),
170 "dht.transmissionbt.com:6881".into(),
171 "router.utorrent.com:6881".into(),
172 ],
173 own_id: None,
174 queries_per_second: 250,
175 query_timeout: Duration::from_secs(5),
176 address_family: AddressFamily::V4,
177 enforce_node_id: false,
178 restrict_routing_ips: true,
179 dht_max_items: 700,
180 dht_item_lifetime_secs: 7200,
181 max_routing_nodes: 512,
182 state_dir: None,
183 read_only_mode: false,
184 enable_multi_address: true,
185 }
186 }
187}
188
189impl DhtConfig {
190 #[must_use]
192 pub fn default_v6() -> Self {
193 Self {
194 bind_addr: "[::]:0".parse().unwrap(),
195 bootstrap_nodes: vec![
196 "router.bittorrent.com:6881".into(),
197 "dht.libtorrent.org:25401".into(),
198 ],
199 own_id: None,
200 queries_per_second: 250,
201 query_timeout: Duration::from_secs(5),
202 address_family: AddressFamily::V6,
203 enforce_node_id: false,
204 restrict_routing_ips: true,
205 dht_max_items: 700,
206 dht_item_lifetime_secs: 7200,
207 max_routing_nodes: 512,
208 state_dir: None,
209 read_only_mode: false,
210 enable_multi_address: true,
211 }
212 }
213}
214
/// Snapshot of DHT node statistics returned by [`DhtHandle::stats`].
#[derive(Debug, Clone)]
pub struct DhtStats {
    /// This node's ID.
    pub node_id: Id20,
    /// Number of nodes in the routing table (also exposed via
    /// [`DhtHandle::node_count`]).
    pub routing_table_size: usize,
    /// Number of routing-table buckets.
    pub bucket_count: usize,
    /// Number of info hashes tracked by the peer store.
    pub peer_store_info_hashes: usize,
    /// Number of peers tracked by the peer store.
    pub peer_store_peers: usize,
    /// Number of outgoing queries currently awaiting a response.
    pub pending_queries: usize,
    /// Total queries sent since the actor started.
    pub total_queries_sent: u64,
    /// Total responses received since the actor started.
    pub total_responses_received: u64,
    /// Number of BEP 44 items currently stored.
    pub dht_item_count: usize,
}
237
/// Result of a `sample_infohashes` query (see [`DhtHandle::sample_infohashes`]).
#[derive(Debug, Clone)]
pub struct SampleInfohashesResult {
    /// `interval` value reported by the remote node
    /// (presumably seconds, per BEP 51 — confirm against the KRPC codec).
    pub interval: i64,
    /// `num` value reported by the remote node (its info-hash count).
    pub num: i64,
    /// Sampled info hashes.
    pub samples: Vec<Id20>,
    /// Nodes close to the requested target, as returned by the remote node.
    pub nodes: Vec<CompactNodeInfo>,
}
250
/// Cloneable handle to a running DHT actor.
///
/// Every method sends a command over an mpsc channel to the actor task
/// spawned by [`DhtHandle::start`].
#[derive(Clone, Debug)]
pub struct DhtHandle {
    /// Command channel into the actor.
    tx: mpsc::Sender<DhtCommand>,
}
256
/// Commands sent from [`DhtHandle`] to the actor task.
///
/// Variants that expect an answer carry a reply channel.
enum DhtCommand {
    /// Start a peer lookup for `info_hash`; peer batches are pushed to `reply`.
    GetPeers {
        info_hash: Id20,
        reply: mpsc::UnboundedSender<Vec<SocketAddr>>,
    },
    /// Announce that we are a peer for `info_hash` on `port`.
    Announce {
        info_hash: Id20,
        port: u16,
        reply: oneshot::Sender<Result<()>>,
    },
    /// Request a statistics snapshot.
    Stats {
        reply: oneshot::Sender<DhtStats>,
    },
    /// Feed an externally observed IP address into the external-IP voter.
    UpdateExternalIp {
        ip: std::net::IpAddr,
        source: IpVoteSource,
    },
    /// Fetch an immutable BEP 44 item by its target.
    GetImmutable {
        target: Id20,
        reply: oneshot::Sender<Result<Option<Vec<u8>>>>,
    },
    /// Store an immutable BEP 44 item; the reply carries its target.
    PutImmutable {
        value: Vec<u8>,
        reply: oneshot::Sender<Result<Id20>>,
    },
    /// Fetch a mutable BEP 44 item; the reply carries `(value, seq)`.
    GetMutable {
        public_key: [u8; 32],
        salt: Vec<u8>,
        #[allow(clippy::type_complexity)]
        reply: oneshot::Sender<Result<Option<(Vec<u8>, i64)>>>,
    },
    /// Store a mutable BEP 44 item signed with the given key material.
    PutMutable {
        keypair_bytes: [u8; 32],
        value: Vec<u8>,
        seq: i64,
        salt: Vec<u8>,
        reply: oneshot::Sender<Result<Id20>>,
    },
    /// Send a `sample_infohashes` query for `target`.
    SampleInfohashes {
        target: Id20,
        reply: oneshot::Sender<Result<SampleInfohashesResult>>,
    },
    /// Return every `(id, addr)` pair in the routing table.
    GetRoutingNodes {
        reply: oneshot::Sender<Vec<(Id20, SocketAddr)>>,
    },
    /// Persist the routing table now.
    SaveRoutingTable {
        reply: oneshot::Sender<Result<()>>,
    },
    /// Persist the routing table and stop the actor. `reply` is `Some` when
    /// the caller wants to be notified once the save has run.
    Shutdown {
        reply: Option<oneshot::Sender<()>>,
    },
}
318
319impl DhtHandle {
320 pub async fn start(config: DhtConfig) -> Result<(Self, mpsc::Receiver<std::net::IpAddr>)> {
329 let socket = Arc::new(UdpSocket::bind(config.bind_addr).await?);
330 let local_addr = socket.local_addr()?;
331 debug!(addr = %local_addr, "DHT socket bound");
332
333 let (tx, rx) = mpsc::channel(256);
334 let (ip_consensus_tx, ip_consensus_rx) = mpsc::channel(4);
335 let handle = Self { tx };
336
337 let actor = DhtActor::new(config, socket, rx, ip_consensus_tx);
338 tokio::spawn(actor.run());
339
340 Ok((handle, ip_consensus_rx))
341 }
342
343 pub async fn update_external_ip(
349 &self,
350 ip: std::net::IpAddr,
351 source: IpVoteSource,
352 ) -> Result<()> {
353 self.tx
354 .send(DhtCommand::UpdateExternalIp { ip, source })
355 .await
356 .map_err(|_| Error::Shutdown)
357 }
358
359 pub async fn get_peers(
368 &self,
369 info_hash: Id20,
370 ) -> Result<mpsc::UnboundedReceiver<Vec<SocketAddr>>> {
371 let (reply_tx, reply_rx) = mpsc::unbounded_channel();
372 self.tx
373 .send(DhtCommand::GetPeers {
374 info_hash,
375 reply: reply_tx,
376 })
377 .await
378 .map_err(|_| Error::Shutdown)?;
379 Ok(reply_rx)
380 }
381
382 pub async fn announce(&self, info_hash: Id20, port: u16) -> Result<()> {
388 let (reply_tx, reply_rx) = oneshot::channel();
389 self.tx
390 .send(DhtCommand::Announce {
391 info_hash,
392 port,
393 reply: reply_tx,
394 })
395 .await
396 .map_err(|_| Error::Shutdown)?;
397 reply_rx.await.map_err(|_| Error::Shutdown)?
398 }
399
400 pub async fn stats(&self) -> Result<DhtStats> {
406 let (reply_tx, reply_rx) = oneshot::channel();
407 self.tx
408 .send(DhtCommand::Stats { reply: reply_tx })
409 .await
410 .map_err(|_| Error::Shutdown)?;
411 reply_rx.await.map_err(|_| Error::Shutdown)
412 }
413
414 pub async fn node_count(&self) -> Result<usize> {
426 Ok(self.stats().await?.routing_table_size)
427 }
428
429 pub async fn shutdown(&self) -> Result<()> {
443 self.tx
444 .send(DhtCommand::Shutdown { reply: None })
445 .await
446 .map_err(|_| Error::Shutdown)
447 }
448
449 pub async fn shutdown_and_wait(&self) -> Result<()> {
464 let (reply_tx, reply_rx) = oneshot::channel();
465 self.tx
466 .send(DhtCommand::Shutdown {
467 reply: Some(reply_tx),
468 })
469 .await
470 .map_err(|_| Error::Shutdown)?;
471 reply_rx.await.map_err(|_| Error::Shutdown)
472 }
473
474 pub async fn save_routing_table(&self) -> Result<()> {
490 let (reply_tx, reply_rx) = oneshot::channel();
491 self.tx
492 .send(DhtCommand::SaveRoutingTable { reply: reply_tx })
493 .await
494 .map_err(|_| Error::Shutdown)?;
495 reply_rx.await.map_err(|_| Error::Shutdown)?
496 }
497
498 pub async fn put_immutable(&self, value: Vec<u8>) -> Result<Id20> {
508 let (reply_tx, reply_rx) = oneshot::channel();
509 self.tx
510 .send(DhtCommand::PutImmutable {
511 value,
512 reply: reply_tx,
513 })
514 .await
515 .map_err(|_| Error::Shutdown)?;
516 reply_rx.await.map_err(|_| Error::Shutdown)?
517 }
518
519 pub async fn get_immutable(&self, target: Id20) -> Result<Option<Vec<u8>>> {
527 let (reply_tx, reply_rx) = oneshot::channel();
528 self.tx
529 .send(DhtCommand::GetImmutable {
530 target,
531 reply: reply_tx,
532 })
533 .await
534 .map_err(|_| Error::Shutdown)?;
535 reply_rx.await.map_err(|_| Error::Shutdown)?
536 }
537
538 pub async fn put_mutable(
552 &self,
553 keypair_bytes: [u8; 32],
554 value: Vec<u8>,
555 seq: i64,
556 salt: Vec<u8>,
557 ) -> Result<Id20> {
558 let (reply_tx, reply_rx) = oneshot::channel();
559 self.tx
560 .send(DhtCommand::PutMutable {
561 keypair_bytes,
562 value,
563 seq,
564 salt,
565 reply: reply_tx,
566 })
567 .await
568 .map_err(|_| Error::Shutdown)?;
569 reply_rx.await.map_err(|_| Error::Shutdown)?
570 }
571
572 pub async fn sample_infohashes(&self, target: Id20) -> Result<SampleInfohashesResult> {
581 let (reply_tx, reply_rx) = oneshot::channel();
582 self.tx
583 .send(DhtCommand::SampleInfohashes {
584 target,
585 reply: reply_tx,
586 })
587 .await
588 .map_err(|_| Error::Shutdown)?;
589 reply_rx.await.map_err(|_| Error::Shutdown)?
590 }
591
592 pub async fn get_mutable(
600 &self,
601 public_key: [u8; 32],
602 salt: Vec<u8>,
603 ) -> Result<Option<(Vec<u8>, i64)>> {
604 let (reply_tx, reply_rx) = oneshot::channel();
605 self.tx
606 .send(DhtCommand::GetMutable {
607 public_key,
608 salt,
609 reply: reply_tx,
610 })
611 .await
612 .map_err(|_| Error::Shutdown)?;
613 reply_rx.await.map_err(|_| Error::Shutdown)?
614 }
615
616 pub async fn get_routing_nodes(&self) -> Vec<(Id20, SocketAddr)> {
618 let (reply_tx, reply_rx) = oneshot::channel();
619 let _ = self
620 .tx
621 .send(DhtCommand::GetRoutingNodes { reply: reply_tx })
622 .await;
623 reply_rx.await.unwrap_or_default()
624 }
625}
626
/// State owned by the DHT actor task.
///
/// The actor is driven by [`DhtActor::run`], which multiplexes the UDP
/// socket, the command channel and several timers.
struct DhtActor {
    /// Configuration the actor was started with.
    config: DhtConfig,
    /// Copy of `config.address_family`.
    address_family: AddressFamily,
    /// UDP socket used for all KRPC traffic.
    socket: Arc<UdpSocket>,
    /// Commands from [`DhtHandle`].
    rx: mpsc::Receiver<DhtCommand>,
    /// Routing table, shareable behind a read/write lock.
    routing_table: Arc<parking_lot::RwLock<RoutingTable>>,
    /// Announced peers and announce-token generation/validation.
    peer_store: PeerStore,
    /// BEP 44 item storage.
    item_store: Box<dyn DhtStorage + Send>,
    /// In-flight outgoing queries keyed by 16-bit transaction ID.
    pending: Arc<DashMap<u16, PendingQuery>>,
    /// Counter for transaction IDs (initialised to 1).
    next_txn_id: Arc<AtomicU16>,
    /// Query/response counters.
    stats: ActorStats,
    /// Announce tokens collected during lookups:
    /// info hash → node ID → (node address, token).
    announce_tokens: HashMap<Id20, HashMap<Id20, (SocketAddr, Vec<u8>)>>,
    /// Sender half of the token channel; its receiver is drained in `run`.
    lookup_token_tx: mpsc::UnboundedSender<(Id20, Id20, SocketAddr, Vec<u8>)>,
    /// Receiver of `(info_hash, node_id, addr, token)` tuples.
    lookup_token_rx: mpsc::UnboundedReceiver<(Id20, Id20, SocketAddr, Vec<u8>)>,
    /// Sender half of the discovered-node channel; its receiver is drained in `run`.
    lookup_node_tx: mpsc::UnboundedSender<(Id20, SocketAddr)>,
    /// Receiver of `(id, addr)` pairs that are inserted into the routing table.
    lookup_node_rx: mpsc::UnboundedReceiver<(Id20, SocketAddr)>,
    /// Join handles keyed by `Id20`
    /// (presumably running lookup tasks per info hash — confirm).
    active_lookups: HashMap<Id20, tokio::task::JoinHandle<()>>,
    /// In-progress BEP 44 get operations keyed by `Id20` (presumably the item target).
    item_lookups: HashMap<Id20, ItemLookupState>,
    /// In-progress BEP 44 put operations keyed by `Id20` (presumably the item target).
    item_put_ops: HashMap<Id20, ItemPutState>,
    /// External-IP voter (BEP 42).
    ip_voter: ExternalIpVoter,
    /// Channel on which external-IP consensus results are published.
    ip_consensus_tx: mpsc::Sender<std::net::IpAddr>,
    /// Reply channels for `sample_infohashes` queries keyed by `u16`
    /// (presumably the transaction ID — confirm).
    sample_replies: HashMap<u16, oneshot::Sender<Result<SampleInfohashesResult>>>,
    /// Shared limiter for outgoing queries.
    rate_limiter: Arc<SharedRateLimiter>,
    /// Bootstrap `find_node` lookup state, while one is active.
    bootstrap_lookup: Option<IterativeLookup<FindNodeCallbacks>>,
    /// Set once bootstrap has finished or timed out.
    bootstrap_complete: bool,
    /// Queued `get_peers` requests
    /// (presumably held back until bootstrap completes — confirm).
    pending_get_peers: Vec<(Id20, mpsc::UnboundedSender<Vec<SocketAddr>>)>,
    /// 10-second bootstrap deadline created in `new`.
    bootstrap_timeout: Option<std::pin::Pin<Box<tokio::time::Sleep>>>,
    /// Time of the last questionable-node ping round.
    last_ping: Instant,
    /// Addresses resolved by the DNS bootstrap tasks; `None` when no
    /// resolution is in progress.
    dns_bootstrap_rx: Option<mpsc::Receiver<Vec<SocketAddr>>>,
}
685
/// Running counters kept by the actor (both start at zero).
struct ActorStats {
    /// Number of queries sent.
    total_queries_sent: u64,
    /// Number of responses received.
    total_responses_received: u64,
}
690
/// Bookkeeping for one in-flight outgoing query.
pub(crate) struct PendingQuery {
    /// When the query was sent.
    pub sent_at: Instant,
    /// Address the query was sent to.
    pub addr: SocketAddr,
    /// What kind of query this is.
    pub kind: PendingQueryKind,
    /// ID of the queried node, when known; a KRPC error reply marks this
    /// node as failed in the routing table.
    pub node_id: Option<Id20>,
    /// Optional channel for the response
    /// (presumably used to hand it to an awaiting lookup — confirm).
    pub response_tx: Option<oneshot::Sender<PendingQueryResponse>>,
}
701
/// A response paired with the ID of the node that sent it.
pub(crate) struct PendingQueryResponse {
    /// ID of the responding node.
    pub sender_id: Id20,
    /// The decoded KRPC response.
    pub response: KrpcResponse,
}
707
/// Kind of an in-flight outgoing query.
#[derive(Debug)]
pub(crate) enum PendingQueryKind {
    /// `ping`
    Ping,
    /// `find_node`
    FindNode,
    /// `get_peers` for `info_hash`
    GetPeers {
        info_hash: Id20,
    },
    /// `announce_peer`
    AnnouncePeer,
    /// BEP 44 `get` for `target`
    GetItem {
        target: Id20,
    },
    /// BEP 44 `put`
    PutItem,
    /// `sample_infohashes`
    SampleInfohashes,
}
725
/// State of an in-progress BEP 44 get operation.
enum ItemLookupState {
    /// Lookup of an immutable item.
    Immutable {
        /// Reply channel; an `Option` so it can be taken when the answer is sent.
        #[allow(clippy::type_complexity)]
        reply: Option<oneshot::Sender<Result<Option<Vec<u8>>>>>,
        /// Set of node IDs (presumably nodes already queried — confirm).
        queried: std::collections::HashSet<Id20>,
    },
    /// Lookup of a mutable item.
    Mutable {
        /// Salt of the requested item.
        salt: Vec<u8>,
        /// Reply channel; an `Option` so it can be taken when the answer is sent.
        #[allow(clippy::type_complexity)]
        reply: Option<oneshot::Sender<Result<Option<(Vec<u8>, i64)>>>>,
        /// Sequence number belonging to `best_value`
        /// (presumably the highest seen so far — confirm).
        best_seq: i64,
        /// Best value seen so far, if any.
        best_value: Option<Vec<u8>>,
        /// Set of node IDs (presumably nodes already queried — confirm).
        queried: std::collections::HashSet<Id20>,
    },
}
742
/// State of an in-progress BEP 44 put operation.
enum ItemPutState {
    /// Put of an immutable item.
    Immutable {
        /// The item being stored.
        item: crate::bep44::ImmutableItem,
        /// Write tokens collected so far: node ID → (address, token).
        tokens: HashMap<Id20, (SocketAddr, Vec<u8>)>,
        /// Counter of `put` queries sent.
        sent_puts: usize,
        /// Reply channel; an `Option` so it can be taken when the answer is sent.
        reply: Option<oneshot::Sender<Result<Id20>>>,
    },
    /// Put of a mutable item.
    Mutable {
        /// The item being stored.
        item: crate::bep44::MutableItem,
        /// Write tokens collected so far: node ID → (address, token).
        tokens: HashMap<Id20, (SocketAddr, Vec<u8>)>,
        /// Counter of `put` queries sent.
        sent_puts: usize,
        /// Reply channel; an `Option` so it can be taken when the answer is sent.
        reply: Option<oneshot::Sender<Result<Id20>>>,
    },
}
758
/// Arguments for one outgoing BEP 44 `put` query.
///
/// `key`, `signature`, `seq` and `salt` are optional
/// (presumably `None` for immutable items — confirm against the sender).
struct PutItemParams {
    /// Destination node.
    addr: SocketAddr,
    /// Write token for the destination node.
    token: Vec<u8>,
    /// Value to store.
    value: Vec<u8>,
    /// 32-byte public key.
    key: Option<[u8; 32]>,
    /// 64-byte signature.
    signature: Option<[u8; 64]>,
    /// Sequence number.
    seq: Option<i64>,
    /// Salt.
    salt: Option<Vec<u8>>,
}
769
/// Serializable DHT state: our node ID and a list of known nodes
/// (presumably the on-disk routing-table snapshot — confirm against the
/// load/save code).
#[derive(serde::Serialize, serde::Deserialize)]
struct DhtState {
    /// Node ID in string form (encoding defined by the load/save code).
    node_id: String,
    /// Known nodes.
    nodes: Vec<DhtNodeEntry>,
}
778
/// One node entry inside [`DhtState`].
#[derive(serde::Serialize, serde::Deserialize)]
struct DhtNodeEntry {
    /// Node ID in string form.
    id: String,
    /// Node address in string form.
    addr: String,
}
787
/// Period of the actor's maintenance timer.
const MAINTENANCE_INTERVAL: Duration = Duration::from_mins(1);
/// Period of the peer-store cleanup and BEP 44 item-expiry timer.
const CLEANUP_INTERVAL: Duration = Duration::from_mins(5);
/// Period of the ping timer. The actor pings questionable nodes on a tick
/// only when enough time has passed since the last round: 5 s before
/// bootstrap completes, 1 min afterwards.
const PING_INTERVAL: Duration = Duration::from_secs(5);
794
795impl DhtActor {
796 fn new(
797 config: DhtConfig,
798 socket: Arc<UdpSocket>,
799 rx: mpsc::Receiver<DhtCommand>,
800 ip_consensus_tx: mpsc::Sender<std::net::IpAddr>,
801 ) -> Self {
802 let own_id = config.own_id.unwrap_or_else(generate_node_id);
803 let address_family = config.address_family;
804 let restrict_ips = config.restrict_routing_ips;
805 let max_routing_nodes = config.max_routing_nodes;
806 debug!(id = %own_id, family = ?address_family, "DHT node ID");
807
808 let max_items = config.dht_max_items;
809 let queries_per_second = config.queries_per_second;
810
811 let (lookup_token_tx, lookup_token_rx) = mpsc::unbounded_channel();
812 let (lookup_node_tx, lookup_node_rx) = mpsc::unbounded_channel();
813
814 let mut actor = Self {
815 config,
816 address_family,
817 socket,
818 rx,
819 routing_table: Arc::new(parking_lot::RwLock::new(RoutingTable::with_config(
820 own_id,
821 restrict_ips,
822 max_routing_nodes,
823 ))),
824 peer_store: PeerStore::new(),
825 item_store: Box::new(InMemoryDhtStorage::new(max_items)),
826 pending: Arc::new(DashMap::new()),
827 next_txn_id: Arc::new(AtomicU16::new(1)),
828 stats: ActorStats {
829 total_queries_sent: 0,
830 total_responses_received: 0,
831 },
832 announce_tokens: HashMap::new(),
833 lookup_token_tx,
834 lookup_token_rx,
835 lookup_node_tx,
836 lookup_node_rx,
837 active_lookups: HashMap::new(),
838 item_lookups: HashMap::new(),
839 item_put_ops: HashMap::new(),
840 ip_voter: ExternalIpVoter::new(10),
841 ip_consensus_tx,
842 sample_replies: HashMap::new(),
843 rate_limiter: Arc::new(SharedRateLimiter::new(queries_per_second)),
844 bootstrap_lookup: None,
845 bootstrap_complete: false,
846 pending_get_peers: Vec::new(),
847 bootstrap_timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(10)))),
848 last_ping: Instant::now(),
849 dns_bootstrap_rx: None,
850 };
851
852 actor.load_routing_table();
855
856 actor
857 }
858
    /// Actor main loop.
    ///
    /// Starts bootstrap, then multiplexes the UDP socket, the command
    /// channel, the periodic timers, the bootstrap deadline, DNS bootstrap
    /// results and the lookup feedback channels until a `Shutdown` command
    /// arrives or every [`DhtHandle`] has been dropped. The routing table is
    /// saved before returning.
    async fn run(mut self) {
        self.bootstrap().await;

        // Large enough for any UDP datagram.
        let mut recv_buf = vec![0u8; 65535];
        let mut maintenance_tick = tokio::time::interval(MAINTENANCE_INTERVAL);
        let mut cleanup_tick = tokio::time::interval(CLEANUP_INTERVAL);
        let mut query_timeout_tick = tokio::time::interval(self.config.query_timeout);
        let mut ping_tick = tokio::time::interval(PING_INTERVAL);

        loop {
            tokio::select! {
                // Incoming KRPC datagram.
                result = self.socket.recv_from(&mut recv_buf) => {
                    match result {
                        Ok((n, addr)) => {
                            self.handle_packet(&recv_buf[..n], addr).await;
                        }
                        Err(e) => {
                            // Receive errors are logged and the loop keeps running.
                            warn!(error = %e, "UDP recv error");
                        }
                    }
                }

                // Command from a DhtHandle.
                cmd = self.rx.recv() => {
                    match cmd {
                        Some(DhtCommand::GetPeers { info_hash, reply }) => {
                            self.start_get_peers(info_hash, reply);
                        }
                        Some(DhtCommand::Announce { info_hash, port, reply }) => {
                            self.handle_announce(info_hash, port, reply).await;
                        }
                        Some(DhtCommand::Stats { reply }) => {
                            let _ = reply.send(self.make_stats());
                        }
                        Some(DhtCommand::UpdateExternalIp { ip, source }) => {
                            // A vote that produces consensus is published to the
                            // consensus channel (best effort) and triggers node-ID
                            // regeneration.
                            let source_id = source.source_id();
                            if let Some(consensus_ip) = self.ip_voter.add_vote(source_id, ip) {
                                debug!(%consensus_ip, "BEP 42: external IP consensus (via NAT/tracker)");
                                let _ = self.ip_consensus_tx.try_send(consensus_ip);
                                self.regenerate_node_id(consensus_ip);
                            }
                        }
                        Some(DhtCommand::GetImmutable { target, reply }) => {
                            self.handle_get_immutable(target, reply).await;
                        }
                        Some(DhtCommand::PutImmutable { value, reply }) => {
                            self.handle_put_immutable(value, reply).await;
                        }
                        Some(DhtCommand::GetMutable { public_key, salt, reply }) => {
                            self.handle_get_mutable(public_key, salt, reply).await;
                        }
                        Some(DhtCommand::PutMutable { keypair_bytes, value, seq, salt, reply }) => {
                            self.handle_put_mutable(keypair_bytes, value, seq, salt, reply).await;
                        }
                        Some(DhtCommand::SampleInfohashes { target, reply }) => {
                            self.handle_sample_infohashes(target, reply).await;
                        }
                        Some(DhtCommand::GetRoutingNodes { reply }) => {
                            let nodes = self.routing_table.read().all_nodes();
                            let _ = reply.send(nodes);
                        }
                        Some(DhtCommand::SaveRoutingTable { reply }) => {
                            self.save_routing_table();
                            // The reply is always Ok: the save call yields no
                            // result that is used here.
                            let _ = reply.send(Ok(()));
                        }
                        Some(DhtCommand::Shutdown { reply }) => {
                            debug!("DHT shutting down — persisting routing table");
                            self.save_routing_table();
                            // Acknowledge only after the save has run.
                            if let Some(tx) = reply {
                                let _ = tx.send(());
                            }
                            return;
                        }
                        None => {
                            // Every DhtHandle has been dropped.
                            debug!("DHT shutting down (cmd channel closed) — persisting routing table");
                            self.save_routing_table();
                            return;
                        }
                    }
                }

                // Timer with period `config.query_timeout`.
                _ = query_timeout_tick.tick() => {
                    self.expire_queries_and_advance_lookups().await;
                }

                _ = maintenance_tick.tick() => {
                    self.maintenance().await;
                }

                // Periodic cleanup of the peer store and expiry of stored items.
                _ = cleanup_tick.tick() => {
                    self.peer_store.cleanup();
                    self.item_store.expire(
                        Duration::from_secs(self.config.dht_item_lifetime_secs)
                    );
                }

                // The timer fires every PING_INTERVAL, but pings are only sent
                // when the effective interval has elapsed: 1 min after
                // bootstrap, 5 s while bootstrapping.
                _ = ping_tick.tick() => {
                    let ping_interval = if self.bootstrap_complete {
                        Duration::from_mins(1)
                    } else {
                        Duration::from_secs(5)
                    };
                    if self.last_ping.elapsed() >= ping_interval {
                        self.ping_questionable_nodes().await;
                        self.last_ping = Instant::now();
                    }
                    self.drain_pending_if_table_ready();
                }

                // Bootstrap deadline; the branch is disabled once bootstrap
                // has completed or the timer has been cleared.
                () = async {
                    match &mut self.bootstrap_timeout {
                        Some(timer) => timer.as_mut().await,
                        None => std::future::pending().await,
                    }
                }, if self.bootstrap_timeout.is_some() && !self.bootstrap_complete => {
                    warn!(
                        table_size = self.routing_table.read().len(),
                        "bootstrap timeout (10s), proceeding with current routing table"
                    );
                    self.on_bootstrap_complete();
                }

                // Addresses resolved by the DNS bootstrap tasks. While no
                // resolution is in progress this branch never completes.
                result = async {
                    match &mut self.dns_bootstrap_rx {
                        Some(rx) => rx.recv().await,
                        None => std::future::pending().await,
                    }
                } => {
                    let own_id = *self.routing_table.read().own_id();
                    if let Some(addrs) = result {
                        // Start a fresh bootstrap lookup if none is active and
                        // bootstrap has not completed yet.
                        if self.bootstrap_lookup.is_none() && !self.bootstrap_complete {
                            debug!(
                                dns_addrs = addrs.len(),
                                "restarting bootstrap lookup from DNS results"
                            );
                            self.bootstrap_lookup = Some(IterativeLookup::new(
                                own_id,
                                FindNodeCallbacks {
                                    round: 0,
                                    max_rounds: 6,
                                },
                            ));
                        }
                        for addr in addrs {
                            self.send_find_node(addr, own_id, None).await;
                        }
                    } else {
                        // All resolver tasks dropped their senders.
                        debug!("DNS bootstrap tasks completed");
                        self.dns_bootstrap_rx = None;
                    }
                }

                // Announce token reported through the lookup token channel.
                Some((info_hash, node_id, addr, token)) = self.lookup_token_rx.recv() => {
                    self.announce_tokens
                        .entry(info_hash)
                        .or_default()
                        .insert(node_id, (addr, token));
                }

                // Node reported through the lookup node channel.
                Some((id, addr)) = self.lookup_node_rx.recv() => {
                    self.checked_insert(id, addr, false);
                }
            }
        }
    }
1060
1061 async fn bootstrap(&mut self) {
1062 let own_id = *self.routing_table.read().own_id();
1063
1064 let (saved_addrs, hostname_strs): (Vec<_>, Vec<_>) = self
1068 .config
1069 .bootstrap_nodes
1070 .clone()
1071 .into_iter()
1072 .partition(|s| s.parse::<SocketAddr>().is_ok());
1073
1074 debug!(
1075 saved_nodes = saved_addrs.len(),
1076 dns_nodes = hostname_strs.len(),
1077 family = ?self.address_family,
1078 "bootstrap: starting (pinging saved nodes, resolving DNS nodes)"
1079 );
1080
1081 for addr_str in &saved_addrs {
1084 if let Ok(addr) = addr_str.parse::<SocketAddr>() {
1085 self.send_ping(addr, None).await;
1086 }
1087 }
1088
1089 if !hostname_strs.is_empty() {
1095 let (dns_tx, dns_rx) = mpsc::channel(16);
1096 for hostname in hostname_strs {
1097 let tx = dns_tx.clone();
1098 let family = self.address_family;
1099 tokio::spawn(async move {
1100 dns_bootstrap_resolve(hostname, family, tx).await;
1101 });
1102 }
1103 drop(dns_tx); self.dns_bootstrap_rx = Some(dns_rx);
1105 }
1106
1107 let initial_closest: Vec<CompactNodeInfo> = self
1109 .routing_table
1110 .read()
1111 .closest(&own_id, K)
1112 .into_iter()
1113 .map(|n| CompactNodeInfo {
1114 id: n.id,
1115 addr: n.addr,
1116 })
1117 .collect();
1118
1119 debug!(
1120 initial_nodes = initial_closest.len(),
1121 table_size = self.routing_table.read().len(),
1122 "bootstrap: starting iterative lookup"
1123 );
1124
1125 let mut lookup = IterativeLookup::new(
1126 own_id,
1127 FindNodeCallbacks {
1128 round: 0,
1129 max_rounds: 6,
1130 },
1131 );
1132 lookup.closest = initial_closest;
1133 self.bootstrap_lookup = Some(lookup);
1134 }
1135
1136 async fn handle_packet(&mut self, data: &[u8], addr: SocketAddr) {
1137 let msg = match KrpcMessage::from_bytes(data) {
1138 Ok(msg) => msg,
1139 Err(e) => {
1140 trace!(error = %e, from = %addr, "invalid KRPC message");
1141 return;
1142 }
1143 };
1144
1145 match &msg.body {
1146 KrpcBody::Query(query) => {
1147 self.handle_query(&msg, query, addr).await;
1148 }
1149 KrpcBody::Response(resp) => {
1150 self.handle_response(&msg, resp, addr).await;
1151 }
1152 KrpcBody::Error { code, message } => {
1153 trace!(code, message, from = %addr, "KRPC error received");
1154 let txn = msg.transaction_id.as_u16();
1156 if let Some((_, pending)) = self.pending.remove(&txn)
1157 && let Some(nid) = pending.node_id
1158 {
1159 self.routing_table.write().mark_failed(&nid);
1160 }
1161 }
1162 }
1163 }
1164
1165 fn matches_family(&self, addr: &SocketAddr) -> bool {
1167 match self.address_family {
1168 AddressFamily::V4 => addr.is_ipv4(),
1169 AddressFamily::V6 => addr.is_ipv6(),
1170 }
1171 }
1172
1173 fn outgoing_want(&self) -> Option<Vec<crate::krpc::WantFamily>> {
1177 if self.config.enable_multi_address {
1178 Some(vec![
1179 crate::krpc::WantFamily::N4,
1180 crate::krpc::WantFamily::N6,
1181 ])
1182 } else {
1183 None
1184 }
1185 }
1186
1187 async fn handle_query(&mut self, msg: &KrpcMessage, query: &KrpcQuery, addr: SocketAddr) {
1188 if !self.matches_family(&addr) {
1189 return; }
1191 let sender_id = *query.sender_id();
1192 self.checked_insert(sender_id, addr, msg.read_only);
1193 self.routing_table.write().mark_query(&sender_id);
1194
1195 let own_id = *self.routing_table.read().own_id();
1196 let response = match query {
1197 KrpcQuery::Ping { id: _ } => KrpcResponse::NodeId { id: own_id },
1198 KrpcQuery::FindNode {
1199 id: _,
1200 target,
1201 want: _,
1202 } => {
1203 let closest = self.routing_table.read().closest(target, K);
1204 let nodes: Vec<CompactNodeInfo> = closest
1205 .into_iter()
1206 .map(|n| CompactNodeInfo {
1207 id: n.id,
1208 addr: n.addr,
1209 })
1210 .collect();
1211 KrpcResponse::FindNode {
1212 id: own_id,
1213 nodes,
1214 nodes6: Vec::new(),
1215 }
1216 }
1217 KrpcQuery::GetPeers {
1218 id: _,
1219 info_hash,
1220 noseed: _,
1221 scrape,
1222 want: _,
1223 } => {
1224 let ip = addr.ip();
1225 let token = self.peer_store.generate_token(&ip);
1226 let peers = self.peer_store.get_peers(info_hash, 50);
1227
1228 let (bfpe, bfsd) = if *scrape == Some(1) {
1230 let all_peers = self.peer_store.all_peers(info_hash);
1231 let mut filter = crate::bloom::ScrapeBloomFilter::new();
1232 for peer_addr in &all_peers {
1233 filter.insert(*peer_addr);
1234 }
1235 (Some(filter.as_bytes().to_vec()), None)
1236 } else {
1237 (None, None)
1238 };
1239
1240 if peers.is_empty() {
1241 let closest = self.routing_table.read().closest(info_hash, K);
1242 let nodes: Vec<CompactNodeInfo> = closest
1243 .into_iter()
1244 .map(|n| CompactNodeInfo {
1245 id: n.id,
1246 addr: n.addr,
1247 })
1248 .collect();
1249 KrpcResponse::GetPeers(GetPeersResponse {
1250 id: own_id,
1251 token: Some(token),
1252 peers: Vec::new(),
1253 nodes,
1254 nodes6: Vec::new(),
1255 bfpe,
1256 bfsd,
1257 })
1258 } else {
1259 KrpcResponse::GetPeers(GetPeersResponse {
1260 id: own_id,
1261 token: Some(token),
1262 peers,
1263 nodes: Vec::new(),
1264 nodes6: Vec::new(),
1265 bfpe,
1266 bfsd,
1267 })
1268 }
1269 }
1270 KrpcQuery::AnnouncePeer {
1271 id: _,
1272 info_hash,
1273 port,
1274 implied_port,
1275 token,
1276 } => {
1277 let ip = addr.ip();
1278 if !self.peer_store.validate_token(token, &ip) {
1279 let err_msg = KrpcMessage {
1281 transaction_id: msg.transaction_id,
1282 body: KrpcBody::Error {
1283 code: 203,
1284 message: "invalid token".into(),
1285 },
1286 sender_ip: Some(addr),
1287 read_only: false,
1288 };
1289 if let Ok(bytes) = err_msg.to_bytes() {
1290 let _ = self.socket.send_to(&bytes, addr).await;
1291 }
1292 return;
1293 }
1294 let peer_port = if *implied_port { addr.port() } else { *port };
1295 let peer_addr = SocketAddr::new(addr.ip(), peer_port);
1296 self.peer_store.add_peer(*info_hash, peer_addr);
1297 KrpcResponse::NodeId {
1298 id: *self.routing_table.read().own_id(),
1299 }
1300 }
1301 KrpcQuery::Get {
1303 id: _,
1304 target,
1305 seq: requested_seq,
1306 } => {
1307 let ip = addr.ip();
1308 let token = self.peer_store.generate_token(&ip);
1309
1310 if let Some(item) = self.item_store.get_immutable(target) {
1312 KrpcResponse::GetItem {
1313 id: *self.routing_table.read().own_id(),
1314 token: Some(token),
1315 nodes: Vec::new(),
1316 nodes6: Vec::new(),
1317 value: Some(item.value),
1318 key: None,
1319 signature: None,
1320 seq: None,
1321 }
1322 } else if let Some(item) = self.item_store.get_mutable_by_target(target) {
1323 if let Some(min_seq) = requested_seq {
1325 if item.seq <= *min_seq {
1326 let closest = self.routing_table.read().closest(target, K);
1328 let nodes: Vec<CompactNodeInfo> = closest
1329 .into_iter()
1330 .map(|n| CompactNodeInfo {
1331 id: n.id,
1332 addr: n.addr,
1333 })
1334 .collect();
1335 KrpcResponse::GetItem {
1336 id: *self.routing_table.read().own_id(),
1337 token: Some(token),
1338 nodes,
1339 nodes6: Vec::new(),
1340 value: None,
1341 key: Some(item.public_key),
1342 signature: Some(item.signature),
1343 seq: Some(item.seq),
1344 }
1345 } else {
1346 KrpcResponse::GetItem {
1347 id: *self.routing_table.read().own_id(),
1348 token: Some(token),
1349 nodes: Vec::new(),
1350 nodes6: Vec::new(),
1351 value: Some(item.value),
1352 key: Some(item.public_key),
1353 signature: Some(item.signature),
1354 seq: Some(item.seq),
1355 }
1356 }
1357 } else {
1358 KrpcResponse::GetItem {
1359 id: *self.routing_table.read().own_id(),
1360 token: Some(token),
1361 nodes: Vec::new(),
1362 nodes6: Vec::new(),
1363 value: Some(item.value),
1364 key: Some(item.public_key),
1365 signature: Some(item.signature),
1366 seq: Some(item.seq),
1367 }
1368 }
1369 } else {
1370 let closest = self.routing_table.read().closest(target, K);
1372 let nodes: Vec<CompactNodeInfo> = closest
1373 .into_iter()
1374 .map(|n| CompactNodeInfo {
1375 id: n.id,
1376 addr: n.addr,
1377 })
1378 .collect();
1379 KrpcResponse::GetItem {
1380 id: *self.routing_table.read().own_id(),
1381 token: Some(token),
1382 nodes,
1383 nodes6: Vec::new(),
1384 value: None,
1385 key: None,
1386 signature: None,
1387 seq: None,
1388 }
1389 }
1390 }
1391 KrpcQuery::Put {
1393 id: _,
1394 token,
1395 value,
1396 key,
1397 signature,
1398 seq,
1399 salt,
1400 cas,
1401 } => {
1402 let ip = addr.ip();
1403
1404 if !self.peer_store.validate_token(token, &ip) {
1406 let err_msg = KrpcMessage {
1407 transaction_id: msg.transaction_id,
1408 body: KrpcBody::Error {
1409 code: 203,
1410 message: "invalid token".into(),
1411 },
1412 sender_ip: Some(addr),
1413 read_only: false,
1414 };
1415 if let Ok(bytes) = err_msg.to_bytes() {
1416 let _ = self.socket.send_to(&bytes, addr).await;
1417 }
1418 return;
1419 }
1420
1421 if value.len() > MAX_VALUE_SIZE {
1423 let err_msg = KrpcMessage {
1424 transaction_id: msg.transaction_id,
1425 body: KrpcBody::Error {
1426 code: 205,
1427 message: "message (v field) too big".into(),
1428 },
1429 sender_ip: Some(addr),
1430 read_only: false,
1431 };
1432 if let Ok(bytes) = err_msg.to_bytes() {
1433 let _ = self.socket.send_to(&bytes, addr).await;
1434 }
1435 return;
1436 }
1437
1438 if let (Some(k), Some(sig), Some(seq_val)) = (key, signature, seq) {
1439 let salt_bytes = salt.clone().unwrap_or_default();
1441
1442 if salt_bytes.len() > MAX_SALT_SIZE {
1444 let err_msg = KrpcMessage {
1445 transaction_id: msg.transaction_id,
1446 body: KrpcBody::Error {
1447 code: 207,
1448 message: "salt (salt field) too big".into(),
1449 },
1450 sender_ip: Some(addr),
1451 read_only: false,
1452 };
1453 if let Ok(bytes) = err_msg.to_bytes() {
1454 let _ = self.socket.send_to(&bytes, addr).await;
1455 }
1456 return;
1457 }
1458
1459 let item = MutableItem {
1460 value: value.clone(),
1461 public_key: *k,
1462 signature: *sig,
1463 seq: *seq_val,
1464 salt: salt_bytes,
1465 target: bep44::compute_mutable_target(k, salt.as_deref().unwrap_or(&[])),
1466 };
1467
1468 if !item.verify() {
1470 let err_msg = KrpcMessage {
1471 transaction_id: msg.transaction_id,
1472 body: KrpcBody::Error {
1473 code: 206,
1474 message: "invalid signature".into(),
1475 },
1476 sender_ip: Some(addr),
1477 read_only: false,
1478 };
1479 if let Ok(bytes) = err_msg.to_bytes() {
1480 let _ = self.socket.send_to(&bytes, addr).await;
1481 }
1482 return;
1483 }
1484
1485 if let Some(expected_seq) = cas
1487 && let Some(existing) = self.item_store.get_mutable(k, &item.salt)
1488 && existing.seq != *expected_seq
1489 {
1490 let err_msg = KrpcMessage {
1491 transaction_id: msg.transaction_id,
1492 body: KrpcBody::Error {
1493 code: 301,
1494 message: format!(
1495 "CAS mismatch: expected seq {}, got {}",
1496 expected_seq, existing.seq
1497 ),
1498 },
1499 sender_ip: Some(addr),
1500 read_only: false,
1501 };
1502 if let Ok(bytes) = err_msg.to_bytes() {
1503 let _ = self.socket.send_to(&bytes, addr).await;
1504 }
1505 return;
1506 }
1507
1508 if let Some(existing) = self.item_store.get_mutable(k, &item.salt)
1510 && *seq_val <= existing.seq
1511 {
1512 let err_msg = KrpcMessage {
1513 transaction_id: msg.transaction_id,
1514 body: KrpcBody::Error {
1515 code: 302,
1516 message: format!(
1517 "sequence number not newer: {} <= {}",
1518 seq_val, existing.seq
1519 ),
1520 },
1521 sender_ip: Some(addr),
1522 read_only: false,
1523 };
1524 if let Ok(bytes) = err_msg.to_bytes() {
1525 let _ = self.socket.send_to(&bytes, addr).await;
1526 }
1527 return;
1528 }
1529
1530 self.item_store.put_mutable(item);
1531 } else {
1532 if let Ok(item) = ImmutableItem::new(value.clone()) {
1534 self.item_store.put_immutable(item);
1535 } else {
1536 let err_msg = KrpcMessage {
1537 transaction_id: msg.transaction_id,
1538 body: KrpcBody::Error {
1539 code: 205,
1540 message: "message (v field) too big".into(),
1541 },
1542 sender_ip: Some(addr),
1543 read_only: false,
1544 };
1545 if let Ok(bytes) = err_msg.to_bytes() {
1546 let _ = self.socket.send_to(&bytes, addr).await;
1547 }
1548 return;
1549 }
1550 }
1551
1552 KrpcResponse::NodeId {
1553 id: *self.routing_table.read().own_id(),
1554 }
1555 }
1556 KrpcQuery::SampleInfohashes { id: _, target } => {
1558 let closest = self.routing_table.read().closest(target, K);
1559 let nodes: Vec<CompactNodeInfo> = closest
1560 .into_iter()
1561 .map(|n| CompactNodeInfo {
1562 id: n.id,
1563 addr: n.addr,
1564 })
1565 .collect();
1566
1567 let samples = self.peer_store.random_info_hashes(20);
1569 let num = self.peer_store.info_hash_count() as i64;
1570
1571 KrpcResponse::SampleInfohashes(SampleInfohashesResponse {
1572 id: *self.routing_table.read().own_id(),
1573 interval: 60, num,
1575 samples,
1576 nodes,
1577 })
1578 }
1579 };
1580
1581 let reply = KrpcMessage {
1582 transaction_id: msg.transaction_id,
1583 body: KrpcBody::Response(response),
1584 sender_ip: Some(addr), read_only: false, };
1587 if let Ok(bytes) = reply.to_bytes() {
1588 let _ = self.socket.send_to(&bytes, addr).await;
1589 }
1590 }
1591
/// Processes a KRPC response `resp` (carried in `msg`) that arrived from `addr`.
///
/// In order, this:
/// 1. drops the message if `matches_family` rejects `addr`;
/// 2. feeds the `sender_ip` reported by the peer (if any) into the external-IP
///    voter and regenerates the node ID when the consensus changes;
/// 3. records the responder in the routing table and drains queued `get_peers`
///    requests if the table is now non-empty;
/// 4. looks up (and removes) the pending query for the transaction ID and either
///    forwards the response to a registered waiter (`response_tx`) or handles it
///    inline according to the pending query kind.
///
/// NOTE(review): the pending entry is matched by transaction ID only; the source
/// address of the response is not compared against `pending.addr` here — confirm
/// whether that is validated elsewhere.
async fn handle_response(&mut self, msg: &KrpcMessage, resp: &KrpcResponse, addr: SocketAddr) {
    // Ignore responses from addresses rejected by the family filter.
    if !self.matches_family(&addr) {
        return;
    }
    self.stats.total_responses_received += 1;

    // The responder told us which address it saw us at: cast a vote keyed by a
    // hash of the responder's address.
    if let Some(reported_ip) = msg.sender_ip {
        let source_id = hash_source_addr(&addr);
        // `add_vote` yields a value only when the consensus IP changed.
        if let Some(consensus_ip) = self.ip_voter.add_vote(source_id, reported_ip.ip()) {
            debug!(%consensus_ip, "BEP 42: external IP consensus changed");
            // Best-effort notification; a full or closed channel is ignored.
            let _ = self.ip_consensus_tx.try_send(consensus_ip);
            self.regenerate_node_id(consensus_ip);
        }
    }

    // Record the responder and mark it as having answered. This happens before
    // the transaction lookup below, so it also applies to unknown transactions.
    let sender_id = *resp.sender_id();
    self.checked_insert(sender_id, addr, false);
    self.routing_table.write().mark_response(&sender_id);

    // The insert above may have put the first node into the table; release any
    // get_peers requests that were queued waiting for that.
    self.drain_pending_if_table_ready();

    // Match the response to the query we sent; the pending entry is consumed.
    let txn = msg.transaction_id.as_u16();
    let Some((_, pending)) = self.pending.remove(&txn) else {
        trace!(txn, from = %addr, "response for unknown transaction");
        return;
    };

    // A waiter registered a oneshot for this transaction: hand the response over
    // and skip the inline handling below.
    if let Some(response_tx) = pending.response_tx {
        // Insert the sender again, this time under the address the query was sent to.
        self.checked_insert(sender_id, pending.addr, false);
        // The waiter may already be gone; a failed send is ignored.
        let _ = response_tx.send(PendingQueryResponse {
            sender_id,
            response: resp.clone(),
        });
        return;
    }

    match (&pending.kind, resp) {
        (PendingQueryKind::FindNode, KrpcResponse::FindNode { nodes, nodes6, .. }) => {
            // Learn every returned node whose address passes the family filter.
            for node in nodes {
                if self.matches_family(&node.addr) {
                    self.checked_insert(node.id, node.addr, false);
                }
            }
            for node in nodes6 {
                if self.matches_family(&node.addr) {
                    self.checked_insert(node.id, node.addr, false);
                }
            }

            // While an iterative bootstrap is running, feed it both node lists.
            if let Some(ref mut lookup) = self.bootstrap_lookup {
                let mut all_nodes: Vec<CompactNodeInfo> = nodes.clone();
                all_nodes.extend(nodes6.iter().map(|n| CompactNodeInfo {
                    id: n.id,
                    addr: n.addr,
                }));
                lookup.feed_nodes(all_nodes, self.address_family);
            }

            // Advance the bootstrap lookup by one round, or finish it.
            if self.bootstrap_lookup.is_some() {
                // Decide what to do while `lookup` is borrowed; act on the
                // decision afterwards, once the borrow has ended.
                let (to_query, target, terminate) =
                    if let Some(ref mut lookup) = self.bootstrap_lookup {
                        if lookup.callbacks.round >= lookup.callbacks.max_rounds {
                            // Round budget exhausted.
                            (Vec::new(), lookup.target, true)
                        } else {
                            let to_query = lookup.next_to_query(3);
                            let target = lookup.target;
                            if to_query.is_empty() {
                                // Nothing left to ask.
                                (Vec::new(), target, true)
                            } else {
                                lookup.callbacks.round += 1;
                                (to_query, target, false)
                            }
                        }
                    } else {
                        // Not expected to be taken: guarded by the `is_some()` check above.
                        (Vec::new(), Id20::ZERO, false)
                    };

                if terminate {
                    debug!(
                        routing_table_size = self.routing_table.read().len(),
                        "iterative bootstrap complete"
                    );
                    self.bootstrap_lookup = None;
                    self.on_bootstrap_complete();
                } else {
                    // Copy out (addr, id) pairs before calling back into `self`.
                    let queries: Vec<(SocketAddr, Id20)> =
                        to_query.iter().map(|n| (n.addr, n.id)).collect();
                    for (node_addr, nid) in queries {
                        self.send_find_node(node_addr, target, Some(nid)).await;
                    }
                }
            }
        }
        (PendingQueryKind::GetPeers { info_hash }, KrpcResponse::GetPeers(gp)) => {
            // Reached only when no waiter (`response_tx`) was attached to the
            // query; the returned nodes are still learned, nothing else is done.
            for node in &gp.nodes {
                if self.matches_family(&node.addr) {
                    self.checked_insert(node.id, node.addr, false);
                }
            }
            for node in &gp.nodes6 {
                if self.matches_family(&node.addr) {
                    self.checked_insert(node.id, node.addr, false);
                }
            }
            trace!(%info_hash, "get_peers response for orphaned lookup");
        }
        (PendingQueryKind::Ping, KrpcResponse::NodeId { .. }) => {
            // The responder was already recorded above; only log during bootstrap.
            if !self.bootstrap_complete {
                debug!(
                    from = %pending.addr,
                    table_size = self.routing_table.read().len(),
                    "bootstrap: ping response received"
                );
            }
        }
        (
            PendingQueryKind::AnnouncePeer | PendingQueryKind::PutItem,
            KrpcResponse::NodeId { .. },
        ) => {
            // Acknowledgement only; nothing further to do.
        }
        (PendingQueryKind::SampleInfohashes, KrpcResponse::SampleInfohashes(si)) => {
            for node in &si.nodes {
                if self.matches_family(&node.addr) {
                    self.checked_insert(node.id, node.addr, false);
                }
            }

            // Complete the caller's request registered under this transaction.
            if let Some(reply) = self.sample_replies.remove(&txn) {
                let _ = reply.send(Ok(SampleInfohashesResult {
                    interval: si.interval,
                    num: si.num,
                    samples: si.samples.clone(),
                    nodes: si.nodes.clone(),
                }));
            }
        }
        (
            PendingQueryKind::GetItem { target },
            KrpcResponse::GetItem {
                token,
                nodes,
                nodes6,
                value,
                key,
                signature,
                seq,
                ..
            },
        ) => {
            for node in nodes {
                if self.matches_family(&node.addr) {
                    self.checked_insert(node.id, node.addr, false);
                }
            }
            for node in nodes6 {
                if self.matches_family(&node.addr) {
                    self.checked_insert(node.id, node.addr, false);
                }
            }

            let target = *target;

            // Put path: this `get` was used to collect a write token for a
            // pending put operation on `target`.
            if let (Some(token), Some(put_op)) = (token, self.item_put_ops.get_mut(&target)) {
                match put_op {
                    ItemPutState::Immutable { tokens, .. }
                    | ItemPutState::Mutable { tokens, .. } => {
                        tokens.insert(sender_id, (addr, token.clone()));
                    }
                }

                // Send the puts once, as soon as K tokens have been collected.
                let should_send = match &self.item_put_ops[&target] {
                    ItemPutState::Immutable {
                        tokens, sent_puts, ..
                    }
                    | ItemPutState::Mutable {
                        tokens, sent_puts, ..
                    } => tokens.len() >= K && *sent_puts == 0,
                };

                if should_send {
                    self.send_pending_puts(target).await;
                }
            }

            // Get path: a lookup for `target` is in progress.
            if self.item_lookups.contains_key(&target) {
                let is_immutable = matches!(
                    self.item_lookups.get(&target),
                    Some(ItemLookupState::Immutable { .. })
                );

                if is_immutable {
                    if let Some(v) = value {
                        // Accept the value only if its SHA-1 equals the target.
                        if irontide_core::sha1(v) == target {
                            // Store it locally when it passes `ImmutableItem::new`.
                            if let Ok(item) = crate::bep44::ImmutableItem::new(v.clone()) {
                                self.item_store.put_immutable(item);
                            }
                            // Answer the caller once; `take()` leaves `None` behind.
                            if let Some(ItemLookupState::Immutable { reply, .. }) =
                                self.item_lookups.get_mut(&target)
                                && let Some(r) = reply.take()
                            {
                                let _ = r.send(Ok(Some(v.clone())));
                            }
                        }
                    } else {
                        // No value in this response: query up to 3 not-yet-queried
                        // nodes from `nodes` that match our address family.
                        let family = self.address_family;
                        let to_query: Vec<SocketAddr> = {
                            if let Some(ItemLookupState::Immutable { queried, .. }) =
                                self.item_lookups.get_mut(&target)
                            {
                                nodes
                                    .iter()
                                    .filter(|n| match family {
                                        AddressFamily::V4 => n.addr.is_ipv4(),
                                        AddressFamily::V6 => n.addr.is_ipv6(),
                                    })
                                    // `insert` returns true only for ids not seen before.
                                    .filter(|n| queried.insert(n.id))
                                    .take(3)
                                    .map(|n| n.addr)
                                    .collect()
                            } else {
                                vec![]
                            }
                        };
                        for query_addr in to_query {
                            self.send_get_item(query_addr, target, None).await;
                        }
                    }
                } else {
                    // Mutable lookup: a usable answer needs value, key, signature and seq.
                    if let (Some(v), Some(k), Some(sig), Some(s)) = (value, key, signature, seq)
                    {
                        // The salt comes from the lookup state, not from the response.
                        let salt = if let Some(ItemLookupState::Mutable { salt, .. }) =
                            self.item_lookups.get(&target)
                        {
                            salt.clone()
                        } else {
                            Vec::new()
                        };

                        let item = crate::bep44::MutableItem {
                            value: v.clone(),
                            public_key: *k,
                            signature: *sig,
                            seq: *s,
                            salt,
                            target,
                        };

                        // Keep the item only if it verifies and its sequence number
                        // is strictly greater than the best one seen so far.
                        if item.verify()
                            && let Some(ItemLookupState::Mutable {
                                best_seq,
                                best_value,
                                ..
                            }) = self.item_lookups.get_mut(&target)
                            && *s > *best_seq
                        {
                            *best_seq = *s;
                            *best_value = Some(v.clone());
                            self.item_store.put_mutable(item);
                        }
                    }

                    // Keep going regardless of whether a value was returned: query
                    // up to 3 not-yet-queried nodes from `nodes` of our family.
                    let family = self.address_family;
                    let to_query: Vec<SocketAddr> = {
                        if let Some(ItemLookupState::Mutable { queried, .. }) =
                            self.item_lookups.get_mut(&target)
                        {
                            nodes
                                .iter()
                                .filter(|n| match family {
                                    AddressFamily::V4 => n.addr.is_ipv4(),
                                    AddressFamily::V6 => n.addr.is_ipv6(),
                                })
                                .filter(|n| queried.insert(n.id))
                                .take(3)
                                .map(|n| n.addr)
                                .collect()
                        } else {
                            vec![]
                        }
                    };
                    for query_addr in to_query {
                        self.send_get_item(query_addr, target, None).await;
                    }
                }
            }
        }
        _ => {
            // The response variant does not fit the kind of query we sent.
            trace!(txn, "mismatched response type");
        }
    }
}
1914
1915 fn start_get_peers(&mut self, info_hash: Id20, reply: mpsc::UnboundedSender<Vec<SocketAddr>>) {
1916 if !self.bootstrap_complete && self.routing_table.read().is_empty() {
1926 debug!(
1927 %info_hash,
1928 "get_peers: routing table empty, queuing until first node arrives"
1929 );
1930 self.pending_get_peers.push((info_hash, reply));
1931 return;
1932 }
1933 self.start_get_peers_inner(info_hash, reply);
1934 }
1935
1936 fn start_get_peers_inner(
1937 &mut self,
1938 info_hash: Id20,
1939 reply: mpsc::UnboundedSender<Vec<SocketAddr>>,
1940 ) {
1941 debug!(
1942 %info_hash,
1943 table_size = self.routing_table.read().len(),
1944 "starting get_peers query"
1945 );
1946
1947 let own_id = *self.routing_table.read().own_id();
1960 debug!(
1961 family = ?self.address_family,
1962 %info_hash,
1963 table_size = self.routing_table.read().len(),
1964 "get_peers: spawning DhtLookup"
1965 );
1966
1967 let lookup = crate::dht_lookup::DhtLookup::new(
1968 info_hash,
1969 crate::dht_lookup::LookupConfig {
1970 max_depth: 4,
1971 max_nodes: 256,
1972 },
1973 self.address_family,
1974 self.socket.clone(),
1975 self.pending.clone(),
1976 self.rate_limiter.clone(),
1977 self.routing_table.clone(),
1978 self.next_txn_id.clone(),
1979 own_id,
1980 reply,
1981 self.lookup_token_tx.clone(),
1982 self.lookup_node_tx.clone(),
1983 self.config.read_only_mode,
1984 self.outgoing_want(),
1985 );
1986
1987 let handle = tokio::spawn(lookup.run());
1988 if let Some(old_handle) = self.active_lookups.insert(info_hash, handle) {
1991 drop(old_handle);
1994 }
1995 }
1996
1997 fn on_bootstrap_complete(&mut self) {
2000 if self.bootstrap_complete {
2001 return;
2002 }
2003 self.bootstrap_complete = true;
2004 self.bootstrap_timeout = None;
2005
2006 let pending = std::mem::take(&mut self.pending_get_peers);
2007 debug!(
2008 count = pending.len(),
2009 table_size = self.routing_table.read().len(),
2010 "bootstrap complete, processing queued get_peers"
2011 );
2012 for (info_hash, reply) in pending {
2013 self.start_get_peers_inner(info_hash, reply);
2014 }
2015 }
2016
2017 fn drain_pending_if_table_ready(&mut self) {
2021 if self.pending_get_peers.is_empty() || self.routing_table.read().is_empty() {
2022 return;
2023 }
2024 let pending = std::mem::take(&mut self.pending_get_peers);
2025 debug!(
2026 count = pending.len(),
2027 table_size = self.routing_table.read().len(),
2028 "routing table populated, draining queued get_peers"
2029 );
2030 for (info_hash, reply) in pending {
2031 self.start_get_peers_inner(info_hash, reply);
2032 }
2033 }
2034
2035 async fn handle_announce(
2036 &mut self,
2037 info_hash: Id20,
2038 port: u16,
2039 reply: oneshot::Sender<Result<()>>,
2040 ) {
2041 if self.config.read_only_mode {
2043 trace!("BEP 43: suppressing announce_peer in read-only mode");
2044 let _ = reply.send(Ok(()));
2045 return;
2046 }
2047
2048 let tokens: Vec<(SocketAddr, Vec<u8>)> = self
2050 .announce_tokens
2051 .get(&info_hash)
2052 .map(|m| m.values().cloned().collect())
2053 .unwrap_or_default();
2054
2055 if tokens.is_empty() {
2056 let _ = reply.send(Err(Error::InvalidMessage(
2057 "no tokens available; call get_peers first".into(),
2058 )));
2059 return;
2060 }
2061
2062 let own_id = *self.routing_table.read().own_id();
2063 for (addr, token) in &tokens {
2064 if !self.rate_limiter.try_acquire() {
2065 break; }
2067 let txn = self.next_transaction_id();
2068 let msg = KrpcMessage {
2069 transaction_id: TransactionId::from_u16(txn),
2070 body: KrpcBody::Query(KrpcQuery::AnnouncePeer {
2071 id: own_id,
2072 info_hash,
2073 port,
2074 implied_port: false,
2075 token: token.clone(),
2076 }),
2077 sender_ip: None,
2078 read_only: false, };
2080 if let Ok(bytes) = msg.to_bytes() {
2081 let _ = self.socket.send_to(&bytes, addr).await;
2082 self.pending.insert(
2083 txn,
2084 PendingQuery {
2085 sent_at: Instant::now(),
2086 addr: *addr,
2087 kind: PendingQueryKind::AnnouncePeer,
2088 node_id: None,
2089 response_tx: None,
2090 },
2091 );
2092 self.stats.total_queries_sent += 1;
2093 }
2094 }
2095
2096 self.announce_tokens.remove(&info_hash);
2098
2099 let _ = reply.send(Ok(()));
2100 }
2101
2102 async fn expire_queries_and_advance_lookups(&mut self) {
2107 let timeout = self.config.query_timeout;
2108 let expired: Vec<u16> = self
2109 .pending
2110 .iter()
2111 .filter(|entry| entry.value().sent_at.elapsed() > timeout)
2112 .map(|entry| *entry.key())
2113 .collect();
2114
2115 if expired.is_empty() {
2116 return;
2117 }
2118
2119 debug!(
2120 family = ?self.address_family,
2121 expired_count = expired.len(),
2122 total_pending = self.pending.len(),
2123 active_lookups = self.active_lookups.len(),
2124 "expiring timed-out queries"
2125 );
2126
2127 let mut find_node_timed_out = false;
2128
2129 for txn in expired {
2130 if let Some((_, pending)) = self.pending.remove(&txn) {
2131 trace!(txn, addr = %pending.addr, "query timed out");
2132 if let Some(nid) = pending.node_id {
2133 self.routing_table.write().mark_failed(&nid);
2134 }
2135 if matches!(pending.kind, PendingQueryKind::SampleInfohashes)
2136 && let Some(reply) = self.sample_replies.remove(&txn)
2137 {
2138 let _ = reply.send(Err(Error::Timeout));
2139 }
2140 if matches!(pending.kind, PendingQueryKind::FindNode) {
2145 find_node_timed_out = true;
2146 }
2147 }
2148 }
2149
2150 if find_node_timed_out && self.bootstrap_lookup.is_some() {
2152 let (to_query, target, terminate) = if let Some(ref mut lookup) = self.bootstrap_lookup
2154 {
2155 let to_query = lookup.next_to_query(3);
2156 let target = lookup.target;
2157 if to_query.is_empty() {
2158 (Vec::new(), target, true)
2159 } else {
2160 (to_query, target, false)
2161 }
2162 } else {
2163 (Vec::new(), Id20::ZERO, false)
2164 };
2165
2166 if terminate {
2167 self.bootstrap_lookup = None;
2168 self.on_bootstrap_complete();
2169 } else {
2170 let queries: Vec<(SocketAddr, Id20)> =
2171 to_query.iter().map(|n| (n.addr, n.id)).collect();
2172 for (node_addr, nid) in queries {
2173 self.send_find_node(node_addr, target, Some(nid)).await;
2174 }
2175 }
2176 }
2177 }
2178
2179 fn state_file_path(state_dir: &std::path::Path, family: AddressFamily) -> PathBuf {
2183 match family {
2184 AddressFamily::V4 => state_dir.join("dht_state.json"),
2185 AddressFamily::V6 => state_dir.join("dht_state_v6.json"),
2186 }
2187 }
2188
2189 fn save_routing_table(&self) {
2194 let Some(state_dir) = &self.config.state_dir else {
2195 return;
2196 };
2197
2198 let nodes = self.routing_table.read().all_nodes();
2199 let own_id = *self.routing_table.read().own_id();
2200
2201 let state = DhtState {
2202 node_id: own_id.to_hex(),
2203 nodes: nodes
2204 .iter()
2205 .map(|(id, addr)| DhtNodeEntry {
2206 id: id.to_hex(),
2207 addr: addr.to_string(),
2208 })
2209 .collect(),
2210 };
2211
2212 let json = match serde_json::to_string_pretty(&state) {
2213 Ok(j) => j,
2214 Err(e) => {
2215 warn!(error = %e, "failed to serialize DHT state to JSON");
2216 return;
2217 }
2218 };
2219
2220 let final_path = Self::state_file_path(state_dir, self.address_family);
2221 let tmp_path = state_dir.join(format!(
2222 ".dht_state_{}.tmp",
2223 match self.address_family {
2224 AddressFamily::V4 => "v4",
2225 AddressFamily::V6 => "v6",
2226 }
2227 ));
2228
2229 if let Err(e) = std::fs::write(&tmp_path, json.as_bytes()) {
2230 warn!(error = %e, path = %tmp_path.display(), "failed to write DHT state temp file");
2231 return;
2232 }
2233
2234 if let Err(e) = std::fs::rename(&tmp_path, &final_path) {
2235 warn!(
2236 error = %e,
2237 tmp = %tmp_path.display(),
2238 dst = %final_path.display(),
2239 "failed to rename DHT state temp file"
2240 );
2241 }
2242 }
2243
2244 fn load_routing_table(&mut self) {
2252 let state_dir = match &self.config.state_dir {
2253 Some(dir) => dir.clone(),
2254 None => return,
2255 };
2256
2257 let path = Self::state_file_path(&state_dir, self.address_family);
2258
2259 let data = match std::fs::read_to_string(&path) {
2260 Ok(d) => d,
2261 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
2262 debug!(path = %path.display(), "no saved DHT state (first run)");
2263 return;
2264 }
2265 Err(e) => {
2266 warn!(error = %e, path = %path.display(), "failed to read DHT state file");
2267 return;
2268 }
2269 };
2270
2271 let state: DhtState = match serde_json::from_str(&data) {
2272 Ok(s) => s,
2273 Err(e) => {
2274 warn!(error = %e, path = %path.display(), "corrupt DHT state file, ignoring");
2275 return;
2276 }
2277 };
2278
2279 let mut loaded = 0u32;
2280 for entry in &state.nodes {
2281 let Ok(id) = Id20::from_hex(&entry.id) else {
2282 continue;
2283 };
2284 let addr: SocketAddr = match entry.addr.parse() {
2285 Ok(a) => a,
2286 Err(_) => continue,
2287 };
2288 if self.routing_table.write().insert(id, addr) {
2289 loaded = loaded.saturating_add(1);
2290 }
2291 }
2292
2293 if loaded > 0 {
2294 self.routing_table.write().mark_all_questionable();
2296
2297 self.config
2300 .bootstrap_nodes
2301 .retain(|s| s.parse::<SocketAddr>().is_err());
2302
2303 debug!(
2304 loaded,
2305 table_size = self.routing_table.read().len(),
2306 family = ?self.address_family,
2307 "loaded DHT routing table from JSON"
2308 );
2309 }
2310 }
2311
2312 async fn maintenance(&mut self) {
2313 self.active_lookups
2317 .retain(|_, handle| !handle.is_finished());
2318
2319 self.item_lookups.retain(|_, lookup| match lookup {
2321 ItemLookupState::Immutable { reply, .. } => {
2322 if reply
2323 .as_ref()
2324 .is_some_and(tokio::sync::oneshot::Sender::is_closed)
2325 {
2326 false
2328 } else if reply.is_some() {
2329 true
2330 } else {
2331 false
2333 }
2334 }
2335 ItemLookupState::Mutable {
2336 reply,
2337 best_value,
2338 best_seq,
2339 ..
2340 } => {
2341 if reply
2342 .as_ref()
2343 .is_some_and(tokio::sync::oneshot::Sender::is_closed)
2344 {
2345 false
2346 } else if reply.is_some() {
2347 true
2348 } else {
2349 let _ = best_value;
2351 let _ = best_seq;
2352 false
2353 }
2354 }
2355 });
2356
2357 self.item_put_ops.retain(|_, put_op| match put_op {
2359 ItemPutState::Immutable { reply, .. } | ItemPutState::Mutable { reply, .. } => {
2360 reply.is_some()
2361 }
2362 });
2363
2364 let stale = self
2366 .routing_table
2367 .read()
2368 .stale_buckets(Duration::from_mins(15));
2369 for bucket_idx in stale {
2370 let target = self.routing_table.read().random_id_in_bucket(bucket_idx);
2371 let closest = self.routing_table.read().closest(&target, 3);
2372 for node in closest {
2373 self.send_find_node(node.addr, target, Some(node.id)).await;
2374 }
2375 }
2376
2377 self.save_routing_table();
2379 }
2380
2381 async fn send_find_node(&mut self, addr: SocketAddr, target: Id20, node_id: Option<Id20>) {
2382 if !self.rate_limiter.try_acquire() {
2383 return;
2384 }
2385 let txn = self.next_transaction_id();
2386 let own_id = *self.routing_table.read().own_id();
2387 let msg = KrpcMessage {
2388 transaction_id: TransactionId::from_u16(txn),
2389 body: KrpcBody::Query(KrpcQuery::FindNode {
2390 id: own_id,
2391 target,
2392 want: self.outgoing_want(),
2393 }),
2394 sender_ip: None,
2395 read_only: self.config.read_only_mode,
2396 };
2397 if let Ok(bytes) = msg.to_bytes() {
2398 let _ = self.socket.send_to(&bytes, addr).await;
2399 self.pending.insert(
2400 txn,
2401 PendingQuery {
2402 sent_at: Instant::now(),
2403 addr,
2404 kind: PendingQueryKind::FindNode,
2405 node_id,
2406 response_tx: None,
2407 },
2408 );
2409 self.stats.total_queries_sent += 1;
2410 }
2411 }
2412
2413 async fn send_ping(&mut self, addr: SocketAddr, node_id: Option<Id20>) {
2414 if !self.rate_limiter.try_acquire() {
2415 return;
2416 }
2417 let txn = self.next_transaction_id();
2418 let own_id = *self.routing_table.read().own_id();
2419 let msg = KrpcMessage {
2420 transaction_id: TransactionId::from_u16(txn),
2421 body: KrpcBody::Query(KrpcQuery::Ping { id: own_id }),
2422 sender_ip: None,
2423 read_only: self.config.read_only_mode,
2424 };
2425 if let Ok(bytes) = msg.to_bytes() {
2426 let _ = self.socket.send_to(&bytes, addr).await;
2427 self.pending.insert(
2428 txn,
2429 PendingQuery {
2430 sent_at: Instant::now(),
2431 addr,
2432 node_id,
2433 kind: PendingQueryKind::Ping,
2434 response_tx: None,
2435 },
2436 );
2437 self.stats.total_queries_sent += 1;
2438 }
2439 }
2440
2441 async fn ping_questionable_nodes(&mut self) {
2442 let nodes = self.routing_table.read().questionable_nodes();
2443 for (id, addr) in nodes {
2444 self.send_ping(addr, Some(id)).await;
2445 }
2446 }
2447
2448 async fn handle_get_immutable(
2451 &mut self,
2452 target: Id20,
2453 reply: oneshot::Sender<Result<Option<Vec<u8>>>>,
2454 ) {
2455 if let Some(item) = self.item_store.get_immutable(&target) {
2457 let _ = reply.send(Ok(Some(item.value)));
2458 return;
2459 }
2460
2461 let closest = self.routing_table.read().closest(&target, K);
2463 if closest.is_empty() {
2464 let _ = reply.send(Ok(None));
2466 return;
2467 }
2468
2469 for node in closest.iter().take(3) {
2470 self.send_get_item(node.addr, target, None).await;
2471 }
2472
2473 self.item_lookups.insert(
2474 target,
2475 ItemLookupState::Immutable {
2476 reply: Some(reply),
2477 queried: closest.iter().map(|n| n.id).collect(),
2478 },
2479 );
2480 }
2481
2482 async fn handle_put_immutable(&mut self, value: Vec<u8>, reply: oneshot::Sender<Result<Id20>>) {
2483 let item = match crate::bep44::ImmutableItem::new(value) {
2484 Ok(item) => item,
2485 Err(e) => {
2486 let _ = reply.send(Err(e));
2487 return;
2488 }
2489 };
2490 let target = item.target;
2491
2492 self.item_store.put_immutable(item.clone());
2494
2495 let _ = reply.send(Ok(target));
2497
2498 let closest = self.routing_table.read().closest(&target, K);
2500 if closest.is_empty() {
2501 return;
2502 }
2503
2504 for node in closest.iter().take(K) {
2505 self.send_get_item(node.addr, target, None).await;
2506 }
2507
2508 self.item_put_ops.insert(
2509 target,
2510 ItemPutState::Immutable {
2511 item,
2512 tokens: HashMap::new(),
2513 sent_puts: 0,
2514 reply: None,
2515 },
2516 );
2517 }
2518
2519 #[allow(clippy::type_complexity)]
2520 async fn handle_get_mutable(
2521 &mut self,
2522 public_key: [u8; 32],
2523 salt: Vec<u8>,
2524 reply: oneshot::Sender<Result<Option<(Vec<u8>, i64)>>>,
2525 ) {
2526 let target = crate::bep44::compute_mutable_target(&public_key, &salt);
2527
2528 if let Some(item) = self.item_store.get_mutable(&public_key, &salt) {
2530 let _ = reply.send(Ok(Some((item.value, item.seq))));
2531 return;
2532 }
2533
2534 let closest = self.routing_table.read().closest(&target, K);
2536 if closest.is_empty() {
2537 let _ = reply.send(Ok(None));
2538 return;
2539 }
2540
2541 for node in closest.iter().take(3) {
2542 self.send_get_item(node.addr, target, None).await;
2543 }
2544
2545 self.item_lookups.insert(
2546 target,
2547 ItemLookupState::Mutable {
2548 salt,
2549 reply: Some(reply),
2550 best_seq: i64::MIN,
2551 best_value: None,
2552 queried: closest.iter().map(|n| n.id).collect(),
2553 },
2554 );
2555 }
2556
2557 async fn handle_put_mutable(
2558 &mut self,
2559 keypair_bytes: [u8; 32],
2560 value: Vec<u8>,
2561 seq: i64,
2562 salt: Vec<u8>,
2563 reply: oneshot::Sender<Result<Id20>>,
2564 ) {
2565 let keypair = ed25519_dalek::SigningKey::from_bytes(&keypair_bytes);
2566 let item = match crate::bep44::MutableItem::create(&keypair, value, seq, salt) {
2567 Ok(item) => item,
2568 Err(e) => {
2569 let _ = reply.send(Err(e));
2570 return;
2571 }
2572 };
2573 let target = item.target;
2574
2575 self.item_store.put_mutable(item.clone());
2577
2578 let _ = reply.send(Ok(target));
2580
2581 let closest = self.routing_table.read().closest(&target, K);
2583 if closest.is_empty() {
2584 return;
2585 }
2586
2587 for node in closest.iter().take(K) {
2588 self.send_get_item(node.addr, target, None).await;
2589 }
2590
2591 self.item_put_ops.insert(
2592 target,
2593 ItemPutState::Mutable {
2594 item,
2595 tokens: HashMap::new(),
2596 sent_puts: 0,
2597 reply: None,
2598 },
2599 );
2600 }
2601
2602 async fn send_get_item(&mut self, addr: SocketAddr, target: Id20, seq: Option<i64>) {
2604 if !self.rate_limiter.try_acquire() {
2605 return;
2606 }
2607 let txn = self.next_transaction_id();
2608 let own_id = *self.routing_table.read().own_id();
2609 let msg = KrpcMessage {
2610 transaction_id: TransactionId::from_u16(txn),
2611 body: KrpcBody::Query(KrpcQuery::Get {
2612 id: own_id,
2613 target,
2614 seq,
2615 }),
2616 sender_ip: None, read_only: self.config.read_only_mode,
2618 };
2619 if let Ok(bytes) = msg.to_bytes() {
2620 let _ = self.socket.send_to(&bytes, addr).await;
2621 self.pending.insert(
2622 txn,
2623 PendingQuery {
2624 sent_at: Instant::now(),
2625 addr,
2626 kind: PendingQueryKind::GetItem { target },
2627 node_id: None,
2628 response_tx: None,
2629 },
2630 );
2631 self.stats.total_queries_sent += 1;
2632 }
2633 }
2634
2635 async fn send_put_item(&mut self, params: PutItemParams) {
2637 if !self.rate_limiter.try_acquire() {
2638 return;
2639 }
2640 let txn = self.next_transaction_id();
2641 let own_id = *self.routing_table.read().own_id();
2642 let msg = KrpcMessage {
2643 transaction_id: TransactionId::from_u16(txn),
2644 body: KrpcBody::Query(KrpcQuery::Put {
2645 id: own_id,
2646 token: params.token,
2647 value: params.value,
2648 key: params.key,
2649 signature: params.signature,
2650 seq: params.seq,
2651 salt: params.salt,
2652 cas: None,
2653 }),
2654 sender_ip: None, read_only: self.config.read_only_mode,
2656 };
2657 if let Ok(bytes) = msg.to_bytes() {
2658 let _ = self.socket.send_to(&bytes, params.addr).await;
2659 self.pending.insert(
2660 txn,
2661 PendingQuery {
2662 sent_at: Instant::now(),
2663 addr: params.addr,
2664 kind: PendingQueryKind::PutItem,
2665 node_id: None,
2666 response_tx: None,
2667 },
2668 );
2669 self.stats.total_queries_sent += 1;
2670 }
2671 }
2672
2673 async fn send_pending_puts(&mut self, target: Id20) {
2675 let puts_to_send: Vec<PutItemParams> = if let Some(put_op) = self.item_put_ops.get(&target)
2676 {
2677 match put_op {
2678 ItemPutState::Immutable { item, tokens, .. } => tokens
2679 .values()
2680 .take(K)
2681 .map(|(addr, token)| PutItemParams {
2682 addr: *addr,
2683 token: token.clone(),
2684 value: item.value.clone(),
2685 key: None,
2686 signature: None,
2687 seq: None,
2688 salt: None,
2689 })
2690 .collect(),
2691 ItemPutState::Mutable { item, tokens, .. } => {
2692 let salt = if item.salt.is_empty() {
2693 None
2694 } else {
2695 Some(item.salt.clone())
2696 };
2697 tokens
2698 .values()
2699 .take(K)
2700 .map(|(addr, token)| PutItemParams {
2701 addr: *addr,
2702 token: token.clone(),
2703 value: item.value.clone(),
2704 key: Some(item.public_key),
2705 signature: Some(item.signature),
2706 seq: Some(item.seq),
2707 salt: salt.clone(),
2708 })
2709 .collect()
2710 }
2711 }
2712 } else {
2713 return;
2714 };
2715
2716 let num_puts = puts_to_send.len();
2717 for params in puts_to_send {
2718 self.send_put_item(params).await;
2719 }
2720
2721 if let Some(put_op) = self.item_put_ops.get_mut(&target) {
2723 match put_op {
2724 ItemPutState::Immutable {
2725 item,
2726 sent_puts,
2727 reply,
2728 ..
2729 } => {
2730 *sent_puts = num_puts;
2731 if let Some(r) = reply.take() {
2732 let _ = r.send(Ok(item.target));
2733 }
2734 }
2735 ItemPutState::Mutable {
2736 item,
2737 sent_puts,
2738 reply,
2739 ..
2740 } => {
2741 *sent_puts = num_puts;
2742 if let Some(r) = reply.take() {
2743 let _ = r.send(Ok(item.target));
2744 }
2745 }
2746 }
2747 }
2748 }
2749
2750 async fn handle_sample_infohashes(
2753 &mut self,
2754 target: Id20,
2755 reply: oneshot::Sender<Result<SampleInfohashesResult>>,
2756 ) {
2757 let closest = self.routing_table.read().closest(&target, 1);
2759 let (addr, closest_node_id) = if let Some(node) = closest.first() {
2760 (node.addr, node.id)
2761 } else {
2762 let _ = reply.send(Err(Error::InvalidMessage(
2763 "no nodes in routing table".into(),
2764 )));
2765 return;
2766 };
2767
2768 if !self.rate_limiter.try_acquire() {
2769 let _ = reply.send(Err(Error::Timeout));
2770 return;
2771 }
2772 let txn = self.next_transaction_id();
2773 let own_id = *self.routing_table.read().own_id();
2774 let msg = KrpcMessage {
2775 transaction_id: TransactionId::from_u16(txn),
2776 body: KrpcBody::Query(KrpcQuery::SampleInfohashes { id: own_id, target }),
2777 sender_ip: None, read_only: self.config.read_only_mode,
2779 };
2780 if let Ok(bytes) = msg.to_bytes() {
2781 let _ = self.socket.send_to(&bytes, addr).await;
2782 self.pending.insert(
2783 txn,
2784 PendingQuery {
2785 sent_at: Instant::now(),
2786 addr,
2787 kind: PendingQueryKind::SampleInfohashes,
2788 node_id: Some(closest_node_id),
2789 response_tx: None,
2790 },
2791 );
2792 self.stats.total_queries_sent += 1;
2793 }
2794 self.sample_replies.insert(txn, reply);
2796 }
2797
2798 fn next_transaction_id(&self) -> u16 {
2799 let txn = self.next_txn_id.fetch_add(1, Ordering::Relaxed);
2800 if txn == 0 {
2802 return self.next_txn_id.fetch_add(1, Ordering::Relaxed);
2803 }
2804 txn
2805 }
2806
2807 fn checked_insert(&self, id: Id20, addr: SocketAddr, read_only: bool) -> bool {
2809 if read_only {
2811 trace!(
2812 node_id = %id,
2813 ip = %addr.ip(),
2814 "BEP 43: skipping read-only node"
2815 );
2816 return false;
2817 }
2818 if self.config.enforce_node_id && !node_id::is_valid_node_id(&id, addr.ip()) {
2819 trace!(
2820 node_id = %id,
2821 ip = %addr.ip(),
2822 "BEP 42: rejecting node with invalid ID for IP"
2823 );
2824 return false;
2825 }
2826 self.routing_table.write().insert(id, addr)
2827 }
2828
2829 fn regenerate_node_id(&mut self, external_ip: std::net::IpAddr) {
2835 let r = self.routing_table.read().own_id().0[19] & 0x07;
2836 let new_id = node_id::generate_node_id(external_ip, r);
2837 let restrict_ips = self.config.restrict_routing_ips;
2838 let max_routing_nodes = self.config.max_routing_nodes;
2839 let mut old_nodes = self.routing_table.read().all_nodes();
2840 debug!(
2841 old_id = %self.routing_table.read().own_id(),
2842 new_id = %new_id,
2843 preserved_nodes = old_nodes.len(),
2844 "BEP 42: regenerating node ID"
2845 );
2846 *self.routing_table.write() =
2847 RoutingTable::with_config(new_id, restrict_ips, max_routing_nodes);
2848
2849 old_nodes.sort_by_key(|(id, _)| id.xor_distance(&new_id));
2855
2856 let mut inserted = 0usize;
2857 for (id, addr) in &old_nodes {
2858 if self.routing_table.write().insert(*id, *addr) {
2859 inserted += 1;
2860 }
2861 }
2862 debug!(
2863 new_table_size = self.routing_table.read().len(),
2864 attempted = old_nodes.len(),
2865 inserted,
2866 "BEP 42: node ID regeneration complete"
2867 );
2868
2869 if !self.active_lookups.is_empty() {
2875 let cleared_hashes: std::collections::HashSet<Id20> =
2879 self.active_lookups.keys().copied().collect();
2880 let stale_txns: Vec<u16> = self
2881 .pending
2882 .iter()
2883 .filter(|entry| {
2884 matches!(entry.value().kind, PendingQueryKind::GetPeers { info_hash }
2885 if cleared_hashes.contains(&info_hash))
2886 })
2887 .map(|entry| *entry.key())
2888 .collect();
2889 debug!(
2890 active_lookups = self.active_lookups.len(),
2891 stale_pending = stale_txns.len(),
2892 "BEP 42: invalidating active get_peers lookups (will be re-issued by session)"
2893 );
2894 for txn in stale_txns {
2895 self.pending.remove(&txn);
2896 }
2897 for (_, handle) in self.active_lookups.drain() {
2898 handle.abort();
2899 }
2900 }
2901
2902 let initial_closest: Vec<CompactNodeInfo> = self
2907 .routing_table
2908 .read()
2909 .closest(&new_id, K)
2910 .into_iter()
2911 .map(|n| CompactNodeInfo {
2912 id: n.id,
2913 addr: n.addr,
2914 })
2915 .collect();
2916 if !initial_closest.is_empty() {
2917 debug!(
2918 seed_nodes = initial_closest.len(),
2919 "BEP 42: re-bootstrapping with new node ID"
2920 );
2921 let mut lookup = IterativeLookup::new(
2922 new_id,
2923 FindNodeCallbacks {
2924 round: 0,
2925 max_rounds: 6,
2926 },
2927 );
2928 lookup.closest = initial_closest;
2929 self.bootstrap_lookup = Some(lookup);
2930 self.bootstrap_complete = false;
2932 self.bootstrap_timeout = Some(Box::pin(tokio::time::sleep(Duration::from_secs(10))));
2933 }
2934 }
2935
2936 fn make_stats(&self) -> DhtStats {
2937 let (immutable, mutable) = self.item_store.count();
2938 DhtStats {
2939 node_id: *self.routing_table.read().own_id(),
2940 routing_table_size: self.routing_table.read().len(),
2941 bucket_count: self.routing_table.read().bucket_count(),
2942 peer_store_info_hashes: self.peer_store.info_hash_count(),
2943 peer_store_peers: self.peer_store.peer_count(),
2944 pending_queries: self.pending.len(),
2945 total_queries_sent: self.stats.total_queries_sent,
2946 total_responses_received: self.stats.total_responses_received,
2947 dht_item_count: immutable + mutable,
2948 }
2949 }
2950}
2951
/// Hashes a socket address (IP and port) to a `u64` using the standard
/// library's `DefaultHasher`.
///
/// The hasher is built with `DefaultHasher::new()`, so equal addresses hash
/// to equal values within a process. The underlying algorithm is not
/// specified by the standard library, so the result should not be persisted.
fn hash_source_addr(addr: &SocketAddr) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    Hash::hash(addr, &mut state);
    Hasher::finish(&state)
}
2959
2960const DNS_BOOTSTRAP_DEADLINE: Duration = Duration::from_mins(2);
2962
2963const DNS_BOOTSTRAP_INITIAL_DELAY: Duration = Duration::from_secs(1);
2965
2966const DNS_BOOTSTRAP_MAX_DELAY: Duration = Duration::from_secs(30);
2968
2969async fn dns_bootstrap_resolve(
2975 hostname: String,
2976 family: AddressFamily,
2977 tx: mpsc::Sender<Vec<SocketAddr>>,
2978) {
2979 let deadline = Instant::now() + DNS_BOOTSTRAP_DEADLINE;
2980 let mut delay = DNS_BOOTSTRAP_INITIAL_DELAY;
2981
2982 loop {
2983 match tokio::net::lookup_host(hostname.as_str()).await {
2984 Ok(addrs) => {
2985 let matching: Vec<SocketAddr> = addrs
2986 .filter(|a| match family {
2987 AddressFamily::V4 => a.is_ipv4(),
2988 AddressFamily::V6 => a.is_ipv6(),
2989 })
2990 .collect();
2991 debug!(
2992 %hostname,
2993 count = matching.len(),
2994 ?family,
2995 "DNS bootstrap resolved"
2996 );
2997 let _ = tx.send(matching).await;
2998 break;
2999 }
3000 Err(e) if Instant::now() + delay < deadline => {
3001 warn!(%hostname, %e, ?delay, "DNS bootstrap retry");
3002 tokio::time::sleep(delay).await;
3003 delay = delay.saturating_mul(2).min(DNS_BOOTSTRAP_MAX_DELAY);
3004 }
3005 Err(e) => {
3006 warn!(%hostname, %e, "DNS bootstrap failed after retries");
3007 break;
3008 }
3009 }
3010 }
3011}
3012
3013fn generate_node_id() -> Id20 {
3015 use std::cell::Cell;
3016 use std::time::SystemTime;
3017
3018 thread_local! {
3019 static STATE: Cell<u64> = Cell::new(
3020 SystemTime::now()
3021 .duration_since(SystemTime::UNIX_EPOCH)
3022 .unwrap_or_default()
3023 .as_nanos() as u64
3024 );
3025 }
3026
3027 let mut bytes = [0u8; 20];
3028 for byte in &mut bytes {
3029 STATE.with(|s| {
3030 let mut x = s.get();
3031 x ^= x << 13;
3032 x ^= x >> 7;
3033 x ^= x << 17;
3034 s.set(x);
3035 *byte = x as u8;
3036 });
3037 }
3038 Id20(bytes)
3039}
3040
3041#[cfg(test)]
3042mod tests {
3043 use super::*;
3044
/// Two back-to-back calls to `generate_node_id` must produce different IDs.
#[test]
fn generate_node_id_is_unique() {
    let first = generate_node_id();
    let second = generate_node_id();
    assert_ne!(first, second);
}
3051
3052 #[tokio::test]
3053 async fn dht_handle_start_and_shutdown() {
3054 let config = DhtConfig {
3055 bind_addr: "127.0.0.1:0".parse().unwrap(),
3056 bootstrap_nodes: Vec::new(), ..DhtConfig::default()
3058 };
3059 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3060 let stats = handle.stats().await.unwrap();
3061 assert_eq!(stats.routing_table_size, 0);
3062 handle.shutdown().await.unwrap();
3063 }
3064
3065 #[tokio::test]
3066 async fn dht_handle_stats() {
3067 let config = DhtConfig {
3068 bind_addr: "127.0.0.1:0".parse().unwrap(),
3069 bootstrap_nodes: Vec::new(),
3070 ..DhtConfig::default()
3071 };
3072 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3073 let stats = handle.stats().await.unwrap();
3074 assert_eq!(stats.routing_table_size, 0);
3075 assert_eq!(stats.bucket_count, 1);
3076 assert_eq!(stats.pending_queries, 0);
3077 handle.shutdown().await.unwrap();
3078 }
3079
3080 #[tokio::test]
3084 async fn dht_handle_node_count_matches_stats() {
3085 let config = DhtConfig {
3086 bind_addr: "127.0.0.1:0".parse().unwrap(),
3087 bootstrap_nodes: Vec::new(),
3088 ..DhtConfig::default()
3089 };
3090 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3091 let stats = handle.stats().await.unwrap();
3092 let count = handle.node_count().await.unwrap();
3093 assert_eq!(count, stats.routing_table_size);
3094 assert_eq!(count, 0, "empty bootstrap ⇒ empty routing table");
3095 handle.shutdown().await.unwrap();
3096 }
3097
3098 #[tokio::test]
3099 async fn two_dht_nodes_ping() {
3100 let config_a = DhtConfig {
3102 bind_addr: "127.0.0.1:0".parse().unwrap(),
3103 bootstrap_nodes: Vec::new(),
3104 own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3105 ..DhtConfig::default()
3106 };
3107 let config_b = DhtConfig {
3108 bind_addr: "127.0.0.1:0".parse().unwrap(),
3109 bootstrap_nodes: Vec::new(),
3110 own_id: Some(Id20::from_hex("0000000000000000000000000000000000000002").unwrap()),
3111 ..DhtConfig::default()
3112 };
3113
3114 let (handle_a, _ip_rx_a) = DhtHandle::start(config_a).await.unwrap();
3115 let (handle_b, _ip_rx_b) = DhtHandle::start(config_b).await.unwrap();
3116
3117 tokio::time::sleep(Duration::from_millis(50)).await;
3119
3120 let stats_a = handle_a.stats().await.unwrap();
3122 let stats_b = handle_b.stats().await.unwrap();
3123 assert_eq!(stats_a.routing_table_size, 0);
3124 assert_eq!(stats_b.routing_table_size, 0);
3125
3126 handle_a.shutdown().await.unwrap();
3127 handle_b.shutdown().await.unwrap();
3128 }
3129
3130 #[tokio::test]
3131 async fn dht_handle_get_peers_empty_table() {
3132 let config = DhtConfig {
3133 bind_addr: "127.0.0.1:0".parse().unwrap(),
3134 bootstrap_nodes: Vec::new(),
3135 ..DhtConfig::default()
3136 };
3137 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3138
3139 let info_hash = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
3140 let _rx = handle.get_peers(info_hash).await.unwrap();
3141
3142 tokio::time::sleep(Duration::from_millis(100)).await;
3144 let stats = handle.stats().await.unwrap();
3146 assert_eq!(stats.routing_table_size, 0);
3147
3148 handle.shutdown().await.unwrap();
3149 }
3150
3151 #[tokio::test]
3152 async fn dht_handles_malformed_packet() {
3153 let config = DhtConfig {
3154 bind_addr: "127.0.0.1:0".parse().unwrap(),
3155 bootstrap_nodes: Vec::new(),
3156 ..DhtConfig::default()
3157 };
3158 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3159
3160 tokio::time::sleep(Duration::from_millis(50)).await;
3163 handle.shutdown().await.unwrap();
3164 }
3165
/// `DhtConfig::default()` targets IPv4: both the address family and the bind
/// address are v4.
#[test]
fn dht_config_default_is_v4() {
    let defaults = DhtConfig::default();
    assert_eq!(defaults.address_family, AddressFamily::V4);
    assert!(defaults.bind_addr.is_ipv4());
}
3172
/// `DhtConfig::default_v6()` targets IPv6 and ships with at least one
/// bootstrap node.
#[test]
fn dht_config_default_v6() {
    let defaults = DhtConfig::default_v6();
    assert_eq!(defaults.address_family, AddressFamily::V6);
    assert!(defaults.bind_addr.is_ipv6());
    assert!(!defaults.bootstrap_nodes.is_empty());
}
3181
3182 #[tokio::test]
3183 async fn dht_v6_start_and_shutdown() {
3184 let config = DhtConfig {
3185 bind_addr: "[::1]:0".parse().unwrap(),
3186 bootstrap_nodes: Vec::new(),
3187 ..DhtConfig::default_v6()
3188 };
3189 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3190 let stats = handle.stats().await.unwrap();
3191 assert_eq!(stats.routing_table_size, 0);
3192 handle.shutdown().await.unwrap();
3193 }
3194
3195 #[tokio::test]
3196 async fn dht_v6_stats_on_empty_table() {
3197 let config = DhtConfig {
3198 bind_addr: "[::1]:0".parse().unwrap(),
3199 bootstrap_nodes: Vec::new(),
3200 ..DhtConfig::default_v6()
3201 };
3202 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3203 let stats = handle.stats().await.unwrap();
3204 assert_eq!(stats.routing_table_size, 0);
3205 assert_eq!(stats.bucket_count, 1);
3206 assert_eq!(stats.pending_queries, 0);
3207 assert_eq!(stats.total_queries_sent, 0);
3208 handle.shutdown().await.unwrap();
3209 }
3210
/// Sanity-checks the family/address pairing used for filtering: a V4 family
/// goes with an IPv4 socket address and a V6 family with an IPv6 one, and
/// each address is rejected by the opposite predicate.
#[test]
fn matches_family_helper() {
    let v4_addr = "1.2.3.4:6881".parse::<SocketAddr>().unwrap();
    let v6_addr = "[::1]:6881".parse::<SocketAddr>().unwrap();
    let (actor_v4, actor_v6) = (AddressFamily::V4, AddressFamily::V6);

    assert!(matches!(actor_v4, AddressFamily::V4) && v4_addr.is_ipv4());
    assert!(!v6_addr.is_ipv4());
    assert!(matches!(actor_v6, AddressFamily::V6) && v6_addr.is_ipv6());
    assert!(!v4_addr.is_ipv6());
}
3223
/// Both default configurations (IPv4 and IPv6) leave node-ID enforcement off
/// and routing-table IP restriction on.
#[test]
fn dht_config_security_defaults() {
    for defaults in [DhtConfig::default(), DhtConfig::default_v6()] {
        assert!(!defaults.enforce_node_id);
        assert!(defaults.restrict_routing_ips);
    }
}
3235
3236 #[tokio::test]
3237 async fn dht_handle_start_returns_ip_channel() {
3238 let config = DhtConfig {
3239 bind_addr: "127.0.0.1:0".parse().unwrap(),
3240 bootstrap_nodes: Vec::new(),
3241 ..DhtConfig::default()
3242 };
3243 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3244 handle.shutdown().await.unwrap();
3245 }
3246
3247 #[tokio::test]
3248 async fn dht_update_external_ip() {
3249 let config = DhtConfig {
3250 bind_addr: "127.0.0.1:0".parse().unwrap(),
3251 bootstrap_nodes: Vec::new(),
3252 ..DhtConfig::default()
3253 };
3254 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3255 handle
3256 .update_external_ip("203.0.113.5".parse().unwrap(), IpVoteSource::Nat)
3257 .await
3258 .unwrap();
3259 handle.shutdown().await.unwrap();
3260 }
3261
3262 #[tokio::test]
3267 async fn dht_put_get_immutable_local() {
3268 let config = DhtConfig {
3269 bind_addr: "127.0.0.1:0".parse().unwrap(),
3270 bootstrap_nodes: Vec::new(),
3271 ..DhtConfig::default()
3272 };
3273 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3274
3275 let value = b"12:Hello World!".to_vec();
3277 let target = handle.put_immutable(value.clone()).await.unwrap();
3278
3279 let result = handle.get_immutable(target).await.unwrap();
3281 assert_eq!(result, Some(value));
3282
3283 assert_eq!(target, irontide_core::sha1(b"12:Hello World!"));
3285
3286 handle.shutdown().await.unwrap();
3287 }
3288
3289 #[tokio::test]
3290 async fn dht_put_get_mutable_local() {
3291 let config = DhtConfig {
3292 bind_addr: "127.0.0.1:0".parse().unwrap(),
3293 bootstrap_nodes: Vec::new(),
3294 ..DhtConfig::default()
3295 };
3296 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3297
3298 let seed = [42u8; 32];
3299 let keypair = ed25519_dalek::SigningKey::from_bytes(&seed);
3300 let pubkey = keypair.verifying_key().to_bytes();
3301
3302 let value = b"4:test".to_vec();
3303 let target = handle
3304 .put_mutable(seed, value.clone(), 1, Vec::new())
3305 .await
3306 .unwrap();
3307
3308 let result = handle.get_mutable(pubkey, Vec::new()).await.unwrap();
3310 assert_eq!(result, Some((value, 1)));
3311
3312 let expected_target = crate::bep44::compute_mutable_target(&pubkey, &[]);
3314 assert_eq!(target, expected_target);
3315
3316 handle.shutdown().await.unwrap();
3317 }
3318
3319 #[tokio::test]
3320 async fn dht_get_immutable_not_found() {
3321 let config = DhtConfig {
3322 bind_addr: "127.0.0.1:0".parse().unwrap(),
3323 bootstrap_nodes: Vec::new(),
3324 ..DhtConfig::default()
3325 };
3326 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3327
3328 let target = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
3329 let result = handle.get_immutable(target).await.unwrap();
3331 assert_eq!(result, None);
3332
3333 handle.shutdown().await.unwrap();
3334 }
3335
3336 #[tokio::test]
3337 async fn dht_put_immutable_rejects_oversized() {
3338 let config = DhtConfig {
3339 bind_addr: "127.0.0.1:0".parse().unwrap(),
3340 bootstrap_nodes: Vec::new(),
3341 ..DhtConfig::default()
3342 };
3343 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3344
3345 let value = vec![0u8; 1001];
3346 let result = handle.put_immutable(value).await;
3347 assert!(result.is_err());
3348
3349 handle.shutdown().await.unwrap();
3350 }
3351
3352 #[tokio::test]
3353 async fn dht_stats_includes_item_count() {
3354 let config = DhtConfig {
3355 bind_addr: "127.0.0.1:0".parse().unwrap(),
3356 bootstrap_nodes: Vec::new(),
3357 ..DhtConfig::default()
3358 };
3359 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3360
3361 let stats = handle.stats().await.unwrap();
3362 assert_eq!(stats.dht_item_count, 0);
3363
3364 handle.put_immutable(b"5:hello".to_vec()).await.unwrap();
3365 let stats = handle.stats().await.unwrap();
3366 assert_eq!(stats.dht_item_count, 1);
3367
3368 handle.shutdown().await.unwrap();
3369 }
3370
3371 #[tokio::test]
3372 async fn dht_get_mutable_not_found() {
3373 let config = DhtConfig {
3374 bind_addr: "127.0.0.1:0".parse().unwrap(),
3375 bootstrap_nodes: Vec::new(),
3376 ..DhtConfig::default()
3377 };
3378 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3379
3380 let pubkey = [99u8; 32];
3381 let result = handle.get_mutable(pubkey, Vec::new()).await.unwrap();
3382 assert_eq!(result, None);
3383
3384 handle.shutdown().await.unwrap();
3385 }
3386
3387 #[tokio::test]
3388 async fn two_nodes_put_get_immutable() {
3389 let config_a = DhtConfig {
3390 bind_addr: "127.0.0.1:0".parse().unwrap(),
3391 bootstrap_nodes: Vec::new(),
3392 own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3393 ..DhtConfig::default()
3394 };
3395 let (handle_a, _ip_rx) = DhtHandle::start(config_a).await.unwrap();
3396
3397 let value = b"12:Hello World!".to_vec();
3399 let target = handle_a.put_immutable(value.clone()).await.unwrap();
3400
3401 let result = handle_a.get_immutable(target).await.unwrap();
3403 assert_eq!(result, Some(value));
3404
3405 handle_a.shutdown().await.unwrap();
3406 }
3407
3408 #[tokio::test]
3409 async fn put_mutable_sequence_update() {
3410 let config = DhtConfig {
3411 bind_addr: "127.0.0.1:0".parse().unwrap(),
3412 bootstrap_nodes: Vec::new(),
3413 ..DhtConfig::default()
3414 };
3415 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3416
3417 let seed = [99u8; 32];
3418 let keypair = ed25519_dalek::SigningKey::from_bytes(&seed);
3419 let pubkey = keypair.verifying_key().to_bytes();
3420
3421 handle
3423 .put_mutable(seed, b"5:first".to_vec(), 1, Vec::new())
3424 .await
3425 .unwrap();
3426 let result = handle.get_mutable(pubkey, Vec::new()).await.unwrap();
3427 assert_eq!(result, Some((b"5:first".to_vec(), 1)));
3428
3429 handle
3431 .put_mutable(seed, b"6:second".to_vec(), 2, Vec::new())
3432 .await
3433 .unwrap();
3434 let result = handle.get_mutable(pubkey, Vec::new()).await.unwrap();
3435 assert_eq!(result, Some((b"6:second".to_vec(), 2)));
3436
3437 handle.shutdown().await.unwrap();
3438 }
3439
3440 #[tokio::test]
3441 async fn put_mutable_with_salt_isolation() {
3442 let config = DhtConfig {
3443 bind_addr: "127.0.0.1:0".parse().unwrap(),
3444 bootstrap_nodes: Vec::new(),
3445 ..DhtConfig::default()
3446 };
3447 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3448
3449 let seed = [77u8; 32];
3450 let keypair = ed25519_dalek::SigningKey::from_bytes(&seed);
3451 let pubkey = keypair.verifying_key().to_bytes();
3452
3453 handle
3455 .put_mutable(seed, b"1:A".to_vec(), 1, b"a".to_vec())
3456 .await
3457 .unwrap();
3458 handle
3460 .put_mutable(seed, b"1:B".to_vec(), 1, b"b".to_vec())
3461 .await
3462 .unwrap();
3463
3464 let a = handle.get_mutable(pubkey, b"a".to_vec()).await.unwrap();
3466 assert_eq!(a, Some((b"1:A".to_vec(), 1)));
3467 let b = handle.get_mutable(pubkey, b"b".to_vec()).await.unwrap();
3468 assert_eq!(b, Some((b"1:B".to_vec(), 1)));
3469
3470 handle.shutdown().await.unwrap();
3471 }
3472
3473 #[tokio::test]
3476 async fn dht_sample_infohashes_empty_table() {
3477 let config = DhtConfig {
3478 bind_addr: "127.0.0.1:0".parse().unwrap(),
3479 bootstrap_nodes: Vec::new(),
3480 ..DhtConfig::default()
3481 };
3482 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3483
3484 let target = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
3485 let result = handle.sample_infohashes(target).await;
3486 assert!(result.is_err());
3488
3489 handle.shutdown().await.unwrap();
3490 }
3491
3492 #[tokio::test]
3493 async fn two_nodes_sample_infohashes() {
3494 let config_a = DhtConfig {
3496 bind_addr: "127.0.0.1:0".parse().unwrap(),
3497 bootstrap_nodes: Vec::new(),
3498 own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3499 ..DhtConfig::default()
3500 };
3501 let (handle_a, _ip_rx_a) = DhtHandle::start(config_a).await.unwrap();
3502
3503 tokio::time::sleep(Duration::from_millis(50)).await;
3509 handle_a.shutdown().await.unwrap();
3510 }
3511
/// A new limiter starts with a full bucket: current permits, capacity and
/// refill rate all equal the configured rate.
#[test]
fn rate_limiter_new_starts_full() {
    let bucket = QueryRateLimiter::new(10);
    assert_eq!(bucket.permits, 10);
    assert_eq!(bucket.max_permits, 10);
    assert_eq!(bucket.refill_rate, 10);
}
3521
/// A limiter created with rate zero never grants a permit.
#[test]
fn rate_limiter_new_zero_rate() {
    let mut bucket = QueryRateLimiter::new(0);
    let granted = bucket.try_acquire();
    assert!(!granted);
}
3528
/// A limiter of rate 5 grants exactly five immediate permits and refuses the
/// sixth.
#[test]
fn rate_limiter_exhaustion() {
    let mut bucket = QueryRateLimiter::new(5);

    (0..5).for_each(|_| assert!(bucket.try_acquire(), "permit should be available"));

    let sixth = bucket.try_acquire();
    assert!(!sixth, "bucket must be empty after N acquires");
}
3541
/// With rate 1 the single initial permit is granted once and the immediate
/// second request is refused.
#[test]
fn rate_limiter_initial_permits_work() {
    let mut bucket = QueryRateLimiter::new(1);
    let first = bucket.try_acquire();
    assert!(first);
    let second = bucket.try_acquire();
    assert!(!second);
}
3550
/// Refilling after a long idle period never lifts the permit count above
/// `max_permits`.
#[test]
fn rate_limiter_refill_caps_at_max() {
    let mut bucket = QueryRateLimiter::new(10);

    // Drain the bucket completely.
    (0..10).for_each(|_| {
        bucket.try_acquire();
    });
    assert_eq!(bucket.permits, 0);

    // Pretend the last refill happened 5 s ago: at rate 10 that is 50
    // permits' worth of time, which must be clamped to the capacity of 10.
    bucket.last_refill = Instant::now() - Duration::from_secs(5);
    bucket.refill();
    assert_eq!(bucket.permits, 10, "permits must not exceed max_permits");
}
3573
/// Half a second of idle time at rate 100 refills roughly 50 permits (a
/// 45..=55 window allows for timing jitter).
#[test]
fn rate_limiter_refill_adds_correct_permits() {
    let mut bucket = QueryRateLimiter::new(100);

    // Drain the bucket completely.
    (0..100).for_each(|_| {
        bucket.try_acquire();
    });

    bucket.last_refill = Instant::now() - Duration::from_millis(500);
    bucket.refill();

    let within_tolerance = (45..=55).contains(&bucket.permits);
    assert!(
        within_tolerance,
        "expected ~50 permits after 0.5s refill at rate 100, got {}",
        bucket.permits
    );
}
3591
3592 #[tokio::test]
3596 async fn dht_bootstrap_logging() {
3597 let config = DhtConfig {
3601 bind_addr: "127.0.0.1:0".parse().unwrap(),
3602 bootstrap_nodes: vec!["127.0.0.1:16881".to_owned(), "127.0.0.1:16882".to_owned()],
3603 ..DhtConfig::default()
3604 };
3605 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3606
3607 tokio::time::sleep(Duration::from_millis(200)).await;
3609
3610 let stats = handle.stats().await.unwrap();
3611 assert!(
3614 stats.total_queries_sent >= 2,
3615 "expected at least 2 ping queries, got {}",
3616 stats.total_queries_sent
3617 );
3618
3619 handle.shutdown().await.unwrap();
3620 }
3621
/// Models the ping scheduling rule while bootstrap is still in progress: the
/// bootstrap interval equals one tick, so every tick triggers a ping.
///
/// Time is simulated by accumulating tick durations instead of sleeping, so
/// the result does not depend on wall-clock behaviour and the test finishes
/// immediately.
#[test]
fn ping_interval_5s_during_bootstrap() {
    let bootstrap_complete = false;

    // Scaled-down stand-ins for the real intervals (1 tick vs. 12 ticks).
    let tick = Duration::from_millis(10);
    let bootstrap_interval = tick;
    let steady_interval = Duration::from_millis(120);

    let mut since_last_ping = Duration::ZERO;
    let mut ping_count: u32 = 0;

    for _ in 0..6 {
        // Advance the simulated clock by one tick.
        since_last_ping += tick;

        let ping_interval = if bootstrap_complete {
            steady_interval
        } else {
            bootstrap_interval
        };
        if since_last_ping >= ping_interval {
            ping_count = ping_count.saturating_add(1);
            since_last_ping = Duration::ZERO;
        }
    }

    assert_eq!(
        ping_count, 6,
        "expected 6 pings during bootstrap (every tick), got {ping_count}"
    );
}
3661
/// Models the ping scheduling rule once bootstrap has completed: the steady
/// interval is twelve ticks, so 24 ticks trigger exactly two pings.
///
/// Time is simulated by accumulating tick durations instead of sleeping. With
/// real sleeps the exact count depended on the platform's timer granularity:
/// sleeps that overshoot their 10 ms target make the interval elapse after
/// fewer ticks and yield a third ping.
#[test]
fn ping_interval_60s_after_bootstrap() {
    let bootstrap_complete = true;

    // Scaled-down stand-ins for the real intervals (1 tick vs. 12 ticks).
    let tick = Duration::from_millis(10);
    let bootstrap_interval = tick;
    let steady_interval = Duration::from_millis(120);

    let mut since_last_ping = Duration::ZERO;
    let mut ping_count: u32 = 0;

    for _ in 0..24 {
        // Advance the simulated clock by one tick.
        since_last_ping += tick;

        let ping_interval = if bootstrap_complete {
            steady_interval
        } else {
            bootstrap_interval
        };
        if since_last_ping >= ping_interval {
            ping_count = ping_count.saturating_add(1);
            since_last_ping = Duration::ZERO;
        }
    }

    assert_eq!(
        ping_count, 2,
        "expected 2 pings post-bootstrap (12:1 tick-to-interval ratio), got {ping_count}"
    );
}
3702
/// The DNS retry delay doubles from the initial delay and saturates at the
/// configured maximum: 1, 2, 4, 8, 16, 30, 30 seconds.
#[test]
fn dns_backoff_retries_on_failure() {
    let expected_delays = [
        Duration::from_secs(1),
        Duration::from_secs(2),
        Duration::from_secs(4),
        Duration::from_secs(8),
        Duration::from_secs(16),
        Duration::from_secs(30),
        Duration::from_secs(30),
    ];

    // The same update rule the resolver applies after each failed attempt.
    let schedule = std::iter::successors(Some(DNS_BOOTSTRAP_INITIAL_DELAY), |previous| {
        Some(previous.saturating_mul(2).min(DNS_BOOTSTRAP_MAX_DELAY))
    });

    for (delay, expected) in schedule.zip(expected_delays) {
        assert_eq!(
            delay, expected,
            "backoff delay mismatch: got {delay:?}, expected {expected:?}"
        );
    }
}
3732
3733 #[tokio::test]
3738 async fn dns_backoff_succeeds_on_retry() {
3739 let (tx, mut rx) = mpsc::channel(16);
3740
3741 let hostname = "localhost:1234".to_owned();
3743 tokio::spawn(dns_bootstrap_resolve(hostname, AddressFamily::V4, tx));
3744
3745 let result = tokio::time::timeout(Duration::from_secs(5), rx.recv()).await;
3747 assert!(
3748 result.is_ok(),
3749 "expected DNS resolution to complete within 5 seconds"
3750 );
3751 let addrs = result.expect("timeout should not occur");
3752 assert!(
3754 addrs.is_some(),
3755 "expected Some(addresses) from dns_bootstrap_resolve"
3756 );
3757 let addrs = addrs.expect("already checked is_some");
3758 assert!(
3759 !addrs.is_empty(),
3760 "expected at least one resolved address for localhost"
3761 );
3762 for addr in &addrs {
3764 assert!(addr.is_ipv4(), "expected IPv4 address, got {addr}");
3765 }
3766 }
3767
/// Simulates the backoff schedule against the overall deadline: sleeps are
/// accumulated while the running total stays strictly below the deadline.
/// At least five retries must fit, and the total sleep must stay under the
/// deadline.
#[test]
fn dns_backoff_total_timeout_120s() {
    let deadline_duration = DNS_BOOTSTRAP_DEADLINE;
    let mut total_sleep = Duration::ZERO;
    let mut retries = 0u32;
    let mut delay = DNS_BOOTSTRAP_INITIAL_DELAY;

    // Keep scheduling retries for as long as the next sleep still fits.
    while total_sleep.saturating_add(delay) < deadline_duration {
        total_sleep = total_sleep.saturating_add(delay);
        retries = retries.saturating_add(1);
        delay = delay.saturating_mul(2).min(DNS_BOOTSTRAP_MAX_DELAY);
    }

    assert!(
        retries >= 5,
        "expected at least 5 retries before 120s deadline, got {retries}"
    );
    assert!(
        total_sleep < deadline_duration,
        "total sleep {total_sleep:?} should be less than deadline {deadline_duration:?}"
    );
}
3807
3808 #[tokio::test]
3815 async fn bootstrap_phase3_starts_before_dns() {
3816 let config = DhtConfig {
3819 bind_addr: "127.0.0.1:0".parse().unwrap(),
3820 bootstrap_nodes: vec![
3821 "127.0.0.1:16881".to_owned(),
3823 "router.bittorrent.com:6881".to_owned(),
3825 ],
3826 ..DhtConfig::default()
3827 };
3828
3829 let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
3830 let (tx, rx) = mpsc::channel(256);
3831 let (ip_tx, _ip_rx) = mpsc::channel(4);
3832 let mut actor = DhtActor::new(config, socket, rx, ip_tx);
3833
3834 actor.bootstrap().await;
3836
3837 assert!(
3839 actor.bootstrap_lookup.is_some(),
3840 "Phase 3 (FindNodeLookup) must start without waiting for DNS"
3841 );
3842
3843 assert!(
3845 actor.dns_bootstrap_rx.is_some(),
3846 "dns_bootstrap_rx should be Some (background DNS tasks still running)"
3847 );
3848
3849 drop(tx);
3851 }
3852
3853 #[tokio::test]
3858 async fn json_persistence_round_trip_and_corrupt() {
3859 use crate::routing_table::NodeStatus;
3860 let dir = tempfile::tempdir().expect("failed to create temp dir");
3861
3862 let config = DhtConfig {
3863 bind_addr: "127.0.0.1:0".parse().unwrap(),
3864 bootstrap_nodes: Vec::new(),
3865 own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3866 state_dir: Some(dir.path().to_path_buf()),
3867 ..DhtConfig::default()
3868 };
3869
3870 let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
3871 let (_tx, rx) = mpsc::channel(256);
3872 let (ip_tx, _ip_rx) = mpsc::channel(4);
3873 let actor = DhtActor::new(config.clone(), socket, rx, ip_tx);
3874
3875 let node1_id = Id20::from_hex("1111111111111111111111111111111111111111").unwrap();
3877 let node2_id = Id20::from_hex("2222222222222222222222222222222222222222").unwrap();
3878 let addr1: SocketAddr = "10.0.0.1:6881".parse().unwrap();
3879 let addr2: SocketAddr = "10.0.0.2:6882".parse().unwrap();
3880 actor.routing_table.write().insert(node1_id, addr1);
3881 actor.routing_table.write().insert(node2_id, addr2);
3882 actor.routing_table.write().mark_response(&node1_id);
3884
3885 actor.save_routing_table();
3887
3888 let path = DhtActor::state_file_path(dir.path(), AddressFamily::V4);
3890 assert!(path.exists(), "JSON state file should exist after save");
3891
3892 let config2 = DhtConfig {
3894 bind_addr: "127.0.0.1:0".parse().unwrap(),
3895 bootstrap_nodes: Vec::new(),
3896 own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3897 state_dir: Some(dir.path().to_path_buf()),
3898 ..DhtConfig::default()
3899 };
3900 let socket2 = Arc::new(UdpSocket::bind(config2.bind_addr).await.unwrap());
3901 let (_tx2, rx2) = mpsc::channel(256);
3902 let (ip_tx2, _ip_rx2) = mpsc::channel(4);
3903 let actor2 = DhtActor::new(config2, socket2, rx2, ip_tx2);
3904
3905 assert_eq!(actor2.routing_table.read().len(), 2);
3907 assert!(actor2.routing_table.read().get(&node1_id).is_some());
3908 assert!(actor2.routing_table.read().get(&node2_id).is_some());
3909
3910 assert_eq!(
3912 actor2.routing_table.read().get(&node1_id).unwrap().status(),
3913 NodeStatus::Questionable
3914 );
3915 assert_eq!(
3916 actor2.routing_table.read().get(&node2_id).unwrap().status(),
3917 NodeStatus::Questionable
3918 );
3919
3920 std::fs::write(&path, b"{{not valid json at all!!}}")
3922 .expect("failed to write corrupt data");
3923
3924 let config3 = DhtConfig {
3925 bind_addr: "127.0.0.1:0".parse().unwrap(),
3926 bootstrap_nodes: Vec::new(),
3927 own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3928 state_dir: Some(dir.path().to_path_buf()),
3929 ..DhtConfig::default()
3930 };
3931 let socket3 = Arc::new(UdpSocket::bind(config3.bind_addr).await.unwrap());
3932 let (_tx3, rx3) = mpsc::channel(256);
3933 let (ip_tx3, _ip_rx3) = mpsc::channel(4);
3934 let actor3 = DhtActor::new(config3, socket3, rx3, ip_tx3);
3935
3936 assert_eq!(actor3.routing_table.read().len(), 0);
3938 }
3939
3940 #[tokio::test]
3943 async fn json_persistence_atomic_write() {
3944 let dir = tempfile::tempdir().expect("failed to create temp dir");
3945
3946 let config = DhtConfig {
3947 bind_addr: "127.0.0.1:0".parse().unwrap(),
3948 bootstrap_nodes: Vec::new(),
3949 own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3950 state_dir: Some(dir.path().to_path_buf()),
3951 ..DhtConfig::default()
3952 };
3953
3954 let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
3955 let (_tx, rx) = mpsc::channel(256);
3956 let (ip_tx, _ip_rx) = mpsc::channel(4);
3957 let actor = DhtActor::new(config, socket, rx, ip_tx);
3958
3959 let node_id = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
3960 let addr: SocketAddr = "10.0.0.1:6881".parse().unwrap();
3961 actor.routing_table.write().insert(node_id, addr);
3962
3963 let final_path = DhtActor::state_file_path(dir.path(), AddressFamily::V4);
3965 std::fs::write(&final_path, b"old data").unwrap();
3966
3967 actor.save_routing_table();
3969
3970 let content = std::fs::read_to_string(&final_path).unwrap();
3972 let state: DhtState =
3973 serde_json::from_str(&content).expect("final file should contain valid JSON");
3974 assert_eq!(state.nodes.len(), 1);
3975 assert_eq!(state.nodes[0].id, node_id.to_hex());
3976
3977 let tmp_path = dir.path().join(".dht_state_v4.tmp");
3979 assert!(
3980 !tmp_path.exists(),
3981 "temp file should be cleaned up by rename"
3982 );
3983 }
3984
3985 #[tokio::test]
3987 async fn json_persistence_no_state_dir() {
3988 let config = DhtConfig {
3989 bind_addr: "127.0.0.1:0".parse().unwrap(),
3990 bootstrap_nodes: Vec::new(),
3991 state_dir: None, ..DhtConfig::default()
3993 };
3994
3995 let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
3996 let (_tx, rx) = mpsc::channel(256);
3997 let (ip_tx, _ip_rx) = mpsc::channel(4);
3998 let actor = DhtActor::new(config, socket, rx, ip_tx);
3999
4000 let node_id = Id20::from_hex("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
4002 actor
4003 .routing_table
4004 .write()
4005 .insert(node_id, "10.0.0.1:6881".parse().unwrap());
4006
4007 actor.save_routing_table();
4009
4010 assert_eq!(actor.routing_table.read().len(), 1); }
4013
4014 #[tokio::test]
4017 async fn json_persistence_priority_over_config() {
4018 let dir = tempfile::tempdir().expect("failed to create temp dir");
4019
4020 let config_save = DhtConfig {
4022 bind_addr: "127.0.0.1:0".parse().unwrap(),
4023 bootstrap_nodes: Vec::new(),
4024 own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
4025 state_dir: Some(dir.path().to_path_buf()),
4026 ..DhtConfig::default()
4027 };
4028
4029 let socket = Arc::new(UdpSocket::bind(config_save.bind_addr).await.unwrap());
4030 let (_tx, rx) = mpsc::channel(256);
4031 let (ip_tx, _ip_rx) = mpsc::channel(4);
4032 let actor = DhtActor::new(config_save, socket, rx, ip_tx);
4033
4034 let node_id = Id20::from_hex("cccccccccccccccccccccccccccccccccccccccc").unwrap();
4035 actor
4036 .routing_table
4037 .write()
4038 .insert(node_id, "10.0.0.1:6881".parse().unwrap());
4039 actor.save_routing_table();
4040 drop(actor);
4041
4042 let config_load = DhtConfig {
4044 bind_addr: "127.0.0.1:0".parse().unwrap(),
4045 bootstrap_nodes: vec![
4046 "192.168.1.100:6881".to_owned(), "10.0.0.50:6881".to_owned(), "router.bittorrent.com:6881".to_owned(), "dht.transmissionbt.com:6881".to_owned(), ],
4051 own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
4052 state_dir: Some(dir.path().to_path_buf()),
4053 ..DhtConfig::default()
4054 };
4055
4056 let socket2 = Arc::new(UdpSocket::bind(config_load.bind_addr).await.unwrap());
4057 let (_tx2, rx2) = mpsc::channel(256);
4058 let (ip_tx2, _ip_rx2) = mpsc::channel(4);
4059 let actor2 = DhtActor::new(config_load, socket2, rx2, ip_tx2);
4060
4061 assert_eq!(actor2.routing_table.read().len(), 1);
4063
4064 assert_eq!(actor2.config.bootstrap_nodes.len(), 2);
4066 assert!(
4067 actor2
4068 .config
4069 .bootstrap_nodes
4070 .contains(&"router.bittorrent.com:6881".to_owned())
4071 );
4072 assert!(
4073 actor2
4074 .config
4075 .bootstrap_nodes
4076 .contains(&"dht.transmissionbt.com:6881".to_owned())
4077 );
4078 assert!(
4080 !actor2
4081 .config
4082 .bootstrap_nodes
4083 .contains(&"192.168.1.100:6881".to_owned())
4084 );
4085 assert!(
4086 !actor2
4087 .config
4088 .bootstrap_nodes
4089 .contains(&"10.0.0.50:6881".to_owned())
4090 );
4091 }
4092
4093 #[tokio::test]
4096 async fn checked_insert_rejects_read_only() {
4097 let config = DhtConfig {
4098 bind_addr: "127.0.0.1:0".parse().unwrap(),
4099 bootstrap_nodes: Vec::new(),
4100 ..DhtConfig::default()
4101 };
4102 let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
4103 let (_tx, rx) = mpsc::channel(256);
4104 let (ip_tx, _ip_rx) = mpsc::channel(4);
4105 let actor = DhtActor::new(config, socket, rx, ip_tx);
4106
4107 let id = Id20::from_hex("0000000000000000000000000000000000000042").unwrap();
4108 let addr: SocketAddr = "10.0.0.1:6881".parse().unwrap();
4109
4110 assert!(!actor.checked_insert(id, addr, true));
4112 assert_eq!(actor.routing_table.read().len(), 0);
4113 }
4114
4115 #[tokio::test]
4116 async fn checked_insert_accepts_normal() {
4117 let config = DhtConfig {
4118 bind_addr: "127.0.0.1:0".parse().unwrap(),
4119 bootstrap_nodes: Vec::new(),
4120 ..DhtConfig::default()
4121 };
4122 let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
4123 let (_tx, rx) = mpsc::channel(256);
4124 let (ip_tx, _ip_rx) = mpsc::channel(4);
4125 let actor = DhtActor::new(config, socket, rx, ip_tx);
4126
4127 let id = Id20::from_hex("0000000000000000000000000000000000000042").unwrap();
4128 let addr: SocketAddr = "10.0.0.1:6881".parse().unwrap();
4129
4130 assert!(actor.checked_insert(id, addr, false));
4132 assert_eq!(actor.routing_table.read().len(), 1);
4133 }
4134
4135 #[tokio::test]
4136 async fn outgoing_query_includes_ro() {
4137 let info_hash = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
4142 let own_id = Id20::ZERO;
4143
4144 let msg = crate::krpc::KrpcMessage {
4145 transaction_id: crate::krpc::TransactionId::from_u16(1),
4146 body: crate::krpc::KrpcBody::Query(crate::krpc::KrpcQuery::FindNode {
4147 id: own_id,
4148 target: info_hash,
4149 want: None,
4150 }),
4151 sender_ip: None,
4152 read_only: true, };
4154 let bytes = msg.to_bytes().unwrap();
4155
4156 let raw: irontide_bencode::BencodeValue = irontide_bencode::from_bytes(&bytes).unwrap();
4158 let dict = raw.as_dict().unwrap();
4159 assert!(
4160 dict.contains_key(&b"ro"[..]),
4161 "query with read_only: true should contain ro key in wire format"
4162 );
4163
4164 let decoded = crate::krpc::KrpcMessage::from_bytes(&bytes).unwrap();
4165 assert!(decoded.read_only, "outgoing query should include ro flag");
4166 }
4167
4168 #[tokio::test]
4169 async fn response_never_includes_ro() {
4170 let own_id = Id20::ZERO;
4172 let msg = crate::krpc::KrpcMessage {
4173 transaction_id: crate::krpc::TransactionId::from_u16(1),
4174 body: crate::krpc::KrpcBody::Response(crate::krpc::KrpcResponse::NodeId { id: own_id }),
4175 sender_ip: None,
4176 read_only: false, };
4178 let bytes = msg.to_bytes().unwrap();
4179 let decoded = crate::krpc::KrpcMessage::from_bytes(&bytes).unwrap();
4180 assert!(!decoded.read_only, "responses should never include ro flag");
4181
4182 let raw: irontide_bencode::BencodeValue = irontide_bencode::from_bytes(&bytes).unwrap();
4185 let dict = raw.as_dict().unwrap();
4186 assert!(
4187 !dict.contains_key(&b"ro"[..]),
4188 "response bytes should not contain ro key"
4189 );
4190 }
4191
4192 #[tokio::test]
4193 async fn announce_suppressed_in_read_only_mode() {
4194 let config = DhtConfig {
4195 bind_addr: "127.0.0.1:0".parse().unwrap(),
4196 bootstrap_nodes: Vec::new(),
4197 read_only_mode: true,
4198 ..DhtConfig::default()
4199 };
4200 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
4201
4202 let info_hash = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
4203 let result = handle.announce(info_hash, 6881).await;
4205 assert!(
4206 result.is_ok(),
4207 "announce should return Ok in read-only mode (suppressed)"
4208 );
4209
4210 handle.shutdown().await.unwrap();
4211 }
4212
4213 #[tokio::test]
4221 async fn save_routing_table_acks_even_without_state_dir() {
4222 let config = DhtConfig {
4223 bind_addr: "127.0.0.1:0".parse().unwrap(),
4224 bootstrap_nodes: Vec::new(),
4225 state_dir: None,
4226 ..DhtConfig::default()
4227 };
4228 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
4229 let result = handle.save_routing_table().await;
4230 assert!(result.is_ok(), "expected Ok, got {result:?}");
4231 handle.shutdown().await.unwrap();
4232 }
4233
4234 #[tokio::test]
4238 async fn save_routing_table_writes_state_file() {
4239 let tmp = tempfile::tempdir().unwrap();
4240 let state_dir = tmp.path().to_path_buf();
4241
4242 let config = DhtConfig {
4243 bind_addr: "127.0.0.1:0".parse().unwrap(),
4244 bootstrap_nodes: Vec::new(),
4245 state_dir: Some(state_dir.clone()),
4246 ..DhtConfig::default()
4247 };
4248 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
4249
4250 tokio::time::sleep(Duration::from_millis(50)).await;
4253
4254 handle.save_routing_table().await.unwrap();
4255
4256 let state_path = state_dir.join("dht_state.json");
4257 assert!(
4258 state_path.exists(),
4259 "save_routing_table must write dht_state.json to {}",
4260 state_path.display()
4261 );
4262
4263 let contents = std::fs::read_to_string(&state_path).unwrap();
4265 let parsed: serde_json::Value = serde_json::from_str(&contents).unwrap();
4266 assert!(parsed.get("node_id").is_some());
4267
4268 handle.shutdown().await.unwrap();
4269 }
4270
4271 #[tokio::test]
4276 async fn shutdown_and_wait_persists_state_before_returning() {
4277 let tmp = tempfile::tempdir().unwrap();
4278 let state_dir = tmp.path().to_path_buf();
4279
4280 let config = DhtConfig {
4281 bind_addr: "127.0.0.1:0".parse().unwrap(),
4282 bootstrap_nodes: Vec::new(),
4283 state_dir: Some(state_dir.clone()),
4284 ..DhtConfig::default()
4285 };
4286 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
4287
4288 tokio::time::sleep(Duration::from_millis(50)).await;
4290
4291 let state_path = state_dir.join("dht_state.json");
4295 let _ = std::fs::remove_file(&state_path);
4296 assert!(!state_path.exists());
4297
4298 handle.shutdown_and_wait().await.unwrap();
4299
4300 assert!(
4304 state_path.exists(),
4305 "shutdown_and_wait must persist state BEFORE returning"
4306 );
4307 }
4308
4309 #[tokio::test]
4312 async fn shutdown_and_wait_after_actor_exit_returns_shutdown_error() {
4313 let config = DhtConfig {
4314 bind_addr: "127.0.0.1:0".parse().unwrap(),
4315 bootstrap_nodes: Vec::new(),
4316 ..DhtConfig::default()
4317 };
4318 let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
4319 handle.shutdown().await.unwrap();
4320 tokio::time::sleep(Duration::from_millis(50)).await;
4322 let result = handle.shutdown_and_wait().await;
4323 assert!(
4324 matches!(result, Err(Error::Shutdown)),
4325 "expected Error::Shutdown, got {result:?}"
4326 );
4327 }
4328
4329 #[test]
4330 fn dht_config_enable_multi_address_default_true() {
4331 let cfg = DhtConfig::default();
4332 assert!(cfg.enable_multi_address);
4333 }
4334
4335 #[test]
4336 fn dht_config_v6_enable_multi_address_default_true() {
4337 let cfg = DhtConfig::default_v6();
4338 assert!(cfg.enable_multi_address);
4339 }
4340}