pub mod laminar;
pub mod tcp;
pub mod udp;
const NETWORK_SIM_TIME_SYSTEM_NAME: &str = "simulation_time";
const NETWORK_SEND_SYSTEM_NAME: &str = "network_send";
const NETWORK_RECV_SYSTEM_NAME: &str = "network_recv";
const NETWORK_POLL_SYSTEM_NAME: &str = "network_poll";
use crate::simulation::{
message::Message,
requirements::{DeliveryRequirement, UrgencyRequirement},
};
use std::{collections::VecDeque, net::SocketAddr};
pub struct TransportResource {
messages: VecDeque<Message>,
frame_budget_bytes: i32,
latency_nanos: i64,
packet_loss: f32,
}
impl TransportResource {
pub fn new() -> Self {
Self {
messages: VecDeque::new(),
frame_budget_bytes: 0,
latency_nanos: 0,
packet_loss: 0.0,
}
}
pub fn frame_budget_bytes(&self) -> i32 {
self.frame_budget_bytes
}
pub fn set_frame_budget_bytes(&mut self, budget: i32) {
self.frame_budget_bytes = budget;
}
pub fn latency_millis(&mut self) -> i64 {
self.latency_nanos / 1_000_000
}
pub fn latency_micros(&mut self) -> i64 {
self.latency_nanos / 1000
}
pub fn latency_nanos(&self) -> i64 {
self.latency_nanos
}
pub fn set_latency_nanos(&mut self, latency: i64) {
self.latency_nanos = latency;
}
pub fn packet_loss(&self) -> f32 {
self.packet_loss
}
pub fn set_packet_loss(&mut self, loss: f32) {
self.packet_loss = loss;
}
pub fn send(&mut self, destination: SocketAddr, payload: &[u8]) {
self.send_with_requirements(
destination,
payload,
DeliveryRequirement::Default,
UrgencyRequirement::OnTick,
);
}
pub fn send_immediate(&mut self, destination: SocketAddr, payload: &[u8]) {
self.send_with_requirements(
destination,
payload,
DeliveryRequirement::Default,
UrgencyRequirement::Immediate,
);
}
pub fn send_with_requirements(
&mut self,
destination: SocketAddr,
payload: &[u8],
delivery: DeliveryRequirement,
timing: UrgencyRequirement,
) {
let message = Message::new(destination, payload, delivery, timing);
self.messages.push_back(message);
}
pub fn has_messages(&self) -> bool {
!self.messages.is_empty()
}
pub fn get_messages(&self) -> &VecDeque<Message> {
&self.messages
}
pub fn drain_messages_to_send(
&mut self,
mut filter: impl FnMut(&mut Message) -> bool,
) -> Vec<Message> {
self.drain_messages(|message| {
message.urgency == UrgencyRequirement::Immediate || filter(message)
})
}
pub fn drain_messages(&mut self, mut filter: impl FnMut(&mut Message) -> bool) -> Vec<Message> {
let mut drained = Vec::with_capacity(self.messages.len());
let mut i = 0;
while i != self.messages.len() {
if filter(&mut self.messages[i]) {
if let Some(m) = self.messages.remove(i) {
drained.push(m);
}
} else {
i += 1;
}
}
drained
}
}
impl Default for TransportResource {
fn default() -> Self {
Self {
messages: VecDeque::new(),
frame_budget_bytes: 0,
latency_nanos: 0,
packet_loss: 0.0,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_send_with_default_requirements() {
let mut resource = create_test_resource();
resource.send("127.0.0.1:3000".parse().unwrap(), test_payload());
let packet = &resource.messages[0];
assert_eq!(resource.messages.len(), 1);
assert_eq!(packet.delivery, DeliveryRequirement::Default);
assert_eq!(packet.urgency, UrgencyRequirement::OnTick);
}
#[test]
fn test_send_immediate_message() {
let mut resource = create_test_resource();
resource.send_immediate("127.0.0.1:3000".parse().unwrap(), test_payload());
let packet = &resource.messages[0];
assert_eq!(resource.messages.len(), 1);
assert_eq!(packet.delivery, DeliveryRequirement::Default);
assert_eq!(packet.urgency, UrgencyRequirement::Immediate);
}
#[test]
fn test_has_messages() {
let mut resource = create_test_resource();
assert_eq!(resource.has_messages(), false);
resource.send_immediate("127.0.0.1:3000".parse().unwrap(), test_payload());
assert_eq!(resource.has_messages(), true);
}
#[test]
fn test_drain_only_immediate_messages() {
let mut resource = create_test_resource();
let addr = "127.0.0.1:3000".parse().unwrap();
resource.send_immediate(addr, test_payload());
resource.send_immediate(addr, test_payload());
resource.send(addr, test_payload());
resource.send(addr, test_payload());
resource.send_immediate(addr, test_payload());
assert_eq!(resource.drain_messages_to_send(|_| false).len(), 3);
assert_eq!(resource.drain_messages_to_send(|_| false).len(), 0);
}
#[test]
fn test_drain_only_messages_with_specific_requirements() {
let mut resource = create_test_resource();
let addr = "127.0.0.1:3000".parse().unwrap();
resource.send_with_requirements(
addr,
test_payload(),
DeliveryRequirement::Unreliable,
UrgencyRequirement::OnTick,
);
resource.send_with_requirements(
addr,
test_payload(),
DeliveryRequirement::Reliable,
UrgencyRequirement::OnTick,
);
resource.send_with_requirements(
addr,
test_payload(),
DeliveryRequirement::ReliableOrdered(None),
UrgencyRequirement::OnTick,
);
resource.send_with_requirements(
addr,
test_payload(),
DeliveryRequirement::ReliableSequenced(None),
UrgencyRequirement::OnTick,
);
resource.send_with_requirements(
addr,
test_payload(),
DeliveryRequirement::Unreliable,
UrgencyRequirement::OnTick,
);
assert_eq!(
resource
.drain_messages(|message| message.delivery == DeliveryRequirement::Unreliable)
.len(),
2
);
assert_eq!(
resource
.drain_messages(|message| message.delivery == DeliveryRequirement::Unreliable)
.len(),
0
);
}
#[test]
fn test_send_with_requirements() {
use DeliveryRequirement::*;
let mut resource = create_test_resource();
let addr = "127.0.0.1:3000".parse().unwrap();
let requirements = [
Unreliable,
UnreliableSequenced(None),
Reliable,
ReliableSequenced(None),
ReliableOrdered(None),
Default,
];
for req in requirements.iter().cloned() {
resource.send_with_requirements(addr, test_payload(), req, UrgencyRequirement::OnTick);
}
assert_eq!(resource.messages.len(), requirements.len());
for (i, req) in requirements.iter().enumerate() {
assert_eq!(resource.messages[i].delivery, *req);
}
}
fn test_payload() -> &'static [u8] {
b"test"
}
fn create_test_resource() -> TransportResource {
TransportResource::new()
}
}