use crate::cla::ConvergenceLayerAgent;
use crate::core::{DtnPeer, PeerType};
use crate::ipnd::{beacon::Beacon, services::*};
use crate::routing::RoutingNotifcation;
use crate::{peers_add, routing_notify, CLAS, CONFIG};
use crate::{peers_touch, DTNCORE};
use anyhow::Result;
use log::{debug, error, info, trace};
use socket2::{Domain, Socket, Type};
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use tokio::net::UdpSocket;
use tokio::time::interval;
async fn receiver(socket: UdpSocket) -> Result<(), io::Error> {
let mut buf: Vec<u8> = vec![0; 1024 * 64];
let nodeid = CONFIG.lock().host_eid.clone();
loop {
if let Ok((size, peer)) = socket.recv_from(&mut buf).await {
trace!("received {} bytes", size);
let deserialized: Beacon = match serde_cbor::from_slice(&buf[..size]) {
Ok(pkt) => pkt,
Err(e) => {
error!("Deserialization of beacon failed: {}", e);
continue;
}
};
let dtnpeer = DtnPeer::new(
deserialized.eid().clone(),
peer.ip().into(),
PeerType::Dynamic,
deserialized.beacon_period(),
deserialized.service_block().clas().clone(),
deserialized.service_block().convert_services(),
);
if dtnpeer.eid == nodeid {
debug!("Received beacon from myself, ignoring");
continue;
}
if peers_add(dtnpeer) {
info!(
"New peer discovered: {} @ {} (len={})",
deserialized.eid(),
peer,
size
);
} else {
debug!(
"Beacon from known peer: {} @ {} (len={})",
deserialized.eid(),
peer,
size
);
if let Err(e) = peers_touch(deserialized.eid().node().unwrap().as_ref()) {
error!("Failed to touch peer: {}", e);
}
}
trace!("{}", deserialized);
if let Err(err) = routing_notify(RoutingNotifcation::EncounteredPeer(
deserialized.eid().clone(),
))
.await
{
info!("Error while encountered peer notification: {}", err);
}
}
}
}
async fn announcer(socket: UdpSocket, _v6: bool) {
let mut task = interval(crate::CONFIG.lock().announcement_interval);
loop {
task.tick().await;
let eid = CONFIG.lock().host_eid.clone();
let beacon_period = if !crate::CONFIG.lock().enable_period {
None
} else {
Some(crate::CONFIG.lock().announcement_interval)
};
let mut pkt = Beacon::with_config(eid, ServiceBlock::new(), beacon_period);
(*CLAS.lock())
.iter()
.for_each(|cla| pkt.add_cla(cla.name(), &Some(cla.port())));
DTNCORE
.lock()
.service_list
.iter()
.for_each(|(tag, payload)| pkt.add_custom_service(*tag, payload.clone()));
let mut destinations: HashMap<SocketAddr, u32> = HashMap::new();
CONFIG
.lock()
.discovery_destinations
.iter()
.for_each(|(key, value)| {
destinations.insert(key.clone().parse().unwrap(), *value);
});
for (destination, bsn) in destinations {
(*CONFIG.lock()).update_beacon_sequence_number(&destination.to_string());
pkt.set_beacon_sequence_number(bsn);
if destination.ip().is_multicast() {
trace!(
"Sending beacon\n{}\nto multicast address {}",
pkt,
destination
);
} else {
trace!(
"Sending beacon\n{}\nto unicast address {}",
pkt,
destination
);
}
match socket
.send_to(&serde_cbor::to_vec(&pkt).unwrap(), destination)
.await
{
Ok(amt) => {
debug!("sent announcement beacon (len={}) to {}", amt, destination);
}
Err(err) => error!("Sending announcement failed: {}", err),
}
}
}
}
pub async fn spawn_neighbour_discovery() -> Result<()> {
let v4 = CONFIG.lock().v4;
let v6 = CONFIG.lock().v6;
let port = 3003;
if v4 {
let addr: SocketAddr = format!("0.0.0.0:{}", port).parse()?;
let addr = addr.into();
let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;
socket.set_reuse_address(true)?;
socket.bind(&addr)?;
socket.set_nonblocking(true)?;
socket
.set_multicast_loop_v4(false)
.expect("error activating multicast loop v4");
socket.set_broadcast(true)?;
for address in CONFIG.lock().discovery_destinations.keys() {
let addr: SocketAddr = address.parse().expect("Error parsing discovery address");
if addr.is_ipv4() {
if addr.ip().is_multicast() {
socket
.join_multicast_v4(
&addr.ip().to_string().parse()?,
&std::net::Ipv4Addr::new(0, 0, 0, 0),
)
.expect("error joining multicast v4 group");
}
info!("Configured discovery destination: {}", addr.ip());
}
}
let socket1 = UdpSocket::from_std(socket.try_clone()?.into())?;
let socket2 = UdpSocket::from_std(socket.try_clone()?.into())?;
info!("Listening on {}", socket1.local_addr()?);
tokio::spawn(receiver(socket1));
tokio::spawn(announcer(socket2, false));
}
if v6 {
let addr: SocketAddr = format!("[::1]:{}", port).parse()?;
let addr = addr.into();
let socket = Socket::new(Domain::IPV6, Type::DGRAM, None)?;
socket.set_reuse_address(true)?;
socket.set_only_v6(true)?;
socket.bind(&addr)?;
socket.set_nonblocking(true)?;
socket
.set_multicast_loop_v6(false)
.expect("error activating multicast loop v6");
socket.set_broadcast(true)?;
for address in CONFIG.lock().discovery_destinations.keys() {
let addr: SocketAddr = address.parse().expect("Error while parsing IPv6 address");
if addr.is_ipv6() {
if addr.ip().is_multicast() {
socket
.join_multicast_v6(&addr.ip().to_string().parse()?, 0)
.expect("Error joining multicast v6 group");
}
info!("Configured discovery destination: {}", addr.ip());
}
}
let socket1 = UdpSocket::from_std(socket.try_clone()?.into())?;
let socket2 = UdpSocket::from_std(socket.try_clone()?.into())?;
info!("Listening on {}", socket1.local_addr()?);
tokio::spawn(receiver(socket1));
tokio::spawn(announcer(socket2, true));
}
Ok(())
}