Skip to main content

irontide_dht/
actor.rs

1#![allow(
2    clippy::cast_possible_truncation,
3    clippy::cast_precision_loss,
4    clippy::cast_possible_wrap,
5    clippy::cast_sign_loss,
6    clippy::unchecked_time_subtraction,
7    reason = "M175: DHT actor — KRPC field widths fixed by spec; time deltas use post-bootstrap Instants captured well after process start"
8)]
9
10//! DHT actor: single-owner event loop managing the routing table, UDP socket,
11//! and pending queries.
12
13use std::collections::HashMap;
14use std::net::SocketAddr;
15use std::path::PathBuf;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicU16, Ordering};
18use std::time::{Duration, Instant};
19
20use dashmap::DashMap;
21use tokio::net::UdpSocket;
22use tokio::sync::{mpsc, oneshot};
23use tracing::{debug, trace, warn};
24
25use irontide_core::{AddressFamily, Id20};
26
27use crate::bep44::{self, ImmutableItem, MAX_SALT_SIZE, MAX_VALUE_SIZE, MutableItem};
28use crate::compact::CompactNodeInfo;
29use crate::error::{Error, Result};
30use crate::krpc::{
31    GetPeersResponse, KrpcBody, KrpcMessage, KrpcQuery, KrpcResponse, SampleInfohashesResponse,
32    TransactionId,
33};
34use crate::lookup::{FindNodeCallbacks, IterativeLookup};
35use crate::node_id::{self, ExternalIpVoter, IpVoteSource};
36use crate::peer_store::PeerStore;
37use crate::routing_table::{K, RoutingTable};
38use crate::storage::{DhtStorage, InMemoryDhtStorage};
39
40#[allow(unused_imports)]
41use ed25519_dalek::SigningKey;
42
/// Token-bucket rate limiter for outgoing KRPC queries.
///
/// Permits refill continuously based on elapsed real time. `try_acquire` is
/// non-blocking: it either consumes a permit or returns `false` immediately.
///
/// A rate of `0` means "unlimited", matching the documented contract of
/// `DhtConfig::queries_per_second`: every `try_acquire` succeeds.
struct QueryRateLimiter {
    /// Permits currently available.
    permits: u32,
    /// Bucket capacity (equal to the per-second rate).
    max_permits: u32,
    /// Time base for the next refill. Advanced only by the time that was
    /// actually converted into permits, so fractional progress carries over.
    last_refill: Instant,
    /// Permits added per second; `0` disables limiting entirely.
    refill_rate: u32,
}

impl QueryRateLimiter {
    /// Create a new limiter with `rate` permits per second. Starts with a full
    /// bucket so the first burst of queries is not artificially delayed.
    ///
    /// `rate == 0` yields an unlimited limiter. Rates that do not fit in a
    /// `u32` are clamped to `u32::MAX` instead of being truncated.
    fn new(rate: usize) -> Self {
        let rate = u32::try_from(rate).unwrap_or(u32::MAX);
        Self {
            permits: rate,
            max_permits: rate,
            last_refill: Instant::now(),
            refill_rate: rate,
        }
    }

    /// `true` when the limiter was configured with a rate of 0 (no limit).
    fn is_unlimited(&self) -> bool {
        self.refill_rate == 0
    }

    /// Attempt to consume one permit. Returns `true` if a permit was available
    /// (always, for an unlimited limiter), `false` if the bucket is empty.
    /// Never blocks.
    fn try_acquire(&mut self) -> bool {
        if self.is_unlimited() {
            return true;
        }
        self.refill();
        if self.permits > 0 {
            self.permits -= 1;
            true
        } else {
            false
        }
    }

    /// Refill the bucket based on elapsed time since `last_refill`, capping at
    /// `max_permits`.
    ///
    /// Nothing changes until at least one whole permit has accrued, which
    /// avoids drift on very fast calls. When the bucket is not yet full,
    /// `last_refill` advances only by the time the granted permits
    /// represent, so the leftover fraction counts toward the next permit.
    /// When the bucket fills up, surplus time is discarded.
    fn refill(&mut self) {
        if self.is_unlimited() {
            return;
        }
        let now = Instant::now();
        let elapsed = now.saturating_duration_since(self.last_refill);
        let rate = f64::from(self.refill_rate);
        // Float -> int `as` casts saturate, so a very long idle period
        // cannot wrap around.
        let new_permits = (elapsed.as_secs_f64() * rate) as u32;
        if new_permits == 0 {
            return;
        }
        let refilled = self.permits.saturating_add(new_permits);
        if refilled >= self.max_permits {
            self.permits = self.max_permits;
            self.last_refill = now;
        } else {
            self.permits = refilled;
            // Carry the unconverted remainder instead of dropping it.
            self.last_refill += Duration::from_secs_f64(f64::from(new_permits) / rate);
        }
    }
}
91
92/// Arc-compatible rate limiter wrapping [`QueryRateLimiter`] in a `Mutex`.
93///
94/// Used by both the DHT actor and spawned `DhtLookup` tasks.
95pub(crate) struct SharedRateLimiter {
96    inner: parking_lot::Mutex<QueryRateLimiter>,
97}
98
99impl SharedRateLimiter {
100    /// Create a new shared rate limiter with `rate` permits per second.
101    pub fn new(rate: usize) -> Self {
102        Self {
103            inner: parking_lot::Mutex::new(QueryRateLimiter::new(rate)),
104        }
105    }
106
107    /// Non-blocking acquire. Returns `true` if a permit was available.
108    pub fn try_acquire(&self) -> bool {
109        self.inner.lock().try_acquire()
110    }
111
112    /// Async acquire — sleeps briefly between attempts until a permit is
113    /// available. At 250 permits/sec the bucket refills ~1 permit per 4ms.
114    pub async fn acquire(&self) {
115        loop {
116            if self.try_acquire() {
117                return;
118            }
119            tokio::time::sleep(Duration::from_millis(4)).await;
120        }
121    }
122}
123
124/// Configuration for the DHT.
125#[derive(Debug, Clone)]
126pub struct DhtConfig {
127    /// Address to bind the UDP socket.
128    pub bind_addr: SocketAddr,
129    /// Bootstrap nodes (host:port strings resolved at startup).
130    pub bootstrap_nodes: Vec<String>,
131    /// Our node ID. Generated randomly if `None`.
132    pub own_id: Option<Id20>,
133    /// Max outgoing queries per second (0 = unlimited).
134    pub queries_per_second: usize,
135    /// Timeout for individual KRPC queries.
136    pub query_timeout: Duration,
137    /// Address family for this DHT instance (determines compact format and DNS filtering).
138    pub address_family: AddressFamily,
139    /// BEP 42: Enforce node ID verification when inserting into routing table.
140    /// Nodes with IDs that don't match their IP are rejected.
141    pub enforce_node_id: bool,
142    /// BEP 42: Restrict routing table to one node per IP address.
143    pub restrict_routing_ips: bool,
144    /// BEP 44: Maximum number of stored DHT items (immutable + mutable).
145    pub dht_max_items: usize,
146    /// BEP 44: Lifetime of DHT items in seconds before expiry.
147    pub dht_item_lifetime_secs: u64,
148    /// Maximum number of nodes in the routing table. Prevents unbounded growth
149    /// from adversarial node injection. Default: 512 (matches rqbit).
150    pub max_routing_nodes: usize,
151    /// Directory for persisting DHT routing table state as JSON.
152    /// When set, the actor saves/loads `dht_state.json` (V4) or
153    /// `dht_state_v6.json` (V6) via atomic temp-file + rename.
154    pub state_dir: Option<PathBuf>,
155    /// BEP 43: Read-only mode. When enabled, outgoing queries include `ro: 1`
156    /// and the node does not send `announce_peer` messages. Other nodes should
157    /// not add us to their routing tables.
158    pub read_only_mode: bool,
159    /// BEP 45: Include `want` in outgoing `find_node`/`get_peers` to request
160    /// both IPv4 and IPv6 nodes from dual-stack peers.
161    pub enable_multi_address: bool,
162}
163
164impl Default for DhtConfig {
165    fn default() -> Self {
166        Self {
167            bind_addr: "0.0.0.0:0".parse().unwrap(),
168            bootstrap_nodes: vec![
169                "router.bittorrent.com:6881".into(),
170                "dht.transmissionbt.com:6881".into(),
171                "router.utorrent.com:6881".into(),
172            ],
173            own_id: None,
174            queries_per_second: 250,
175            query_timeout: Duration::from_secs(5),
176            address_family: AddressFamily::V4,
177            enforce_node_id: false,
178            restrict_routing_ips: true,
179            dht_max_items: 700,
180            dht_item_lifetime_secs: 7200,
181            max_routing_nodes: 512,
182            state_dir: None,
183            read_only_mode: false,
184            enable_multi_address: true,
185        }
186    }
187}
188
189impl DhtConfig {
190    /// Default configuration for an IPv6 DHT instance (BEP 24).
191    #[must_use]
192    pub fn default_v6() -> Self {
193        Self {
194            bind_addr: "[::]:0".parse().unwrap(),
195            bootstrap_nodes: vec![
196                "router.bittorrent.com:6881".into(),
197                "dht.libtorrent.org:25401".into(),
198            ],
199            own_id: None,
200            queries_per_second: 250,
201            query_timeout: Duration::from_secs(5),
202            address_family: AddressFamily::V6,
203            enforce_node_id: false,
204            restrict_routing_ips: true,
205            dht_max_items: 700,
206            dht_item_lifetime_secs: 7200,
207            max_routing_nodes: 512,
208            state_dir: None,
209            read_only_mode: false,
210            enable_multi_address: true,
211        }
212    }
213}
214
215/// Runtime statistics for the DHT.
216#[derive(Debug, Clone)]
217pub struct DhtStats {
218    /// Our current node ID (may differ from startup ID after BEP 42 regeneration).
219    pub node_id: Id20,
220    /// Number of nodes in the routing table.
221    pub routing_table_size: usize,
222    /// Number of k-buckets in use.
223    pub bucket_count: usize,
224    /// Number of distinct info hashes tracked in the peer store.
225    pub peer_store_info_hashes: usize,
226    /// Total number of peers across all info hashes.
227    pub peer_store_peers: usize,
228    /// Number of in-flight KRPC queries.
229    pub pending_queries: usize,
230    /// Total KRPC queries sent since startup.
231    pub total_queries_sent: u64,
232    /// Total KRPC responses received since startup.
233    pub total_responses_received: u64,
234    /// Number of BEP 44 items stored (immutable + mutable).
235    pub dht_item_count: usize,
236}
237
238/// Result of a `sample_infohashes` query (BEP 51).
239#[derive(Debug, Clone)]
240pub struct SampleInfohashesResult {
241    /// Minimum seconds before querying the same node again.
242    pub interval: i64,
243    /// Estimated total info hashes in the remote node's store.
244    pub num: i64,
245    /// Sampled info hashes.
246    pub samples: Vec<Id20>,
247    /// Closer nodes for traversal.
248    pub nodes: Vec<CompactNodeInfo>,
249}
250
251/// A cloneable handle to the DHT actor.
252#[derive(Clone, Debug)]
253pub struct DhtHandle {
254    tx: mpsc::Sender<DhtCommand>,
255}
256
257enum DhtCommand {
258    GetPeers {
259        info_hash: Id20,
260        reply: mpsc::UnboundedSender<Vec<SocketAddr>>,
261    },
262    Announce {
263        info_hash: Id20,
264        port: u16,
265        reply: oneshot::Sender<Result<()>>,
266    },
267    Stats {
268        reply: oneshot::Sender<DhtStats>,
269    },
270    UpdateExternalIp {
271        ip: std::net::IpAddr,
272        source: IpVoteSource,
273    },
274    GetImmutable {
275        target: Id20,
276        reply: oneshot::Sender<Result<Option<Vec<u8>>>>,
277    },
278    PutImmutable {
279        value: Vec<u8>,
280        reply: oneshot::Sender<Result<Id20>>,
281    },
282    GetMutable {
283        public_key: [u8; 32],
284        salt: Vec<u8>,
285        #[allow(clippy::type_complexity)]
286        reply: oneshot::Sender<Result<Option<(Vec<u8>, i64)>>>,
287    },
288    PutMutable {
289        keypair_bytes: [u8; 32],
290        value: Vec<u8>,
291        seq: i64,
292        salt: Vec<u8>,
293        reply: oneshot::Sender<Result<Id20>>,
294    },
295    SampleInfohashes {
296        target: Id20,
297        reply: oneshot::Sender<Result<SampleInfohashesResult>>,
298    },
299    GetRoutingNodes {
300        reply: oneshot::Sender<Vec<(Id20, SocketAddr)>>,
301    },
302    /// M173 Lane B (B7): synchronously persist the routing table to
303    /// `dht_state.json` and reply when the rename has completed.
304    /// Used by the `apply_settings` DHT-restart phase so the saved
305    /// state survives a runtime `enable_dht: true → false → true`
306    /// cycle.
307    SaveRoutingTable {
308        reply: oneshot::Sender<Result<()>>,
309    },
310    /// Optional reply lets the caller block until the actor has
311    /// drained — used by the `apply_settings` DHT-stop phase. The
312    /// actor saves the routing table BEFORE acking, so the on-disk
313    /// state is up-to-date when the new actor starts.
314    Shutdown {
315        reply: Option<oneshot::Sender<()>>,
316    },
317}
318
319impl DhtHandle {
320    /// Start the DHT actor and return a handle plus an IP consensus channel.
321    ///
322    /// The consensus channel fires when the BEP 42 `ExternalIpVoter` reaches
323    /// agreement on our external IP address.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if the UDP socket cannot be bound.
328    pub async fn start(config: DhtConfig) -> Result<(Self, mpsc::Receiver<std::net::IpAddr>)> {
329        let socket = Arc::new(UdpSocket::bind(config.bind_addr).await?);
330        let local_addr = socket.local_addr()?;
331        debug!(addr = %local_addr, "DHT socket bound");
332
333        let (tx, rx) = mpsc::channel(256);
334        let (ip_consensus_tx, ip_consensus_rx) = mpsc::channel(4);
335        let handle = Self { tx };
336
337        let actor = DhtActor::new(config, socket, rx, ip_consensus_tx);
338        tokio::spawn(actor.run());
339
340        Ok((handle, ip_consensus_rx))
341    }
342
343    /// Notify the DHT of our external IP (from NAT/tracker discovery).
344    ///
345    /// # Errors
346    ///
347    /// Returns [`Error::Shutdown`] if the actor has stopped.
348    pub async fn update_external_ip(
349        &self,
350        ip: std::net::IpAddr,
351        source: IpVoteSource,
352    ) -> Result<()> {
353        self.tx
354            .send(DhtCommand::UpdateExternalIp { ip, source })
355            .await
356            .map_err(|_| Error::Shutdown)
357    }
358
359    /// Discover peers for an `info_hash`.
360    ///
361    /// Returns a channel that receives batches of peers as they are found.
362    /// The channel closes when the search is exhausted.
363    ///
364    /// # Errors
365    ///
366    /// Returns [`Error::Shutdown`] if the actor has stopped.
367    pub async fn get_peers(
368        &self,
369        info_hash: Id20,
370    ) -> Result<mpsc::UnboundedReceiver<Vec<SocketAddr>>> {
371        let (reply_tx, reply_rx) = mpsc::unbounded_channel();
372        self.tx
373            .send(DhtCommand::GetPeers {
374                info_hash,
375                reply: reply_tx,
376            })
377            .await
378            .map_err(|_| Error::Shutdown)?;
379        Ok(reply_rx)
380    }
381
382    /// Announce that we have peers for an `info_hash` on the given port.
383    ///
384    /// # Errors
385    ///
386    /// Returns [`Error::Shutdown`] if the actor has stopped.
387    pub async fn announce(&self, info_hash: Id20, port: u16) -> Result<()> {
388        let (reply_tx, reply_rx) = oneshot::channel();
389        self.tx
390            .send(DhtCommand::Announce {
391                info_hash,
392                port,
393                reply: reply_tx,
394            })
395            .await
396            .map_err(|_| Error::Shutdown)?;
397        reply_rx.await.map_err(|_| Error::Shutdown)?
398    }
399
400    /// Get current DHT statistics.
401    ///
402    /// # Errors
403    ///
404    /// Returns [`Error::Shutdown`] if the actor has stopped.
405    pub async fn stats(&self) -> Result<DhtStats> {
406        let (reply_tx, reply_rx) = oneshot::channel();
407        self.tx
408            .send(DhtCommand::Stats { reply: reply_tx })
409            .await
410            .map_err(|_| Error::Shutdown)?;
411        reply_rx.await.map_err(|_| Error::Shutdown)
412    }
413
414    /// Get the number of nodes currently in the routing table (M171 D4).
415    ///
416    /// Thin accessor over [`DhtStats::routing_table_size`] used by the qBt
417    /// v2 `transferInfo.dht_nodes` field and the DHT pseudo-tracker's
418    /// `num_peers` column. Returns `Ok(0)` when the routing table is
419    /// empty — including immediately after startup, before bootstrap has
420    /// populated any buckets.
421    ///
422    /// # Errors
423    ///
424    /// Returns [`Error::Shutdown`] if the actor has stopped.
425    pub async fn node_count(&self) -> Result<usize> {
426        Ok(self.stats().await?.routing_table_size)
427    }
428
429    /// Shut down the DHT actor (fire-and-forget).
430    ///
431    /// Returns once the shutdown command has been queued. The actor
432    /// will persist the routing table (`dht_state.json`) before
433    /// terminating, but this method does NOT wait for that. For
434    /// runtime DHT-restart paths that need to be sure the state is
435    /// on disk before starting a new actor, use
436    /// [`Self::shutdown_and_wait`].
437    ///
438    /// # Errors
439    ///
440    /// Returns [`Error::Shutdown`] if the command channel has
441    /// already closed.
442    pub async fn shutdown(&self) -> Result<()> {
443        self.tx
444            .send(DhtCommand::Shutdown { reply: None })
445            .await
446            .map_err(|_| Error::Shutdown)
447    }
448
449    /// M173 Lane B (B7): shut down the DHT actor and wait for it to
450    /// fully drain — including persisting the routing table to
451    /// `dht_state.json`.
452    ///
453    /// Returns `Ok(())` once the actor has saved its state and
454    /// terminated. Used by the `apply_settings` DHT-restart phase so
455    /// the new DHT actor (started with the same `state_dir`) can
456    /// load the pre-restart state.
457    ///
458    /// # Errors
459    ///
460    /// Returns [`Error::Shutdown`] if the actor exits before
461    /// sending its reply (typically because it had already shut
462    /// down for another reason).
463    pub async fn shutdown_and_wait(&self) -> Result<()> {
464        let (reply_tx, reply_rx) = oneshot::channel();
465        self.tx
466            .send(DhtCommand::Shutdown {
467                reply: Some(reply_tx),
468            })
469            .await
470            .map_err(|_| Error::Shutdown)?;
471        reply_rx.await.map_err(|_| Error::Shutdown)
472    }
473
474    /// M173 Lane B (B7): synchronously persist the routing table.
475    ///
476    /// Returns `Ok(())` once `dht_state.json` has been written and
477    /// renamed atomically. Distinct from
478    /// [`Self::shutdown_and_wait`] in that the actor continues
479    /// running afterwards — used by callers that want to checkpoint
480    /// state without restarting DHT.
481    ///
482    /// # Errors
483    ///
484    /// Returns [`Error::Shutdown`] if the actor channel has closed.
485    /// May return an underlying I/O error wrapped in
486    /// `Error::Shutdown` if the persist itself failed (the actor
487    /// returns the error verbatim, but the channel-closed case is
488    /// indistinguishable from the I/O case at the API boundary).
489    pub async fn save_routing_table(&self) -> Result<()> {
490        let (reply_tx, reply_rx) = oneshot::channel();
491        self.tx
492            .send(DhtCommand::SaveRoutingTable { reply: reply_tx })
493            .await
494            .map_err(|_| Error::Shutdown)?;
495        reply_rx.await.map_err(|_| Error::Shutdown)?
496    }
497
498    /// Store an immutable item in the DHT (BEP 44).
499    ///
500    /// Returns the SHA-1 target hash that can be used to retrieve the item.
501    /// The value must be valid bencoded data, max 1000 bytes.
502    ///
503    /// # Errors
504    ///
505    /// Returns [`Error::Shutdown`] if the actor has stopped, or a BEP 44
506    /// validation error if the value exceeds size limits.
507    pub async fn put_immutable(&self, value: Vec<u8>) -> Result<Id20> {
508        let (reply_tx, reply_rx) = oneshot::channel();
509        self.tx
510            .send(DhtCommand::PutImmutable {
511                value,
512                reply: reply_tx,
513            })
514            .await
515            .map_err(|_| Error::Shutdown)?;
516        reply_rx.await.map_err(|_| Error::Shutdown)?
517    }
518
519    /// Retrieve an immutable item from the DHT (BEP 44).
520    ///
521    /// Returns the raw bencoded value if found, `None` if not.
522    ///
523    /// # Errors
524    ///
525    /// Returns [`Error::Shutdown`] if the actor has stopped.
526    pub async fn get_immutable(&self, target: Id20) -> Result<Option<Vec<u8>>> {
527        let (reply_tx, reply_rx) = oneshot::channel();
528        self.tx
529            .send(DhtCommand::GetImmutable {
530                target,
531                reply: reply_tx,
532            })
533            .await
534            .map_err(|_| Error::Shutdown)?;
535        reply_rx.await.map_err(|_| Error::Shutdown)?
536    }
537
538    /// Store a mutable item in the DHT (BEP 44).
539    ///
540    /// - `keypair_bytes`: 32-byte ed25519 seed (secret key)
541    /// - `value`: bencoded data, max 1000 bytes
542    /// - `seq`: sequence number (must be higher than any previously stored)
543    /// - `salt`: optional salt for sub-key isolation (max 64 bytes)
544    ///
545    /// Returns the target hash.
546    ///
547    /// # Errors
548    ///
549    /// Returns [`Error::Shutdown`] if the actor has stopped, or a BEP 44
550    /// validation error if value or salt exceeds size limits.
551    pub async fn put_mutable(
552        &self,
553        keypair_bytes: [u8; 32],
554        value: Vec<u8>,
555        seq: i64,
556        salt: Vec<u8>,
557    ) -> Result<Id20> {
558        let (reply_tx, reply_rx) = oneshot::channel();
559        self.tx
560            .send(DhtCommand::PutMutable {
561                keypair_bytes,
562                value,
563                seq,
564                salt,
565                reply: reply_tx,
566            })
567            .await
568            .map_err(|_| Error::Shutdown)?;
569        reply_rx.await.map_err(|_| Error::Shutdown)?
570    }
571
572    /// Query a DHT node for a random sample of info hashes (BEP 51).
573    ///
574    /// Routes toward `target` to find the responding node. Returns sampled
575    /// hashes, the interval before re-querying, and closer nodes for traversal.
576    ///
577    /// # Errors
578    ///
579    /// Returns [`Error::Shutdown`] if the actor has stopped.
580    pub async fn sample_infohashes(&self, target: Id20) -> Result<SampleInfohashesResult> {
581        let (reply_tx, reply_rx) = oneshot::channel();
582        self.tx
583            .send(DhtCommand::SampleInfohashes {
584                target,
585                reply: reply_tx,
586            })
587            .await
588            .map_err(|_| Error::Shutdown)?;
589        reply_rx.await.map_err(|_| Error::Shutdown)?
590    }
591
592    /// Retrieve a mutable item from the DHT (BEP 44).
593    ///
594    /// Returns `(value, seq)` if found, `None` if not.
595    ///
596    /// # Errors
597    ///
598    /// Returns [`Error::Shutdown`] if the actor has stopped.
599    pub async fn get_mutable(
600        &self,
601        public_key: [u8; 32],
602        salt: Vec<u8>,
603    ) -> Result<Option<(Vec<u8>, i64)>> {
604        let (reply_tx, reply_rx) = oneshot::channel();
605        self.tx
606            .send(DhtCommand::GetMutable {
607                public_key,
608                salt,
609                reply: reply_tx,
610            })
611            .await
612            .map_err(|_| Error::Shutdown)?;
613        reply_rx.await.map_err(|_| Error::Shutdown)?
614    }
615
616    /// Return all nodes currently in the DHT routing table.
617    pub async fn get_routing_nodes(&self) -> Vec<(Id20, SocketAddr)> {
618        let (reply_tx, reply_rx) = oneshot::channel();
619        let _ = self
620            .tx
621            .send(DhtCommand::GetRoutingNodes { reply: reply_tx })
622            .await;
623        reply_rx.await.unwrap_or_default()
624    }
625}
626
627// ---- Actor internals ----
628
629struct DhtActor {
630    config: DhtConfig,
631    address_family: AddressFamily,
632    /// UDP socket shared with spawned `DhtLookup` tasks.
633    socket: Arc<UdpSocket>,
634    rx: mpsc::Receiver<DhtCommand>,
635    /// Routing table shared with spawned `DhtLookup` tasks.
636    routing_table: Arc<parking_lot::RwLock<RoutingTable>>,
637    peer_store: PeerStore,
638    /// BEP 44 item storage (immutable + mutable).
639    item_store: Box<dyn DhtStorage + Send>,
640    /// Pending KRPC queries shared with spawned `DhtLookup` tasks.
641    pending: Arc<DashMap<u16, PendingQuery>>,
642    /// Atomic transaction ID counter shared with spawned `DhtLookup` tasks.
643    next_txn_id: Arc<AtomicU16>,
644    stats: ActorStats,
645    /// Announce tokens collected from active lookups via the token channel.
646    announce_tokens: HashMap<Id20, HashMap<Id20, (SocketAddr, Vec<u8>)>>,
647    /// Sender for lookup token channel (cloned to each `DhtLookup`).
648    lookup_token_tx: mpsc::UnboundedSender<(Id20, Id20, SocketAddr, Vec<u8>)>,
649    /// Receiver for lookup token channel.
650    lookup_token_rx: mpsc::UnboundedReceiver<(Id20, Id20, SocketAddr, Vec<u8>)>,
651    /// Sender for lookup node channel (cloned to each `DhtLookup`).
652    lookup_node_tx: mpsc::UnboundedSender<(Id20, SocketAddr)>,
653    /// Receiver for lookup node channel.
654    lookup_node_rx: mpsc::UnboundedReceiver<(Id20, SocketAddr)>,
655    /// Active `DhtLookup` task handles, keyed by `info_hash`.
656    active_lookups: HashMap<Id20, tokio::task::JoinHandle<()>>,
657    /// Active BEP 44 get lookups.
658    item_lookups: HashMap<Id20, ItemLookupState>,
659    /// Active BEP 44 put operations (waiting for tokens before sending puts).
660    item_put_ops: HashMap<Id20, ItemPutState>,
661    /// BEP 42 external IP voter: aggregates IP reports from KRPC responses.
662    ip_voter: ExternalIpVoter,
663    /// Callback channel: fires when voter consensus changes.
664    ip_consensus_tx: mpsc::Sender<std::net::IpAddr>,
665    /// Pending one-shot replies for `sample_infohashes` queries.
666    sample_replies: HashMap<u16, oneshot::Sender<Result<SampleInfohashesResult>>>,
667    /// Token-bucket rate limiter shared with spawned `DhtLookup` tasks.
668    rate_limiter: Arc<SharedRateLimiter>,
669    /// Active iterative bootstrap lookup (`find_node` self-lookup after initial bootstrap).
670    bootstrap_lookup: Option<IterativeLookup<FindNodeCallbacks>>,
671    /// Whether initial bootstrap (`FindNodeLookup`) has completed (M97).
672    bootstrap_complete: bool,
673    /// M146: Queued `get_peers` waiting for at least 1 routing table node.
674    /// Lowered from M97's threshold=8 to threshold=1 (empty-table only).
675    pending_get_peers: Vec<(Id20, mpsc::UnboundedSender<Vec<SocketAddr>>)>,
676    /// Bootstrap timeout timer — forces `bootstrap_complete` after 10s (M97).
677    bootstrap_timeout: Option<std::pin::Pin<Box<tokio::time::Sleep>>>,
678    /// Timestamp of last `ping_questionable_nodes()` call for two-phase gating (M105).
679    last_ping: Instant,
680    /// Receiver for DNS-resolved bootstrap addresses from background tasks (M105).
681    /// Set during `bootstrap()`, drained in the main select! loop, cleared when
682    /// all spawned DNS tasks complete (channel closes).
683    dns_bootstrap_rx: Option<mpsc::Receiver<Vec<SocketAddr>>>,
684}
685
/// Running counters surfaced through [`DhtStats`].
struct ActorStats {
    /// KRPC queries sent since the actor started.
    total_queries_sent: u64,
    /// KRPC responses received since the actor started.
    total_responses_received: u64,
}
690
691/// A pending KRPC query awaiting a response.
692pub(crate) struct PendingQuery {
693    pub sent_at: Instant,
694    pub addr: SocketAddr,
695    pub kind: PendingQueryKind,
696    pub node_id: Option<Id20>,
697    /// If set, the response is routed through this oneshot instead of being
698    /// handled by the actor's `handle_response()` match arms.
699    pub response_tx: Option<oneshot::Sender<PendingQueryResponse>>,
700}
701
702/// Raw KRPC response forwarded to a `DhtLookup` via oneshot.
703pub(crate) struct PendingQueryResponse {
704    pub sender_id: Id20,
705    pub response: KrpcResponse,
706}
707
708#[derive(Debug)]
709pub(crate) enum PendingQueryKind {
710    Ping,
711    FindNode,
712    GetPeers {
713        info_hash: Id20,
714    },
715    AnnouncePeer,
716    /// BEP 44: outgoing get item query.
717    GetItem {
718        target: Id20,
719    },
720    /// BEP 44: outgoing put item query.
721    PutItem,
722    /// BEP 51: outgoing `sample_infohashes` query.
723    SampleInfohashes,
724}
725
726/// State for an active BEP 44 get lookup.
727enum ItemLookupState {
728    Immutable {
729        #[allow(clippy::type_complexity)]
730        reply: Option<oneshot::Sender<Result<Option<Vec<u8>>>>>,
731        queried: std::collections::HashSet<Id20>,
732    },
733    Mutable {
734        salt: Vec<u8>,
735        #[allow(clippy::type_complexity)]
736        reply: Option<oneshot::Sender<Result<Option<(Vec<u8>, i64)>>>>,
737        best_seq: i64,
738        best_value: Option<Vec<u8>>,
739        queried: std::collections::HashSet<Id20>,
740    },
741}
742
743/// State for an active BEP 44 put operation (waiting for tokens then sending puts).
744enum ItemPutState {
745    Immutable {
746        item: crate::bep44::ImmutableItem,
747        tokens: HashMap<Id20, (SocketAddr, Vec<u8>)>,
748        sent_puts: usize,
749        reply: Option<oneshot::Sender<Result<Id20>>>,
750    },
751    Mutable {
752        item: crate::bep44::MutableItem,
753        tokens: HashMap<Id20, (SocketAddr, Vec<u8>)>,
754        sent_puts: usize,
755        reply: Option<oneshot::Sender<Result<Id20>>>,
756    },
757}
758
/// Arguments for sending one BEP 44 `put` query.
struct PutItemParams {
    /// Node the put is sent to.
    addr: SocketAddr,
    /// Write token previously obtained from that node.
    token: Vec<u8>,
    /// Bencoded value to store.
    value: Vec<u8>,
    /// ed25519 public key; expected only for mutable puts.
    key: Option<[u8; 32]>,
    /// ed25519 signature; expected only for mutable puts.
    signature: Option<[u8; 64]>,
    /// Sequence number; expected only for mutable puts.
    seq: Option<i64>,
    /// Optional salt for mutable puts.
    salt: Option<Vec<u8>>,
}
769
770/// JSON serialization format for persisted DHT routing table state.
771#[derive(serde::Serialize, serde::Deserialize)]
772struct DhtState {
773    /// Our node ID as a hex string.
774    node_id: String,
775    /// All nodes from the routing table.
776    nodes: Vec<DhtNodeEntry>,
777}
778
779/// A single node entry in the persisted JSON state.
780#[derive(serde::Serialize, serde::Deserialize)]
781struct DhtNodeEntry {
782    /// Node ID as a hex string.
783    id: String,
784    /// Socket address as "ip:port".
785    addr: String,
786}
787
/// Period of routing-table maintenance (one minute).
const MAINTENANCE_INTERVAL: Duration = Duration::from_secs(60);
/// Period of peer-store cleanup (five minutes).
const CLEANUP_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// Period of questionable-node pings (five seconds).
const PING_INTERVAL: Duration = Duration::from_secs(5);
794
795impl DhtActor {
796    fn new(
797        config: DhtConfig,
798        socket: Arc<UdpSocket>,
799        rx: mpsc::Receiver<DhtCommand>,
800        ip_consensus_tx: mpsc::Sender<std::net::IpAddr>,
801    ) -> Self {
802        let own_id = config.own_id.unwrap_or_else(generate_node_id);
803        let address_family = config.address_family;
804        let restrict_ips = config.restrict_routing_ips;
805        let max_routing_nodes = config.max_routing_nodes;
806        debug!(id = %own_id, family = ?address_family, "DHT node ID");
807
808        let max_items = config.dht_max_items;
809        let queries_per_second = config.queries_per_second;
810
811        let (lookup_token_tx, lookup_token_rx) = mpsc::unbounded_channel();
812        let (lookup_node_tx, lookup_node_rx) = mpsc::unbounded_channel();
813
814        let mut actor = Self {
815            config,
816            address_family,
817            socket,
818            rx,
819            routing_table: Arc::new(parking_lot::RwLock::new(RoutingTable::with_config(
820                own_id,
821                restrict_ips,
822                max_routing_nodes,
823            ))),
824            peer_store: PeerStore::new(),
825            item_store: Box::new(InMemoryDhtStorage::new(max_items)),
826            pending: Arc::new(DashMap::new()),
827            next_txn_id: Arc::new(AtomicU16::new(1)),
828            stats: ActorStats {
829                total_queries_sent: 0,
830                total_responses_received: 0,
831            },
832            announce_tokens: HashMap::new(),
833            lookup_token_tx,
834            lookup_token_rx,
835            lookup_node_tx,
836            lookup_node_rx,
837            active_lookups: HashMap::new(),
838            item_lookups: HashMap::new(),
839            item_put_ops: HashMap::new(),
840            ip_voter: ExternalIpVoter::new(10),
841            ip_consensus_tx,
842            sample_replies: HashMap::new(),
843            rate_limiter: Arc::new(SharedRateLimiter::new(queries_per_second)),
844            bootstrap_lookup: None,
845            bootstrap_complete: false,
846            pending_get_peers: Vec::new(),
847            bootstrap_timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(10)))),
848            last_ping: Instant::now(),
849            dns_bootstrap_rx: None,
850        };
851
852        // Load persisted routing table from JSON (if state_dir is configured).
853        // Loaded nodes are marked Questionable and will be verified via pings.
854        actor.load_routing_table();
855
856        actor
857    }
858
    /// Main event loop of the DHT actor; consumes the actor.
    ///
    /// Runs `bootstrap()` once, then multiplexes with `tokio::select!` over:
    /// incoming UDP datagrams, commands from the `DhtHandle` side of `rx`,
    /// periodic timers (query timeouts, maintenance, store cleanup,
    /// questionable-node pings), the one-shot bootstrap timeout, DNS-resolved
    /// bootstrap addresses, and tokens/nodes reported by in-flight lookups.
    ///
    /// Returns on `DhtCommand::Shutdown` or when the command channel closes;
    /// in both cases the routing table is persisted with
    /// `save_routing_table()` before returning.
    async fn run(mut self) {
        // Bootstrap
        self.bootstrap().await;

        // 65535 bytes is the largest possible UDP datagram, so no inbound
        // packet is truncated.
        let mut recv_buf = vec![0u8; 65535];
        // NOTE: tokio intervals complete their first tick immediately, so each
        // periodic branch below runs once right after bootstrap.
        let mut maintenance_tick = tokio::time::interval(MAINTENANCE_INTERVAL);
        let mut cleanup_tick = tokio::time::interval(CLEANUP_INTERVAL);
        // NOTE(review): `tokio::time::interval` panics on a zero period; this
        // assumes `config.query_timeout` is validated as non-zero — confirm.
        let mut query_timeout_tick = tokio::time::interval(self.config.query_timeout);
        let mut ping_tick = tokio::time::interval(PING_INTERVAL);

        loop {
            // Without `biased;`, select! polls ready branches in random order,
            // so no single source can starve the others.
            tokio::select! {
                // Incoming UDP packets
                result = self.socket.recv_from(&mut recv_buf) => {
                    match result {
                        Ok((n, addr)) => {
                            self.handle_packet(&recv_buf[..n], addr).await;
                        }
                        Err(e) => {
                            // Logged and ignored. NOTE(review): a persistent
                            // socket error would make this branch ready on
                            // every iteration (hot loop of warnings).
                            warn!(error = %e, "UDP recv error");
                        }
                    }
                }

                // Commands from handle
                cmd = self.rx.recv() => {
                    match cmd {
                        Some(DhtCommand::GetPeers { info_hash, reply }) => {
                            self.start_get_peers(info_hash, reply);
                        }
                        Some(DhtCommand::Announce { info_hash, port, reply }) => {
                            self.handle_announce(info_hash, port, reply).await;
                        }
                        Some(DhtCommand::Stats { reply }) => {
                            let _ = reply.send(self.make_stats());
                        }
                        Some(DhtCommand::UpdateExternalIp { ip, source }) => {
                            // BEP 42: an external-IP vote from outside the DHT
                            // (NAT mapping / tracker). When the voter reaches a
                            // consensus, publish it and regenerate our node ID.
                            let source_id = source.source_id();
                            if let Some(consensus_ip) = self.ip_voter.add_vote(source_id, ip) {
                                debug!(%consensus_ip, "BEP 42: external IP consensus (via NAT/tracker)");
                                // Best-effort notify; dropped if the channel is full.
                                let _ = self.ip_consensus_tx.try_send(consensus_ip);
                                self.regenerate_node_id(consensus_ip);
                            }
                        }
                        Some(DhtCommand::GetImmutable { target, reply }) => {
                            self.handle_get_immutable(target, reply).await;
                        }
                        Some(DhtCommand::PutImmutable { value, reply }) => {
                            self.handle_put_immutable(value, reply).await;
                        }
                        Some(DhtCommand::GetMutable { public_key, salt, reply }) => {
                            self.handle_get_mutable(public_key, salt, reply).await;
                        }
                        Some(DhtCommand::PutMutable { keypair_bytes, value, seq, salt, reply }) => {
                            self.handle_put_mutable(keypair_bytes, value, seq, salt, reply).await;
                        }
                        Some(DhtCommand::SampleInfohashes { target, reply }) => {
                            self.handle_sample_infohashes(target, reply).await;
                        }
                        Some(DhtCommand::GetRoutingNodes { reply }) => {
                            let nodes = self.routing_table.read().all_nodes();
                            let _ = reply.send(nodes);
                        }
                        Some(DhtCommand::SaveRoutingTable { reply }) => {
                            // M173 Lane B (B7): synchronous persist
                            // with reply. save_routing_table itself
                            // is best-effort (logs and continues on
                            // I/O error); we report Ok regardless so
                            // the caller knows the actor processed
                            // the request. If state_dir is None, the
                            // function is a no-op and we still ack.
                            self.save_routing_table();
                            let _ = reply.send(Ok(()));
                        }
                        Some(DhtCommand::Shutdown { reply }) => {
                            debug!("DHT shutting down — persisting routing table");
                            // M173 Lane B (B7): persist on shutdown
                            // BEFORE acking the reply. Without this,
                            // a runtime `enable_dht: true → false →
                            // true` cycle drops recent node state on
                            // the floor — the new actor starts with
                            // stale on-disk state.
                            self.save_routing_table();
                            if let Some(tx) = reply {
                                let _ = tx.send(());
                            }
                            return;
                        }
                        None => {
                            debug!("DHT shutting down (cmd channel closed) — persisting routing table");
                            // Same persist-on-exit guarantee even
                            // when shutdown is via channel-drop
                            // (e.g. session teardown).
                            self.save_routing_table();
                            return;
                        }
                    }
                }

                // Expire timed-out queries and advance stalled lookups
                // (like libtorrent's traversal_algorithm::failed → add_requests)
                _ = query_timeout_tick.tick() => {
                    self.expire_queries_and_advance_lookups().await;
                }

                // Periodic maintenance (routing table housekeeping)
                _ = maintenance_tick.tick() => {
                    self.maintenance().await;
                }

                // Peer store and item store cleanup
                _ = cleanup_tick.tick() => {
                    self.peer_store.cleanup();
                    self.item_store.expire(
                        Duration::from_secs(self.config.dht_item_lifetime_secs)
                    );
                }

                // Ping questionable nodes to verify liveness
                _ = ping_tick.tick() => {
                    // Two-phase ping frequency (M105): 5s during bootstrap for
                    // fast routing-table population, 60s steady-state to reduce
                    // chatter after the table is established.
                    // `ping_tick` only sets the polling granularity; the actual
                    // ping cadence is gated by `last_ping` below.
                    let ping_interval = if self.bootstrap_complete {
                        Duration::from_mins(1)
                    } else {
                        Duration::from_secs(5)
                    };
                    if self.last_ping.elapsed() >= ping_interval {
                        self.ping_questionable_nodes().await;
                        self.last_ping = Instant::now();
                    }
                    // M146: Drain pending get_peers as soon as the routing
                    // table has at least 1 node (from bootstrap ping responses).
                    self.drain_pending_if_table_ready();
                }

                // M97: Bootstrap timeout — force bootstrap_complete after 10s
                // (the 10s sleep is armed when the actor is constructed).
                // The precondition disables this branch once bootstrap is
                // complete. NOTE(review): this assumes on_bootstrap_complete()
                // sets `bootstrap_complete`; otherwise the already-elapsed
                // timer could fire again on later iterations.
                () = async {
                    match &mut self.bootstrap_timeout {
                        Some(timer) => timer.as_mut().await,
                        None => std::future::pending().await,
                    }
                }, if self.bootstrap_timeout.is_some() && !self.bootstrap_complete => {
                    warn!(
                        table_size = self.routing_table.read().len(),
                        "bootstrap timeout (10s), proceeding with current routing table"
                    );
                    self.on_bootstrap_complete();
                }

                // M105: Drain DNS-resolved bootstrap addresses from background tasks
                // With no receiver this future stays pending forever, which
                // effectively disables the branch.
                result = async {
                    match &mut self.dns_bootstrap_rx {
                        Some(rx) => rx.recv().await,
                        None => std::future::pending().await,
                    }
                } => {
                    let own_id = *self.routing_table.read().own_id();
                    if let Some(addrs) = result {
                        // If the bootstrap lookup already exhausted (cold start
                        // with no saved nodes), restart it so DNS-resolved nodes
                        // get properly iterated through the Kademlia lookup.
                        if self.bootstrap_lookup.is_none() && !self.bootstrap_complete {
                            debug!(
                                dns_addrs = addrs.len(),
                                "restarting bootstrap lookup from DNS results"
                            );
                            self.bootstrap_lookup = Some(IterativeLookup::new(
                                own_id,
                                FindNodeCallbacks {
                                    round: 0,
                                    max_rounds: 6,
                                },
                            ));
                        }
                        for addr in addrs {
                            self.send_find_node(addr, own_id, None).await;
                        }
                    } else {
                        // All DNS tasks completed
                        // (every sender clone was dropped). Clear the receiver
                        // so this branch goes back to pending instead of
                        // resolving to None on every iteration.
                        debug!("DNS bootstrap tasks completed");
                        self.dns_bootstrap_rx = None;
                    }
                }

                // Drain tokens from active DhtLookup tasks
                // The actor itself holds `lookup_token_tx`, so this channel
                // does not close while the actor is alive.
                Some((info_hash, node_id, addr, token)) = self.lookup_token_rx.recv() => {
                    self.announce_tokens
                        .entry(info_hash)
                        .or_default()
                        .insert(node_id, (addr, token));
                }

                // Drain discovered nodes from active DhtLookup tasks
                Some((id, addr)) = self.lookup_node_rx.recv() => {
                    self.checked_insert(id, addr, false);
                }
            }
        }
    }
1060
1061    async fn bootstrap(&mut self) {
1062        let own_id = *self.routing_table.read().own_id();
1063
1064        // Partition: saved nodes (IP:port) vs DNS hostnames.
1065        // Saved nodes parse as SocketAddr; hardcoded bootstrap nodes have
1066        // hostname:port and will fail parse.
1067        let (saved_addrs, hostname_strs): (Vec<_>, Vec<_>) = self
1068            .config
1069            .bootstrap_nodes
1070            .clone()
1071            .into_iter()
1072            .partition(|s| s.parse::<SocketAddr>().is_ok());
1073
1074        debug!(
1075            saved_nodes = saved_addrs.len(),
1076            dns_nodes = hostname_strs.len(),
1077            family = ?self.address_family,
1078            "bootstrap: starting (pinging saved nodes, resolving DNS nodes)"
1079        );
1080
1081        // Phase 1: Ping saved nodes (validates liveness, inserts into routing
1082        // table via the normal ping response handler — no PingVerify needed)
1083        for addr_str in &saved_addrs {
1084            if let Ok(addr) = addr_str.parse::<SocketAddr>() {
1085                self.send_ping(addr, None).await;
1086            }
1087        }
1088
1089        // Phase 2: Spawn background DNS resolution tasks with retry+backoff.
1090        // Each hostname gets its own tokio::spawn that retries with exponential
1091        // backoff (1s → 30s cap, 120s total deadline). Resolved addresses are
1092        // sent to dns_bootstrap_rx and integrated via the main select! loop.
1093        // Phase 3 starts immediately without waiting for DNS.
1094        if !hostname_strs.is_empty() {
1095            let (dns_tx, dns_rx) = mpsc::channel(16);
1096            for hostname in hostname_strs {
1097                let tx = dns_tx.clone();
1098                let family = self.address_family;
1099                tokio::spawn(async move {
1100                    dns_bootstrap_resolve(hostname, family, tx).await;
1101                });
1102            }
1103            drop(dns_tx); // close sender so receiver ends when all tasks complete
1104            self.dns_bootstrap_rx = Some(dns_rx);
1105        }
1106
1107        // Phase 3: Initiate iterative bootstrap — follow returned nodes to discover more
1108        let initial_closest: Vec<CompactNodeInfo> = self
1109            .routing_table
1110            .read()
1111            .closest(&own_id, K)
1112            .into_iter()
1113            .map(|n| CompactNodeInfo {
1114                id: n.id,
1115                addr: n.addr,
1116            })
1117            .collect();
1118
1119        debug!(
1120            initial_nodes = initial_closest.len(),
1121            table_size = self.routing_table.read().len(),
1122            "bootstrap: starting iterative lookup"
1123        );
1124
1125        let mut lookup = IterativeLookup::new(
1126            own_id,
1127            FindNodeCallbacks {
1128                round: 0,
1129                max_rounds: 6,
1130            },
1131        );
1132        lookup.closest = initial_closest;
1133        self.bootstrap_lookup = Some(lookup);
1134    }
1135
1136    async fn handle_packet(&mut self, data: &[u8], addr: SocketAddr) {
1137        let msg = match KrpcMessage::from_bytes(data) {
1138            Ok(msg) => msg,
1139            Err(e) => {
1140                trace!(error = %e, from = %addr, "invalid KRPC message");
1141                return;
1142            }
1143        };
1144
1145        match &msg.body {
1146            KrpcBody::Query(query) => {
1147                self.handle_query(&msg, query, addr).await;
1148            }
1149            KrpcBody::Response(resp) => {
1150                self.handle_response(&msg, resp, addr).await;
1151            }
1152            KrpcBody::Error { code, message } => {
1153                trace!(code, message, from = %addr, "KRPC error received");
1154                // Still match pending query to clean up
1155                let txn = msg.transaction_id.as_u16();
1156                if let Some((_, pending)) = self.pending.remove(&txn)
1157                    && let Some(nid) = pending.node_id
1158                {
1159                    self.routing_table.write().mark_failed(&nid);
1160                }
1161            }
1162        }
1163    }
1164
1165    /// Check if a socket address matches this actor's address family.
1166    fn matches_family(&self, addr: &SocketAddr) -> bool {
1167        match self.address_family {
1168            AddressFamily::V4 => addr.is_ipv4(),
1169            AddressFamily::V6 => addr.is_ipv6(),
1170        }
1171    }
1172
1173    /// BEP 45: Build the `want` list for outgoing queries. When multi-address
1174    /// is enabled, request both address families so dual-stack remote nodes
1175    /// include cross-family nodes in their responses.
1176    fn outgoing_want(&self) -> Option<Vec<crate::krpc::WantFamily>> {
1177        if self.config.enable_multi_address {
1178            Some(vec![
1179                crate::krpc::WantFamily::N4,
1180                crate::krpc::WantFamily::N6,
1181            ])
1182        } else {
1183            None
1184        }
1185    }
1186
1187    async fn handle_query(&mut self, msg: &KrpcMessage, query: &KrpcQuery, addr: SocketAddr) {
1188        if !self.matches_family(&addr) {
1189            return; // Reject wrong address family
1190        }
1191        let sender_id = *query.sender_id();
1192        self.checked_insert(sender_id, addr, msg.read_only);
1193        self.routing_table.write().mark_query(&sender_id);
1194
1195        let own_id = *self.routing_table.read().own_id();
1196        let response = match query {
1197            KrpcQuery::Ping { id: _ } => KrpcResponse::NodeId { id: own_id },
1198            KrpcQuery::FindNode {
1199                id: _,
1200                target,
1201                want: _,
1202            } => {
1203                let closest = self.routing_table.read().closest(target, K);
1204                let nodes: Vec<CompactNodeInfo> = closest
1205                    .into_iter()
1206                    .map(|n| CompactNodeInfo {
1207                        id: n.id,
1208                        addr: n.addr,
1209                    })
1210                    .collect();
1211                KrpcResponse::FindNode {
1212                    id: own_id,
1213                    nodes,
1214                    nodes6: Vec::new(),
1215                }
1216            }
1217            KrpcQuery::GetPeers {
1218                id: _,
1219                info_hash,
1220                noseed: _,
1221                scrape,
1222                want: _,
1223            } => {
1224                let ip = addr.ip();
1225                let token = self.peer_store.generate_token(&ip);
1226                let peers = self.peer_store.get_peers(info_hash, 50);
1227
1228                // BEP 33: generate bloom filters when scrape=1.
1229                let (bfpe, bfsd) = if *scrape == Some(1) {
1230                    let all_peers = self.peer_store.all_peers(info_hash);
1231                    let mut filter = crate::bloom::ScrapeBloomFilter::new();
1232                    for peer_addr in &all_peers {
1233                        filter.insert(*peer_addr);
1234                    }
1235                    (Some(filter.as_bytes().to_vec()), None)
1236                } else {
1237                    (None, None)
1238                };
1239
1240                if peers.is_empty() {
1241                    let closest = self.routing_table.read().closest(info_hash, K);
1242                    let nodes: Vec<CompactNodeInfo> = closest
1243                        .into_iter()
1244                        .map(|n| CompactNodeInfo {
1245                            id: n.id,
1246                            addr: n.addr,
1247                        })
1248                        .collect();
1249                    KrpcResponse::GetPeers(GetPeersResponse {
1250                        id: own_id,
1251                        token: Some(token),
1252                        peers: Vec::new(),
1253                        nodes,
1254                        nodes6: Vec::new(),
1255                        bfpe,
1256                        bfsd,
1257                    })
1258                } else {
1259                    KrpcResponse::GetPeers(GetPeersResponse {
1260                        id: own_id,
1261                        token: Some(token),
1262                        peers,
1263                        nodes: Vec::new(),
1264                        nodes6: Vec::new(),
1265                        bfpe,
1266                        bfsd,
1267                    })
1268                }
1269            }
1270            KrpcQuery::AnnouncePeer {
1271                id: _,
1272                info_hash,
1273                port,
1274                implied_port,
1275                token,
1276            } => {
1277                let ip = addr.ip();
1278                if !self.peer_store.validate_token(token, &ip) {
1279                    // Send error response for invalid token
1280                    let err_msg = KrpcMessage {
1281                        transaction_id: msg.transaction_id,
1282                        body: KrpcBody::Error {
1283                            code: 203,
1284                            message: "invalid token".into(),
1285                        },
1286                        sender_ip: Some(addr),
1287                        read_only: false,
1288                    };
1289                    if let Ok(bytes) = err_msg.to_bytes() {
1290                        let _ = self.socket.send_to(&bytes, addr).await;
1291                    }
1292                    return;
1293                }
1294                let peer_port = if *implied_port { addr.port() } else { *port };
1295                let peer_addr = SocketAddr::new(addr.ip(), peer_port);
1296                self.peer_store.add_peer(*info_hash, peer_addr);
1297                KrpcResponse::NodeId {
1298                    id: *self.routing_table.read().own_id(),
1299                }
1300            }
1301            // BEP 44: get item from DHT storage
1302            KrpcQuery::Get {
1303                id: _,
1304                target,
1305                seq: requested_seq,
1306            } => {
1307                let ip = addr.ip();
1308                let token = self.peer_store.generate_token(&ip);
1309
1310                // Try immutable lookup first
1311                if let Some(item) = self.item_store.get_immutable(target) {
1312                    KrpcResponse::GetItem {
1313                        id: *self.routing_table.read().own_id(),
1314                        token: Some(token),
1315                        nodes: Vec::new(),
1316                        nodes6: Vec::new(),
1317                        value: Some(item.value),
1318                        key: None,
1319                        signature: None,
1320                        seq: None,
1321                    }
1322                } else if let Some(item) = self.item_store.get_mutable_by_target(target) {
1323                    // Check if requester wants only items with seq > requested_seq
1324                    if let Some(min_seq) = requested_seq {
1325                        if item.seq <= *min_seq {
1326                            // Return token + nodes but no value (requester already has this or newer)
1327                            let closest = self.routing_table.read().closest(target, K);
1328                            let nodes: Vec<CompactNodeInfo> = closest
1329                                .into_iter()
1330                                .map(|n| CompactNodeInfo {
1331                                    id: n.id,
1332                                    addr: n.addr,
1333                                })
1334                                .collect();
1335                            KrpcResponse::GetItem {
1336                                id: *self.routing_table.read().own_id(),
1337                                token: Some(token),
1338                                nodes,
1339                                nodes6: Vec::new(),
1340                                value: None,
1341                                key: Some(item.public_key),
1342                                signature: Some(item.signature),
1343                                seq: Some(item.seq),
1344                            }
1345                        } else {
1346                            KrpcResponse::GetItem {
1347                                id: *self.routing_table.read().own_id(),
1348                                token: Some(token),
1349                                nodes: Vec::new(),
1350                                nodes6: Vec::new(),
1351                                value: Some(item.value),
1352                                key: Some(item.public_key),
1353                                signature: Some(item.signature),
1354                                seq: Some(item.seq),
1355                            }
1356                        }
1357                    } else {
1358                        KrpcResponse::GetItem {
1359                            id: *self.routing_table.read().own_id(),
1360                            token: Some(token),
1361                            nodes: Vec::new(),
1362                            nodes6: Vec::new(),
1363                            value: Some(item.value),
1364                            key: Some(item.public_key),
1365                            signature: Some(item.signature),
1366                            seq: Some(item.seq),
1367                        }
1368                    }
1369                } else {
1370                    // Not found — return closer nodes
1371                    let closest = self.routing_table.read().closest(target, K);
1372                    let nodes: Vec<CompactNodeInfo> = closest
1373                        .into_iter()
1374                        .map(|n| CompactNodeInfo {
1375                            id: n.id,
1376                            addr: n.addr,
1377                        })
1378                        .collect();
1379                    KrpcResponse::GetItem {
1380                        id: *self.routing_table.read().own_id(),
1381                        token: Some(token),
1382                        nodes,
1383                        nodes6: Vec::new(),
1384                        value: None,
1385                        key: None,
1386                        signature: None,
1387                        seq: None,
1388                    }
1389                }
1390            }
1391            // BEP 44: put item into DHT storage
1392            KrpcQuery::Put {
1393                id: _,
1394                token,
1395                value,
1396                key,
1397                signature,
1398                seq,
1399                salt,
1400                cas,
1401            } => {
1402                let ip = addr.ip();
1403
1404                // Validate token
1405                if !self.peer_store.validate_token(token, &ip) {
1406                    let err_msg = KrpcMessage {
1407                        transaction_id: msg.transaction_id,
1408                        body: KrpcBody::Error {
1409                            code: 203,
1410                            message: "invalid token".into(),
1411                        },
1412                        sender_ip: Some(addr),
1413                        read_only: false,
1414                    };
1415                    if let Ok(bytes) = err_msg.to_bytes() {
1416                        let _ = self.socket.send_to(&bytes, addr).await;
1417                    }
1418                    return;
1419                }
1420
1421                // Validate value size
1422                if value.len() > MAX_VALUE_SIZE {
1423                    let err_msg = KrpcMessage {
1424                        transaction_id: msg.transaction_id,
1425                        body: KrpcBody::Error {
1426                            code: 205,
1427                            message: "message (v field) too big".into(),
1428                        },
1429                        sender_ip: Some(addr),
1430                        read_only: false,
1431                    };
1432                    if let Ok(bytes) = err_msg.to_bytes() {
1433                        let _ = self.socket.send_to(&bytes, addr).await;
1434                    }
1435                    return;
1436                }
1437
1438                if let (Some(k), Some(sig), Some(seq_val)) = (key, signature, seq) {
1439                    // Mutable item
1440                    let salt_bytes = salt.clone().unwrap_or_default();
1441
1442                    // Validate salt size
1443                    if salt_bytes.len() > MAX_SALT_SIZE {
1444                        let err_msg = KrpcMessage {
1445                            transaction_id: msg.transaction_id,
1446                            body: KrpcBody::Error {
1447                                code: 207,
1448                                message: "salt (salt field) too big".into(),
1449                            },
1450                            sender_ip: Some(addr),
1451                            read_only: false,
1452                        };
1453                        if let Ok(bytes) = err_msg.to_bytes() {
1454                            let _ = self.socket.send_to(&bytes, addr).await;
1455                        }
1456                        return;
1457                    }
1458
1459                    let item = MutableItem {
1460                        value: value.clone(),
1461                        public_key: *k,
1462                        signature: *sig,
1463                        seq: *seq_val,
1464                        salt: salt_bytes,
1465                        target: bep44::compute_mutable_target(k, salt.as_deref().unwrap_or(&[])),
1466                    };
1467
1468                    // Verify signature
1469                    if !item.verify() {
1470                        let err_msg = KrpcMessage {
1471                            transaction_id: msg.transaction_id,
1472                            body: KrpcBody::Error {
1473                                code: 206,
1474                                message: "invalid signature".into(),
1475                            },
1476                            sender_ip: Some(addr),
1477                            read_only: false,
1478                        };
1479                        if let Ok(bytes) = err_msg.to_bytes() {
1480                            let _ = self.socket.send_to(&bytes, addr).await;
1481                        }
1482                        return;
1483                    }
1484
1485                    // CAS check
1486                    if let Some(expected_seq) = cas
1487                        && let Some(existing) = self.item_store.get_mutable(k, &item.salt)
1488                        && existing.seq != *expected_seq
1489                    {
1490                        let err_msg = KrpcMessage {
1491                            transaction_id: msg.transaction_id,
1492                            body: KrpcBody::Error {
1493                                code: 301,
1494                                message: format!(
1495                                    "CAS mismatch: expected seq {}, got {}",
1496                                    expected_seq, existing.seq
1497                                ),
1498                            },
1499                            sender_ip: Some(addr),
1500                            read_only: false,
1501                        };
1502                        if let Ok(bytes) = err_msg.to_bytes() {
1503                            let _ = self.socket.send_to(&bytes, addr).await;
1504                        }
1505                        return;
1506                    }
1507
1508                    // Seq monotonicity check
1509                    if let Some(existing) = self.item_store.get_mutable(k, &item.salt)
1510                        && *seq_val <= existing.seq
1511                    {
1512                        let err_msg = KrpcMessage {
1513                            transaction_id: msg.transaction_id,
1514                            body: KrpcBody::Error {
1515                                code: 302,
1516                                message: format!(
1517                                    "sequence number not newer: {} <= {}",
1518                                    seq_val, existing.seq
1519                                ),
1520                            },
1521                            sender_ip: Some(addr),
1522                            read_only: false,
1523                        };
1524                        if let Ok(bytes) = err_msg.to_bytes() {
1525                            let _ = self.socket.send_to(&bytes, addr).await;
1526                        }
1527                        return;
1528                    }
1529
1530                    self.item_store.put_mutable(item);
1531                } else {
1532                    // Immutable item
1533                    if let Ok(item) = ImmutableItem::new(value.clone()) {
1534                        self.item_store.put_immutable(item);
1535                    } else {
1536                        let err_msg = KrpcMessage {
1537                            transaction_id: msg.transaction_id,
1538                            body: KrpcBody::Error {
1539                                code: 205,
1540                                message: "message (v field) too big".into(),
1541                            },
1542                            sender_ip: Some(addr),
1543                            read_only: false,
1544                        };
1545                        if let Ok(bytes) = err_msg.to_bytes() {
1546                            let _ = self.socket.send_to(&bytes, addr).await;
1547                        }
1548                        return;
1549                    }
1550                }
1551
1552                KrpcResponse::NodeId {
1553                    id: *self.routing_table.read().own_id(),
1554                }
1555            }
1556            // BEP 51: sample_infohashes
1557            KrpcQuery::SampleInfohashes { id: _, target } => {
1558                let closest = self.routing_table.read().closest(target, K);
1559                let nodes: Vec<CompactNodeInfo> = closest
1560                    .into_iter()
1561                    .map(|n| CompactNodeInfo {
1562                        id: n.id,
1563                        addr: n.addr,
1564                    })
1565                    .collect();
1566
1567                // Sample up to 20 info hashes (fits comfortably in one UDP packet)
1568                let samples = self.peer_store.random_info_hashes(20);
1569                let num = self.peer_store.info_hash_count() as i64;
1570
1571                KrpcResponse::SampleInfohashes(SampleInfohashesResponse {
1572                    id: *self.routing_table.read().own_id(),
1573                    interval: 60, // 1 minute default interval
1574                    num,
1575                    samples,
1576                    nodes,
1577                })
1578            }
1579        };
1580
1581        let reply = KrpcMessage {
1582            transaction_id: msg.transaction_id,
1583            body: KrpcBody::Response(response),
1584            sender_ip: Some(addr), // BEP 42: tell the querier their IP
1585            read_only: false,      // BEP 43: ro only on queries, never on responses
1586        };
1587        if let Ok(bytes) = reply.to_bytes() {
1588            let _ = self.socket.send_to(&bytes, addr).await;
1589        }
1590    }
1591
1592    async fn handle_response(&mut self, msg: &KrpcMessage, resp: &KrpcResponse, addr: SocketAddr) {
1593        if !self.matches_family(&addr) {
1594            return; // Reject wrong address family
1595        }
1596        self.stats.total_responses_received += 1;
1597
1598        // BEP 42: feed the ip field into the voter
1599        if let Some(reported_ip) = msg.sender_ip {
1600            let source_id = hash_source_addr(&addr);
1601            if let Some(consensus_ip) = self.ip_voter.add_vote(source_id, reported_ip.ip()) {
1602                debug!(%consensus_ip, "BEP 42: external IP consensus changed");
1603                let _ = self.ip_consensus_tx.try_send(consensus_ip);
1604                self.regenerate_node_id(consensus_ip);
1605            }
1606        }
1607
1608        let sender_id = *resp.sender_id();
1609        self.checked_insert(sender_id, addr, false);
1610        self.routing_table.write().mark_response(&sender_id);
1611
1612        // M146: Drain pending get_peers immediately when first node arrives.
1613        self.drain_pending_if_table_ready();
1614
1615        let txn = msg.transaction_id.as_u16();
1616        let Some((_, pending)) = self.pending.remove(&txn) else {
1617            trace!(txn, from = %addr, "response for unknown transaction");
1618            return;
1619        };
1620
1621        // If this pending entry has a oneshot response_tx, the response came
1622        // from a DhtLookup task. Forward via oneshot and let the lookup handle
1623        // its own state. We must still update the routing table with the
1624        // responding node — the lookup forwards discovered *contained* nodes
1625        // via node_tx, but the *responding* node itself must be handled here.
1626        if let Some(response_tx) = pending.response_tx {
1627            self.checked_insert(sender_id, pending.addr, false);
1628            let _ = response_tx.send(PendingQueryResponse {
1629                sender_id,
1630                response: resp.clone(),
1631            });
1632            return;
1633        }
1634
1635        match (&pending.kind, resp) {
1636            (PendingQueryKind::FindNode, KrpcResponse::FindNode { nodes, nodes6, .. }) => {
1637                for node in nodes {
1638                    if self.matches_family(&node.addr) {
1639                        self.checked_insert(node.id, node.addr, false);
1640                    }
1641                }
1642                for node in nodes6 {
1643                    if self.matches_family(&node.addr) {
1644                        self.checked_insert(node.id, node.addr, false);
1645                    }
1646                }
1647
1648                // Advance iterative bootstrap lookup if active
1649                if let Some(ref mut lookup) = self.bootstrap_lookup {
1650                    // Merge nodes4 + nodes6 into a single feed (CompactNodeInfo)
1651                    let mut all_nodes: Vec<CompactNodeInfo> = nodes.clone();
1652                    all_nodes.extend(nodes6.iter().map(|n| CompactNodeInfo {
1653                        id: n.id,
1654                        addr: n.addr,
1655                    }));
1656                    lookup.feed_nodes(all_nodes, self.address_family);
1657                }
1658
1659                if self.bootstrap_lookup.is_some() {
1660                    // Extract data needed before calling send_find_node (drops borrow)
1661                    let (to_query, target, terminate) =
1662                        if let Some(ref mut lookup) = self.bootstrap_lookup {
1663                            if lookup.callbacks.round >= lookup.callbacks.max_rounds {
1664                                (Vec::new(), lookup.target, true)
1665                            } else {
1666                                let to_query = lookup.next_to_query(3);
1667                                let target = lookup.target;
1668                                if to_query.is_empty() {
1669                                    (Vec::new(), target, true)
1670                                } else {
1671                                    lookup.callbacks.round += 1;
1672                                    (to_query, target, false)
1673                                }
1674                            }
1675                        } else {
1676                            (Vec::new(), Id20::ZERO, false)
1677                        };
1678
1679                    if terminate {
1680                        debug!(
1681                            routing_table_size = self.routing_table.read().len(),
1682                            "iterative bootstrap complete"
1683                        );
1684                        self.bootstrap_lookup = None;
1685                        self.on_bootstrap_complete();
1686                    } else {
1687                        let queries: Vec<(SocketAddr, Id20)> =
1688                            to_query.iter().map(|n| (n.addr, n.id)).collect();
1689                        for (node_addr, nid) in queries {
1690                            self.send_find_node(node_addr, target, Some(nid)).await;
1691                        }
1692                    }
1693                }
1694            }
1695            (PendingQueryKind::GetPeers { info_hash }, KrpcResponse::GetPeers(gp)) => {
1696                // GetPeers responses are normally routed to DhtLookup via
1697                // oneshot (handled above). This arm only fires for orphaned
1698                // responses after a lookup was aborted. Still update the
1699                // routing table from returned nodes.
1700                for node in &gp.nodes {
1701                    if self.matches_family(&node.addr) {
1702                        self.checked_insert(node.id, node.addr, false);
1703                    }
1704                }
1705                for node in &gp.nodes6 {
1706                    if self.matches_family(&node.addr) {
1707                        self.checked_insert(node.id, node.addr, false);
1708                    }
1709                }
1710                trace!(%info_hash, "get_peers response for orphaned lookup");
1711            }
1712            (PendingQueryKind::Ping, KrpcResponse::NodeId { .. }) => {
1713                // Ping response — node is alive, already updated routing table
1714                if !self.bootstrap_complete {
1715                    debug!(
1716                        from = %pending.addr,
1717                        table_size = self.routing_table.read().len(),
1718                        "bootstrap: ping response received"
1719                    );
1720                }
1721            }
1722            (
1723                PendingQueryKind::AnnouncePeer | PendingQueryKind::PutItem,
1724                KrpcResponse::NodeId { .. },
1725            ) => {
1726                // Announce / put acknowledged — success
1727            }
1728            (PendingQueryKind::SampleInfohashes, KrpcResponse::SampleInfohashes(si)) => {
1729                // Add discovered nodes to routing table (Gap 6: use checked_insert for BEP 42)
1730                for node in &si.nodes {
1731                    if self.matches_family(&node.addr) {
1732                        self.checked_insert(node.id, node.addr, false);
1733                    }
1734                }
1735
1736                // Send result back to caller
1737                if let Some(reply) = self.sample_replies.remove(&txn) {
1738                    let _ = reply.send(Ok(SampleInfohashesResult {
1739                        interval: si.interval,
1740                        num: si.num,
1741                        samples: si.samples.clone(),
1742                        nodes: si.nodes.clone(),
1743                    }));
1744                }
1745            }
1746            (
1747                PendingQueryKind::GetItem { target },
1748                KrpcResponse::GetItem {
1749                    token,
1750                    nodes,
1751                    nodes6,
1752                    value,
1753                    key,
1754                    signature,
1755                    seq,
1756                    ..
1757                },
1758            ) => {
1759                // Gap 13: Use checked_insert (BEP 42 compliant) instead of routing_table.insert
1760                for node in nodes {
1761                    if self.matches_family(&node.addr) {
1762                        self.checked_insert(node.id, node.addr, false);
1763                    }
1764                }
1765                for node in nodes6 {
1766                    if self.matches_family(&node.addr) {
1767                        self.checked_insert(node.id, node.addr, false);
1768                    }
1769                }
1770
1771                let target = *target;
1772
1773                // If we have a put operation waiting for tokens, collect this token
1774                if let (Some(token), Some(put_op)) = (token, self.item_put_ops.get_mut(&target)) {
1775                    match put_op {
1776                        ItemPutState::Immutable { tokens, .. }
1777                        | ItemPutState::Mutable { tokens, .. } => {
1778                            tokens.insert(sender_id, (addr, token.clone()));
1779                        }
1780                    }
1781
1782                    // If we have enough tokens, send the puts
1783                    let should_send = match &self.item_put_ops[&target] {
1784                        ItemPutState::Immutable {
1785                            tokens, sent_puts, ..
1786                        }
1787                        | ItemPutState::Mutable {
1788                            tokens, sent_puts, ..
1789                        } => tokens.len() >= K && *sent_puts == 0,
1790                    };
1791
1792                    if should_send {
1793                        self.send_pending_puts(target).await;
1794                    }
1795                }
1796
1797                // If we have a get lookup, process the value
1798                if self.item_lookups.contains_key(&target) {
1799                    // Determine if this is immutable or mutable lookup
1800                    let is_immutable = matches!(
1801                        self.item_lookups.get(&target),
1802                        Some(ItemLookupState::Immutable { .. })
1803                    );
1804
1805                    if is_immutable {
1806                        if let Some(v) = value {
1807                            // Validate: SHA-1(v) should equal target
1808                            if irontide_core::sha1(v) == target {
1809                                // Store locally
1810                                if let Ok(item) = crate::bep44::ImmutableItem::new(v.clone()) {
1811                                    self.item_store.put_immutable(item);
1812                                }
1813                                if let Some(ItemLookupState::Immutable { reply, .. }) =
1814                                    self.item_lookups.get_mut(&target)
1815                                    && let Some(r) = reply.take()
1816                                {
1817                                    let _ = r.send(Ok(Some(v.clone())));
1818                                }
1819                            }
1820                        } else {
1821                            // Gap 7: Collect nodes to query into local Vec first
1822                            // to avoid borrow checker violation
1823                            let family = self.address_family;
1824                            let to_query: Vec<SocketAddr> = {
1825                                if let Some(ItemLookupState::Immutable { queried, .. }) =
1826                                    self.item_lookups.get_mut(&target)
1827                                {
1828                                    nodes
1829                                        .iter()
1830                                        .filter(|n| match family {
1831                                            AddressFamily::V4 => n.addr.is_ipv4(),
1832                                            AddressFamily::V6 => n.addr.is_ipv6(),
1833                                        })
1834                                        .filter(|n| queried.insert(n.id))
1835                                        .take(3)
1836                                        .map(|n| n.addr)
1837                                        .collect()
1838                                } else {
1839                                    vec![]
1840                                }
1841                            };
1842                            for query_addr in to_query {
1843                                self.send_get_item(query_addr, target, None).await;
1844                            }
1845                        }
1846                    } else {
1847                        // Mutable lookup
1848                        if let (Some(v), Some(k), Some(sig), Some(s)) = (value, key, signature, seq)
1849                        {
1850                            // Get the salt from the lookup state
1851                            let salt = if let Some(ItemLookupState::Mutable { salt, .. }) =
1852                                self.item_lookups.get(&target)
1853                            {
1854                                salt.clone()
1855                            } else {
1856                                Vec::new()
1857                            };
1858
1859                            let item = crate::bep44::MutableItem {
1860                                value: v.clone(),
1861                                public_key: *k,
1862                                signature: *sig,
1863                                seq: *s,
1864                                salt,
1865                                target,
1866                            };
1867
1868                            if item.verify()
1869                                && let Some(ItemLookupState::Mutable {
1870                                    best_seq,
1871                                    best_value,
1872                                    ..
1873                                }) = self.item_lookups.get_mut(&target)
1874                                && *s > *best_seq
1875                            {
1876                                *best_seq = *s;
1877                                *best_value = Some(v.clone());
1878                                // Store locally
1879                                self.item_store.put_mutable(item);
1880                            }
1881                        }
1882
1883                        // Gap 7: Collect nodes to query into local Vec first
1884                        let family = self.address_family;
1885                        let to_query: Vec<SocketAddr> = {
1886                            if let Some(ItemLookupState::Mutable { queried, .. }) =
1887                                self.item_lookups.get_mut(&target)
1888                            {
1889                                nodes
1890                                    .iter()
1891                                    .filter(|n| match family {
1892                                        AddressFamily::V4 => n.addr.is_ipv4(),
1893                                        AddressFamily::V6 => n.addr.is_ipv6(),
1894                                    })
1895                                    .filter(|n| queried.insert(n.id))
1896                                    .take(3)
1897                                    .map(|n| n.addr)
1898                                    .collect()
1899                            } else {
1900                                vec![]
1901                            }
1902                        };
1903                        for query_addr in to_query {
1904                            self.send_get_item(query_addr, target, None).await;
1905                        }
1906                    }
1907                }
1908            }
1909            _ => {
1910                trace!(txn, "mismatched response type");
1911            }
1912        }
1913    }
1914
1915    fn start_get_peers(&mut self, info_hash: Id20, reply: mpsc::UnboundedSender<Vec<SocketAddr>>) {
1916        // M146: Lightweight gate — require at least 1 routing table node
1917        // before starting get_peers. Without any nodes, the DhtLookup would
1918        // start with zero roots and stall in adaptive backoff (1-15s) while
1919        // bootstrap pings populate the table.
1920        //
1921        // The old gate required 8 nodes (causing 1-5s dead zones). With
1922        // threshold=1, saved-node pings typically populate within 100-500ms.
1923        // The pending_get_peers queue is still removed — instead we use the
1924        // bootstrap_complete flag + bootstrap timeout as the fallback.
1925        if !self.bootstrap_complete && self.routing_table.read().is_empty() {
1926            debug!(
1927                %info_hash,
1928                "get_peers: routing table empty, queuing until first node arrives"
1929            );
1930            self.pending_get_peers.push((info_hash, reply));
1931            return;
1932        }
1933        self.start_get_peers_inner(info_hash, reply);
1934    }
1935
1936    fn start_get_peers_inner(
1937        &mut self,
1938        info_hash: Id20,
1939        reply: mpsc::UnboundedSender<Vec<SocketAddr>>,
1940    ) {
1941        debug!(
1942            %info_hash,
1943            table_size = self.routing_table.read().len(),
1944            "starting get_peers query"
1945        );
1946
1947        // M146: Allow get_peers with an empty routing table. The DhtLookup
1948        // starts with zero roots and uses its 1s requery timer to inject
1949        // roots as bootstrap pings populate the table. This avoids dropping
1950        // the reply channel (which would cause a 60s dead zone before the
1951        // TorrentActor re-queries).
1952
1953        // M147: Allow concurrent lookups for the same info_hash.
1954        // The new lookup overwrites the HashMap entry; the old lookup task
1955        // continues running and self-terminates when its peer_rx channel
1956        // closes. This prevents the background MetadataResolver's DHT
1957        // lookup from killing the TorrentActor's own DHT stream.
1958
1959        let own_id = *self.routing_table.read().own_id();
1960        debug!(
1961            family = ?self.address_family,
1962            %info_hash,
1963            table_size = self.routing_table.read().len(),
1964            "get_peers: spawning DhtLookup"
1965        );
1966
1967        let lookup = crate::dht_lookup::DhtLookup::new(
1968            info_hash,
1969            crate::dht_lookup::LookupConfig {
1970                max_depth: 4,
1971                max_nodes: 256,
1972            },
1973            self.address_family,
1974            self.socket.clone(),
1975            self.pending.clone(),
1976            self.rate_limiter.clone(),
1977            self.routing_table.clone(),
1978            self.next_txn_id.clone(),
1979            own_id,
1980            reply,
1981            self.lookup_token_tx.clone(),
1982            self.lookup_node_tx.clone(),
1983            self.config.read_only_mode,
1984            self.outgoing_want(),
1985        );
1986
1987        let handle = tokio::spawn(lookup.run());
1988        // If a prior lookup exists, let it run — don't abort it, as it may be
1989        // the TorrentActor's lookup. The old task runs to completion independently.
1990        if let Some(old_handle) = self.active_lookups.insert(info_hash, handle) {
1991            // Intentionally detach — old lookup finishes naturally when it
1992            // exhausts its routing table query.
1993            drop(old_handle);
1994        }
1995    }
1996
1997    /// M97/M146: Called when bootstrap completes or times out.
1998    /// Drains queued `get_peers` and sets the `bootstrap_complete` flag.
1999    fn on_bootstrap_complete(&mut self) {
2000        if self.bootstrap_complete {
2001            return;
2002        }
2003        self.bootstrap_complete = true;
2004        self.bootstrap_timeout = None;
2005
2006        let pending = std::mem::take(&mut self.pending_get_peers);
2007        debug!(
2008            count = pending.len(),
2009            table_size = self.routing_table.read().len(),
2010            "bootstrap complete, processing queued get_peers"
2011        );
2012        for (info_hash, reply) in pending {
2013            self.start_get_peers_inner(info_hash, reply);
2014        }
2015    }
2016
2017    /// M146: Drain pending `get_peers` as soon as at least 1 node is in the
2018    /// routing table. Called from the `ping_tick` arm when routing table
2019    /// transitions from empty to non-empty during bootstrap.
2020    fn drain_pending_if_table_ready(&mut self) {
2021        if self.pending_get_peers.is_empty() || self.routing_table.read().is_empty() {
2022            return;
2023        }
2024        let pending = std::mem::take(&mut self.pending_get_peers);
2025        debug!(
2026            count = pending.len(),
2027            table_size = self.routing_table.read().len(),
2028            "routing table populated, draining queued get_peers"
2029        );
2030        for (info_hash, reply) in pending {
2031            self.start_get_peers_inner(info_hash, reply);
2032        }
2033    }
2034
2035    async fn handle_announce(
2036        &mut self,
2037        info_hash: Id20,
2038        port: u16,
2039        reply: oneshot::Sender<Result<()>>,
2040    ) {
2041        // BEP 43: read-only nodes should not announce
2042        if self.config.read_only_mode {
2043            trace!("BEP 43: suppressing announce_peer in read-only mode");
2044            let _ = reply.send(Ok(()));
2045            return;
2046        }
2047
2048        // First, find nodes with tokens collected from DhtLookup tasks
2049        let tokens: Vec<(SocketAddr, Vec<u8>)> = self
2050            .announce_tokens
2051            .get(&info_hash)
2052            .map(|m| m.values().cloned().collect())
2053            .unwrap_or_default();
2054
2055        if tokens.is_empty() {
2056            let _ = reply.send(Err(Error::InvalidMessage(
2057                "no tokens available; call get_peers first".into(),
2058            )));
2059            return;
2060        }
2061
2062        let own_id = *self.routing_table.read().own_id();
2063        for (addr, token) in &tokens {
2064            if !self.rate_limiter.try_acquire() {
2065                break; // Use break, not return, since we still need to send the reply
2066            }
2067            let txn = self.next_transaction_id();
2068            let msg = KrpcMessage {
2069                transaction_id: TransactionId::from_u16(txn),
2070                body: KrpcBody::Query(KrpcQuery::AnnouncePeer {
2071                    id: own_id,
2072                    info_hash,
2073                    port,
2074                    implied_port: false,
2075                    token: token.clone(),
2076                }),
2077                sender_ip: None,
2078                read_only: false, // announce_peer is suppressed in read-only mode (early return above)
2079            };
2080            if let Ok(bytes) = msg.to_bytes() {
2081                let _ = self.socket.send_to(&bytes, addr).await;
2082                self.pending.insert(
2083                    txn,
2084                    PendingQuery {
2085                        sent_at: Instant::now(),
2086                        addr: *addr,
2087                        kind: PendingQueryKind::AnnouncePeer,
2088                        node_id: None,
2089                        response_tx: None,
2090                    },
2091                );
2092                self.stats.total_queries_sent += 1;
2093            }
2094        }
2095
2096        // Clean up tokens for this info_hash after announcing
2097        self.announce_tokens.remove(&info_hash);
2098
2099        let _ = reply.send(Ok(()));
2100    }
2101
2102    /// Expire timed-out queries and advance any stalled `get_peers` lookups.
2103    /// Runs every `query_timeout` interval — mirrors libtorrent's pattern where
2104    /// `traversal_algorithm::failed()` immediately calls `add_requests()` to
2105    /// query the next closest nodes.
2106    async fn expire_queries_and_advance_lookups(&mut self) {
2107        let timeout = self.config.query_timeout;
2108        let expired: Vec<u16> = self
2109            .pending
2110            .iter()
2111            .filter(|entry| entry.value().sent_at.elapsed() > timeout)
2112            .map(|entry| *entry.key())
2113            .collect();
2114
2115        if expired.is_empty() {
2116            return;
2117        }
2118
2119        debug!(
2120            family = ?self.address_family,
2121            expired_count = expired.len(),
2122            total_pending = self.pending.len(),
2123            active_lookups = self.active_lookups.len(),
2124            "expiring timed-out queries"
2125        );
2126
2127        let mut find_node_timed_out = false;
2128
2129        for txn in expired {
2130            if let Some((_, pending)) = self.pending.remove(&txn) {
2131                trace!(txn, addr = %pending.addr, "query timed out");
2132                if let Some(nid) = pending.node_id {
2133                    self.routing_table.write().mark_failed(&nid);
2134                }
2135                if matches!(pending.kind, PendingQueryKind::SampleInfohashes)
2136                    && let Some(reply) = self.sample_replies.remove(&txn)
2137                {
2138                    let _ = reply.send(Err(Error::Timeout));
2139                }
2140                // For GetPeers: the DhtLookup handles its own timeouts via
2141                // the oneshot channel — if response_tx was set, dropping the
2142                // PendingQuery will close the oneshot and the lookup's await
2143                // returns Err. No stalled lookup advancement needed here.
2144                if matches!(pending.kind, PendingQueryKind::FindNode) {
2145                    find_node_timed_out = true;
2146                }
2147            }
2148        }
2149
2150        // Advance bootstrap lookup if a FindNode query timed out
2151        if find_node_timed_out && self.bootstrap_lookup.is_some() {
2152            // Extract queries before calling send_find_node (borrow-checker)
2153            let (to_query, target, terminate) = if let Some(ref mut lookup) = self.bootstrap_lookup
2154            {
2155                let to_query = lookup.next_to_query(3);
2156                let target = lookup.target;
2157                if to_query.is_empty() {
2158                    (Vec::new(), target, true)
2159                } else {
2160                    (to_query, target, false)
2161                }
2162            } else {
2163                (Vec::new(), Id20::ZERO, false)
2164            };
2165
2166            if terminate {
2167                self.bootstrap_lookup = None;
2168                self.on_bootstrap_complete();
2169            } else {
2170                let queries: Vec<(SocketAddr, Id20)> =
2171                    to_query.iter().map(|n| (n.addr, n.id)).collect();
2172                for (node_addr, nid) in queries {
2173                    self.send_find_node(node_addr, target, Some(nid)).await;
2174                }
2175            }
2176        }
2177    }
2178
2179    // ---- JSON routing table persistence ----
2180
2181    /// Return the JSON state file path for this address family.
2182    fn state_file_path(state_dir: &std::path::Path, family: AddressFamily) -> PathBuf {
2183        match family {
2184            AddressFamily::V4 => state_dir.join("dht_state.json"),
2185            AddressFamily::V6 => state_dir.join("dht_state_v6.json"),
2186        }
2187    }
2188
2189    /// Persist the routing table to a JSON file via atomic temp-file + rename.
2190    ///
2191    /// Skips silently when `state_dir` is `None`. On any I/O or serialization
2192    /// error, logs a warning and continues (never crashes the actor).
2193    fn save_routing_table(&self) {
2194        let Some(state_dir) = &self.config.state_dir else {
2195            return;
2196        };
2197
2198        let nodes = self.routing_table.read().all_nodes();
2199        let own_id = *self.routing_table.read().own_id();
2200
2201        let state = DhtState {
2202            node_id: own_id.to_hex(),
2203            nodes: nodes
2204                .iter()
2205                .map(|(id, addr)| DhtNodeEntry {
2206                    id: id.to_hex(),
2207                    addr: addr.to_string(),
2208                })
2209                .collect(),
2210        };
2211
2212        let json = match serde_json::to_string_pretty(&state) {
2213            Ok(j) => j,
2214            Err(e) => {
2215                warn!(error = %e, "failed to serialize DHT state to JSON");
2216                return;
2217            }
2218        };
2219
2220        let final_path = Self::state_file_path(state_dir, self.address_family);
2221        let tmp_path = state_dir.join(format!(
2222            ".dht_state_{}.tmp",
2223            match self.address_family {
2224                AddressFamily::V4 => "v4",
2225                AddressFamily::V6 => "v6",
2226            }
2227        ));
2228
2229        if let Err(e) = std::fs::write(&tmp_path, json.as_bytes()) {
2230            warn!(error = %e, path = %tmp_path.display(), "failed to write DHT state temp file");
2231            return;
2232        }
2233
2234        if let Err(e) = std::fs::rename(&tmp_path, &final_path) {
2235            warn!(
2236                error = %e,
2237                tmp = %tmp_path.display(),
2238                dst = %final_path.display(),
2239                "failed to rename DHT state temp file"
2240            );
2241        }
2242    }
2243
2244    /// Load the routing table from a JSON state file.
2245    ///
2246    /// Skips silently when `state_dir` is `None`. On missing file (first run),
2247    /// logs at debug level and returns. On corrupt/parse errors, logs a warning
2248    /// and falls through to normal bootstrap. On success, inserts all nodes as
2249    /// Questionable and filters `bootstrap_nodes` to hostnames only (since the
2250    /// JSON file has fresher saved-node data).
2251    fn load_routing_table(&mut self) {
2252        let state_dir = match &self.config.state_dir {
2253            Some(dir) => dir.clone(),
2254            None => return,
2255        };
2256
2257        let path = Self::state_file_path(&state_dir, self.address_family);
2258
2259        let data = match std::fs::read_to_string(&path) {
2260            Ok(d) => d,
2261            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
2262                debug!(path = %path.display(), "no saved DHT state (first run)");
2263                return;
2264            }
2265            Err(e) => {
2266                warn!(error = %e, path = %path.display(), "failed to read DHT state file");
2267                return;
2268            }
2269        };
2270
2271        let state: DhtState = match serde_json::from_str(&data) {
2272            Ok(s) => s,
2273            Err(e) => {
2274                warn!(error = %e, path = %path.display(), "corrupt DHT state file, ignoring");
2275                return;
2276            }
2277        };
2278
2279        let mut loaded = 0u32;
2280        for entry in &state.nodes {
2281            let Ok(id) = Id20::from_hex(&entry.id) else {
2282                continue;
2283            };
2284            let addr: SocketAddr = match entry.addr.parse() {
2285                Ok(a) => a,
2286                Err(_) => continue,
2287            };
2288            if self.routing_table.write().insert(id, addr) {
2289                loaded = loaded.saturating_add(1);
2290            }
2291        }
2292
2293        if loaded > 0 {
2294            // Mark all as Questionable — they may be stale, must be re-verified
2295            self.routing_table.write().mark_all_questionable();
2296
2297            // Filter bootstrap_nodes to hostnames only (remove IP:port entries)
2298            // since the JSON file has fresher saved-node addresses.
2299            self.config
2300                .bootstrap_nodes
2301                .retain(|s| s.parse::<SocketAddr>().is_err());
2302
2303            debug!(
2304                loaded,
2305                table_size = self.routing_table.read().len(),
2306                family = ?self.address_family,
2307                "loaded DHT routing table from JSON"
2308            );
2309        }
2310    }
2311
2312    async fn maintenance(&mut self) {
2313        // Query timeouts are now handled by expire_queries_and_advance_lookups()
2314
2315        // Clean up completed DhtLookup tasks (where the JoinHandle has finished)
2316        self.active_lookups
2317            .retain(|_, handle| !handle.is_finished());
2318
2319        // Gap 12: Clean up item lookups — send best result before dropping stale lookups
2320        self.item_lookups.retain(|_, lookup| match lookup {
2321            ItemLookupState::Immutable { reply, .. } => {
2322                if reply
2323                    .as_ref()
2324                    .is_some_and(tokio::sync::oneshot::Sender::is_closed)
2325                {
2326                    // Receiver dropped — discard
2327                    false
2328                } else if reply.is_some() {
2329                    true
2330                } else {
2331                    // Reply already sent
2332                    false
2333                }
2334            }
2335            ItemLookupState::Mutable {
2336                reply,
2337                best_value,
2338                best_seq,
2339                ..
2340            } => {
2341                if reply
2342                    .as_ref()
2343                    .is_some_and(tokio::sync::oneshot::Sender::is_closed)
2344                {
2345                    false
2346                } else if reply.is_some() {
2347                    true
2348                } else {
2349                    // Check if we should finalize — reply already taken means we're done
2350                    let _ = best_value;
2351                    let _ = best_seq;
2352                    false
2353                }
2354            }
2355        });
2356
2357        // Clean up completed put operations
2358        self.item_put_ops.retain(|_, put_op| match put_op {
2359            ItemPutState::Immutable { reply, .. } | ItemPutState::Mutable { reply, .. } => {
2360                reply.is_some()
2361            }
2362        });
2363
2364        // Refresh stale buckets
2365        let stale = self
2366            .routing_table
2367            .read()
2368            .stale_buckets(Duration::from_mins(15));
2369        for bucket_idx in stale {
2370            let target = self.routing_table.read().random_id_in_bucket(bucket_idx);
2371            let closest = self.routing_table.read().closest(&target, 3);
2372            for node in closest {
2373                self.send_find_node(node.addr, target, Some(node.id)).await;
2374            }
2375        }
2376
2377        // Persist routing table to JSON (atomic write)
2378        self.save_routing_table();
2379    }
2380
2381    async fn send_find_node(&mut self, addr: SocketAddr, target: Id20, node_id: Option<Id20>) {
2382        if !self.rate_limiter.try_acquire() {
2383            return;
2384        }
2385        let txn = self.next_transaction_id();
2386        let own_id = *self.routing_table.read().own_id();
2387        let msg = KrpcMessage {
2388            transaction_id: TransactionId::from_u16(txn),
2389            body: KrpcBody::Query(KrpcQuery::FindNode {
2390                id: own_id,
2391                target,
2392                want: self.outgoing_want(),
2393            }),
2394            sender_ip: None,
2395            read_only: self.config.read_only_mode,
2396        };
2397        if let Ok(bytes) = msg.to_bytes() {
2398            let _ = self.socket.send_to(&bytes, addr).await;
2399            self.pending.insert(
2400                txn,
2401                PendingQuery {
2402                    sent_at: Instant::now(),
2403                    addr,
2404                    kind: PendingQueryKind::FindNode,
2405                    node_id,
2406                    response_tx: None,
2407                },
2408            );
2409            self.stats.total_queries_sent += 1;
2410        }
2411    }
2412
2413    async fn send_ping(&mut self, addr: SocketAddr, node_id: Option<Id20>) {
2414        if !self.rate_limiter.try_acquire() {
2415            return;
2416        }
2417        let txn = self.next_transaction_id();
2418        let own_id = *self.routing_table.read().own_id();
2419        let msg = KrpcMessage {
2420            transaction_id: TransactionId::from_u16(txn),
2421            body: KrpcBody::Query(KrpcQuery::Ping { id: own_id }),
2422            sender_ip: None,
2423            read_only: self.config.read_only_mode,
2424        };
2425        if let Ok(bytes) = msg.to_bytes() {
2426            let _ = self.socket.send_to(&bytes, addr).await;
2427            self.pending.insert(
2428                txn,
2429                PendingQuery {
2430                    sent_at: Instant::now(),
2431                    addr,
2432                    node_id,
2433                    kind: PendingQueryKind::Ping,
2434                    response_tx: None,
2435                },
2436            );
2437            self.stats.total_queries_sent += 1;
2438        }
2439    }
2440
2441    async fn ping_questionable_nodes(&mut self) {
2442        let nodes = self.routing_table.read().questionable_nodes();
2443        for (id, addr) in nodes {
2444            self.send_ping(addr, Some(id)).await;
2445        }
2446    }
2447
2448    // ---- BEP 44: item get/put handlers ----
2449
2450    async fn handle_get_immutable(
2451        &mut self,
2452        target: Id20,
2453        reply: oneshot::Sender<Result<Option<Vec<u8>>>>,
2454    ) {
2455        // Check local store first
2456        if let Some(item) = self.item_store.get_immutable(&target) {
2457            let _ = reply.send(Ok(Some(item.value)));
2458            return;
2459        }
2460
2461        // Initiate iterative get to the closest nodes
2462        let closest = self.routing_table.read().closest(&target, K);
2463        if closest.is_empty() {
2464            // No nodes to query — return None immediately
2465            let _ = reply.send(Ok(None));
2466            return;
2467        }
2468
2469        for node in closest.iter().take(3) {
2470            self.send_get_item(node.addr, target, None).await;
2471        }
2472
2473        self.item_lookups.insert(
2474            target,
2475            ItemLookupState::Immutable {
2476                reply: Some(reply),
2477                queried: closest.iter().map(|n| n.id).collect(),
2478            },
2479        );
2480    }
2481
2482    async fn handle_put_immutable(&mut self, value: Vec<u8>, reply: oneshot::Sender<Result<Id20>>) {
2483        let item = match crate::bep44::ImmutableItem::new(value) {
2484            Ok(item) => item,
2485            Err(e) => {
2486                let _ = reply.send(Err(e));
2487                return;
2488            }
2489        };
2490        let target = item.target;
2491
2492        // Store locally
2493        self.item_store.put_immutable(item.clone());
2494
2495        // Reply immediately — local store succeeded.
2496        let _ = reply.send(Ok(target));
2497
2498        // Best-effort propagation: find closest nodes, get tokens, then put.
2499        let closest = self.routing_table.read().closest(&target, K);
2500        if closest.is_empty() {
2501            return;
2502        }
2503
2504        for node in closest.iter().take(K) {
2505            self.send_get_item(node.addr, target, None).await;
2506        }
2507
2508        self.item_put_ops.insert(
2509            target,
2510            ItemPutState::Immutable {
2511                item,
2512                tokens: HashMap::new(),
2513                sent_puts: 0,
2514                reply: None,
2515            },
2516        );
2517    }
2518
2519    #[allow(clippy::type_complexity)]
2520    async fn handle_get_mutable(
2521        &mut self,
2522        public_key: [u8; 32],
2523        salt: Vec<u8>,
2524        reply: oneshot::Sender<Result<Option<(Vec<u8>, i64)>>>,
2525    ) {
2526        let target = crate::bep44::compute_mutable_target(&public_key, &salt);
2527
2528        // Check local store first
2529        if let Some(item) = self.item_store.get_mutable(&public_key, &salt) {
2530            let _ = reply.send(Ok(Some((item.value, item.seq))));
2531            return;
2532        }
2533
2534        // Initiate iterative get
2535        let closest = self.routing_table.read().closest(&target, K);
2536        if closest.is_empty() {
2537            let _ = reply.send(Ok(None));
2538            return;
2539        }
2540
2541        for node in closest.iter().take(3) {
2542            self.send_get_item(node.addr, target, None).await;
2543        }
2544
2545        self.item_lookups.insert(
2546            target,
2547            ItemLookupState::Mutable {
2548                salt,
2549                reply: Some(reply),
2550                best_seq: i64::MIN,
2551                best_value: None,
2552                queried: closest.iter().map(|n| n.id).collect(),
2553            },
2554        );
2555    }
2556
2557    async fn handle_put_mutable(
2558        &mut self,
2559        keypair_bytes: [u8; 32],
2560        value: Vec<u8>,
2561        seq: i64,
2562        salt: Vec<u8>,
2563        reply: oneshot::Sender<Result<Id20>>,
2564    ) {
2565        let keypair = ed25519_dalek::SigningKey::from_bytes(&keypair_bytes);
2566        let item = match crate::bep44::MutableItem::create(&keypair, value, seq, salt) {
2567            Ok(item) => item,
2568            Err(e) => {
2569                let _ = reply.send(Err(e));
2570                return;
2571            }
2572        };
2573        let target = item.target;
2574
2575        // Store locally
2576        self.item_store.put_mutable(item.clone());
2577
2578        // Reply immediately — local store succeeded.
2579        let _ = reply.send(Ok(target));
2580
2581        // Best-effort propagation: find closest nodes, get tokens, then put.
2582        let closest = self.routing_table.read().closest(&target, K);
2583        if closest.is_empty() {
2584            return;
2585        }
2586
2587        for node in closest.iter().take(K) {
2588            self.send_get_item(node.addr, target, None).await;
2589        }
2590
2591        self.item_put_ops.insert(
2592            target,
2593            ItemPutState::Mutable {
2594                item,
2595                tokens: HashMap::new(),
2596                sent_puts: 0,
2597                reply: None,
2598            },
2599        );
2600    }
2601
2602    // Gap 5: send_get_item uses sender_ip: None for outgoing queries
2603    async fn send_get_item(&mut self, addr: SocketAddr, target: Id20, seq: Option<i64>) {
2604        if !self.rate_limiter.try_acquire() {
2605            return;
2606        }
2607        let txn = self.next_transaction_id();
2608        let own_id = *self.routing_table.read().own_id();
2609        let msg = KrpcMessage {
2610            transaction_id: TransactionId::from_u16(txn),
2611            body: KrpcBody::Query(KrpcQuery::Get {
2612                id: own_id,
2613                target,
2614                seq,
2615            }),
2616            sender_ip: None, // Gap 5: outgoing queries use None
2617            read_only: self.config.read_only_mode,
2618        };
2619        if let Ok(bytes) = msg.to_bytes() {
2620            let _ = self.socket.send_to(&bytes, addr).await;
2621            self.pending.insert(
2622                txn,
2623                PendingQuery {
2624                    sent_at: Instant::now(),
2625                    addr,
2626                    kind: PendingQueryKind::GetItem { target },
2627                    node_id: None,
2628                    response_tx: None,
2629                },
2630            );
2631            self.stats.total_queries_sent += 1;
2632        }
2633    }
2634
2635    // Gap 5: send_put_item uses sender_ip: None for outgoing queries
2636    async fn send_put_item(&mut self, params: PutItemParams) {
2637        if !self.rate_limiter.try_acquire() {
2638            return;
2639        }
2640        let txn = self.next_transaction_id();
2641        let own_id = *self.routing_table.read().own_id();
2642        let msg = KrpcMessage {
2643            transaction_id: TransactionId::from_u16(txn),
2644            body: KrpcBody::Query(KrpcQuery::Put {
2645                id: own_id,
2646                token: params.token,
2647                value: params.value,
2648                key: params.key,
2649                signature: params.signature,
2650                seq: params.seq,
2651                salt: params.salt,
2652                cas: None,
2653            }),
2654            sender_ip: None, // Gap 5: outgoing queries use None
2655            read_only: self.config.read_only_mode,
2656        };
2657        if let Ok(bytes) = msg.to_bytes() {
2658            let _ = self.socket.send_to(&bytes, params.addr).await;
2659            self.pending.insert(
2660                txn,
2661                PendingQuery {
2662                    sent_at: Instant::now(),
2663                    addr: params.addr,
2664                    kind: PendingQueryKind::PutItem,
2665                    node_id: None,
2666                    response_tx: None,
2667                },
2668            );
2669            self.stats.total_queries_sent += 1;
2670        }
2671    }
2672
2673    // Gap 8: Extract data into local variables before calling self.send_put_item
2674    async fn send_pending_puts(&mut self, target: Id20) {
2675        let puts_to_send: Vec<PutItemParams> = if let Some(put_op) = self.item_put_ops.get(&target)
2676        {
2677            match put_op {
2678                ItemPutState::Immutable { item, tokens, .. } => tokens
2679                    .values()
2680                    .take(K)
2681                    .map(|(addr, token)| PutItemParams {
2682                        addr: *addr,
2683                        token: token.clone(),
2684                        value: item.value.clone(),
2685                        key: None,
2686                        signature: None,
2687                        seq: None,
2688                        salt: None,
2689                    })
2690                    .collect(),
2691                ItemPutState::Mutable { item, tokens, .. } => {
2692                    let salt = if item.salt.is_empty() {
2693                        None
2694                    } else {
2695                        Some(item.salt.clone())
2696                    };
2697                    tokens
2698                        .values()
2699                        .take(K)
2700                        .map(|(addr, token)| PutItemParams {
2701                            addr: *addr,
2702                            token: token.clone(),
2703                            value: item.value.clone(),
2704                            key: Some(item.public_key),
2705                            signature: Some(item.signature),
2706                            seq: Some(item.seq),
2707                            salt: salt.clone(),
2708                        })
2709                        .collect()
2710                }
2711            }
2712        } else {
2713            return;
2714        };
2715
2716        let num_puts = puts_to_send.len();
2717        for params in puts_to_send {
2718            self.send_put_item(params).await;
2719        }
2720
2721        // Update sent_puts count and send reply
2722        if let Some(put_op) = self.item_put_ops.get_mut(&target) {
2723            match put_op {
2724                ItemPutState::Immutable {
2725                    item,
2726                    sent_puts,
2727                    reply,
2728                    ..
2729                } => {
2730                    *sent_puts = num_puts;
2731                    if let Some(r) = reply.take() {
2732                        let _ = r.send(Ok(item.target));
2733                    }
2734                }
2735                ItemPutState::Mutable {
2736                    item,
2737                    sent_puts,
2738                    reply,
2739                    ..
2740                } => {
2741                    *sent_puts = num_puts;
2742                    if let Some(r) = reply.take() {
2743                        let _ = r.send(Ok(item.target));
2744                    }
2745                }
2746            }
2747        }
2748    }
2749
2750    // ---- BEP 51: sample_infohashes handler ----
2751
2752    async fn handle_sample_infohashes(
2753        &mut self,
2754        target: Id20,
2755        reply: oneshot::Sender<Result<SampleInfohashesResult>>,
2756    ) {
2757        // Find closest node to the target and send the query there
2758        let closest = self.routing_table.read().closest(&target, 1);
2759        let (addr, closest_node_id) = if let Some(node) = closest.first() {
2760            (node.addr, node.id)
2761        } else {
2762            let _ = reply.send(Err(Error::InvalidMessage(
2763                "no nodes in routing table".into(),
2764            )));
2765            return;
2766        };
2767
2768        if !self.rate_limiter.try_acquire() {
2769            let _ = reply.send(Err(Error::Timeout));
2770            return;
2771        }
2772        let txn = self.next_transaction_id();
2773        let own_id = *self.routing_table.read().own_id();
2774        let msg = KrpcMessage {
2775            transaction_id: TransactionId::from_u16(txn),
2776            body: KrpcBody::Query(KrpcQuery::SampleInfohashes { id: own_id, target }),
2777            sender_ip: None, // Gap 2: outgoing queries use None
2778            read_only: self.config.read_only_mode,
2779        };
2780        if let Ok(bytes) = msg.to_bytes() {
2781            let _ = self.socket.send_to(&bytes, addr).await;
2782            self.pending.insert(
2783                txn,
2784                PendingQuery {
2785                    sent_at: Instant::now(),
2786                    addr,
2787                    kind: PendingQueryKind::SampleInfohashes,
2788                    node_id: Some(closest_node_id),
2789                    response_tx: None,
2790                },
2791            );
2792            self.stats.total_queries_sent += 1;
2793        }
2794        // Store the reply sender for when the response comes back
2795        self.sample_replies.insert(txn, reply);
2796    }
2797
2798    fn next_transaction_id(&self) -> u16 {
2799        let txn = self.next_txn_id.fetch_add(1, Ordering::Relaxed);
2800        // Skip zero — reserved as "invalid".
2801        if txn == 0 {
2802            return self.next_txn_id.fetch_add(1, Ordering::Relaxed);
2803        }
2804        txn
2805    }
2806
2807    /// Insert a node into the routing table, enforcing BEP 42 and BEP 43 if enabled.
2808    fn checked_insert(&self, id: Id20, addr: SocketAddr, read_only: bool) -> bool {
2809        // BEP 43: never add read-only nodes to the routing table
2810        if read_only {
2811            trace!(
2812                node_id = %id,
2813                ip = %addr.ip(),
2814                "BEP 43: skipping read-only node"
2815            );
2816            return false;
2817        }
2818        if self.config.enforce_node_id && !node_id::is_valid_node_id(&id, addr.ip()) {
2819            trace!(
2820                node_id = %id,
2821                ip = %addr.ip(),
2822                "BEP 42: rejecting node with invalid ID for IP"
2823            );
2824            return false;
2825        }
2826        self.routing_table.write().insert(id, addr)
2827    }
2828
    /// Regenerate our node ID to be BEP 42-compliant for the given external IP.
    ///
    /// Preserves existing routing table nodes by re-inserting them into the
    /// new table. This avoids losing bootstrap-discovered nodes when the IP
    /// voter reaches consensus shortly after startup.
    ///
    /// Side effects, in order:
    /// 1. Replaces the routing table's contents with a fresh `RoutingTable` for
    ///    the new ID, using the same `restrict_routing_ips` / `max_routing_nodes`
    ///    config.
    /// 2. Re-inserts the old nodes, closest to the new ID first.
    /// 3. If any `get_peers` lookups are active: drops their pending
    ///    `GetPeers` queries and aborts their tasks.
    /// 4. If the new table has nodes: starts a fresh iterative `find_node`
    ///    bootstrap toward the new ID, clears `bootstrap_complete`, and arms a
    ///    10-second `bootstrap_timeout`.
    fn regenerate_node_id(&mut self, external_ip: std::net::IpAddr) {
        // Carry the low 3 bits of the current ID's last byte over as the `r`
        // argument to `node_id::generate_node_id` (presumably BEP 42's random
        // `r` input, kept stable across regenerations — confirm in node_id).
        let r = self.routing_table.read().own_id().0[19] & 0x07;
        let new_id = node_id::generate_node_id(external_ip, r);
        let restrict_ips = self.config.restrict_routing_ips;
        let max_routing_nodes = self.config.max_routing_nodes;
        // Snapshot the current nodes before the table is replaced below.
        let mut old_nodes = self.routing_table.read().all_nodes();
        debug!(
            old_id = %self.routing_table.read().own_id(),
            new_id = %new_id,
            preserved_nodes = old_nodes.len(),
            "BEP 42: regenerating node ID"
        );
        // Replace the table in place, inside the same lock. Anything holding a
        // clone of the surrounding handle sees the new table from here on.
        *self.routing_table.write() =
            RoutingTable::with_config(new_id, restrict_ips, max_routing_nodes);

        // Sort nodes by XOR distance to the new ID (closest first).
        // This maximizes bucket splits: close nodes fill the home bucket,
        // triggering splits that create capacity for more distant nodes.
        // Without sorting, distant nodes fill non-splittable buckets first
        // and get rejected (we saw 72→20 node loss without this).
        old_nodes.sort_by_key(|(id, _)| id.xor_distance(&new_id));

        // Raw `insert` (not `checked_insert`): these nodes were already in the
        // table under the old ID, so no BEP 42/43 filtering is re-applied here.
        let mut inserted = 0usize;
        for (id, addr) in &old_nodes {
            if self.routing_table.write().insert(*id, *addr) {
                inserted += 1;
            }
        }
        debug!(
            new_table_size = self.routing_table.read().len(),
            attempted = old_nodes.len(),
            inserted,
            "BEP 42: node ID regeneration complete"
        );

        // Invalidate all active DhtLookup tasks. They hold Arc clones of the
        // routing table (which is now replaced), and their closest-node lists
        // may be wrong under the new ID. Aborting drops their peer_tx senders,
        // which makes the session detect `dht_peers_rx = None` and re-issue
        // `get_peers()` against the fresh routing table.
        if !self.active_lookups.is_empty() {
            // Also remove pending queries for the cleared lookups. Without this,
            // stale queries expire later and call mark_failed() on nodes that the
            // NEW lookup might want to query, degrading their routing table status.
            let cleared_hashes: std::collections::HashSet<Id20> =
                self.active_lookups.keys().copied().collect();
            // Collect the stale transaction IDs first, then remove them in a
            // separate pass, so `pending` is not mutated while being iterated.
            let stale_txns: Vec<u16> = self
                .pending
                .iter()
                .filter(|entry| {
                    matches!(entry.value().kind, PendingQueryKind::GetPeers { info_hash }
                        if cleared_hashes.contains(&info_hash))
                })
                .map(|entry| *entry.key())
                .collect();
            debug!(
                active_lookups = self.active_lookups.len(),
                stale_pending = stale_txns.len(),
                "BEP 42: invalidating active get_peers lookups (will be re-issued by session)"
            );
            for txn in stale_txns {
                self.pending.remove(&txn);
            }
            for (_, handle) in self.active_lookups.drain() {
                handle.abort();
            }
        }

        // Re-trigger iterative bootstrap with the new node ID.
        // The first bootstrap targeted the old ID, so the discovered nodes
        // are in the wrong neighbourhood. A fresh find_node cascade targeting
        // the new ID fills the home bucket properly.
        let initial_closest: Vec<CompactNodeInfo> = self
            .routing_table
            .read()
            .closest(&new_id, K)
            .into_iter()
            .map(|n| CompactNodeInfo {
                id: n.id,
                addr: n.addr,
            })
            .collect();
        // NOTE(review): if the new table is empty, any previous `bootstrap_lookup`
        // (targeting the old ID) is left in place — confirm that is intended.
        if !initial_closest.is_empty() {
            debug!(
                seed_nodes = initial_closest.len(),
                "BEP 42: re-bootstrapping with new node ID"
            );
            // Seed the lookup with the current closest nodes; it is capped at 6 rounds.
            let mut lookup = IterativeLookup::new(
                new_id,
                FindNodeCallbacks {
                    round: 0,
                    max_rounds: 6,
                },
            );
            lookup.closest = initial_closest;
            self.bootstrap_lookup = Some(lookup);
            // M97: Re-gate get_peers until the new bootstrap completes
            self.bootstrap_complete = false;
            // Presumably lets the actor force bootstrap completion if the lookup
            // stalls (handled outside this view — confirm).
            self.bootstrap_timeout = Some(Box::pin(tokio::time::sleep(Duration::from_secs(10))));
        }
    }
2935
2936    fn make_stats(&self) -> DhtStats {
2937        let (immutable, mutable) = self.item_store.count();
2938        DhtStats {
2939            node_id: *self.routing_table.read().own_id(),
2940            routing_table_size: self.routing_table.read().len(),
2941            bucket_count: self.routing_table.read().bucket_count(),
2942            peer_store_info_hashes: self.peer_store.info_hash_count(),
2943            peer_store_peers: self.peer_store.peer_count(),
2944            pending_queries: self.pending.len(),
2945            total_queries_sent: self.stats.total_queries_sent,
2946            total_responses_received: self.stats.total_responses_received,
2947            dht_item_count: immutable + mutable,
2948        }
2949    }
2950}
2951
/// Map a socket address to a `u64` that identifies it as a voter source.
///
/// Uses `DefaultHasher::new()`, whose keys are fixed, so the same address
/// always yields the same value within a build.
fn hash_source_addr(addr: &SocketAddr) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    Hash::hash(addr, &mut state);
    state.finish()
}
2959
/// Upper bound on how long DNS bootstrap keeps retrying a single hostname (120 s).
const DNS_BOOTSTRAP_DEADLINE: Duration = Duration::from_secs(120);

/// Backoff before the first DNS bootstrap retry (1 s).
const DNS_BOOTSTRAP_INITIAL_DELAY: Duration = Duration::from_millis(1_000);

/// Ceiling for the doubling DNS bootstrap backoff (30 s).
const DNS_BOOTSTRAP_MAX_DELAY: Duration = Duration::from_secs(30);
2968
2969/// Resolve a single bootstrap hostname with exponential backoff.
2970///
2971/// Retries DNS resolution with delays of 1s, 2s, 4s, ..., capped at 30s,
2972/// until success or the 120-second deadline is reached. On success, sends
2973/// the matching addresses (filtered by address family) to `tx`.
2974async fn dns_bootstrap_resolve(
2975    hostname: String,
2976    family: AddressFamily,
2977    tx: mpsc::Sender<Vec<SocketAddr>>,
2978) {
2979    let deadline = Instant::now() + DNS_BOOTSTRAP_DEADLINE;
2980    let mut delay = DNS_BOOTSTRAP_INITIAL_DELAY;
2981
2982    loop {
2983        match tokio::net::lookup_host(hostname.as_str()).await {
2984            Ok(addrs) => {
2985                let matching: Vec<SocketAddr> = addrs
2986                    .filter(|a| match family {
2987                        AddressFamily::V4 => a.is_ipv4(),
2988                        AddressFamily::V6 => a.is_ipv6(),
2989                    })
2990                    .collect();
2991                debug!(
2992                    %hostname,
2993                    count = matching.len(),
2994                    ?family,
2995                    "DNS bootstrap resolved"
2996                );
2997                let _ = tx.send(matching).await;
2998                break;
2999            }
3000            Err(e) if Instant::now() + delay < deadline => {
3001                warn!(%hostname, %e, ?delay, "DNS bootstrap retry");
3002                tokio::time::sleep(delay).await;
3003                delay = delay.saturating_mul(2).min(DNS_BOOTSTRAP_MAX_DELAY);
3004            }
3005            Err(e) => {
3006                warn!(%hostname, %e, "DNS bootstrap failed after retries");
3007                break;
3008            }
3009        }
3010    }
3011}
3012
3013/// Generate a random node ID for this DHT node.
3014fn generate_node_id() -> Id20 {
3015    use std::cell::Cell;
3016    use std::time::SystemTime;
3017
3018    thread_local! {
3019        static STATE: Cell<u64> = Cell::new(
3020            SystemTime::now()
3021                .duration_since(SystemTime::UNIX_EPOCH)
3022                .unwrap_or_default()
3023                .as_nanos() as u64
3024        );
3025    }
3026
3027    let mut bytes = [0u8; 20];
3028    for byte in &mut bytes {
3029        STATE.with(|s| {
3030            let mut x = s.get();
3031            x ^= x << 13;
3032            x ^= x >> 7;
3033            x ^= x << 17;
3034            s.set(x);
3035            *byte = x as u8;
3036        });
3037    }
3038    Id20(bytes)
3039}
3040
3041#[cfg(test)]
3042mod tests {
3043    use super::*;
3044
3045    #[test]
3046    fn generate_node_id_is_unique() {
3047        let a = generate_node_id();
3048        let b = generate_node_id();
3049        assert_ne!(a, b);
3050    }
3051
3052    #[tokio::test]
3053    async fn dht_handle_start_and_shutdown() {
3054        let config = DhtConfig {
3055            bind_addr: "127.0.0.1:0".parse().unwrap(),
3056            bootstrap_nodes: Vec::new(), // No bootstrap for test
3057            ..DhtConfig::default()
3058        };
3059        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3060        let stats = handle.stats().await.unwrap();
3061        assert_eq!(stats.routing_table_size, 0);
3062        handle.shutdown().await.unwrap();
3063    }
3064
3065    #[tokio::test]
3066    async fn dht_handle_stats() {
3067        let config = DhtConfig {
3068            bind_addr: "127.0.0.1:0".parse().unwrap(),
3069            bootstrap_nodes: Vec::new(),
3070            ..DhtConfig::default()
3071        };
3072        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3073        let stats = handle.stats().await.unwrap();
3074        assert_eq!(stats.routing_table_size, 0);
3075        assert_eq!(stats.bucket_count, 1);
3076        assert_eq!(stats.pending_queries, 0);
3077        handle.shutdown().await.unwrap();
3078    }
3079
3080    /// M171 D4: [`DhtHandle::node_count`] must return the same value as
3081    /// `stats().routing_table_size`. Using a startup-empty bootstrap
3082    /// ensures the pre-populate count is 0.
3083    #[tokio::test]
3084    async fn dht_handle_node_count_matches_stats() {
3085        let config = DhtConfig {
3086            bind_addr: "127.0.0.1:0".parse().unwrap(),
3087            bootstrap_nodes: Vec::new(),
3088            ..DhtConfig::default()
3089        };
3090        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3091        let stats = handle.stats().await.unwrap();
3092        let count = handle.node_count().await.unwrap();
3093        assert_eq!(count, stats.routing_table_size);
3094        assert_eq!(count, 0, "empty bootstrap ⇒ empty routing table");
3095        handle.shutdown().await.unwrap();
3096    }
3097
3098    #[tokio::test]
3099    async fn two_dht_nodes_ping() {
3100        // Start two DHT nodes on localhost, have one send find_node to the other
3101        let config_a = DhtConfig {
3102            bind_addr: "127.0.0.1:0".parse().unwrap(),
3103            bootstrap_nodes: Vec::new(),
3104            own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3105            ..DhtConfig::default()
3106        };
3107        let config_b = DhtConfig {
3108            bind_addr: "127.0.0.1:0".parse().unwrap(),
3109            bootstrap_nodes: Vec::new(),
3110            own_id: Some(Id20::from_hex("0000000000000000000000000000000000000002").unwrap()),
3111            ..DhtConfig::default()
3112        };
3113
3114        let (handle_a, _ip_rx_a) = DhtHandle::start(config_a).await.unwrap();
3115        let (handle_b, _ip_rx_b) = DhtHandle::start(config_b).await.unwrap();
3116
3117        // Give them a moment to bind
3118        tokio::time::sleep(Duration::from_millis(50)).await;
3119
3120        // Both should have empty routing tables
3121        let stats_a = handle_a.stats().await.unwrap();
3122        let stats_b = handle_b.stats().await.unwrap();
3123        assert_eq!(stats_a.routing_table_size, 0);
3124        assert_eq!(stats_b.routing_table_size, 0);
3125
3126        handle_a.shutdown().await.unwrap();
3127        handle_b.shutdown().await.unwrap();
3128    }
3129
3130    #[tokio::test]
3131    async fn dht_handle_get_peers_empty_table() {
3132        let config = DhtConfig {
3133            bind_addr: "127.0.0.1:0".parse().unwrap(),
3134            bootstrap_nodes: Vec::new(),
3135            ..DhtConfig::default()
3136        };
3137        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3138
3139        let info_hash = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
3140        let _rx = handle.get_peers(info_hash).await.unwrap();
3141
3142        // With empty routing table, no peers will be found and channel closes
3143        tokio::time::sleep(Duration::from_millis(100)).await;
3144        // Channel should eventually be cleaned up
3145        let stats = handle.stats().await.unwrap();
3146        assert_eq!(stats.routing_table_size, 0);
3147
3148        handle.shutdown().await.unwrap();
3149    }
3150
3151    #[tokio::test]
3152    async fn dht_handles_malformed_packet() {
3153        let config = DhtConfig {
3154            bind_addr: "127.0.0.1:0".parse().unwrap(),
3155            bootstrap_nodes: Vec::new(),
3156            ..DhtConfig::default()
3157        };
3158        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3159
3160        // Get the DHT port from stats (indirect — we'd need to expose local_addr)
3161        // For now, just verify it doesn't crash on shutdown
3162        tokio::time::sleep(Duration::from_millis(50)).await;
3163        handle.shutdown().await.unwrap();
3164    }
3165
3166    #[test]
3167    fn dht_config_default_is_v4() {
3168        let config = DhtConfig::default();
3169        assert_eq!(config.address_family, AddressFamily::V4);
3170        assert!(config.bind_addr.is_ipv4());
3171    }
3172
3173    #[test]
3174    fn dht_config_default_v6() {
3175        let config = DhtConfig::default_v6();
3176        assert_eq!(config.address_family, AddressFamily::V6);
3177        assert!(config.bind_addr.is_ipv6());
3178        // Should have bootstrap nodes
3179        assert!(!config.bootstrap_nodes.is_empty());
3180    }
3181
3182    #[tokio::test]
3183    async fn dht_v6_start_and_shutdown() {
3184        let config = DhtConfig {
3185            bind_addr: "[::1]:0".parse().unwrap(),
3186            bootstrap_nodes: Vec::new(),
3187            ..DhtConfig::default_v6()
3188        };
3189        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3190        let stats = handle.stats().await.unwrap();
3191        assert_eq!(stats.routing_table_size, 0);
3192        handle.shutdown().await.unwrap();
3193    }
3194
3195    #[tokio::test]
3196    async fn dht_v6_stats_on_empty_table() {
3197        let config = DhtConfig {
3198            bind_addr: "[::1]:0".parse().unwrap(),
3199            bootstrap_nodes: Vec::new(),
3200            ..DhtConfig::default_v6()
3201        };
3202        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3203        let stats = handle.stats().await.unwrap();
3204        assert_eq!(stats.routing_table_size, 0);
3205        assert_eq!(stats.bucket_count, 1);
3206        assert_eq!(stats.pending_queries, 0);
3207        assert_eq!(stats.total_queries_sent, 0);
3208        handle.shutdown().await.unwrap();
3209    }
3210
3211    #[test]
3212    fn matches_family_helper() {
3213        let actor_v4 = AddressFamily::V4;
3214        let actor_v6 = AddressFamily::V6;
3215        let v4_addr: SocketAddr = "1.2.3.4:6881".parse().unwrap();
3216        let v6_addr: SocketAddr = "[::1]:6881".parse().unwrap();
3217
3218        assert!(matches!(actor_v4, AddressFamily::V4) && v4_addr.is_ipv4());
3219        assert!(!v6_addr.is_ipv4());
3220        assert!(matches!(actor_v6, AddressFamily::V6) && v6_addr.is_ipv6());
3221        assert!(!v4_addr.is_ipv6());
3222    }
3223
3224    #[test]
3225    fn dht_config_security_defaults() {
3226        let config = DhtConfig::default();
3227        // enforce_node_id off by default: too many real DHT nodes lack BEP 42 IDs
3228        assert!(!config.enforce_node_id);
3229        assert!(config.restrict_routing_ips);
3230
3231        let config_v6 = DhtConfig::default_v6();
3232        assert!(!config_v6.enforce_node_id);
3233        assert!(config_v6.restrict_routing_ips);
3234    }
3235
3236    #[tokio::test]
3237    async fn dht_handle_start_returns_ip_channel() {
3238        let config = DhtConfig {
3239            bind_addr: "127.0.0.1:0".parse().unwrap(),
3240            bootstrap_nodes: Vec::new(),
3241            ..DhtConfig::default()
3242        };
3243        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3244        handle.shutdown().await.unwrap();
3245    }
3246
3247    #[tokio::test]
3248    async fn dht_update_external_ip() {
3249        let config = DhtConfig {
3250            bind_addr: "127.0.0.1:0".parse().unwrap(),
3251            bootstrap_nodes: Vec::new(),
3252            ..DhtConfig::default()
3253        };
3254        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3255        handle
3256            .update_external_ip("203.0.113.5".parse().unwrap(), IpVoteSource::Nat)
3257            .await
3258            .unwrap();
3259        handle.shutdown().await.unwrap();
3260    }
3261
3262    // ---- BEP 44 put/get API tests ----
3263
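    // Gap 2: every test below starts the actor with
    // `let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();`, so the
    // IP receiver stays bound (not dropped via `_`) until the test ends.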
3265
3266    #[tokio::test]
3267    async fn dht_put_get_immutable_local() {
3268        let config = DhtConfig {
3269            bind_addr: "127.0.0.1:0".parse().unwrap(),
3270            bootstrap_nodes: Vec::new(),
3271            ..DhtConfig::default()
3272        };
3273        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3274
3275        // Put an immutable item
3276        let value = b"12:Hello World!".to_vec();
3277        let target = handle.put_immutable(value.clone()).await.unwrap();
3278
3279        // Get it back (from local store)
3280        let result = handle.get_immutable(target).await.unwrap();
3281        assert_eq!(result, Some(value));
3282
3283        // Verify SHA-1 target
3284        assert_eq!(target, irontide_core::sha1(b"12:Hello World!"));
3285
3286        handle.shutdown().await.unwrap();
3287    }
3288
3289    #[tokio::test]
3290    async fn dht_put_get_mutable_local() {
3291        let config = DhtConfig {
3292            bind_addr: "127.0.0.1:0".parse().unwrap(),
3293            bootstrap_nodes: Vec::new(),
3294            ..DhtConfig::default()
3295        };
3296        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3297
3298        let seed = [42u8; 32];
3299        let keypair = ed25519_dalek::SigningKey::from_bytes(&seed);
3300        let pubkey = keypair.verifying_key().to_bytes();
3301
3302        let value = b"4:test".to_vec();
3303        let target = handle
3304            .put_mutable(seed, value.clone(), 1, Vec::new())
3305            .await
3306            .unwrap();
3307
3308        // Get it back (from local store)
3309        let result = handle.get_mutable(pubkey, Vec::new()).await.unwrap();
3310        assert_eq!(result, Some((value, 1)));
3311
3312        // Verify target
3313        let expected_target = crate::bep44::compute_mutable_target(&pubkey, &[]);
3314        assert_eq!(target, expected_target);
3315
3316        handle.shutdown().await.unwrap();
3317    }
3318
3319    #[tokio::test]
3320    async fn dht_get_immutable_not_found() {
3321        let config = DhtConfig {
3322            bind_addr: "127.0.0.1:0".parse().unwrap(),
3323            bootstrap_nodes: Vec::new(),
3324            ..DhtConfig::default()
3325        };
3326        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3327
3328        let target = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
3329        // With empty routing table, lookup has no peers to query; just returns local result
3330        let result = handle.get_immutable(target).await.unwrap();
3331        assert_eq!(result, None);
3332
3333        handle.shutdown().await.unwrap();
3334    }
3335
3336    #[tokio::test]
3337    async fn dht_put_immutable_rejects_oversized() {
3338        let config = DhtConfig {
3339            bind_addr: "127.0.0.1:0".parse().unwrap(),
3340            bootstrap_nodes: Vec::new(),
3341            ..DhtConfig::default()
3342        };
3343        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3344
3345        let value = vec![0u8; 1001];
3346        let result = handle.put_immutable(value).await;
3347        assert!(result.is_err());
3348
3349        handle.shutdown().await.unwrap();
3350    }
3351
3352    #[tokio::test]
3353    async fn dht_stats_includes_item_count() {
3354        let config = DhtConfig {
3355            bind_addr: "127.0.0.1:0".parse().unwrap(),
3356            bootstrap_nodes: Vec::new(),
3357            ..DhtConfig::default()
3358        };
3359        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3360
3361        let stats = handle.stats().await.unwrap();
3362        assert_eq!(stats.dht_item_count, 0);
3363
3364        handle.put_immutable(b"5:hello".to_vec()).await.unwrap();
3365        let stats = handle.stats().await.unwrap();
3366        assert_eq!(stats.dht_item_count, 1);
3367
3368        handle.shutdown().await.unwrap();
3369    }
3370
3371    #[tokio::test]
3372    async fn dht_get_mutable_not_found() {
3373        let config = DhtConfig {
3374            bind_addr: "127.0.0.1:0".parse().unwrap(),
3375            bootstrap_nodes: Vec::new(),
3376            ..DhtConfig::default()
3377        };
3378        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3379
3380        let pubkey = [99u8; 32];
3381        let result = handle.get_mutable(pubkey, Vec::new()).await.unwrap();
3382        assert_eq!(result, None);
3383
3384        handle.shutdown().await.unwrap();
3385    }
3386
3387    #[tokio::test]
3388    async fn two_nodes_put_get_immutable() {
3389        let config_a = DhtConfig {
3390            bind_addr: "127.0.0.1:0".parse().unwrap(),
3391            bootstrap_nodes: Vec::new(),
3392            own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3393            ..DhtConfig::default()
3394        };
3395        let (handle_a, _ip_rx) = DhtHandle::start(config_a).await.unwrap();
3396
3397        // Node A stores an item locally
3398        let value = b"12:Hello World!".to_vec();
3399        let target = handle_a.put_immutable(value.clone()).await.unwrap();
3400
3401        // Verify local retrieval
3402        let result = handle_a.get_immutable(target).await.unwrap();
3403        assert_eq!(result, Some(value));
3404
3405        handle_a.shutdown().await.unwrap();
3406    }
3407
3408    #[tokio::test]
3409    async fn put_mutable_sequence_update() {
3410        let config = DhtConfig {
3411            bind_addr: "127.0.0.1:0".parse().unwrap(),
3412            bootstrap_nodes: Vec::new(),
3413            ..DhtConfig::default()
3414        };
3415        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3416
3417        let seed = [99u8; 32];
3418        let keypair = ed25519_dalek::SigningKey::from_bytes(&seed);
3419        let pubkey = keypair.verifying_key().to_bytes();
3420
3421        // Put seq=1
3422        handle
3423            .put_mutable(seed, b"5:first".to_vec(), 1, Vec::new())
3424            .await
3425            .unwrap();
3426        let result = handle.get_mutable(pubkey, Vec::new()).await.unwrap();
3427        assert_eq!(result, Some((b"5:first".to_vec(), 1)));
3428
3429        // Put seq=2 (should replace)
3430        handle
3431            .put_mutable(seed, b"6:second".to_vec(), 2, Vec::new())
3432            .await
3433            .unwrap();
3434        let result = handle.get_mutable(pubkey, Vec::new()).await.unwrap();
3435        assert_eq!(result, Some((b"6:second".to_vec(), 2)));
3436
3437        handle.shutdown().await.unwrap();
3438    }
3439
3440    #[tokio::test]
3441    async fn put_mutable_with_salt_isolation() {
3442        let config = DhtConfig {
3443            bind_addr: "127.0.0.1:0".parse().unwrap(),
3444            bootstrap_nodes: Vec::new(),
3445            ..DhtConfig::default()
3446        };
3447        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3448
3449        let seed = [77u8; 32];
3450        let keypair = ed25519_dalek::SigningKey::from_bytes(&seed);
3451        let pubkey = keypair.verifying_key().to_bytes();
3452
3453        // Put with salt "a"
3454        handle
3455            .put_mutable(seed, b"1:A".to_vec(), 1, b"a".to_vec())
3456            .await
3457            .unwrap();
3458        // Put with salt "b"
3459        handle
3460            .put_mutable(seed, b"1:B".to_vec(), 1, b"b".to_vec())
3461            .await
3462            .unwrap();
3463
3464        // Each salt returns its own value
3465        let a = handle.get_mutable(pubkey, b"a".to_vec()).await.unwrap();
3466        assert_eq!(a, Some((b"1:A".to_vec(), 1)));
3467        let b = handle.get_mutable(pubkey, b"b".to_vec()).await.unwrap();
3468        assert_eq!(b, Some((b"1:B".to_vec(), 1)));
3469
3470        handle.shutdown().await.unwrap();
3471    }
3472
3473    // ---- BEP 51 sample_infohashes tests ----
3474
3475    #[tokio::test]
3476    async fn dht_sample_infohashes_empty_table() {
3477        let config = DhtConfig {
3478            bind_addr: "127.0.0.1:0".parse().unwrap(),
3479            bootstrap_nodes: Vec::new(),
3480            ..DhtConfig::default()
3481        };
3482        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3483
3484        let target = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
3485        let result = handle.sample_infohashes(target).await;
3486        // With empty routing table, we expect an error (no nodes to query)
3487        assert!(result.is_err());
3488
3489        handle.shutdown().await.unwrap();
3490    }
3491
3492    #[tokio::test]
3493    async fn two_nodes_sample_infohashes() {
3494        // Node A will store some peers, then node B queries it
3495        let config_a = DhtConfig {
3496            bind_addr: "127.0.0.1:0".parse().unwrap(),
3497            bootstrap_nodes: Vec::new(),
3498            own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3499            ..DhtConfig::default()
3500        };
3501        let (handle_a, _ip_rx_a) = DhtHandle::start(config_a).await.unwrap();
3502
3503        // We can't directly add peers to node A's store through the public API,
3504        // but we can verify the query/response path by having node B query node A.
3505        // Node A will respond with empty samples since its peer store is empty.
3506
3507        // For now, just verify the handle method exists and handles shutdown gracefully
3508        tokio::time::sleep(Duration::from_millis(50)).await;
3509        handle_a.shutdown().await.unwrap();
3510    }
3511
3512    // ---- QueryRateLimiter unit tests ----
3513
3514    #[test]
3515    fn rate_limiter_new_starts_full() {
3516        let limiter = QueryRateLimiter::new(10);
3517        assert_eq!(limiter.permits, 10);
3518        assert_eq!(limiter.max_permits, 10);
3519        assert_eq!(limiter.refill_rate, 10);
3520    }
3521
3522    #[test]
3523    fn rate_limiter_new_zero_rate() {
3524        // A zero-rate limiter should never grant permits.
3525        let mut limiter = QueryRateLimiter::new(0);
3526        assert!(!limiter.try_acquire());
3527    }
3528
3529    #[test]
3530    fn rate_limiter_exhaustion() {
3531        // Drain all N permits, then the (N+1)th call must fail.
3532        let mut limiter = QueryRateLimiter::new(5);
3533        for _ in 0..5 {
3534            assert!(limiter.try_acquire(), "permit should be available");
3535        }
3536        assert!(
3537            !limiter.try_acquire(),
3538            "bucket must be empty after N acquires"
3539        );
3540    }
3541
3542    #[test]
3543    fn rate_limiter_initial_permits_work() {
3544        // Full bucket on creation: first try_acquire always succeeds.
3545        let mut limiter = QueryRateLimiter::new(1);
3546        assert!(limiter.try_acquire());
3547        // Bucket is now empty.
3548        assert!(!limiter.try_acquire());
3549    }
3550
3551    #[test]
3552    fn rate_limiter_refill_caps_at_max() {
3553        // Manually set permits below max, then trigger a refill by faking a
3554        // large elapsed time through repeated calls; instead, just validate the
3555        // cap logic by setting state directly and calling refill via try_acquire.
3556        // We can't easily fake Instant, but we can verify that permits never
3557        // exceed max_permits after a refill.
3558        let mut limiter = QueryRateLimiter::new(10);
3559        // Drain to 0.
3560        for _ in 0..10 {
3561            limiter.try_acquire();
3562        }
3563        assert_eq!(limiter.permits, 0);
3564
3565        // Sleep slightly longer than 1 second so the refill would add >10 permits
3566        // if uncapped. Since we cannot sleep in a unit test cheaply, we instead
3567        // directly manipulate last_refill to simulate elapsed time.
3568        limiter.last_refill = Instant::now() - Duration::from_secs(5);
3569        limiter.refill();
3570        // After 5 seconds at rate 10, raw new_permits = 50, but cap is 10.
3571        assert_eq!(limiter.permits, 10, "permits must not exceed max_permits");
3572    }
3573
3574    #[test]
3575    fn rate_limiter_refill_adds_correct_permits() {
3576        let mut limiter = QueryRateLimiter::new(100);
3577        // Drain all.
3578        for _ in 0..100 {
3579            limiter.try_acquire();
3580        }
3581        // Simulate 0.5 seconds elapsed → should add ~50 permits.
3582        limiter.last_refill = Instant::now() - Duration::from_millis(500);
3583        limiter.refill();
3584        // Allow for timing imprecision: must be in [45, 55].
3585        assert!(
3586            limiter.permits >= 45 && limiter.permits <= 55,
3587            "expected ~50 permits after 0.5s refill at rate 100, got {}",
3588            limiter.permits
3589        );
3590    }
3591
3592    /// Exercises the bootstrap path with saved-node addresses (no DNS).
3593    /// Verifies the bootstrap code path (including new diagnostic logging)
3594    /// runs without panicking.
3595    #[tokio::test]
3596    async fn dht_bootstrap_logging() {
3597        // Use a fake saved-node address (loopback) that won't resolve to a
3598        // real DHT node — the important thing is that the bootstrap code path
3599        // executes all three phases without panicking.
3600        let config = DhtConfig {
3601            bind_addr: "127.0.0.1:0".parse().unwrap(),
3602            bootstrap_nodes: vec!["127.0.0.1:16881".to_owned(), "127.0.0.1:16882".to_owned()],
3603            ..DhtConfig::default()
3604        };
3605        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
3606
3607        // Allow time for bootstrap() to run (pings sent, iterative lookup started).
3608        tokio::time::sleep(Duration::from_millis(200)).await;
3609
3610        let stats = handle.stats().await.unwrap();
3611        // The pings will have been sent (queries_sent >= 2) but won't get
3612        // responses from the fake addresses.
3613        assert!(
3614            stats.total_queries_sent >= 2,
3615            "expected at least 2 ping queries, got {}",
3616            stats.total_queries_sent
3617        );
3618
3619        handle.shutdown().await.unwrap();
3620    }
3621
3622    /// T10: During bootstrap (`bootstrap_complete = false`), the ping gate
3623    /// uses a 5-second interval — pings should fire every tick.
3624    ///
3625    /// Uses millisecond-scale durations so the test completes instantly while
3626    /// exercising the exact same gating logic as the real actor loop.
3627    #[test]
3628    fn ping_interval_5s_during_bootstrap() {
3629        let bootstrap_complete = false;
3630
3631        // Simulate the timing decision with a tick interval equal to the
3632        // bootstrap ping interval (both 5s in production, both 10ms here).
3633        let tick = Duration::from_millis(10);
3634        let bootstrap_interval = tick;
3635        let steady_interval = Duration::from_millis(120);
3636
3637        let mut last_ping = Instant::now();
3638        let mut ping_count: u32 = 0;
3639
3640        // Simulate 6 ticks, sleeping the tick interval between each.
3641        for _ in 0..6 {
3642            std::thread::sleep(tick);
3643
3644            let ping_interval = if bootstrap_complete {
3645                steady_interval
3646            } else {
3647                bootstrap_interval
3648            };
3649            if last_ping.elapsed() >= ping_interval {
3650                ping_count = ping_count.saturating_add(1);
3651                last_ping = Instant::now();
3652            }
3653        }
3654
3655        // All 6 ticks should trigger a ping (tick == bootstrap interval).
3656        assert_eq!(
3657            ping_count, 6,
3658            "expected 6 pings during bootstrap (every tick), got {ping_count}"
3659        );
3660    }
3661
3662    /// T11: After bootstrap (`bootstrap_complete = true`), the ping gate
3663    /// uses a 60-second interval — most ticks are no-ops for pinging.
3664    ///
3665    /// Uses millisecond-scale durations so the test completes instantly while
3666    /// exercising the exact same gating logic as the real actor loop.
3667    #[test]
3668    fn ping_interval_60s_after_bootstrap() {
3669        let bootstrap_complete = true;
3670
3671        // Production ratio: tick = 5s, steady interval = 60s → 12:1.
3672        // Test ratio:       tick = 10ms, steady interval = 120ms → 12:1.
3673        let tick = Duration::from_millis(10);
3674        let bootstrap_interval = tick;
3675        let steady_interval = Duration::from_millis(120);
3676
3677        let mut last_ping = Instant::now();
3678        let mut ping_count: u32 = 0;
3679
3680        // 24 ticks × 10ms = 240ms total. With a 120ms gate, exactly 2
3681        // pings should fire (at tick ~12 = 120ms and tick ~24 = 240ms).
3682        for _ in 0..24 {
3683            std::thread::sleep(tick);
3684
3685            let ping_interval = if bootstrap_complete {
3686                steady_interval
3687            } else {
3688                bootstrap_interval
3689            };
3690            if last_ping.elapsed() >= ping_interval {
3691                ping_count = ping_count.saturating_add(1);
3692                last_ping = Instant::now();
3693            }
3694        }
3695
3696        // Only 2 pings should have fired (12:1 ratio, same as production).
3697        assert_eq!(
3698            ping_count, 2,
3699            "expected 2 pings post-bootstrap (12:1 tick-to-interval ratio), got {ping_count}"
3700        );
3701    }
3702
3703    // ---- DNS bootstrap backoff tests (M105 Task 3) ----
3704
3705    /// T1: Verify DNS resolution is retried with increasing delay on failure.
3706    ///
3707    /// Uses a hostname that will definitely fail DNS resolution. Validates that
3708    /// the backoff logic computes the correct delay sequence (1s, 2s, 4s, ...)
3709    /// capped at 30s.
3710    #[test]
3711    fn dns_backoff_retries_on_failure() {
3712        // Validate the exponential backoff sequence directly.
3713        let mut delay = DNS_BOOTSTRAP_INITIAL_DELAY;
3714        let expected_delays = [
3715            Duration::from_secs(1),
3716            Duration::from_secs(2),
3717            Duration::from_secs(4),
3718            Duration::from_secs(8),
3719            Duration::from_secs(16),
3720            Duration::from_secs(30), // capped
3721            Duration::from_secs(30), // stays capped
3722        ];
3723
3724        for expected in &expected_delays {
3725            assert_eq!(
3726                delay, *expected,
3727                "backoff delay mismatch: got {delay:?}, expected {expected:?}"
3728            );
3729            delay = delay.saturating_mul(2).min(DNS_BOOTSTRAP_MAX_DELAY);
3730        }
3731    }
3732
3733    /// T2: Verify successful retry after initial failure proceeds normally.
3734    ///
3735    /// Spawns `dns_bootstrap_resolve` with localhost (which resolves
3736    /// immediately) and confirms addresses arrive on the channel.
3737    #[tokio::test]
3738    async fn dns_backoff_succeeds_on_retry() {
3739        let (tx, mut rx) = mpsc::channel(16);
3740
3741        // "localhost:1234" should resolve immediately on any system.
3742        let hostname = "localhost:1234".to_owned();
3743        tokio::spawn(dns_bootstrap_resolve(hostname, AddressFamily::V4, tx));
3744
3745        // We should receive at least one batch of addresses.
3746        let result = tokio::time::timeout(Duration::from_secs(5), rx.recv()).await;
3747        assert!(
3748            result.is_ok(),
3749            "expected DNS resolution to complete within 5 seconds"
3750        );
3751        let addrs = result.expect("timeout should not occur");
3752        // localhost resolves, so we should get Some with at least one address.
3753        assert!(
3754            addrs.is_some(),
3755            "expected Some(addresses) from dns_bootstrap_resolve"
3756        );
3757        let addrs = addrs.expect("already checked is_some");
3758        assert!(
3759            !addrs.is_empty(),
3760            "expected at least one resolved address for localhost"
3761        );
3762        // All addresses should be IPv4 since we requested V4.
3763        for addr in &addrs {
3764            assert!(addr.is_ipv4(), "expected IPv4 address, got {addr}");
3765        }
3766    }
3767
3768    /// T3: Verify after 120s of failures, we stop retrying.
3769    ///
3770    /// Tests the deadline logic directly: once `Instant::now() + delay >= deadline`,
3771    /// the function should break out of its loop.
3772    #[test]
3773    fn dns_backoff_total_timeout_120s() {
3774        // Simulate the deadline check from dns_bootstrap_resolve.
3775        // With 120s deadline and delays 1,2,4,8,16,30,30,...
3776        // Sum: 1+2+4+8+16+30 = 61s after 6 retries, then 30s more = 91s after 7,
3777        // 121s after 8 retries → exceeds deadline.
3778        let deadline_duration = DNS_BOOTSTRAP_DEADLINE;
3779        let mut delay = DNS_BOOTSTRAP_INITIAL_DELAY;
3780        let mut total_sleep = Duration::ZERO;
3781        let mut retries = 0u32;
3782
3783        loop {
3784            // Check if the next sleep would exceed the deadline
3785            // (mirrors: `Instant::now() + delay < deadline` in the real code,
3786            // but using cumulative durations since we can't fake Instant).
3787            let next_total = total_sleep.saturating_add(delay);
3788            if next_total >= deadline_duration {
3789                break;
3790            }
3791            total_sleep = next_total;
3792            retries = retries.saturating_add(1);
3793            delay = delay.saturating_mul(2).min(DNS_BOOTSTRAP_MAX_DELAY);
3794        }
3795
3796        // Should have retried several times before hitting the deadline.
3797        assert!(
3798            retries >= 5,
3799            "expected at least 5 retries before 120s deadline, got {retries}"
3800        );
3801        // Total sleep should be < 120s (we broke before the last sleep).
3802        assert!(
3803            total_sleep < deadline_duration,
3804            "total sleep {total_sleep:?} should be less than deadline {deadline_duration:?}"
3805        );
3806    }
3807
3808    /// T16: Verify Phase 3 (`FindNodeLookup`) starts immediately without
3809    /// waiting for DNS resolution.
3810    ///
3811    /// Starts a DHT actor with both saved-node addresses and a DNS hostname.
3812    /// After `bootstrap()`, the `bootstrap_lookup` (Phase 3) must be set, and
3813    /// `dns_bootstrap_rx` must be Some (DNS still in flight).
3814    #[tokio::test]
3815    async fn bootstrap_phase3_starts_before_dns() {
3816        // Use a DNS hostname that takes time to resolve (unresolvable is fine —
3817        // we just need to confirm Phase 3 didn't wait for it).
3818        let config = DhtConfig {
3819            bind_addr: "127.0.0.1:0".parse().unwrap(),
3820            bootstrap_nodes: vec![
3821                // Saved node (parsed as SocketAddr → Phase 1 ping)
3822                "127.0.0.1:16881".to_owned(),
3823                // DNS hostname (goes to background task)
3824                "router.bittorrent.com:6881".to_owned(),
3825            ],
3826            ..DhtConfig::default()
3827        };
3828
3829        let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
3830        let (tx, rx) = mpsc::channel(256);
3831        let (ip_tx, _ip_rx) = mpsc::channel(4);
3832        let mut actor = DhtActor::new(config, socket, rx, ip_tx);
3833
3834        // Run bootstrap — should return quickly (DNS spawned in background).
3835        actor.bootstrap().await;
3836
3837        // Phase 3 must have started: bootstrap_lookup is set.
3838        assert!(
3839            actor.bootstrap_lookup.is_some(),
3840            "Phase 3 (FindNodeLookup) must start without waiting for DNS"
3841        );
3842
3843        // DNS is still in flight: dns_bootstrap_rx must be Some.
3844        assert!(
3845            actor.dns_bootstrap_rx.is_some(),
3846            "dns_bootstrap_rx should be Some (background DNS tasks still running)"
3847        );
3848
3849        // Cleanup: drop sender so actor doesn't hang.
3850        drop(tx);
3851    }
3852
3853    // ---- JSON routing table persistence tests (M105 Task 5) ----
3854
3855    /// T12: Save routing table to JSON, read it back, verify nodes restored as
3856    /// Questionable. Also test that corrupt JSON is handled gracefully.
3857    #[tokio::test]
3858    async fn json_persistence_round_trip_and_corrupt() {
3859        use crate::routing_table::NodeStatus;
3860        let dir = tempfile::tempdir().expect("failed to create temp dir");
3861
3862        let config = DhtConfig {
3863            bind_addr: "127.0.0.1:0".parse().unwrap(),
3864            bootstrap_nodes: Vec::new(),
3865            own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3866            state_dir: Some(dir.path().to_path_buf()),
3867            ..DhtConfig::default()
3868        };
3869
3870        let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
3871        let (_tx, rx) = mpsc::channel(256);
3872        let (ip_tx, _ip_rx) = mpsc::channel(4);
3873        let actor = DhtActor::new(config.clone(), socket, rx, ip_tx);
3874
3875        // Insert some nodes
3876        let node1_id = Id20::from_hex("1111111111111111111111111111111111111111").unwrap();
3877        let node2_id = Id20::from_hex("2222222222222222222222222222222222222222").unwrap();
3878        let addr1: SocketAddr = "10.0.0.1:6881".parse().unwrap();
3879        let addr2: SocketAddr = "10.0.0.2:6882".parse().unwrap();
3880        actor.routing_table.write().insert(node1_id, addr1);
3881        actor.routing_table.write().insert(node2_id, addr2);
3882        // Mark one as Good to confirm mark_all_questionable works on load
3883        actor.routing_table.write().mark_response(&node1_id);
3884
3885        // Save
3886        actor.save_routing_table();
3887
3888        // Verify file exists
3889        let path = DhtActor::state_file_path(dir.path(), AddressFamily::V4);
3890        assert!(path.exists(), "JSON state file should exist after save");
3891
3892        // Load into a new actor
3893        let config2 = DhtConfig {
3894            bind_addr: "127.0.0.1:0".parse().unwrap(),
3895            bootstrap_nodes: Vec::new(),
3896            own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3897            state_dir: Some(dir.path().to_path_buf()),
3898            ..DhtConfig::default()
3899        };
3900        let socket2 = Arc::new(UdpSocket::bind(config2.bind_addr).await.unwrap());
3901        let (_tx2, rx2) = mpsc::channel(256);
3902        let (ip_tx2, _ip_rx2) = mpsc::channel(4);
3903        let actor2 = DhtActor::new(config2, socket2, rx2, ip_tx2);
3904
3905        // Verify nodes were loaded
3906        assert_eq!(actor2.routing_table.read().len(), 2);
3907        assert!(actor2.routing_table.read().get(&node1_id).is_some());
3908        assert!(actor2.routing_table.read().get(&node2_id).is_some());
3909
3910        // All nodes should be Questionable (mark_all_questionable was called)
3911        assert_eq!(
3912            actor2.routing_table.read().get(&node1_id).unwrap().status(),
3913            NodeStatus::Questionable
3914        );
3915        assert_eq!(
3916            actor2.routing_table.read().get(&node2_id).unwrap().status(),
3917            NodeStatus::Questionable
3918        );
3919
3920        // --- Corrupt JSON test ---
3921        std::fs::write(&path, b"{{not valid json at all!!}}")
3922            .expect("failed to write corrupt data");
3923
3924        let config3 = DhtConfig {
3925            bind_addr: "127.0.0.1:0".parse().unwrap(),
3926            bootstrap_nodes: Vec::new(),
3927            own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3928            state_dir: Some(dir.path().to_path_buf()),
3929            ..DhtConfig::default()
3930        };
3931        let socket3 = Arc::new(UdpSocket::bind(config3.bind_addr).await.unwrap());
3932        let (_tx3, rx3) = mpsc::channel(256);
3933        let (ip_tx3, _ip_rx3) = mpsc::channel(4);
3934        let actor3 = DhtActor::new(config3, socket3, rx3, ip_tx3);
3935
3936        // Corrupt JSON should result in an empty routing table (graceful fallback)
3937        assert_eq!(actor3.routing_table.read().len(), 0);
3938    }
3939
3940    /// T13: Verify atomic write — temp file is written first, then renamed.
3941    /// A partial (interrupted) write should not corrupt the final state file.
3942    #[tokio::test]
3943    async fn json_persistence_atomic_write() {
3944        let dir = tempfile::tempdir().expect("failed to create temp dir");
3945
3946        let config = DhtConfig {
3947            bind_addr: "127.0.0.1:0".parse().unwrap(),
3948            bootstrap_nodes: Vec::new(),
3949            own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
3950            state_dir: Some(dir.path().to_path_buf()),
3951            ..DhtConfig::default()
3952        };
3953
3954        let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
3955        let (_tx, rx) = mpsc::channel(256);
3956        let (ip_tx, _ip_rx) = mpsc::channel(4);
3957        let actor = DhtActor::new(config, socket, rx, ip_tx);
3958
3959        let node_id = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
3960        let addr: SocketAddr = "10.0.0.1:6881".parse().unwrap();
3961        actor.routing_table.write().insert(node_id, addr);
3962
3963        // Write a known value to the final path first (simulate existing state)
3964        let final_path = DhtActor::state_file_path(dir.path(), AddressFamily::V4);
3965        std::fs::write(&final_path, b"old data").unwrap();
3966
3967        // Save — should atomically replace via rename
3968        actor.save_routing_table();
3969
3970        // Final file should contain valid JSON with our node
3971        let content = std::fs::read_to_string(&final_path).unwrap();
3972        let state: DhtState =
3973            serde_json::from_str(&content).expect("final file should contain valid JSON");
3974        assert_eq!(state.nodes.len(), 1);
3975        assert_eq!(state.nodes[0].id, node_id.to_hex());
3976
3977        // Temp file should NOT exist (it was renamed away)
3978        let tmp_path = dir.path().join(".dht_state_v4.tmp");
3979        assert!(
3980            !tmp_path.exists(),
3981            "temp file should be cleaned up by rename"
3982        );
3983    }
3984
3985    /// T14: Verify persistence is silently skipped when `state_dir` is `None`.
3986    #[tokio::test]
3987    async fn json_persistence_no_state_dir() {
3988        let config = DhtConfig {
3989            bind_addr: "127.0.0.1:0".parse().unwrap(),
3990            bootstrap_nodes: Vec::new(),
3991            state_dir: None, // No state dir
3992            ..DhtConfig::default()
3993        };
3994
3995        let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
3996        let (_tx, rx) = mpsc::channel(256);
3997        let (ip_tx, _ip_rx) = mpsc::channel(4);
3998        let actor = DhtActor::new(config, socket, rx, ip_tx);
3999
4000        // Insert a node — save should be a no-op
4001        let node_id = Id20::from_hex("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap();
4002        actor
4003            .routing_table
4004            .write()
4005            .insert(node_id, "10.0.0.1:6881".parse().unwrap());
4006
4007        // This should not panic or do anything
4008        actor.save_routing_table();
4009
4010        // load_routing_table in new() already ran silently (no state_dir)
4011        assert_eq!(actor.routing_table.read().len(), 1); // only the node we just inserted
4012    }
4013
4014    /// T17: When JSON loads successfully, IP:port entries in `bootstrap_nodes`
4015    /// should be filtered out (hostnames remain).
4016    #[tokio::test]
4017    async fn json_persistence_priority_over_config() {
4018        let dir = tempfile::tempdir().expect("failed to create temp dir");
4019
4020        // First: create a state file with saved nodes.
4021        let config_save = DhtConfig {
4022            bind_addr: "127.0.0.1:0".parse().unwrap(),
4023            bootstrap_nodes: Vec::new(),
4024            own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
4025            state_dir: Some(dir.path().to_path_buf()),
4026            ..DhtConfig::default()
4027        };
4028
4029        let socket = Arc::new(UdpSocket::bind(config_save.bind_addr).await.unwrap());
4030        let (_tx, rx) = mpsc::channel(256);
4031        let (ip_tx, _ip_rx) = mpsc::channel(4);
4032        let actor = DhtActor::new(config_save, socket, rx, ip_tx);
4033
4034        let node_id = Id20::from_hex("cccccccccccccccccccccccccccccccccccccccc").unwrap();
4035        actor
4036            .routing_table
4037            .write()
4038            .insert(node_id, "10.0.0.1:6881".parse().unwrap());
4039        actor.save_routing_table();
4040        drop(actor);
4041
4042        // Now load with bootstrap_nodes containing both IP:port and hostnames.
4043        let config_load = DhtConfig {
4044            bind_addr: "127.0.0.1:0".parse().unwrap(),
4045            bootstrap_nodes: vec![
4046                "192.168.1.100:6881".to_owned(), // IP:port — should be filtered out
4047                "10.0.0.50:6881".to_owned(),     // IP:port — should be filtered out
4048                "router.bittorrent.com:6881".to_owned(), // hostname — should remain
4049                "dht.transmissionbt.com:6881".to_owned(), // hostname — should remain
4050            ],
4051            own_id: Some(Id20::from_hex("0000000000000000000000000000000000000001").unwrap()),
4052            state_dir: Some(dir.path().to_path_buf()),
4053            ..DhtConfig::default()
4054        };
4055
4056        let socket2 = Arc::new(UdpSocket::bind(config_load.bind_addr).await.unwrap());
4057        let (_tx2, rx2) = mpsc::channel(256);
4058        let (ip_tx2, _ip_rx2) = mpsc::channel(4);
4059        let actor2 = DhtActor::new(config_load, socket2, rx2, ip_tx2);
4060
4061        // Routing table should have the loaded node
4062        assert_eq!(actor2.routing_table.read().len(), 1);
4063
4064        // bootstrap_nodes should only contain hostnames (IP:port entries filtered)
4065        assert_eq!(actor2.config.bootstrap_nodes.len(), 2);
4066        assert!(
4067            actor2
4068                .config
4069                .bootstrap_nodes
4070                .contains(&"router.bittorrent.com:6881".to_owned())
4071        );
4072        assert!(
4073            actor2
4074                .config
4075                .bootstrap_nodes
4076                .contains(&"dht.transmissionbt.com:6881".to_owned())
4077        );
4078        // IP:port entries should be gone
4079        assert!(
4080            !actor2
4081                .config
4082                .bootstrap_nodes
4083                .contains(&"192.168.1.100:6881".to_owned())
4084        );
4085        assert!(
4086            !actor2
4087                .config
4088                .bootstrap_nodes
4089                .contains(&"10.0.0.50:6881".to_owned())
4090        );
4091    }
4092
4093    // --- BEP 43 read-only node tests ---
4094
4095    #[tokio::test]
4096    async fn checked_insert_rejects_read_only() {
4097        let config = DhtConfig {
4098            bind_addr: "127.0.0.1:0".parse().unwrap(),
4099            bootstrap_nodes: Vec::new(),
4100            ..DhtConfig::default()
4101        };
4102        let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
4103        let (_tx, rx) = mpsc::channel(256);
4104        let (ip_tx, _ip_rx) = mpsc::channel(4);
4105        let actor = DhtActor::new(config, socket, rx, ip_tx);
4106
4107        let id = Id20::from_hex("0000000000000000000000000000000000000042").unwrap();
4108        let addr: SocketAddr = "10.0.0.1:6881".parse().unwrap();
4109
4110        // read_only: true => should NOT be inserted
4111        assert!(!actor.checked_insert(id, addr, true));
4112        assert_eq!(actor.routing_table.read().len(), 0);
4113    }
4114
4115    #[tokio::test]
4116    async fn checked_insert_accepts_normal() {
4117        let config = DhtConfig {
4118            bind_addr: "127.0.0.1:0".parse().unwrap(),
4119            bootstrap_nodes: Vec::new(),
4120            ..DhtConfig::default()
4121        };
4122        let socket = Arc::new(UdpSocket::bind(config.bind_addr).await.unwrap());
4123        let (_tx, rx) = mpsc::channel(256);
4124        let (ip_tx, _ip_rx) = mpsc::channel(4);
4125        let actor = DhtActor::new(config, socket, rx, ip_tx);
4126
4127        let id = Id20::from_hex("0000000000000000000000000000000000000042").unwrap();
4128        let addr: SocketAddr = "10.0.0.1:6881".parse().unwrap();
4129
4130        // read_only: false => should be inserted normally
4131        assert!(actor.checked_insert(id, addr, false));
4132        assert_eq!(actor.routing_table.read().len(), 1);
4133    }
4134
4135    #[tokio::test]
4136    async fn outgoing_query_includes_ro() {
4137        // When read_only_mode is true, the actor constructs outgoing queries
4138        // with `read_only: self.config.read_only_mode`. Verify the KrpcMessage
4139        // round-trip: when read_only is true, the encoded bytes contain `ro: 1`
4140        // and decoding recovers the flag.
4141        let info_hash = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
4142        let own_id = Id20::ZERO;
4143
4144        let msg = crate::krpc::KrpcMessage {
4145            transaction_id: crate::krpc::TransactionId::from_u16(1),
4146            body: crate::krpc::KrpcBody::Query(crate::krpc::KrpcQuery::FindNode {
4147                id: own_id,
4148                target: info_hash,
4149                want: None,
4150            }),
4151            sender_ip: None,
4152            read_only: true, // matches what send_find_node sets when read_only_mode: true
4153        };
4154        let bytes = msg.to_bytes().unwrap();
4155
4156        // Verify the raw bencode contains "ro" key
4157        let raw: irontide_bencode::BencodeValue = irontide_bencode::from_bytes(&bytes).unwrap();
4158        let dict = raw.as_dict().unwrap();
4159        assert!(
4160            dict.contains_key(&b"ro"[..]),
4161            "query with read_only: true should contain ro key in wire format"
4162        );
4163
4164        let decoded = crate::krpc::KrpcMessage::from_bytes(&bytes).unwrap();
4165        assert!(decoded.read_only, "outgoing query should include ro flag");
4166    }
4167
4168    #[tokio::test]
4169    async fn response_never_includes_ro() {
4170        // Responses should always have read_only: false, even from a read-only node.
4171        let own_id = Id20::ZERO;
4172        let msg = crate::krpc::KrpcMessage {
4173            transaction_id: crate::krpc::TransactionId::from_u16(1),
4174            body: crate::krpc::KrpcBody::Response(crate::krpc::KrpcResponse::NodeId { id: own_id }),
4175            sender_ip: None,
4176            read_only: false, // responses never include ro
4177        };
4178        let bytes = msg.to_bytes().unwrap();
4179        let decoded = crate::krpc::KrpcMessage::from_bytes(&bytes).unwrap();
4180        assert!(!decoded.read_only, "responses should never include ro flag");
4181
4182        // Verify a response constructed with read_only: false does NOT produce an ro field.
4183        // (The encoder only includes ro when true.)
4184        let raw: irontide_bencode::BencodeValue = irontide_bencode::from_bytes(&bytes).unwrap();
4185        let dict = raw.as_dict().unwrap();
4186        assert!(
4187            !dict.contains_key(&b"ro"[..]),
4188            "response bytes should not contain ro key"
4189        );
4190    }
4191
4192    #[tokio::test]
4193    async fn announce_suppressed_in_read_only_mode() {
4194        let config = DhtConfig {
4195            bind_addr: "127.0.0.1:0".parse().unwrap(),
4196            bootstrap_nodes: Vec::new(),
4197            read_only_mode: true,
4198            ..DhtConfig::default()
4199        };
4200        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
4201
4202        let info_hash = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
4203        // announce should succeed silently (no-op) in read-only mode
4204        let result = handle.announce(info_hash, 6881).await;
4205        assert!(
4206            result.is_ok(),
4207            "announce should return Ok in read-only mode (suppressed)"
4208        );
4209
4210        handle.shutdown().await.unwrap();
4211    }
4212
4213    // -----------------------------------------------------------------------
4214    // M173 Lane B (B7): SaveRoutingTable + persist-on-shutdown.
4215    // -----------------------------------------------------------------------
4216
4217    /// `save_routing_table` returns `Ok(())` even when no `state_dir`
4218    /// is configured (no-op). The actor still acks so the caller
4219    /// doesn't need to special-case the disabled path.
4220    #[tokio::test]
4221    async fn save_routing_table_acks_even_without_state_dir() {
4222        let config = DhtConfig {
4223            bind_addr: "127.0.0.1:0".parse().unwrap(),
4224            bootstrap_nodes: Vec::new(),
4225            state_dir: None,
4226            ..DhtConfig::default()
4227        };
4228        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
4229        let result = handle.save_routing_table().await;
4230        assert!(result.is_ok(), "expected Ok, got {result:?}");
4231        handle.shutdown().await.unwrap();
4232    }
4233
4234    /// `save_routing_table` writes `dht_state.json` to disk under the
4235    /// configured `state_dir`. Confirms the `apply_settings` DHT-stop
4236    /// phase can checkpoint state without restarting the actor.
4237    #[tokio::test]
4238    async fn save_routing_table_writes_state_file() {
4239        let tmp = tempfile::tempdir().unwrap();
4240        let state_dir = tmp.path().to_path_buf();
4241
4242        let config = DhtConfig {
4243            bind_addr: "127.0.0.1:0".parse().unwrap(),
4244            bootstrap_nodes: Vec::new(),
4245            state_dir: Some(state_dir.clone()),
4246            ..DhtConfig::default()
4247        };
4248        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
4249
4250        // Wait briefly for the actor to settle (load_routing_table
4251        // is part of bootstrap).
4252        tokio::time::sleep(Duration::from_millis(50)).await;
4253
4254        handle.save_routing_table().await.unwrap();
4255
4256        let state_path = state_dir.join("dht_state.json");
4257        assert!(
4258            state_path.exists(),
4259            "save_routing_table must write dht_state.json to {}",
4260            state_path.display()
4261        );
4262
4263        // The file should be valid JSON containing the node_id.
4264        let contents = std::fs::read_to_string(&state_path).unwrap();
4265        let parsed: serde_json::Value = serde_json::from_str(&contents).unwrap();
4266        assert!(parsed.get("node_id").is_some());
4267
4268        handle.shutdown().await.unwrap();
4269    }
4270
4271    /// `shutdown_and_wait` returns AFTER the actor has persisted the
4272    /// routing table — the on-disk state is up-to-date when the
4273    /// caller proceeds with starting a new actor. This is the
4274    /// contract the B11 `apply_settings` DHT-restart phase relies on.
4275    #[tokio::test]
4276    async fn shutdown_and_wait_persists_state_before_returning() {
4277        let tmp = tempfile::tempdir().unwrap();
4278        let state_dir = tmp.path().to_path_buf();
4279
4280        let config = DhtConfig {
4281            bind_addr: "127.0.0.1:0".parse().unwrap(),
4282            bootstrap_nodes: Vec::new(),
4283            state_dir: Some(state_dir.clone()),
4284            ..DhtConfig::default()
4285        };
4286        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
4287
4288        // Wait for bootstrap to settle.
4289        tokio::time::sleep(Duration::from_millis(50)).await;
4290
4291        // Pre-shutdown: state file may or may not exist (depends on
4292        // whether the periodic save fired). Delete it so we can
4293        // check that shutdown_and_wait WROTE it.
4294        let state_path = state_dir.join("dht_state.json");
4295        let _ = std::fs::remove_file(&state_path);
4296        assert!(!state_path.exists());
4297
4298        handle.shutdown_and_wait().await.unwrap();
4299
4300        // After shutdown_and_wait returns, the file MUST exist on
4301        // disk — if the actor exited before saving, the rebuild path
4302        // would lose recent node state.
4303        assert!(
4304            state_path.exists(),
4305            "shutdown_and_wait must persist state BEFORE returning"
4306        );
4307    }
4308
4309    /// `shutdown_and_wait` after a prior fire-and-forget shutdown
4310    /// returns Err(Shutdown). Pin the failure mode for B11 callers.
4311    #[tokio::test]
4312    async fn shutdown_and_wait_after_actor_exit_returns_shutdown_error() {
4313        let config = DhtConfig {
4314            bind_addr: "127.0.0.1:0".parse().unwrap(),
4315            bootstrap_nodes: Vec::new(),
4316            ..DhtConfig::default()
4317        };
4318        let (handle, _ip_rx) = DhtHandle::start(config).await.unwrap();
4319        handle.shutdown().await.unwrap();
4320        // Give the actor a tick to exit.
4321        tokio::time::sleep(Duration::from_millis(50)).await;
4322        let result = handle.shutdown_and_wait().await;
4323        assert!(
4324            matches!(result, Err(Error::Shutdown)),
4325            "expected Error::Shutdown, got {result:?}"
4326        );
4327    }
4328
4329    #[test]
4330    fn dht_config_enable_multi_address_default_true() {
4331        let cfg = DhtConfig::default();
4332        assert!(cfg.enable_multi_address);
4333    }
4334
4335    #[test]
4336    fn dht_config_v6_enable_multi_address_default_true() {
4337        let cfg = DhtConfig::default_v6();
4338        assert!(cfg.enable_multi_address);
4339    }
4340}