use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use anyhow::Result;
use bytes::Bytes;
use futures_util::future::BoxFuture;
use futures_util::stream::FuturesUnordered;
use futures_util::{FutureExt, StreamExt};
use tl_proto::TlRead;
use super::streams::DhtValuesStream;
use super::Node;
use crate::proto;
#[must_use = "futures do nothing unless polled"]
pub struct StoreValue {
dht: Arc<Node>,
key: proto::dht::KeyOwned,
query: Bytes,
futures: FuturesUnordered<StoreFuture>,
started: bool,
}
impl StoreValue {
pub(super) fn new(dht: Arc<Node>, value: proto::dht::Value<'_>) -> Result<Self> {
dht.storage().insert(value)?;
let key = value.key.key.as_equivalent_owned();
let query = tl_proto::serialize(proto::rpc::DhtStore { value }).into();
Ok(Self {
dht,
key,
query,
futures: Default::default(),
started: false,
})
}
pub fn then_check<T, FV>(self, check_value: FV) -> DhtStoreValueWithCheck<T, FV> {
DhtStoreValueWithCheck {
store_value: self,
find_value: None,
check_value,
check_all: false,
_marker: Default::default(),
}
}
pub fn only_locally(self) {}
}
impl Future for StoreValue {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if !self.started {
for &peer_id in self.dht.iter_known_peers() {
let dht = self.dht.clone();
let query = self.query.clone();
self.futures.push(Box::pin(async move {
dht.query_raw(&peer_id, query).await.ok();
}));
}
self.started = true;
}
loop {
match self.futures.poll_next_unpin(cx) {
Poll::Ready(Some(_)) => continue,
Poll::Ready(None) => break Poll::Ready(()),
Poll::Pending => break Poll::Pending,
}
}
}
}
#[must_use = "futures do nothing unless polled"]
pub struct DhtStoreValueWithCheck<T, FV> {
store_value: StoreValue,
find_value: Option<DhtValuesStream<T>>,
check_value: FV,
check_all: bool,
_marker: std::marker::PhantomData<T>,
}
impl<T, FV> DhtStoreValueWithCheck<T, FV> {
pub fn check_all(mut self) -> Self {
self.check_all = true;
self
}
}
impl<T, FV> Unpin for DhtStoreValueWithCheck<T, FV> {}
impl<T, FV> Future for DhtStoreValueWithCheck<T, FV>
where
FV: FnMut(proto::dht::KeyDescriptionOwned, T) -> Result<bool>,
for<'a> T: TlRead<'a, Repr = tl_proto::Boxed> + Send + 'static,
{
type Output = Result<bool>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match &mut self.find_value {
None => {
futures_util::ready!(self.store_value.poll_unpin(cx));
self.find_value = Some(
DhtValuesStream::new(
self.store_value.dht.clone(),
self.store_value.key.as_equivalent_ref(),
)
.use_new_peers(true),
);
}
Some(find_value) => match find_value.poll_next_unpin(cx) {
Poll::Ready(Some((key, value))) => match (self.check_value)(key, value) {
Ok(true) => break Poll::Ready(Ok(true)),
Ok(false) => continue,
Err(e) => break Poll::Ready(Err(e)),
},
Poll::Ready(None) => break Poll::Ready(Ok(false)),
Poll::Pending => break Poll::Pending,
},
}
}
}
}
type StoreFuture = BoxFuture<'static, ()>;