everscale_network/dht/
streams.rs1use std::pin::Pin;
2use std::sync::Arc;
3use std::task::{Context, Poll};
4
5use bytes::Bytes;
6use futures_util::future::BoxFuture;
7use futures_util::stream::FuturesUnordered;
8use futures_util::{Stream, StreamExt};
9use tl_proto::TlRead;
10
11use super::node::Node;
12use super::peers_iter::PeersIter;
13use crate::proto;
14
15#[must_use = "streams do nothing unless polled"]
17pub struct DhtValuesStream<T> {
18 dht: Arc<Node>,
19 query: Bytes,
20 batch_len: Option<usize>,
21 known_peers_version: u64,
22 use_new_peers: bool,
23 peers_iter: PeersIter,
24 futures: FuturesUnordered<ValueFuture<T>>,
25 future_count: usize,
26 _marker: std::marker::PhantomData<T>,
27}
28
29impl<T> Unpin for DhtValuesStream<T> {}
30
31impl<T> DhtValuesStream<T>
32where
33 for<'a> T: TlRead<'a, Repr = tl_proto::Boxed> + Send + 'static,
34{
35 pub(super) fn new(dht: Arc<Node>, key: proto::dht::Key<'_>) -> Self {
36 let key_id = tl_proto::hash_as_boxed(key);
37 let peers_iter = PeersIter::with_key_id(key_id);
38
39 let batch_len = Some(dht.options().default_value_batch_len);
40 let known_peers_version = dht.known_peers().version();
41
42 let query = tl_proto::serialize(proto::rpc::DhtFindValue { key: &key_id, k: 6 }).into();
43
44 Self {
45 dht,
46 query,
47 batch_len,
48 known_peers_version,
49 use_new_peers: false,
50 peers_iter,
51 futures: Default::default(),
52 future_count: usize::MAX,
53 _marker: Default::default(),
54 }
55 }
56
57 pub fn use_full_batch(mut self) -> Self {
59 self.batch_len = None;
60 self
61 }
62
63 pub fn use_new_peers(mut self, enable: bool) -> Self {
65 self.use_new_peers = enable;
66 self
67 }
68
69 fn refill_futures(&mut self) {
70 while let Some(peer_id) = self.peers_iter.next() {
72 let dht = self.dht.clone();
73 let query = self.query.clone();
74
75 self.futures.push(Box::pin(async move {
76 match dht.query_raw(&peer_id, query).await {
77 Ok(Some(result)) => match dht.parse_value_result::<T>(&result) {
78 Ok(Some(value)) => Some(value),
79 Ok(None) => None,
80 Err(e) => {
81 tracing::warn!("failed to parse queried value: {e}");
82 None
83 }
84 },
85 Ok(None) => None,
86 Err(e) => {
87 tracing::warn!("failed to query value: {e}");
88 None
89 }
90 }
91 }));
92
93 self.future_count += 1;
94 if self.future_count > MAX_PARALLEL_FUTURES {
95 break;
96 }
97 }
98 }
99}
100
101impl<T> Stream for DhtValuesStream<T>
102where
103 for<'a> T: TlRead<'a, Repr = tl_proto::Boxed> + Send + 'static,
104{
105 type Item = ReceivedValue<T>;
106
107 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108 let this = self.get_mut();
109
110 if this.future_count == usize::MAX {
112 this.peers_iter.fill(&this.dht, this.batch_len);
113 this.future_count = 0;
114 }
115
116 loop {
117 if this.future_count < MAX_PARALLEL_FUTURES {
119 this.refill_futures();
120 }
121
122 match this.futures.poll_next_unpin(cx) {
123 Poll::Ready(Some(value)) => {
124 match this.dht.known_peers().version() {
126 version if this.use_new_peers && version != this.known_peers_version => {
127 this.peers_iter.fill(&this.dht, this.batch_len);
128 this.known_peers_version = version;
129 }
130 _ => {}
131 }
132
133 this.future_count -= 1;
135
136 if let Some(value) = value {
137 break Poll::Ready(Some(value));
138 }
139 }
140 Poll::Ready(None) => break Poll::Ready(None),
141 Poll::Pending => break Poll::Pending,
142 }
143 }
144 }
145}
146
147type ValueFuture<T> = BoxFuture<'static, Option<ReceivedValue<T>>>;
148type ReceivedValue<T> = (proto::dht::KeyDescriptionOwned, T);
149
/// Threshold which bounds how many value queries a stream keeps in flight:
/// new queries are only created while `future_count` is below it.
const MAX_PARALLEL_FUTURES: usize = 5;