use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
net::IpAddr,
time::Duration,
};
use moonpool_core::OpenOptions;
use crate::{
network::{
NetworkConfiguration,
sim::{ConnectionId, ListenerId},
},
storage::{InMemoryStorage, StorageConfiguration},
};
/// Clog record holding the time at which the clog expires.
///
/// Used as the value type of `NetworkState::connection_clogs` and
/// `NetworkState::read_clogs`.
/// NOTE(review): nothing in this section reads `expires_at`; presumably a clog
/// counts as active while the current time is earlier than it — confirm
/// against the code that consumes those maps.
#[derive(Debug)]
pub struct ClogState {
/// Expiry time as a `Duration`; the reference point it is measured from is
/// not shown in this section.
pub expires_at: Duration,
}
/// Reason recorded for a connection close; stored in
/// `ConnectionState::close_reason`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CloseReason {
/// Default variant.
/// NOTE(review): presumably "not closed / no reason recorded" — confirm.
#[default]
None,
/// NOTE(review): presumably an orderly close — confirm against the code
/// that sets it.
Graceful,
/// NOTE(review): presumably an abrupt close — confirm against the code
/// that sets it.
Aborted,
}
/// Partition record stored per `(from_ip, to_ip)` pair in
/// `NetworkState::ip_partitions`.
#[derive(Debug, Clone)]
pub struct PartitionState {
/// Expiry time of the partition. `NetworkState::is_partitioned` treats the
/// partition as active only while `current_time < expires_at`.
pub expires_at: Duration,
}
/// Per-connection bookkeeping, stored in `NetworkState::connections` keyed by
/// `ConnectionId`.
///
/// NOTE(review): in this section only `local_ip` and `remote_ip` are read (by
/// `NetworkState::is_connection_partitioned`). Every other field note below is
/// inferred from the field's name and type and must be confirmed against the
/// code that drives connections.
#[derive(Debug, Clone)]
pub struct ConnectionState {
/// Identifier of this connection.
#[allow(dead_code)]
pub id: ConnectionId,
/// Address string associated with this connection (presumably the local
/// side — confirm).
#[allow(dead_code)]
pub addr: String,
/// Local IP, if known. When either this or `remote_ip` is `None`,
/// `NetworkState::is_connection_partitioned` reports the connection as not
/// partitioned.
pub local_ip: Option<IpAddr>,
/// Remote IP, if known; used as the destination in the partition check.
pub remote_ip: Option<IpAddr>,
/// Address string of the peer (presumably the remote endpoint — confirm).
pub peer_address: String,
/// Byte queue of received data (presumably bytes not yet read by the
/// application — confirm).
pub receive_buffer: VecDeque<u8>,
/// Id of the connection this one is paired with, if any (presumably the
/// other end of the same link — confirm).
pub paired_connection: Option<ConnectionId>,
/// Queue of byte chunks awaiting sending (presumably one entry per write —
/// confirm).
pub send_buffer: VecDeque<Vec<u8>>,
/// Presumably set while a send is being processed — confirm.
pub send_in_progress: bool,
/// Presumably the earliest time the next send may happen — confirm.
pub next_send_time: Duration,
/// Presumably true once the connection is fully closed — confirm.
pub is_closed: bool,
/// Presumably true once the send side is shut down — confirm.
pub send_closed: bool,
/// Presumably true once the receive side is shut down — confirm.
pub recv_closed: bool,
/// Presumably true while the connection is cut — confirm.
pub is_cut: bool,
/// Presumably the time at which a cut ends, if one is scheduled — confirm.
pub cut_expiry: Option<Duration>,
/// Close reason recorded for this connection.
pub close_reason: CloseReason,
/// Capacity associated with the send buffer (units and enforcement are not
/// shown in this section — confirm).
pub send_buffer_capacity: usize,
/// Optional per-connection send delay (presumably overrides a default —
/// confirm).
pub send_delay: Option<Duration>,
/// Optional per-connection receive delay (presumably overrides a default —
/// confirm).
pub recv_delay: Option<Duration>,
/// Presumably true while the connection is half-open — confirm.
pub is_half_open: bool,
/// Presumably the time at which a half-open connection starts reporting
/// errors — confirm.
pub half_open_error_at: Option<Duration>,
/// Presumably marks the connection as exempt from some injected faults —
/// confirm.
pub is_stable: bool,
/// Presumably true while a graceful close is waiting to complete — confirm.
pub graceful_close_pending: bool,
/// Presumably the time for which the most recent data delivery was
/// scheduled, if any — confirm.
pub last_data_delivery_scheduled_at: Option<Duration>,
/// Presumably true once the peer's FIN has been observed — confirm.
pub remote_fin_received: bool,
}
/// Per-listener bookkeeping, stored in `NetworkState::listeners` keyed by
/// `ListenerId`.
///
/// All fields are marked `#[allow(dead_code)]`; nothing in this section reads
/// them.
#[derive(Debug)]
pub struct ListenerState {
/// Identifier of this listener.
#[allow(dead_code)]
pub id: ListenerId,
/// Address string of the listener (presumably the bound address — confirm).
#[allow(dead_code)]
pub addr: String,
/// Queue of connection ids (presumably connections waiting to be accepted —
/// confirm).
#[allow(dead_code)]
pub pending_connections: VecDeque<ConnectionId>,
}
/// Aggregate network state: id counters, configuration, the connection and
/// listener tables, and the clog / partition / latency tables.
///
/// `NetworkState::new` starts every counter at zero and every table empty.
#[derive(Debug)]
pub struct NetworkState {
/// Counter for connection ids (presumably the next id to hand out — confirm).
pub next_connection_id: u64,
/// Counter for listener ids (presumably the next id to hand out — confirm).
pub next_listener_id: u64,
/// Network configuration supplied to `NetworkState::new`.
pub config: NetworkConfiguration,
/// All known connections, keyed by id.
pub connections: BTreeMap<ConnectionId, ConnectionState>,
/// All known listeners, keyed by id.
pub listeners: BTreeMap<ListenerId, ListenerState>,
/// Connection ids keyed by a string (presumably an address with a pending
/// connection — confirm).
pub pending_connections: BTreeMap<String, ConnectionId>,
/// Clog records keyed by connection (presumably affecting writes, given the
/// separate `read_clogs` table — confirm).
pub connection_clogs: BTreeMap<ConnectionId, ClogState>,
/// Clog records keyed by connection (presumably affecting reads — confirm).
pub read_clogs: BTreeMap<ConnectionId, ClogState>,
/// Directional partitions keyed by `(from_ip, to_ip)`; consulted by
/// `is_partitioned`.
pub ip_partitions: BTreeMap<(IpAddr, IpAddr), PartitionState>,
/// Expiry time per sending IP; `is_partitioned` looks this up with `from_ip`.
pub send_partitions: BTreeMap<IpAddr, Duration>,
/// Expiry time per receiving IP; `is_partitioned` looks this up with `to_ip`.
pub recv_partitions: BTreeMap<IpAddr, Duration>,
/// Starts at `Duration::ZERO` (presumably the time of the most recent
/// randomly injected close — confirm).
pub last_random_close_time: Duration,
/// Durations keyed by an IP pair (presumably per-pair latency — confirm the
/// key direction and how it is applied).
pub pair_latencies: BTreeMap<(IpAddr, IpAddr), Duration>,
}
impl NetworkState {
pub fn new(config: NetworkConfiguration) -> Self {
Self {
next_connection_id: 0,
next_listener_id: 0,
config,
connections: BTreeMap::new(),
listeners: BTreeMap::new(),
pending_connections: BTreeMap::new(),
connection_clogs: BTreeMap::new(),
read_clogs: BTreeMap::new(),
ip_partitions: BTreeMap::new(),
send_partitions: BTreeMap::new(),
recv_partitions: BTreeMap::new(),
last_random_close_time: Duration::ZERO,
pair_latencies: BTreeMap::new(),
}
}
pub fn parse_ip_from_addr(addr: &str) -> Option<IpAddr> {
if addr.starts_with('[')
&& let Some(bracket_end) = addr.find(']')
{
return addr[1..bracket_end].parse().ok();
}
if let Some(colon_pos) = addr.rfind(':') {
addr[..colon_pos].parse().ok()
} else {
addr.parse().ok()
}
}
pub fn is_partitioned(&self, from_ip: IpAddr, to_ip: IpAddr, current_time: Duration) -> bool {
if let Some(partition) = self.ip_partitions.get(&(from_ip, to_ip))
&& current_time < partition.expires_at
{
return true;
}
if let Some(&partition_until) = self.send_partitions.get(&from_ip)
&& current_time < partition_until
{
return true;
}
if let Some(&partition_until) = self.recv_partitions.get(&to_ip)
&& current_time < partition_until
{
return true;
}
false
}
pub fn is_connection_partitioned(
&self,
connection_id: ConnectionId,
current_time: Duration,
) -> bool {
if let Some(conn) = self.connections.get(&connection_id)
&& let (Some(local_ip), Some(remote_ip)) = (conn.local_ip, conn.remote_ip)
{
return self.is_partitioned(local_ip, remote_ip, current_time);
}
false
}
}
/// Newtype identifier for a file, wrapping a `u64`.
///
/// Used as the key of `StorageState::files` and as the value of
/// `StorageState::path_to_file`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct FileId(pub u64);
/// Kind of a pending storage operation; stored in `PendingStorageOp::op_type`.
///
/// NOTE(review): the variant notes are inferred from the names only — confirm
/// against the code that executes pending operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PendingOpType {
/// Presumably a read of `len` bytes at `offset` — confirm.
Read,
/// Presumably a write of `data` at `offset` — confirm.
Write,
/// Presumably a flush/sync of the file — confirm.
Sync,
/// Presumably a resize of the file — confirm.
SetLen,
/// Presumably an open of the file — confirm.
Open,
}
/// A queued storage operation; stored in `StorageFileState::pending_ops` keyed
/// by a `u64` sequence number.
#[derive(Debug, Clone)]
pub struct PendingStorageOp {
/// Which kind of operation this is.
pub op_type: PendingOpType,
/// Offset associated with the operation (presumably a byte offset into the
/// file — confirm; meaning for non-read/write kinds is not shown here).
pub offset: u64,
/// Length associated with the operation (presumably in bytes — confirm).
pub len: usize,
/// Optional payload (presumably present only for writes — confirm).
pub data: Option<Vec<u8>>,
}
/// State of one file, stored in `StorageState::files` keyed by `FileId`.
///
/// `StorageFileState::new` starts a file at position 0, not closed, with no
/// pending operations and `next_op_seq` at 0.
#[derive(Debug)]
pub struct StorageFileState {
/// Identifier of this file.
pub id: FileId,
/// Path string the file was created with.
pub path: String,
/// Current position; starts at 0 (presumably a byte offset for sequential
/// I/O — confirm).
pub position: u64,
/// Open options the file was created with.
pub options: OpenOptions,
/// In-memory backing storage for the file.
pub storage: InMemoryStorage,
/// Starts `false` (presumably set once the file is closed — confirm).
pub is_closed: bool,
/// Pending operations keyed by a `u64` (presumably the sequence number taken
/// from `next_op_seq` — confirm).
pub pending_ops: BTreeMap<u64, PendingStorageOp>,
/// Sequence counter for operations; starts at 0.
pub next_op_seq: u64,
/// IP of the owner (presumably the process that opened the file, used to
/// select its configuration via `StorageState::config_for` — confirm).
pub owner_ip: IpAddr,
}
impl StorageFileState {
pub fn new(
id: FileId,
path: String,
options: OpenOptions,
storage: InMemoryStorage,
owner_ip: IpAddr,
) -> Self {
Self {
id,
path,
position: 0,
options,
storage,
is_closed: false,
pending_ops: BTreeMap::new(),
next_op_seq: 0,
owner_ip,
}
}
}
/// Aggregate storage state: file-id counter, configuration (default and
/// per-IP), the file table, and path/failure bookkeeping.
///
/// `StorageState::new` starts the counter at zero and every collection empty.
#[derive(Debug)]
pub struct StorageState {
/// Counter for file ids (presumably the next id to hand out — confirm).
pub next_file_id: u64,
/// Default configuration; `config_for` returns it when an IP has no entry in
/// `per_process_configs`.
pub config: StorageConfiguration,
/// Per-IP configuration overrides consulted first by `config_for`.
pub per_process_configs: HashMap<IpAddr, StorageConfiguration>,
/// All known files, keyed by id.
pub files: BTreeMap<FileId, StorageFileState>,
/// File id per path string.
pub path_to_file: BTreeMap<String, FileId>,
/// Set of path strings (presumably paths that have been deleted — confirm).
pub deleted_paths: HashSet<String>,
/// Set of `(FileId, u64)` pairs (presumably file plus operation sequence
/// number of syncs that failed — confirm).
pub sync_failures: HashSet<(FileId, u64)>,
}
impl StorageState {
pub fn new(config: StorageConfiguration) -> Self {
Self {
next_file_id: 0,
config,
per_process_configs: HashMap::new(),
files: BTreeMap::new(),
path_to_file: BTreeMap::new(),
deleted_paths: HashSet::new(),
sync_failures: HashSet::new(),
}
}
pub fn config_for(&self, ip: IpAddr) -> &StorageConfiguration {
self.per_process_configs.get(&ip).unwrap_or(&self.config)
}
}
impl Default for StorageState {
fn default() -> Self {
Self::new(StorageConfiguration::default())
}
}