1use std::borrow::{Borrow, Cow};
2use std::convert::TryFrom;
3use std::net::SocketAddrV4;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::Result;
8use bytes::Bytes;
9use futures_util::stream::FuturesUnordered;
10use futures_util::StreamExt;
11use smallvec::smallvec;
12use tl_proto::{BoxedConstructor, BoxedWrapper, TlRead, TlWrite};
13
14use super::buckets::Buckets;
15use super::entry::Entry;
16use super::futures::StoreValue;
17use super::storage::{Storage, StorageOptions};
18use super::{KEY_ADDRESS, KEY_NODES, MAX_DHT_PEERS};
19use crate::adnl;
20use crate::overlay;
21use crate::proto;
22use crate::subscriber::*;
23use crate::util::*;
24
/// Configuration of a DHT [`Node`].
///
/// Because of `#[serde(default)]`, fields missing from a deserialized config take
/// their values from [`NodeOptions::default`].
#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize)]
#[serde(default)]
pub struct NodeOptions {
    /// Lifetime in seconds added to the current time when this node builds a value
    /// to store (e.g. the `ttl` of overlay node records in `Node::store_overlay_node`).
    ///
    /// Default: `3600`
    pub value_ttl_sec: u32,

    /// Timeout in milliseconds passed to ADNL by `Node::query_raw`.
    ///
    /// Default: `1000`
    pub query_timeout_ms: u64,

    /// Not referenced in this file; presumably the number of peers queried at once
    /// while searching for values. TODO: confirm against the value-search futures.
    ///
    /// Default: `5`
    pub default_value_batch_len: usize,

    /// A peer whose accumulated penalty is strictly greater than this value is
    /// reported as bad by `Node::is_bad_peer`.
    ///
    /// Default: `5`
    pub bad_peer_threshold: usize,

    /// Largest `k` accepted in incoming `findValue` queries (`k == 0` is always
    /// rejected).
    ///
    /// Default: `20`
    pub max_allowed_k: u32,

    /// Forwarded to the value storage as `StorageOptions::max_key_name_len`
    /// (presumably the maximum accepted key name length — confirm in `Storage`).
    ///
    /// Default: `127`
    pub max_key_name_len: usize,

    /// Forwarded to the value storage as `StorageOptions::max_key_index`
    /// (presumably the maximum accepted key index — confirm in `Storage`).
    ///
    /// Default: `15`
    pub max_key_index: u32,

    /// Interval in milliseconds between storage garbage-collection passes.
    ///
    /// Default: `10000`
    pub storage_gc_interval_ms: u64,
}
75
76impl Default for NodeOptions {
77 fn default() -> Self {
78 Self {
79 value_ttl_sec: 3600,
80 query_timeout_ms: 1000,
81 default_value_batch_len: 5,
82 bad_peer_threshold: 5,
83 max_allowed_k: 20,
84 max_key_name_len: 127,
85 max_key_index: 15,
86 storage_gc_interval_ms: 10000,
87 }
88 }
89}
90
/// DHT node running on top of an ADNL node with one of its local keys.
pub struct Node {
    /// ADNL node used to send and receive DHT queries.
    adnl: Arc<adnl::Node>,

    /// Short id of the local ADNL key this DHT node uses.
    local_id: adnl::NodeIdShort,

    /// Serialized `dht.query` wrapper carrying this node's signed record; prepended to
    /// queries sent through `query_with_prefix`.
    query_prefix: Vec<u8>,

    /// Options the node was created with.
    options: NodeOptions,

