use std::{any::type_name, cmp::min, sync::Arc};
use chrono::Utc;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep_until, timeout, Instant};
use tower::{Service, ServiceExt};
use tracing::Span;
use zebra_chain::{diagnostic::task::WaitForPanics, serialization::DateTime32};
use crate::{
constants, meta_addr::MetaAddrChange, peer_set::set::MorePeers, types::MetaAddr, AddressBook,
BoxError, Request, Response,
};
#[cfg(test)]
mod tests;
/// A source of candidate peers, backed by a shared [`AddressBook`] and a peer service.
///
/// The set asks the peer service for more addresses (`update`, `update_initial`) and hands out
/// one address at a time for new connections (`next`). Both operations are rate-limited by the
/// two `Instant` deadlines stored here.
pub(crate) struct CandidateSet<S>
where
S: Service<Request, Response = Response, Error = BoxError> + Send,
S::Future: Send + 'static,
{
/// Shared address book. It is locked on blocking threads when gossiped addresses are added
/// and when the next reconnection peer is chosen.
address_book: Arc<std::sync::Mutex<AddressBook>>,
/// The service that `Request::Peers` calls are sent to during a crawl.
peer_service: S,
/// `next` sleeps until this instant before yielding a peer, then moves it forward by
/// `constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL`.
min_next_handshake: Instant,
/// A crawl only runs once this instant has been reached; after a crawl it is moved forward
/// by `constants::MIN_PEER_GET_ADDR_INTERVAL`.
min_next_crawl: Instant,
}
impl<S> std::fmt::Debug for CandidateSet<S>
where
    S: Service<Request, Response = Response, Error = BoxError> + Send,
    S::Future: Send + 'static,
{
    /// Formats the candidate set for diagnostics.
    ///
    /// The peer service is represented by its type name only, so `S` is not required to
    /// implement `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let service_type = type_name::<S>();

        let mut builder = f.debug_struct("CandidateSet");
        builder.field("address_book", &self.address_book);
        builder.field("peer_service", &service_type);
        builder.field("min_next_handshake", &self.min_next_handshake);
        builder.field("min_next_crawl", &self.min_next_crawl);
        builder.finish()
    }
}
impl<S> CandidateSet<S>
where
S: Service<Request, Response = Response, Error = BoxError> + Send,
S::Future: Send + 'static,
{
pub fn new(
address_book: Arc<std::sync::Mutex<AddressBook>>,
peer_service: S,
) -> CandidateSet<S> {
CandidateSet {
address_book,
peer_service,
min_next_handshake: Instant::now(),
min_next_crawl: Instant::now(),
}
}
/// Crawls for new peer addresses without a caller-supplied fanout limit, so the fanout is
/// `constants::GET_ADDR_FANOUT`.
///
/// Returns whatever the rate-limited, timed-out crawl (`update_timeout`) returns.
pub async fn update(&mut self) -> Result<Option<MorePeers>, BoxError> {
    let no_fanout_limit = None;
    self.update_timeout(no_fanout_limit).await
}
/// Crawls for new peer addresses, sending at most `fanout_limit` requests.
///
/// The limit is additionally capped at `constants::GET_ADDR_FANOUT` by the fanout itself.
/// Returns whatever the rate-limited, timed-out crawl (`update_timeout`) returns.
pub async fn update_initial(
    &mut self,
    fanout_limit: usize,
) -> Result<Option<MorePeers>, BoxError> {
    let requested_limit = Some(fanout_limit);
    self.update_timeout(requested_limit).await
}
/// Runs one crawl, subject to the crawl rate limit and an overall timeout.
///
/// # Returns
///
/// - `Ok(None)` without contacting any peer if `min_next_crawl` has not been reached yet.
/// - `Ok(None)` if the fanout did not finish within `constants::PEER_GET_ADDR_TIMEOUT`;
///   the timeout is logged and is not reported as an error.
/// - Otherwise, the value returned by `update_fanout`.
///
/// # Errors
///
/// Propagates any error returned by `update_fanout`. In that case `min_next_crawl` is left
/// unchanged.
async fn update_timeout(
    &mut self,
    fanout_limit: Option<usize>,
) -> Result<Option<MorePeers>, BoxError> {
    // Rate limit: do nothing until the next permitted crawl time.
    if self.min_next_crawl > Instant::now() {
        return Ok(None);
    }

    let crawl_outcome = timeout(
        constants::PEER_GET_ADDR_TIMEOUT,
        self.update_fanout(fanout_limit),
    )
    .await;

    let more_peers = match crawl_outcome {
        Ok(fanout_result) => fanout_result?,
        Err(_elapsed) => {
            info!("timeout waiting for peer service readiness or peer responses");
            None
        }
    };

    // Completed and timed-out crawls both push the next crawl back by the minimum interval.
    self.min_next_crawl = Instant::now() + constants::MIN_PEER_GET_ADDR_INTERVAL;

    Ok(more_peers)
}
/// Sends `Request::Peers` to the peer service up to the fanout limit, then feeds every
/// successful response into the address book.
///
/// The effective fanout is `constants::GET_ADDR_FANOUT`, or the smaller of that constant and
/// `fanout_limit` when a limit is supplied.
///
/// # Returns
///
/// `Ok(Some(MorePeers))` if at least one request returned a `Response::Peers`, otherwise
/// `Ok(None)`. Errors from individual requests are logged at trace level and skipped.
///
/// # Errors
///
/// Returns the error from the peer service if it fails to become ready.
///
/// # Panics
///
/// Panics if the peer service answers a `Request::Peers` with any other response variant.
async fn update_fanout(
    &mut self,
    fanout_limit: Option<usize>,
) -> Result<Option<MorePeers>, BoxError> {
    let fanout_limit = match fanout_limit {
        Some(requested) => min(requested, constants::GET_ADDR_FANOUT),
        None => constants::GET_ADDR_FANOUT,
    };
    debug!(?fanout_limit, "sending GetPeers requests");

    // Launch all the requests before waiting for any response.
    let mut responses = FuturesUnordered::new();
    for attempt in 0..fanout_limit {
        // Yield to the scheduler between consecutive requests.
        if attempt != 0 {
            tokio::task::yield_now().await;
        }

        let request = self.peer_service.ready().await?.call(Request::Peers);
        responses.push(request);
    }

    // Handle responses in completion order, queueing one address book update per response.
    let mut more_peers = None;
    let mut address_book_updates = FuturesUnordered::new();
    while let Some(rsp) = responses.next().await {
        let addrs = match rsp {
            Ok(Response::Peers(addrs)) => addrs,
            Ok(_) => unreachable!("Peers requests always return Peers responses"),
            Err(e) => {
                trace!(?e, "got error in GetPeers request");
                continue;
            }
        };

        trace!(
            addr_count = ?addrs.len(),
            ?addrs,
            "got response to GetPeers"
        );
        let addrs = validate_addrs(addrs, DateTime32::now());
        address_book_updates.push(self.send_addrs(addrs));
        more_peers = Some(MorePeers);
    }

    // Drive every queued address book update to completion before returning.
    while address_book_updates.next().await.is_some() {}

    Ok(more_peers)
}
async fn send_addrs(&self, addrs: impl IntoIterator<Item = MetaAddr>) {
let addrs: Vec<MetaAddrChange> = addrs
.into_iter()
.map(MetaAddr::new_gossiped_change)
.map(|maybe_addr| maybe_addr.expect("Received gossiped peers always have services set"))
.collect();
debug!(count = ?addrs.len(), "sending gossiped addresses to the address book");
if addrs.is_empty() {
return;
}
let address_book = self.address_book.clone();
let span = Span::current();
tokio::task::spawn_blocking(move || {
span.in_scope(|| address_book.lock().unwrap().extend(addrs))
})
.wait_for_panics()
.await
}
pub async fn next(&mut self) -> Option<MetaAddr> {
let address_book = self.address_book.clone();
let next_peer = move || -> Option<MetaAddr> {
let mut guard = address_book.lock().unwrap();
let instant_now = std::time::Instant::now();
let chrono_now = Utc::now();
let next_peer = guard.reconnection_peers(instant_now, chrono_now).next()?;
let next_peer = MetaAddr::new_reconnect(next_peer.addr);
guard.update(next_peer)
};
let span = Span::current();
let next_peer = tokio::task::spawn_blocking(move || span.in_scope(next_peer))
.wait_for_panics()
.await?;
sleep_until(self.min_next_handshake).await;
self.min_next_handshake = Instant::now() + constants::MIN_OUTBOUND_PEER_CONNECTION_INTERVAL;
Some(next_peer)
}
/// Returns another handle to the shared address book.
///
/// Only compiled for tests and the `proptest-impl` feature.
#[cfg(any(test, feature = "proptest-impl"))]
#[allow(dead_code)]
pub async fn address_book(&self) -> Arc<std::sync::Mutex<AddressBook>> {
    Arc::clone(&self.address_book)
}
}
/// Collects `addrs` and applies `limit_last_seen_times` with `last_seen_limit` before handing
/// the addresses back as an iterator.
///
/// The returned iterator may be empty even when `addrs` was not, because
/// `limit_last_seen_times` can discard the whole batch.
fn validate_addrs(
    addrs: impl IntoIterator<Item = MetaAddr>,
    last_seen_limit: DateTime32,
) -> impl Iterator<Item = MetaAddr> {
    let mut validated: Vec<MetaAddr> = Vec::new();
    validated.extend(addrs);

    limit_last_seen_times(&mut validated, last_seen_limit);

    validated.into_iter()
}
/// Ensures no address in `addrs` has an untrusted last-seen time later than `last_seen_limit`.
///
/// If the newest last-seen time is after the limit, every time is moved back by the
/// difference between that newest time and the limit. If moving the oldest time back by that
/// amount would underflow, all addresses are removed instead. When no time is after the
/// limit, `addrs` is left untouched.
///
/// # Panics
///
/// Panics if any address has no untrusted last-seen time.
fn limit_last_seen_times(addrs: &mut Vec<MetaAddr>, last_seen_limit: DateTime32) {
    /// Reads the untrusted last-seen time that every gossiped address is expected to carry.
    fn last_seen_of(meta_addr: &MetaAddr) -> DateTime32 {
        meta_addr
            .untrusted_last_seen()
            .expect("unexpected missing last seen: should be provided by deserialization")
    }

    let oldest_seen = addrs
        .iter()
        .map(last_seen_of)
        .min()
        .unwrap_or(DateTime32::MIN);
    let newest_seen = addrs
        .iter()
        .map(last_seen_of)
        .max()
        .unwrap_or(DateTime32::MAX);

    // Nothing is later than the limit, so there is nothing to adjust.
    if newest_seen <= last_seen_limit {
        return;
    }

    let offset = newest_seen
        .checked_duration_since(last_seen_limit)
        .expect("unexpected underflow: just checked newest_seen is greater");

    // The oldest time cannot be moved back that far: drop the whole batch.
    if oldest_seen.checked_sub(offset).is_none() {
        addrs.clear();
        return;
    }

    // The oldest time survives the shift, so every other time does too.
    for addr in addrs.iter_mut() {
        let shifted = last_seen_of(&*addr)
            .checked_sub(offset)
            .expect("unexpected underflow: just checked oldest_seen");
        addr.set_untrusted_last_seen(shifted);
    }
}