use super::*;
// Declares the "rtab" log facility for this file; presumably this is what the
// `veilid_log!` calls below are tagged with — confirm against the macro definition.
impl_veilid_log_facility!("rtab");
/// Segment count `find_new_nodes_wide` falls back to when its `segments`
/// argument is `None`.
const DEFAULT_WIDE_FIND_NODE_SEGMENTS: u8 = 16;
impl RoutingTable {
    /// Asks `node_ref` directly for the nodes it knows near `hash_coordinate`.
    ///
    /// Issues `rpc_call_find_node` to `node_ref` with the requested
    /// `capabilities`, passes the answer to
    /// `register_nodes_with_peer_info_list`, and returns the node refs that
    /// call produces.
    ///
    /// # Errors
    /// Propagates any error returned by the RPC call. A network result that
    /// `network_result_try!` does not unwrap is handed straight back to the
    /// caller (presumably as the matching non-value `NetworkResult` — confirm
    /// against the macro definition).
    #[cfg_attr(feature = "instrument", instrument(level = "trace", skip(self), err, fields(__VEILID_LOG_KEY = self.log_key())))]
    pub async fn find_nodes_close_to_hash_coordinate(
        &self,
        node_ref: FilteredNodeRef,
        hash_coordinate: HashCoordinate,
        capabilities: Vec<VeilidCapability>,
    ) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
        let rpc_processor = self.rpc_processor();
        let res = network_result_try!(
            Box::pin(rpc_processor.rpc_call_find_node(
                Destination::direct(node_ref, None),
                hash_coordinate,
                capabilities
            ))
            .await?
        );
        Ok(NetworkResult::value(
            self.register_nodes_with_peer_info_list(res.answer),
        ))
    }

    /// Asks `node_ref` for nodes near this node's own hash coordinate and
    /// keeps only the ones that are new.
    ///
    /// The target is the hash coordinate of this node's id for `crypto_kind`.
    /// A returned node is dropped when any of its node ids appears in
    /// `existing_node_ids`, or when its state at the time of filtering is
    /// `BucketEntryState::Punished`.
    ///
    /// # Errors
    /// Propagates errors from `find_nodes_close_to_hash_coordinate`.
    #[cfg_attr(feature = "instrument", instrument(level = "trace", skip(self), err, fields(__VEILID_LOG_KEY = self.log_key())))]
    pub async fn find_new_nodes_close_to_self(
        &self,
        crypto_kind: CryptoKind,
        existing_node_ids: &HashSet<NodeId>,
        node_ref: FilteredNodeRef,
        capabilities: Vec<VeilidCapability>,
    ) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
        let self_node_id = self.node_id(crypto_kind);
        let found_node_refs = network_result_try!(
            Box::pin(self.find_nodes_close_to_hash_coordinate(
                node_ref,
                self_node_id.to_hash_coordinate(),
                capabilities,
            ))
            .await?
        );
        // Take the timestamp after the network round trip so the state check
        // below uses the time of filtering.
        let cur_ts = Timestamp::now();
        let new_node_refs = found_node_refs
            .into_iter()
            .filter(|nr| {
                !nr.node_ids()
                    .contains_any_from_iter(existing_node_ids.iter())
                    && nr.state(cur_ts) != BucketEntryState::Punished
            })
            .collect();
        Ok(NetworkResult::value(new_node_refs))
    }

    /// Asks `node_ref` for the nodes it knows near its own hash coordinate.
    ///
    /// The target is the hash coordinate of `node_ref`'s node id for
    /// `crypto_kind`.
    ///
    /// # Errors
    /// Fails with "no target node ids for this crypto kind" when `node_ref`
    /// has no node id for `crypto_kind`; otherwise propagates errors from
    /// `find_nodes_close_to_hash_coordinate`.
    #[cfg_attr(feature = "instrument", instrument(level = "trace", skip(self), err, fields(__VEILID_LOG_KEY = self.log_key())))]
    pub async fn find_nodes_close_to_node_ref(
        &self,
        crypto_kind: CryptoKind,
        node_ref: FilteredNodeRef,
        capabilities: Vec<VeilidCapability>,
    ) -> EyreResult<NetworkResult<Vec<NodeRef>>> {
        let Some(target_node_id) = node_ref.node_ids().get(crypto_kind) else {
            bail!("no target node ids for this crypto kind");
        };
        Box::pin(self.find_nodes_close_to_hash_coordinate(
            node_ref,
            target_node_id.to_hash_coordinate(),
            capabilities,
        ))
        .await
    }

    /// Discovers new nodes by querying already-known nodes picked at several
    /// offsets from this node's own hash coordinate.
    ///
    /// Steps:
    /// 1. Build `segments` target coordinates by offsetting this node's hash
    ///    coordinate with distances whose byte 0 is `shift`,
    ///    `shift + spacing`, `shift + 2 * spacing`, ... (all other bytes
    ///    zero), where `spacing = 256 / segments` and `shift = spacing / 2`.
    /// 2. Look up, from a snapshot of the routing table, the node closest to
    ///    each target in `routing_domain` with the given `capabilities`,
    ///    de-duplicating the picks.
    /// 3. Concurrently ask each picked node for the nodes near itself
    ///    (`find_nodes_close_to_node_ref`).
    /// 4. Merge the answers and drop nodes that share an id with
    ///    `existing_node_ids` or whose state is `BucketEntryState::Punished`.
    ///
    /// `segments` defaults to `DEFAULT_WIDE_FIND_NODE_SEGMENTS` when `None`.
    ///
    /// # Errors
    /// Fails with "segments must be between 2 and 64" when the segment count
    /// is outside `2..=64`. A failure while querying an individual node is
    /// logged and contributes no nodes; it does not fail the whole call.
    #[cfg_attr(feature = "instrument", instrument(level = "trace", skip(self), fields(__VEILID_LOG_KEY = self.log_key())))]
    pub async fn find_new_nodes_wide(
        &self,
        crypto_kind: CryptoKind,
        existing_node_ids: &HashSet<NodeId>,
        routing_domain: RoutingDomain,
        segments: Option<u8>,
        capabilities: Vec<VeilidCapability>,
    ) -> EyreResult<Vec<NodeRef>> {
        let segments = segments.unwrap_or(DEFAULT_WIDE_FIND_NODE_SEGMENTS);
        let self_node_id = self.node_id(crypto_kind);
        if !(2..=64).contains(&segments) {
            bail!("segments must be between 2 and 64");
        }

        // With 2..=64 segments, `spacing` is at most 128 and `shift` at most 64,
        // so both fit in the single byte they are written to below.
        let spacing = 256usize / (segments as usize);
        let shift = spacing / 2;
        let mut shift_segment_bytes = BytesMut::zeroed(HASH_COORDINATE_LENGTH);
        shift_segment_bytes[0] = shift as u8;
        let mut spacing_segment_bytes = BytesMut::zeroed(HASH_COORDINATE_LENGTH);
        spacing_segment_bytes[0] = spacing as u8;
        let shift_hash_distance = HashDistance::new_from_bytes(shift_segment_bytes.freeze());
        let spacing_hash_distance = HashDistance::new_from_bytes(spacing_segment_bytes.freeze());

        let mut current_hash_distance = shift_hash_distance;
        let mut target_hash_coordinates = Vec::with_capacity(segments as usize);
        for segment in 0..segments {
            // Advance before every target except the first. This yields the
            // same targets as advancing after each push, but never performs
            // the unused increment after the last one, which for most segment
            // counts (including the default) would push byte 0 past 255. How
            // `HashDistance` handles that carry is not visible from here.
            if segment > 0 {
                current_hash_distance += spacing_hash_distance.clone();
            }
            let target_hash_coordinate = self_node_id
                .to_hash_coordinate()
                .offset(&current_hash_distance);
            target_hash_coordinates.push(target_hash_coordinate);
        }

        // Pick the known node closest to each target; the set removes nodes
        // that are closest to more than one target.
        let target_node_refs = {
            let snapshot = self.snapshot_entries(Timestamp::now(), BucketEntryState::Unreliable);
            let closest_results = self.get_nodes_closest_to_multiple_hash_coordinates(
                routing_domain,
                &target_hash_coordinates,
                &capabilities,
                &snapshot,
            );
            let mut target_node_refs = HashSet::new();
            for (i, result) in closest_results.into_iter().enumerate() {
                if let Some(target_node_ref) = result {
                    if !target_node_refs.insert(target_node_ref) {
                        veilid_log!(self trace
                            "find_nodes_wide: duplicate node found: {:?}",
                            target_hash_coordinates[i]
                        );
                    }
                } else {
                    veilid_log!(self trace
                        "find_nodes_wide: no node found closest to hash coordinate: {:?}",
                        target_hash_coordinates[i]
                    );
                }
            }
            target_node_refs
        };

        // Query every picked node concurrently. Each future resolves to the
        // nodes it found, or to an empty list when the query errors or does
        // not produce a value.
        let mut unord = FuturesUnordered::new();
        let mut found_node_refs = HashSet::new();
        for target_node_ref in target_node_refs {
            let capabilities = capabilities.clone();
            unord.push(async move {
                network_result_value_or_log!(self match pin_future!(self.find_nodes_close_to_node_ref(crypto_kind, target_node_ref.clone(), capabilities.clone())).await {
                    Err(e) => {
                        veilid_log!(self error
                            "find_nodes_close_to_node_ref failed for {:?}: {:?}",
                            target_node_ref, e
                        );
                        return vec![];
                    }
                    Ok(v) => v,
                } => [ format!(": crypto_kind={} target_node_ref={} capabilities={:?}", crypto_kind, target_node_ref, capabilities) ] {
                    vec![]
                })
            });
        }

        // Merge the answers in completion order, de-duplicating across queries.
        while let Some(nodes) = unord.next().await {
            for node in nodes {
                if !found_node_refs.insert(node.clone()) {
                    veilid_log!(self trace
                        "find_nodes_wide: duplicate node found near target node: {:?}",
                        node
                    );
                }
            }
        }

        // Keep only nodes the caller does not already know and that are not
        // punished at the time of filtering.
        let cur_ts = Timestamp::now();
        let new_node_refs = found_node_refs
            .into_iter()
            .filter(|nr| {
                !nr.node_ids()
                    .contains_any_from_iter(existing_node_ids.iter())
                    && nr.state(cur_ts) != BucketEntryState::Punished
            })
            .collect();
        Ok(new_node_refs)
    }
}