use crate::agent::RANDOM_NODES_CHOICES;
use klukai_types::{agent::SplitPool, config::DEFAULT_GOSSIP_PORT};
use hickory_resolver::{
TokioResolver,
config::{NameServerConfigGroup, ResolverConfig},
name_server::TokioConnectionProvider,
proto::rr::{RData, RecordType},
};
use rand::{SeedableRng, rngs::StdRng, seq::IteratorRandom};
use std::{collections::HashSet, net::SocketAddr};
use tokio::task::block_in_place;
use tracing::{debug, error, warn};
pub async fn generate_bootstrap(
bootstrap: &[String],
our_addr: SocketAddr,
pool: &SplitPool,
) -> eyre::Result<Vec<SocketAddr>> {
let mut addrs = match resolve_bootstrap(bootstrap, our_addr).await {
Ok(addrs) => addrs,
Err(e) => {
warn!("could not resolve bootstraps, falling back to in-db nodes: {e}");
HashSet::new()
}
};
if addrs.is_empty() {
let conn = pool.read().await?;
addrs = block_in_place(|| {
let mut prepped =
conn.prepare("SELECT address FROM __corro_members ORDER BY RANDOM() LIMIT 5")?;
let node_addrs = prepped.query_map([], |row| row.get::<_, String>(0))?;
Ok::<_, rusqlite::Error>(
node_addrs
.flatten()
.flat_map(|addr| addr.parse())
.filter(|addr| match (our_addr, addr) {
(SocketAddr::V6(our_ip), SocketAddr::V6(ip)) if our_ip != *ip => true,
(SocketAddr::V4(our_ip), SocketAddr::V4(ip)) if our_ip != *ip => true,
_ => {
debug!("ignore node with addr: {addr}");
false
}
})
.collect(),
)
})?;
}
let mut rng = StdRng::from_os_rng();
Ok(addrs
.into_iter()
.choose_multiple(&mut rng, RANDOM_NODES_CHOICES))
}
async fn resolve_bootstrap(
bootstrap: &[String],
our_addr: SocketAddr,
) -> eyre::Result<HashSet<SocketAddr>> {
let mut addrs = HashSet::new();
if bootstrap.is_empty() {
return Ok(addrs);
}
let system_resolver = TokioResolver::builder_tokio()?.build();
for s in bootstrap {
if let Ok(addr) = s.parse() {
addrs.insert(addr);
} else {
debug!("attempting to resolve {s}");
let mut host_port_dns_server = s.split('@');
let mut host_port = host_port_dns_server.next().unwrap().split(':');
let mut resolver = None;
if let Some(dns_server) = host_port_dns_server.next() {
debug!("attempting to use resolver: {dns_server}");
let (ip, port) = if let Ok(addr) = dns_server.parse::<SocketAddr>() {
(addr.ip(), addr.port())
} else {
(dns_server.parse()?, 53)
};
let custom_config = ResolverConfig::from_parts(
None,
vec![],
NameServerConfigGroup::from_ips_clear(&[ip], port, true),
);
resolver = Some(
TokioResolver::builder_with_config(
custom_config,
TokioConnectionProvider::default(),
)
.build(),
);
debug!("using resolver: {dns_server}");
}
if let Some(hostname) = host_port.next() {
debug!("Resolving '{hostname}' to an IP");
match resolver
.as_ref()
.unwrap_or(&system_resolver)
.lookup(
hostname,
if our_addr.is_ipv6() {
RecordType::AAAA
} else {
RecordType::A
},
)
.await
{
Ok(response) => {
debug!("Successfully resolved things: {response:?}");
let port: u16 = host_port
.next()
.and_then(|p| p.parse().ok())
.unwrap_or(DEFAULT_GOSSIP_PORT);
for addr in response.iter().filter_map(|rdata| match rdata {
RData::A(ip) => Some(SocketAddr::from((ip.0, port))),
RData::AAAA(ip) => Some(SocketAddr::from((ip.0, port))),
_ => None,
}) {
match (our_addr, addr) {
(SocketAddr::V4(our_ip), SocketAddr::V4(ip)) if our_ip != ip => {}
(SocketAddr::V6(our_ip), SocketAddr::V6(ip)) if our_ip != ip => {}
_ => {
debug!("ignore node with addr: {addr}");
continue;
}
}
addrs.insert(addr);
}
}
Err(e) => {
error!("could not resolve '{hostname}': {e}");
}
}
}
}
}
Ok(addrs)
}