1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129

use crate::MAX_FRAME_SIZE;
use crate::frame;

use super::endpoint;
use super::peer;
use super::udp_frame_sink::UdpFrameSink;

use std::cell::RefCell;
use std::collections::HashMap;
use std::net;
use std::rc::Rc;

/// A polling-based socket object which manages inbound `uflow` connections.
///
/// A `Server` owns a single UDP socket and keeps one endpoint per remote address. It performs no
/// work on its own: frames are only received, dispatched and sent while [`step()`](Self::step) or
/// [`flush()`](Self::flush) is being called.
pub struct Server {
    /// UDP socket used for all inbound and outbound frames; switched to non-blocking mode by
    /// [`bind()`](Self::bind).
    socket: net::UdpSocket,

    /// Upper bound on the number of entries in `endpoints`; frames from unknown addresses are
    /// dropped once this many endpoints are tracked.
    max_connections: usize,
    /// Configuration cloned into every endpoint created for a new remote address.
    incoming_config: endpoint::Config,

    /// Connection state keyed by remote address. Each endpoint is shared (via `Rc`) with the
    /// `Peer` handle that was created alongside it.
    endpoints: HashMap<net::SocketAddr, Rc<RefCell<endpoint::Endpoint>>>,
    /// Peers created since the last call to [`incoming()`](Self::incoming), waiting to be handed
    /// to the application.
    incoming_peers: Vec<peer::Peer>,
}

impl Server {
    /// Opens a UDP socket and creates a corresponding [`Server`](Self) object. The UDP socket is
    /// bound to the provided address, and configured to be non-blocking.
    ///
    /// The server will limit the number of concurrent connections to `max_connections`, and will
    /// silently ignore frames from unknown addresses which would exceed that limit. Valid
    /// incoming connections will be initialized according to `incoming_config`.
    ///
    /// # Errors
    ///
    /// Any error resulting from binding the socket or switching it to non-blocking mode is
    /// forwarded to the caller.
    ///
    /// # Panics
    ///
    /// Panics if `incoming_config.is_valid()` returns `false`.
    pub fn bind<A: net::ToSocketAddrs>(
        addr: A,
        max_connections: usize,
        incoming_config: endpoint::Config,
    ) -> Result<Self, std::io::Error> {
        assert!(incoming_config.is_valid(), "invalid endpoint config");

        let socket = net::UdpSocket::bind(addr)?;

        // `step()` drains the socket until it would block, so it must never block.
        socket.set_nonblocking(true)?;

        Ok(Self {
            socket,

            max_connections,
            incoming_config,

            endpoints: HashMap::new(),
            incoming_peers: Vec::new(),
        })
    }

    /// Equivalent to calling [`bind()`](Self::bind) with address
    /// `(`[`std::net::Ipv4Addr::UNSPECIFIED`](std::net::Ipv4Addr::UNSPECIFIED)`, 0)`.
    pub fn bind_any_ipv4(
        max_connections: usize,
        incoming_config: endpoint::Config,
    ) -> Result<Self, std::io::Error> {
        Self::bind((net::Ipv4Addr::UNSPECIFIED, 0), max_connections, incoming_config)
    }

    /// Equivalent to calling [`bind()`](Self::bind) with address
    /// `(`[`std::net::Ipv6Addr::UNSPECIFIED`](std::net::Ipv6Addr::UNSPECIFIED)`, 0)`.
    pub fn bind_any_ipv6(
        max_connections: usize,
        incoming_config: endpoint::Config,
    ) -> Result<Self, std::io::Error> {
        Self::bind((net::Ipv6Addr::UNSPECIFIED, 0), max_connections, incoming_config)
    }

    /// Returns a number of [`Peer`](peer::Peer) objects representing new connections since the
    /// last call to [`incoming()`](Self::incoming). Any connections which have since entered the
    /// zombie state (timed out) will not be returned.
    ///
    /// The internal list of pending peers is emptied by this call, so each peer is yielded at
    /// most once.
    pub fn incoming(&mut self) -> impl Iterator<Item = peer::Peer> {
        std::mem::take(&mut self.incoming_peers).into_iter()
    }

