use std::collections::{HashMap, VecDeque};
use std::io;
use std::net::{Ipv6Addr, SocketAddrV6, UdpSocket};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use rns_core::transport::types::InterfaceId;
use crate::event::{Event, EventSender};
use crate::interface::Writer;
pub const DEFAULT_DISCOVERY_PORT: u16 = 29716;
pub const DEFAULT_DATA_PORT: u16 = 42671;
pub const DEFAULT_GROUP_ID: &[u8] = b"reticulum";
pub const DEFAULT_IFAC_SIZE: usize = 16;
pub const HW_MTU: usize = 1196;
pub const SCOPE_LINK: &str = "2";
pub const SCOPE_ADMIN: &str = "4";
pub const SCOPE_SITE: &str = "5";
pub const SCOPE_ORGANISATION: &str = "8";
pub const SCOPE_GLOBAL: &str = "e";
pub const MULTICAST_PERMANENT_ADDRESS_TYPE: &str = "0";
pub const MULTICAST_TEMPORARY_ADDRESS_TYPE: &str = "1";
pub const PEERING_TIMEOUT: f64 = 22.0;
pub const ANNOUNCE_INTERVAL: f64 = 1.6;
pub const PEER_JOB_INTERVAL: f64 = 4.0;
pub const MCAST_ECHO_TIMEOUT: f64 = 6.5;
pub const BITRATE_GUESS: u64 = 10_000_000;
pub const MULTI_IF_DEQUE_LEN: usize = 48;
pub const MULTI_IF_DEQUE_TTL: f64 = 0.75;
pub const REVERSE_PEERING_MULTIPLIER: f64 = 3.25;
pub const ALL_IGNORE_IFS: &[&str] = &["lo0"];
#[derive(Debug, Clone)]
pub struct AutoConfig {
pub name: String,
pub group_id: Vec<u8>,
pub discovery_scope: String,
pub discovery_port: u16,
pub data_port: u16,
pub multicast_address_type: String,
pub allowed_interfaces: Vec<String>,
pub ignored_interfaces: Vec<String>,
pub configured_bitrate: u64,
pub interface_id: InterfaceId,
pub ingress_control: rns_core::transport::types::IngressControlConfig,
pub runtime: Arc<Mutex<AutoRuntime>>,
}
#[derive(Debug, Clone)]
pub struct AutoRuntime {
pub announce_interval_secs: f64,
pub peer_timeout_secs: f64,
pub peer_job_interval_secs: f64,
}
impl AutoRuntime {
pub fn from_config(_config: &AutoConfig) -> Self {
Self {
announce_interval_secs: ANNOUNCE_INTERVAL,
peer_timeout_secs: PEERING_TIMEOUT,
peer_job_interval_secs: PEER_JOB_INTERVAL,
}
}
}
#[derive(Debug, Clone)]
pub struct AutoRuntimeConfigHandle {
pub interface_name: String,
pub runtime: Arc<Mutex<AutoRuntime>>,
pub startup: AutoRuntime,
}
impl Default for AutoConfig {
fn default() -> Self {
let mut config = AutoConfig {
name: String::new(),
group_id: DEFAULT_GROUP_ID.to_vec(),
discovery_scope: SCOPE_LINK.to_string(),
discovery_port: DEFAULT_DISCOVERY_PORT,
data_port: DEFAULT_DATA_PORT,
multicast_address_type: MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
allowed_interfaces: Vec::new(),
ignored_interfaces: Vec::new(),
configured_bitrate: BITRATE_GUESS,
interface_id: InterfaceId(0),
ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
runtime: Arc::new(Mutex::new(AutoRuntime {
announce_interval_secs: ANNOUNCE_INTERVAL,
peer_timeout_secs: PEERING_TIMEOUT,
peer_job_interval_secs: PEER_JOB_INTERVAL,
})),
};
let startup = AutoRuntime::from_config(&config);
config.runtime = Arc::new(Mutex::new(startup));
config
}
}
pub fn derive_multicast_address(group_id: &[u8], address_type: &str, scope: &str) -> String {
let group_hash = rns_crypto::sha256::sha256(group_id);
let g = &group_hash;
let w1 = (g[2] as u16) << 8 | g[3] as u16;
let w2 = (g[4] as u16) << 8 | g[5] as u16;
let w3 = (g[6] as u16) << 8 | g[7] as u16;
let w4 = (g[8] as u16) << 8 | g[9] as u16;
let w5 = (g[10] as u16) << 8 | g[11] as u16;
let w6 = (g[12] as u16) << 8 | g[13] as u16;
format!(
"ff{}{}:0:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
address_type, scope, w1, w2, w3, w4, w5, w6
)
}
pub fn parse_multicast_addr(addr: &str) -> Option<Ipv6Addr> {
addr.parse::<Ipv6Addr>().ok()
}
pub fn compute_discovery_token(group_id: &[u8], link_local_addr: &str) -> [u8; 32] {
let mut input = group_id.to_vec();
input.extend_from_slice(link_local_addr.as_bytes());
rns_crypto::sha256::sha256(&input)
}
#[derive(Debug, Clone)]
pub struct LocalInterface {
pub name: String,
pub link_local_addr: String,
pub index: u32,
}
pub fn enumerate_interfaces(allowed: &[String], ignored: &[String]) -> Vec<LocalInterface> {
let mut result = Vec::new();
unsafe {
let mut ifaddrs: *mut libc::ifaddrs = std::ptr::null_mut();
if libc::getifaddrs(&mut ifaddrs) != 0 {
return result;
}
let mut current = ifaddrs;
while !current.is_null() {
let ifa = &*current;
current = ifa.ifa_next;
if ifa.ifa_addr.is_null() {
continue;
}
if (*ifa.ifa_addr).sa_family as i32 != libc::AF_INET6 {
continue;
}
let name = match std::ffi::CStr::from_ptr(ifa.ifa_name).to_str() {
Ok(s) => s.to_string(),
Err(_) => continue,
};
if ALL_IGNORE_IFS.iter().any(|&ig| ig == name) {
if !allowed.iter().any(|a| a == &name) {
continue;
}
}
if ignored.iter().any(|ig| ig == &name) {
continue;
}
if !allowed.is_empty() && !allowed.iter().any(|a| a == &name) {
continue;
}
let sa6 = ifa.ifa_addr as *const libc::sockaddr_in6;
let addr_bytes = (*sa6).sin6_addr.s6_addr;
let ipv6 = Ipv6Addr::from(addr_bytes);
let octets = ipv6.octets();
if octets[0] != 0xfe || (octets[1] & 0xc0) != 0x80 {
continue;
}
let addr_str = format!("{}", ipv6);
let index = libc::if_nametoindex(ifa.ifa_name);
if index == 0 {
continue;
}
if result.iter().any(|li: &LocalInterface| li.name == name) {
continue;
}
result.push(LocalInterface {
name,
link_local_addr: addr_str,
index,
});
}
libc::freeifaddrs(ifaddrs);
}
result
}
struct AutoPeer {
interface_id: InterfaceId,
#[allow(dead_code)]
link_local_addr: String,
#[allow(dead_code)]
ifname: String,
last_heard: f64,
}
struct UdpWriter {
socket: UdpSocket,
target: SocketAddrV6,
}
impl Writer for UdpWriter {
fn send_frame(&mut self, data: &[u8]) -> io::Result<()> {
self.socket.send_to(data, self.target)?;
Ok(())
}
}
struct SharedState {
peers: HashMap<String, AutoPeer>,
link_local_addresses: Vec<String>,
dedup_deque: VecDeque<([u8; 32], f64)>,
online: bool,
next_id: Arc<AtomicU64>,
}
impl SharedState {
fn new(next_id: Arc<AtomicU64>) -> Self {
SharedState {
peers: HashMap::new(),
link_local_addresses: Vec::new(),
dedup_deque: VecDeque::new(),
online: false,
next_id,
}
}
fn is_duplicate(&self, hash: &[u8; 32], now: f64) -> bool {
for (h, ts) in &self.dedup_deque {
if h == hash && now < *ts + MULTI_IF_DEQUE_TTL {
return true;
}
}
false
}
fn add_dedup(&mut self, hash: [u8; 32], now: f64) {
self.dedup_deque.push_back((hash, now));
while self.dedup_deque.len() > MULTI_IF_DEQUE_LEN {
self.dedup_deque.pop_front();
}
}
fn refresh_peer(&mut self, addr: &str, now: f64) {
if let Some(peer) = self.peers.get_mut(addr) {
peer.last_heard = now;
}
}
}
pub fn start(
config: AutoConfig,
tx: EventSender,
next_dynamic_id: Arc<AtomicU64>,
) -> io::Result<()> {
let interfaces = enumerate_interfaces(&config.allowed_interfaces, &config.ignored_interfaces);
if interfaces.is_empty() {
log::warn!(
"[{}] No suitable IPv6 link-local interfaces found",
config.name,
);
return Ok(());
}
let group_id = config.group_id.clone();
let mcast_addr_str = derive_multicast_address(
&group_id,
&config.multicast_address_type,
&config.discovery_scope,
);
let mcast_ip = match parse_multicast_addr(&mcast_addr_str) {
Some(ip) => ip,
None => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Invalid multicast address: {}", mcast_addr_str),
));
}
};
let discovery_port = config.discovery_port;
let unicast_discovery_port = config.discovery_port + 1;
let data_port = config.data_port;
let name = config.name.clone();
let configured_bitrate = config.configured_bitrate;
let ingress_control = config.ingress_control;
{
let startup = AutoRuntime::from_config(&config);
*config.runtime.lock().unwrap() = startup;
}
let runtime = Arc::clone(&config.runtime);
let shared = Arc::new(Mutex::new(SharedState::new(next_dynamic_id)));
let running = Arc::new(AtomicBool::new(true));
{
let mut state = shared.lock().unwrap();
for iface in &interfaces {
state
.link_local_addresses
.push(iface.link_local_addr.clone());
}
}
log::info!(
"[{}] AutoInterface starting with {} local interfaces, multicast {}",
name,
interfaces.len(),
mcast_addr_str,
);
for local_iface in &interfaces {
let ifname = local_iface.name.clone();
let link_local = local_iface.link_local_addr.clone();
let if_index = local_iface.index;
let mcast_socket = create_multicast_recv_socket(&mcast_ip, discovery_port, if_index)?;
let unicast_socket =
create_unicast_recv_socket(&link_local, unicast_discovery_port, if_index)?;
{
let group_id = group_id.clone();
let link_local = link_local.clone();
let running = running.clone();
let name = name.clone();
let runtime = runtime.clone();
thread::Builder::new()
.name(format!("auto-disc-tx-{}", ifname))
.spawn(move || {
discovery_sender_loop(
&group_id,
&link_local,
&mcast_ip,
discovery_port,
if_index,
runtime,
&running,
&name,
);
})?;
}
{
let group_id = group_id.clone();
let shared = shared.clone();
let tx = tx.clone();
let running = running.clone();
let name = name.clone();
let runtime = runtime.clone();
thread::Builder::new()
.name(format!("auto-disc-rx-{}", ifname))
.spawn(move || {
discovery_receiver_loop(
mcast_socket,
&group_id,
shared,
tx,
&running,
&name,
data_port,
configured_bitrate,
ingress_control,
runtime,
);
})?;
}
{
let group_id = group_id.clone();
let shared = shared.clone();
let tx = tx.clone();
let running = running.clone();
let name = name.clone();
let runtime = runtime.clone();
let ingress_control = ingress_control;
thread::Builder::new()
.name(format!("auto-udisc-rx-{}", ifname))
.spawn(move || {
discovery_receiver_loop(
unicast_socket,
&group_id,
shared,
tx,
&running,
&name,
data_port,
configured_bitrate,
ingress_control,
runtime,
);
})?;
}
{
let link_local = local_iface.link_local_addr.clone();
let shared = shared.clone();
let tx = tx.clone();
let running = running.clone();
let name = name.clone();
let data_socket = create_data_recv_socket(&link_local, data_port, if_index)?;
thread::Builder::new()
.name(format!("auto-data-rx-{}", local_iface.name))
.spawn(move || {
data_receiver_loop(data_socket, shared, tx, &running, &name);
})?;
}
}
{
let shared = shared.clone();
let tx = tx.clone();
let running = running.clone();
let name = name.clone();
let runtime = runtime.clone();
thread::Builder::new()
.name(format!("auto-peer-jobs-{}", name))
.spawn(move || {
peer_jobs_loop(shared, tx, runtime, &running, &name);
})?;
}
let announce_interval = runtime.lock().unwrap().announce_interval_secs;
let peering_wait = Duration::from_secs_f64(announce_interval * 1.2);
thread::sleep(peering_wait);
{
let mut state = shared.lock().unwrap();
state.online = true;
}
log::info!("[{}] AutoInterface online", config.name);
Ok(())
}
fn create_multicast_recv_socket(
mcast_ip: &Ipv6Addr,
port: u16,
if_index: u32,
) -> io::Result<UdpSocket> {
let socket = socket2::Socket::new(
socket2::Domain::IPV6,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)?;
socket.set_reuse_address(true)?;
#[cfg(not(target_os = "windows"))]
socket.set_reuse_port(true)?;
let bind_addr = SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port, 0, if_index);
socket.bind(&bind_addr.into())?;
socket.join_multicast_v6(mcast_ip, if_index)?;
socket.set_nonblocking(false)?;
let std_socket: UdpSocket = socket.into();
std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
Ok(std_socket)
}
fn create_unicast_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
let ip: Ipv6Addr = link_local
.parse()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
let socket = socket2::Socket::new(
socket2::Domain::IPV6,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)?;
socket.set_reuse_address(true)?;
#[cfg(not(target_os = "windows"))]
socket.set_reuse_port(true)?;
let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
socket.bind(&bind_addr.into())?;
socket.set_nonblocking(false)?;
let std_socket: UdpSocket = socket.into();
std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
Ok(std_socket)
}
fn create_data_recv_socket(link_local: &str, port: u16, if_index: u32) -> io::Result<UdpSocket> {
let ip: Ipv6Addr = link_local
.parse()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, format!("bad IPv6: {}", e)))?;
let socket = socket2::Socket::new(
socket2::Domain::IPV6,
socket2::Type::DGRAM,
Some(socket2::Protocol::UDP),
)?;
socket.set_reuse_address(true)?;
#[cfg(not(target_os = "windows"))]
socket.set_reuse_port(true)?;
let bind_addr = SocketAddrV6::new(ip, port, 0, if_index);
socket.bind(&bind_addr.into())?;
socket.set_nonblocking(false)?;
let std_socket: UdpSocket = socket.into();
std_socket.set_read_timeout(Some(Duration::from_secs(2)))?;
Ok(std_socket)
}
fn discovery_sender_loop(
group_id: &[u8],
link_local_addr: &str,
mcast_ip: &Ipv6Addr,
discovery_port: u16,
if_index: u32,
runtime: Arc<Mutex<AutoRuntime>>,
running: &AtomicBool,
name: &str,
) {
let token = compute_discovery_token(group_id, link_local_addr);
while running.load(Ordering::Relaxed) {
if let Ok(socket) = UdpSocket::bind("[::]:0") {
let if_bytes = if_index.to_ne_bytes();
unsafe {
libc::setsockopt(
socket_fd(&socket),
libc::IPPROTO_IPV6,
libc::IPV6_MULTICAST_IF,
if_bytes.as_ptr() as *const libc::c_void,
4,
);
}
let target = SocketAddrV6::new(*mcast_ip, discovery_port, 0, 0);
if let Err(e) = socket.send_to(&token, target) {
log::debug!("[{}] multicast send error: {}", name, e);
}
}
let sleep_dur =
Duration::from_secs_f64(runtime.lock().unwrap().announce_interval_secs.max(0.1));
thread::sleep(sleep_dur);
}
}
fn discovery_receiver_loop(
socket: UdpSocket,
group_id: &[u8],
shared: Arc<Mutex<SharedState>>,
tx: EventSender,
running: &AtomicBool,
name: &str,
data_port: u16,
configured_bitrate: u64,
ingress_control: rns_core::transport::types::IngressControlConfig,
runtime: Arc<Mutex<AutoRuntime>>,
) {
let mut buf = [0u8; 1024];
while running.load(Ordering::Relaxed) {
match socket.recv_from(&mut buf) {
Ok((n, src)) => {
if n < 32 {
continue;
}
let src_addr = match src {
std::net::SocketAddr::V6(v6) => v6,
_ => continue,
};
let src_ip = format!("{}", src_addr.ip());
let peering_hash = &buf[..32];
let expected = compute_discovery_token(group_id, &src_ip);
if peering_hash != expected {
log::debug!("[{}] invalid peering hash from {}", name, src_ip);
continue;
}
let state = shared.lock().unwrap();
if !state.online {
}
if state.link_local_addresses.contains(&src_ip) {
drop(state);
continue;
}
if state.peers.contains_key(&src_ip) {
let now = crate::time::now();
drop(state);
let mut state = shared.lock().unwrap();
state.refresh_peer(&src_ip, now);
continue;
}
drop(state);
add_peer(
&shared,
&tx,
&src_ip,
data_port,
name,
configured_bitrate,
ingress_control,
&runtime,
);
}
Err(ref e)
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
{
continue;
}
Err(e) => {
log::warn!("[{}] discovery recv error: {}", name, e);
if !running.load(Ordering::Relaxed) {
return;
}
thread::sleep(Duration::from_millis(100));
}
}
}
}
fn add_peer(
shared: &Arc<Mutex<SharedState>>,
tx: &EventSender,
peer_addr: &str,
data_port: u16,
name: &str,
configured_bitrate: u64,
ingress_control: rns_core::transport::types::IngressControlConfig,
_runtime: &Arc<Mutex<AutoRuntime>>,
) {
let peer_ip: Ipv6Addr = match peer_addr.parse() {
Ok(ip) => ip,
Err(_) => return,
};
let send_socket = match UdpSocket::bind("[::]:0") {
Ok(s) => s,
Err(e) => {
log::warn!(
"[{}] failed to create writer for peer {}: {}",
name,
peer_addr,
e
);
return;
}
};
let target = SocketAddrV6::new(peer_ip, data_port, 0, 0);
let mut state = shared.lock().unwrap();
if state.peers.contains_key(peer_addr) {
state.refresh_peer(peer_addr, crate::time::now());
return;
}
let peer_id = InterfaceId(state.next_id.fetch_add(1, Ordering::Relaxed));
let driver_writer: Box<dyn Writer> = Box::new(UdpWriter {
socket: send_socket,
target,
});
let peer_info = rns_core::transport::types::InterfaceInfo {
id: peer_id,
name: format!("{}:{}", name, peer_addr),
mode: rns_core::constants::MODE_FULL,
out_capable: true,
in_capable: true,
bitrate: Some(configured_bitrate),
announce_rate_target: None,
announce_rate_grace: 0,
announce_rate_penalty: 0.0,
announce_cap: rns_core::constants::ANNOUNCE_CAP,
is_local_client: false,
wants_tunnel: false,
tunnel_id: None,
mtu: 1400,
ia_freq: 0.0,
started: 0.0,
ingress_control,
};
let now = crate::time::now();
state.peers.insert(
peer_addr.to_string(),
AutoPeer {
interface_id: peer_id,
link_local_addr: peer_addr.to_string(),
ifname: String::new(),
last_heard: now,
},
);
log::info!(
"[{}] Peer discovered: {} (id={})",
name,
peer_addr,
peer_id.0
);
let _ = tx.send(Event::InterfaceUp(
peer_id,
Some(driver_writer),
Some(peer_info),
));
}
fn data_receiver_loop(
socket: UdpSocket,
shared: Arc<Mutex<SharedState>>,
tx: EventSender,
running: &AtomicBool,
name: &str,
) {
let mut buf = [0u8; HW_MTU + 64];
while running.load(Ordering::Relaxed) {
match socket.recv_from(&mut buf) {
Ok((n, src)) => {
if n == 0 {
continue;
}
let src_addr = match src {
std::net::SocketAddr::V6(v6) => v6,
_ => continue,
};
let src_ip = format!("{}", src_addr.ip());
let data = &buf[..n];
let now = crate::time::now();
let data_hash = rns_crypto::sha256::sha256(data);
let mut state = shared.lock().unwrap();
if !state.online {
continue;
}
if state.is_duplicate(&data_hash, now) {
continue;
}
state.add_dedup(data_hash, now);
state.refresh_peer(&src_ip, now);
let iface_id = match state.peers.get(&src_ip) {
Some(peer) => peer.interface_id,
None => {
continue;
}
};
drop(state);
if tx
.send(Event::Frame {
interface_id: iface_id,
data: data.to_vec(),
})
.is_err()
{
return;
}
}
Err(ref e)
if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut =>
{
continue;
}
Err(e) => {
log::warn!("[{}] data recv error: {}", name, e);
if !running.load(Ordering::Relaxed) {
return;
}
thread::sleep(Duration::from_millis(100));
}
}
}
}
fn peer_jobs_loop(
shared: Arc<Mutex<SharedState>>,
tx: EventSender,
runtime: Arc<Mutex<AutoRuntime>>,
running: &AtomicBool,
name: &str,
) {
while running.load(Ordering::Relaxed) {
let interval =
Duration::from_secs_f64(runtime.lock().unwrap().peer_job_interval_secs.max(0.1));
thread::sleep(interval);
let now = crate::time::now();
let mut timed_out = Vec::new();
let peer_timeout_secs = runtime.lock().unwrap().peer_timeout_secs;
{
let state = shared.lock().unwrap();
for (addr, peer) in &state.peers {
if now > peer.last_heard + peer_timeout_secs {
timed_out.push((addr.clone(), peer.interface_id));
}
}
}
for (addr, iface_id) in &timed_out {
log::info!("[{}] Peer timed out: {}", name, addr);
let mut state = shared.lock().unwrap();
state.peers.remove(addr.as_str());
let _ = tx.send(Event::InterfaceDown(*iface_id));
}
}
}
#[cfg(unix)]
fn socket_fd(socket: &UdpSocket) -> i32 {
use std::os::unix::io::AsRawFd;
socket.as_raw_fd()
}
#[cfg(not(unix))]
fn socket_fd(_socket: &UdpSocket) -> i32 {
0
}
use super::{InterfaceConfigData, InterfaceFactory, StartContext, StartResult};
pub struct AutoFactory;
impl InterfaceFactory for AutoFactory {
fn type_name(&self) -> &str {
"AutoInterface"
}
fn parse_config(
&self,
name: &str,
id: InterfaceId,
params: &HashMap<String, String>,
) -> Result<Box<dyn InterfaceConfigData>, String> {
let group_id = params
.get("group_id")
.map(|v| v.as_bytes().to_vec())
.unwrap_or_else(|| DEFAULT_GROUP_ID.to_vec());
let discovery_scope = params
.get("discovery_scope")
.map(|v| match v.to_lowercase().as_str() {
"link" => SCOPE_LINK.to_string(),
"admin" => SCOPE_ADMIN.to_string(),
"site" => SCOPE_SITE.to_string(),
"organisation" | "organization" => SCOPE_ORGANISATION.to_string(),
"global" => SCOPE_GLOBAL.to_string(),
_ => v.clone(),
})
.unwrap_or_else(|| SCOPE_LINK.to_string());
let discovery_port = params
.get("discovery_port")
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_DISCOVERY_PORT);
let data_port = params
.get("data_port")
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_DATA_PORT);
let multicast_address_type = params
.get("multicast_address_type")
.map(|v| match v.to_lowercase().as_str() {
"permanent" => MULTICAST_PERMANENT_ADDRESS_TYPE.to_string(),
"temporary" => MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string(),
_ => v.clone(),
})
.unwrap_or_else(|| MULTICAST_TEMPORARY_ADDRESS_TYPE.to_string());
let configured_bitrate = params
.get("configured_bitrate")
.or_else(|| params.get("bitrate"))
.and_then(|v| v.parse().ok())
.unwrap_or(BITRATE_GUESS);
let allowed_interfaces = params
.get("devices")
.or_else(|| params.get("allowed_interfaces"))
.map(|v| {
v.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect()
})
.unwrap_or_default();
let ignored_interfaces = params
.get("ignored_devices")
.or_else(|| params.get("ignored_interfaces"))
.map(|v| {
v.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect()
})
.unwrap_or_default();
Ok(Box::new(AutoConfig {
name: name.to_string(),
group_id,
discovery_scope,
discovery_port,
data_port,
multicast_address_type,
allowed_interfaces,
ignored_interfaces,
configured_bitrate,
interface_id: id,
ingress_control: rns_core::transport::types::IngressControlConfig::enabled(),
runtime: Arc::new(Mutex::new(AutoRuntime {
announce_interval_secs: ANNOUNCE_INTERVAL,
peer_timeout_secs: PEERING_TIMEOUT,
peer_job_interval_secs: PEER_JOB_INTERVAL,
})),
}))
}
fn start(
&self,
config: Box<dyn InterfaceConfigData>,
ctx: StartContext,
) -> std::io::Result<StartResult> {
let mut auto_config = *config.into_any().downcast::<AutoConfig>().map_err(|_| {
std::io::Error::new(std::io::ErrorKind::InvalidData, "wrong config type")
})?;
auto_config.ingress_control = ctx.ingress_control;
start(auto_config, ctx.tx, ctx.next_dynamic_id)?;
Ok(StartResult::Listener { control: None })
}
}
pub(crate) fn auto_runtime_handle_from_config(config: &AutoConfig) -> AutoRuntimeConfigHandle {
AutoRuntimeConfigHandle {
interface_name: config.name.clone(),
runtime: Arc::clone(&config.runtime),
startup: AutoRuntime::from_config(config),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn multicast_address_default_group() {
let addr = derive_multicast_address(
DEFAULT_GROUP_ID,
MULTICAST_TEMPORARY_ADDRESS_TYPE,
SCOPE_LINK,
);
assert_eq!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
}
#[test]
fn multicast_address_custom_group() {
let addr =
derive_multicast_address(b"testgroup", MULTICAST_TEMPORARY_ADDRESS_TYPE, SCOPE_LINK);
assert!(addr.starts_with("ff12:0:"));
assert_ne!(addr, "ff12:0:d70b:fb1c:16e4:5e39:485e:31e1");
}
#[test]
fn multicast_address_scope_admin() {
let addr = derive_multicast_address(
DEFAULT_GROUP_ID,
MULTICAST_TEMPORARY_ADDRESS_TYPE,
SCOPE_ADMIN,
);
assert!(addr.starts_with("ff14:0:"));
}
#[test]
fn multicast_address_permanent_type() {
let addr = derive_multicast_address(
DEFAULT_GROUP_ID,
MULTICAST_PERMANENT_ADDRESS_TYPE,
SCOPE_LINK,
);
assert!(addr.starts_with("ff02:0:"));
}
#[test]
fn multicast_address_parseable() {
let addr = derive_multicast_address(
DEFAULT_GROUP_ID,
MULTICAST_TEMPORARY_ADDRESS_TYPE,
SCOPE_LINK,
);
let ip = parse_multicast_addr(&addr);
assert!(ip.is_some());
assert!(ip.unwrap().is_multicast());
}
#[test]
fn discovery_token_interop() {
let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
let expected = "97b25576749ea936b0d8a8536ffaf442d157cf47d460dcf13c48b7bd18b6c163";
let got = token
.iter()
.map(|b| format!("{:02x}", b))
.collect::<String>();
assert_eq!(got, expected);
}
#[test]
fn discovery_token_interop_2() {
let token = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::dead:beef:1234:5678");
let expected = "46b6ec7595504b6a35f06bd4bfff71567fb82fcf2706cd361bab20409c42d072";
let got = token
.iter()
.map(|b| format!("{:02x}", b))
.collect::<String>();
assert_eq!(got, expected);
}
#[test]
fn discovery_token_different_groups() {
let t1 = compute_discovery_token(b"reticulum", "fe80::1");
let t2 = compute_discovery_token(b"othergroup", "fe80::1");
assert_ne!(t1, t2);
}
#[test]
fn discovery_token_different_addrs() {
let t1 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::1");
let t2 = compute_discovery_token(DEFAULT_GROUP_ID, "fe80::2");
assert_ne!(t1, t2);
}
#[test]
fn dedup_basic() {
let next_id = Arc::new(AtomicU64::new(1));
let mut state = SharedState::new(next_id);
let hash = [0xAA; 32];
let now = 1000.0;
assert!(!state.is_duplicate(&hash, now));
state.add_dedup(hash, now);
assert!(state.is_duplicate(&hash, now));
}
#[test]
fn dedup_expired() {
let next_id = Arc::new(AtomicU64::new(1));
let mut state = SharedState::new(next_id);
let hash = [0xBB; 32];
state.add_dedup(hash, 1000.0);
assert!(state.is_duplicate(&hash, 1000.5));
assert!(!state.is_duplicate(&hash, 1001.0));
}
#[test]
fn dedup_max_length() {
let next_id = Arc::new(AtomicU64::new(1));
let mut state = SharedState::new(next_id);
for i in 0..MULTI_IF_DEQUE_LEN + 10 {
let mut hash = [0u8; 32];
hash[0] = (i & 0xFF) as u8;
hash[1] = ((i >> 8) & 0xFF) as u8;
state.add_dedup(hash, 1000.0);
}
assert_eq!(state.dedup_deque.len(), MULTI_IF_DEQUE_LEN);
}
#[test]
fn peer_refresh() {
let next_id = Arc::new(AtomicU64::new(100));
let mut state = SharedState::new(next_id);
state.peers.insert(
"fe80::1".to_string(),
AutoPeer {
interface_id: InterfaceId(100),
link_local_addr: "fe80::1".to_string(),
ifname: "eth0".to_string(),
last_heard: 1000.0,
},
);
state.refresh_peer("fe80::1", 2000.0);
assert_eq!(state.peers["fe80::1"].last_heard, 2000.0);
}
#[test]
fn peer_not_found_refresh() {
let next_id = Arc::new(AtomicU64::new(100));
let mut state = SharedState::new(next_id);
state.refresh_peer("fe80::999", 1000.0);
}
#[test]
fn enumerate_returns_vec() {
let interfaces = enumerate_interfaces(&[], &[]);
for iface in &interfaces {
assert!(!iface.name.is_empty());
assert!(iface.link_local_addr.starts_with("fe80"));
assert!(iface.index > 0);
}
}
#[test]
fn enumerate_with_ignored() {
let interfaces = enumerate_interfaces(
&[],
&[
"lo".to_string(),
"eth0".to_string(),
"wlan0".to_string(),
"enp0s3".to_string(),
"docker0".to_string(),
],
);
for iface in &interfaces {
assert_ne!(iface.name, "lo");
assert_ne!(iface.name, "eth0");
assert_ne!(iface.name, "wlan0");
}
}
#[test]
fn enumerate_with_allowed_nonexistent() {
let interfaces = enumerate_interfaces(&["nonexistent_if_12345".to_string()], &[]);
assert!(interfaces.is_empty());
}
#[test]
fn config_defaults() {
let config = AutoConfig::default();
assert_eq!(config.group_id, DEFAULT_GROUP_ID);
assert_eq!(config.discovery_scope, SCOPE_LINK);
assert_eq!(config.discovery_port, DEFAULT_DISCOVERY_PORT);
assert_eq!(config.data_port, DEFAULT_DATA_PORT);
assert_eq!(
config.multicast_address_type,
MULTICAST_TEMPORARY_ADDRESS_TYPE
);
assert_eq!(config.configured_bitrate, BITRATE_GUESS);
assert!(config.allowed_interfaces.is_empty());
assert!(config.ignored_interfaces.is_empty());
}
#[test]
fn constants_match_python() {
assert_eq!(DEFAULT_DISCOVERY_PORT, 29716);
assert_eq!(DEFAULT_DATA_PORT, 42671);
assert_eq!(HW_MTU, 1196);
assert_eq!(MULTI_IF_DEQUE_LEN, 48);
assert!((MULTI_IF_DEQUE_TTL - 0.75).abs() < f64::EPSILON);
assert!((PEERING_TIMEOUT - 22.0).abs() < f64::EPSILON);
assert!((ANNOUNCE_INTERVAL - 1.6).abs() < f64::EPSILON);
assert!((PEER_JOB_INTERVAL - 4.0).abs() < f64::EPSILON);
assert!((MCAST_ECHO_TIMEOUT - 6.5).abs() < f64::EPSILON);
assert_eq!(BITRATE_GUESS, 10_000_000);
}
#[test]
fn unicast_discovery_port() {
let unicast_port = DEFAULT_DISCOVERY_PORT + 1;
assert_eq!(unicast_port, 29717);
}
#[test]
fn reverse_peering_interval() {
let interval = ANNOUNCE_INTERVAL * REVERSE_PEERING_MULTIPLIER;
assert!((interval - 5.2).abs() < 0.01);
}
}