mycelium_lib 0.1.2

Library for Mycelium DDM
Documentation
use std::net::{Ipv4Addr, SocketAddrV4};
use std::sync::Arc;

use futures::sync::mpsc::{unbounded, UnboundedSender};
use tokio::codec::BytesCodec;
use tokio::net::UdpFramed;
use tokio::prelude::*;
use tokio::reactor::Handle;

use crate::prelude::*;
use mycelium_experimental::crossbeam_skiplist::set::SkipSet;

use get_if_addrs::{
    self,
    IfAddr::{V4, V6},
    Ifv4Addr,
};
use std::time::Duration;

pub const DEFAULT_MC: Ipv4Addr = Ipv4Addr::new(225, 225, 225, 1);

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct UDPMsg {
    ip: Ipv4Addr,
    tcp_port: usize,
    udp_port: usize,
    opt: Opt,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub enum Opt {
    WakeUp(Vec<String>),
}

fn create_wakeup_package(ip: Ipv4Addr, db: Arc<Mycelium>) -> std::io::Result<bytes::Bytes> {
    let msg = bincode::serialize(&UDPMsg {
        ip,
        tcp_port: db.config.tcp_port,
        udp_port: db.config.udp_port,
        opt: Opt::WakeUp(db.data.list_tags())
    })
    .unwrap();
    let mut bytes = bytes::Bytes::with_capacity(msg.len());
    bytes.extend_from_slice(&msg);
    Ok(bytes)
}

fn get_local_ip() -> std::io::Result<Option<Ifv4Addr>> {
    let mut ip: Option<Ifv4Addr> = None;
    let if_addrs = get_if_addrs::get_if_addrs().expect("Failed to get ifaddrs");
    for addr in if_addrs {
        match &addr.addr {
            V4(v_4) => {
                if !v_4.is_loopback() {
                    ip = Some(v_4.clone());
                    break;
                }
            }
            V6(_) => (),
        }
    }

    Ok(ip)
}

// Local package reference so hopefully someday when Tokio includes the features
// for multicast refactoring to use that will be limited to this function block.
fn bind_socket(addr: SocketAddrV4, multi: SocketAddrV4) -> std::io::Result<socket2::Socket> {
    use socket2::{Domain, Protocol, Type};
    let mc_socket = socket2::Socket::new(Domain::ipv4(), Type::dgram(), Some(Protocol::udp()))?;
    mc_socket.bind(&socket2::SockAddr::from(addr))?;
    mc_socket.set_reuse_address(true)?;
    mc_socket.join_multicast_v4(multi.ip(), addr.ip())?;

    Ok(mc_socket)
}

fn send(tx: UnboundedSender<bytes::Bytes>, msg: bytes::Bytes) {
    match tx.unbounded_send(msg.clone()) {
        Ok(_) => (),
        Err(e) => { eprintln!("Udp send error: {:?}", e) },
    }
    loop {
        std::thread::sleep(Duration::from_secs(30));
        match tx.unbounded_send(msg.clone()) {
            Ok(_) => (),
            Err(e) => { eprintln!("Udp send error: {:?}", e) },
        }
    }
}

///
/// # Distributed database
///
pub fn start_distributed(db: Arc<Mycelium>) -> std::io::Result<()> {
    let udp_port = db.config.udp_port;
    let ip = get_local_ip()?.unwrap().ip;
    let multi = SocketAddrV4::new(DEFAULT_MC, udp_port as u16);

    let t_socket = tokio::net::UdpSocket::from_std(
        std::net::UdpSocket::from(bind_socket(SocketAddrV4::new(ip, udp_port as u16), multi)?),
        &Handle::default(),
    )
    .expect("Failed to convert std to tokio::udpsocket");

    let local_addr = t_socket.local_addr().unwrap();
    let framed = UdpFramed::new(t_socket, BytesCodec::new());
    let (udp_tx, udp_rx) = Stream::split(framed);
    let (chn_tx, chn_rx) = unbounded::<bytes::Bytes>();

    let db_clone = db.clone();
    std::thread::spawn(move || send(chn_tx, create_wakeup_package(ip, db_clone).unwrap()));
    let send = chn_rx
        .map(move |s| (s, local_addr))
        .forward(udp_tx.sink_map_err(|e| println!("Error receiving UDP packet: {:?}", e)))
        .map(|_| ());

    let recv = udp_rx
        .for_each(move |(s, _)| {
            let codec: UDPMsg = bincode::deserialize(&s).expect("Failed to deserialize UDPMsg");
            eprintln!("Message from: {:?}", codec.ip);
            let key = (codec.ip, codec.tcp_port, codec.udp_port);
            match codec.opt {
                Opt::WakeUp(tags) => {
                    for tag in tags {
                        let network: SkipSet<(Ipv4Addr, usize, usize)> = SkipSet::new();
                        network.insert(key);
                        let item = db.network.get_or_insert(tag, network);
                        if !item.value().contains(&key) {
                            item.value().insert(key);
                        }
                    }
                }
            }
            Ok(())
        })
        .map_err(|e| println!("Error sending UDP packet: {:?}", e));
    let serve = send.select(recv).map(|_| ()).map_err(|_| ());
    tokio::run(serve);

    Ok(())
}