use super::PeerDiscoverySettings;
use crate::api::peer_discovery_internals::get_peers_all;
use crate::error::PeerDiscoveryError;
use crate::{api::node::get_info, NodeConf, NodeError, PeerInfo};
use bounded_integer::BoundedU16;
use bounded_vec::NonEmptyVec;
use ergo_chain_types::PeerAddr;
use std::fmt::Debug;
use std::{
collections::{BinaryHeap, HashSet},
time::Duration,
};
use url::Url;
/// Entry point for one Chrome peer-discovery scan.
///
/// Derives the scan settings from the arguments, creates the two bounded channels that connect
/// the discovery loop with the HTTP request task, and delegates to
/// `peer_discovery_impl_chrome`.
///
/// The scan itself is given `timeout` minus 180 seconds as its global timeout; each individual
/// node request is given 6 seconds.
///
/// # Errors
///
/// Returns `PeerDiscoveryError::TimeoutTooShort` when `timeout` is shorter than 180 seconds,
/// otherwise whatever `peer_discovery_impl_chrome` returns.
pub(crate) async fn peer_discovery_inner_chrome(
    scan: ChromePeerDiscoveryScan,
    max_parallel_tasks: BoundedU16<1, { u16::MAX }>,
    timeout: Duration,
) -> Result<ChromePeerDiscoveryScan, PeerDiscoveryError> {
    // `checked_sub` yields `None` exactly when `timeout` is below 180 seconds.
    let global_timeout = timeout
        .checked_sub(Duration::from_secs(180))
        .ok_or(PeerDiscoveryError::TimeoutTooShort)?;

    let settings = PeerDiscoverySettings {
        max_parallel_tasks,
        task_2_buffer_length: 50,
        global_timeout,
        timeout_of_individual_node_request: Duration::from_secs(6),
    };

    // Both channels share the same buffer length.
    let (tx_msg, msg_stream) =
        futures::channel::mpsc::channel::<Msg>(settings.task_2_buffer_length);
    let (tx_node_request, node_request_stream) =
        futures::channel::mpsc::channel::<NodeRequest>(settings.task_2_buffer_length);

    peer_discovery_impl_chrome(
        scan,
        tx_msg,
        msg_stream,
        tx_node_request,
        node_request_stream,
        settings,
    )
    .await
}
/// Drives one Chrome peer-discovery scan.
///
/// The function spawns the HTTP request task, hands it one pending request from `scan`, and
/// then processes the messages arriving on `msg_stream`. Before each message is handled, as
/// many pending requests as the request budget allows are dispatched over `tx_node_request`.
/// The loop ends when every tracked node has been resolved and nothing is pending, or when the
/// global timeout (`settings.global_timeout`) fires. After the loop the function waits a
/// further 180 seconds before returning the updated scan state.
///
/// # Parameters
///
/// * `scan` - state carried over from a previous scan (or a fresh one).
/// * `tx_msg` - sender handed to the HTTP request task for reporting outcomes.
/// * `msg_stream` - receiving end of the outcome channel.
/// * `tx_node_request` / `node_request_stream` - the two ends of the request channel.
/// * `settings` - parallelism limit and timeouts.
///
/// # Returns
///
/// A new `ChromePeerDiscoveryScan` whose `active_peers` are the active nodes found so far minus
/// the seeds, and whose `pending_requests` are the requests set aside when the timeout fired.
///
/// # Errors
///
/// * `PeerDiscoveryError::NoPendingNodeRequests` if `scan` has no pending request.
/// * `PeerDiscoveryError::MpscSender` if the request channel is closed.
/// * The error of the final `wasm_timer::Delay`, converted through `?`.
async fn peer_discovery_impl_chrome(
    scan: ChromePeerDiscoveryScan,
    tx_msg: futures::channel::mpsc::Sender<Msg>,
    msg_stream: futures::channel::mpsc::Receiver<Msg>,
    mut tx_node_request: futures::channel::mpsc::Sender<NodeRequest>,
    node_request_stream: futures::channel::mpsc::Receiver<NodeRequest>,
    settings: PeerDiscoverySettings,
) -> Result<ChromePeerDiscoveryScan, PeerDiscoveryError> {
    use futures::future::FutureExt;
    use futures::{SinkExt, StreamExt};

    /// Returns `url` without an explicit port; this is the form stored in the peer sets.
    fn without_port(mut url: Url) -> Url {
        #[allow(clippy::unwrap_used)]
        url.set_port(None).unwrap();
        url
    }

    let max_parallel_requests = settings.max_parallel_tasks.get() as usize;
    let ChromePeerDiscoveryScan {
        active_peers,
        mut visited_peers,
        seeds_set,
        mut pending_requests,
    } = scan;
    let mut active_peers: HashSet<Url> = active_peers.into_iter().collect();

    spawn_http_request_task_chrome(
        tx_msg,
        node_request_stream,
        settings.max_parallel_tasks,
        settings.timeout_of_individual_node_request,
    );

    // Kick things off with a single request; the loop below only runs in response to messages.
    if let Some(node_request) = pending_requests.pop() {
        tx_node_request
            .send(node_request)
            .await
            .map_err(|_| PeerDiscoveryError::MpscSender)?;
    } else {
        return Err(PeerDiscoveryError::NoPendingNodeRequests);
    }

    // Number of nodes whose info -> peers-all sequence has been started but not yet resolved.
    let mut count = 1;
    // Request budget in use. Every dispatched request adds 2; a non-timeout outcome releases 2,
    // a timeout outcome releases 1 and the later `PreflightRequestFailed` releases the other 1.
    // NOTE(review): presumably the 2 stands for the request plus its preflight - confirm.
    let mut chrome_request_count: usize = 2;
    let mut pending_requests_after_timeout = BinaryHeap::new();

    let rx_timeout_signal = {
        let (tx, rx) = futures::channel::oneshot::channel::<()>();
        wasm_bindgen_futures::spawn_local(async move {
            crate::wasm_timer::Delay::new(settings.global_timeout)
                .await
                .expect("wasm_timer::Delay: can't spawn global timeout");
            // The receiver is dropped as soon as this function returns, which can happen long
            // before the timer fires. A failed send is therefore expected and must not panic.
            let _ = tx.send(());
        });
        rx.into_stream()
    };

    // Item type of the merged stream: either a message from the HTTP task or the timeout signal.
    enum C {
        RxMsg(Msg),
        RxTimeoutSignal,
    }
    type CombinedStream = std::pin::Pin<Box<dyn futures::stream::Stream<Item = C> + Send>>;
    let streams: Vec<CombinedStream> = vec![
        msg_stream.map(C::RxMsg).boxed(),
        rx_timeout_signal.map(|_| C::RxTimeoutSignal).boxed(),
    ];
    let mut combined_stream = futures::stream::select_all(streams);

    let mut add_peers = true;
    'loop_: while let Some(n) = combined_stream.next().await {
        match n {
            C::RxMsg(p) => {
                // Dispatch as many pending requests as the budget and the channel allow.
                while let Some(peer) = pending_requests.pop() {
                    let url = without_port(peer.get_url().clone());
                    let room_for_requests =
                        max_parallel_requests.saturating_sub(chrome_request_count) >= 2;
                    let peers_all = matches!(peer, NodeRequest::PeersAll(_));

                    // An info request for an already visited node is dropped.
                    if !peers_all && visited_peers.contains(&url) {
                        continue;
                    }
                    if !room_for_requests {
                        pending_requests.push(peer);
                        break;
                    }
                    match tx_node_request.try_send(peer) {
                        Ok(_) => {
                            chrome_request_count += 2;
                            // A peers-all request continues a node that is already counted.
                            if !peers_all {
                                count += 1;
                            }
                            visited_peers.insert(url);
                        }
                        Err(e) if e.is_full() => {
                            // Channel is full: put the request back and retry on a later message.
                            pending_requests.push(e.into_inner());
                            break;
                        }
                        Err(e) if e.is_disconnected() => {
                            return Err(PeerDiscoveryError::MpscSender);
                        }
                        Err(_) => unreachable!(),
                    }
                }

                match p {
                    Msg::AddActiveNode(url) => {
                        let url = without_port(url);
                        active_peers.insert(url.clone());
                        visited_peers.insert(url);
                        count -= 1;
                        chrome_request_count -= 2;
                    }
                    Msg::InfoRequestSucceeded(url) => {
                        chrome_request_count -= 2;
                        // The node answered; follow up by asking it for its peers.
                        pending_requests.push(NodeRequest::PeersAll(url));
                    }
                    Msg::InfoRequestFailedWithoutTimeout(url)
                    | Msg::PeersAllRequestFailedWithoutTimeout(url) => {
                        visited_peers.insert(without_port(url));
                        count -= 1;
                        chrome_request_count -= 2;
                    }
                    Msg::InfoRequestFailedWithTimeout(url)
                    | Msg::PeersAllRequestFailedWithTimeout(url) => {
                        visited_peers.insert(without_port(url));
                        count -= 1;
                        // The remaining unit is released by `PreflightRequestFailed`.
                        chrome_request_count -= 1;
                    }
                    Msg::PreflightRequestFailed => {
                        chrome_request_count -= 1;
                    }
                    Msg::CheckPeers(mut peers) => {
                        use rand::seq::SliceRandom;
                        use rand::thread_rng;
                        peers.shuffle(&mut thread_rng());
                        let requests = peers
                            .into_iter()
                            .map(|p| NodeRequest::Info(p.addr.as_http_url()));
                        if add_peers {
                            pending_requests.extend(requests);
                        } else {
                            pending_requests_after_timeout.extend(requests);
                        }
                    }
                }

                if count == 0 && pending_requests.is_empty() {
                    break 'loop_;
                }
            }
            C::RxTimeoutSignal => {
                // NOTE(review): the loop is left right here, so `add_peers == false` is never
                // observed by the `CheckPeers` arm above - confirm whether the loop was meant
                // to keep draining in-flight results after the timeout.
                add_peers = false;
                pending_requests_after_timeout.extend(pending_requests.drain());
                break;
            }
        }
    }

    // Closing the request channel ends the stream consumed by the HTTP request task.
    drop(tx_node_request);

    let active_peers: Vec<_> = active_peers.difference(&seeds_set).cloned().collect();

    // NOTE(review): presumably gives the browser time to give up on outstanding requests
    // before the next scan starts - confirm.
    crate::wasm_timer::Delay::new(Duration::from_secs(180)).await?;

    Ok(ChromePeerDiscoveryScan {
        active_peers,
        visited_peers,
        pending_requests: pending_requests_after_timeout,
        seeds_set,
    })
}
fn spawn_http_request_task_chrome(
tx_msg: futures::channel::mpsc::Sender<Msg>,
node_request_stream: impl futures::Stream<Item = NodeRequest> + Send + 'static,
max_parallel_tasks: BoundedU16<1, { u16::MAX }>,
request_timeout_duration: Duration,
) {
use futures::{SinkExt, StreamExt};
use wasm_bindgen_futures::spawn_local;
let mapped_stream = node_request_stream
.map(move |node_request| {
let mut tx_msg = tx_msg.clone();
async move {
spawn_local(async move {
let mut url = node_request.get_url().clone();
#[allow(clippy::unwrap_used)]
url.set_port(Some(9053)).unwrap();
#[allow(clippy::unwrap_used)]
let node_conf = NodeConf {
addr: PeerAddr(url.socket_addrs(|| Some(9053)).unwrap()[0]),
api_key: None,
timeout: Some(request_timeout_duration),
};
let chrome_timeout_str = "error sending request: JsValue(AbortError: \
The user aborted a request.";
match node_request {
NodeRequest::Info(url) => match get_info(node_conf).await {
Ok(_) => {
let _ = tx_msg.send(Msg::InfoRequestSucceeded(url)).await;
}
Err(e) => {
if let NodeError::ReqwestError(r) = e {
if r.to_string().starts_with(chrome_timeout_str) {
let _ = tx_msg
.send(Msg::InfoRequestFailedWithTimeout(url))
.await;
spawn_local(async move {
let _ = crate::wasm_timer::Delay::new(
Duration::from_secs(80),
)
.await;
let _ = tx_msg.send(Msg::PreflightRequestFailed).await;
});
} else {
#[allow(clippy::unwrap_used)]
let _ = tx_msg
.send(Msg::InfoRequestFailedWithoutTimeout(url))
.await;
}
} else {
#[allow(clippy::unwrap_used)]
let _ = tx_msg
.send(Msg::InfoRequestFailedWithoutTimeout(url))
.await;
}
}
},
NodeRequest::PeersAll(url) => {
match get_peers_all(node_conf).await {
Ok(peers) => {
tx_msg.send(Msg::CheckPeers(peers)).await.unwrap();
tx_msg.send(Msg::AddActiveNode(url.clone())).await.unwrap();
}
Err(e) => {
if let NodeError::ReqwestError(r) = e {
if r.to_string().starts_with(chrome_timeout_str) {
let _ = tx_msg
.send(Msg::PeersAllRequestFailedWithTimeout(url))
.await;
spawn_local(async move {
crate::wasm_timer::Delay::new(Duration::from_secs(
80,
))
.await
.unwrap();
let _ =
tx_msg.send(Msg::PreflightRequestFailed).await;
});
}
} else {
let _ = tx_msg
.send(Msg::PeersAllRequestFailedWithoutTimeout(url))
.await;
}
}
}
}
}
});
}
})
.buffer_unordered(max_parallel_tasks.get() as usize);
let spawn_fn_new = wasm_bindgen_futures::spawn_local;
spawn_fn_new(mapped_stream.for_each(|_| async move {}));
}
/// Messages sent from the HTTP request tasks to the peer-discovery loop.
#[derive(Debug)]
pub(crate) enum Msg {
/// Sent after a successful peers-all request; the loop records the URL as active and visited.
AddActiveNode(Url),
/// The info request to this URL succeeded; the loop queues a peers-all request for it.
InfoRequestSucceeded(Url),
/// The info request to this URL failed with an error not classified as a timeout.
InfoRequestFailedWithoutTimeout(Url),
/// The info request to this URL failed with an error classified as a timeout.
InfoRequestFailedWithTimeout(Url),
/// The peers-all request to this URL failed with an error classified as a timeout.
PeersAllRequestFailedWithTimeout(Url),
/// The peers-all request to this URL failed with an error not classified as a timeout.
PeersAllRequestFailedWithoutTimeout(Url),
/// Sent 80 seconds after a request timed out; the loop releases one unit of its request budget.
PreflightRequestFailed,
/// Peers returned by a successful peers-all request; the loop queues an info request for each.
CheckPeers(Vec<PeerInfo>),
}
/// A request to be made against a node, identified by the node's URL.
///
/// The derived ordering follows declaration order, so every `PeersAll` compares greater than
/// every `Info`; a `BinaryHeap<NodeRequest>` (a max-heap) therefore pops `PeersAll` requests
/// first.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum NodeRequest {
/// Request handled by calling `get_info` on the node.
Info(Url),
/// Request handled by calling `get_peers_all` on the node.
PeersAll(Url),
}
impl NodeRequest {
    /// Borrows the URL carried by this request, whichever variant it is.
    fn get_url(&self) -> &Url {
        match self {
            NodeRequest::Info(url) | NodeRequest::PeersAll(url) => url,
        }
    }
}
/// State of a Chrome peer-discovery scan; it is passed into a scan and a new value is returned
/// from it.
#[derive(Debug, Clone)]
pub struct ChromePeerDiscoveryScan {
// Active nodes found so far; a completed scan stores them with the seeds removed.
active_peers: Vec<Url>,
// URLs (explicit port removed) of nodes that have already been requested or resolved.
visited_peers: HashSet<Url>,
// The seed URLs with their explicit port removed.
seeds_set: HashSet<Url>,
// Requests that still have to be dispatched.
pending_requests: BinaryHeap<NodeRequest>,
}
impl ChromePeerDiscoveryScan {
pub fn new(seeds: NonEmptyVec<Url>) -> Self {
let mut seeds_set: HashSet<Url> = HashSet::new();
for mut seed_url in seeds {
#[allow(clippy::unwrap_used)]
seed_url.set_port(None).unwrap();
seeds_set.insert(seed_url);
}
let mut pending_requests = BinaryHeap::new();
for url in &seeds_set {
pending_requests.push(NodeRequest::Info(url.clone()));
}
ChromePeerDiscoveryScan {
active_peers: vec![],
visited_peers: HashSet::new(),
seeds_set,
pending_requests,
}
}
pub fn active_peers(&self) -> Vec<Url> {
self.active_peers.clone()
}
}