    /// State shared with the ADNL query subscriber and the storage GC task.
    state: Arc<NodeState>,
}
108
109impl Node {
110 pub fn new(adnl: Arc<adnl::Node>, key_tag: usize, options: NodeOptions) -> Result<Arc<Self>> {
112 let key = adnl.key_by_tag(key_tag)?.clone();
113
114 let buckets = Buckets::new(key.id());
115 let storage = Storage::new(StorageOptions {
116 max_key_name_len: options.max_key_name_len,
117 max_key_index: options.max_key_index,
118 });
119
120 let state = Arc::new(NodeState {
121 key: key.clone(),
122 known_peers: adnl::PeersSet::with_capacity(MAX_DHT_PEERS),
123 penalties: Default::default(),
124 buckets,
125 storage,
126 max_allowed_k: options.max_allowed_k,
127 });
128
129 adnl.add_query_subscriber(state.clone())?;
130
131 let query_prefix = tl_proto::serialize(proto::rpc::DhtQuery {
132 node: state
133 .sign_local_node(adnl.build_address_list())
134 .as_equivalent_ref(),
135 });
136
137 let dht_node = Arc::new(Self {
138 adnl,
139 local_id: *key.id(),
140 query_prefix,
141 options,
142 state,
143 });
144
145 let state = Arc::downgrade(&dht_node.state);
146 let interval = Duration::from_millis(dht_node.options.storage_gc_interval_ms);
147 tokio::spawn(async move {
148 loop {
149 tokio::time::sleep(interval).await;
150 if let Some(state) = state.upgrade() {
151 state.storage.gc();
152 }
153 }
154 });
155
156 Ok(dht_node)
157 }
158
159 #[inline(always)]
161 pub fn options(&self) -> &NodeOptions {
162 &self.options
163 }
164
165 #[inline(always)]
167 pub fn metrics(&self) -> NodeMetrics {
168 self.state.metrics()
169 }
170
171 #[inline(always)]
173 pub fn adnl(&self) -> &Arc<adnl::Node> {
174 &self.adnl
175 }
176
177 #[inline(always)]
178 pub fn key(&self) -> &Arc<adnl::Key> {
179 &self.state.key
180 }
181
182 pub fn iter_known_peers(&self) -> impl Iterator<Item = &adnl::NodeIdShort> {
183 self.state.known_peers.iter()
184 }
185
186 pub fn add_dht_peer(&self, peer: proto::dht::NodeOwned) -> Result<Option<adnl::NodeIdShort>> {
188 self.state.add_dht_peer(&self.adnl, peer)
189 }
190
191 pub fn is_bad_peer(&self, peer: &adnl::NodeIdShort) -> bool {
193 matches!(
194 self.state.penalties.get(peer),
195 Some(penalty) if *penalty > self.options.bad_peer_threshold
196 )
197 }
198
199 pub async fn ping(&self, peer_id: &adnl::NodeIdShort) -> Result<bool> {
201 use rand::RngCore;
202 let random_id = fast_thread_rng().next_u64();
203 match self
204 .query(peer_id, proto::rpc::DhtPing { random_id })
205 .await?
206 {
207 Some(proto::dht::Pong { random_id: answer }) => Ok(answer == random_id),
208 None => Ok(false),
209 }
210 }
211
212 pub fn entry<'a, T>(self: &'a Arc<Self>, id: &'a T, name: &'a str) -> Entry<'a>
214 where
215 T: Borrow<[u8; 32]>,
216 {
217 Entry::new(self, id, name)
218 }
219
220 pub async fn query_dht_nodes(
223 &self,
224 peer_id: &adnl::NodeIdShort,
225 k: u32,
226 store_self: bool,
227 ) -> Result<Vec<proto::dht::NodeOwned>> {
228 let query = proto::rpc::DhtFindNode {
229 key: self.local_id.as_slice(),
230 k,
231 };
232
233 let answer = match store_self {
234 true => self.query_with_prefix(peer_id, query).await,
235 false => self.query(peer_id, query).await,
236 }?;
237
238 Ok(match answer {
239 Some(BoxedWrapper(proto::dht::NodesOwned { nodes })) => nodes,
240 None => Vec::new(),
241 })
242 }
243
244 pub async fn find_more_dht_nodes(&self) -> Result<usize> {
246 let known_nodes = self.known_peers().clone_inner();
247
248 let mut tasks = futures_util::stream::FuturesUnordered::new();
249 for peer_id in known_nodes {
250 tasks.push(async move {
251 let res = self.query_dht_nodes(&peer_id, 10, false).await;
252 (peer_id, res)
253 });
254 }
255
256 let mut node_count = 0;
257 while let Some((peer_id, res)) = tasks.next().await {
258 match res {
259 Ok(nodes) => {
260 for node in nodes {
261 node_count += self.add_dht_peer(node)?.is_some() as usize;
262 }
263 }
264 Err(e) => {
265 tracing::warn!(%peer_id, "failed to get DHT nodes: {e:?}")
266 }
267 }
268 }
269
270 Ok(node_count)
271 }
272
273 pub async fn find_overlay_nodes(
278 self: &Arc<Self>,
279 overlay_id: &overlay::IdShort,
280 ) -> Result<Vec<(SocketAddrV4, proto::overlay::NodeOwned)>> {
281 let mut result = Vec::new();
282 let mut nodes = Vec::new();
283 let mut cache = FastHashSet::default();
284 loop {
285 let received = self
287 .entry(overlay_id, KEY_NODES)
288 .values()
289 .use_new_peers(true)
290 .map(|(_, BoxedWrapper(proto::overlay::NodesOwned { nodes }))| nodes)
291 .collect::<Vec<_>>()
292 .await;
293 if received.is_empty() {
294 break;
295 }
296
297 let mut futures = FuturesUnordered::new();
298
299 for node in received
302 .into_iter()
303 .flatten()
304 .chain(std::mem::take(&mut nodes).into_iter())
305 {
306 let peer_id = match adnl::NodeIdFull::try_from(node.id.as_equivalent_ref())
307 .map(|full_id| full_id.compute_short_id())
308 {
309 Ok(peer_id) if cache.insert(peer_id) => peer_id,
311 _ => continue,
312 };
313
314 let dht = self.clone();
315 futures.push(async move {
316 match dht.find_address(&peer_id).await {
317 Ok((ip, _)) => (Some(ip), node),
318 Err(_) => (None, node),
319 }
320 });
321 }
322
323 while let Some((ip, node)) = futures.next().await {
325 match ip {
326 Some(ip) => result.push((ip, node)),
328 None if result.is_empty() => nodes.push(node),
330 _ => {}
331 }
332 }
333
334 if !result.is_empty() {
335 break;
336 }
337 }
338
339 Ok(result)
340 }
341
342 pub async fn find_address(
344 self: &Arc<Self>,
345 peer_id: &adnl::NodeIdShort,
346 ) -> Result<(SocketAddrV4, adnl::NodeIdFull)> {
347 let mut values = self.entry(peer_id, KEY_ADDRESS).values();
348 while let Some((key, BoxedWrapper(value))) = values.next().await {
349 match (
350 parse_address_list(&value, self.adnl.options().clock_tolerance_sec),
351 adnl::NodeIdFull::try_from(key.id.as_equivalent_ref()),
352 ) {
353 (Ok(addr), Ok(full_id)) => return Ok((addr, full_id)),
354 _ => continue,
355 }
356 }
357
358 Err(DhtNodeError::NoAddressFound.into())
359 }
360
361 pub fn store_value(self: &Arc<Self>, value: proto::dht::Value<'_>) -> Result<StoreValue> {
365 StoreValue::new(self.clone(), value)
366 }
367
368 pub async fn store_overlay_node(
372 self: &Arc<Self>,
373 overlay_id_full: &overlay::IdFull,
374 node: proto::overlay::Node<'_>,
375 ) -> Result<bool> {
376 let overlay_id = overlay_id_full.compute_short_id();
377 overlay_id.verify_overlay_node(&node)?;
378
379 let value = tl_proto::serialize_as_boxed(proto::overlay::Nodes {
380 nodes: smallvec![node],
381 });
382
383 let value = proto::dht::Value {
384 key: proto::dht::KeyDescription {
385 key: proto::dht::Key {
386 id: overlay_id.as_slice(),
387 name: KEY_NODES.as_bytes(),
388 idx: 0,
389 },
390 id: everscale_crypto::tl::PublicKey::Overlay {
391 name: overlay_id_full.as_slice(),
392 },
393 update_rule: proto::dht::UpdateRule::OverlayNodes,
394 signature: Default::default(),
395 },
396 value: &value,
397 ttl: now() + self.options.value_ttl_sec,
398 signature: Default::default(),
399 };
400
401 self.store_value(value)?
402 .then_check(
403 move |_, BoxedWrapper(proto::overlay::NodesOwned { nodes })| {
404 for stored_node in &nodes {
405 if stored_node.as_equivalent_ref() == node {
406 return Ok(true);
407 }
408 }
409 Ok(false)
410 },
411 )
412 .check_all()
413 .await
414 }
415
416 pub async fn store_address(
418 self: &Arc<Self>,
419 key: &adnl::Key,
420 addr: SocketAddrV4,
421 ) -> Result<bool> {
422 let clock_tolerance_sec = self.adnl.options().clock_tolerance_sec;
423
424 self.entry(key.id(), KEY_ADDRESS)
425 .with_data(
426 proto::adnl::AddressList {
427 address: Some(proto::adnl::Address::from(&addr)),
428 version: now(),
429 reinit_date: self.adnl.start_time(),
430 expire_at: 0,
431 }
432 .into_boxed(),
433 )
434 .sign_and_store(key)?
435 .then_check(move |_, BoxedWrapper(address_list)| {
436 match parse_address_list(&address_list, clock_tolerance_sec)? {
437 stored_addr if stored_addr == addr => Ok(true),
438 stored_addr => {
439 tracing::warn!(
440 stored = %stored_addr,
441 expected = %addr,
442 "stored address mismatch",
443 );
444 Ok(false)
445 }
446 }
447 })
448 .await
449 }
450
451 async fn query<Q, A>(&self, peer_id: &adnl::NodeIdShort, query: Q) -> Result<Option<A>>
452 where
453 Q: TlWrite,
454 for<'a> A: TlRead<'a, Repr = tl_proto::Boxed> + 'static,
455 {
456 let result = self.adnl.query(&self.local_id, peer_id, query, None).await;
457 self.state.update_peer_status(peer_id, result.is_ok());
458 result
459 }
460
461 pub(super) async fn query_raw(
462 &self,
463 peer_id: &adnl::NodeIdShort,
464 query: Bytes,
465 ) -> Result<Option<Vec<u8>>> {
466 let result = self
467 .adnl
468 .query_raw(
469 &self.local_id,
470 peer_id,
471 query,
472 Some(self.options.query_timeout_ms),
473 )
474 .await;
475 self.state.update_peer_status(peer_id, result.is_ok());
476 result
477 }
478
479 async fn query_with_prefix<Q, A>(
480 &self,
481 peer_id: &adnl::NodeIdShort,
482 query: Q,
483 ) -> Result<Option<A>>
484 where
485 Q: TlWrite,
486 for<'a> A: TlRead<'a, Repr = tl_proto::Boxed> + 'static,
487 {
488 let result = self
489 .adnl
490 .query_with_prefix::<Q, A>(&self.local_id, peer_id, &self.query_prefix, query, None)
491 .await;
492 self.state.update_peer_status(peer_id, result.is_ok());
493 result
494 }
495
496 pub(super) fn parse_value_result<T>(
497 &self,
498 result: &[u8],
499 ) -> Result<Option<(proto::dht::KeyDescriptionOwned, T)>>
500 where
501 for<'a> T: TlRead<'a, Repr = tl_proto::Boxed> + 'static,
502 {
503 match tl_proto::deserialize::<proto::dht::ValueResult>(result)? {
504 proto::dht::ValueResult::ValueFound(BoxedWrapper(mut value)) => {
505 if value.key.update_rule == proto::dht::UpdateRule::Signature {
506 verify_signed_dht_value(&mut value)?;
507 }
508
509 let parsed = tl_proto::deserialize(value.value)?;
510 Ok(Some((value.key.as_equivalent_owned(), parsed)))
511 }
512 proto::dht::ValueResult::ValueNotFound(proto::dht::NodesOwned { nodes }) => {
513 for node in nodes {
514 if let Err(e) = self.add_dht_peer(node) {
515 tracing::warn!("failed to add DHT peer: {e:?}");
516 }
517 }
518 Ok(None)
519 }
520 }
521 }
522
523 #[inline(always)]
524 pub(super) fn known_peers(&self) -> &adnl::PeersSet {
525 &self.state.known_peers
526 }
527
528 #[inline(always)]
529 pub(super) fn storage(&self) -> &Storage {
530 &self.state.storage
531 }
532}
533
/// State shared between [`Node`], the ADNL query subscriber and the storage GC task.
struct NodeState {
    /// Local ADNL key used to sign this node's record.
    key: Arc<adnl::Key>,

