use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
net::IpAddr,
time::Duration,
};
use moonpool_core::OpenOptions;
use crate::{
network::{
NetworkConfiguration,
sim::{ConnectionId, ListenerId},
},
storage::{InMemoryStorage, StorageConfiguration},
};
#[derive(Debug)]
pub struct ClogState {
pub expires_at: Duration,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CloseReason {
#[default]
None,
Graceful,
Aborted,
}
#[derive(Debug, Clone)]
pub struct PartitionState {
pub expires_at: Duration,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ConnectionFlags(u16);
impl ConnectionFlags {
const IS_CLOSED: u16 = 1 << 0;
const SEND_CLOSED: u16 = 1 << 1;
const RECV_CLOSED: u16 = 1 << 2;
const IS_CUT: u16 = 1 << 3;
const IS_HALF_OPEN: u16 = 1 << 4;
const IS_STABLE: u16 = 1 << 5;
const GRACEFUL_CLOSE_PENDING: u16 = 1 << 6;
const REMOTE_FIN_RECEIVED: u16 = 1 << 7;
const SEND_IN_PROGRESS: u16 = 1 << 8;
fn get(self, mask: u16) -> bool {
(self.0 & mask) != 0
}
fn set_bit(&mut self, mask: u16, value: bool) {
if value {
self.0 |= mask;
} else {
self.0 &= !mask;
}
}
#[must_use]
pub fn is_closed(self) -> bool {
self.get(Self::IS_CLOSED)
}
pub fn set_is_closed(&mut self, value: bool) {
self.set_bit(Self::IS_CLOSED, value);
}
#[must_use]
pub fn send_closed(self) -> bool {
self.get(Self::SEND_CLOSED)
}
pub fn set_send_closed(&mut self, value: bool) {
self.set_bit(Self::SEND_CLOSED, value);
}
#[must_use]
pub fn recv_closed(self) -> bool {
self.get(Self::RECV_CLOSED)
}
pub fn set_recv_closed(&mut self, value: bool) {
self.set_bit(Self::RECV_CLOSED, value);
}
#[must_use]
pub fn is_cut(self) -> bool {
self.get(Self::IS_CUT)
}
pub fn set_is_cut(&mut self, value: bool) {
self.set_bit(Self::IS_CUT, value);
}
#[must_use]
pub fn is_half_open(self) -> bool {
self.get(Self::IS_HALF_OPEN)
}
pub fn set_is_half_open(&mut self, value: bool) {
self.set_bit(Self::IS_HALF_OPEN, value);
}
#[must_use]
pub fn is_stable(self) -> bool {
self.get(Self::IS_STABLE)
}
pub fn set_is_stable(&mut self, value: bool) {
self.set_bit(Self::IS_STABLE, value);
}
#[must_use]
pub fn graceful_close_pending(self) -> bool {
self.get(Self::GRACEFUL_CLOSE_PENDING)
}
pub fn set_graceful_close_pending(&mut self, value: bool) {
self.set_bit(Self::GRACEFUL_CLOSE_PENDING, value);
}
#[must_use]
pub fn remote_fin_received(self) -> bool {
self.get(Self::REMOTE_FIN_RECEIVED)
}
pub fn set_remote_fin_received(&mut self, value: bool) {
self.set_bit(Self::REMOTE_FIN_RECEIVED, value);
}
#[must_use]
pub fn send_in_progress(self) -> bool {
self.get(Self::SEND_IN_PROGRESS)
}
pub fn set_send_in_progress(&mut self, value: bool) {
self.set_bit(Self::SEND_IN_PROGRESS, value);
}
}
#[derive(Debug, Clone)]
pub struct ConnectionState {
pub local_ip: Option<IpAddr>,
pub remote_ip: Option<IpAddr>,
pub peer_address: String,
pub receive_buffer: VecDeque<u8>,
pub paired_connection: Option<ConnectionId>,
pub send_buffer: VecDeque<Vec<u8>>,
pub next_send_time: Duration,
pub flags: ConnectionFlags,
pub cut_expiry: Option<Duration>,
pub close_reason: CloseReason,
pub send_buffer_capacity: usize,
pub send_delay: Option<Duration>,
pub recv_delay: Option<Duration>,
pub half_open_error_at: Option<Duration>,
pub last_data_delivery_scheduled_at: Option<Duration>,
}
#[derive(Debug)]
pub struct NetworkState {
pub next_connection_id: u64,
pub next_listener_id: u64,
pub config: NetworkConfiguration,
pub connections: BTreeMap<ConnectionId, ConnectionState>,
pub listeners: BTreeSet<ListenerId>,
pub pending_connections: BTreeMap<String, ConnectionId>,
pub connection_clogs: BTreeMap<ConnectionId, ClogState>,
pub read_clogs: BTreeMap<ConnectionId, ClogState>,
pub ip_partitions: BTreeMap<(IpAddr, IpAddr), PartitionState>,
pub send_partitions: BTreeMap<IpAddr, Duration>,
pub recv_partitions: BTreeMap<IpAddr, Duration>,
pub last_random_close_time: Duration,
pub pair_latencies: BTreeMap<(IpAddr, IpAddr), Duration>,
}
impl NetworkState {
#[must_use]
pub fn new(config: NetworkConfiguration) -> Self {
Self {
next_connection_id: 0,
next_listener_id: 0,
config,
connections: BTreeMap::new(),
listeners: BTreeSet::new(),
pending_connections: BTreeMap::new(),
connection_clogs: BTreeMap::new(),
read_clogs: BTreeMap::new(),
ip_partitions: BTreeMap::new(),
send_partitions: BTreeMap::new(),
recv_partitions: BTreeMap::new(),
last_random_close_time: Duration::ZERO,
pair_latencies: BTreeMap::new(),
}
}
#[must_use]
pub fn parse_ip_from_addr(addr: &str) -> Option<IpAddr> {
if addr.starts_with('[')
&& let Some(bracket_end) = addr.find(']')
{
return addr[1..bracket_end].parse().ok();
}
if let Some(colon_pos) = addr.rfind(':') {
addr[..colon_pos].parse().ok()
} else {
addr.parse().ok()
}
}
#[must_use]
pub fn is_partitioned(&self, from_ip: IpAddr, to_ip: IpAddr, current_time: Duration) -> bool {
if let Some(partition) = self.ip_partitions.get(&(from_ip, to_ip))
&& current_time < partition.expires_at
{
return true;
}
if let Some(&partition_until) = self.send_partitions.get(&from_ip)
&& current_time < partition_until
{
return true;
}
if let Some(&partition_until) = self.recv_partitions.get(&to_ip)
&& current_time < partition_until
{
return true;
}
false
}
#[must_use]
pub fn is_connection_partitioned(
&self,
connection_id: ConnectionId,
current_time: Duration,
) -> bool {
if let Some(conn) = self.connections.get(&connection_id)
&& let (Some(local_ip), Some(remote_ip)) = (conn.local_ip, conn.remote_ip)
{
return self.is_partitioned(local_ip, remote_ip, current_time);
}
false
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct FileId(pub u64);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PendingOpType {
Read,
Write,
Sync,
SetLen,
Open,
}
#[derive(Debug, Clone)]
pub struct PendingStorageOp {
pub op_type: PendingOpType,
pub offset: u64,
pub len: usize,
pub data: Option<Vec<u8>>,
}
#[derive(Debug)]
pub struct StorageFileState {
pub id: FileId,
pub path: String,
pub position: u64,
pub options: OpenOptions,
pub storage: InMemoryStorage,
pub is_closed: bool,
pub pending_ops: BTreeMap<u64, PendingStorageOp>,
pub next_op_seq: u64,
pub owner_ip: IpAddr,
}
impl StorageFileState {
#[must_use]
pub fn new(
id: FileId,
path: String,
options: OpenOptions,
storage: InMemoryStorage,
owner_ip: IpAddr,
) -> Self {
Self {
id,
path,
position: 0,
options,
storage,
is_closed: false,
pending_ops: BTreeMap::new(),
next_op_seq: 0,
owner_ip,
}
}
}
#[derive(Debug)]
pub struct StorageState {
pub next_file_id: u64,
pub config: StorageConfiguration,
pub per_process_configs: HashMap<IpAddr, StorageConfiguration>,
pub files: BTreeMap<FileId, StorageFileState>,
pub path_to_file: BTreeMap<String, FileId>,
pub deleted_paths: HashSet<String>,
pub sync_failures: HashSet<(FileId, u64)>,
}
impl StorageState {
#[must_use]
pub fn new(config: StorageConfiguration) -> Self {
Self {
next_file_id: 0,
config,
per_process_configs: HashMap::new(),
files: BTreeMap::new(),
path_to_file: BTreeMap::new(),
deleted_paths: HashSet::new(),
sync_failures: HashSet::new(),
}
}
#[must_use]
pub fn config_for(&self, ip: IpAddr) -> &StorageConfiguration {
self.per_process_configs.get(&ip).unwrap_or(&self.config)
}
}
impl Default for StorageState {
fn default() -> Self {
Self::new(StorageConfiguration::default())
}
}