p2p 0.6.0

NAT Traversal for P2P communication
pub use self::active_peer::ActivePeer;
pub use self::event::Event;
pub use self::overlay_connect::OverlayConnect;

use common::event_loop::{spawn_event_loop, CoreMsg, El};
use common::read_config;
use common::types::PlainTextMsg;
use maidsafe_utilities::serialisation::serialise;
use maidsafe_utilities::thread::{self, Joiner};
use mio::Token;
use p2p::{Config, Handle, RendezvousInfo};
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration;
use std::time::Instant;

mod active_peer;
mod event;
mod overlay_connect;

#[derive(Serialize, Deserialize)]
pub struct FullConfig {
    pub peer_cfg: PeerConfig,
    pub p2p_cfg: Config,
}

#[derive(Serialize, Deserialize)]
pub struct PeerConfig {
    overlay_addr: SocketAddr,
}

#[derive(Debug)]
pub enum PeerState {
    Discovered,
    CreatingRendezvousInfo {
        mediator_token: Token,
        peer_info: Option<RendezvousInfo>,
    },
    AwaitingPeerRendezvous {
        since: Instant,
        p2p_handle: Handle,
    },
    AwaitingHolePunchResult,
    Connected(Token),
}

impl Default for PeerState {
    fn default() -> Self {
        PeerState::Discovered
    }
}

const MENU: &str = "
 -------------------------------------------
|  ======
| | Menu |
|  ======
| 1) Show Online & Connected (*) Peers
| 2) Refresh Online Peers List
| 3) Connect to
| 4) Chat with
| 5) Information about inputs to (2) and (3)
| 0) Quit/Exit
 -------------------------------------------
";

