use futures_lite::{FutureExt, Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::client::ConcurrencyError;
use crate::SignedPacket;
use super::PublishError;
pub async fn publish_both_networks(
dht_future: impl Future<Output = Result<(), PublishError>> + Send + 'static,
relays_future: impl Future<Output = Result<(), PublishError>> + Send + 'static,
) -> Result<(), PublishError> {
SelectFuture {
first_result: None,
dht_future: dht_future.boxed(),
relays_future: relays_future.boxed(),
}
.await
}
pub struct SelectFuture {
first_result: Option<(Network, Result<(), PublishError>)>,
dht_future: Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send>>,
relays_future: Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send>>,
}
impl SelectFuture {
fn process_result(
&mut self,
network: Network,
result: Result<(), PublishError>,
) -> Poll<Result<(), PublishError>> {
match self.first_result {
Some((_, Ok(()))) => Poll::Ready(match result {
Err(PublishError::Concurrency(ConcurrencyError::CasFailed)) => Ok(()),
_ => result,
}),
Some(_) => Poll::Ready(result),
None => {
self.first_result = Some((network, result));
Poll::Pending
}
}
}
}
impl Future for SelectFuture {
type Output = Result<(), PublishError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let done_network = this.first_result.as_ref().map(|r| r.0);
if !matches!(done_network, Some(Network::Dht)) {
if let Poll::Ready(result) = this.dht_future.as_mut().poll(cx) {
return this.process_result(Network::Dht, result);
}
}
if !matches!(done_network, Some(Network::Relays)) {
if let Poll::Ready(result) = this.relays_future.as_mut().poll(cx) {
return this.process_result(Network::Relays, result);
}
}
Poll::Pending
}
}
pub fn select_stream(
dht_stream: Pin<Box<dyn Stream<Item = SignedPacket> + Send>>,
relays_stream: Pin<Box<dyn Stream<Item = SignedPacket> + Send>>,
) -> SelectStream {
SelectStream {
mode: Mode::RoundRobin(Network::Dht),
dht_stream,
relays_stream,
}
}
pub struct SelectStream {
mode: Mode,
dht_stream: Pin<Box<dyn Stream<Item = SignedPacket> + Send>>,
relays_stream: Pin<Box<dyn Stream<Item = SignedPacket> + Send>>,
}
#[derive(Clone, Debug)]
enum Mode {
RoundRobin(Network),
Exhausted(Network),
}
#[derive(Clone, Copy, Debug)]
enum Network {
Dht,
Relays,
}
impl Stream for SelectStream {
type Item = SignedPacket;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match this.mode {
Mode::RoundRobin(current) => {
let (primary, secondary) = match current {
Network::Dht => (this.dht_stream.as_mut(), this.relays_stream.as_mut()),
Network::Relays => (this.relays_stream.as_mut(), this.dht_stream.as_mut()),
};
this.mode = Mode::RoundRobin(match current {
Network::Dht => Network::Relays,
Network::Relays => Network::Dht,
});
match primary.poll_next(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => {
this.mode = Mode::Exhausted(current);
secondary.poll_next(cx)
}
Poll::Pending => Poll::Pending,
}
}
Mode::Exhausted(exhausted) => match exhausted {
Network::Dht => this.relays_stream.poll_next(cx),
Network::Relays => this.dht_stream.poll_next(cx),
},
}
}
}