1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
use crate::network_adapter::{self, Controller, Listener, Remote};
use crate::encoding::{self, DecodingPool};

use serde::{Serialize, Deserialize};

use std::sync::{Arc, Mutex, atomic::{AtomicBool, Ordering}};
use std::net::{SocketAddr, ToSocketAddrs};
use std::thread::{self, JoinHandle};
use std::time::{Duration};
use std::io::{self};

pub use crate::network_adapter::{Endpoint};

// Timeout, in milliseconds, given to every `receive()` call made by the network thread.
// The thread re-checks its running flag between two consecutive calls.
const NETWORK_SAMPLING_TIMEOUT: u64 = 50; //ms
// Size, in bytes, of the buffer the network thread passes to `receive()`.
const INPUT_BUFFER_SIZE: usize = 65536;

/// Input network events.
///
/// `InMessage` is the user-defined message type carried by the `Message` variant.
#[derive(Debug)]
pub enum NetEvent<InMessage>
where InMessage: for<'b> Deserialize<'b> + Send + 'static {
    /// Input message received by the network.
    /// It carries the endpoint the data came from and the deserialized message.
    Message(Endpoint, InMessage),

    /// New endpoint added to a listener.
    /// It is sent when a new connection has been accepted by the listener.
    /// This event is only sent for TCP.
    AddedEndpoint(Endpoint),

    /// A connection lost event.
    /// This event is only dispatched when a connection is lost. A call to `remove_resource()` does not generate the event.
    /// After this event, the resource is considered removed.
    /// A `Message` event is never generated after this event.
    /// This event is only sent for TCP. Because UDP is not connection oriented, the event cannot be detected.
    RemovedEndpoint(Endpoint),
}

/// NetworkManager makes the network easier to manage.
/// It is mainly in charge of transforming raw data from the network into message events and vice versa.
pub struct NetworkManager {
    // Handle of the background network thread; it is taken and joined when the manager is dropped.
    network_event_thread: Option<JoinHandle<()>>,
    // Flag read by the network thread on every loop iteration; set to `false` on drop to stop it.
    network_thread_running: Arc<AtomicBool>,
    // Controller used to add/remove resources, query local addresses and send data.
    network_controller: Arc<Mutex<Controller>>,
    // Reusable buffer where an outgoing message is serialized and encoded before being sent.
    output_buffer: Vec<u8>,
}

