// p2p 0.6.0
//
// NAT Traversal for P2P communication
use common::event_loop::{Core, CoreState};
use common::types::{PeerId, PeerMsg, PlainTextMsg};
use maidsafe_utilities::serialisation::{deserialise, serialise};
use mio::{Poll, PollOpt, Ready, Token};
use p2p::{msg_to_read, msg_to_send, Interface, RendezvousInfo};
use socket_collection::TcpSock;
use sodium::crypto::box_;
use std::any::Any;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::{Rc, Weak};
use std::{fmt, mem};

/// Per-connection state of one remote peer, tracking its socket and how far
/// the handshake with it has progressed.
pub struct Peer {
    /// Token the socket is registered under with the poll; also the key this
    /// state is stored under in `Core`.
    token: Token,
    /// Socket used to exchange `PeerMsg`s with the remote peer.
    sock: TcpSock,
    /// Handshake stage; decides which incoming messages are acceptable.
    state: CurrentState,
    /// Weak handle to the map of activated peers (id -> token). Entries are
    /// added when a name is accepted and removed on termination.
    peers: Weak<RefCell<BTreeMap<PeerId, Token>>>,
}

/// Handshake stage of a `Peer`.
enum CurrentState {
    /// Initial stage: waiting for the remote peer's public key.
    AwaitingPeerPk,
    /// Public key received; waiting for the peer to register a name.
    AwaitingPeerName {
        /// Public key sent by the remote peer.
        pk: box_::PublicKey,
        /// Key precomputed from `pk` and `core.enc_sk()`, used to encrypt and
        /// decrypt messages exchanged with this peer.
        key: box_::PrecomputedKey,
    },
    /// Name accepted; the peer is listed in the peer map under `id`.
    PeerActivated {
        /// Identity (name + public key) the peer registered.
        id: PeerId,
        /// Same precomputed key as in `AwaitingPeerName`.
        key: box_::PrecomputedKey,
    },
}

impl Default for CurrentState {
    /// A fresh connection starts out waiting for the peer's public key.
    fn default() -> CurrentState {
        CurrentState::AwaitingPeerPk
    }
}

impl fmt::Debug for CurrentState {
    /// Prints only the variant name; the key material held by the variants is
    /// never written out.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match *self {
            CurrentState::AwaitingPeerPk => "CurrentState::AwaitingPeerPk",
            CurrentState::AwaitingPeerName { .. } => "CurrentState::AwaitingPeerName",
            CurrentState::PeerActivated { .. } => "CurrentState::PeerActivated",
        };

        f.write_str(label)
    }
}

impl Peer {
    pub fn start(
        core: &mut Core,
        poll: &Poll,
        sock: TcpSock,
        peers: Weak<RefCell<BTreeMap<PeerId, Token>>>,
    ) {
        let token = core.new_token();

        unwrap!(poll.register(
            &sock,
            token,
            Ready::readable() | Ready::writable(),
            PollOpt::edge(),
        ));

        let state = Self {
            token,
            sock,
            state: Default::default(),
            peers,
        };

        if core
            .insert_peer_state(token, Rc::new(RefCell::new(state)))
            .is_err()
        {
            panic!("Could not start Overlay !");
        }
    }

    fn read(&mut self, core: &mut Core, poll: &Poll) {
        loop {
            match self.sock.read() {
                Ok(Some(PeerMsg::PubKey(pk))) => {
                    if !self.handle_peer_pk(core, poll, pk) {
                        return self.terminate(core, poll);
                    }
                }
                Ok(Some(PeerMsg::CipherText(ct))) => {
                    if !self.handle_ciphertext(core, poll, &ct) {
                        return self.terminate(core, poll);
                    }
                }
                Ok(None) => return,
                Err(e) => {
                    // TODO Make this debug better as such:
                    // debug!("{:?} - Failed to read from sock: {:?}", self.our_id, e);
                    debug!("Failed to read from sock: {:?}", e);
                    return self.terminate(core, poll);
                }
            }
        }
    }

    /// Hands `m` (if any) to the socket for writing and terminates the peer
    /// if the socket reports an error.
    ///
    /// The message is paired with `0` before being passed on (presumably its
    /// priority - confirm against `TcpSock::write`).
    fn write(&mut self, core: &mut Core, poll: &Poll, m: Option<PeerMsg>) {
        let outgoing = m.map(|msg| (msg, 0));

        match self.sock.write(outgoing) {
            Ok(_) => (),
            Err(e) => {
                debug!("Failed to write to sock: {:?}", e);
                self.terminate(core, poll);
            }
        }
    }

