1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
extern crate bincode;
extern crate serde;
extern crate net2;
extern crate crossbeam;
extern crate term_size;

mod server;
pub mod discovery;

use crossbeam::thread;

use server::Server;
use discovery::DiscoveryServer;

use std::io;
use std::io::Write;
use std::io::Read;
use std::time::Duration;
use std::net::{SocketAddr, SocketAddrV4, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};

const READ_BUFFER_SIZE: usize = 1024;

pub fn talk<R>(
        discovery_addr: &SocketAddrV4,
        users: Option<&Vec<String>>,
        user_name: &str,
        mut input: R) -> bool
where
    R: Read + 'static
{
    let remotes = discovery::discover(&discovery_addr);
    let remotes = match users {
        Some(users) => remotes.into_iter().filter(|r| users.iter().any(|u| *u == r.name)).collect(),
        None => remotes,
    };

    let mut connections = vec![];
    for remote in remotes.iter() {
        let mut connection = TcpStream::connect(remote.addr).unwrap();
        let serialized_user_name = bincode::serialize(&user_name).unwrap();
        connection.write(&serialized_user_name).unwrap();
        connections.push(connection);
    }

    loop {
        let mut input_buffer = [0; READ_BUFFER_SIZE];
        let size = input.read(&mut input_buffer).unwrap();
        if size == 0 {
            break;
        }

        connections.retain(|mut connection|{
            match connection.write(&input_buffer[0..size]) {
                Ok(_) => true,
                Err(e) => match e.kind() {
                    io::ErrorKind::BrokenPipe => false,
                    io::ErrorKind::ConnectionReset => false,
                    _ => Err(e).unwrap(),
                },
            }
        });

        if connections.len() == 0 {
            return false;
        }
    }
    true
}

pub fn listen<W, C>(
        discovery_addr: &SocketAddrV4,
        users: Option<&Vec<String>>,
        user_name: &str,
        service_addr: &SocketAddr,
        once: bool,
        mut callback: C,
        mut output: W)
where
    C: FnMut(&str, &SocketAddr) + Send + Sync,
    W: Write + Send
{
    let mut last_user = String::new();
    let mut last_remote = "0.0.0.0:0".parse().unwrap();
    let on_data = |user: &str, remote: &SocketAddr, data: &[u8]| -> bool {
        if let Some(users) = users {
            if !users.iter().any(|u| u == user) {
                return false;
            }
        }
        if last_user != user || last_remote != *remote {
            callback(user, remote);
            last_user = String::from(user);
            last_remote = remote.clone();
        }
        output.write(data).unwrap();
        true
    };
    let mut server = Server::new(&service_addr, once, on_data);
    let discovery_server = DiscoveryServer::new(&discovery_addr, &user_name, server.get_listener_port());

    let running = AtomicBool::new(true);
    thread::scope(|s| {
        s.spawn(|_| {
            while running.load(Ordering::Relaxed) {
                let alive = server.listen(Some(Duration::from_millis(100)));
                running.store(alive, Ordering::Relaxed);
            }
        });

        s.spawn(|_| {
            while running.load(Ordering::Relaxed) {
                discovery_server.listen(Some(Duration::from_millis(100)));
            }
        });
    }).unwrap();
}