#[allow(unused_imports)] use super::*;
use std::boxed::Box;
use std::io::{self, Read};
use std::net::SocketAddr;
use mio::event::Event;
use mio::net::{TcpListener, TcpStream, UdpSocket};
use mio::{Events, Interest, Poll, Token, Waker};
use channel::TryRecvError;
use ringbuf::RingBuffer;
use super_slab::SuperSlab;
/// Shared type aliases for the networking layer.
pub mod net {
    use std::error::Error;

    /// Result type used throughout the network code; the error arm can carry
    /// any boxed error value.
    pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
}
/// Creates the network factory.
///
/// A bounded `NetCmd` channel (capacity 250) is created here; both ends are
/// stored in the factory so that they can later be handed to the network.
pub fn new_network_factory() -> NetworkFactoryObj {
    let (cmd_tx, cmd_rx) = channel_with_capacity::<NetCmd>(250);
    let factory = SystemNetworkFactory {
        sender: cmd_tx,
        receiver: cmd_rx,
    };
    Arc::new(factory)
}
/// Receiving end of a channel that carries `NetCmd` messages.
type NetReceiver = Receiver<NetCmd>;
/// Factory that owns both ends of the network command channel and starts the
/// network on request.
struct SystemNetworkFactory {
// Cloned for callers of `get_sender` and for the network controller.
sender: NetSender,
// Cloned into the network thread when `start` is called.
receiver: NetReceiver,
}
impl NetworkFactory for SystemNetworkFactory {
    /// Returns a clone of the sender used to submit `NetCmd`s to the network.
    fn get_sender(&self) -> NetSender {
        self.sender.clone()
    }

    /// Starts the network and returns the object that controls it.
    fn start(&self) -> NetworkControlObj {
        log::info!("creating network");
        let (sender, receiver) = (self.sender.clone(), self.receiver.clone());
        Arc::new(Mio::new(sender, receiver))
    }
}
/// Handle to the running network: holds the command sender and the join
/// handle of the network thread.
struct Mio {
// Used to send `NetCmd::Stop` to the network thread.
sender: NetSender,
// Join handle of the network thread; taken (and joined) when the value is dropped.
thread: Option<thread::JoinHandle<()>>,
}
impl Mio {
    /// Spawns the network thread on `receiver` and keeps `sender` so the
    /// thread can be told to stop later.
    fn new(sender: NetSender, receiver: NetReceiver) -> Self {
        Self {
            sender,
            thread: NetworkThread::spawn(receiver),
        }
    }