    /// Processes UDP frames received since the last call to [`step()`](Self::step), and
    /// sends any pending outbound frames (acknowledgements, keep-alives, packet data, etc.).
    ///
    /// Current, non-zombie [`Peer`](peer::Peer) objects will be updated as relevant data is
    /// received. Call [`Peer::poll_events()`](peer::Peer::poll_events) after calling
    /// [`step()`](Self::step) to retrieve incoming packets and connection status updates for an
    /// individual peer.
    ///
    /// Datagrams which fail to parse as a frame are dropped. Receive errors are never reported to
    /// the caller; see the comments in the body for how they affect the receive loop.
    pub fn step(&mut self) {
        // Brings `Frame::read` into scope.
        use frame::serial::Serialize;

        let mut frame_data_buf = [0u8; MAX_FRAME_SIZE];

        loop {
            match self.socket.recv_from(&mut frame_data_buf) {
                Ok((frame_size, address)) => {
                    if let Some(frame) = frame::Frame::read(&frame_data_buf[..frame_size]) {
                        self.handle_frame(address, frame);
                    }
                }
                Err(err) => match err.kind() {
                    // Neither of these means the receive queue is empty. `ConnectionReset` is
                    // how some platforms (notably Windows) report that an earlier send to some
                    // peer was answered with an ICMP error; stopping here would delay every other
                    // peer's queued frames until the next call.
                    std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::Interrupted => {
                        continue;
                    }
                    // `WouldBlock` signals an empty queue. Any other error also ends the loop so
                    // that a persistent failure cannot spin forever.
                    _ => break,
                },
            }
        }

        self.flush();

        // Forget connections which have ended, including ones the application never collected.
        self.endpoints.retain(|_, endpoint| !endpoint.borrow().is_zombie());
        self.incoming_peers.retain(|client| !client.is_zombie());
    }

    /// Sends any pending outbound frames (acknowledgements, keep-alives, packet data, etc.).
    pub fn flush(&mut self) {
        // Shared iteration suffices: endpoints are mutated through their `RefCell`.
        for (&address, endpoint) in &self.endpoints {
            let mut data_sink = UdpFrameSink::new(&self.socket, address);

            endpoint.borrow_mut().flush(&mut data_sink);
        }
    }

    /// Returns the local address of the internal UDP socket.
    ///
    /// # Panics
    ///
    /// Panics if the operating system fails to report the socket's local address.
    pub fn address(&self) -> net::SocketAddr {
        self.socket
            .local_addr()
            .expect("a bound UDP socket should have a local address")
    }

    /// Dispatches a parsed frame to the endpoint associated with `address`.
    ///
    /// If no endpoint is tracked for `address` and fewer than `max_connections` endpoints exist, a
    /// new endpoint is created from `incoming_config`, given the frame, registered under
    /// `address`, and exposed through [`incoming()`](Self::incoming) as a new peer. Otherwise the
    /// frame is dropped.
    fn handle_frame(&mut self, address: net::SocketAddr, frame: frame::Frame) {
        // Any frames produced while handling this one are sent straight back to the sender.
        let mut data_sink = UdpFrameSink::new(&self.socket, address);

        if let Some(endpoint) = self.endpoints.get(&address) {
            endpoint.borrow_mut().handle_frame(frame, &mut data_sink);
            return;
        }

        // Unknown sender: only admit it while there is room. The count covers every tracked
        // endpoint, including zombies which `step()` has not purged yet.
        if self.endpoints.len() >= self.max_connections {
            return;
        }

        let mut endpoint = endpoint::Endpoint::new(self.incoming_config.clone());
        endpoint.handle_frame(frame, &mut data_sink);

        // The server and the peer handle share ownership of the endpoint state.
        let endpoint_ref = Rc::new(RefCell::new(endpoint));
        self.endpoints.insert(address, Rc::clone(&endpoint_ref));
        self.incoming_peers.push(peer::Peer::new(address, endpoint_ref));
    }
}