everscale_network/dht/
futures.rs1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use anyhow::Result;
7use bytes::Bytes;
8use futures_util::future::BoxFuture;
9use futures_util::stream::FuturesUnordered;
10use futures_util::{FutureExt, StreamExt};
11use tl_proto::TlRead;
12
13use super::streams::DhtValuesStream;
14use super::Node;
15use crate::proto;
16
17#[must_use = "futures do nothing unless polled"]
19pub struct StoreValue {
20 dht: Arc<Node>,
21 key: proto::dht::KeyOwned,
22 query: Bytes,
23 futures: FuturesUnordered<StoreFuture>,
24 started: bool,
25}
26
27impl StoreValue {
28 pub(super) fn new(dht: Arc<Node>, value: proto::dht::Value<'_>) -> Result<Self> {
29 dht.storage().insert(value)?;
30
31 let key = value.key.key.as_equivalent_owned();
32 let query = tl_proto::serialize(proto::rpc::DhtStore { value }).into();
33
34 Ok(Self {
35 dht,
36 key,
37 query,
38 futures: Default::default(),
39 started: false,
40 })
41 }
42
43 pub fn then_check<T, FV>(self, check_value: FV) -> DhtStoreValueWithCheck<T, FV> {
46 DhtStoreValueWithCheck {
47 store_value: self,
48 find_value: None,
49 check_value,
50 check_all: false,
51 _marker: Default::default(),
52 }
53 }
54
55 pub fn only_locally(self) {}
57}
58
59impl Future for StoreValue {
60 type Output = ();
61
62 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63 if !self.started {
64 for &peer_id in self.dht.iter_known_peers() {
65 let dht = self.dht.clone();
66 let query = self.query.clone();
67 self.futures.push(Box::pin(async move {
68 dht.query_raw(&peer_id, query).await.ok();
69 }));
70 }
71 self.started = true;
72 }
73
74 loop {
75 match self.futures.poll_next_unpin(cx) {
76 Poll::Ready(Some(_)) => continue,
77 Poll::Ready(None) => break Poll::Ready(()),
78 Poll::Pending => break Poll::Pending,
79 }
80 }
81 }
82}
83
84#[must_use = "futures do nothing unless polled"]
86pub struct DhtStoreValueWithCheck<T, FV> {
87 store_value: StoreValue,
88 find_value: Option<DhtValuesStream<T>>,
89 check_value: FV,
90 check_all: bool,
91 _marker: std::marker::PhantomData<T>,
92}
93
94impl<T, FV> DhtStoreValueWithCheck<T, FV> {
95 pub fn check_all(mut self) -> Self {
97 self.check_all = true;
98 self
99 }
100}
101
102impl<T, FV> Unpin for DhtStoreValueWithCheck<T, FV> {}
103
104impl<T, FV> Future for DhtStoreValueWithCheck<T, FV>
105where
106 FV: FnMut(proto::dht::KeyDescriptionOwned, T) -> Result<bool>,
107 for<'a> T: TlRead<'a, Repr = tl_proto::Boxed> + Send + 'static,
108{
109 type Output = Result<bool>;
110
111 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
112 loop {
113 match &mut self.find_value {
114 None => {
115 futures_util::ready!(self.store_value.poll_unpin(cx));
116 self.find_value = Some(
117 DhtValuesStream::new(
118 self.store_value.dht.clone(),
119 self.store_value.key.as_equivalent_ref(),
120 )
121 .use_new_peers(true),
122 );
123 }
124 Some(find_value) => match find_value.poll_next_unpin(cx) {
125 Poll::Ready(Some((key, value))) => match (self.check_value)(key, value) {
126 Ok(true) => break Poll::Ready(Ok(true)),
127 Ok(false) => continue,
128 Err(e) => break Poll::Ready(Err(e)),
129 },
130 Poll::Ready(None) => break Poll::Ready(Ok(false)),
131 Poll::Pending => break Poll::Pending,
132 },
133 }
134 }
135 }
136}
137
138type StoreFuture = BoxFuture<'static, ()>;