    /// Asks the network thread to stop by sending it `NetCmd::Stop`.
    fn stop(&self) {
        log::info!("stopping network");
        send_cmd(&self.sender, NetCmd::Stop);
    }
}
impl NetworkControl for Mio {
fn stop(&self) { self.stop(); }
}
impl Drop for Mio {
    /// Sends `NetCmd::Stop` to the network thread and waits for it to finish,
    /// provided the thread handle has not already been taken.
    fn drop(&mut self) {
        match self.thread.take() {
            Some(handle) => {
                send_cmd(&self.sender, NetCmd::Stop);
                log::info!("synchronizing Network shutdown");
                if let Err(_panic) = handle.join() {
                    log::trace!("failed to join Network thread");
                }
            },
            None => (),
        }
        log::info!("Network shutdown complete");
    }
}
/// Timeout passed to each `Poll::poll` call in the network loop.
const POLL_INTERVAL: Duration = Duration::from_secs(60);
/// Capacity of the `Events` buffer handed to `Poll::poll`.
const EVENT_CAPACITY: usize = 512;
/// Capacity requested for the slab of listeners.
// NOTE(review): used only as a `with_capacity` argument; whether the slab treats it as a hard limit is not visible here.
const MAX_SERVERS: usize = 32;
/// Size of each connection's outbound ring buffer. Also the initial capacity
/// of the per-event receive vector and the value reported in `NetCmd::NewConn`.
const MAX_BYTES: usize = 4096;
/// Capacity requested for the slab of connections.
const MAX_CONNECTIONS: usize = 10000;
/// The socket owned by a `Server`: either a TCP listener or a UDP socket.
#[derive(Debug)]
enum Listener {
// Accepts incoming TCP connections.
TcpListener(TcpListener),
// Receives (and, via `send_pkt`, sends) UDP datagrams.
UdpListener(UdpSocket),
}
/// A bound listener together with the channel that is notified about its
/// new connections or received packets.
#[derive(Debug)]
struct Server {
// Set when accept/recv_from fails with an error other than WouldBlock;
// nothing in this file reads it back.
is_dead: bool,
// The address string the listener was bound with, echoed in notifications.
bind_addr: String,
listener: Listener,
// Destination for `NetCmd::NewConn` / `NetCmd::RecvPkt` notifications.
sender: NetSender,
}
/// State kept for one accepted TCP connection.
struct Connection {
// Destination for `RecvBytes`, `CloseConn` and `SendReady` notifications.
sender: NetSender,
stream: TcpStream,
// True after a writable event, cleared when a write reports WouldBlock;
// gates whether queued bytes are written immediately.
is_ready: bool,
// Read side of the outbound ring buffer; drained into `stream`.
consumer: ringbuf::Consumer<u8>,
// Write side of the outbound ring buffer; filled by send commands.
producer: ringbuf::Producer<u8>,
// Bytes written to `stream` since the last `SendReady` notification.
send_count: usize,
}
impl Connection {
    /// Wraps an accepted `stream`, creating an outbound ring buffer of
    /// `MAX_BYTES` bytes. The connection starts out not ready for writing.
    fn new(sender: NetSender, stream: TcpStream) -> Self {
        let (producer, consumer) = RingBuffer::<u8>::new(MAX_BYTES).split();
        Self {
            sender,
            stream,
            is_ready: false,
            consumer,
            producer,
            send_count: 0,
        }
    }