pub fn entry_point() {
    let cfg: FullConfig = read_config("./peer-config");

    let el = spawn_event_loop(cfg.p2p_cfg);
    let peer_cfg = cfg.peer_cfg;

    println!("Enter Name [Name must be unique (preferably) and cannot contain spaces]:");

    let mut name = String::new();
    loop {
        unwrap!(io::stdin().read_line(&mut name));
        name = name.trim().to_string();
        if name.is_empty() || name.contains(" ") {
            println!("Invalid Name. Choose a valid Name:");
            name.clear();
        } else {
            break;
        }
    }

    let (event_tx, event_rx) = mpsc::channel();
    let peers = Arc::new(Mutex::new(Default::default()));

    {
        let tx = event_tx.clone();
        let name = name.clone();
        let peers = peers.clone();
        let overlay = peer_cfg.overlay_addr;
        unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| {
            OverlayConnect::start(core, poll, &overlay, name, peers, tx);
        })));
    }

    let event = unwrap!(event_rx.recv_timeout(Duration::from_secs(5)));
    println!("{}", event);
    let overlay_token = match event {
        Event::OverlayConnected(t) => t,
        x => panic!("Unexpected event: {:?}", x),
    };

    let _j = print_events(event_rx);

    let mut choice = String::new();
    loop {
        println!("\n{}\nChoose an option:", MENU);
        unwrap!(io::stdin().read_line(&mut choice));
        choice = choice.trim().to_string();

        if choice == "1" {
            let mut list = String::new();
            unwrap!(peers.lock())
                .iter()
                .for_each(|(ref id, ref peer_state)| {
                    list.push_str(&format!(
                        "{} {}\n",
                        id,
                        if let PeerState::Connected(_) = peer_state {
                            "*"
                        } else {
                            ""
                        }
                    ))
                });
            if list.is_empty() {
                list = "List is empty. Try refreshing.".to_string();
            }
            println!("List:\n{}", list);
        } else if choice == "2" {
            print!("Refreshing... ");
            unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| {
                if let Some(overlay) = core.peer_state(overlay_token) {
                    let m = unwrap!(serialise(&PlainTextMsg::ReqOnlinePeers));
                    overlay.borrow_mut().write(core, poll, m);
                }
            })));
        } else if choice == "3" {
            println!("Enter peer id. Partial id/name can be given:");

            let mut peer_choice = String::new();
            unwrap!(io::stdin().read_line(&mut peer_choice));
            peer_choice = peer_choice.trim().to_string();

            let mut found_peer = None;
            {
                let peers_guard = unwrap!(peers.lock());
                for (id, peer_state) in &*peers_guard {
                    let peer_fmt = format!("{}", id);
                    if peer_fmt.contains(&peer_choice) {
                        if let PeerState::Discovered = peer_state {
                            if found_peer.is_some() {
                                println!(
                                    "Ambiguous, multiple matches found. More qualification needed."
                                );
                                found_peer = None;
                                break;
                            } else {
                                found_peer = Some(id.clone());
                            }
                        } else {
                            println!(
                                "Peer is either in the process of being connected or is already \
                                 connected. Check status after sometime and retry if necessary."
                            );
                            found_peer = None;
                            break;
                        }
                    }
                }
            }

            if let Some(peer_id) = found_peer {
                unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| {
                    let overlay = unwrap!(core.peer_state(overlay_token));
                    let mut overlay = overlay.borrow_mut();
                    let overlay_connect =
                        unwrap!(overlay.as_any().downcast_mut::<OverlayConnect>());
                    overlay_connect.start_connect_with_peer(core, poll, peer_id.clone());
                })));
            } else {
                println!("Aborting due to previous errors (if printed) or due to peer not found.");
            }
        } else if choice == "4" {
            println!("Enter peer id. Partial id/name can be given:");

            let mut peer_choice = String::new();
            unwrap!(io::stdin().read_line(&mut peer_choice));
            peer_choice = peer_choice.trim().to_string();

            let mut found_peer = None;
            {
                let peers_guard = unwrap!(peers.lock());
                for (id, peer_state) in &*peers_guard {
                    let peer_fmt = format!("{}", id);
                    if peer_fmt.contains(&peer_choice) {
                        if let PeerState::Connected(token) = peer_state {
                            if found_peer.is_some() {
                                println!(
                                    "Ambiguous, multiple matches found. More qualification needed."
                                );
                                found_peer = None;
                                break;
                            } else {
                                found_peer = Some(*token);
                            }
                        } else {
                            println!(
                                "Peer is not connected. Check status after sometime and retry if \
                                 necessary."
                            );
                            found_peer = None;
                            break;
                        }
                    }
                }
            }

            if let Some(token) = found_peer {
                let (tx, rx) = mpsc::channel();
                let tx_clone = tx.clone();
                unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| {
                    let peer = unwrap!(core.peer_state(token));
                    let mut peer = peer.borrow_mut();
                    let active_peer = unwrap!(peer.as_any().downcast_mut::<ActivePeer>());
                    active_peer.flush_and_stop_buffering();

                    unwrap!(tx_clone.send(()));
                })));

                unwrap!(rx.recv());
                let disconnected = start_chat(&el, token);

                if !disconnected {
                    unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| {
                        let peer = unwrap!(core.peer_state(token));
                        let mut peer = peer.borrow_mut();
                        let active_peer = unwrap!(peer.as_any().downcast_mut::<ActivePeer>());
                        active_peer.start_buffering();

                        unwrap!(tx.send(()));
                    })));
                    unwrap!(rx.recv());
                }
            } else {
                println!("Aborting due to previous errors (if printed) or due to peer not found.");
            }
        } else if choice == "5" {
            println!(
                "E.g. if the list has a peer id as \"Blah (1baf3e..)\" you may enter the \
                 whole thing as is, or just \"Blah\" or just \"lah\" or just \"af3e\" etc. all \
                 without quotes. However if the list also contains someone else like \"Blah-blah\" \
                 then \"lah\" will match that too and return error for non unique match. In such \
                 cases qualify more until you have a unique match, giving the whole ID in the worst \
                 case."
            );
        } else if choice == "0" {
            break;
        } else {
            println!("Invalid option !");
        }

        choice.clear();
    }

    unwrap!(event_tx.send(Event::Quit));
}

fn start_chat(el: &El, peer: Token) -> bool {
    println!("Enter (without quotes) \"<quit>\" to exit this chat.");

    let mut disconnected = false;
    let (tx, rx) = mpsc::channel();
    loop {
        let mut input = String::new();
        let _ = unwrap!(io::stdin().read_line(&mut input));
        input = input.trim().to_owned();

        if input == "<quit>" {
            break;
        }

        let m = unwrap!(serialise(&PlainTextMsg::Chat(input)));

        let tx = tx.clone();
        unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| {
            if let Some(active_peer) = core.peer_state(peer) {
                active_peer.borrow_mut().write(core, poll, m);
                unwrap!(tx.send(true));
            } else {
                unwrap!(tx.send(false));
            }
        })));

        if !unwrap!(rx.recv()) {
            println!("Peer is now disconnected. Try reconnecting to chat again.");
            disconnected = true;
            break;
        }
    }

    disconnected
}

fn print_events(rx: mpsc::Receiver<Event>) -> Joiner {
    thread::named("Event-Rx", move || {
        for event in rx.iter() {
            match event {
                Event::Quit => break,
                e => println!("{}", e),
            }
        }
    })
}