    /// Short ids of peers added through `add_dht_peer`.
    known_peers: adnl::PeersSet,

    /// Per-peer penalty counters updated after every outgoing query.
    penalties: Penalties,

    /// Signed peer records, used to answer `findNode` / `findValue`.
    buckets: Buckets,

    /// Local value storage, read by `findValue` and written by `store`.
    storage: Storage,

    /// Largest `k` accepted in incoming `findValue` queries.
    max_allowed_k: u32,
}
551
552impl NodeState {
553 fn metrics(&self) -> NodeMetrics {
554 NodeMetrics {
555 known_peers_len: self.known_peers.len(),
556 bucket_peer_count: self.buckets.iter().map(|bucket| bucket.len()).sum(),
557 storage_len: self.storage.len(),
558 storage_total_size: self.storage.total_size(),
559 }
560 }
561
562 fn sign_local_node(&self, addr_list: proto::adnl::AddressList) -> proto::dht::NodeOwned {
563 let mut node = proto::dht::NodeOwned {
564 id: self.key.full_id().as_tl().as_equivalent_owned(),
565 addr_list,
566 version: addr_list.version,
567 signature: Default::default(),
568 };
569 node.signature = self.key.sign(node.as_boxed()).to_vec().into();
570 node
571 }
572
573 fn add_dht_peer(
574 &self,
575 adnl: &adnl::Node,
576 mut peer: proto::dht::NodeOwned,
577 ) -> Result<Option<adnl::NodeIdShort>> {
578 let peer_id_full = adnl::NodeIdFull::try_from(peer.id.as_equivalent_ref())?;
579
580 let signature = std::mem::take(&mut peer.signature);
582 if peer_id_full.verify(peer.as_boxed(), &signature).is_err() {
583 tracing::warn!("invalid DHT peer signature");
584 return Ok(None);
585 }
586 peer.signature = signature;
587
588 let peer_id = peer_id_full.compute_short_id();
590 let peer_addr = parse_address_list(&peer.addr_list, adnl.options().clock_tolerance_sec)?;
591
592 let is_new_peer = adnl.add_peer(
594 adnl::NewPeerContext::Dht,
595 self.key.id(),
596 &peer_id,
597 peer_addr,
598 peer_id_full,
599 )?;
600 if !is_new_peer {
601 return Ok(None);
602 }
603
604 if self.known_peers.insert(peer_id) {
606 self.buckets.insert(&peer_id, peer);
607 } else {
608 self.set_good_peer(&peer_id);
609 }
610
611 Ok(Some(peer_id))
612 }
613
614 fn update_peer_status(&self, peer: &adnl::NodeIdShort, is_good: bool) {
615 use dashmap::mapref::entry::Entry;
616
617 if is_good {
618 self.set_good_peer(peer);
619 } else {
620 match self.penalties.entry(*peer) {
621 Entry::Occupied(mut entry) => {
622 *entry.get_mut() += 2;
623 }
624 Entry::Vacant(entry) => {
625 entry.insert(0);
626 }
627 }
628 }
629 }
630
631 fn set_good_peer(&self, peer: &adnl::NodeIdShort) {
632 if let Some(mut count) = self.penalties.get_mut(peer) {
633 *count.value_mut() = count.saturating_sub(1);
634 }
635 }
636
637 fn process_find_node(&self, query: proto::rpc::DhtFindNode<'_>) -> proto::dht::NodesOwned {
638 self.buckets.find(query.key, query.k)
639 }
640
641 fn process_find_value(
642 &self,
643 query: proto::rpc::DhtFindValue<'_>,
644 ) -> Result<proto::dht::ValueResultOwned> {
645 if query.k == 0 || query.k > self.max_allowed_k {
646 return Err(DhtNodeError::InvalidNodeCountLimit.into());
647 }
648
649 Ok(if let Some(value) = self.storage.get_ref(query.key) {
650 proto::dht::ValueResultOwned::ValueFound(value.clone().into_boxed())
651 } else {
652 let mut nodes = Vec::with_capacity(query.k as usize);
653
654 'outer: for bucket in &self.buckets {
655 for peer in bucket {
656 nodes.push(peer.clone());
657
658 if nodes.len() >= query.k as usize {
659 break 'outer;
660 }
661 }
662 }
663
664 proto::dht::ValueResultOwned::ValueNotFound(proto::dht::NodesOwned { nodes })
665 })
666 }
667
668 fn process_store(&self, query: proto::rpc::DhtStore<'_>) -> Result<proto::dht::Stored> {
669 self.storage.insert(query.value)?;
670 Ok(proto::dht::Stored)
671 }
672}
673
674#[async_trait::async_trait]
675impl QuerySubscriber for NodeState {
676 async fn try_consume_query<'a>(
677 &self,
678 ctx: SubscriberContext<'a>,
679 constructor: u32,
680 query: Cow<'a, [u8]>,
681 ) -> Result<QueryConsumingResult<'a>> {
682 match constructor {
683 proto::rpc::DhtPing::TL_ID => {
684 let proto::rpc::DhtPing { random_id } = tl_proto::deserialize(&query)?;
685 QueryConsumingResult::consume(proto::dht::Pong { random_id })
686 }
687 proto::rpc::DhtFindNode::TL_ID => {
688 let query = tl_proto::deserialize(&query)?;
689 QueryConsumingResult::consume(self.process_find_node(query).into_boxed())
690 }
691 proto::rpc::DhtFindValue::TL_ID => {
692 let query = tl_proto::deserialize(&query)?;
693 QueryConsumingResult::consume(self.process_find_value(query)?)
694 }
695 proto::rpc::DhtGetSignedAddressList::TL_ID => QueryConsumingResult::consume(
696 self.sign_local_node(ctx.adnl.build_address_list())
697 .into_boxed(),
698 ),
699 proto::rpc::DhtStore::TL_ID => {
700 let query = tl_proto::deserialize(&query)?;
701 QueryConsumingResult::consume(self.process_store(query)?)
702 }
703 proto::rpc::DhtQuery::TL_ID => {
704 let mut offset = 0;
705 let proto::rpc::DhtQuery { node } = <_>::read_from(&query, &mut offset)?;
706 let constructor = u32::read_from(&query, &mut std::convert::identity(offset))?;
707
708 if offset >= query.len() {
709 return Err(DhtNodeError::UnexpectedQuery.into());
710 }
711
712 self.add_dht_peer(ctx.adnl, node.as_equivalent_owned())?;
713
714 match self
715 .try_consume_query(ctx, constructor, Cow::Borrowed(&query[offset..]))
716 .await?
717 {
718 QueryConsumingResult::Consumed(answer) => {
719 Ok(QueryConsumingResult::Consumed(answer))
720 }
721 QueryConsumingResult::Rejected(_) => Err(DhtNodeError::UnexpectedQuery.into()),
722 }
723 }
724 _ => Ok(QueryConsumingResult::Rejected(query)),
725 }
726 }
727}
728
729fn verify_signed_dht_value(value: &mut proto::dht::Value<'_>) -> Result<()> {
730 if value.key.key.id != &tl_proto::hash(value.key.id) {
731 return Err(DhtNodeError::InvalidValueKey.into());
732 }
733
734 let full_id = adnl::NodeIdFull::try_from(value.key.id)?;
735
736 let key_signature = std::mem::take(&mut value.key.signature);
737 full_id.verify(value.key.as_boxed(), key_signature)?;
738 value.key.signature = key_signature;
739
740 let value_signature = std::mem::take(&mut value.signature);
741 full_id.verify(value.as_boxed(), value_signature)?;
742 value.signature = value_signature;
743
744 Ok(())
745}
746
/// Snapshot of DHT node counters returned by [`Node::metrics`].
#[derive(Debug, Copy, Clone)]
pub struct NodeMetrics {
    /// Number of entries in the known peers set.
    pub known_peers_len: usize,
    /// Sum of the peer counts of all buckets.
    pub bucket_peer_count: usize,
    /// Number of entries reported by the value storage (`Storage::len`).
    pub storage_len: usize,
    /// Value of `Storage::total_size` (presumably bytes — TODO confirm).
    pub storage_total_size: usize,
}
755
/// Per-peer penalty counters; compared against `NodeOptions::bad_peer_threshold`.
type Penalties = FastDashMap<adnl::NodeIdShort, usize>;
757
/// Errors produced by the DHT node.
#[derive(thiserror::Error, Debug)]
enum DhtNodeError {
    /// `find_address` received no value with a parseable address list and key id.
    #[error("No address found")]
    NoAddressFound,
    /// A `dht.query` wrapper was empty or wrapped a query that was not consumed.
    #[error("Unexpected DHT query")]
    UnexpectedQuery,
    /// An incoming `findValue` asked for `k == 0` or `k > max_allowed_k`.
    #[error("Invalid node count limit")]
    InvalidNodeCountLimit,
    /// A signed value's key id is not the hash of its public key.
    #[error("Invalid value key")]
    InvalidValueKey,
}