everscale_network/dht/
futures.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use anyhow::Result;
7use bytes::Bytes;
8use futures_util::future::BoxFuture;
9use futures_util::stream::FuturesUnordered;
10use futures_util::{FutureExt, StreamExt};
11use tl_proto::TlRead;
12
13use super::streams::DhtValuesStream;
14use super::Node;
15use crate::proto;
16
17/// Future for the `DhtNode::store_value` method.
18#[must_use = "futures do nothing unless polled"]
19pub struct StoreValue {
20    dht: Arc<Node>,
21    key: proto::dht::KeyOwned,
22    query: Bytes,
23    futures: FuturesUnordered<StoreFuture>,
24    started: bool,
25}
26
27impl StoreValue {
28    pub(super) fn new(dht: Arc<Node>, value: proto::dht::Value<'_>) -> Result<Self> {
29        dht.storage().insert(value)?;
30
31        let key = value.key.key.as_equivalent_owned();
32        let query = tl_proto::serialize(proto::rpc::DhtStore { value }).into();
33
34        Ok(Self {
35            dht,
36            key,
37            query,
38            futures: Default::default(),
39            started: false,
40        })
41    }
42
43    /// Wraps `DhtStoreValue` into future which verifies that value is stored in the DHT
44    /// and passes the predicate test
45    pub fn then_check<T, FV>(self, check_value: FV) -> DhtStoreValueWithCheck<T, FV> {
46        DhtStoreValueWithCheck {
47            store_value: self,
48            find_value: None,
49            check_value,
50            check_all: false,
51            _marker: Default::default(),
52        }
53    }
54
55    /// Drops the future, causing the value to be stored only locally
56    pub fn only_locally(self) {}
57}
58
59impl Future for StoreValue {
60    type Output = ();
61
62    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63        if !self.started {
64            for &peer_id in self.dht.iter_known_peers() {
65                let dht = self.dht.clone();
66                let query = self.query.clone();
67                self.futures.push(Box::pin(async move {
68                    dht.query_raw(&peer_id, query).await.ok();
69                }));
70            }
71            self.started = true;
72        }
73
74        loop {
75            match self.futures.poll_next_unpin(cx) {
76                Poll::Ready(Some(_)) => continue,
77                Poll::Ready(None) => break Poll::Ready(()),
78                Poll::Pending => break Poll::Pending,
79            }
80        }
81    }
82}
83
84/// Future for the `DhtStoreValue::ensure_stored` method.
85#[must_use = "futures do nothing unless polled"]
86pub struct DhtStoreValueWithCheck<T, FV> {
87    store_value: StoreValue,
88    find_value: Option<DhtValuesStream<T>>,
89    check_value: FV,
90    check_all: bool,
91    _marker: std::marker::PhantomData<T>,
92}
93
94impl<T, FV> DhtStoreValueWithCheck<T, FV> {
95    /// Forces the future to check all stored values
96    pub fn check_all(mut self) -> Self {
97        self.check_all = true;
98        self
99    }
100}
101
102impl<T, FV> Unpin for DhtStoreValueWithCheck<T, FV> {}
103
104impl<T, FV> Future for DhtStoreValueWithCheck<T, FV>
105where
106    FV: FnMut(proto::dht::KeyDescriptionOwned, T) -> Result<bool>,
107    for<'a> T: TlRead<'a, Repr = tl_proto::Boxed> + Send + 'static,
108{
109    type Output = Result<bool>;
110
111    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
112        loop {
113            match &mut self.find_value {
114                None => {
115                    futures_util::ready!(self.store_value.poll_unpin(cx));
116                    self.find_value = Some(
117                        DhtValuesStream::new(
118                            self.store_value.dht.clone(),
119                            self.store_value.key.as_equivalent_ref(),
120                        )
121                        .use_new_peers(true),
122                    );
123                }
124                Some(find_value) => match find_value.poll_next_unpin(cx) {
125                    Poll::Ready(Some((key, value))) => match (self.check_value)(key, value) {
126                        Ok(true) => break Poll::Ready(Ok(true)),
127                        Ok(false) => continue,
128                        Err(e) => break Poll::Ready(Err(e)),
129                    },
130                    Poll::Ready(None) => break Poll::Ready(Ok(false)),
131                    Poll::Pending => break Poll::Pending,
132                },
133            }
134        }
135    }
136}
137
138type StoreFuture = BoxFuture<'static, ()>;