use std::net::IpAddr;
use std::task::Waker;
use std::time::Duration;
use tracing::instrument;
use crate::storage::StorageError;
use crate::chaos::fault_events::SimFaultEvent;
use super::{
events::{Event, ScheduledEvent, StorageOperation},
rng::{sim_random, sim_random_range},
state::{FileId, PendingOpType, PendingStorageOp},
world::{SimInner, SimWorld},
};
/// Removes and returns a pending operation of kind `op_type` for `file_id`.
///
/// Completion events identify only the file and the operation kind, so the
/// completion is matched against the pending operation of that kind with the
/// lowest sequence number. The minimum is chosen explicitly so the result
/// never depends on the iteration order of the `pending_ops` map, which keeps
/// the simulation deterministic.
///
/// Returns `None` when the file is unknown or has no pending operation of
/// that kind; otherwise returns the operation's sequence number together with
/// its record.
fn take_pending_op(
    inner: &mut SimInner,
    file_id: FileId,
    op_type: PendingOpType,
) -> Option<(u64, PendingStorageOp)> {
    let file_state = inner.storage.files.get_mut(&file_id)?;
    let op_seq = file_state
        .pending_ops
        .iter()
        .filter(|(_, op)| op.op_type == op_type)
        .map(|(&seq, _)| seq)
        .min()?;
    let op = file_state.pending_ops.remove(&op_seq)?;
    Some((op_seq, op))
}
/// Routes a storage completion event to the handler for its operation kind.
///
/// `file_id` is the raw id carried by the event; it is wrapped in [`FileId`]
/// before dispatch. The `len` payload of read and write completions is not
/// used by the handlers.
pub(crate) fn handle_storage_event(
    inner: &mut SimInner,
    file_id: u64,
    operation: StorageOperation,
) {
    let target = FileId(file_id);
    match operation {
        StorageOperation::ReadComplete { .. } => handle_read_complete(inner, target),
        StorageOperation::WriteComplete { .. } => handle_write_complete(inner, target),
        StorageOperation::SyncComplete => handle_sync_complete(inner, target),
        StorageOperation::OpenComplete => handle_open_complete(inner, target),
        StorageOperation::SetLenComplete { new_len } => {
            handle_set_len_complete(inner, target, new_len);
        }
    }
}
fn handle_read_complete(inner: &mut SimInner, file_id: FileId) {
let read_fault_probability = inner.storage.files.get(&file_id).map_or(0.0, |f| {
inner.storage.config_for(f.owner_ip).read_fault_probability
});
let Some((op_seq, op)) = take_pending_op(inner, file_id, PendingOpType::Read) else {
tracing::warn!("ReadComplete for unknown file {:?}", file_id);
return;
};
let (offset, len) = (op.offset, op.len);
let mut read_faulted = false;
if read_fault_probability > 0.0
&& let Some(file_state) = inner.storage.files.get_mut(&file_id)
{
let offset_usize = usize::try_from(offset).expect("offset fits in usize");
let start_sector = offset_usize / crate::storage::SECTOR_SIZE;
let end_sector = (offset_usize + len).div_ceil(crate::storage::SECTOR_SIZE);
for sector in start_sector..end_sector {
if sim_random::<f64>() < read_fault_probability {
file_state.storage.set_fault(sector);
read_faulted = true;
tracing::info!(
"Read fault injected for file {:?}, sector {}",
file_id,
sector
);
}
}
}
if read_faulted {
let ip = inner.storage.files.get(&file_id).map(|f| f.owner_ip);
if let Some(ip) = ip {
SimInner::emit_fault(&SimFaultEvent::StorageReadFault {
ip: ip.to_string(),
file_id: file_id.0,
});
}
}
if let Some(waker) = inner.wakers.storage_ops.remove(&(file_id, op_seq)) {
tracing::trace!("Waking read waker for file {:?}, op {}", file_id, op_seq);
waker.wake();
}
}
/// Completes a pending write on `file_id` and wakes the task waiting on it.
///
/// The buffered data of the pending op (whichever one `take_pending_op`
/// returns) is applied to the file's storage unless a fault is injected.
/// Faults are driven by the owner's storage configuration and are tried in a
/// fixed order, each branch excluding the ones after it:
///
/// 1. phantom write (`phantom_write_probability`),
/// 2. misdirected write (`misdirect_write_probability`),
/// 3. normal write, followed by an independent per-sector corruption draw
///    (`write_fault_probability`).
///
/// At most one `StorageWriteFault` event is emitted per completion. The waker
/// is woken in every case once a pending op was found, including when the
/// write itself failed or the op carried no data.
fn handle_write_complete(inner: &mut SimInner, file_id: FileId) {
// Clone the owner's configuration up front so it can be used while the file
// state is mutably borrowed below; an unknown file falls back to the default
// configuration.
let config = inner
.storage
.files
.get(&file_id)
.map(|f| inner.storage.config_for(f.owner_ip).clone())
.unwrap_or_default();
// Captured now as well; only needed for the fault event at the end.
let owner_ip = inner.storage.files.get(&file_id).map(|f| f.owner_ip);
let Some((op_seq, op)) = take_pending_op(inner, file_id, PendingOpType::Write) else {
tracing::warn!("WriteComplete for unknown file {:?}", file_id);
return;
};
let (offset, data_opt) = (op.offset, op.data);
// Label of the injected fault, if any; reported once after all branches ran.
let mut write_fault_kind: Option<&str> = None;
// Ops without a data buffer skip the whole apply/fault section.
if let Some(data) = data_opt
&& let Some(file_state) = inner.storage.files.get_mut(&file_id)
{
// This random draw always happens, even when the probability is 0.0.
if sim_random::<f64>() < config.phantom_write_probability {
tracing::info!(
"Phantom write injected for file {:?}, offset {}, len {}",
file_id,
offset,
data.len()
);
// NOTE(review): presumably records the write without making it
// durable — confirm against the storage implementation.
file_state.storage.record_phantom_write(offset, &data);
write_fault_kind = Some("phantom");
}
else if sim_random::<f64>() < config.misdirect_write_probability {
// Second draw, taken only when the phantom branch did not fire.
// The wrong offset is drawn from `0..max_offset`, where `max_offset`
// is the file size minus the data length (saturating at 0); an
// empty range falls back to offset 0.
let max_offset = file_state.storage.size().saturating_sub(data.len() as u64);
let mistaken_offset = if max_offset > 0 {
sim_random_range(0..max_offset)
} else {
0
};
tracing::info!(
"Misdirected write injected for file {:?}: intended={}, actual={}",
file_id,
offset,
mistaken_offset
);
// A failure here is only logged; the fault is still reported below.
if let Err(e) =
file_state
.storage
.apply_misdirected_write(offset, mistaken_offset, &data)
{
tracing::warn!("Failed to apply misdirected write: {}", e);
}
write_fault_kind = Some("misdirected");
}
else if let Err(e) = file_state.storage.write(offset, &data, false) {
// NOTE(review): the meaning of the `false` flag is defined by the
// storage type and is not visible here.
tracing::warn!("Write failed for file {:?}: {}", file_id, e);
} else {
// The write succeeded: optionally corrupt individual sectors it touched.
if config.write_fault_probability > 0.0 {
let offset_usize = usize::try_from(offset).expect("offset fits in usize");
let start_sector = offset_usize / crate::storage::SECTOR_SIZE;
// Exclusive upper bound: the sector just past the last byte written.
let end_sector = (offset_usize + data.len()).div_ceil(crate::storage::SECTOR_SIZE);
for sector in start_sector..end_sector {
// One independent draw per sector.
if sim_random::<f64>() < config.write_fault_probability {
file_state.storage.set_fault(sector);
write_fault_kind = Some("corruption");
tracing::info!(
"Write fault injected for file {:?}, sector {}",
file_id,
sector
);
}
}
}
}
}
// Report the injected fault, if any, attributed to the file's owner.
if let (Some(kind), Some(ip)) = (write_fault_kind, owner_ip) {
SimInner::emit_fault(&SimFaultEvent::StorageWriteFault {
ip: ip.to_string(),
file_id: file_id.0,
kind: kind.to_string(),
});
}
if let Some(waker) = inner.wakers.storage_ops.remove(&(file_id, op_seq)) {
tracing::trace!("Waking write waker for file {:?}, op {}", file_id, op_seq);
waker.wake();
}
}
/// Completes a pending sync on `file_id` and wakes the task waiting on it.
///
/// One random draw against the owner's `sync_failure_probability` decides the
/// outcome. On failure the `(file_id, op_seq)` pair is recorded in
/// `sync_failures` and a `StorageSyncFault` event is emitted; otherwise the
/// file's storage is synced.
fn handle_sync_complete(inner: &mut SimInner, file_id: FileId) {
    // Unknown files use a probability of 0.0.
    let failure_probability = match inner.storage.files.get(&file_id) {
        Some(file) => {
            inner
                .storage
                .config_for(file.owner_ip)
                .sync_failure_probability
        }
        None => 0.0,
    };

    let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::Sync) else {
        tracing::warn!("SyncComplete for unknown file {:?}", file_id);
        return;
    };

    let inject_failure = sim_random::<f64>() < failure_probability;
    if inject_failure {
        tracing::info!("Sync failure injected for file {:?}", file_id);
        inner.storage.sync_failures.insert((file_id, op_seq));
        if let Some(file) = inner.storage.files.get(&file_id) {
            SimInner::emit_fault(&SimFaultEvent::StorageSyncFault {
                ip: file.owner_ip.to_string(),
                file_id: file_id.0,
            });
        }
    } else if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
        file_state.storage.sync();
    }

    let Some(waker) = inner.wakers.storage_ops.remove(&(file_id, op_seq)) else {
        return;
    };
    tracing::trace!("Waking sync waker for file {:?}, op {}", file_id, op_seq);
    waker.wake();
}
/// Completes a pending open on `file_id`, waking its waiter if one is registered.
///
/// A missing pending op is only traced, not warned about.
fn handle_open_complete(inner: &mut SimInner, file_id: FileId) {
    let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::Open) else {
        tracing::trace!("OpenComplete for file {:?} (no pending op)", file_id);
        return;
    };

    let waiter_key = (file_id, op_seq);
    let Some(waker) = inner.wakers.storage_ops.remove(&waiter_key) else {
        return;
    };
    tracing::trace!("Waking open waker for file {:?}, op {}", file_id, op_seq);
    waker.wake();
}
/// Completes a pending set-length on `file_id`: resizes the file's storage to
/// `new_len` and wakes the task waiting on the operation.
fn handle_set_len_complete(inner: &mut SimInner, file_id: FileId, new_len: u64) {
    let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::SetLen) else {
        tracing::warn!("SetLenComplete for unknown file {:?}", file_id);
        return;
    };

    if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
        file_state.storage.resize(new_len);
    }

    let waiter_key = (file_id, op_seq);
    let Some(waker) = inner.wakers.storage_ops.remove(&waiter_key) else {
        return;
    };
    tracing::trace!(
        "Waking set_len waker for file {:?}, op {}, new_len={}",
        file_id,
        op_seq,
        new_len
    );
    waker.wake();
}
impl SimWorld {
/// Runs `f` with a shared reference to the world's `storage.config` and
/// returns its result.
///
/// The world's read lock is held for the duration of the call.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub fn with_storage_config<F, R>(&self, f: F) -> R
where
    F: FnOnce(&crate::storage::StorageConfiguration) -> R,
{
    let guard = self
        .inner
        .read()
        .expect("RwLock poisoned: prior task panicked");
    let global_config = &guard.storage.config;
    f(global_config)
}
/// Opens `path` in the simulated file system, creating the file if the options allow.
///
/// * An already-mapped path is reopened: truncation replaces its storage with
///   an empty one, append positions at the end, anything else positions at 0.
///   No completion event is scheduled for a reopen.
/// * A new file gets `initial_size` bytes of storage owned by `owner_ip`, and
///   an `OpenComplete` event is scheduled one microsecond in the future.
///
/// # Errors
///
/// * `AlreadyExists` when `create_new` is requested for a mapped path.
/// * `NotFound` when the path was deleted and `create` is not set, or when the
///   path is unmapped and neither `create` nor `create_new` is set.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn open_file(
    &self,
    path: &str,
    options: moonpool_core::OpenOptions,
    initial_size: u64,
    owner_ip: IpAddr,
) -> Result<FileId, StorageError> {
    use crate::storage::InMemoryStorage;

    let mut inner = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");
    let key = path.to_string();
    let mapped_id = inner.storage.path_to_file.get(&key).copied();

    if options.is_create_new() && mapped_id.is_some() {
        return Err(StorageError::AlreadyExists { path: key });
    }
    if inner.storage.deleted_paths.contains(&key) && !options.is_create() {
        return Err(StorageError::NotFound { path: key });
    }

    // Reopen path: reuse the existing id and reset the handle state.
    if let Some(existing_id) = mapped_id {
        if let Some(file_state) = inner.storage.files.get_mut(&existing_id) {
            let truncate = options.is_truncate();
            if truncate {
                file_state.storage = InMemoryStorage::new(0, sim_random::<u64>());
            }
            file_state.position = if !truncate && options.is_append() {
                file_state.storage.size()
            } else {
                0
            };
            file_state.options = options;
            file_state.is_closed = false;
        }
        return Ok(existing_id);
    }

    if !(options.is_create() || options.is_create_new()) {
        return Err(StorageError::NotFound { path: key });
    }

    // Create path: allocate a fresh id and register the new file.
    let file_id = FileId(inner.storage.next_file_id);
    inner.storage.next_file_id += 1;
    inner.storage.deleted_paths.remove(&key);

    let storage = InMemoryStorage::new(initial_size, sim_random::<u64>());
    let file_state =
        super::state::StorageFileState::new(file_id, key.clone(), options, storage, owner_ip);
    inner.storage.files.insert(file_id, file_state);
    inner.storage.path_to_file.insert(key, file_id);

    // Schedule the completion event a fixed 1µs after the current time.
    let completes_at = inner.current_time + Duration::from_micros(1);
    let sequence = inner.next_sequence;
    inner.next_sequence += 1;
    let completion = Event::Storage {
        file_id: file_id.0,
        operation: StorageOperation::OpenComplete,
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(completes_at, completion, sequence));

    tracing::debug!("Opened file {:?} with id {:?}", path, file_id);
    Ok(file_id)
}
/// Returns `true` when `path` is currently mapped to a file and is not marked
/// as deleted.
///
/// Both lookups borrow `path` directly: the collections are keyed by `String`,
/// which can be queried with a `&str`, so no temporary allocation is needed.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn file_exists(&self, path: &str) -> bool {
    let inner = self
        .inner
        .read()
        .expect("RwLock poisoned: prior task panicked");
    inner.storage.path_to_file.contains_key(path) && !inner.storage.deleted_paths.contains(path)
}
/// Deletes the file mapped at `path` and marks the path as deleted.
///
/// The file's state is dropped together with any operations still pending on
/// it. Completion events for those operations can no longer find them (the
/// handlers return early when the file is unknown), so every task waiting on
/// such an operation is woken here; otherwise it would never be polled again
/// and its waker would stay registered forever. This mirrors what
/// `wipe_storage_for_process` does for wiped files.
///
/// # Errors
///
/// Returns `NotFound` when `path` is not mapped to a file.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn delete_file(&self, path: &str) -> Result<(), StorageError> {
    let mut inner = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");
    let path_str = path.to_string();
    let Some(file_id) = inner.storage.path_to_file.remove(&path_str) else {
        return Err(StorageError::NotFound { path: path_str });
    };

    if let Some(file_state) = inner.storage.files.remove(&file_id) {
        // Wake (and unregister) the waiters of every operation lost with the file.
        for op_seq in file_state.pending_ops.keys() {
            if let Some(waker) = inner.wakers.storage_ops.remove(&(file_id, *op_seq)) {
                waker.wake();
            }
        }
    }

    inner.storage.deleted_paths.insert(path_str);
    tracing::debug!("Deleted file {:?}", path);
    Ok(())
}
/// Renames the file mapped at `from` so that it is mapped at `to`.
///
/// The file keeps its id; only the path mapping and the path stored in the
/// file state change. Any deletion marker on `to` is cleared, because the
/// path exists again after the rename. Without this, `file_exists(to)` would
/// report `false` and a non-creating open of `to` would fail after renaming
/// onto a previously deleted path.
///
/// # Errors
///
/// Returns `NotFound` when `from` is not mapped to a file.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn rename_file(&self, from: &str, to: &str) -> Result<(), StorageError> {
    let mut inner = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");
    let from_str = from.to_string();
    let Some(file_id) = inner.storage.path_to_file.remove(&from_str) else {
        return Err(StorageError::NotFound { path: from_str });
    };

    let to_str = to.to_string();
    if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
        file_state.path.clone_from(&to_str);
    }

    // The destination is live again: drop its deletion marker.
    inner.storage.deleted_paths.remove(&to_str);
    // Kept from the previous behavior: the source path carries no marker either.
    inner.storage.deleted_paths.remove(&from_str);

    // NOTE(review): if `to` was already mapped, its previous file state stays
    // in `files` without a path — confirm whether it should be dropped.
    inner.storage.path_to_file.insert(to_str, file_id);

    tracing::debug!("Renamed file {:?} to {:?}", from, to);
    Ok(())
}
/// Registers a pending read of `len` bytes at `offset` on `file_id` and
/// schedules its `ReadComplete` event.
///
/// The completion time is the current simulation time plus the latency
/// computed from the owner's storage configuration. Returns the sequence
/// number assigned to the operation.
///
/// # Errors
///
/// * `InvalidFileHandle` when `file_id` is unknown.
/// * `FileClosed` when the file is closed.
///
/// # Panics
///
/// Panics if the world lock is poisoned or `len` does not fit in a `u32`.
pub(crate) fn schedule_read(
    &self,
    file_id: FileId,
    offset: u64,
    len: usize,
) -> Result<u64, StorageError> {
    let mut inner = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");

    let Some(file_state) = inner.storage.files.get_mut(&file_id) else {
        return Err(StorageError::InvalidFileHandle { file_id });
    };
    if file_state.is_closed {
        return Err(StorageError::FileClosed { file_id });
    }

    // Allocate the per-file sequence number and record the pending operation.
    let op_seq = file_state.next_op_seq;
    file_state.next_op_seq += 1;
    let pending = PendingStorageOp {
        op_type: PendingOpType::Read,
        offset,
        len,
        data: None,
    };
    file_state.pending_ops.insert(op_seq, pending);
    let owner_ip = file_state.owner_ip;

    let latency = Self::calculate_storage_latency(inner.storage.config_for(owner_ip), len, false);
    let completes_at = inner.current_time + latency;
    let sequence = inner.next_sequence;
    inner.next_sequence += 1;

    let completion = Event::Storage {
        file_id: file_id.0,
        operation: StorageOperation::ReadComplete {
            len: u32::try_from(len).expect("read length fits in u32"),
        },
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(completes_at, completion, sequence));

    tracing::trace!(
        "Scheduled read: file={:?}, offset={}, len={}, op_seq={}",
        file_id,
        offset,
        len,
        op_seq
    );
    Ok(op_seq)
}
/// Registers a pending write of `data` at `offset` on `file_id` and schedules
/// its `WriteComplete` event.
///
/// The data is buffered in the pending operation until the completion event
/// is handled. Returns the sequence number assigned to the operation.
///
/// # Errors
///
/// * `InvalidFileHandle` when `file_id` is unknown.
/// * `FileClosed` when the file is closed.
///
/// # Panics
///
/// Panics if the world lock is poisoned or `data.len()` does not fit in a `u32`.
pub(crate) fn schedule_write(
    &self,
    file_id: FileId,
    offset: u64,
    data: Vec<u8>,
) -> Result<u64, StorageError> {
    let mut inner = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");

    let Some(file_state) = inner.storage.files.get_mut(&file_id) else {
        return Err(StorageError::InvalidFileHandle { file_id });
    };
    if file_state.is_closed {
        return Err(StorageError::FileClosed { file_id });
    }

    // Allocate the per-file sequence number and park the payload with the op.
    let op_seq = file_state.next_op_seq;
    file_state.next_op_seq += 1;
    let len = data.len();
    let pending = PendingStorageOp {
        op_type: PendingOpType::Write,
        offset,
        len,
        data: Some(data),
    };
    file_state.pending_ops.insert(op_seq, pending);
    let owner_ip = file_state.owner_ip;

    let latency = Self::calculate_storage_latency(inner.storage.config_for(owner_ip), len, true);
    let completes_at = inner.current_time + latency;
    let sequence = inner.next_sequence;
    inner.next_sequence += 1;

    let completion = Event::Storage {
        file_id: file_id.0,
        operation: StorageOperation::WriteComplete {
            len: u32::try_from(len).expect("write length fits in u32"),
        },
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(completes_at, completion, sequence));

    tracing::trace!(
        "Scheduled write: file={:?}, offset={}, len={}, op_seq={}",
        file_id,
        offset,
        len,
        op_seq
    );
    Ok(op_seq)
}
/// Registers a pending sync on `file_id` and schedules its `SyncComplete` event.
///
/// The latency is sampled from the owner's `sync_latency` setting. Returns the
/// sequence number assigned to the operation.
///
/// # Errors
///
/// * `InvalidFileHandle` when `file_id` is unknown.
/// * `FileClosed` when the file is closed.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn schedule_sync(&self, file_id: FileId) -> Result<u64, StorageError> {
    let mut inner = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");

    let Some(file_state) = inner.storage.files.get_mut(&file_id) else {
        return Err(StorageError::InvalidFileHandle { file_id });
    };
    if file_state.is_closed {
        return Err(StorageError::FileClosed { file_id });
    }

    let op_seq = file_state.next_op_seq;
    file_state.next_op_seq += 1;
    // A sync has no offset, length, or payload.
    let pending = PendingStorageOp {
        op_type: PendingOpType::Sync,
        offset: 0,
        len: 0,
        data: None,
    };
    file_state.pending_ops.insert(op_seq, pending);
    let owner_ip = file_state.owner_ip;

    let latency = crate::network::sample_duration(&inner.storage.config_for(owner_ip).sync_latency);
    let completes_at = inner.current_time + latency;
    let sequence = inner.next_sequence;
    inner.next_sequence += 1;

    let completion = Event::Storage {
        file_id: file_id.0,
        operation: StorageOperation::SyncComplete,
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(completes_at, completion, sequence));

    tracing::trace!("Scheduled sync: file={:?}, op_seq={}", file_id, op_seq);
    Ok(op_seq)
}
/// Registers a pending set-length to `new_len` on `file_id` and schedules its
/// `SetLenComplete` event.
///
/// The latency is sampled from the owner's `write_latency` setting; the target
/// length is stored in the pending operation's `offset` field. Returns the
/// sequence number assigned to the operation.
///
/// # Errors
///
/// * `InvalidFileHandle` when `file_id` is unknown.
/// * `FileClosed` when the file is closed.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn schedule_set_len(
    &self,
    file_id: FileId,
    new_len: u64,
) -> Result<u64, StorageError> {
    let mut inner = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");

    let Some(file_state) = inner.storage.files.get_mut(&file_id) else {
        return Err(StorageError::InvalidFileHandle { file_id });
    };
    if file_state.is_closed {
        return Err(StorageError::FileClosed { file_id });
    }

    let op_seq = file_state.next_op_seq;
    file_state.next_op_seq += 1;
    let pending = PendingStorageOp {
        op_type: PendingOpType::SetLen,
        offset: new_len,
        len: 0,
        data: None,
    };
    file_state.pending_ops.insert(op_seq, pending);
    let owner_ip = file_state.owner_ip;

    let latency =
        crate::network::sample_duration(&inner.storage.config_for(owner_ip).write_latency);
    let completes_at = inner.current_time + latency;
    let sequence = inner.next_sequence;
    inner.next_sequence += 1;

    let completion = Event::Storage {
        file_id: file_id.0,
        operation: StorageOperation::SetLenComplete { new_len },
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(completes_at, completion, sequence));

    tracing::trace!(
        "Scheduled set_len: file={:?}, new_len={}, op_seq={}",
        file_id,
        new_len,
        op_seq
    );
    Ok(op_seq)
}
/// Reports whether operation `op_seq` on `file_id` is no longer pending.
///
/// An unknown file counts as complete, since nothing can be pending on it.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn is_storage_op_complete(&self, file_id: FileId, op_seq: u64) -> bool {
    let inner = self
        .inner
        .read()
        .expect("RwLock poisoned: prior task panicked");
    match inner.storage.files.get(&file_id) {
        Some(file_state) => !file_state.pending_ops.contains_key(&op_seq),
        None => true,
    }
}
/// Consumes the recorded sync failure for `(file_id, op_seq)`, if any.
///
/// Returns the result of removing the pair from `sync_failures`, so a given
/// failure is reported at most once.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn take_sync_failure(&self, file_id: FileId, op_seq: u64) -> bool {
    let failure_key = (file_id, op_seq);
    let mut guard = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");
    guard.storage.sync_failures.remove(&failure_key)
}
/// Registers `waker` to be woken when operation `op_seq` on `file_id`
/// completes, replacing any waker previously stored for the same operation.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn register_storage_waker(&self, file_id: FileId, op_seq: u64, waker: Waker) {
    let waiter_key = (file_id, op_seq);
    let mut guard = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");
    guard.wakers.storage_ops.insert(waiter_key, waker);
}
/// Reads from `file_id` at `offset` into `buf` and returns `buf.len()` on success.
///
/// # Errors
///
/// * `InvalidFileHandle` when `file_id` is unknown.
/// * `FileClosed` when the file is closed.
/// * `Io` when the underlying storage read fails; the error's kind and message
///   are carried over.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn read_from_file(
    &self,
    file_id: FileId,
    offset: u64,
    buf: &mut [u8],
) -> Result<usize, StorageError> {
    let inner = self
        .inner
        .read()
        .expect("RwLock poisoned: prior task panicked");

    let Some(file_state) = inner.storage.files.get(&file_id) else {
        return Err(StorageError::InvalidFileHandle { file_id });
    };
    if file_state.is_closed {
        return Err(StorageError::FileClosed { file_id });
    }

    if let Err(e) = file_state.storage.read(offset, buf) {
        return Err(StorageError::Io {
            file_id,
            kind: e.kind(),
            message: e.to_string(),
        });
    }
    Ok(buf.len())
}
/// Returns the current position stored for `file_id`.
///
/// # Errors
///
/// Returns `InvalidFileHandle` when `file_id` is unknown.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn file_position(&self, file_id: FileId) -> Result<u64, StorageError> {
    let inner = self
        .inner
        .read()
        .expect("RwLock poisoned: prior task panicked");
    match inner.storage.files.get(&file_id) {
        Some(file_state) => Ok(file_state.position),
        None => Err(StorageError::InvalidFileHandle { file_id }),
    }
}
/// Sets the position stored for `file_id` to `position`.
///
/// # Errors
///
/// Returns `InvalidFileHandle` when `file_id` is unknown.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn set_file_position(
    &self,
    file_id: FileId,
    position: u64,
) -> Result<(), StorageError> {
    let mut inner = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");
    let Some(file_state) = inner.storage.files.get_mut(&file_id) else {
        return Err(StorageError::InvalidFileHandle { file_id });
    };
    file_state.position = position;
    Ok(())
}
/// Returns the current size of the storage backing `file_id`.
///
/// # Errors
///
/// Returns `InvalidFileHandle` when `file_id` is unknown.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
pub(crate) fn file_size(&self, file_id: FileId) -> Result<u64, StorageError> {
    let inner = self
        .inner
        .read()
        .expect("RwLock poisoned: prior task panicked");
    match inner.storage.files.get(&file_id) {
        Some(file_state) => Ok(file_state.storage.size()),
        None => Err(StorageError::InvalidFileHandle { file_id }),
    }
}
/// Computes the simulated latency of one storage operation.
///
/// The result is the sum of three parts:
///
/// * a base latency sampled from `write_latency` or `read_latency`,
///   depending on `is_write`;
/// * a per-operation overhead of `1 / iops` seconds;
/// * a transfer time of `size / bandwidth` seconds.
///
/// `iops`, `size` and `bandwidth` are clamped to `u32::MAX` before being
/// converted to `f64`.
///
/// A zero `iops` or `bandwidth` would make the corresponding quotient
/// infinite or NaN, which `Duration::from_secs_f64` rejects with a panic.
/// Such a component is treated as "no throttling configured" and contributes
/// zero instead of aborting the simulation.
fn calculate_storage_latency(
    config: &crate::storage::StorageConfiguration,
    size: usize,
    is_write: bool,
) -> Duration {
    let base_range = if is_write {
        &config.write_latency
    } else {
        &config.read_latency
    };
    let base = crate::network::sample_duration(base_range);

    // Non-finite (or otherwise unrepresentable) values fall back to zero.
    let secs_or_zero = |secs: f64| Duration::try_from_secs_f64(secs).unwrap_or(Duration::ZERO);

    let iops_f64 = u32::try_from(config.iops).map_or(f64::from(u32::MAX), f64::from);
    let iops_overhead = secs_or_zero(1.0 / iops_f64);

    let size_f64 = u32::try_from(size).map_or(f64::from(u32::MAX), f64::from);
    let bandwidth_f64 = u32::try_from(config.bandwidth).map_or(f64::from(u32::MAX), f64::from);
    let transfer = secs_or_zero(size_f64 / bandwidth_f64);

    base + iops_overhead + transfer
}
/// Simulates a storage crash for every file owned by `ip`.
///
/// For each such file the crash is applied to its storage using the owner's
/// `crash_fault_probability`, all pending operations are discarded, and —
/// when `close_files` is set — the file is marked closed. Tasks waiting on a
/// discarded operation are woken afterwards, and a `StorageCrash` fault event
/// is emitted.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
#[instrument(skip(self))]
pub fn simulate_crash_for_process(&self, ip: IpAddr, close_files: bool) {
    let mut inner = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");
    let crash_probability = inner.storage.config_for(ip).crash_fault_probability;

    // Snapshot the ids first so the files can be mutated one by one below.
    let owned_ids: Vec<FileId> = inner
        .storage
        .files
        .iter()
        .filter_map(|(id, file)| (file.owner_ip == ip).then_some(*id))
        .collect();

    let mut orphaned_ops = Vec::new();
    for &file_id in &owned_ids {
        let Some(file_state) = inner.storage.files.get_mut(&file_id) else {
            continue;
        };
        file_state.storage.apply_crash(crash_probability);
        // Remember which operations are lost before dropping them.
        orphaned_ops.extend(
            file_state
                .pending_ops
                .keys()
                .map(|&op_seq| (file_id, op_seq)),
        );
        file_state.pending_ops.clear();
        if close_files {
            file_state.is_closed = true;
        }
    }

    for waiter_key in orphaned_ops {
        if let Some(waker) = inner.wakers.storage_ops.remove(&waiter_key) {
            waker.wake();
        }
    }

    SimInner::emit_fault(&SimFaultEvent::StorageCrash { ip: ip.to_string() });
    tracing::info!(
        "Storage crash simulated for {}: {} files affected, close_files={}",
        ip,
        owned_ids.len(),
        close_files
    );
}
/// Deletes every file owned by `ip`.
///
/// Each file's state and path mapping are removed and its path is marked as
/// deleted. Tasks waiting on operations that were pending on a wiped file are
/// woken, and a `StorageWipe` fault event is emitted.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
#[instrument(skip(self))]
pub fn wipe_storage_for_process(&self, ip: IpAddr) {
    let mut inner = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");

    // Snapshot ids and paths first so the maps can be mutated below.
    let doomed: Vec<(FileId, String)> = inner
        .storage
        .files
        .iter()
        .filter_map(|(id, file)| (file.owner_ip == ip).then(|| (*id, file.path.clone())))
        .collect();

    let mut orphaned_ops = Vec::new();
    for (file_id, path) in &doomed {
        if let Some(removed) = inner.storage.files.remove(file_id) {
            orphaned_ops.extend(removed.pending_ops.keys().map(|op_seq| (*file_id, *op_seq)));
        }
        inner.storage.path_to_file.remove(path);
        inner.storage.deleted_paths.insert(path.clone());
    }

    for waiter_key in orphaned_ops {
        let Some(waker) = inner.wakers.storage_ops.remove(&waiter_key) else {
            continue;
        };
        waker.wake();
    }

    SimInner::emit_fault(&SimFaultEvent::StorageWipe { ip: ip.to_string() });
    tracing::info!("Storage wiped for {}: {} files deleted", ip, doomed.len(),);
}
/// Installs `config` as the storage configuration for the process at `ip`,
/// replacing any per-process configuration stored for it before.
///
/// # Panics
///
/// Panics if the world lock is poisoned.
#[instrument(skip(self, config))]
pub fn set_process_storage_config(
    &self,
    ip: IpAddr,
    config: crate::storage::StorageConfiguration,
) {
    let mut guard = self
        .inner
        .write()
        .expect("RwLock poisoned: prior task panicked");
    let per_process = &mut guard.storage.per_process_configs;
    per_process.insert(ip, config);
}
}