use std::net::IpAddr;
use std::task::Waker;
use std::time::Duration;
use tracing::instrument;
use crate::storage::StorageError;
use crate::chaos::fault_events::SimFaultEvent;
use super::{
events::{Event, ScheduledEvent, StorageOperation},
rng::{sim_random, sim_random_range},
state::{FileId, PendingOpType, PendingStorageOp},
world::{SimInner, SimWorld},
};
/// Removes and returns one pending operation of the requested kind for `file_id`.
///
/// The file's pending-op map is scanned in its own iteration order and the first
/// entry whose `op_type` equals `op_type` is taken. Returns `None` when the file
/// is not registered or has no pending operation of that kind.
fn take_pending_op(
    inner: &mut SimInner,
    file_id: FileId,
    op_type: PendingOpType,
) -> Option<(u64, PendingStorageOp)> {
    let pending = &mut inner.storage.files.get_mut(&file_id)?.pending_ops;
    let op_seq = pending
        .iter()
        .find_map(|(seq, op)| (op.op_type == op_type).then_some(*seq))?;
    let op = pending.remove(&op_seq)?;
    Some((op_seq, op))
}
/// Dispatches a storage completion event to the handler for its operation kind.
///
/// `file_id` is the raw numeric id carried by the event; it is wrapped in
/// [`FileId`] before being handed to the per-operation handler. The `len`
/// carried by read/write completions is not used here.
pub(crate) fn handle_storage_event(
    inner: &mut SimInner,
    file_id: u64,
    operation: StorageOperation,
) {
    let id = FileId(file_id);
    match operation {
        StorageOperation::ReadComplete { .. } => handle_read_complete(inner, id),
        StorageOperation::WriteComplete { .. } => handle_write_complete(inner, id),
        StorageOperation::SyncComplete => handle_sync_complete(inner, id),
        StorageOperation::OpenComplete => handle_open_complete(inner, id),
        StorageOperation::SetLenComplete { new_len } => {
            handle_set_len_complete(inner, id, new_len)
        }
    }
}
/// Completes a pending read on `file_id`, possibly injecting read faults.
///
/// Steps, in order:
/// 1. Look up the owning process's `read_fault_probability` (0.0 if the file is unknown).
/// 2. Consume one pending `Read` op; if there is none, log a warning and stop.
/// 3. For every sector touched by the read, draw a random number and mark the
///    sector faulted when the draw is below the probability.
/// 4. If any sector was faulted, emit a `StorageReadFault` event.
/// 5. Wake the waker registered for this `(file, op)` pair, if any.
fn handle_read_complete(inner: &mut SimInner, file_id: FileId) {
    // Fetched before the op is consumed; an unknown file simply gets no faults.
    let fault_chance = match inner.storage.files.get(&file_id) {
        Some(file) => {
            inner
                .storage
                .config_for(file.owner_ip)
                .read_fault_probability
        }
        None => 0.0,
    };

    let Some((op_seq, pending)) = take_pending_op(inner, file_id, PendingOpType::Read) else {
        tracing::warn!("ReadComplete for unknown file {:?}", file_id);
        return;
    };

    let mut any_sector_faulted = false;
    if fault_chance > 0.0 {
        if let Some(file) = inner.storage.files.get_mut(&file_id) {
            let sector_size = crate::storage::SECTOR_SIZE;
            let first_sector = (pending.offset as usize) / sector_size;
            let past_last_sector = (pending.offset as usize + pending.len).div_ceil(sector_size);
            // One random draw per sector, in ascending sector order.
            for sector in first_sector..past_last_sector {
                if sim_random::<f64>() < fault_chance {
                    file.storage.set_fault(sector);
                    any_sector_faulted = true;
                    tracing::info!(
                        "Read fault injected for file {:?}, sector {}",
                        file_id,
                        sector
                    );
                }
            }
        }
    }

    if any_sector_faulted {
        let owner = inner.storage.files.get(&file_id).map(|f| f.owner_ip);
        if let Some(ip) = owner {
            inner.emit_fault(SimFaultEvent::StorageReadFault {
                ip: ip.to_string(),
                file_id: file_id.0,
            });
        }
    }

    let registered = inner.wakers.storage_wakers.remove(&(file_id, op_seq));
    if let Some(waker) = registered {
        tracing::trace!("Waking read waker for file {:?}, op {}", file_id, op_seq);
        waker.wake();
    }
}
/// Completes a pending write on `file_id`, applying the data and possibly
/// injecting one write fault.
///
/// At most one of three outcomes is applied to the data, tried in this order:
/// a "phantom" write, a "misdirected" write, or a normal write. After a
/// successful normal write, individual sectors may additionally be marked
/// faulted ("corruption"). If any fault was injected and the file's owner is
/// known, a `StorageWriteFault` event is emitted with the fault kind. Finally
/// the waker registered for this `(file, op)` pair is woken, if any.
fn handle_write_complete(inner: &mut SimInner, file_id: FileId) {
// Snapshot the owner's storage configuration (cloned so `inner` can be
// mutated below); falls back to the default configuration for an unknown file.
let config = inner
.storage
.files
.get(&file_id)
.map(|f| inner.storage.config_for(f.owner_ip).clone())
.unwrap_or_default();
// Captured up front so the fault event can be emitted after the mutable work.
let owner_ip = inner.storage.files.get(&file_id).map(|f| f.owner_ip);
// Consume one pending Write op; without one there is nothing to complete.
let Some((op_seq, op)) = take_pending_op(inner, file_id, PendingOpType::Write) else {
tracing::warn!("WriteComplete for unknown file {:?}", file_id);
return;
};
let (offset, data_opt) = (op.offset, op.data);
// Label of the injected fault, if any: "phantom", "misdirected" or "corruption".
let mut write_fault_kind: Option<&str> = None;
if let Some(data) = data_opt
&& let Some(file_state) = inner.storage.files.get_mut(&file_id)
{
// NOTE: the order of random draws below is part of the simulation's
// behavior; the phantom draw always happens, the misdirect draw only
// happens when the phantom check did not fire.
if sim_random::<f64>() < config.phantom_write_probability {
tracing::info!(
"Phantom write injected for file {:?}, offset {}, len {}",
file_id,
offset,
data.len()
);
// NOTE(review): presumably records the write without persisting it —
// confirm against `record_phantom_write`.
file_state.storage.record_phantom_write(offset, &data);
write_fault_kind = Some("phantom");
}
else if sim_random::<f64>() < config.misdirect_write_probability {
// Pick a wrong offset in [0, size - data.len()); 0 when that range is empty.
let max_offset = file_state.storage.size().saturating_sub(data.len() as u64);
let mistaken_offset = if max_offset > 0 {
sim_random_range(0..max_offset)
} else {
0
};
tracing::info!(
"Misdirected write injected for file {:?}: intended={}, actual={}",
file_id,
offset,
mistaken_offset
);
// A failure to apply the misdirected write is only logged; the fault
// is still reported as "misdirected".
if let Err(e) =
file_state
.storage
.apply_misdirected_write(offset, mistaken_offset, &data)
{
tracing::warn!("Failed to apply misdirected write: {}", e);
}
write_fault_kind = Some("misdirected");
}
// Normal path: apply the write at the intended offset.
// NOTE(review): meaning of the third argument (`false`) is not visible here — confirm.
else if let Err(e) = file_state.storage.write(offset, &data, false) {
tracing::warn!("Write failed for file {:?}: {}", file_id, e);
} else {
// The write succeeded; independently roll for a fault on every sector
// the write touched (one random draw per sector, ascending order).
if config.write_fault_probability > 0.0 {
let start_sector = (offset as usize) / crate::storage::SECTOR_SIZE;
let end_sector =
(offset as usize + data.len()).div_ceil(crate::storage::SECTOR_SIZE);
for sector in start_sector..end_sector {
if sim_random::<f64>() < config.write_fault_probability {
file_state.storage.set_fault(sector);
write_fault_kind = Some("corruption");
tracing::info!(
"Write fault injected for file {:?}, sector {}",
file_id,
sector
);
}
}
}
}
}
// Report the fault only when one was injected and the owner is known.
if let (Some(kind), Some(ip)) = (write_fault_kind, owner_ip) {
inner.emit_fault(SimFaultEvent::StorageWriteFault {
ip: ip.to_string(),
file_id: file_id.0,
kind: kind.to_string(),
});
}
// Wake whoever is waiting on this operation, regardless of fault outcome.
if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
tracing::trace!("Waking write waker for file {:?}, op {}", file_id, op_seq);
waker.wake();
}
}
/// Completes a pending sync on `file_id`, possibly injecting a sync failure.
///
/// A single random draw is compared against the owner's
/// `sync_failure_probability` (0.0 for an unknown file). On failure the
/// `(file, op)` pair is recorded in `sync_failures` and a `StorageSyncFault`
/// event is emitted; otherwise the file's storage is synced. In both cases the
/// waker registered for the operation is woken, if any.
fn handle_sync_complete(inner: &mut SimInner, file_id: FileId) {
    let failure_chance = match inner.storage.files.get(&file_id) {
        Some(file) => {
            inner
                .storage
                .config_for(file.owner_ip)
                .sync_failure_probability
        }
        None => 0.0,
    };

    let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::Sync) else {
        tracing::warn!("SyncComplete for unknown file {:?}", file_id);
        return;
    };

    let inject_failure = sim_random::<f64>() < failure_chance;
    if inject_failure {
        tracing::info!("Sync failure injected for file {:?}", file_id);
        inner.storage.sync_failures.insert((file_id, op_seq));
        let owner = inner.storage.files.get(&file_id).map(|f| f.owner_ip);
        if let Some(ip) = owner {
            inner.emit_fault(SimFaultEvent::StorageSyncFault {
                ip: ip.to_string(),
                file_id: file_id.0,
            });
        }
    } else if let Some(file) = inner.storage.files.get_mut(&file_id) {
        file.storage.sync();
    }

    let registered = inner.wakers.storage_wakers.remove(&(file_id, op_seq));
    if let Some(waker) = registered {
        tracing::trace!("Waking sync waker for file {:?}, op {}", file_id, op_seq);
        waker.wake();
    }
}
/// Completes a pending open on `file_id` and wakes its waiter.
///
/// If there is no pending `Open` op for the file, this only emits a trace
/// message and returns.
fn handle_open_complete(inner: &mut SimInner, file_id: FileId) {
    match take_pending_op(inner, file_id, PendingOpType::Open) {
        None => {
            tracing::trace!("OpenComplete for file {:?} (no pending op)", file_id);
        }
        Some((op_seq, _)) => {
            let registered = inner.wakers.storage_wakers.remove(&(file_id, op_seq));
            if let Some(waker) = registered {
                tracing::trace!("Waking open waker for file {:?}, op {}", file_id, op_seq);
                waker.wake();
            }
        }
    }
}
/// Completes a pending set-length on `file_id`: resizes the file's storage to
/// `new_len` and wakes the waiter registered for the operation.
///
/// Logs a warning and returns if there is no pending `SetLen` op for the file.
fn handle_set_len_complete(inner: &mut SimInner, file_id: FileId, new_len: u64) {
    let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::SetLen) else {
        tracing::warn!("SetLenComplete for unknown file {:?}", file_id);
        return;
    };

    if let Some(file) = inner.storage.files.get_mut(&file_id) {
        file.storage.resize(new_len);
    }

    let registered = inner.wakers.storage_wakers.remove(&(file_id, op_seq));
    if let Some(waker) = registered {
        tracing::trace!(
            "Waking set_len waker for file {:?}, op {}, new_len={}",
            file_id,
            op_seq,
            new_len
        );
        waker.wake();
    }
}
impl SimWorld {
/// Runs `f` with a shared reference to the world's global storage
/// configuration and returns whatever `f` returns.
///
/// The simulation state stays immutably borrowed for the duration of `f`.
pub fn with_storage_config<F, R>(&self, f: F) -> R
where
    F: FnOnce(&crate::storage::StorageConfiguration) -> R,
{
    let guard = self.inner.borrow();
    let config = &guard.storage.config;
    f(config)
}
/// Opens (or creates) the simulated file at `path` and returns its id.
///
/// Behavior by case:
/// - `create_new` and the path is already mapped: `AlreadyExists`.
/// - The path is marked deleted and `create` is not set: `NotFound`.
/// - The path is already mapped: the existing file is reused. Its position is
///   reset (to the end for `append`, otherwise to 0), `truncate` replaces its
///   storage with a fresh empty one, its options are replaced, and it is
///   marked open. No completion event is scheduled in this case.
/// - Otherwise, without `create`/`create_new`: `NotFound`.
/// - Otherwise a new file of `initial_size` owned by `owner_ip` is registered
///   and an `OpenComplete` event is scheduled 1 microsecond in the future.
pub(crate) fn open_file(
    &self,
    path: &str,
    options: moonpool_core::OpenOptions,
    initial_size: u64,
    owner_ip: IpAddr,
) -> Result<FileId, StorageError> {
    use crate::storage::InMemoryStorage;

    let mut inner = self.inner.borrow_mut();
    let path_str = path.to_string();
    let already_mapped = inner.storage.path_to_file.get(&path_str).copied();

    if options.create_new && already_mapped.is_some() {
        return Err(StorageError::AlreadyExists { path: path_str });
    }
    if !options.create && inner.storage.deleted_paths.contains(&path_str) {
        return Err(StorageError::NotFound { path: path_str });
    }

    // Re-open of a path that is already known.
    if let Some(existing_id) = already_mapped {
        if let Some(file_state) = inner.storage.files.get_mut(&existing_id) {
            file_state.position = if options.truncate {
                // Truncation swaps in a brand-new empty storage with a fresh seed.
                let seed = sim_random::<u64>();
                file_state.storage = InMemoryStorage::new(0, seed);
                0
            } else if options.append {
                file_state.storage.size()
            } else {
                0
            };
            file_state.options = options;
            file_state.is_closed = false;
        }
        return Ok(existing_id);
    }

    if !(options.create || options.create_new) {
        return Err(StorageError::NotFound { path: path_str });
    }

    // Brand-new file: allocate an id and register its state.
    let file_id = FileId(inner.storage.next_file_id);
    inner.storage.next_file_id += 1;
    inner.storage.deleted_paths.remove(&path_str);

    let storage = InMemoryStorage::new(initial_size, sim_random::<u64>());
    let new_state = super::state::StorageFileState::new(
        file_id,
        path_str.clone(),
        options,
        storage,
        owner_ip,
    );
    inner.storage.files.insert(file_id, new_state);
    inner.storage.path_to_file.insert(path_str, file_id);

    // Schedule the open completion slightly in the future.
    let completes_at = inner.current_time + Duration::from_micros(1);
    let sequence = inner.next_sequence;
    inner.next_sequence = sequence + 1;
    let completion = Event::Storage {
        file_id: file_id.0,
        operation: StorageOperation::OpenComplete,
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(completes_at, completion, sequence));

    tracing::debug!("Opened file {:?} with id {:?}", path, file_id);
    Ok(file_id)
}
/// Returns `true` when `path` is currently mapped to a file and is not marked
/// as deleted.
///
/// Both lookups borrow `path` directly, so no temporary `String` is allocated.
pub(crate) fn file_exists(&self, path: &str) -> bool {
    let inner = self.inner.borrow();
    inner.storage.path_to_file.contains_key(path) && !inner.storage.deleted_paths.contains(path)
}
/// Deletes the simulated file at `path`.
///
/// The path mapping and the file's state are removed and the path is recorded
/// in `deleted_paths`. Any operation still pending on the file can no longer
/// complete through the normal event path (its state is gone), so the wakers
/// registered for those operations are woken here, mirroring what
/// `wipe_storage_for_process` does; otherwise tasks awaiting them would never
/// be polled again.
///
/// # Errors
///
/// Returns `StorageError::NotFound` when `path` is not mapped to a file.
pub(crate) fn delete_file(&self, path: &str) -> Result<(), StorageError> {
    let mut inner = self.inner.borrow_mut();
    let path_str = path.to_string();

    let Some(file_id) = inner.storage.path_to_file.remove(&path_str) else {
        return Err(StorageError::NotFound { path: path_str });
    };

    // Removing the state is what invalidates the file; there is no point in
    // flagging it closed first.
    if let Some(file_state) = inner.storage.files.remove(&file_id) {
        for op_seq in file_state.pending_ops.keys() {
            if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, *op_seq)) {
                waker.wake();
            }
        }
    }

    inner.storage.deleted_paths.insert(path_str);
    tracing::debug!("Deleted file {:?}", path);
    Ok(())
}
/// Renames the simulated file at `from` to `to`.
///
/// The file keeps its id; its recorded path and the path mapping are updated.
/// Two details matter for later lookups:
///
/// - `to` is cleared from `deleted_paths`, because the destination now exists.
///   Without this, a rename onto a previously deleted path would leave
///   `file_exists(to)` false and make `open_file(to)` fail without `create`.
/// - If `to` was already mapped to a different file, that file is replaced:
///   its state is dropped and wakers for its pending operations are woken, so
///   it does not linger in `files` under a path it no longer owns.
///
/// # Errors
///
/// Returns `StorageError::NotFound` when `from` is not mapped to a file.
pub(crate) fn rename_file(&self, from: &str, to: &str) -> Result<(), StorageError> {
    let mut inner = self.inner.borrow_mut();
    let from_str = from.to_string();
    let to_str = to.to_string();

    let Some(file_id) = inner.storage.path_to_file.remove(&from_str) else {
        return Err(StorageError::NotFound { path: from_str });
    };

    if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
        file_state.path = to_str.clone();
    }

    // The destination exists from now on, so it must not be reported as deleted.
    inner.storage.deleted_paths.remove(&to_str);

    // `insert` hands back the id previously mapped to `to`, if any.
    let displaced = inner.storage.path_to_file.insert(to_str, file_id);
    if let Some(displaced_id) = displaced {
        if displaced_id != file_id {
            if let Some(old_state) = inner.storage.files.remove(&displaced_id) {
                for op_seq in old_state.pending_ops.keys() {
                    let key = (displaced_id, *op_seq);
                    if let Some(waker) = inner.wakers.storage_wakers.remove(&key) {
                        waker.wake();
                    }
                }
            }
        }
    }

    tracing::debug!("Renamed file {:?} to {:?}", from, to);
    Ok(())
}
/// Registers a pending read of `len` bytes at `offset` on `file_id` and
/// schedules its `ReadComplete` event.
///
/// The completion time is the current simulation time plus the latency
/// computed by `calculate_storage_latency` from the owning process's storage
/// configuration. Returns the per-file operation sequence number identifying
/// the pending read.
///
/// # Errors
///
/// - `StorageError::InvalidFileHandle` when `file_id` is not registered.
/// - `StorageError::FileClosed` when the file is marked closed.
pub(crate) fn schedule_read(
    &self,
    file_id: FileId,
    offset: u64,
    len: usize,
) -> Result<u64, StorageError> {
    let mut inner = self.inner.borrow_mut();
    let file_state = inner
        .storage
        .files
        .get_mut(&file_id)
        .ok_or(StorageError::InvalidFileHandle { file_id })?;
    if file_state.is_closed {
        return Err(StorageError::FileClosed { file_id });
    }

    let op_seq = file_state.next_op_seq;
    file_state.next_op_seq += 1;
    file_state.pending_ops.insert(
        op_seq,
        PendingStorageOp {
            op_type: PendingOpType::Read,
            offset,
            len,
            data: None,
        },
    );

    let owner_ip = file_state.owner_ip;
    let config = inner.storage.config_for(owner_ip);
    let latency = Self::calculate_storage_latency(config, len, false);
    let scheduled_time = inner.current_time + latency;
    let sequence = inner.next_sequence;
    inner.next_sequence += 1;

    // The event carries the length as `u32`; saturate instead of silently
    // wrapping for requests larger than `u32::MAX` bytes. The authoritative
    // length stays in the pending op.
    let event_len = u32::try_from(len).unwrap_or(u32::MAX);
    let event = Event::Storage {
        file_id: file_id.0,
        operation: StorageOperation::ReadComplete { len: event_len },
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(scheduled_time, event, sequence));

    tracing::trace!(
        "Scheduled read: file={:?}, offset={}, len={}, op_seq={}",
        file_id,
        offset,
        len,
        op_seq
    );
    Ok(op_seq)
}
/// Registers a pending write of `data` at `offset` on `file_id` and schedules
/// its `WriteComplete` event.
///
/// The data is held in the pending op until the completion event is handled.
/// The completion time is the current simulation time plus the latency
/// computed by `calculate_storage_latency` from the owning process's storage
/// configuration. Returns the per-file operation sequence number identifying
/// the pending write.
///
/// # Errors
///
/// - `StorageError::InvalidFileHandle` when `file_id` is not registered.
/// - `StorageError::FileClosed` when the file is marked closed.
pub(crate) fn schedule_write(
    &self,
    file_id: FileId,
    offset: u64,
    data: Vec<u8>,
) -> Result<u64, StorageError> {
    let mut inner = self.inner.borrow_mut();
    let file_state = inner
        .storage
        .files
        .get_mut(&file_id)
        .ok_or(StorageError::InvalidFileHandle { file_id })?;
    if file_state.is_closed {
        return Err(StorageError::FileClosed { file_id });
    }

    let op_seq = file_state.next_op_seq;
    file_state.next_op_seq += 1;
    let len = data.len();
    file_state.pending_ops.insert(
        op_seq,
        PendingStorageOp {
            op_type: PendingOpType::Write,
            offset,
            len,
            data: Some(data),
        },
    );

    let owner_ip = file_state.owner_ip;
    let config = inner.storage.config_for(owner_ip);
    let latency = Self::calculate_storage_latency(config, len, true);
    let scheduled_time = inner.current_time + latency;
    let sequence = inner.next_sequence;
    inner.next_sequence += 1;

    // The event carries the length as `u32`; saturate instead of silently
    // wrapping for payloads larger than `u32::MAX` bytes. The authoritative
    // length stays in the pending op.
    let event_len = u32::try_from(len).unwrap_or(u32::MAX);
    let event = Event::Storage {
        file_id: file_id.0,
        operation: StorageOperation::WriteComplete { len: event_len },
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(scheduled_time, event, sequence));

    tracing::trace!(
        "Scheduled write: file={:?}, offset={}, len={}, op_seq={}",
        file_id,
        offset,
        len,
        op_seq
    );
    Ok(op_seq)
}
/// Registers a pending sync on `file_id` and schedules its `SyncComplete`
/// event after a latency sampled from the owner's `sync_latency` setting.
///
/// Returns the per-file operation sequence number of the pending sync.
///
/// # Errors
///
/// - `StorageError::InvalidFileHandle` when `file_id` is not registered.
/// - `StorageError::FileClosed` when the file is marked closed.
pub(crate) fn schedule_sync(&self, file_id: FileId) -> Result<u64, StorageError> {
    let mut inner = self.inner.borrow_mut();

    let Some(file_state) = inner.storage.files.get_mut(&file_id) else {
        return Err(StorageError::InvalidFileHandle { file_id });
    };
    if file_state.is_closed {
        return Err(StorageError::FileClosed { file_id });
    }

    // Allocate the next op sequence number and record the pending sync.
    let op_seq = file_state.next_op_seq;
    file_state.next_op_seq = op_seq + 1;
    let pending = PendingStorageOp {
        op_type: PendingOpType::Sync,
        offset: 0,
        len: 0,
        data: None,
    };
    file_state.pending_ops.insert(op_seq, pending);
    let owner_ip = file_state.owner_ip;

    let latency = crate::network::sample_duration(&inner.storage.config_for(owner_ip).sync_latency);
    let completes_at = inner.current_time + latency;
    let sequence = inner.next_sequence;
    inner.next_sequence = sequence + 1;

    let completion = Event::Storage {
        file_id: file_id.0,
        operation: StorageOperation::SyncComplete,
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(completes_at, completion, sequence));

    tracing::trace!("Scheduled sync: file={:?}, op_seq={}", file_id, op_seq);
    Ok(op_seq)
}
/// Registers a pending set-length on `file_id` and schedules its
/// `SetLenComplete` event after a latency sampled from the owner's
/// `write_latency` setting.
///
/// The requested length is stored in the pending op's `offset` field and is
/// also carried by the completion event. Returns the per-file operation
/// sequence number of the pending op.
///
/// # Errors
///
/// - `StorageError::InvalidFileHandle` when `file_id` is not registered.
/// - `StorageError::FileClosed` when the file is marked closed.
pub(crate) fn schedule_set_len(
    &self,
    file_id: FileId,
    new_len: u64,
) -> Result<u64, StorageError> {
    let mut inner = self.inner.borrow_mut();

    let Some(file_state) = inner.storage.files.get_mut(&file_id) else {
        return Err(StorageError::InvalidFileHandle { file_id });
    };
    if file_state.is_closed {
        return Err(StorageError::FileClosed { file_id });
    }

    // Allocate the next op sequence number and record the pending resize.
    let op_seq = file_state.next_op_seq;
    file_state.next_op_seq = op_seq + 1;
    let pending = PendingStorageOp {
        op_type: PendingOpType::SetLen,
        offset: new_len,
        len: 0,
        data: None,
    };
    file_state.pending_ops.insert(op_seq, pending);
    let owner_ip = file_state.owner_ip;

    let latency =
        crate::network::sample_duration(&inner.storage.config_for(owner_ip).write_latency);
    let completes_at = inner.current_time + latency;
    let sequence = inner.next_sequence;
    inner.next_sequence = sequence + 1;

    let completion = Event::Storage {
        file_id: file_id.0,
        operation: StorageOperation::SetLenComplete { new_len },
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(completes_at, completion, sequence));

    tracing::trace!(
        "Scheduled set_len: file={:?}, new_len={}, op_seq={}",
        file_id,
        new_len,
        op_seq
    );
    Ok(op_seq)
}
/// Reports whether operation `op_seq` on `file_id` is no longer pending.
///
/// An operation counts as complete when it is absent from the file's
/// pending-op map; an unregistered file is treated as complete as well.
pub(crate) fn is_storage_op_complete(&self, file_id: FileId, op_seq: u64) -> bool {
    let inner = self.inner.borrow();
    match inner.storage.files.get(&file_id) {
        Some(file_state) => !file_state.pending_ops.contains_key(&op_seq),
        None => true,
    }
}
/// Consumes the recorded sync failure for `(file_id, op_seq)`, if any.
///
/// Returns `true` when a failure had been recorded for that operation; the
/// record is removed, so a second call for the same pair returns `false`.
pub(crate) fn take_sync_failure(&self, file_id: FileId, op_seq: u64) -> bool {
    let key = (file_id, op_seq);
    let mut inner = self.inner.borrow_mut();
    inner.storage.sync_failures.remove(&key)
}
/// Stores `waker` as the waiter for operation `op_seq` on `file_id`,
/// replacing any waker previously registered for the same pair.
pub(crate) fn register_storage_waker(&self, file_id: FileId, op_seq: u64, waker: Waker) {
    let key = (file_id, op_seq);
    let mut inner = self.inner.borrow_mut();
    inner.wakers.storage_wakers.insert(key, waker);
}
/// Reads from the file's storage at `offset` into `buf`.
///
/// On success the returned count is always `buf.len()`.
///
/// # Errors
///
/// - `StorageError::InvalidFileHandle` when `file_id` is not registered.
/// - `StorageError::FileClosed` when the file is marked closed.
/// - `StorageError::Io` carrying the kind and message of the underlying
///   storage read error.
pub(crate) fn read_from_file(
    &self,
    file_id: FileId,
    offset: u64,
    buf: &mut [u8],
) -> Result<usize, StorageError> {
    let inner = self.inner.borrow();
    let Some(file_state) = inner.storage.files.get(&file_id) else {
        return Err(StorageError::InvalidFileHandle { file_id });
    };
    if file_state.is_closed {
        return Err(StorageError::FileClosed { file_id });
    }

    let outcome = file_state.storage.read(offset, buf);
    match outcome {
        Ok(_) => Ok(buf.len()),
        Err(e) => Err(StorageError::Io {
            file_id,
            kind: e.kind(),
            message: e.to_string(),
        }),
    }
}
/// Returns the current position recorded for `file_id`.
///
/// # Errors
///
/// Returns `StorageError::InvalidFileHandle` when `file_id` is not registered.
pub(crate) fn file_position(&self, file_id: FileId) -> Result<u64, StorageError> {
    let inner = self.inner.borrow();
    match inner.storage.files.get(&file_id) {
        Some(file_state) => Ok(file_state.position),
        None => Err(StorageError::InvalidFileHandle { file_id }),
    }
}
/// Sets the position recorded for `file_id` to `position`.
///
/// The closed flag is not consulted; only the handle's existence is checked.
///
/// # Errors
///
/// Returns `StorageError::InvalidFileHandle` when `file_id` is not registered.
pub(crate) fn set_file_position(
    &self,
    file_id: FileId,
    position: u64,
) -> Result<(), StorageError> {
    let mut inner = self.inner.borrow_mut();
    let file_state = inner
        .storage
        .files
        .get_mut(&file_id)
        .ok_or(StorageError::InvalidFileHandle { file_id })?;
    file_state.position = position;
    Ok(())
}
/// Returns the current size of the storage backing `file_id`.
///
/// # Errors
///
/// Returns `StorageError::InvalidFileHandle` when `file_id` is not registered.
pub(crate) fn file_size(&self, file_id: FileId) -> Result<u64, StorageError> {
    let inner = self.inner.borrow();
    match inner.storage.files.get(&file_id) {
        Some(file_state) => Ok(file_state.storage.size()),
        None => Err(StorageError::InvalidFileHandle { file_id }),
    }
}
/// Computes the simulated latency of one read or write of `size` bytes.
///
/// The result is the sum of three parts:
/// - a base latency sampled from `config.write_latency` (when `is_write`) or
///   `config.read_latency`,
/// - a per-operation overhead of `1 / config.iops` seconds,
/// - a transfer time of `size / config.bandwidth` seconds.
///
/// A zero `iops` or `bandwidth` would make the division produce infinity or
/// NaN, which `Duration::from_secs_f64` rejects by panicking. Such a rate
/// contributes no delay instead, and a quotient too large to represent is
/// clamped to `Duration::MAX`; the final sum saturates rather than overflowing.
/// NOTE(review): treating a zero rate as "unlimited" is an assumption —
/// confirm it matches how `StorageConfiguration` is meant to be used.
fn calculate_storage_latency(
    config: &crate::storage::StorageConfiguration,
    size: usize,
    is_write: bool,
) -> Duration {
    /// Time needed to process `amount` units at `rate` units per second.
    fn delay_at_rate(amount: f64, rate: f64) -> Duration {
        if rate > 0.0 {
            Duration::try_from_secs_f64(amount / rate).unwrap_or(Duration::MAX)
        } else {
            Duration::ZERO
        }
    }

    let base_range = if is_write {
        &config.write_latency
    } else {
        &config.read_latency
    };
    let base = crate::network::sample_duration(base_range);
    let iops_overhead = delay_at_rate(1.0, config.iops as f64);
    let transfer = delay_at_rate(size as f64, config.bandwidth as f64);
    base.saturating_add(iops_overhead).saturating_add(transfer)
}
/// Simulates a storage crash for every file owned by the process at `ip`.
///
/// For each such file, in the order the file map yields them:
/// - `apply_crash` is invoked on its storage with the owner's
///   `crash_fault_probability`,
/// - all of its pending operations are discarded,
/// - it is marked closed when `close_files` is set.
///
/// Afterwards the wakers registered for the discarded operations are woken and
/// a `StorageCrash` fault event is emitted.
#[instrument(skip(self))]
pub fn simulate_crash_for_process(&self, ip: IpAddr, close_files: bool) {
    let mut inner = self.inner.borrow_mut();
    let crash_probability = inner.storage.config_for(ip).crash_fault_probability;

    // Snapshot the ids first so the map can be mutated while walking them.
    let owned: Vec<FileId> = inner
        .storage
        .files
        .iter()
        .filter_map(|(id, file)| (file.owner_ip == ip).then_some(*id))
        .collect();

    let mut abandoned_ops = Vec::new();
    for &file_id in &owned {
        let Some(file_state) = inner.storage.files.get_mut(&file_id) else {
            continue;
        };
        file_state.storage.apply_crash(crash_probability);
        abandoned_ops.extend(file_state.pending_ops.keys().map(|&op_seq| (file_id, op_seq)));
        file_state.pending_ops.clear();
        if close_files {
            file_state.is_closed = true;
        }
    }

    for key in abandoned_ops {
        if let Some(waker) = inner.wakers.storage_wakers.remove(&key) {
            waker.wake();
        }
    }

    inner.emit_fault(SimFaultEvent::StorageCrash { ip: ip.to_string() });
    tracing::info!(
        "Storage crash simulated for {}: {} files affected, close_files={}",
        ip,
        owned.len(),
        close_files
    );
}
/// Deletes every file owned by the process at `ip`.
///
/// Each owned file's state and path mapping are removed and its path is
/// recorded in `deleted_paths`. Wakers registered for operations that were
/// still pending on the removed files are woken, and a `StorageWipe` fault
/// event is emitted.
#[instrument(skip(self))]
pub fn wipe_storage_for_process(&self, ip: IpAddr) {
    let mut inner = self.inner.borrow_mut();

    // Snapshot (id, path) pairs so the maps can be mutated while walking them.
    let doomed: Vec<(FileId, String)> = inner
        .storage
        .files
        .iter()
        .filter_map(|(id, file)| (file.owner_ip == ip).then(|| (*id, file.path.clone())))
        .collect();

    let mut abandoned_ops = Vec::new();
    for (file_id, path) in &doomed {
        if let Some(removed) = inner.storage.files.remove(file_id) {
            abandoned_ops.extend(removed.pending_ops.keys().map(|op_seq| (*file_id, *op_seq)));
        }
        inner.storage.path_to_file.remove(path);
        inner.storage.deleted_paths.insert(path.clone());
    }

    for key in abandoned_ops {
        if let Some(waker) = inner.wakers.storage_wakers.remove(&key) {
            waker.wake();
        }
    }

    inner.emit_fault(SimFaultEvent::StorageWipe { ip: ip.to_string() });
    tracing::info!("Storage wiped for {}: {} files deleted", ip, doomed.len(),);
}
/// Installs `config` as the storage configuration for the process at `ip`,
/// replacing any configuration previously stored for that address.
#[instrument(skip(self, config))]
pub fn set_process_storage_config(
    &self,
    ip: IpAddr,
    config: crate::storage::StorageConfiguration,
) {
    let mut inner = self.inner.borrow_mut();
    let per_process = &mut inner.storage.per_process_configs;
    per_process.insert(ip, config);
}
}