mod bandwidth;
mod latency;
use std::sync::Arc;
pub use bandwidth::BandwidthConfig;
use crate::GLOBAL_POOL;
use crate::Jiffies;
use crate::Pid;
use crate::events::Event;
use crate::events::MessageState;
use crate::events::PidEvent;
use crate::events::TimedEvent;
use crate::network::bandwidth::Bandwidth;
use crate::network::latency::Latency;
use crate::random::Randomizer;
use crate::random::Seed;
use crate::topology::Topology;
/// Result of [`Network::poll`] for a single message event.
pub(crate) enum NetworkPoll {
/// The message is not finished yet: the wrapped [`TimedEvent`] carries the
/// updated message event together with the later time at which it should
/// be polled again.
Await(TimedEvent),
/// The message has passed every network stage; the wrapped [`PidEvent`] is
/// handed back to the caller (presumably for delivery to the destination
/// process; confirm at the call site).
Ready(PidEvent),
}
/// Network model made of a bandwidth component and a latency component.
pub(crate) struct Network {
/// Consulted through `try_send` / `try_recv` while polling message events;
/// either call may report an additional delay.
bandwidth: Bandwidth,
/// Supplies the per-message latency between two processes via
/// `random_latency`.
latency: Latency,
}
impl Network {
    /// Builds a network model from a seed, a bandwidth configuration and a topology.
    ///
    /// The latency component receives a [`Randomizer`] created from `seed` and a
    /// shared handle to `topology`. The bandwidth component receives
    /// `bandwidth_type` and the number of entries `topology` lists for
    /// `GLOBAL_POOL`.
    pub(crate) fn new(
        seed: Seed,
        bandwidth_type: BandwidthConfig,
        topology: Arc<Topology>,
    ) -> Self {
        // Latency is constructed first, then bandwidth.
        let latency = Latency::new(Randomizer::new(seed), Arc::clone(&topology));
        let global_pool_size = topology.list_pool(GLOBAL_POOL).len();
        let bandwidth = Bandwidth::new(bandwidth_type, global_pool_size);

        Self { bandwidth, latency }
    }
}
impl Network {
    /// Advances a message event by one step through the simulated network.
    ///
    /// `pid` is the process the message is addressed to, `invocation_time` is
    /// the time at which `event` fired, and `source` (carried inside the event)
    /// is the process that sent it. Depending on the message's
    /// [`MessageState`] the following happens:
    ///
    /// * `Init`: the bandwidth model is asked whether the sender can transmit
    ///   the message now. If it reports a delay, the message is re-scheduled
    ///   after that delay as `AwaitSendBandwidth`; otherwise a latency is drawn
    ///   and the message is re-scheduled after it as `AwaitLatency`.
    /// * `AwaitSendBandwidth`: a latency is drawn and the message is
    ///   re-scheduled after it as `AwaitLatency`.
    /// * `AwaitLatency`: the bandwidth model is asked whether the receiver can
    ///   accept the message now. If it reports a delay, the message is
    ///   re-scheduled after that delay as `AwaitRecvBandwidth`; otherwise the
    ///   message is ready.
    /// * `AwaitRecvBandwidth`: the message is ready.
    ///
    /// Returns [`NetworkPoll::Await`] with the re-scheduled event, or
    /// [`NetworkPoll::Ready`] with the message event (state left unchanged)
    /// once no further waiting is required.
    ///
    /// # Panics
    ///
    /// Panics if `event` is not a [`PidEvent::Message`].
    pub(crate) fn poll(
        &mut self,
        invocation_time: Jiffies,
        pid: Pid,
        event: PidEvent,
    ) -> NetworkPoll {
        match event {
            PidEvent::Message {
                message,
                state,
                source,
            } => {
                // Work out the next waiting stage and how long to wait for it.
                // Arms that need no further waiting return `Ready` directly.
                let (next_state, delay) = match state {
                    MessageState::Init => {
                        let message_size = message.virtual_size();
                        // Outgoing bandwidth belongs to the sender, so it is
                        // requested for `source`; `pid` is the receiver and is
                        // charged later through `try_recv`.
                        match self
                            .bandwidth
                            .try_send(message_size, source, invocation_time)
                        {
                            Some(send_delay) => (MessageState::AwaitSendBandwidth, send_delay),
                            None => (
                                MessageState::AwaitLatency,
                                self.latency.random_latency(source, pid),
                            ),
                        }
                    }
                    MessageState::AwaitSendBandwidth => (
                        MessageState::AwaitLatency,
                        self.latency.random_latency(source, pid),
                    ),
                    MessageState::AwaitLatency => {
                        let message_size = message.virtual_size();
                        match self.bandwidth.try_recv(message_size, pid, invocation_time) {
                            Some(recv_delay) => (MessageState::AwaitRecvBandwidth, recv_delay),
                            None => {
                                return NetworkPoll::Ready(PidEvent::Message {
                                    message,
                                    state,
                                    source,
                                });
                            }
                        }
                    }
                    MessageState::AwaitRecvBandwidth => {
                        return NetworkPoll::Ready(PidEvent::Message {
                            message,
                            state,
                            source,
                        });
                    }
                };

                NetworkPoll::Await(TimedEvent {
                    invocation_time: invocation_time + delay,
                    event: Event::Pid {
                        pid,
                        event: PidEvent::Message {
                            source,
                            message,
                            state: next_state,
                        },
                    },
                })
            }
            _ => unreachable!("Network::poll only handles PidEvent::Message events"),
        }
    }
}