    /// Handles the peer's public key: precomputes the shared key, moves to
    /// `AwaitingPeerName` and replies with our own public key.
    ///
    /// Returns `false` (leaving the state untouched) if a public key is not
    /// expected in the current state.
    fn handle_peer_pk(&mut self, core: &mut Core, poll: &Poll, pk: box_::PublicKey) -> bool {
        let awaiting_pk = match self.state {
            CurrentState::AwaitingPeerPk => true,
            _ => false,
        };
        if !awaiting_pk {
            info!(
                "Message cannot be handled in the current state: {:?}",
                self.state
            );
            return false;
        }

        let key = box_::precompute(&pk, core.enc_sk());
        self.state = CurrentState::AwaitingPeerName { pk, key };

        let overlay_pk = *core.enc_pk();
        self.write(core, poll, Some(PeerMsg::PubKey(overlay_pk)));

        true
    }

    fn handle_ciphertext(&mut self, core: &mut Core, poll: &Poll, ciphertext: &[u8]) -> bool {
        let plaintext_ser = match self.state {
            CurrentState::AwaitingPeerName { ref key, .. }
            | CurrentState::PeerActivated { ref key, .. } => match msg_to_read(ciphertext, key) {
                Ok(pt) => pt,
                Err(e) => {
                    info!("Error decrypting: {:?}", e);
                    return false;
                }
            },
            ref x => {
                info!("Message cannot be handled in the current state: {:?}", x);
                return false;
            }
        };

        let plaintext = match deserialise(&plaintext_ser) {
            Ok(pt) => pt,
            Err(e) => {
                info!("Error deserialising: {:?}", e);
                return false;
            }
        };

        match plaintext {
            PlainTextMsg::ReqUpdateName(name) => self.handle_update_name(core, poll, name),
            PlainTextMsg::ReqOnlinePeers => self.handle_req_online_peers(core, poll),
            PlainTextMsg::ExchgRendezvousInfo { src_info, dst_peer } => {
                self.forward_rendezvous_impl(core, poll, src_info, dst_peer)
            }
            x => {
                info!("Invalid PlainTextMsg: {:?}", x);
                return false;
            }
        }
    }

    /// Handles a request to register `name` for this peer.
    ///
    /// The name is rejected if it is empty, contains a space, or if the
    /// resulting `PeerId` is already present in the peer map. On rejection
    /// the peer stays in `AwaitingPeerName` and may retry; on acceptance it
    /// becomes `PeerActivated` and is inserted into the peer map. In both
    /// cases an encrypted `UpdateNameResp` carrying the outcome is sent back.
    ///
    /// Returns `false` if the peer map is no longer available or if the
    /// request arrives in any state other than `AwaitingPeerName`.
    ///
    /// # Panics
    ///
    /// Panics if serialising or encrypting the response fails, or if the
    /// insertion displaces an existing entry.
    fn handle_update_name(&mut self, core: &mut Core, poll: &Poll, name: String) -> bool {
        let peers = match self.peers.upgrade() {
            Some(peers) => peers,
            None => {
                warn!("Peer list unexpectedly unavailable !");
                return false;
            }
        };

        // Temporarily take the state out so `pk` and `key` can be moved
        // without cloning.
        let (pk, key) = match mem::replace(&mut self.state, Default::default()) {
            CurrentState::AwaitingPeerName { pk, key } => (pk, key),
            x => {
                info!("Message cannot be handled in the current state: {:?}", x);
                // Put the original state back. The caller terminates the peer
                // on `false`, and `terminate` only removes the id from the
                // peer map while the state is still `PeerActivated`; leaving
                // the default state here would leak that entry.
                self.state = x;
                return false;
            }
        };

        let id = PeerId::new(name, pk);
        let accepted =
            !(id.name.is_empty() || id.name.contains(' ') || peers.borrow().contains_key(&id));

        if !accepted {
            trace!("Invalid name or identity already taken - choose a different one");
        }

        let resp_ser = unwrap!(serialise(&PlainTextMsg::UpdateNameResp(accepted)));
        let ciphertext = unwrap!(msg_to_send(&resp_ser, &key));

        if accepted {
            self.state = CurrentState::PeerActivated {
                id: id.clone(),
                key,
            };
            if peers.borrow_mut().insert(id, self.token).is_some() {
                panic!("Logic Error in updating name: id existed and is now displaced !");
            }
        } else {
            // Stay in the naming stage so the peer can try another name.
            self.state = CurrentState::AwaitingPeerName { pk, key };
        }

        self.write(core, poll, Some(PeerMsg::CipherText(ciphertext)));
        true
    }

