use crate::{AddrTable, Envelope, FrameExt, Peer};
use async_std::{
future::{self, Future},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
pin::Pin,
sync::{Arc, RwLock},
task::{self, Poll},
};
use netmod::{Frame, Target};
use std::collections::VecDeque;
use task_notify::Notify;
/// IPv4 multicast group that sockets join and send announcements to.
const MULTI: Ipv4Addr = Ipv4Addr::new(224, 0, 0, 123);
/// The unspecified address (`0.0.0.0`), used both as the local bind address
/// and as the interface argument when joining the multicast group.
const SELF: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
/// A UDP socket bundled with the queue of frames received on it.
pub(crate) struct Socket {
    /// Port the socket is bound to; also used as the destination port for
    /// multicast messages.
    port: u16,
    /// The underlying UDP socket, kept behind a reference count.
    sock: Arc<UdpSocket>,
    /// Queue of received frames: filled by the background receive task and
    /// drained by `next`. The `Notify` wrapper carries the waker used to
    /// resume a pending `next` call.
    inbox: Arc<RwLock<Notify<VecDeque<FrameExt>>>>,
}
impl Socket {
#[instrument(skip(table), level = "trace")]
pub(crate) async fn with_port(port: u16, table: Arc<AddrTable>) -> Arc<Self> {
let sock = UdpSocket::bind((SELF, port)).await.unwrap();
sock.join_multicast_v4(MULTI, SELF)
.expect("Failed to join multicast. Error");
let arc = Arc::new(Self {
port,
sock: Arc::new(sock),
inbox: Default::default(),
});
Self::incoming_handle(Arc::clone(&arc), table);
arc.multicast(Envelope::Announce).await;
info!("Sent multicast announcement");
arc
}
pub(crate) async fn send(&self, frame: &Frame, peer: Peer) {
let data = Envelope::frame(frame);
self.sock
.send_to(&data, SocketAddr::new(peer.ip, peer.port))
.await
.unwrap();
}
pub(crate) async fn send_many(&self, frame: &Frame, ips: Vec<Peer>) {
let data = Envelope::frame(frame);
for peer in ips.iter() {
self.send(frame, *peer).await
}
}
#[instrument(skip(self, env), level = "trace")]
pub(crate) async fn multicast(&self, env: Envelope) {
info!("Sending multicast message: {:#?}", env);
self.sock
.send_to(
&env.as_bytes(),
SocketAddr::new(IpAddr::V4(MULTI.clone()), self.port),
)
.await;
}
pub(crate) async fn next(&self) -> FrameExt {
future::poll_fn(|ctx| {
let lock = &mut self.inbox.write();
match unsafe { Pin::new_unchecked(lock).poll(ctx) } {
Poll::Ready(ref mut inc) => match inc.pop_front() {
Some(f) => Poll::Ready(f),
None => {
Notify::clear_waker(inc);
Notify::register_waker(inc, ctx.waker());
Poll::Pending
}
},
Poll::Pending => Poll::Pending,
}
})
.await
}
#[instrument(skip(arc, table), level = "trace")]
fn incoming_handle(arc: Arc<Self>, table: Arc<AddrTable>) {
task::spawn(async move {
loop {
let mut buf = vec![0; 8192];
match arc.sock.recv_from(&mut buf).await {
Ok((_, peer)) => {
let env = Envelope::from_bytes(&buf);
match env {
Envelope::Announce => {
debug!("Recieving announce");
table.set(peer).await;
arc.multicast(Envelope::Reply).await;
}
Envelope::Reply => {
debug!("Recieving announce reply");
table.set(peer).await;
}
Envelope::Data(_) => {
debug!("Recieved frame");
let frame = env.get_frame();
info!(frame = format!("{:#?}", frame).as_str());
info!(peer = format!("{:#?}", peer).as_str());
let id = table.id(peer.into()).await.unwrap();
let mut inbox = arc.inbox.write().await;
inbox.push_back(FrameExt(frame, Target::Single(id)));
Notify::wake(&mut inbox);
}
}
}
val => {
error!("Crashed UDP thread: {:#?}", val);
val.expect("Crashed UDP thread!");
}
}
}
});
}
}
/// Smoke test: create a socket on a fixed port and send one extra multicast
/// announcement.
#[test]
fn test_init() {
    task::block_on(async move {
        let table = Arc::new(AddrTable::new());
        let sock = Socket::with_port(12322, table).await;
        println!("Multicasting");
        // Futures are lazy: without `.await` the announcement is never sent.
        sock.multicast(Envelope::Announce).await;
    });
}
/// Two sockets on the loopback interface: a frame sent by the first must be
/// the next frame read from the second.
#[ignore]
#[test]
fn test_single_unicast() {
    task::block_on(async {
        let first = Peer {
            ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            port: 10000,
        };
        let second = Peer {
            ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
            port: 10001,
        };

        // Each side's table is pre-filled with the other side's address.
        let first_table = Arc::new(AddrTable::new());
        let second_table = Arc::new(AddrTable::new());
        first_table.set(second).await;
        second_table.set(first).await;

        let sender = Socket::with_port(first.port, first_table).await;
        let receiver = Socket::with_port(second.port, second_table).await;

        let frame = Frame::dummy();
        sender.send(&frame, second).await;

        let received = receiver.next().await;
        assert_eq!(received.0, frame);
    });
}