everscale_network/dht/
streams.rs

1use std::pin::Pin;
2use std::sync::Arc;
3use std::task::{Context, Poll};
4
5use bytes::Bytes;
6use futures_util::future::BoxFuture;
7use futures_util::stream::FuturesUnordered;
8use futures_util::{Stream, StreamExt};
9use tl_proto::TlRead;
10
11use super::node::Node;
12use super::peers_iter::PeersIter;
13use crate::proto;
14
15/// Stream for the `DhtNode::values` method.
16#[must_use = "streams do nothing unless polled"]
17pub struct DhtValuesStream<T> {
18    dht: Arc<Node>,
19    query: Bytes,
20    batch_len: Option<usize>,
21    known_peers_version: u64,
22    use_new_peers: bool,
23    peers_iter: PeersIter,
24    futures: FuturesUnordered<ValueFuture<T>>,
25    future_count: usize,
26    _marker: std::marker::PhantomData<T>,
27}
28
29impl<T> Unpin for DhtValuesStream<T> {}
30
31impl<T> DhtValuesStream<T>
32where
33    for<'a> T: TlRead<'a, Repr = tl_proto::Boxed> + Send + 'static,
34{
35    pub(super) fn new(dht: Arc<Node>, key: proto::dht::Key<'_>) -> Self {
36        let key_id = tl_proto::hash_as_boxed(key);
37        let peers_iter = PeersIter::with_key_id(key_id);
38
39        let batch_len = Some(dht.options().default_value_batch_len);
40        let known_peers_version = dht.known_peers().version();
41
42        let query = tl_proto::serialize(proto::rpc::DhtFindValue { key: &key_id, k: 6 }).into();
43
44        Self {
45            dht,
46            query,
47            batch_len,
48            known_peers_version,
49            use_new_peers: false,
50            peers_iter,
51            futures: Default::default(),
52            future_count: usize::MAX,
53            _marker: Default::default(),
54        }
55    }
56
57    /// Use all DHT nodes in peers iterator
58    pub fn use_full_batch(mut self) -> Self {
59        self.batch_len = None;
60        self
61    }
62
63    /// Whether stream should fill peers iterator when new nodes are found
64    pub fn use_new_peers(mut self, enable: bool) -> Self {
65        self.use_new_peers = enable;
66        self
67    }
68
69    fn refill_futures(&mut self) {
70        // Spawn at most `max_tasks` queries
71        while let Some(peer_id) = self.peers_iter.next() {
72            let dht = self.dht.clone();
73            let query = self.query.clone();
74
75            self.futures.push(Box::pin(async move {
76                match dht.query_raw(&peer_id, query).await {
77                    Ok(Some(result)) => match dht.parse_value_result::<T>(&result) {
78                        Ok(Some(value)) => Some(value),
79                        Ok(None) => None,
80                        Err(e) => {
81                            tracing::warn!("failed to parse queried value: {e}");
82                            None
83                        }
84                    },
85                    Ok(None) => None,
86                    Err(e) => {
87                        tracing::warn!("failed to query value: {e}");
88                        None
89                    }
90                }
91            }));
92
93            self.future_count += 1;
94            if self.future_count > MAX_PARALLEL_FUTURES {
95                break;
96            }
97        }
98    }
99}
100
101impl<T> Stream for DhtValuesStream<T>
102where
103    for<'a> T: TlRead<'a, Repr = tl_proto::Boxed> + Send + 'static,
104{
105    type Item = ReceivedValue<T>;
106
107    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108        let this = self.get_mut();
109
110        // Fill iterator during the first poll
111        if this.future_count == usize::MAX {
112            this.peers_iter.fill(&this.dht, this.batch_len);
113            this.future_count = 0;
114        }
115
116        loop {
117            // Keep starting new futures when we can
118            if this.future_count < MAX_PARALLEL_FUTURES {
119                this.refill_futures();
120            }
121
122            match this.futures.poll_next_unpin(cx) {
123                Poll::Ready(Some(value)) => {
124                    // Refill peers iterator when version has changed and `use_new_peers` is set
125                    match this.dht.known_peers().version() {
126                        version if this.use_new_peers && version != this.known_peers_version => {
127                            this.peers_iter.fill(&this.dht, this.batch_len);
128                            this.known_peers_version = version;
129                        }
130                        _ => {}
131                    }
132
133                    // Decrease the number of parallel futures on each new item from `futures`
134                    this.future_count -= 1;
135
136                    if let Some(value) = value {
137                        break Poll::Ready(Some(value));
138                    }
139                }
140                Poll::Ready(None) => break Poll::Ready(None),
141                Poll::Pending => break Poll::Pending,
142            }
143        }
144    }
145}
146
147type ValueFuture<T> = BoxFuture<'static, Option<ReceivedValue<T>>>;
148type ReceivedValue<T> = (proto::dht::KeyDescriptionOwned, T);
149
150const MAX_PARALLEL_FUTURES: usize = 5;