use std::net::{Ipv4Addr, SocketAddrV4};
use std::sync::Arc;
use futures::sync::mpsc::{unbounded, UnboundedSender};
use tokio::codec::BytesCodec;
use tokio::net::UdpFramed;
use tokio::prelude::*;
use tokio::reactor::Handle;
use crate::prelude::*;
use mycelium_experimental::crossbeam_skiplist::set::SkipSet;
use get_if_addrs::{
self,
IfAddr::{V4, V6},
Ifv4Addr,
};
use std::time::Duration;
/// IPv4 multicast group address (225.225.225.1) that `start_distributed`
/// joins for UDP discovery traffic.
pub const DEFAULT_MC: Ipv4Addr = Ipv4Addr::new(225, 225, 225, 1);
/// Discovery datagram exchanged over UDP.
///
/// Instances are encoded and decoded with `bincode`, so the field order and
/// types below define the wire format.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct UDPMsg {
/// IPv4 address the sender reports for itself.
ip: Ipv4Addr,
/// TCP port taken from the sender's configuration (`config.tcp_port`).
tcp_port: usize,
/// UDP port taken from the sender's configuration (`config.udp_port`).
udp_port: usize,
/// Operation carried by this datagram.
opt: Opt,
}
/// Operations that a [`UDPMsg`] can carry.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub enum Opt {
/// Announces the sender together with the list of tags it holds; the
/// receiver records the sender's address under each listed tag.
WakeUp(Vec<String>),
}
/// Builds the serialized `WakeUp` announcement for this node.
///
/// The message carries `ip`, the TCP and UDP ports from `db.config`, and the
/// tag list returned by `db.data.list_tags()`, encoded with `bincode`.
///
/// # Errors
/// Returns an `InvalidData` I/O error if `bincode` fails to serialize the
/// message, instead of panicking.
fn create_wakeup_package(ip: Ipv4Addr, db: Arc<Mycelium>) -> std::io::Result<bytes::Bytes> {
    let msg = UDPMsg {
        ip,
        tcp_port: db.config.tcp_port,
        udp_port: db.config.udp_port,
        opt: Opt::WakeUp(db.data.list_tags()),
    };
    let encoded = bincode::serialize(&msg)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
    // Hand the encoded buffer over directly rather than copying it into a second one.
    Ok(bytes::Bytes::from(encoded))
}
/// Returns the first non-loopback IPv4 interface address reported by
/// `get_if_addrs`, or `None` when there is no such interface.
///
/// # Errors
/// Propagates the I/O error from `get_if_addrs::get_if_addrs` instead of
/// panicking when the interface list cannot be read.
fn get_local_ip() -> std::io::Result<Option<Ifv4Addr>> {
    let ip = get_if_addrs::get_if_addrs()?
        .into_iter()
        .find_map(|iface| match iface.addr {
            V4(v4) => {
                if v4.is_loopback() {
                    None
                } else {
                    Some(v4)
                }
            }
            // IPv6 addresses are not used for discovery.
            V6(_) => None,
        });
    Ok(ip)
}
/// Creates an IPv4 UDP socket bound to `addr` and joined to a multicast group.
///
/// Only the IP of `multi` is used (as the group address); `addr.ip()` selects
/// the local interface for the group membership.
///
/// # Errors
/// Returns the underlying I/O error if creating, configuring, binding, or
/// joining the multicast group fails.
fn bind_socket(addr: SocketAddrV4, multi: SocketAddrV4) -> std::io::Result<socket2::Socket> {
    use socket2::{Domain, Protocol, Type};
    let mc_socket = socket2::Socket::new(Domain::ipv4(), Type::dgram(), Some(Protocol::udp()))?;
    // SO_REUSEADDR must be set before `bind`; setting it afterwards has no
    // effect on whether this bind is allowed to share the address.
    mc_socket.set_reuse_address(true)?;
    mc_socket.bind(&socket2::SockAddr::from(addr))?;
    mc_socket.join_multicast_v4(multi.ip(), addr.ip())?;
    Ok(mc_socket)
}
/// Pushes `msg` into `tx` immediately and then again every 30 seconds.
///
/// This blocks the calling thread (it sleeps between sends), so it is meant to
/// run on a dedicated thread. It returns once a send fails: an unbounded
/// channel send only fails when the receiving side is gone, so retrying would
/// just log the same error forever and keep the thread alive.
fn send(tx: UnboundedSender<bytes::Bytes>, msg: bytes::Bytes) {
    loop {
        if let Err(e) = tx.unbounded_send(msg.clone()) {
            eprintln!("Udp send error: {:?}", e);
            return;
        }
        std::thread::sleep(Duration::from_secs(30));
    }
}
/// Starts the UDP discovery service and runs it on the current thread.
///
/// The function binds a UDP socket on the first non-loopback IPv4 interface
/// at `db.config.udp_port`, joins the [`DEFAULT_MC`] multicast group, spawns a
/// thread that periodically queues a `WakeUp` announcement, and then drives
/// the send and receive halves with `tokio::run` until either one finishes.
/// Every received `WakeUp` registers the sender's `(ip, tcp_port, udp_port)`
/// under each announced tag in `db.network`.
///
/// # Errors
/// Returns an I/O error when the configured UDP port does not fit in 16 bits,
/// when no non-loopback IPv4 interface exists, when the socket cannot be
/// created or registered with the reactor, or when the wakeup package cannot
/// be built.
pub fn start_distributed(db: Arc<Mycelium>) -> std::io::Result<()> {
    let configured_port = db.config.udp_port;
    // Reject out-of-range ports instead of silently truncating them with `as`.
    if configured_port > usize::from(u16::max_value()) {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("UDP port {} is out of range", configured_port),
        ));
    }
    let udp_port = configured_port as u16;

    let ip = get_local_ip()?
        .ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::AddrNotAvailable,
                "No non-loopback IPv4 interface found",
            )
        })?
        .ip;
    let multi = SocketAddrV4::new(DEFAULT_MC, udp_port);

    let std_socket = std::net::UdpSocket::from(bind_socket(SocketAddrV4::new(ip, udp_port), multi)?);
    let t_socket = tokio::net::UdpSocket::from_std(std_socket, &Handle::default())?;
    let local_addr = t_socket.local_addr()?;

    let framed = UdpFramed::new(t_socket, BytesCodec::new());
    let (udp_tx, udp_rx) = Stream::split(framed);
    let (chn_tx, chn_rx) = unbounded::<bytes::Bytes>();

    // Build the announcement up front so a failure is reported to the caller
    // rather than being lost inside the sender thread.
    let wakeup = create_wakeup_package(ip, db.clone())?;
    std::thread::spawn(move || send(chn_tx, wakeup));

    // NOTE(review): outgoing datagrams are addressed to this socket's own
    // local address, not to `multi` — confirm that this is intended.
    let outbound = chn_rx
        .map(move |s| (s, local_addr))
        .forward(udp_tx.sink_map_err(|e| eprintln!("Error sending UDP packet: {:?}", e)))
        .map(|_| ());

    let inbound = udp_rx
        .for_each(move |(s, peer)| {
            // Datagrams come from the network and may be malformed; skip them
            // instead of panicking and taking the whole service down.
            let codec: UDPMsg = match bincode::deserialize(&s) {
                Ok(decoded) => decoded,
                Err(e) => {
                    eprintln!("Failed to deserialize UDPMsg from {:?}: {:?}", peer, e);
                    return Ok(());
                }
            };
            eprintln!("Message from: {:?}", codec.ip);
            let key = (codec.ip, codec.tcp_port, codec.udp_port);
            match codec.opt {
                Opt::WakeUp(tags) => {
                    for tag in tags {
                        // Candidate set used only if the tag has no entry yet.
                        let network: SkipSet<(Ipv4Addr, usize, usize)> = SkipSet::new();
                        network.insert(key);
                        let item = db.network.get_or_insert(tag, network);
                        if !item.value().contains(&key) {
                            item.value().insert(key);
                        }
                    }
                }
            }
            Ok(())
        })
        .map_err(|e| eprintln!("Error receiving UDP packet: {:?}", e));

    // Stop as soon as either direction terminates.
    let serve = outbound.select(inbound).map(|_| ()).map_err(|_| ());
    tokio::run(serve);
    Ok(())
}