Skip to main content

moonpool_sim/sim/
storage_ops.rs

1//! Storage I/O operations for the simulation.
2//!
3//! This module contains storage-related event handlers and methods extracted from
4//! `world.rs` to improve code organization. It handles file operations like
5//! open, read, write, sync, and provides fault injection for testing storage reliability.
6
7use std::task::Waker;
8use std::time::Duration;
9
10use crate::{SimulationError, SimulationResult};
11
12use super::{
13    events::{Event, ScheduledEvent, StorageOperation},
14    rng::{sim_random, sim_random_range},
15    state::{FileId, PendingOpType, PendingStorageOp},
16    world::{SimInner, SimWorld},
17};
18
19// =============================================================================
20// Storage Event Handlers
21// =============================================================================
22
23/// Find and remove the first pending operation of the given type for a file.
24///
25/// Returns the sequence number and operation if found, or None if no such operation exists.
26fn take_pending_op(
27    inner: &mut SimInner,
28    file_id: FileId,
29    op_type: PendingOpType,
30) -> Option<(u64, PendingStorageOp)> {
31    let file_state = inner.storage.files.get_mut(&file_id)?;
32
33    let op_seq = file_state
34        .pending_ops
35        .iter()
36        .find(|(_, op)| op.op_type == op_type)
37        .map(|(&seq, _)| seq)?;
38
39    let op = file_state.pending_ops.remove(&op_seq)?;
40    Some((op_seq, op))
41}
42
43/// Handle storage I/O events.
44///
45/// Storage events represent the completion of I/O operations.
46/// Processing applies faults and wakes waiting tasks.
47pub(crate) fn handle_storage_event(
48    inner: &mut SimInner,
49    file_id: u64,
50    operation: StorageOperation,
51) {
52    let file_id = FileId(file_id);
53
54    match operation {
55        StorageOperation::ReadComplete { len: _ } => {
56            handle_read_complete(inner, file_id);
57        }
58        StorageOperation::WriteComplete { len: _ } => {
59            handle_write_complete(inner, file_id);
60        }
61        StorageOperation::SyncComplete => {
62            handle_sync_complete(inner, file_id);
63        }
64        StorageOperation::OpenComplete => {
65            handle_open_complete(inner, file_id);
66        }
67        StorageOperation::SetLenComplete { new_len } => {
68            handle_set_len_complete(inner, file_id, new_len);
69        }
70    }
71}
72
73/// Handle read operation completion.
74fn handle_read_complete(inner: &mut SimInner, file_id: FileId) {
75    let read_fault_probability = inner.storage.config.read_fault_probability;
76
77    // Find and remove the oldest pending read operation
78    let Some((op_seq, op)) = take_pending_op(inner, file_id, PendingOpType::Read) else {
79        tracing::warn!("ReadComplete for unknown file {:?}", file_id);
80        return;
81    };
82
83    let (offset, len) = (op.offset, op.len);
84
85    // Apply read fault injection - mark sectors as faulted based on probability
86    if read_fault_probability > 0.0
87        && let Some(file_state) = inner.storage.files.get_mut(&file_id)
88    {
89        let start_sector = (offset as usize) / crate::storage::SECTOR_SIZE;
90        let end_sector = (offset as usize + len).div_ceil(crate::storage::SECTOR_SIZE);
91
92        for sector in start_sector..end_sector {
93            if sim_random::<f64>() < read_fault_probability {
94                file_state.storage.set_fault(sector);
95                tracing::info!(
96                    "Read fault injected for file {:?}, sector {}",
97                    file_id,
98                    sector
99                );
100            }
101        }
102    }
103
104    // Wake the waker for this operation
105    if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
106        tracing::trace!("Waking read waker for file {:?}, op {}", file_id, op_seq);
107        waker.wake();
108    }
109}
110
111/// Handle write operation completion.
112///
113/// Applies the write to storage with potential fault injection:
114/// - phantom_write_probability: write appears to succeed but isn't persisted
115/// - misdirect_write_probability: write lands at wrong location
116fn handle_write_complete(inner: &mut SimInner, file_id: FileId) {
117    let config = inner.storage.config.clone();
118
119    // Find and remove the oldest pending write operation
120    let Some((op_seq, op)) = take_pending_op(inner, file_id, PendingOpType::Write) else {
121        tracing::warn!("WriteComplete for unknown file {:?}", file_id);
122        return;
123    };
124
125    let (offset, data_opt) = (op.offset, op.data);
126
127    // Apply the write with potential fault injection
128    if let Some(data) = data_opt
129        && let Some(file_state) = inner.storage.files.get_mut(&file_id)
130    {
131        // Check for phantom write (write appears to succeed but doesn't persist)
132        if sim_random::<f64>() < config.phantom_write_probability {
133            tracing::info!(
134                "Phantom write injected for file {:?}, offset {}, len {}",
135                file_id,
136                offset,
137                data.len()
138            );
139            file_state.storage.record_phantom_write(offset, &data);
140        }
141        // Check for misdirected write
142        else if sim_random::<f64>() < config.misdirect_write_probability {
143            // Pick a random different offset
144            let max_offset = file_state.storage.size().saturating_sub(data.len() as u64);
145            let mistaken_offset = if max_offset > 0 {
146                sim_random_range(0..max_offset)
147            } else {
148                0
149            };
150            tracing::info!(
151                "Misdirected write injected for file {:?}: intended={}, actual={}",
152                file_id,
153                offset,
154                mistaken_offset
155            );
156            if let Err(e) =
157                file_state
158                    .storage
159                    .apply_misdirected_write(offset, mistaken_offset, &data)
160            {
161                tracing::warn!("Failed to apply misdirected write: {}", e);
162            }
163        }
164        // Normal write (not synced - may be lost on crash)
165        else if let Err(e) = file_state.storage.write(offset, &data, false) {
166            tracing::warn!("Write failed for file {:?}: {}", file_id, e);
167        } else {
168            // Check for write corruption - mark sectors as faulted after successful write
169            if config.write_fault_probability > 0.0 {
170                let start_sector = (offset as usize) / crate::storage::SECTOR_SIZE;
171                let end_sector =
172                    (offset as usize + data.len()).div_ceil(crate::storage::SECTOR_SIZE);
173
174                for sector in start_sector..end_sector {
175                    if sim_random::<f64>() < config.write_fault_probability {
176                        file_state.storage.set_fault(sector);
177                        tracing::info!(
178                            "Write fault injected for file {:?}, sector {}",
179                            file_id,
180                            sector
181                        );
182                    }
183                }
184            }
185        }
186    }
187
188    // Wake the waker for this operation
189    if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
190        tracing::trace!("Waking write waker for file {:?}, op {}", file_id, op_seq);
191        waker.wake();
192    }
193}
194
195/// Handle sync operation completion.
196///
197/// Applies sync_failure_probability fault injection.
198fn handle_sync_complete(inner: &mut SimInner, file_id: FileId) {
199    let sync_failure_prob = inner.storage.config.sync_failure_probability;
200
201    // Find and remove the oldest pending sync operation
202    let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::Sync) else {
203        tracing::warn!("SyncComplete for unknown file {:?}", file_id);
204        return;
205    };
206
207    // Check for sync failure
208    if sim_random::<f64>() < sync_failure_prob {
209        tracing::info!("Sync failure injected for file {:?}", file_id);
210        // Record the failure so SyncFuture can return an error
211        inner.storage.sync_failures.insert((file_id, op_seq));
212        // On sync failure, we don't call storage.sync()
213        // Data remains in pending state and may be lost on crash
214    } else if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
215        // Successful sync - make all pending writes durable
216        file_state.storage.sync();
217    }
218
219    // Wake the waker for this operation
220    if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
221        tracing::trace!("Waking sync waker for file {:?}, op {}", file_id, op_seq);
222        waker.wake();
223    }
224}
225
226/// Handle open operation completion.
227fn handle_open_complete(inner: &mut SimInner, file_id: FileId) {
228    // Find and remove the oldest pending open operation
229    let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::Open) else {
230        // File might not have pending open op (it was already "open" on creation)
231        tracing::trace!("OpenComplete for file {:?} (no pending op)", file_id);
232        return;
233    };
234
235    // Wake the waker for this operation
236    if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
237        tracing::trace!("Waking open waker for file {:?}, op {}", file_id, op_seq);
238        waker.wake();
239    }
240}
241
242/// Handle set_len operation completion.
243fn handle_set_len_complete(inner: &mut SimInner, file_id: FileId, new_len: u64) {
244    // Find and remove the oldest pending set_len operation
245    let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::SetLen) else {
246        tracing::warn!("SetLenComplete for unknown file {:?}", file_id);
247        return;
248    };
249
250    // Resize the storage (preserves seed, written/fault bitmaps, and overlays)
251    if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
252        file_state.storage.resize(new_len);
253    }
254
255    // Wake the waker for this operation
256    if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
257        tracing::trace!(
258            "Waking set_len waker for file {:?}, op {}, new_len={}",
259            file_id,
260            op_seq,
261            new_len
262        );
263        waker.wake();
264    }
265}
266
267// =============================================================================
268// Storage Methods for SimWorld
269// =============================================================================
270
271impl SimWorld {
272    /// Access storage configuration for the simulation.
273    pub fn with_storage_config<F, R>(&self, f: F) -> R
274    where
275        F: FnOnce(&crate::storage::StorageConfiguration) -> R,
276    {
277        let inner = self.inner.borrow();
278        f(&inner.storage.config)
279    }
280
281    /// Open a file in the simulation.
282    ///
283    /// Creates a new file or opens an existing one based on the options.
284    /// Schedules an `OpenComplete` event and returns the file ID.
285    pub(crate) fn open_file(
286        &self,
287        path: &str,
288        options: moonpool_core::OpenOptions,
289        initial_size: u64,
290    ) -> SimulationResult<FileId> {
291        use crate::storage::InMemoryStorage;
292
293        let mut inner = self.inner.borrow_mut();
294        let path_str = path.to_string();
295
296        // Check create_new semantics - fail if file exists
297        if options.create_new && inner.storage.path_to_file.contains_key(&path_str) {
298            return Err(SimulationError::IoError(
299                "File already exists (create_new)".to_string(),
300            ));
301        }
302
303        // Check if file was deleted and create is not set
304        if inner.storage.deleted_paths.contains(&path_str) && !options.create {
305            return Err(SimulationError::IoError("File not found".to_string()));
306        }
307
308        // If file already exists and we're opening it, return existing file ID
309        if let Some(&existing_id) = inner.storage.path_to_file.get(&path_str) {
310            if let Some(file_state) = inner.storage.files.get_mut(&existing_id) {
311                // If truncate is set, reset the storage
312                if options.truncate {
313                    let seed = sim_random::<u64>();
314                    file_state.storage = InMemoryStorage::new(0, seed);
315                    file_state.position = 0;
316                } else if options.append {
317                    // For append mode, seek to end
318                    file_state.position = file_state.storage.size();
319                } else {
320                    // For normal reopen, reset position to start
321                    file_state.position = 0;
322                }
323                // Update options for the new open
324                file_state.options = options;
325                file_state.is_closed = false;
326            }
327            return Ok(existing_id);
328        }
329
330        // File doesn't exist - check if we're allowed to create it
331        if !options.create && !options.create_new {
332            return Err(SimulationError::IoError("File not found".to_string()));
333        }
334
335        // Create new file
336        let file_id = FileId(inner.storage.next_file_id);
337        inner.storage.next_file_id += 1;
338
339        // Remove from deleted paths if re-creating
340        inner.storage.deleted_paths.remove(&path_str);
341
342        // Create in-memory storage with deterministic seed
343        let seed = sim_random::<u64>();
344        let storage = InMemoryStorage::new(initial_size, seed);
345
346        let file_state =
347            super::state::StorageFileState::new(file_id, path_str.clone(), options, storage);
348
349        inner.storage.files.insert(file_id, file_state);
350        inner.storage.path_to_file.insert(path_str, file_id);
351
352        // Schedule OpenComplete event with minimal latency
353        let open_latency = Duration::from_micros(1);
354        let scheduled_time = inner.current_time + open_latency;
355        let sequence = inner.next_sequence;
356        inner.next_sequence += 1;
357        let event = Event::Storage {
358            file_id: file_id.0,
359            operation: StorageOperation::OpenComplete,
360        };
361        inner
362            .event_queue
363            .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
364
365        tracing::debug!("Opened file {:?} with id {:?}", path, file_id);
366        Ok(file_id)
367    }
368
369    /// Check if a file exists at the given path.
370    pub(crate) fn file_exists(&self, path: &str) -> bool {
371        let inner = self.inner.borrow();
372        let path_str = path.to_string();
373        inner.storage.path_to_file.contains_key(&path_str)
374            && !inner.storage.deleted_paths.contains(&path_str)
375    }
376
377    /// Delete a file at the given path.
378    pub(crate) fn delete_file(&self, path: &str) -> SimulationResult<()> {
379        let mut inner = self.inner.borrow_mut();
380        let path_str = path.to_string();
381
382        if let Some(file_id) = inner.storage.path_to_file.remove(&path_str) {
383            // Mark file as closed and remove it
384            if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
385                file_state.is_closed = true;
386            }
387            inner.storage.files.remove(&file_id);
388            inner.storage.deleted_paths.insert(path_str);
389            tracing::debug!("Deleted file {:?}", path);
390            Ok(())
391        } else {
392            Err(SimulationError::IoError("File not found".to_string()))
393        }
394    }
395
396    /// Rename a file from one path to another.
397    pub(crate) fn rename_file(&self, from: &str, to: &str) -> SimulationResult<()> {
398        let mut inner = self.inner.borrow_mut();
399        let from_str = from.to_string();
400        let to_str = to.to_string();
401
402        if let Some(file_id) = inner.storage.path_to_file.remove(&from_str) {
403            // Update the path in the file state
404            if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
405                file_state.path = to_str.clone();
406            }
407            inner.storage.path_to_file.insert(to_str, file_id);
408            inner.storage.deleted_paths.remove(&from_str);
409            tracing::debug!("Renamed file {:?} to {:?}", from, to);
410            Ok(())
411        } else {
412            Err(SimulationError::IoError("File not found".to_string()))
413        }
414    }
415
416    /// Schedule a read operation on a file.
417    ///
418    /// Returns an operation sequence number that can be used to check completion.
419    pub(crate) fn schedule_read(
420        &self,
421        file_id: FileId,
422        offset: u64,
423        len: usize,
424    ) -> SimulationResult<u64> {
425        let mut inner = self.inner.borrow_mut();
426
427        let file_state = inner
428            .storage
429            .files
430            .get_mut(&file_id)
431            .ok_or_else(|| SimulationError::IoError("File not found".to_string()))?;
432
433        if file_state.is_closed {
434            return Err(SimulationError::IoError("File is closed".to_string()));
435        }
436
437        let op_seq = file_state.next_op_seq;
438        file_state.next_op_seq += 1;
439
440        // Store the pending operation
441        file_state.pending_ops.insert(
442            op_seq,
443            PendingStorageOp {
444                op_type: PendingOpType::Read,
445                offset,
446                len,
447                data: None,
448            },
449        );
450
451        // Calculate latency and schedule completion event
452        let latency = Self::calculate_storage_latency(&inner.storage.config, len, false);
453        let scheduled_time = inner.current_time + latency;
454        let sequence = inner.next_sequence;
455        inner.next_sequence += 1;
456
457        let event = Event::Storage {
458            file_id: file_id.0,
459            operation: StorageOperation::ReadComplete { len: len as u32 },
460        };
461        inner
462            .event_queue
463            .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
464
465        tracing::trace!(
466            "Scheduled read: file={:?}, offset={}, len={}, op_seq={}",
467            file_id,
468            offset,
469            len,
470            op_seq
471        );
472
473        Ok(op_seq)
474    }
475
476    /// Schedule a write operation on a file.
477    ///
478    /// Returns an operation sequence number that can be used to check completion.
479    pub(crate) fn schedule_write(
480        &self,
481        file_id: FileId,
482        offset: u64,
483        data: Vec<u8>,
484    ) -> SimulationResult<u64> {
485        let mut inner = self.inner.borrow_mut();
486
487        let file_state = inner
488            .storage
489            .files
490            .get_mut(&file_id)
491            .ok_or_else(|| SimulationError::IoError("File not found".to_string()))?;
492
493        if file_state.is_closed {
494            return Err(SimulationError::IoError("File is closed".to_string()));
495        }
496
497        let op_seq = file_state.next_op_seq;
498        file_state.next_op_seq += 1;
499        let len = data.len();
500
501        // Store the pending operation with the data
502        file_state.pending_ops.insert(
503            op_seq,
504            PendingStorageOp {
505                op_type: PendingOpType::Write,
506                offset,
507                len,
508                data: Some(data),
509            },
510        );
511
512        // Calculate latency and schedule completion event
513        let latency = Self::calculate_storage_latency(&inner.storage.config, len, true);
514        let scheduled_time = inner.current_time + latency;
515        let sequence = inner.next_sequence;
516        inner.next_sequence += 1;
517
518        let event = Event::Storage {
519            file_id: file_id.0,
520            operation: StorageOperation::WriteComplete { len: len as u32 },
521        };
522        inner
523            .event_queue
524            .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
525
526        tracing::trace!(
527            "Scheduled write: file={:?}, offset={}, len={}, op_seq={}",
528            file_id,
529            offset,
530            len,
531            op_seq
532        );
533
534        Ok(op_seq)
535    }
536
537    /// Schedule a sync operation on a file.
538    ///
539    /// Returns an operation sequence number that can be used to check completion.
540    pub(crate) fn schedule_sync(&self, file_id: FileId) -> SimulationResult<u64> {
541        let mut inner = self.inner.borrow_mut();
542
543        let file_state = inner
544            .storage
545            .files
546            .get_mut(&file_id)
547            .ok_or_else(|| SimulationError::IoError("File not found".to_string()))?;
548
549        if file_state.is_closed {
550            return Err(SimulationError::IoError("File is closed".to_string()));
551        }
552
553        let op_seq = file_state.next_op_seq;
554        file_state.next_op_seq += 1;
555
556        // Store the pending operation
557        file_state.pending_ops.insert(
558            op_seq,
559            PendingStorageOp {
560                op_type: PendingOpType::Sync,
561                offset: 0,
562                len: 0,
563                data: None,
564            },
565        );
566
567        // Use sync latency from config
568        let latency = crate::network::sample_duration(&inner.storage.config.sync_latency);
569        let scheduled_time = inner.current_time + latency;
570        let sequence = inner.next_sequence;
571        inner.next_sequence += 1;
572
573        let event = Event::Storage {
574            file_id: file_id.0,
575            operation: StorageOperation::SyncComplete,
576        };
577        inner
578            .event_queue
579            .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
580
581        tracing::trace!("Scheduled sync: file={:?}, op_seq={}", file_id, op_seq);
582
583        Ok(op_seq)
584    }
585
586    /// Schedule a set_len operation on a file.
587    ///
588    /// Returns an operation sequence number that can be used to check completion.
589    pub(crate) fn schedule_set_len(&self, file_id: FileId, new_len: u64) -> SimulationResult<u64> {
590        let mut inner = self.inner.borrow_mut();
591
592        let file_state = inner
593            .storage
594            .files
595            .get_mut(&file_id)
596            .ok_or_else(|| SimulationError::IoError("File not found".to_string()))?;
597
598        if file_state.is_closed {
599            return Err(SimulationError::IoError("File is closed".to_string()));
600        }
601
602        let op_seq = file_state.next_op_seq;
603        file_state.next_op_seq += 1;
604
605        // Store the pending operation
606        file_state.pending_ops.insert(
607            op_seq,
608            PendingStorageOp {
609                op_type: PendingOpType::SetLen,
610                offset: new_len,
611                len: 0,
612                data: None,
613            },
614        );
615
616        // Use write latency for set_len
617        let latency = crate::network::sample_duration(&inner.storage.config.write_latency);
618        let scheduled_time = inner.current_time + latency;
619        let sequence = inner.next_sequence;
620        inner.next_sequence += 1;
621
622        let event = Event::Storage {
623            file_id: file_id.0,
624            operation: StorageOperation::SetLenComplete { new_len },
625        };
626        inner
627            .event_queue
628            .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
629
630        tracing::trace!(
631            "Scheduled set_len: file={:?}, new_len={}, op_seq={}",
632            file_id,
633            new_len,
634            op_seq
635        );
636
637        Ok(op_seq)
638    }
639
640    /// Check if a storage operation is complete.
641    pub(crate) fn is_storage_op_complete(&self, file_id: FileId, op_seq: u64) -> bool {
642        let inner = self.inner.borrow();
643        if let Some(file_state) = inner.storage.files.get(&file_id) {
644            // Operation is complete when it's no longer in pending_ops
645            !file_state.pending_ops.contains_key(&op_seq)
646        } else {
647            // File not found means operation is effectively "complete" (failed)
648            true
649        }
650    }
651
652    /// Check if a sync operation failed and clear the failure flag.
653    ///
654    /// Returns true if the sync failed due to fault injection.
655    pub(crate) fn take_sync_failure(&self, file_id: FileId, op_seq: u64) -> bool {
656        let mut inner = self.inner.borrow_mut();
657        inner.storage.sync_failures.remove(&(file_id, op_seq))
658    }
659
660    /// Register a waker for a storage operation.
661    pub(crate) fn register_storage_waker(&self, file_id: FileId, op_seq: u64, waker: Waker) {
662        let mut inner = self.inner.borrow_mut();
663        inner.wakers.storage_wakers.insert((file_id, op_seq), waker);
664    }
665
666    /// Read data from a file at the given offset.
667    ///
668    /// This is called after ReadComplete to actually fetch the data.
669    pub(crate) fn read_from_file(
670        &self,
671        file_id: FileId,
672        offset: u64,
673        buf: &mut [u8],
674    ) -> SimulationResult<usize> {
675        let inner = self.inner.borrow();
676
677        let file_state = inner
678            .storage
679            .files
680            .get(&file_id)
681            .ok_or_else(|| SimulationError::IoError("File not found".to_string()))?;
682
683        if file_state.is_closed {
684            return Err(SimulationError::IoError("File is closed".to_string()));
685        }
686
687        // Read from the in-memory storage
688        file_state
689            .storage
690            .read(offset, buf)
691            .map_err(|e| SimulationError::IoError(e.to_string()))?;
692
693        Ok(buf.len())
694    }
695
696    /// Get the current file position.
697    pub(crate) fn get_file_position(&self, file_id: FileId) -> SimulationResult<u64> {
698        let inner = self.inner.borrow();
699        inner
700            .storage
701            .files
702            .get(&file_id)
703            .map(|f| f.position)
704            .ok_or_else(|| SimulationError::IoError("File not found".to_string()))
705    }
706
707    /// Set the current file position.
708    pub(crate) fn set_file_position(&self, file_id: FileId, position: u64) -> SimulationResult<()> {
709        let mut inner = self.inner.borrow_mut();
710        if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
711            file_state.position = position;
712            Ok(())
713        } else {
714            Err(SimulationError::IoError("File not found".to_string()))
715        }
716    }
717
718    /// Get the size of a file.
719    pub(crate) fn get_file_size(&self, file_id: FileId) -> SimulationResult<u64> {
720        let inner = self.inner.borrow();
721        inner
722            .storage
723            .files
724            .get(&file_id)
725            .map(|f| f.storage.size())
726            .ok_or_else(|| SimulationError::IoError("File not found".to_string()))
727    }
728
729    /// Calculate storage latency using FDB formula.
730    ///
731    /// Latency = base_latency + iops_overhead + transfer_time
732    fn calculate_storage_latency(
733        config: &crate::storage::StorageConfiguration,
734        size: usize,
735        is_write: bool,
736    ) -> Duration {
737        // Sample base latency from config range
738        let base_range = if is_write {
739            &config.write_latency
740        } else {
741            &config.read_latency
742        };
743        let base = crate::network::sample_duration(base_range);
744
745        // IOPS overhead: 1/iops seconds per operation
746        let iops_overhead = Duration::from_secs_f64(1.0 / config.iops as f64);
747
748        // Transfer time: size / bandwidth seconds
749        let transfer = Duration::from_secs_f64(size as f64 / config.bandwidth as f64);
750
751        base + iops_overhead + transfer
752    }
753
754    /// Simulate a crash affecting storage.
755    ///
756    /// This applies crash simulation to all open files:
757    /// 1. Calls `apply_crash()` on all `InMemoryStorage` instances
758    /// 2. Clears pending operations (lost in crash)
759    /// 3. Optionally marks files as closed
760    /// 4. Wakes all storage wakers (operations will fail)
761    pub fn simulate_crash(&self, close_files: bool) {
762        let mut inner = self.inner.borrow_mut();
763        let crash_probability = inner.storage.config.crash_fault_probability;
764
765        // Collect all wakers to wake in one pass (to avoid borrow conflict)
766        let mut wakers_to_wake = Vec::new();
767        let file_ids: Vec<FileId> = inner.storage.files.keys().copied().collect();
768
769        for file_id in &file_ids {
770            if let Some(file_state) = inner.storage.files.get_mut(file_id) {
771                // Apply crash to in-memory storage (may corrupt pending writes)
772                file_state.storage.apply_crash(crash_probability);
773
774                // Collect lost op sequence numbers
775                let lost_ops: Vec<u64> = file_state.pending_ops.keys().copied().collect();
776
777                // Clear pending ops - they're lost in crash
778                file_state.pending_ops.clear();
779
780                // Collect waker keys for later removal
781                for op_seq in lost_ops {
782                    wakers_to_wake.push((*file_id, op_seq));
783                }
784
785                // Optionally close files
786                if close_files {
787                    file_state.is_closed = true;
788                }
789            }
790        }
791
792        // Wake all collected wakers (after file iteration is complete)
793        for key in wakers_to_wake {
794            if let Some(waker) = inner.wakers.storage_wakers.remove(&key) {
795                waker.wake();
796            }
797        }
798
799        tracing::info!(
800            "Storage crash simulated: {} files affected, close_files={}",
801            file_ids.len(),
802            close_files
803        );
804    }
805}