    fn handle_req_online_peers(&mut self, core: &mut Core, poll: &Poll) -> bool {
        let ciphertext = match self.state {
            CurrentState::PeerActivated { ref key, .. } => {
                let peers = match self.peers.upgrade() {
                    Some(peers) => peers,
                    None => {
                        warn!("Peer list unexpectedly unavailable !");
                        return false;
                    }
                };

                let peers: Vec<PeerId> = peers.borrow().keys().cloned().collect();

                let peers_ser = unwrap!(serialise(&PlainTextMsg::OnlinePeersResp(peers)));
                unwrap!(msg_to_send(&peers_ser, key))
            }
            ref x => {
                info!("Message cannot be handled in the current state: {:?}", x);
                return false;
            }
        };

        self.write(core, poll, Some(PeerMsg::CipherText(ciphertext)));

        true
    }

    fn forward_rendezvous_impl(
        &mut self,
        core: &mut Core,
        poll: &Poll,
        src_info: RendezvousInfo,
        dst_peer: PeerId,
    ) -> bool {
        let src_peer = match self.state {
            CurrentState::PeerActivated { ref id, .. } => id.clone(),
            ref x => {
                info!("Message cannot be handled in the current state: {:?}", x);
                return false;
            }
        };

        let peers = match self.peers.upgrade() {
            Some(peers) => peers,
            None => {
                warn!("Peer list unexpectedly unavailable !");
                return false;
            }
        };

        let dst_token = match peers.borrow().get(&dst_peer) {
            Some(&t) => t,
            None => {
                trace!("Destination Peer is no longer online.");
                return true;
            }
        };

        let dst_peer_state = match core.peer_state(dst_token) {
            Some(ps) => ps,
            None => {
                warn!("Destination Peer is online but does not have a peer-state.");
                return true;
            }
        };

        let fwd_info = PlainTextMsg::FwdRendezvousInfo { src_info, src_peer };

        let fwd_info_ser = unwrap!(serialise(&fwd_info));
        dst_peer_state.borrow_mut().write(core, poll, fwd_info_ser);

        true
    }
}

impl CoreState for Peer {
    /// Reacts to a readiness event for this peer's socket.
    ///
    /// Readable and writable readiness are handled independently. The socket
    /// is registered edge-triggered, so a writable notification that arrives
    /// together with a readable one must not be dropped: it would not be
    /// delivered again and queued data could stall.
    fn ready(&mut self, core: &mut Core, poll: &Poll, kind: Ready) {
        let readable = kind.is_readable();
        let writable = kind.is_writable();

        if !readable && !writable {
            warn!("Unknown kind: {:?}", kind);
            return;
        }

        if readable {
            self.read(core, poll);

            // `read` may have terminated this peer, which removes its state
            // from `core`; do not touch the socket again in that case.
            if writable && core.peer_state(self.token).is_none() {
                return;
            }
        }

        if writable {
            self.write(core, poll, None);
        }
    }

    /// Encrypts `data` with this peer's key and sends it as a `CipherText`
    /// message.
    ///
    /// The data is dropped (with a log entry) unless the peer is
    /// `PeerActivated`.
    fn write(&mut self, core: &mut Core, poll: &Poll, data: Vec<u8>) {
        let encrypted = {
            // Confine the borrow of the key to this block.
            let key = match self.state {
                CurrentState::PeerActivated { ref key, .. } => key,
                ref other => {
                    info!("Message cannot be handled in the current state: {:?}", other);
                    return;
                }
            };

            unwrap!(msg_to_send(&data, key))
        };

        // Resolves to the inherent `Peer::write`, which takes an
        // `Option<PeerMsg>`.
        let msg = PeerMsg::CipherText(encrypted);
        self.write(core, poll, Some(msg));
    }

    /// Tears this peer down: removes its id from the peer map (if it was
    /// activated and the map still exists), deregisters the socket from
    /// `poll` and removes the peer state from `core`.
    ///
    /// Failures of the individual steps are deliberately ignored.
    fn terminate(&mut self, core: &mut Core, poll: &Poll) {
        if let CurrentState::PeerActivated { ref id, .. } = self.state {
            if let Some(peers) = self.peers.upgrade() {
                let _ = peers.borrow_mut().remove(id);
            }
        }

        let _ = poll.deregister(&self.sock);
        let _ = core.remove_peer_state(self.token);
    }

    /// Returns `self` as a mutable `Any` trait object.
    fn as_any(&mut self) -> &mut Any {
        self
    }
}