impl<'a> NetworkManager {
    /// Creates a new [NetworkManager].
    /// The user must register an event_callback that can be called each time the network generate and [NetEvent]
    pub fn new<InMessage, C>(event_callback: C) -> NetworkManager
    where InMessage: for<'b> Deserialize<'b> + Send + 'static,
          C: Fn(NetEvent<InMessage>) + Send + 'static {
        let (network_controller, mut network_receiver) = network_adapter::adapter();

        let network_thread_running = Arc::new(AtomicBool::new(true));
        let running = network_thread_running.clone();

        let network_event_thread = thread::Builder::new().name("message-io: network".into()).spawn(move || {
            let mut input_buffer = [0; INPUT_BUFFER_SIZE];
            let mut decoding_pool = DecodingPool::new();
            let timeout = Duration::from_millis(NETWORK_SAMPLING_TIMEOUT);
            while running.load(Ordering::Relaxed) {
                network_receiver.receive(&mut input_buffer[..], Some(timeout), |endpoint, event| {
                    Self::process_network_event(&event_callback, endpoint, event, &mut decoding_pool);
                });
            }
        }).unwrap();

        NetworkManager {
            network_event_thread: Some(network_event_thread),
            network_thread_running,
            network_controller,
            output_buffer: Vec::with_capacity(encoding::PADDING),
        }
    }

    fn process_network_event<'c, InMessage, C>(
        event_callback: &C,
        endpoint: Endpoint,
        event: network_adapter::Event<'c>,
        decoding_pool: &mut DecodingPool<Endpoint>
        )
    where
        InMessage: for<'b> Deserialize<'b> + Send + 'static,
        C: Fn(NetEvent<InMessage>) + Send + 'static {

        match event {
            network_adapter::Event::Connection => {
                log::trace!("Connected endpoint {}", endpoint);
                event_callback(NetEvent::AddedEndpoint(endpoint));
            },
            network_adapter::Event::Data(data) => {
                log::trace!("Data received from {}, {} bytes", endpoint, data.len());
                decoding_pool.decode_from(data, endpoint, |decoded_data|{
                    log::trace!("Message received from {}, {} bytes", endpoint, decoded_data.len());
                    let message: InMessage = bincode::deserialize(decoded_data).unwrap();
                    event_callback(NetEvent::Message(endpoint, message));
                });
            },
            network_adapter::Event::Disconnection => {
                log::trace!("Disconnected endpoint {}", endpoint);
                event_callback(NetEvent::RemovedEndpoint(endpoint));
            },
        };
    }

    /// Creates a connection to the specific address by TCP.
    /// The endpoint, an identified of the new connection, will be returned.
    /// If the connection can not be performed (e.g. the address is not reached) an error is returned.
    pub fn connect_tcp<A: ToSocketAddrs>(&mut self, addr: A) -> io::Result<Endpoint> {
        let addr = addr.to_socket_addrs().unwrap().next().unwrap();
        Remote::new_tcp(addr).map(|remote| {
            self.network_controller.lock().unwrap().add_remote(remote)
        })
    }

    /// Creates a connection to the specific address by UDP.
    /// The endpoint, an identified of the new connection, will be returned.
    /// If there is an error during the socket creation, an error will be returned.
    pub fn connect_udp<A: ToSocketAddrs>(&mut self, addr: A) -> io::Result<Endpoint> {
        let addr = addr.to_socket_addrs().unwrap().next().unwrap();
        Remote::new_udp(addr).map(|remote| {
            self.network_controller.lock().unwrap().add_remote(remote)
        })
    }

    /// Open a port to listen messages from TCP.
    /// If the port can be opened, an resource id identifying the listener is returned along with the local address, or an error if not.
    pub fn listen_tcp<A: ToSocketAddrs>(&mut self, addr: A) -> io::Result<(usize, SocketAddr)> {
        let addr = addr.to_socket_addrs().unwrap().next().unwrap();
        Listener::new_tcp(addr).map(|listener| {
            self.network_controller.lock().unwrap().add_listener(listener)
        })
    }

    /// Open a port to listen messages from UDP.
    /// If the port can be opened, an resource id identifying the listener is returned along with the local address, or an error if not.
    pub fn listen_udp<A: ToSocketAddrs>(&mut self, addr: A) -> io::Result<(usize, SocketAddr)> {
        let addr = addr.to_socket_addrs().unwrap().next().unwrap();
        Listener::new_udp(addr).map(|listener| {
            self.network_controller.lock().unwrap().add_listener(listener)
        })
    }

    /// Open a port to listen messages from UDP in multicast.
    /// If the port can be opened, an resource id identifying the listener is returned along with the local address, or an error if not.
    /// Only ipv4 addresses are allowed.
    pub fn listen_udp_multicast<A: ToSocketAddrs>(&mut self, addr: A) -> io::Result<(usize, SocketAddr)> {
        match addr.to_socket_addrs().unwrap().next().unwrap() {
            SocketAddr::V4(addr) => {
                Listener::new_udp_multicast(addr).map(|listener| {
                    self.network_controller.lock().unwrap().add_listener(listener)
                })
            },
            _ => panic!("listening for udp multicast is only supported for ipv4 addresses"),
        }
    }

    /// Remove a network resource.
    /// Returns `None` if the resource id not exists.
    /// This is used mainly to remove resources that the program has been created explicitely, as connection or listeners.
    /// Resources of endpoints generated by a TcpListener can also be removed to close the connection.
    /// Note: Udp endpoints generated by a UdpListener shared the resource, the own UdpListener.
    /// This means that there is no resource to remove as the TCP case.
    pub fn remove_resource(&mut self, resource_id: usize) -> Option<()> {
        self.network_controller.lock().unwrap().remove_resource(resource_id)
    }

    /// Request a local address of a resource.
    /// Returns `None` if the endpoint id not exists.
    /// Note: Udp endpoints generated by a UdpListener shared the resource.
    pub fn local_address(&self, resource_id: usize) -> Option<SocketAddr> {
        self.network_controller.lock().unwrap().local_address(resource_id)
    }

    /// Serialize and send the message thought the connection represented by the given endpoint.
    /// If the same message should be sent to different endpoints, use `send_all()` to better performance.
    /// Returns an error if there is an error while sending the message, the endpoint does not exists, or if it is not valid.
    pub fn send<OutMessage>(&mut self, endpoint: Endpoint, message: OutMessage) -> io::Result<()>
    where OutMessage: Serialize {
        self.prepare_output_message(message);
        let result = self.network_controller.lock().unwrap().send(endpoint, &self.output_buffer);
        self.output_buffer.clear();
        if let Ok(_) = result {
            log::trace!("Message sent to {}", endpoint);
        }
        result
    }

    /// Serialize and send the message thought the connections represented by the given endpoints.
    /// When there are severals endpoints to send the data, this function is faster than consecutive calls to `send()`
    /// since the serialization is only performed one time for all endpoints.
    /// An list of erroneous endpoints along their errors is returned if there was a problem with some message sent.
    pub fn send_all<'b, OutMessage>(&mut self, endpoints: impl IntoIterator<Item=&'b Endpoint>, message: OutMessage) -> Result<(), Vec<(Endpoint, io::Error)>>
    where OutMessage: Serialize {
        self.prepare_output_message(message);
        let mut errors = Vec::new();
        let mut controller = self.network_controller.lock().unwrap();
        for endpoint in endpoints {
            match controller.send(*endpoint, &self.output_buffer) {
                Ok(_) => log::trace!("Message sent to {}", endpoint),
                Err(err) => errors.push((*endpoint, err))
            }
        }
        self.output_buffer.clear();
        if errors.is_empty() { Ok(()) } else { Err(errors) }
    }

    fn prepare_output_message<OutMessage>(&mut self, message: OutMessage)
    where OutMessage: Serialize {
        self.output_buffer.resize(encoding::PADDING, 0);
        bincode::serialize_into(&mut self.output_buffer, &message).unwrap();
        encoding::encode(&mut self.output_buffer);
    }
}

impl Drop for NetworkManager {
    fn drop(&mut self) {
        self.network_thread_running.store(false, Ordering::Relaxed);
        self.network_event_thread.take().unwrap().join().unwrap();
    }
}