everscale_network/dht/
node.rs

1use std::borrow::{Borrow, Cow};
2use std::convert::TryFrom;
3use std::net::SocketAddrV4;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::Result;
8use bytes::Bytes;
9use futures_util::stream::FuturesUnordered;
10use futures_util::StreamExt;
11use smallvec::smallvec;
12use tl_proto::{BoxedConstructor, BoxedWrapper, TlRead, TlWrite};
13
14use super::buckets::Buckets;
15use super::entry::Entry;
16use super::futures::StoreValue;
17use super::storage::{Storage, StorageOptions};
18use super::{KEY_ADDRESS, KEY_NODES, MAX_DHT_PEERS};
19use crate::adnl;
20use crate::overlay;
21use crate::proto;
22use crate::subscriber::*;
23use crate::util::*;
24
25/// DHT node configuration
26#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
27#[serde(default)]
28pub struct NodeOptions {
29    /// Default stored value timeout used for [`Node::store_overlay_node`] and
30    /// [`Node::store_address`]
31    ///
32    /// Default: `3600` seconds
33    pub value_ttl_sec: u32,
34
35    /// ADNL query timeout
36    ///
37    /// Default: `1000` ms
38    pub query_timeout_ms: u64,
39
40    /// Amount of DHT peers, used for values search
41    ///
42    /// Default: `5`
43    pub default_value_batch_len: usize,
44
45    /// Max peer penalty points. On each unsuccessful query every peer gains 2 points,
46    /// and then they are reduced by one on each good action.
47    ///
48    /// Default: `5`
49    pub bad_peer_threshold: usize,
50
51    /// Max allowed `k` value for DHT `FindValue` query.
52    ///
53    /// Default: `5`
54    pub max_allowed_k: u32,
55
56    /// Max allowed key name length (in bytes).
57    ///
58    /// See [`proto::dht::Key`]
59    ///
60    /// Default: `127` bytes
61    pub max_key_name_len: usize,
62
63    /// Max allowed key index
64    ///
65    /// See [`proto::dht::Key`]
66    ///
67    /// Default: `15`
68    pub max_key_index: u32,
69
70    /// Storage GC interval. Will remove all outdated entries
71    ///
72    /// Default: `10000` ms
73    pub storage_gc_interval_ms: u64,
74}
75
76impl Default for NodeOptions {
77    fn default() -> Self {
78        Self {
79            value_ttl_sec: 3600,
80            query_timeout_ms: 1000,
81            default_value_batch_len: 5,
82            bad_peer_threshold: 5,
83            max_allowed_k: 20,
84            max_key_name_len: 127,
85            max_key_index: 15,
86            storage_gc_interval_ms: 10000,
87        }
88    }
89}
90
91/// Kademlia-like DHT node
92pub struct Node {
93    /// Underlying ADNL node
94    adnl: Arc<adnl::Node>,
95
96    /// Local ADNL peer id
97    local_id: adnl::NodeIdShort,
98
99    /// Serialized [`proto::rpc::DhtQuery`] with own DHT node info
100    query_prefix: Vec<u8>,
101
102    /// Configuration
103    options: NodeOptions,
104
105    /// State
106    state: Arc<NodeState>,
107}
108
109impl Node {
110    /// Create new DHT node on top of ADNL node
111    pub fn new(adnl: Arc<adnl::Node>, key_tag: usize, options: NodeOptions) -> Result<Arc<Self>> {
112        let key = adnl.key_by_tag(key_tag)?.clone();
113
114        let buckets = Buckets::new(key.id());
115        let storage = Storage::new(StorageOptions {
116            max_key_name_len: options.max_key_name_len,
117            max_key_index: options.max_key_index,
118        });
119
120        let state = Arc::new(NodeState {
121            key: key.clone(),
122            known_peers: adnl::PeersSet::with_capacity(MAX_DHT_PEERS),
123            penalties: Default::default(),
124            buckets,
125            storage,
126            max_allowed_k: options.max_allowed_k,
127        });
128
129        adnl.add_query_subscriber(state.clone())?;
130
131        let query_prefix = tl_proto::serialize(proto::rpc::DhtQuery {
132            node: state
133                .sign_local_node(adnl.build_address_list())
134                .as_equivalent_ref(),
135        });
136
137        let dht_node = Arc::new(Self {
138            adnl,
139            local_id: *key.id(),
140            query_prefix,
141            options,
142            state,
143        });
144
145        let state = Arc::downgrade(&dht_node.state);
146        let interval = Duration::from_millis(dht_node.options.storage_gc_interval_ms);
147        tokio::spawn(async move {
148            loop {
149                tokio::time::sleep(interval).await;
150                if let Some(state) = state.upgrade() {
151                    state.storage.gc();
152                }
153            }
154        });
155
156        Ok(dht_node)
157    }
158
159    /// Configuration
160    #[inline(always)]
161    pub fn options(&self) -> &NodeOptions {
162        &self.options
163    }
164
165    /// Instant metrics
166    #[inline(always)]
167    pub fn metrics(&self) -> NodeMetrics {
168        self.state.metrics()
169    }
170
171    /// Underlying ADNL node
172    #[inline(always)]
173    pub fn adnl(&self) -> &Arc<adnl::Node> {
174        &self.adnl
175    }
176
177    #[inline(always)]
178    pub fn key(&self) -> &Arc<adnl::Key> {
179        &self.state.key
180    }
181
182    pub fn iter_known_peers(&self) -> impl Iterator<Item = &adnl::NodeIdShort> {
183        self.state.known_peers.iter()
184    }
185
186    /// Adds new peer to DHT or explicitly marks existing as good. Returns new peer short id
187    pub fn add_dht_peer(&self, peer: proto::dht::NodeOwned) -> Result<Option<adnl::NodeIdShort>> {
188        self.state.add_dht_peer(&self.adnl, peer)
189    }
190
191    /// Checks whether the specified peer was marked as bad
192    pub fn is_bad_peer(&self, peer: &adnl::NodeIdShort) -> bool {
193        matches!(
194            self.state.penalties.get(peer),
195            Some(penalty) if *penalty > self.options.bad_peer_threshold
196        )
197    }
198
199    /// Sends ping query to the given peer
200    pub async fn ping(&self, peer_id: &adnl::NodeIdShort) -> Result<bool> {
201        use rand::RngCore;
202        let random_id = fast_thread_rng().next_u64();
203        match self
204            .query(peer_id, proto::rpc::DhtPing { random_id })
205            .await?
206        {
207            Some(proto::dht::Pong { random_id: answer }) => Ok(answer == random_id),
208            None => Ok(false),
209        }
210    }
211
212    /// Returns an entry interface for manipulating DHT values
213    pub fn entry<'a, T>(self: &'a Arc<Self>, id: &'a T, name: &'a str) -> Entry<'a>
214    where
215        T: Borrow<[u8; 32]>,
216    {
217        Entry::new(self, id, name)
218    }
219
220    /// Queries given peer for at most `k` DHT nodes with
221    /// the same affinity as `local_id <-> peer_id`
222    pub async fn query_dht_nodes(
223        &self,
224        peer_id: &adnl::NodeIdShort,
225        k: u32,
226        store_self: bool,
227    ) -> Result<Vec<proto::dht::NodeOwned>> {
228        let query = proto::rpc::DhtFindNode {
229            key: self.local_id.as_slice(),
230            k,
231        };
232
233        let answer = match store_self {
234            true => self.query_with_prefix(peer_id, query).await,
235            false => self.query(peer_id, query).await,
236        }?;
237
238        Ok(match answer {
239            Some(BoxedWrapper(proto::dht::NodesOwned { nodes })) => nodes,
240            None => Vec::new(),
241        })
242    }
243
244    /// Asks each known DHT node for other nodes, extending current nodes set
245    pub async fn find_more_dht_nodes(&self) -> Result<usize> {
246        let known_nodes = self.known_peers().clone_inner();
247
248        let mut tasks = futures_util::stream::FuturesUnordered::new();
249        for peer_id in known_nodes {
250            tasks.push(async move {
251                let res = self.query_dht_nodes(&peer_id, 10, false).await;
252                (peer_id, res)
253            });
254        }
255
256        let mut node_count = 0;
257        while let Some((peer_id, res)) = tasks.next().await {
258            match res {
259                Ok(nodes) => {
260                    for node in nodes {
261                        node_count += self.add_dht_peer(node)?.is_some() as usize;
262                    }
263                }
264                Err(e) => {
265                    tracing::warn!(%peer_id, "failed to get DHT nodes: {e:?}")
266                }
267            }
268        }
269
270        Ok(node_count)
271    }
272
273    /// Searches overlay nodes and their ip addresses.
274    ///
275    /// NOTE: For the sake of speed it uses only a subset of nodes, so
276    /// results may vary between calls.
277    pub async fn find_overlay_nodes(
278        self: &Arc<Self>,
279        overlay_id: &overlay::IdShort,
280    ) -> Result<Vec<(SocketAddrV4, proto::overlay::NodeOwned)>> {
281        let mut result = Vec::new();
282        let mut nodes = Vec::new();
283        let mut cache = FastHashSet::default();
284        loop {
285            // Receive several nodes records
286            let received = self
287                .entry(overlay_id, KEY_NODES)
288                .values()
289                .use_new_peers(true)
290                .map(|(_, BoxedWrapper(proto::overlay::NodesOwned { nodes }))| nodes)
291                .collect::<Vec<_>>()
292                .await;
293            if received.is_empty() {
294                break;
295            }
296
297            let mut futures = FuturesUnordered::new();
298
299            // Spawn IP resolution tasks.
300            // It combines received nodes with nodes from the previous iteration
301            for node in received
302                .into_iter()
303                .flatten()
304                .chain(std::mem::take(&mut nodes).into_iter())
305            {
306                let peer_id = match adnl::NodeIdFull::try_from(node.id.as_equivalent_ref())
307                    .map(|full_id| full_id.compute_short_id())
308                {
309                    // Only resolve address for new peers with valid id
310                    Ok(peer_id) if cache.insert(peer_id) => peer_id,
311                    _ => continue,
312                };
313
314                let dht = self.clone();
315                futures.push(async move {
316                    match dht.find_address(&peer_id).await {
317                        Ok((ip, _)) => (Some(ip), node),
318                        Err(_) => (None, node),
319                    }
320                });
321            }
322
323            // Wait all results
324            while let Some((ip, node)) = futures.next().await {
325                match ip {
326                    // Add nodes with ips to result
327                    Some(ip) => result.push((ip, node)),
328                    // Prepare nodes for the next iteration in case we haven't found any yet
329                    None if result.is_empty() => nodes.push(node),
330                    _ => {}
331                }
332            }
333
334            if !result.is_empty() {
335                break;
336            }
337        }
338
339        Ok(result)
340    }
341
342    /// Searches for the first stored IP address for the given peer id
343    pub async fn find_address(
344        self: &Arc<Self>,
345        peer_id: &adnl::NodeIdShort,
346    ) -> Result<(SocketAddrV4, adnl::NodeIdFull)> {
347        let mut values = self.entry(peer_id, KEY_ADDRESS).values();
348        while let Some((key, BoxedWrapper(value))) = values.next().await {
349            match (
350                parse_address_list(&value, self.adnl.options().clock_tolerance_sec),
351                adnl::NodeIdFull::try_from(key.id.as_equivalent_ref()),
352            ) {
353                (Ok(addr), Ok(full_id)) => return Ok((addr, full_id)),
354                _ => continue,
355            }
356        }
357
358        Err(DhtNodeError::NoAddressFound.into())
359    }
360
361    /// Returns a future which stores value into multiple DHT nodes.
362    ///
363    /// See [`Node::entry`] for more convenient API
364    pub fn store_value(self: &Arc<Self>, value: proto::dht::Value<'_>) -> Result<StoreValue> {
365        StoreValue::new(self.clone(), value)
366    }
367
368    /// Stores given overlay node into multiple DHT nodes
369    ///
370    /// Returns and error if stored value is incorrect
371    pub async fn store_overlay_node(
372        self: &Arc<Self>,
373        overlay_id_full: &overlay::IdFull,
374        node: proto::overlay::Node<'_>,
375    ) -> Result<bool> {
376        let overlay_id = overlay_id_full.compute_short_id();
377        overlay_id.verify_overlay_node(&node)?;
378
379        let value = tl_proto::serialize_as_boxed(proto::overlay::Nodes {
380            nodes: smallvec![node],
381        });
382
383        let value = proto::dht::Value {
384            key: proto::dht::KeyDescription {
385                key: proto::dht::Key {
386                    id: overlay_id.as_slice(),
387                    name: KEY_NODES.as_bytes(),
388                    idx: 0,
389                },
390                id: everscale_crypto::tl::PublicKey::Overlay {
391                    name: overlay_id_full.as_slice(),
392                },
393                update_rule: proto::dht::UpdateRule::OverlayNodes,
394                signature: Default::default(),
395            },
396            value: &value,
397            ttl: now() + self.options.value_ttl_sec,
398            signature: Default::default(),
399        };
400
401        self.store_value(value)?
402            .then_check(
403                move |_, BoxedWrapper(proto::overlay::NodesOwned { nodes })| {
404                    for stored_node in &nodes {
405                        if stored_node.as_equivalent_ref() == node {
406                            return Ok(true);
407                        }
408                    }
409                    Ok(false)
410                },
411            )
412            .check_all()
413            .await
414    }
415
416    /// Stores given socket address into multiple DHT nodes
417    pub async fn store_address(
418        self: &Arc<Self>,
419        key: &adnl::Key,
420        addr: SocketAddrV4,
421    ) -> Result<bool> {
422        let clock_tolerance_sec = self.adnl.options().clock_tolerance_sec;
423
424        self.entry(key.id(), KEY_ADDRESS)
425            .with_data(
426                proto::adnl::AddressList {
427                    address: Some(proto::adnl::Address::from(&addr)),
428                    version: now(),
429                    reinit_date: self.adnl.start_time(),
430                    expire_at: 0,
431                }
432                .into_boxed(),
433            )
434            .sign_and_store(key)?
435            .then_check(move |_, BoxedWrapper(address_list)| {
436                match parse_address_list(&address_list, clock_tolerance_sec)? {
437                    stored_addr if stored_addr == addr => Ok(true),
438                    stored_addr => {
439                        tracing::warn!(
440                            stored = %stored_addr,
441                            expected = %addr,
442                            "stored address mismatch",
443                        );
444                        Ok(false)
445                    }
446                }
447            })
448            .await
449    }
450
451    async fn query<Q, A>(&self, peer_id: &adnl::NodeIdShort, query: Q) -> Result<Option<A>>
452    where
453        Q: TlWrite,
454        for<'a> A: TlRead<'a, Repr = tl_proto::Boxed> + 'static,
455    {
456        let result = self.adnl.query(&self.local_id, peer_id, query, None).await;
457        self.state.update_peer_status(peer_id, result.is_ok());
458        result
459    }
460
461    pub(super) async fn query_raw(
462        &self,
463        peer_id: &adnl::NodeIdShort,
464        query: Bytes,
465    ) -> Result<Option<Vec<u8>>> {
466        let result = self
467            .adnl
468            .query_raw(
469                &self.local_id,
470                peer_id,
471                query,
472                Some(self.options.query_timeout_ms),
473            )
474            .await;
475        self.state.update_peer_status(peer_id, result.is_ok());
476        result
477    }
478
479    async fn query_with_prefix<Q, A>(
480        &self,
481        peer_id: &adnl::NodeIdShort,
482        query: Q,
483    ) -> Result<Option<A>>
484    where
485        Q: TlWrite,
486        for<'a> A: TlRead<'a, Repr = tl_proto::Boxed> + 'static,
487    {
488        let result = self
489            .adnl
490            .query_with_prefix::<Q, A>(&self.local_id, peer_id, &self.query_prefix, query, None)
491            .await;
492        self.state.update_peer_status(peer_id, result.is_ok());
493        result
494    }
495
496    pub(super) fn parse_value_result<T>(
497        &self,
498        result: &[u8],
499    ) -> Result<Option<(proto::dht::KeyDescriptionOwned, T)>>
500    where
501        for<'a> T: TlRead<'a, Repr = tl_proto::Boxed> + 'static,
502    {
503        match tl_proto::deserialize::<proto::dht::ValueResult>(result)? {
504            proto::dht::ValueResult::ValueFound(BoxedWrapper(mut value)) => {
505                if value.key.update_rule == proto::dht::UpdateRule::Signature {
506                    verify_signed_dht_value(&mut value)?;
507                }
508
509                let parsed = tl_proto::deserialize(value.value)?;
510                Ok(Some((value.key.as_equivalent_owned(), parsed)))
511            }
512            proto::dht::ValueResult::ValueNotFound(proto::dht::NodesOwned { nodes }) => {
513                for node in nodes {
514                    if let Err(e) = self.add_dht_peer(node) {
515                        tracing::warn!("failed to add DHT peer: {e:?}");
516                    }
517                }
518                Ok(None)
519            }
520        }
521    }
522
523    #[inline(always)]
524    pub(super) fn known_peers(&self) -> &adnl::PeersSet {
525        &self.state.known_peers
526    }
527
528    #[inline(always)]
529    pub(super) fn storage(&self) -> &Storage {
530        &self.state.storage
531    }
532}
533
534struct NodeState {
535    /// Local ADNL key
536    key: Arc<adnl::Key>,
537
538    /// Known DHT nodes
539    known_peers: adnl::PeersSet,
540    /// DHT nodes penalty scores table
541    penalties: Penalties,
542
543    /// DHT nodes organized by buckets
544    buckets: Buckets,
545    /// Local DHT values storage
546    storage: Storage,
547
548    /// Max allowed `k` value for DHT `FindValue` query.
549    max_allowed_k: u32,
550}
551
552impl NodeState {
553    fn metrics(&self) -> NodeMetrics {
554        NodeMetrics {
555            known_peers_len: self.known_peers.len(),
556            bucket_peer_count: self.buckets.iter().map(|bucket| bucket.len()).sum(),
557            storage_len: self.storage.len(),
558            storage_total_size: self.storage.total_size(),
559        }
560    }
561
562    fn sign_local_node(&self, addr_list: proto::adnl::AddressList) -> proto::dht::NodeOwned {
563        let mut node = proto::dht::NodeOwned {
564            id: self.key.full_id().as_tl().as_equivalent_owned(),
565            addr_list,
566            version: addr_list.version,
567            signature: Default::default(),
568        };
569        node.signature = self.key.sign(node.as_boxed()).to_vec().into();
570        node
571    }
572
573    fn add_dht_peer(
574        &self,
575        adnl: &adnl::Node,
576        mut peer: proto::dht::NodeOwned,
577    ) -> Result<Option<adnl::NodeIdShort>> {
578        let peer_id_full = adnl::NodeIdFull::try_from(peer.id.as_equivalent_ref())?;
579
580        // Verify signature
581        let signature = std::mem::take(&mut peer.signature);
582        if peer_id_full.verify(peer.as_boxed(), &signature).is_err() {
583            tracing::warn!("invalid DHT peer signature");
584            return Ok(None);
585        }
586        peer.signature = signature;
587
588        // Parse remaining peer data
589        let peer_id = peer_id_full.compute_short_id();
590        let peer_addr = parse_address_list(&peer.addr_list, adnl.options().clock_tolerance_sec)?;
591
592        // Add new ADNL peer
593        let is_new_peer = adnl.add_peer(
594            adnl::NewPeerContext::Dht,
595            self.key.id(),
596            &peer_id,
597            peer_addr,
598            peer_id_full,
599        )?;
600        if !is_new_peer {
601            return Ok(None);
602        }
603
604        // Add new peer to the bucket
605        if self.known_peers.insert(peer_id) {
606            self.buckets.insert(&peer_id, peer);
607        } else {
608            self.set_good_peer(&peer_id);
609        }
610
611        Ok(Some(peer_id))
612    }
613
614    fn update_peer_status(&self, peer: &adnl::NodeIdShort, is_good: bool) {
615        use dashmap::mapref::entry::Entry;
616
617        if is_good {
618            self.set_good_peer(peer);
619        } else {
620            match self.penalties.entry(*peer) {
621                Entry::Occupied(mut entry) => {
622                    *entry.get_mut() += 2;
623                }
624                Entry::Vacant(entry) => {
625                    entry.insert(0);
626                }
627            }
628        }
629    }
630
631    fn set_good_peer(&self, peer: &adnl::NodeIdShort) {
632        if let Some(mut count) = self.penalties.get_mut(peer) {
633            *count.value_mut() = count.saturating_sub(1);
634        }
635    }
636
637    fn process_find_node(&self, query: proto::rpc::DhtFindNode<'_>) -> proto::dht::NodesOwned {
638        self.buckets.find(query.key, query.k)
639    }
640
641    fn process_find_value(
642        &self,
643        query: proto::rpc::DhtFindValue<'_>,
644    ) -> Result<proto::dht::ValueResultOwned> {
645        if query.k == 0 || query.k > self.max_allowed_k {
646            return Err(DhtNodeError::InvalidNodeCountLimit.into());
647        }
648
649        Ok(if let Some(value) = self.storage.get_ref(query.key) {
650            proto::dht::ValueResultOwned::ValueFound(value.clone().into_boxed())
651        } else {
652            let mut nodes = Vec::with_capacity(query.k as usize);
653
654            'outer: for bucket in &self.buckets {
655                for peer in bucket {
656                    nodes.push(peer.clone());
657
658                    if nodes.len() >= query.k as usize {
659                        break 'outer;
660                    }
661                }
662            }
663
664            proto::dht::ValueResultOwned::ValueNotFound(proto::dht::NodesOwned { nodes })
665        })
666    }
667
668    fn process_store(&self, query: proto::rpc::DhtStore<'_>) -> Result<proto::dht::Stored> {
669        self.storage.insert(query.value)?;
670        Ok(proto::dht::Stored)
671    }
672}
673
674#[async_trait::async_trait]
675impl QuerySubscriber for NodeState {
676    async fn try_consume_query<'a>(
677        &self,
678        ctx: SubscriberContext<'a>,
679        constructor: u32,
680        query: Cow<'a, [u8]>,
681    ) -> Result<QueryConsumingResult<'a>> {
682        match constructor {
683            proto::rpc::DhtPing::TL_ID => {
684                let proto::rpc::DhtPing { random_id } = tl_proto::deserialize(&query)?;
685                QueryConsumingResult::consume(proto::dht::Pong { random_id })
686            }
687            proto::rpc::DhtFindNode::TL_ID => {
688                let query = tl_proto::deserialize(&query)?;
689                QueryConsumingResult::consume(self.process_find_node(query).into_boxed())
690            }
691            proto::rpc::DhtFindValue::TL_ID => {
692                let query = tl_proto::deserialize(&query)?;
693                QueryConsumingResult::consume(self.process_find_value(query)?)
694            }
695            proto::rpc::DhtGetSignedAddressList::TL_ID => QueryConsumingResult::consume(
696                self.sign_local_node(ctx.adnl.build_address_list())
697                    .into_boxed(),
698            ),
699            proto::rpc::DhtStore::TL_ID => {
700                let query = tl_proto::deserialize(&query)?;
701                QueryConsumingResult::consume(self.process_store(query)?)
702            }
703            proto::rpc::DhtQuery::TL_ID => {
704                let mut offset = 0;
705                let proto::rpc::DhtQuery { node } = <_>::read_from(&query, &mut offset)?;
706                let constructor = u32::read_from(&query, &mut std::convert::identity(offset))?;
707
708                if offset >= query.len() {
709                    return Err(DhtNodeError::UnexpectedQuery.into());
710                }
711
712                self.add_dht_peer(ctx.adnl, node.as_equivalent_owned())?;
713
714                match self
715                    .try_consume_query(ctx, constructor, Cow::Borrowed(&query[offset..]))
716                    .await?
717                {
718                    QueryConsumingResult::Consumed(answer) => {
719                        Ok(QueryConsumingResult::Consumed(answer))
720                    }
721                    QueryConsumingResult::Rejected(_) => Err(DhtNodeError::UnexpectedQuery.into()),
722                }
723            }
724            _ => Ok(QueryConsumingResult::Rejected(query)),
725        }
726    }
727}
728
729fn verify_signed_dht_value(value: &mut proto::dht::Value<'_>) -> Result<()> {
730    if value.key.key.id != &tl_proto::hash(value.key.id) {
731        return Err(DhtNodeError::InvalidValueKey.into());
732    }
733
734    let full_id = adnl::NodeIdFull::try_from(value.key.id)?;
735
736    let key_signature = std::mem::take(&mut value.key.signature);
737    full_id.verify(value.key.as_boxed(), key_signature)?;
738    value.key.signature = key_signature;
739
740    let value_signature = std::mem::take(&mut value.signature);
741    full_id.verify(value.as_boxed(), value_signature)?;
742    value.signature = value_signature;
743
744    Ok(())
745}
746
/// Instant DHT node metrics
#[derive(Debug, Copy, Clone)]
pub struct NodeMetrics {
    /// Number of known DHT peers
    pub known_peers_len: usize,
    /// Total number of peers in all buckets
    pub bucket_peer_count: usize,
    /// Number of entries in the local values storage
    pub storage_len: usize,
    /// Total size reported by the local values storage
    pub storage_total_size: usize,
}
755
756type Penalties = FastDashMap<adnl::NodeIdShort, usize>;
757
758#[derive(thiserror::Error, Debug)]
759enum DhtNodeError {
760    #[error("No address found")]
761    NoAddressFound,
762    #[error("Unexpected DHT query")]
763    UnexpectedQuery,
764    #[error("Invalid node count limit")]
765    InvalidNodeCountLimit,
766    #[error("Invalid value key")]
767    InvalidValueKey,
768}