use std::collections::HashMap;
use std::net::SocketAddrV4;
use std::num::NonZeroUsize;
use std::time::{Duration, Instant};
use lru::LruCache;
use tracing::error;
use tracing::{debug, trace};
use iterative_query::IterativeQuery;
use put_query::PutQuery;
use crate::common::{
Id, MAX_BUCKET_SIZE_K, Node, PutMutableRequestArguments, RequestTypeSpecific, RoutingTable,
SignedAnnounce,
};
use crate::core::iterative_query::GetRequestSpecific;
use crate::{MutableItem, PutRequestSpecific};
pub mod handle_request;
pub mod handle_response;
pub mod iterative_query;
pub mod put_query;
pub mod server;
use server::Server;
use server::ServerSettings;
pub use put_query::{ConcurrencyError, PutError, PutQueryError};
pub const REFRESH_TABLE_INTERVAL: Duration = Duration::from_secs(15 * 60);
pub const PING_TABLE_INTERVAL: Duration = Duration::from_secs(5 * 60);
pub const MAX_CACHED_ITERATIVE_QUERIES: usize = 1000;
pub const VERSION: [u8; 4] = [82, 83, 0, 6]; pub const VERSIONS_SUPPORTING_SIGNED_PEERS: &[[u8; 4]] = &[
VERSION,
];
#[derive(Debug)]
pub struct Core {
pub bootstrap: Box<[SocketAddrV4]>,
pub routing_table: RoutingTable,
pub signed_peers_routing_table: RoutingTable,
pub cached_iterative_queries: LruCache<Id, CachedIterativeQuery>,
pub iterative_queries: HashMap<Id, IterativeQuery>,
pub put_queries: HashMap<Id, PutQuery>,
pub last_table_refresh: Instant,
pub last_table_ping: Instant,
pub server: Server,
pub public_address: Option<SocketAddrV4>,
pub firewalled: bool,
pub server_mode: bool,
}
impl Core {
pub fn new(
id: Id,
bootstrap: Vec<SocketAddrV4>,
server_mode: bool,
server_settings: ServerSettings,
) -> Self {
Self {
bootstrap: bootstrap.into(),
routing_table: RoutingTable::new(id),
signed_peers_routing_table: RoutingTable::new(id),
iterative_queries: HashMap::new(),
put_queries: HashMap::new(),
cached_iterative_queries: LruCache::new(
NonZeroUsize::new(MAX_CACHED_ITERATIVE_QUERIES)
.expect("MAX_CACHED_BUCKETS is NonZeroUsize"),
),
last_table_refresh: Instant::now(),
last_table_ping: Instant::now(),
server: Server::new(server_settings),
public_address: None,
firewalled: true,
server_mode,
}
}
fn id(&self) -> &Id {
self.routing_table.id()
}
pub fn should_ping_table(&self) -> bool {
self.last_table_ping.elapsed() > PING_TABLE_INTERVAL
}
pub fn update_last_table_ping(&mut self) {
self.last_table_ping = Instant::now();
}
pub fn should_refresh_table(&self) -> bool {
self.last_table_refresh.elapsed() > REFRESH_TABLE_INTERVAL
}
pub fn update_last_table_refresh(&mut self) {
self.last_table_refresh = Instant::now();
}
pub fn get_cached_closest_nodes(&mut self, target: &Id) -> Option<Box<[Node]>> {
self.cached_iterative_queries
.get(target)
.map(|cached| cached.closest_responding_nodes.clone())
.filter(|closest_nodes| {
!closest_nodes.is_empty() && closest_nodes.iter().any(|n| n.valid_token())
})
}
pub fn closest_nodes_from_done_iterative_query(&self, query: &IterativeQuery) -> Box<[Node]> {
let self_id = self.id();
let table_size = self.routing_table.size();
if let RequestTypeSpecific::FindNode(_) = query.request.request_type {
if query.target() == *self_id {
if !self.bootstrap.is_empty() && table_size == 0 {
error!("Could not bootstrap the routing table");
} else {
debug!(
?self_id,
table_size,
signed_peers_table_size = self.signed_peers_routing_table.size(),
"Populated the routing table"
);
}
};
query
.closest()
.nodes()
.iter()
.take(MAX_BUCKET_SIZE_K)
.cloned()
.collect::<Box<[_]>>()
} else {
let relevant_routing_table = choose_relevant_routing_table(
query.request.request_type.clone(),
&self.routing_table,
&self.signed_peers_routing_table,
);
query
.responders()
.take_until_secure(
relevant_routing_table.responders_based_dht_size_estimate(),
relevant_routing_table.average_subnets(),
)
.to_vec()
.into_boxed_slice()
}
}
pub fn cleanup_done_queries(
&mut self,
done_get_queries: &[(Id, Box<[Node]>)],
done_put_queries: &[(Id, Option<PutError>)],
) -> Option<SocketAddrV4> {
let mut should_ping_alleged_new_address = None;
for (id, closest_nodes) in done_get_queries {
if let Some(query) = self.iterative_queries.remove(id) {
self.cache_iterative_query(&query, closest_nodes);
should_ping_alleged_new_address =
self.update_address_votes_from_iterative_query(&query);
};
}
for (id, _) in done_put_queries {
self.put_queries.remove(id);
}
should_ping_alleged_new_address
}
pub fn check_nodes_to_ping_and_remove_stale_nodes(&mut self) -> Vec<SocketAddrV4> {
let mut to_ping = vec![];
for routing_table in [
&mut self.routing_table,
&mut self.signed_peers_routing_table,
] {
let mut to_remove = Vec::with_capacity(routing_table.size());
for node in routing_table.nodes() {
if node.is_stale() {
to_remove.push(*node.id())
} else if node.should_ping() {
to_ping.push(node.address())
}
}
for id in to_remove {
routing_table.remove(&id);
}
}
to_ping
}
pub fn check_concurrency_errors(
&mut self,
request: &PutRequestSpecific,
) -> Result<(), ConcurrencyError> {
if let PutRequestSpecific::PutMutable(PutMutableRequestArguments {
sig,
cas,
seq,
target,
..
}) = request
&& let Some(PutRequestSpecific::PutMutable(inflight_request)) = self
.put_queries
.get(target)
.map(|existing| &existing.request)
{
debug!(?inflight_request, ?request, "Possible conflict risk");
if *sig == inflight_request.sig {
return Ok(());
} else if *seq <= inflight_request.seq {
return Err(ConcurrencyError::NotMostRecent)?;
} else if cas.is_none() {
self.put_queries.remove(target);
} else if let Some(cas) = cas {
if *cas == inflight_request.seq {
self.put_queries.remove(target);
} else {
return Err(ConcurrencyError::CasFailed)?;
}
};
};
Ok(())
}
pub fn check_outgoing_put_request(&self, target: &Id) -> Option<Response> {
self.put_queries
.get(target)
.and_then(|existing| match existing.request.clone() {
PutRequestSpecific::PutMutable(request) => Some(Response::Mutable(request.into())),
PutRequestSpecific::AnnounceSignedPeer(request) => {
Some(Response::SignedPeers(vec![SignedAnnounce {
key: request.k,
timestamp: request.t,
signature: request.sig,
}]))
}
_ => None,
})
}
pub fn check_responses_from_active_query(&self, target: &Id) -> Option<&[Response]> {
self.iterative_queries
.get(target)
.map(|query| query.responses())
}
pub fn create_iterative_query(
&mut self,
request: GetRequestSpecific,
extra_nodes: Option<&[SocketAddrV4]>,
) -> Option<(IterativeQuery, Vec<SocketAddrV4>)> {
let target = request.target();
if self.iterative_queries.contains_key(&target) {
return None;
}
let routing_table_closest = self.get_closest_nodes_from_routing_tables(&request);
let mut query = IterativeQuery::new(*self.id(), target, request);
for node in routing_table_closest {
query.add_candidate(node)
}
if let Some(closest_responding_nodes) = self.get_cached_closest_nodes(&target) {
for node in closest_responding_nodes {
query.add_candidate(node.clone())
}
}
let mut to_visit = query.closest_candidates();
let candidates = query.closest();
if candidates.is_empty() || candidates.len() < self.bootstrap.len() {
for bootstrapping_node in self.bootstrap.clone() {
to_visit.push(bootstrapping_node)
}
}
if let Some(extra_nodes) = extra_nodes {
to_visit.extend_from_slice(extra_nodes);
}
Some((query, to_visit))
}
fn get_closest_nodes_from_routing_tables(&self, request: &GetRequestSpecific) -> Vec<Node> {
let target = request.target();
match &request {
GetRequestSpecific::FindNode(_) => {
let mut routing_table_closest = self.routing_table.closest_secure(target);
routing_table_closest
.extend_from_slice(&self.signed_peers_routing_table.closest_secure(target));
routing_table_closest
}
GetRequestSpecific::GetSignedPeers(_) => {
self.signed_peers_routing_table.closest_secure(target)
}
_ => self.routing_table.closest_secure(target),
}
}
fn update_address_votes_from_iterative_query(
&mut self,
query: &IterativeQuery,
) -> Option<SocketAddrV4> {
if let Some(new_address) = query.best_address() {
self.public_address = Some(new_address);
if self.public_address.is_none()
|| new_address
!= self
.public_address
.expect("self.public_address is not None")
{
trace!(
?new_address,
"Query responses suggest a different public_address, trying to confirm.."
);
self.firewalled = true;
return Some(new_address);
}
}
None
}
fn cache_iterative_query(&mut self, query: &IterativeQuery, closest_responding_nodes: &[Node]) {
if self.cached_iterative_queries.len() >= MAX_CACHED_ITERATIVE_QUERIES {
let q = self.cached_iterative_queries.pop_lru();
self.decrement_cached_iterative_query_stats(q.map(|q| q.1));
}
let closest = query.closest();
let responders = query.responders();
if closest.nodes().is_empty() {
return;
}
let dht_size_estimate = closest.dht_size_estimate();
let responders_dht_size_estimate = responders.dht_size_estimate();
let subnets_count = responders.subnets_count();
let previous = self.cached_iterative_queries.put(
query.target(),
CachedIterativeQuery {
closest_responding_nodes: closest_responding_nodes.into(),
dht_size_estimate,
responders_dht_size_estimate,
subnets: subnets_count,
request_type: query.request.request_type.clone(),
},
);
self.decrement_cached_iterative_query_stats(previous);
let relevant_routing_table = choose_relevant_routing_table_mut(
&query.request.request_type,
&mut self.routing_table,
&mut self.signed_peers_routing_table,
);
relevant_routing_table.increment_responders_stats(
dht_size_estimate,
responders_dht_size_estimate,
subnets_count,
);
if !matches!(query.request.request_type, RequestTypeSpecific::FindNode(_)) {
debug!(
target = ?query.target(),
responders_size_estimate = ?relevant_routing_table.responders_based_dht_size_estimate(),
responders_subnets_count = ?relevant_routing_table.average_subnets(),
"Storing nodes stats..",
);
}
}
fn decrement_cached_iterative_query_stats(&mut self, query: Option<CachedIterativeQuery>) {
if let Some(CachedIterativeQuery {
dht_size_estimate,
responders_dht_size_estimate,
subnets,
request_type,
..
}) = query
{
match request_type {
RequestTypeSpecific::FindNode(..) => {
self.routing_table
.decrement_dht_size_estimate(dht_size_estimate);
}
_ => {
let relevant_routing_table = choose_relevant_routing_table_mut(
&request_type,
&mut self.routing_table,
&mut self.signed_peers_routing_table,
);
relevant_routing_table.decrement_responders_stats(
dht_size_estimate,
responders_dht_size_estimate,
subnets,
);
}
}
};
}
}
fn choose_relevant_routing_table_mut<'a>(
request_type: &'a RequestTypeSpecific,
basic_routing_table: &'a mut RoutingTable,
signed_peers_routing_table: &'a mut RoutingTable,
) -> &'a mut RoutingTable {
match request_type {
RequestTypeSpecific::GetSignedPeers(_) => signed_peers_routing_table,
_ => basic_routing_table,
}
}
fn choose_relevant_routing_table<'a>(
request_type: RequestTypeSpecific,
basic_routing_table: &'a RoutingTable,
signed_peers_routing_table: &'a RoutingTable,
) -> &'a RoutingTable {
match request_type {
RequestTypeSpecific::GetSignedPeers(_) => signed_peers_routing_table,
_ => basic_routing_table,
}
}
fn supports_signed_peers(version: Option<[u8; 4]>) -> bool {
version
.map(|version| {
VERSIONS_SUPPORTING_SIGNED_PEERS
.iter()
.any(|v| version[0..2] == v[0..2] && version[2..] >= v[2..])
})
.unwrap_or_default()
}
pub(crate) struct CachedIterativeQuery {
closest_responding_nodes: Box<[Node]>,
dht_size_estimate: f64,
responders_dht_size_estimate: f64,
subnets: u8,
request_type: RequestTypeSpecific,
}
#[derive(Debug, Clone)]
pub enum Response {
Peers(Vec<SocketAddrV4>),
SignedPeers(Vec<SignedAnnounce>),
Immutable(Box<[u8]>),
Mutable(MutableItem),
}