    /// Writes queued outbound bytes to the stream.
    ///
    /// Writing continues until the ring buffer is empty or the socket reports
    /// `WouldBlock` (which clears `is_ready`). A single `write_into` call only
    /// covers the buffer's first contiguous slice, and readiness notifications
    /// are edge-triggered, so stopping after one successful write could leave
    /// bytes stranded with no further writable event to flush them.
    ///
    /// Once more than half of `MAX_BYTES` has been written since the previous
    /// notification, a `NetCmd::SendReady` carrying the free buffer space is
    /// sent to the connection's owner.
    ///
    /// # Errors
    /// Returns any write error other than `WouldBlock` / `Interrupted`.
    fn send_bytes(&mut self, token: &Token) -> net::Result<()> {
        if self.consumer.is_empty() {
            return Ok(());
        }
        let result: net::Result<()> = loop {
            match self.consumer.write_into(&mut self.stream, None) {
                // No progress was made; stop instead of spinning.
                Ok(0) => break Ok(()),
                Ok(n) => {
                    self.send_count += n;
                    if self.consumer.is_empty() {
                        break Ok(());
                    }
                    // More bytes are queued (e.g. the wrapped-around part of
                    // the ring buffer): keep writing.
                },
                Err(ref err) if would_block(err) => {
                    self.is_ready = false;
                    break Ok(());
                },
                // Interrupted writes are simply retried.
                Err(ref err) if interrupted(err) => (),
                Err(err) => break Err(Box::new(err)),
            }
        };
        if self.send_count > MAX_BYTES / 2 {
            self.send_count = 0;
            send_cmd(&self.sender, NetCmd::SendReady(token.0, self.producer.remaining()));
        }
        result
    }
}
/// Helper that wakes the poll loop whenever commands are waiting for the
/// network thread.
struct NetworkWaker {
// Waker registered with the network thread's `Poll`.
waker: Arc<Waker>,
// Clone of the network command receiver; only inspected for pending
// messages here, never received from.
mio_receiver: NetReceiver,
// Control channel on which this helper is told to stop.
waker_receiver: NetReceiver,
}
impl NetworkWaker {
    /// Waits on both channels and wakes the poll loop while network commands
    /// are pending.
    ///
    /// Returns when the control channel delivers `NetCmd::Stop` or is
    /// disconnected.
    ///
    /// # Panics
    /// Panics if waking the poll instance fails.
    fn run(&mut self) {
        let mut selector = crossbeam::channel::Select::new();
        // Registration order fixes the indices: 0 = control, 1 = commands.
        selector.recv(&self.waker_receiver.receiver);
        selector.recv(&self.mio_receiver.receiver);
        log::info!("waker is starting");
        'outer: loop {
            match selector.ready() {
                0 => {
                    if self.check_yourself_before_you_wreck_yourself() {
                        break 'outer;
                    }
                },
                1 => {
                    // Keep nudging the poll loop until it has drained the
                    // command channel, checking for a stop request each round.
                    while !self.mio_receiver.receiver.is_empty() {
                        self.waker.wake().expect("unable to wake");
                        std::thread::sleep(Duration::from_millis(10));
                        if self.check_yourself_before_you_wreck_yourself() {
                            break 'outer;
                        }
                    }
                },
                _ => (),
            }
        }
        log::info!("waker has stopped")
    }

    /// Polls the control channel once; `true` means this helper should exit
    /// (a `Stop` command arrived or the channel is disconnected).
    fn check_yourself_before_you_wreck_yourself(&self) -> bool {
        match self.waker_receiver.receiver.try_recv() {
            Ok(NetCmd::Stop) | Err(TryRecvError::Disconnected) => true,
            Ok(_) | Err(TryRecvError::Empty) => false,
        }
    }
}
/// Token under which the `Waker` is registered. Listener tokens are the slab
/// keys below this value; connection tokens are the slab key plus 1024.
const WAKE_TOKEN: Token = Token(1023);
/// State owned by the network thread.
struct NetworkThread {
// Incoming `NetCmd`s for the network.
receiver: NetReceiver,
// Control channel used to tell the waker helper thread to stop.
waker_sender: NetSender,
// Loop flag; cleared on `NetCmd::Stop` or when the command channel disconnects.
is_running: bool,
poll: Poll,
// Accepted TCP connections; token = slab key + 1024.
connections: SuperSlab<Connection>,
// Bound TCP/UDP listeners; token = slab key.
servers: SuperSlab<Server>,
// Join handle of the waker helper thread, set once `run` has spawned it.
waker_thread: Option<thread::JoinHandle<()>>,
}
impl NetworkThread {
fn spawn(receiver: NetReceiver) -> Option<thread::JoinHandle<()>> {
log::info!("Starting Network");
let thread = std::thread::spawn(move || {
let (waker_sender, waker_receiver) = channel::<NetCmd>();
let mut net_thread = Self {
receiver,
waker_sender,
is_running: true,
poll: Poll::new().unwrap(),
connections: SuperSlab::with_capacity(MAX_CONNECTIONS),
servers: SuperSlab::with_capacity(MAX_SERVERS),
waker_thread: None,
};
if net_thread.run(waker_receiver).is_err() {}
if let Some(thread) = net_thread.waker_thread.take() {
if thread.join().is_err() {
log::trace!("failed to join waker thread");
}
}
});
Some(thread)
}
fn run(&mut self, waker_receiver: NetReceiver) -> net::Result<()> {
let mut events = Events::with_capacity(EVENT_CAPACITY);
let waker = Arc::new(Waker::new(self.poll.registry(), WAKE_TOKEN)?);
{
let waker = Arc::clone(&waker);
let mio_receiver = self.receiver.clone();
let thread = std::thread::spawn(move || {
let mut net_waker = NetworkWaker {
waker,
mio_receiver,
waker_receiver,
};
net_waker.run();
});
self.waker_thread = Some(thread);
}
while self.is_running {
self.poll.poll(&mut events, Some(POLL_INTERVAL)).unwrap();
for event in events.iter() {
match event.token() {
token if token.0 == 1023 => (),
token if token.0 < 1023 => {
while let Some(server) = self.servers.get_mut(token.0) {
match &server.listener {
Listener::TcpListener(listener) => {
match listener.accept() {
Ok((connection, address)) => self.store_connection(&token, connection, address)?,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
break;
},
Err(_e) => {
server.is_dead = true;
break;
},
};
},
Listener::UdpListener(socket) => {
let mut buf = [0; 1 << 16];
match socket.recv_from(&mut buf) {
Ok((packet_size, source_addr)) => {
let cmd = NetCmd::RecvPkt(
token.0,
server.bind_addr.to_string(),
source_addr.to_string(),
buf[.. packet_size].to_vec(),
);
server.sender.send(cmd)?
},
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
break;
},
Err(_e) => {
server.is_dead = true;
break;
},
}
},
}
}
},
token => {
match self.handle_connection_event(&token, event) {
Ok(true) => match self.remove_connection(&token, true) {
Ok(_) => (),
Err(_e) => (), },
Err(_e) => (), _ => (),
};
},
}
}
let _result = loop {
let cmd = self.receiver.try_recv();
let result = match cmd {
Ok(NetCmd::Stop) => {
self.is_running = false;
break Ok(());
},
Ok(NetCmd::BindListener(addr, sender)) => self.bind_tcp_listener(addr, sender),
Ok(NetCmd::BindUdpListener(addr, sender)) => self.bind_udp_listener(addr, sender),
Ok(NetCmd::BindConn(conn_id, sender)) => self.bind_connection(conn_id, sender),
Ok(NetCmd::CloseConn(conn_id)) => self.close_connection(conn_id),
Ok(NetCmd::SendBytes(conn_id, bytes)) => self.send_bytes(conn_id, bytes),
Ok(NetCmd::SendPkt(conn_id, destination, bytes)) => self.send_pkt(conn_id, destination, bytes),
Ok(_) => {
log::warn!("unhandled NetCmd");
Ok(())
},
Err(TryRecvError::Disconnected) => {
self.is_running = false;
break Ok(());
},
Err(TryRecvError::Empty) => break Ok(()),
};
if result.is_err() {
break result;
}
};
}
send_cmd(&self.waker_sender, NetCmd::Stop);
Ok(())
}
/// Handles a readiness event for the connection identified by `token`.
///
/// A writable event marks the connection ready and flushes queued bytes. A
/// readable event reads until the socket would block and forwards whatever
/// was received as `NetCmd::RecvBytes`.
///
/// Returns `Ok(true)` when a read returned zero bytes (the caller should then
/// remove the connection) and `Ok(false)` otherwise, including when the
/// connection is unknown.
///
/// # Errors
/// Propagates write errors from flushing and read errors other than
/// `WouldBlock` / `Interrupted`.
fn handle_connection_event(&mut self, token: &Token, event: &Event) -> net::Result<bool> {
    let slot = token.0 - 1024;
    let conn = match self.connections.get_mut(slot) {
        Some(conn) => conn,
        None => return Ok(false),
    };
    if event.is_writable() {
        conn.is_ready = true;
        conn.send_bytes(token)?;
    }
    if !event.is_readable() {
        return Ok(false);
    }
    let mut peer_closed = false;
    let mut inbound = Vec::with_capacity(MAX_BYTES);
    loop {
        let mut chunk = [0; 256];
        match conn.stream.read(&mut chunk) {
            Ok(0) => {
                peer_closed = true;
                break;
            },
            Ok(len) => inbound.extend_from_slice(&chunk[.. len]),
            Err(ref err) if would_block(err) => break,
            Err(ref err) if interrupted(err) => continue,
            Err(err) => return Err(Box::new(err)),
        }
    }
    // Deliver what was read even if the peer closed right afterwards.
    if !inbound.is_empty() {
        send_cmd(&conn.sender, NetCmd::RecvBytes(token.0, inbound));
    }
    Ok(peer_closed)
}
fn remove_connection(&mut self, token: &Token, notify: bool) -> net::Result<()> {
let key = token.0 - 1024;
let mut conn = self.connections.remove(key);
if notify {
send_cmd(&conn.sender, NetCmd::CloseConn(token.0));
}
self.poll.registry().deregister(&mut conn.stream)?;
Ok(())
}
fn send_bytes(&mut self, conn_id: NetConnId, bytes: Vec<u8>) -> net::Result<()> {
let key: usize = conn_id - 1024;
let result = if let Some(conn) = self.connections.get_mut(key) {
let _count = conn.producer.push_slice(bytes.as_slice());
if conn.is_ready {
conn.send_bytes(&Token(conn_id))
} else {
Ok(())
}
} else {
Ok(())
};
result
}
/// Sends `bytes` as one datagram to `destination` through the UDP listener
/// stored under `conn_id`.
///
/// An unknown id, or an id that belongs to a TCP listener, is logged at debug
/// level and otherwise ignored.
///
/// # Errors
/// Returns an error if `destination` is not a valid socket address or if the
/// send itself fails.
fn send_pkt(&self, conn_id: NetConnId, destination: String, bytes: Vec<u8>) -> net::Result<()> {
    let dest_addr: SocketAddr = destination.parse()?;
    match self.servers.get(conn_id) {
        Some(server) => match &server.listener {
            Listener::UdpListener(socket) => {
                socket.send_to(&bytes, dest_addr)?;
            },
            Listener::TcpListener(_) => {
                log::debug!("connection {} doesn't have a UDP socket associated with it", conn_id);
            },
        },
        None => {
            log::debug!("connection {} doesn't exist", conn_id);
        },
    }
    Ok(())
}
/// Closes the connection `conn_id` on request of its owner; no `CloseConn`
/// notification is sent back.
fn close_connection(&mut self, conn_id: NetConnId) -> net::Result<()> {
    self.remove_connection(&Token(conn_id), false)
}
fn bind_connection(&mut self, conn_id: NetConnId, sender: NetSender) -> net::Result<()> {
let token = Token(conn_id);
let key: usize = token.0 - 1024;
if let Some(conn) = self.connections.get_mut(key) {
conn.sender = sender;
self.poll
.registry()
.register(&mut conn.stream, token, Interest::READABLE.add(Interest::WRITABLE))?;
}
Ok(())
}
fn bind_tcp_listener(&mut self, addr: String, sender: NetSender) -> net::Result<()> {
let entry = self.servers.vacant_entry();
let key = entry.key();
let token = Token(key);
let bind_addr = addr.parse().unwrap();
let mut listener = TcpListener::bind(bind_addr)?;
self.poll.registry().register(&mut listener, token, Interest::READABLE)?;
let server = Server {
is_dead: false,
bind_addr: addr,
listener: Listener::TcpListener(listener),
sender,
};
entry.insert(server);
Ok(())
}
fn bind_udp_listener(&mut self, addr: String, sender: NetSender) -> net::Result<()> {
let entry = self.servers.vacant_entry();
let key = entry.key();
let token = Token(key);
let bind_addr = addr.parse().unwrap();
let mut listener = UdpSocket::bind(bind_addr)?;
self.poll.registry().register(&mut listener, token, Interest::READABLE)?;
let server = Server {
is_dead: false,
bind_addr: addr,
listener: Listener::UdpListener(listener),
sender,
};
entry.insert(server);
Ok(())
}
/// Stores a freshly accepted `connection` and announces it to the owner of
/// the listener identified by `server_token` with `NetCmd::NewConn`.
///
/// The stream is not registered with the poll instance here. If the listener
/// is no longer stored, the connection is dropped and nothing is announced.
fn store_connection(&mut self, server_token: &Token, connection: TcpStream, address: SocketAddr) -> net::Result<()> {
    let server = match self.servers.get(server_token.0) {
        Some(server) => server,
        None => return Ok(()),
    };
    let slot = self.connections.vacant_entry();
    // Connection ids are offset so they never overlap listener/waker tokens.
    let conn_id = slot.key() + 1024;
    slot.insert(Connection::new(server.sender.clone(), connection));
    let announcement = NetCmd::NewConn(conn_id, server.bind_addr.to_string(), address.to_string(), MAX_BYTES);
    send_cmd(&server.sender, announcement);
    Ok(())
}
}
/// Returns `true` when `err` reports that the operation would block.
fn would_block(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::WouldBlock)
}
/// Returns `true` when `err` reports that the operation was interrupted.
fn interrupted(err: &io::Error) -> bool {
    matches!(err.kind(), io::ErrorKind::Interrupted)
}
// Placeholder test module; it currently contains no tests.
#[cfg(test)]
mod tests {}