use super::*;
use crate::{
messaging::{NetMessage, SerialisedFrame},
net::buffers::{BufferChunk, BufferPool, DecodeBuffer},
};
use mio::net::UdpSocket;
use network_thread::*;
use std::{cell::RefCell, cmp::min, collections::VecDeque, io, io::Error, net::SocketAddr};
const MAX_PACKET_SIZE: usize = 65535;
pub(super) struct UdpState {
logger: KompactLogger,
pub(super) socket: UdpSocket,
outbound_queue: VecDeque<(SocketAddr, SerialisedFrame)>,
input_buffer: DecodeBuffer,
pub(super) incoming_messages: VecDeque<NetMessage>,
max_packet_size: usize,
}
impl UdpState {
pub(super) fn new(
socket: UdpSocket,
buffer_chunk: BufferChunk,
logger: KompactLogger,
network_config: &NetworkConfig,
) -> Self {
let chunk_size = network_config.get_buffer_config().chunk_size;
let max_packet_size = min(chunk_size, MAX_PACKET_SIZE);
UdpState {
logger,
socket,
outbound_queue: VecDeque::new(),
input_buffer: DecodeBuffer::new(buffer_chunk, network_config.get_buffer_config()),
incoming_messages: VecDeque::new(),
max_packet_size,
}
}
pub(super) fn pending_messages(&self) -> usize {
self.outbound_queue.len()
}
pub(super) fn try_write(&mut self) -> io::Result<usize> {
let mut sent_bytes: usize = 0;
let mut interrupts = 0;
while let Some((addr, mut frame)) = self.outbound_queue.pop_front() {
frame.make_contiguous();
match self.socket.send_to(frame.bytes(), addr) {
Ok(n) => {
assert_eq!(n, frame.len(), "A UDP frame was written incompletely!");
sent_bytes += n;
}
Err(ref err) if would_block(err) => {
self.outbound_queue.push_front((addr, frame));
return Ok(sent_bytes);
}
Err(err) if interrupted(&err) => {
self.outbound_queue.push_front((addr, frame));
interrupts += 1;
if interrupts >= MAX_INTERRUPTS {
return Err(err);
}
}
Err(err) => {
self.outbound_queue.push_front((addr, frame));
return Err(err);
}
}
}
Ok(sent_bytes)
}
fn swap_buffer(&mut self, buffer_pool: &RefCell<BufferPool>) -> () {
let mut pool = buffer_pool.borrow_mut();
if let Some(mut buffer) = pool.get_buffer() {
debug!(
self.logger,
"Swapping UDP buffer as only {} bytes remain.",
self.input_buffer.writeable_len()
);
self.input_buffer.swap_buffer(&mut buffer);
pool.return_buffer(buffer);
}
}
pub(super) fn try_read(&mut self, buffer_pool: &RefCell<BufferPool>) -> io::Result<()> {
let mut interrupts = 0;
loop {
if self.input_buffer.writeable_len() < self.max_packet_size {
self.swap_buffer(buffer_pool);
if self.input_buffer.writeable_len() < self.max_packet_size {
return Err(Error::new(io::ErrorKind::InvalidInput, "Out of Buffers"));
}
}
if let Some(buf) = self.input_buffer.get_writeable() {
match self.socket.recv_from(buf) {
Ok((0, addr)) => {
debug!(self.logger, "Got empty UDP datagram from {}", addr);
return Ok(());
}
Ok((n, addr)) => {
self.input_buffer.advance_writeable(n);
self.decode_message(addr);
}
Err(err) if would_block(&err) => {
return Ok(());
}
Err(err) if interrupted(&err) => {
interrupts += 1;
if interrupts >= network_thread::MAX_INTERRUPTS {
return Err(err);
}
}
Err(err) => {
return Err(err);
}
}
} else {
return Err(Error::new(io::ErrorKind::InvalidInput, "Out of Buffers"));
}
}
}
fn decode_message(&mut self, source: SocketAddr) {
match self.input_buffer.get_frame() {
Ok(Frame::Data(frame)) => {
use serialisation::ser_helpers::deserialise_chunk_lease;
let buf = frame.payload();
match deserialise_chunk_lease(buf) {
Ok(envelope) => self.incoming_messages.push_back(envelope),
Err(e) => {
warn!(
self.logger,
"Could not deserialise UDP frame from {}: {}", source, e
);
}
}
}
Ok(frame) => {
warn!(
self.logger,
"Decoded unexpected frame from UDP datagram from {}: {:?}", source, frame
);
}
Err(e) => {
warn!(
self.logger,
"Could not decode UDP datagram from {}: {:?}", source, e
);
}
}
}
pub(super) fn enqueue_serialised(&mut self, addr: SocketAddr, frame: SerialisedFrame) -> () {
self.outbound_queue.push_back((addr, frame));
}
}