p2p 0.6.0

NAT Traversal for P2P communication
use common::event_loop::{Core, CoreState, CoreTimer};
use common::types::{PeerId, PeerMsg, PlainTextMsg};
use maidsafe_utilities::serialisation::{deserialise, serialise};
use mio::timer::Timeout;
use mio::{Poll, Ready, Token};
use p2p::{msg_to_read, msg_to_send, Interface};
use socket_collection::UdpSock;
use sodium::crypto::box_;
use std::any::Any;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use {Event, PeerState};

const INACTIVITY_TIMEOUT_ID: u8 = 0;
const TOLERATE_READ_ERRS_ID: u8 = 1;

const INACTIVITY_TIMEOUT_SECS: u64 = 180;
const TOLERATE_READ_ERRS_SECS: u64 = 60;

pub struct ActivePeer {
    token: Token,
    sock: UdpSock,
    peer: PeerId,
    key: box_::PrecomputedKey,
    peers: Arc<Mutex<BTreeMap<PeerId, PeerState>>>,
    should_buffer: bool,
    chat_buf: Vec<String>,
    tolerate_read_errs: bool,
    timeout_inactivity: Timeout,
    timeout_tolerate_read_errs: Timeout,
    tx: Sender<Event>,
}

impl ActivePeer {
    pub fn start(
        core: &mut Core,
        poll: &Poll,
        token: Token,
        sock: UdpSock,
        peer: PeerId,
        peers: Arc<Mutex<BTreeMap<PeerId, PeerState>>>,
        tx: Sender<Event>,
    ) {
        let state = Rc::new(RefCell::new(ActivePeer {
            token,
            sock,
            peer: peer.clone(),
            key: box_::precompute(&peer.pk, core.enc_sk()),
            peers: peers.clone(),
            should_buffer: true,
            chat_buf: Default::default(),
            tolerate_read_errs: true,
            timeout_inactivity: unwrap!(core.set_core_timeout(
                Duration::from_secs(INACTIVITY_TIMEOUT_SECS),
                CoreTimer::new(token, INACTIVITY_TIMEOUT_ID)
            )),
            timeout_tolerate_read_errs: unwrap!(core.set_core_timeout(
                Duration::from_secs(TOLERATE_READ_ERRS_SECS),
                CoreTimer::new(token, TOLERATE_READ_ERRS_ID)
            )),
            tx: tx.clone(),
        }));

        if let Err((state, e)) = core.insert_peer_state(token, state) {
            info!("Could not insert peer-state: {:?}", e);
            state.borrow_mut().terminate(core, poll);
            unwrap!(tx.send(Event::PeerConnectFailed(peer)));
            return;
        }

        let mut peers_guard = unwrap!(peers.lock());
        let stored_state = peers_guard
            .entry(peer.clone())
            .or_insert(Default::default());
        *stored_state = PeerState::Connected(token);

        unwrap!(tx.send(Event::PeerConnected(peer, token)));
    }

    pub fn start_buffering(&mut self) {
        self.should_buffer = true;
    }

    pub fn flush_and_stop_buffering(&mut self) {
        self.should_buffer = false;
        for m in self.chat_buf.drain(..) {
            println!("{} --> {}", self.peer, m);
        }
    }

    fn read(&mut self, core: &mut Core, poll: &Poll) {
        loop {
            match self.sock.read() {
                Ok(Some(PeerMsg::CipherText(ct))) => {
                    if !self.handle_ciphertext(core, poll, &ct) {
                        return self.terminate(core, poll);
                    }
                }
                Ok(Some(_)) => {
                    debug!("Invalid peer-chat message");
                    return self.terminate(core, poll);
                }
                Ok(None) => return,
                Err(e) => {
                    // TODO Make this debug better as such:
                    // debug!("{:?} - Failed to read from sock: {:?}", self.our_id, e);
                    if self.tolerate_read_errs {
                        trace!("Tolerating read error: {:?}", e);
                    } else {
                        debug!("Failed to read from sock: {:?}", e);
                        return self.terminate(core, poll);
                    }
                }
            }
        }
    }

    fn write(&mut self, core: &mut Core, poll: &Poll, m: Option<PeerMsg>) {
        if let Err(e) = self.sock.write(m.map(|m| (m, 0))) {
            debug!("Failed to write to sock: {:?}", e);
            self.terminate(core, poll);
        }
    }

    fn handle_ciphertext(&mut self, core: &mut Core, poll: &Poll, ciphertext: &[u8]) -> bool {
        let plaintext_ser = match msg_to_read(ciphertext, &self.key) {
            Ok(pt) => pt,
            Err(e) => {
                return if self.tolerate_read_errs {
                    trace!("Tolerating error decrypting: {:?}", e);
                    true
                } else {
                    debug!("Error decrypting: {:?}", e);
                    false
                };
            }
        };

        let plaintext = match deserialise(&plaintext_ser) {
            Ok(pt) => pt,
            Err(e) => {
                return if self.tolerate_read_errs {
                    trace!("Tolerating error deserialising: {:?}", e);
                    true
                } else {
                    info!("Error deserialising: {:?}", e);
                    false
                };
            }
        };

        let chat = match plaintext {
            PlainTextMsg::Chat(m) => m,
            x => {
                return if self.tolerate_read_errs {
                    trace!("Tolerating invalid PlainTextMsg: {:?}", x);
                    true
                } else {
                    info!("Invalid PlainTextMsg: {:?}", x);
                    false
                };
            }
        };

        if self.should_buffer {
            self.chat_buf.push(chat);
        } else {
            println!("{} --> {}", self.peer, chat);
        }

        let _ = core.cancel_core_timeout(&self.timeout_inactivity);
        self.timeout_inactivity = unwrap!(core.set_core_timeout(
            Duration::from_secs(INACTIVITY_TIMEOUT_SECS),
            CoreTimer::new(self.token, INACTIVITY_TIMEOUT_ID)
        ));

        true
    }
}

impl CoreState for ActivePeer {
    fn ready(&mut self, core: &mut Core, poll: &Poll, kind: Ready) {
        if kind.is_readable() {
            self.read(core, poll);
        } else if kind.is_writable() {
            self.write(core, poll, None)
        } else {
            warn!("Unknown kind: {:?}", kind);
        }
    }

    fn write(&mut self, core: &mut Core, poll: &Poll, data: Vec<u8>) {
        let ciphertext = unwrap!(msg_to_send(&data, &self.key));
        self.write(core, poll, Some(PeerMsg::CipherText(ciphertext)));
    }

    fn timeout(&mut self, core: &mut Core, poll: &Poll, timer_id: u8) {
        if timer_id == INACTIVITY_TIMEOUT_ID {
            trace!(
                "Peer inactive for {} secs. Terminating..",
                INACTIVITY_TIMEOUT_SECS
            );
            return self.terminate(core, poll);
        }

        assert_eq!(timer_id, TOLERATE_READ_ERRS_ID);
        self.tolerate_read_errs = false;
    }

    fn terminate(&mut self, core: &mut Core, poll: &Poll) {
        let mut peers_guard = unwrap!(self.peers.lock());
        if let Some(stored_state) = peers_guard.get_mut(&self.peer) {
            *stored_state = Default::default();
        }

        let _ = core.remove_peer_state(self.token);
        let _ = poll.deregister(&self.sock);

        let _ = self.tx.send(Event::PeerDisconnected(self.peer.clone()));
    }

    fn as_any(&mut self) -> &mut Any {
        self
    }
}