use crate::class::Class;
use crate::id::{broadcast_machine_id, MachineID, RawID};
use crate::messaging::{Message, Packet};
use crate::type_registry::ShortTypeId;
use byteorder::{ByteOrder, LittleEndian, WriteBytesExt};
use compact::Compact;
use std::collections::HashMap;
#[cfg(feature = "server")]
use std::net::{TcpListener, TcpStream};
#[cfg(feature = "browser")]
use stdweb::traits::{IEventTarget, IMessageEvent};
#[cfg(feature = "browser")]
use stdweb::web::{SocketBinaryType, SocketReadyState, TypedArray, WebSocket};
#[cfg(feature = "server")]
use tungstenite::util::NonBlockingError;
#[cfg(feature = "server")]
use tungstenite::{
accept as websocket_accept, client as websocket_client, HandshakeError,
Message as WebSocketMessage, WebSocket,
};
#[cfg(feature = "server")]
use url::Url;
pub struct Networking {
pub machine_id: MachineID,
batch_message_bytes: usize,
pub n_turns: usize,
acceptable_turn_distance: usize,
skip_turns_per_turn_head: usize,
network: Vec<String>,
network_connections: Vec<Option<Connection>>,
#[cfg(feature = "server")]
listener: TcpListener,
}
impl Networking {
pub fn new(
machine_id: u8,
network: Vec<String>,
batch_message_bytes: usize,
acceptable_turn_distance: usize,
skip_turns_per_turn_head: usize,
) -> Networking {
#[cfg(feature = "server")]
let listener = {
let listener = TcpListener::bind(&network[machine_id as usize]).unwrap();
listener.set_nonblocking(true).unwrap();
listener
};
Networking {
machine_id: MachineID(machine_id),
batch_message_bytes,
n_turns: 0,
acceptable_turn_distance,
skip_turns_per_turn_head,
network_connections: (0..network.len()).into_iter().map(|_| None).collect(),
network,
#[cfg(feature = "server")]
listener,
}
}
#[cfg(feature = "server")]
pub(crate) fn connect(&mut self) {
if self
.network_connections
.iter()
.enumerate()
.any(|(machine_id, connection)| {
machine_id > self.machine_id.0 as usize && connection.is_none()
}) {
match self.listener.accept() {
Ok((stream, addr)) => {
println!("Got connection from {}, shaking hands...", addr);
let mut handshake_state = Some(websocket_accept(stream));
loop {
handshake_state = match handshake_state {
Some(Ok(mut websocket)) => {
loop {
match websocket.read_message() {
Ok(WebSocketMessage::Binary(data)) => {
let peer_machine_id = data[0];
self.network_connections[peer_machine_id as usize] =
Some(Connection::new(
websocket,
self.batch_message_bytes,
));
println!(
"...machine ID {} connected!",
peer_machine_id
);
break;
}
Ok(_) => {}
Err(e) => if let Some(real_err) = e.into_non_blocking() {
println!(
"Error while expecting first message: {}",
real_err
);
break;
},
}
}
break;
}
Some(Err(HandshakeError::Interrupted(s))) => Some(s.handshake()),
Some(Err(HandshakeError::Failure(e))) => {
println!("Error while accepting connection: {}", e);
break;
}
None => break,
}
}
}
Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => {}
Err(e) => println!("Error while accepting connection: {}", e),
}
}
for (machine_id, address) in self.network.iter().enumerate() {
if machine_id < self.machine_id.0 as usize {
if self.network_connections[machine_id].is_none() {
let stream = TcpStream::connect(address).unwrap();
stream.set_read_timeout(None).unwrap();
stream.set_write_timeout(None).unwrap();
let mut websocket =
websocket_client(Url::parse(&format!("ws://{}", address)).unwrap(), stream)
.unwrap()
.0;
match websocket
.write_message(WebSocketMessage::binary(vec![self.machine_id.0]))
.and_then(|_| websocket.write_pending())
{
Ok(_) => {}
Err(e) => panic!("Error while sending first message: {}", e),
}
self.network_connections[machine_id] =
Some(Connection::new(websocket, self.batch_message_bytes));
println!("Connected to Machine ID {}", machine_id);
}
}
}
}
#[cfg(feature = "browser")]
pub fn connect(&mut self) {
for (machine_id, address) in self.network.iter().enumerate() {
if machine_id != self.machine_id.0 as usize {
if self.network_connections[machine_id].is_none() {
let websocket = WebSocket::new(&format!("ws://{}", address)).unwrap();
let mut connection = Some(Connection::new(websocket, self.batch_message_bytes));
connection
.as_mut()
.unwrap()
.out_batches
.insert(0, vec![self.machine_id.0]);
self.network_connections[machine_id] = connection;
}
}
}
}
pub(crate) fn finish_turn(&mut self) -> Option<usize> {
let mut maybe_skip_turns = None;
for maybe_connection in &mut self.network_connections {
if let Some(Connection { n_turns, .. }) = *maybe_connection {
if n_turns + self.acceptable_turn_distance < self.n_turns {
maybe_skip_turns = Some(
(self.n_turns - self.acceptable_turn_distance - n_turns)
* self.skip_turns_per_turn_head,
);
}
}
}
self.n_turns += 1;
for maybe_connection in self.network_connections.iter_mut() {
if let Some(ref mut connection) = *maybe_connection {
{
let data = connection.enqueue_in_batch(
::std::mem::size_of::<ShortTypeId>() + ::std::mem::size_of::<u32>(),
);
data.write_u16::<LittleEndian>(0).unwrap();
data.write_u32::<LittleEndian>(self.n_turns as u32).unwrap();
}
connection.n_turns_since_own_turn = 0;
}
}
maybe_skip_turns
}
pub(crate) fn send_and_receive(
&mut self,
classes: &mut [Option<Class>],
implementors: &mut [Option<Vec<ShortTypeId>>],
) {
self.connect();
for (machine_id, maybe_connection) in self.network_connections.iter_mut().enumerate() {
let closed_reason = if let Some(ref mut connection) = *maybe_connection {
match connection
.try_send_pending()
.and_then(|_| connection.try_receive(classes, implementors))
{
Ok(()) => None,
Err(err) => Some(err),
}
} else {
None
};
if let Some(closed_reason) = closed_reason {
println!(
"Closed connection to Machine ID {} while receiving: {}",
machine_id, closed_reason
);
*maybe_connection = None
}
}
#[cfg(feature = "browser")]
{
let max_n_turns = self
.network_connections
.iter()
.map(|maybe_connection| {
if let Some(connection) = maybe_connection {
connection.n_turns
} else {
0
}
}).max()
.unwrap_or(self.n_turns);
if max_n_turns > 1000 + self.n_turns {
self.n_turns = max_n_turns;
}
}
}
pub(crate) fn enqueue<M: Message>(&mut self, message_type_id: ShortTypeId, mut packet: Packet<M>) {
if self.network.len() == 1 {
return;
}
let packet_size = Compact::total_size_bytes(&packet);
let total_size = ::std::mem::size_of::<ShortTypeId>() + packet_size;
let machine_id = packet.recipient_id.machine;
let recipients = if machine_id == broadcast_machine_id() {
(0..self.network.len()).into_iter().collect()
} else {
vec![machine_id.0 as usize]
};
for machine_id in recipients {
if let Some(connection) = self.network_connections[machine_id].as_mut() {
let data = connection.enqueue_in_batch(total_size);
data.write_u16::<LittleEndian>(message_type_id.into())
.unwrap();
let packet_pos = data.len();
data.resize(packet_pos + packet_size, 0);
unsafe {
Compact::compact_behind(
&mut packet,
&mut data[packet_pos] as *mut u8 as *mut Packet<M>,
);
}
}
}
::std::mem::forget(packet);
}
pub(crate) fn debug_all_n_turns(&self) -> HashMap<MachineID, isize> {
self.network_connections
.iter()
.enumerate()
.map(|(i, maybe_connection)| {
(
MachineID(i as u8),
if i == usize::from(self.machine_id.0) {
self.n_turns as isize
} else {
if let Some(connection) = maybe_connection.as_ref() {
connection.n_turns as isize
} else {
-1
}
},
)
}).collect()
}
#[cfg(feature = "browser")]
pub fn main_out_connection(&self) -> Option<&Connection> {
self.network_connections[0].as_ref()
}
}
#[cfg(feature = "server")]
pub struct Connection {
n_turns: usize,
n_turns_since_own_turn: usize,
websocket: WebSocket<TcpStream>,
out_batches: Vec<Vec<u8>>,
batch_message_bytes: usize,
}
#[cfg(feature = "server")]
impl Connection {
pub fn new(mut websocket: WebSocket<TcpStream>, batch_message_bytes: usize) -> Connection {
{
let tcp_socket = websocket.get_mut();
tcp_socket.set_nonblocking(true).unwrap();
tcp_socket.set_read_timeout(None).unwrap();
tcp_socket.set_write_timeout(None).unwrap();
tcp_socket.set_nodelay(true).unwrap();
}
Connection {
n_turns: 0,
n_turns_since_own_turn: 0,
websocket,
out_batches: vec![Vec::with_capacity(batch_message_bytes)],
batch_message_bytes,
}
}
pub fn enqueue_in_batch(&mut self, message_size: usize) -> &mut Vec<u8> {
let batch =
if self.out_batches.last().unwrap().len() < self.batch_message_bytes - message_size {
self.out_batches.last_mut().unwrap()
} else {
self.out_batches
.push(Vec::with_capacity(self.batch_message_bytes));
self.out_batches.last_mut().unwrap()
};
batch
.write_u32::<LittleEndian>(message_size as u32)
.unwrap();
batch
}
pub fn try_send_pending(&mut self) -> Result<(), ::tungstenite::Error> {
for batch in self.out_batches.drain(..) {
match self
.websocket
.write_message(WebSocketMessage::binary(batch))
{
Ok(_) => {}
Err(e) => if let Some(real_err) = e.into_non_blocking() {
return Err(real_err);
},
}
}
self.out_batches
.push(Vec::with_capacity(self.batch_message_bytes));
match self.websocket.write_pending() {
Ok(()) => Ok(()),
Err(e) => if let Some(real_err) = e.into_non_blocking() {
Err(real_err)
} else {
Ok(())
},
}
}
pub fn try_receive(
&mut self,
classes: &mut [Option<Class>],
implementors: &mut [Option<Vec<ShortTypeId>>],
) -> Result<(), ::tungstenite::Error> {
loop {
let blocked = match self.websocket.read_message() {
Ok(WebSocketMessage::Binary(data)) => dispatch_batch(
&data,
classes,
implementors,
&mut self.n_turns,
&mut self.n_turns_since_own_turn,
),
Ok(other_message) => panic!("Got a non binary message: {:?}", other_message),
Err(e) => if let Some(real_err) = e.into_non_blocking() {
return Err(real_err);
} else {
true
},
};
if blocked {
break;
}
}
Ok(())
}
}
fn dispatch_batch(
data: &[u8],
classes: &mut [Option<Class>],
implementors: &mut [Option<Vec<ShortTypeId>>],
n_turns: &mut usize,
n_turns_since_own_turn: &mut usize,
) -> bool {
let mut pos = 0;
let mut one_wants_to_wait = false;
while pos < data.len() {
let message_size = LittleEndian::read_u32(&data[pos..]);
pos += ::std::mem::size_of::<u32>();
let wants_to_wait = dispatch_message(
&data[pos..(pos + message_size as usize)],
classes,
implementors,
n_turns,
n_turns_since_own_turn,
);
one_wants_to_wait = one_wants_to_wait || wants_to_wait;
pos += message_size as usize;
}
one_wants_to_wait
}
fn dispatch_message(
data: &[u8],
classes: &mut [Option<Class>],
implementors: &mut [Option<Vec<ShortTypeId>>],
n_turns: &mut usize,
n_turns_since_own_turn: &mut usize,
) -> bool {
if data[0] == 0 && data[1] == 0 {
*n_turns = LittleEndian::read_u32(&data[::std::mem::size_of::<ShortTypeId>()..]) as usize;
*n_turns_since_own_turn += 1;
*n_turns_since_own_turn >= 10
} else {
let recipient_id =
(&data[::std::mem::size_of::<ShortTypeId>()] as *const u8) as *const RawID;
unsafe {
if let Some(ref mut class) = classes[(*recipient_id).type_id.as_usize()] {
class.inbox.put_raw(&data);
} else {
if let Some(implementors) =
implementors[(*recipient_id).type_id.as_usize()].as_ref()
{
for implementor_type_id in implementors {
if let Some(class) = classes[implementor_type_id.as_usize()].as_mut() {
class.inbox.put_raw(&data);
} else {
panic!(
"No inbox for actor type {}, trait type {} (coming from network)",
implementor_type_id.as_usize(),
(*recipient_id).type_id.as_usize()
);
}
}
} else {
panic!(
"No inbox for actor type {} - or no implementors (coming from network)",
(*recipient_id).type_id.as_usize()
)
}
}
}
false
}
}
#[cfg(feature = "browser")]
use std::cell::RefCell;
#[cfg(feature = "browser")]
use std::collections::VecDeque;
#[cfg(feature = "browser")]
use std::rc::Rc;
#[cfg(feature = "browser")]
pub struct Connection {
n_turns: usize,
n_turns_since_own_turn: usize,
websocket: WebSocket,
in_queue: Rc<RefCell<VecDeque<Vec<u8>>>>,
got_machine_id: Rc<RefCell<bool>>,
out_batches: Vec<Vec<u8>>,
batch_message_bytes: usize,
}
#[cfg(feature = "browser")]
use stdweb::web::event::SocketMessageEvent;
#[cfg(feature = "browser")]
impl Connection {
pub fn new(websocket: WebSocket, batch_message_bytes: usize) -> Connection {
let in_queue = Rc::new(RefCell::new(VecDeque::new()));
let in_queue_for_listener = in_queue.clone();
let got_machine_id = Rc::new(RefCell::new(false));
let got_machine_id_for_listener = got_machine_id.clone();
websocket.set_binary_type(SocketBinaryType::ArrayBuffer);
websocket.add_event_listener(move |event: SocketMessageEvent| {
let mut got_machine_id = got_machine_id_for_listener.borrow_mut();
if *got_machine_id {
in_queue_for_listener.borrow_mut().push_back({
let typed_array: TypedArray<u8> =
event.data().into_array_buffer().unwrap().into();
typed_array.to_vec()
})
} else {
*got_machine_id = true;
}
});
Connection {
n_turns: 0,
n_turns_since_own_turn: 0,
websocket,
in_queue,
got_machine_id,
out_batches: vec![Vec::with_capacity(batch_message_bytes)],
batch_message_bytes,
}
}
pub fn enqueue_in_batch(&mut self, message_size: usize) -> &mut Vec<u8> {
let batch =
if self.out_batches.last().unwrap().len() < self.batch_message_bytes - message_size {
self.out_batches.last_mut().unwrap()
} else {
self.out_batches
.push(Vec::with_capacity(self.batch_message_bytes));
self.out_batches.last_mut().unwrap()
};
batch
.write_u32::<LittleEndian>(message_size as u32)
.unwrap();
batch
}
pub fn try_send_pending(&mut self) -> Result<(), ::std::io::Error> {
if self.websocket.ready_state() == SocketReadyState::Open {
for batch in self.out_batches.drain(..) {
self.websocket.send_bytes(&batch).unwrap();
}
self.out_batches
.push(Vec::with_capacity(self.batch_message_bytes));
}
Ok(())
}
pub fn try_receive(
&mut self,
classes: &mut [Option<Class>],
implementors: &mut [Option<Vec<ShortTypeId>>],
) -> Result<(), ::std::io::Error> {
if let Ok(mut in_queue) = self.in_queue.try_borrow_mut() {
for batch in in_queue.drain(..) {
dispatch_batch(
&batch,
classes,
implementors,
&mut self.n_turns,
&mut self.n_turns_since_own_turn,
);
}
} else {
}
Ok(())
}
pub fn in_queue_len(&self) -> usize {
self.in_queue.borrow().len()
}
}