// scp2p_core/api/node_dht.rs
// Copyright (c) 2024-2026 Vanyo Vanev / Tech Art Ltd
// SPDX-License-Identifier: MPL-2.0
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//! DHT operations on `NodeHandle`: local and iterative find/store, republish loops.

9use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
10
11use ed25519_dalek::SigningKey;
12use tokio::net::TcpListener;
13use tokio::task::JoinHandle;
14use tracing::{debug, info};
15
16use crate::transport_net::{
17    QuicServerHandle, TlsServerHandle, quic_accept_bi_session, tls_accept_session,
18};
19use crate::{
20    capabilities::Capabilities,
21    dht::{ALPHA, DEFAULT_TTL_SECS, DhtInsertResult, DhtNodeRecord, DhtValue, K, MAX_VALUE_SIZE},
22    dht_keys::{community_info_key, share_head_key},
23    ids::{NodeId, ShareId},
24    manifest::ShareHead,
25    net_fetch::RequestTransport,
26    peer::PeerAddr,
27    wire::{CommunityMembers, FindNode, Store as WireStore},
28};
29
30use super::{
31    NodeHandle,
32    helpers::{
33        merge_peer_list, now_unix_secs, peer_key, persist_state, query_find_node, query_find_value,
34        replicate_store_to_closest, replicate_store_to_peers, sort_peers_for_target,
35        validate_dht_value_for_known_keyspaces,
36    },
37};
38
39impl NodeHandle {
    /// Insert or refresh `(node_id, addr)` in the local routing table,
    /// bucketed relative to `local_target` (this node's own ID), applying
    /// the Kademlia ping-before-evict policy when the target bucket is full.
    ///
    /// # Errors
    /// Fails only if `now_unix_secs()` cannot read the system clock.
    pub async fn dht_upsert_peer(
        &self,
        local_target: NodeId,
        node_id: NodeId,
        addr: PeerAddr,
    ) -> anyhow::Result<()> {
        let mut state = self.state.write().await;
        let result = state.dht.upsert_node(
            DhtNodeRecord {
                node_id,
                addr,
                last_seen_unix: now_unix_secs()?,
            },
            local_target,
        );

        match result {
            DhtInsertResult::Inserted => {}
            DhtInsertResult::PendingEviction {
                stale_node,
                new_node,
                bucket_idx,
            } => {
                // Ping-before-evict: spawn a background task to probe
                // the stale node.  A successful TCP connect (within a
                // short timeout) is treated as "alive" — the stale node
                // is refreshed and the new node dropped.  On failure the
                // stale node is evicted and replaced by the new one.
                //
                // NOTE(review): the probe is always plain TCP, even when
                // the stale node was reached over QUIC/UDP — a QUIC-only
                // peer could be wrongly evicted here; confirm intended.
                let handle = self.clone();
                tokio::spawn(async move {
                    let remote =
                        std::net::SocketAddr::new(stale_node.addr.ip, stale_node.addr.port);
                    let alive = tokio::time::timeout(
                        Duration::from_millis(1500),
                        tokio::net::TcpStream::connect(remote),
                    )
                    .await
                    .is_ok_and(|r| r.is_ok());

                    let mut state = handle.state.write().await;
                    if alive {
                        // Clock failure is non-fatal inside the probe task:
                        // fall back to timestamp 0 rather than panicking.
                        let ts = now_unix_secs().unwrap_or(0);
                        state.dht.refresh_node(&stale_node.node_id, ts);
                    } else {
                        state
                            .dht
                            .complete_eviction(bucket_idx, stale_node.node_id, *new_node);
                    }
                });
            }
            // Candidate silently dropped: the bucket already holds the
            // allowed number of peers from this subnet (anti-Sybil limit).
            DhtInsertResult::RejectedSubnetLimit => {}
        }

        Ok(())
    }
95
96    pub async fn dht_find_node(&self, req: FindNode) -> anyhow::Result<Vec<PeerAddr>> {
97        let state = self.state.read().await;
98        let nodes = state.dht.find_node(NodeId(req.target_node_id), K);
99        Ok(nodes.into_iter().map(|n| n.addr).collect())
100    }
101
102    pub async fn dht_store(&self, req: WireStore) -> anyhow::Result<()> {
103        validate_dht_value_for_known_keyspaces(req.key, &req.value)?;
104        let now = now_unix_secs()?;
105        let value = merge_community_members_if_applicable(req.key, req.value, self, now).await;
106        let mut state = self.state.write().await;
107        state
108            .dht
109            .store(req.key, value, req.ttl_secs.max(DEFAULT_TTL_SECS), now)
110    }
111
112    pub async fn dht_find_value(&self, key: [u8; 32]) -> anyhow::Result<Option<DhtValue>> {
113        let mut state = self.state.write().await;
114        Ok(state.dht.find_value(key, now_unix_secs()?))
115    }
116
117    pub async fn dht_store_replicated<T: RequestTransport + ?Sized>(
118        &self,
119        transport: &T,
120        req: WireStore,
121        seed_peers: &[PeerAddr],
122    ) -> anyhow::Result<usize> {
123        validate_dht_value_for_known_keyspaces(req.key, &req.value)?;
124        let now = now_unix_secs()?;
125        {
126            let mut state = self.state.write().await;
127            state.dht.store(
128                req.key,
129                req.value.clone(),
130                req.ttl_secs.max(DEFAULT_TTL_SECS),
131                now,
132            )?;
133        }
134        persist_state(self).await?;
135        replicate_store_to_closest(
136            transport,
137            self,
138            req.key,
139            req.value,
140            req.ttl_secs.max(DEFAULT_TTL_SECS),
141            seed_peers,
142            K,
143        )
144        .await
145    }
146
    /// Kademlia-style iterative FindNode: repeatedly query the `ALPHA`
    /// closest not-yet-queried peers until a full round discovers no new
    /// peer, then return the `K` peers closest to `target_node_id`.
    pub async fn dht_find_node_iterative<T: RequestTransport + ?Sized>(
        &self,
        transport: &T,
        target_node_id: [u8; 20],
        seed_peers: &[PeerAddr],
    ) -> anyhow::Result<Vec<PeerAddr>> {
        let target_hex = hex::encode(&target_node_id[..8]);
        // Candidates = caller-provided seeds + local routing-table entries.
        let mut peers = self
            .collect_seed_and_known_node_peers(target_node_id, seed_peers)
            .await;
        debug!(
            target = %target_hex,
            initial_peers = peers.len(),
            "dht_find_node_iterative: starting"
        );
        let mut queried = HashSet::new();

        loop {
            // Each round: pick the ALPHA closest peers not asked yet.
            sort_peers_for_target(&mut peers, target_node_id);
            let to_query = peers
                .iter()
                .filter(|peer| !queried.contains(&peer_key(peer)))
                .take(ALPHA)
                .cloned()
                .collect::<Vec<_>>();
            if to_query.is_empty() {
                break;
            }

            let mut discovered = false;
            for peer in to_query {
                queried.insert(peer_key(&peer));
                match query_find_node(transport, &peer, target_node_id).await {
                    Ok(result) => {
                        debug!(
                            target = %target_hex,
                            peer = ?peer,
                            returned_peers = result.peers.len(),
                            "dht_find_node_iterative: queried peer"
                        );
                        // Track whether this round taught us anything new.
                        discovered |= merge_peer_list(&mut peers, result.peers);
                    }
                    Err(e) => {
                        // Unreachable peers are logged and skipped; they
                        // stay in `queried` so they are not retried during
                        // this lookup.
                        debug!(
                            target = %target_hex,
                            peer = ?peer,
                            error = %e,
                            "dht_find_node_iterative: peer query failed"
                        );
                    }
                }
            }
            // Convergence: stop once a round adds no new candidates.
            if !discovered {
                break;
            }
        }

        // Final answer: the K candidates closest to the target.
        sort_peers_for_target(&mut peers, target_node_id);
        peers.truncate(K);
        debug!(
            target = %target_hex,
            result_peers = peers.len(),
            queried = queried.len(),
            "dht_find_node_iterative: complete"
        );
        Ok(peers)
    }
214
215    pub async fn dht_find_value_iterative<T: RequestTransport + ?Sized>(
216        &self,
217        transport: &T,
218        key: [u8; 32],
219        seed_peers: &[PeerAddr],
220    ) -> anyhow::Result<Option<DhtValue>> {
221        let key_hex = hex::encode(&key[..8]);
222        debug!(key = %key_hex, "dht_find_value_iterative: starting");
223        if let Some(value) = self.dht_find_value(key).await? {
224            debug!(key = %key_hex, "dht_find_value_iterative: found locally");
225            return Ok(Some(value));
226        }
227
228        self.dht_find_value_from_network(transport, key, seed_peers)
229            .await
230    }
231
    /// Like [`Self::dht_find_value_iterative`], but always queries the
    /// network even when a local copy exists.  The remote result is
    /// stored locally before being returned.
    ///
    /// Use this for values where the relay's copy is the authoritative
    /// merged superset (e.g. `CommunityMembers`).
    pub async fn dht_find_value_from_network<T: RequestTransport + ?Sized>(
        &self,
        transport: &T,
        key: [u8; 32],
        seed_peers: &[PeerAddr],
    ) -> anyhow::Result<Option<DhtValue>> {
        let key_hex = hex::encode(&key[..8]);
        debug!(key = %key_hex, "dht_find_value_from_network: starting");

        // Route toward the first 20 bytes of the 32-byte value key
        // (node IDs in this DHT are 20 bytes wide).
        let mut target = [0u8; 20];
        target.copy_from_slice(&key[..20]);
        let mut peers = self
            .collect_seed_and_known_node_peers(target, seed_peers)
            .await;
        let mut queried = HashSet::new();

        loop {
            // Each round: query the ALPHA closest un-contacted peers.
            sort_peers_for_target(&mut peers, target);
            let to_query = peers
                .iter()
                .filter(|peer| !queried.contains(&peer_key(peer)))
                .take(ALPHA)
                .cloned()
                .collect::<Vec<_>>();
            if to_query.is_empty() {
                break;
            }

            let mut discovered = false;
            for peer in to_query {
                queried.insert(peer_key(&peer));
                // Failed queries are skipped silently; the peer stays in
                // `queried` so it is not retried during this lookup.
                let Ok(result) = query_find_value(transport, &peer, key).await else {
                    continue;
                };

                // Accept the first response whose value matches the key,
                // fits the size cap, and passes keyspace validation.
                if let Some(remote) = result.value
                    && remote.key == key
                    && remote.value.len() <= MAX_VALUE_SIZE
                    && validate_dht_value_for_known_keyspaces(remote.key, &remote.value).is_ok()
                {
                    // Cache it locally (TTL floored at the default) and
                    // return the freshly stored local copy.
                    let now = now_unix_secs()?;
                    let mut state = self.state.write().await;
                    state.dht.store(
                        key,
                        remote.value,
                        remote.ttl_secs.max(DEFAULT_TTL_SECS),
                        now,
                    )?;
                    debug!(key = %key_hex, "dht_find_value_from_network: found remotely");
                    return Ok(state.dht.find_value(key, now));
                }
                // No usable value: widen the search with the responder's
                // suggested closer peers.
                discovered |= merge_peer_list(&mut peers, result.closer_peers);
            }
            // Stop once a full round discovers nothing new.
            if !discovered {
                break;
            }
        }

        debug!(key = %key_hex, queried = queried.len(), "dht_find_value_from_network: not found");
        Ok(None)
    }
299
300    pub async fn dht_find_share_head_iterative<T: RequestTransport + ?Sized>(
301        &self,
302        transport: &T,
303        share_id: ShareId,
304        share_pubkey: Option<[u8; 32]>,
305        seed_peers: &[PeerAddr],
306    ) -> anyhow::Result<Option<ShareHead>> {
307        debug!(?share_id, "dht_find_share_head_iterative: starting");
308        let key = share_head_key(&share_id);
309        let local_best = self
310            .dht_find_value(key)
311            .await?
312            .and_then(|value| crate::cbor::from_slice::<ShareHead>(&value.value).ok())
313            .filter(|head| head.share_id == share_id.0);
314
315        let mut target = [0u8; 20];
316        target.copy_from_slice(&key[..20]);
317        let mut peers = self
318            .collect_seed_and_known_node_peers(target, seed_peers)
319            .await;
320        let mut queried = HashSet::new();
321        let mut best = local_best;
322
323        loop {
324            sort_peers_for_target(&mut peers, target);
325            let to_query = peers
326                .iter()
327                .filter(|peer| !queried.contains(&peer_key(peer)))
328                .take(ALPHA)
329                .cloned()
330                .collect::<Vec<_>>();
331            if to_query.is_empty() {
332                break;
333            }
334
335            let mut discovered = false;
336            for peer in to_query {
337                queried.insert(peer_key(&peer));
338                let Ok(result) = query_find_value(transport, &peer, key).await else {
339                    continue;
340                };
341
342                if let Some(remote) = result.value
343                    && remote.key == key
344                    && remote.value.len() <= MAX_VALUE_SIZE
345                    && validate_dht_value_for_known_keyspaces(remote.key, &remote.value).is_ok()
346                {
347                    let head: ShareHead = crate::cbor::from_slice(&remote.value)?;
348                    if head.share_id != share_id.0 {
349                        continue;
350                    }
351                    if let Some(pubkey) = share_pubkey {
352                        head.verify_with_pubkey(pubkey)?;
353                    }
354                    if best
355                        .as_ref()
356                        .map(|current| {
357                            head.latest_seq > current.latest_seq
358                                || (head.latest_seq == current.latest_seq
359                                    && head.updated_at > current.updated_at)
360                        })
361                        .unwrap_or(true)
362                    {
363                        best = Some(head.clone());
364                        let now = now_unix_secs()?;
365                        let mut state = self.state.write().await;
366                        state.dht.store(
367                            key,
368                            crate::cbor::to_vec(&head)?,
369                            remote.ttl_secs.max(DEFAULT_TTL_SECS),
370                            now,
371                        )?;
372                    }
373                }
374                discovered |= merge_peer_list(&mut peers, result.closer_peers);
375            }
376            if !discovered {
377                break;
378            }
379        }
380        if best.is_some() {
381            info!(
382                ?share_id,
383                queried = queried.len(),
384                "dht_find_share_head_iterative: found head"
385            );
386        } else {
387            debug!(
388                ?share_id,
389                queried = queried.len(),
390                "dht_find_share_head_iterative: no head found"
391            );
392        }
393        Ok(best)
394    }
395
396    pub async fn dht_republish_once<T: RequestTransport + ?Sized>(
397        &self,
398        transport: &T,
399        seed_peers: &[PeerAddr],
400    ) -> anyhow::Result<usize> {
401        // Re-populate ephemeral DHT with share heads + manifests for
402        // shares we have *published*, so they survive app restarts.
403        let _ = self.reannounce_published_share_data().await;
404
405        // Refresh share heads for public subscriptions so they survive
406        // after the original publisher goes offline.
407        let _ = self.reannounce_subscribed_share_heads().await;
408
409        let now = now_unix_secs()?;
410        let values = {
411            let mut state = self.state.write().await;
412            let values = state.dht.active_values(now);
413            for value in &values {
414                state
415                    .dht
416                    .store(value.key, value.value.clone(), DEFAULT_TTL_SECS, now)?;
417            }
418            values
419        };
420        persist_state(self).await?;
421
422        let total = values.len();
423        info!(
424            active_values = total,
425            seed_peers = seed_peers.len(),
426            "DHT republish: starting"
427        );
428
429        // --- Resolve the peer list ONCE for the entire batch -----------
430        // With few peers (common: only the relay on QUIC + TCP) every
431        // iterative lookup returns the same set.  Doing it once avoids
432        // O(N) FindNode round-trips that burn through rate limits.
433        let representative_target = {
434            let mut t = [0u8; 20];
435            if let Some(v) = values.first() {
436                t.copy_from_slice(&v.key[..20]);
437            }
438            t
439        };
440        let cached_peers = self
441            .dht_find_node_iterative(transport, representative_target, seed_peers)
442            .await
443            .unwrap_or_default();
444
445        debug!(
446            cached_peers = cached_peers.len(),
447            "DHT republish: using cached peer list"
448        );
449
450        // Throttle: space out store bursts to stay well under the
451        // remote peer's per-window rate limit.
452        const INTER_VALUE_DELAY: Duration = Duration::from_millis(500);
453        // Back-off after a rate-limit error.
454        const RATE_LIMIT_BACKOFF: Duration = Duration::from_secs(5);
455
456        let mut republished = 0usize;
457        for (idx, value) in values.into_iter().enumerate() {
458            if validate_dht_value_for_known_keyspaces(value.key, &value.value).is_err() {
459                debug!(
460                    key = %hex::encode(&value.key[..8]),
461                    value_len = value.value.len(),
462                    "DHT republish: skipping value that fails validation"
463                );
464                continue;
465            }
466            let replication_factor = if value.is_popular() { K * 2 } else { K };
467            let (stored, rate_limited) = replicate_store_to_peers(
468                transport,
469                value.key,
470                value.value,
471                DEFAULT_TTL_SECS,
472                &cached_peers,
473                replication_factor,
474            )
475            .await;
476            if stored > 0 {
477                republished += 1;
478            }
479            if rate_limited {
480                debug!("DHT republish: rate-limited, backing off");
481                tokio::time::sleep(RATE_LIMIT_BACKOFF).await;
482            } else if idx + 1 < total {
483                tokio::time::sleep(INTER_VALUE_DELAY).await;
484            }
485        }
486        info!(republished, total, "DHT republish: complete");
487        Ok(republished)
488    }
489
490    pub fn start_dht_republish_loop(
491        self,
492        transport: Arc<dyn RequestTransport>,
493        seed_peers: Vec<PeerAddr>,
494        interval: Duration,
495    ) -> JoinHandle<()> {
496        info!(
497            interval_secs = interval.as_secs(),
498            seed_peers = seed_peers.len(),
499            "starting DHT republish loop"
500        );
501        tokio::spawn(async move {
502            loop {
503                let _ = self
504                    .dht_republish_once(transport.as_ref(), &seed_peers)
505                    .await;
506                tokio::time::sleep(interval).await;
507            }
508        })
509    }
510
511    pub fn start_subscription_sync_loop(
512        self,
513        transport: Arc<dyn RequestTransport>,
514        seed_peers: Vec<PeerAddr>,
515        interval: Duration,
516    ) -> JoinHandle<()> {
517        info!(
518            interval_secs = interval.as_secs(),
519            seed_peers = seed_peers.len(),
520            "starting subscription sync loop"
521        );
522        tokio::spawn(async move {
523            loop {
524                let _ = self
525                    .sync_subscriptions_over_dht(transport.as_ref(), &seed_peers)
526                    .await;
527                tokio::time::sleep(interval).await;
528            }
529        })
530    }
531
    /// Start a **TLS-over-TCP** listener that accepts incoming sessions.
    ///
    /// The returned task listens on `bind_addr`, wraps every accepted
    /// TCP stream in a TLS session using the provided server handle,
    /// then runs the SCP2P handshake and dispatches messages.
    ///
    /// # Errors
    /// The spawned task returns `Err` only if the initial bind fails;
    /// per-connection accept/handshake failures are skipped.
    pub fn start_tls_dht_service(
        self,
        bind_addr: SocketAddr,
        local_signing_key: SigningKey,
        capabilities: Capabilities,
        tls_server: Arc<TlsServerHandle>,
    ) -> JoinHandle<anyhow::Result<()>> {
        tokio::spawn(async move {
            let listener = TcpListener::bind(bind_addr).await?;
            // Shared replay-protection state for every handshake accepted
            // on this listener.
            let mut nonce_tracker = crate::transport::NonceTracker::new();
            loop {
                let accepted = tls_accept_session(
                    &listener,
                    &tls_server,
                    &local_signing_key,
                    capabilities.clone(),
                    None,
                    Some(&mut nonce_tracker),
                )
                .await;
                // NOTE(review): a *persistent* accept error makes this loop
                // retry immediately with no backoff — confirm whether a
                // short sleep is needed to avoid a hot spin.
                let Ok((stream, session, remote_addr)) = accepted else {
                    continue;
                };
                // Describe the remote end for the dispatcher, keyed by the
                // pubkey learned during the handshake.
                let remote_peer = PeerAddr {
                    ip: remote_addr.ip(),
                    port: remote_addr.port(),
                    transport: crate::peer::TransportProtocol::Tcp,
                    pubkey_hint: Some(session.remote_node_pubkey),
                    relay_via: None,
                };
                // One task per session so a slow peer cannot block the
                // accept loop.
                let node = self.clone();
                tokio::spawn(async move {
                    let _ = node.serve_wire_stream(stream, Some(remote_peer)).await;
                });
            }
        })
    }
574
    /// Start a **QUIC/UDP** listener that accepts incoming sessions.
    ///
    /// The returned task accepts bidirectional QUIC streams on the
    /// given server endpoint, runs the SCP2P handshake, and dispatches
    /// messages.  Per-connection accept/handshake failures are skipped;
    /// the loop itself never exits on its own.
    pub fn start_quic_dht_service(
        self,
        quic_server: QuicServerHandle,
        local_signing_key: SigningKey,
        capabilities: Capabilities,
    ) -> JoinHandle<anyhow::Result<()>> {
        tokio::spawn(async move {
            // Shared replay-protection state for every handshake accepted
            // on this endpoint.
            let mut nonce_tracker = crate::transport::NonceTracker::new();
            loop {
                let accepted = quic_accept_bi_session(
                    &quic_server,
                    &local_signing_key,
                    capabilities.clone(),
                    None,
                    Some(&mut nonce_tracker),
                )
                .await;
                // NOTE(review): a *persistent* accept error makes this loop
                // retry immediately with no backoff — confirm whether a
                // short sleep is needed to avoid a hot spin.
                let Ok((stream, session, remote_addr)) = accepted else {
                    continue;
                };
                // Describe the remote end for the dispatcher, keyed by the
                // pubkey learned during the handshake.
                let remote_peer = PeerAddr {
                    ip: remote_addr.ip(),
                    port: remote_addr.port(),
                    transport: crate::peer::TransportProtocol::Quic,
                    pubkey_hint: Some(session.remote_node_pubkey),
                    relay_via: None,
                };
                // One task per session so a slow peer cannot block the
                // accept loop.
                let node = self.clone();
                tokio::spawn(async move {
                    let _ = node.serve_wire_stream(stream, Some(remote_peer)).await;
                });
            }
        })
    }
614
615    pub(super) async fn collect_seed_and_known_node_peers(
616        &self,
617        target_node_id: [u8; 20],
618        seed_peers: &[PeerAddr],
619    ) -> Vec<PeerAddr> {
620        let state = self.state.read().await;
621        let mut peers = seed_peers.to_vec();
622        let known = state
623            .dht
624            .find_node(NodeId(target_node_id), K)
625            .into_iter()
626            .map(|node| node.addr)
627            .collect::<Vec<_>>();
628        merge_peer_list(&mut peers, known);
629        peers
630    }
631
632    /// Insert or update the local DHT entry for a community the node has
633    /// joined so that other peers can discover this node as a community
634    /// member via `community_info_key(share_id)`.
635    pub async fn upsert_community_member(
636        &self,
637        community_share_id: ShareId,
638        self_addr: PeerAddr,
639    ) -> anyhow::Result<()> {
640        let key = community_info_key(&community_share_id);
641        let now = now_unix_secs()?;
642        let mut state = self.state.write().await;
643        let mut cm: CommunityMembers = state
644            .dht
645            .find_value(key, now)
646            .and_then(|v| crate::cbor::from_slice(&v.value).ok())
647            .unwrap_or(CommunityMembers {
648                community_share_id: community_share_id.0,
649                members: vec![],
650                updated_at: now,
651            });
652        if !cm.members.contains(&self_addr) {
653            cm.members.push(self_addr);
654        }
655        cm.updated_at = now;
656        state
657            .dht
658            .store(key, crate::cbor::to_vec(&cm)?, DEFAULT_TTL_SECS, now)?;
659        Ok(())
660    }
661
662    /// Re-announce DHT community member entries for all joined communities.
663    ///
664    /// Called during `dht_republish_once` to keep community member
665    /// announcements fresh and ensure they survive app restarts.
666    pub async fn reannounce_community_memberships(
667        &self,
668        self_addr: PeerAddr,
669    ) -> anyhow::Result<usize> {
670        let community_ids: Vec<[u8; 32]> = {
671            let state = self.state.read().await;
672            state.communities.keys().copied().collect()
673        };
674        let mut count = 0usize;
675        for cid in community_ids {
676            if let Err(e) = self
677                .upsert_community_member(ShareId(cid), self_addr.clone())
678                .await
679            {
680                debug!(
681                    community = %hex::encode(&cid[..8]),
682                    error = %e,
683                    "reannounce_community_memberships: failed"
684                );
685            } else {
686                count += 1;
687            }
688        }
689        if count > 0 {
690            debug!(count, "reannounce_community_memberships: updated");
691        }
692        Ok(count)
693    }
694}
695
696/// If `value` deserializes as [`CommunityMembers`], merge its member list
697/// with any existing entry already stored in the local DHT at `key`.
698/// Otherwise return `value` unchanged.
699async fn merge_community_members_if_applicable(
700    key: [u8; 32],
701    value: Vec<u8>,
702    node: &NodeHandle,
703    now: u64,
704) -> Vec<u8> {
705    let Ok(incoming) = crate::cbor::from_slice::<CommunityMembers>(&value) else {
706        return value;
707    };
708    let expected = community_info_key(&ShareId(incoming.community_share_id));
709    if expected != key {
710        return value;
711    }
712    let existing = {
713        let mut state = node.state.write().await;
714        state
715            .dht
716            .find_value(key, now)
717            .and_then(|v| crate::cbor::from_slice::<CommunityMembers>(&v.value).ok())
718    };
719    let Some(mut merged) = existing else {
720        return value;
721    };
722    for member in incoming.members {
723        if !merged.members.contains(&member) {
724            merged.members.push(member);
725        }
726    }
727    merged.updated_at = merged.updated_at.max(incoming.updated_at);
728    crate::cbor::to_vec(&merged).unwrap_or(value)
729}