Skip to main content

moonpool_sim/sim/
storage_ops.rs

1//! Storage I/O operations for the simulation.
2//!
3//! This module contains storage-related event handlers and methods extracted from
4//! `world.rs` to improve code organization. It handles file operations like
5//! open, read, write, sync, and provides fault injection for testing storage reliability.
6
7use std::net::IpAddr;
8use std::task::Waker;
9use std::time::Duration;
10use tracing::instrument;
11
12use crate::storage::StorageError;
13
14use crate::chaos::fault_events::SimFaultEvent;
15
16use super::{
17    events::{Event, ScheduledEvent, StorageOperation},
18    rng::{sim_random, sim_random_range},
19    state::{FileId, PendingOpType, PendingStorageOp},
20    world::{SimInner, SimWorld},
21};
22
23// =============================================================================
24// Storage Event Handlers
25// =============================================================================
26
27/// Find and remove the first pending operation of the given type for a file.
28///
29/// Returns the sequence number and operation if found, or None if no such operation exists.
30fn take_pending_op(
31    inner: &mut SimInner,
32    file_id: FileId,
33    op_type: PendingOpType,
34) -> Option<(u64, PendingStorageOp)> {
35    let file_state = inner.storage.files.get_mut(&file_id)?;
36
37    let op_seq = file_state
38        .pending_ops
39        .iter()
40        .find(|(_, op)| op.op_type == op_type)
41        .map(|(&seq, _)| seq)?;
42
43    let op = file_state.pending_ops.remove(&op_seq)?;
44    Some((op_seq, op))
45}
46
47/// Handle storage I/O events.
48///
49/// Storage events represent the completion of I/O operations.
50/// Processing applies faults and wakes waiting tasks.
51pub(crate) fn handle_storage_event(
52    inner: &mut SimInner,
53    file_id: u64,
54    operation: StorageOperation,
55) {
56    let file_id = FileId(file_id);
57
58    match operation {
59        StorageOperation::ReadComplete { len: _ } => {
60            handle_read_complete(inner, file_id);
61        }
62        StorageOperation::WriteComplete { len: _ } => {
63            handle_write_complete(inner, file_id);
64        }
65        StorageOperation::SyncComplete => {
66            handle_sync_complete(inner, file_id);
67        }
68        StorageOperation::OpenComplete => {
69            handle_open_complete(inner, file_id);
70        }
71        StorageOperation::SetLenComplete { new_len } => {
72            handle_set_len_complete(inner, file_id, new_len);
73        }
74    }
75}
76
77/// Handle read operation completion.
78fn handle_read_complete(inner: &mut SimInner, file_id: FileId) {
79    let read_fault_probability = inner
80        .storage
81        .files
82        .get(&file_id)
83        .map(|f| inner.storage.config_for(f.owner_ip).read_fault_probability)
84        .unwrap_or(0.0);
85
86    // Find and remove the oldest pending read operation
87    let Some((op_seq, op)) = take_pending_op(inner, file_id, PendingOpType::Read) else {
88        tracing::warn!("ReadComplete for unknown file {:?}", file_id);
89        return;
90    };
91
92    let (offset, len) = (op.offset, op.len);
93
94    // Apply read fault injection - mark sectors as faulted based on probability
95    let mut read_faulted = false;
96    if read_fault_probability > 0.0
97        && let Some(file_state) = inner.storage.files.get_mut(&file_id)
98    {
99        let start_sector = (offset as usize) / crate::storage::SECTOR_SIZE;
100        let end_sector = (offset as usize + len).div_ceil(crate::storage::SECTOR_SIZE);
101
102        for sector in start_sector..end_sector {
103            if sim_random::<f64>() < read_fault_probability {
104                file_state.storage.set_fault(sector);
105                read_faulted = true;
106                tracing::info!(
107                    "Read fault injected for file {:?}, sector {}",
108                    file_id,
109                    sector
110                );
111            }
112        }
113    }
114    if read_faulted {
115        let ip = inner.storage.files.get(&file_id).map(|f| f.owner_ip);
116        if let Some(ip) = ip {
117            inner.emit_fault(SimFaultEvent::StorageReadFault {
118                ip: ip.to_string(),
119                file_id: file_id.0,
120            });
121        }
122    }
123
124    // Wake the waker for this operation
125    if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
126        tracing::trace!("Waking read waker for file {:?}, op {}", file_id, op_seq);
127        waker.wake();
128    }
129}
130
131/// Handle write operation completion.
132///
133/// Applies the write to storage with potential fault injection:
134/// - phantom_write_probability: write appears to succeed but isn't persisted
135/// - misdirect_write_probability: write lands at wrong location
136fn handle_write_complete(inner: &mut SimInner, file_id: FileId) {
137    let config = inner
138        .storage
139        .files
140        .get(&file_id)
141        .map(|f| inner.storage.config_for(f.owner_ip).clone())
142        .unwrap_or_default();
143
144    let owner_ip = inner.storage.files.get(&file_id).map(|f| f.owner_ip);
145
146    // Find and remove the oldest pending write operation
147    let Some((op_seq, op)) = take_pending_op(inner, file_id, PendingOpType::Write) else {
148        tracing::warn!("WriteComplete for unknown file {:?}", file_id);
149        return;
150    };
151
152    let (offset, data_opt) = (op.offset, op.data);
153
154    // Apply the write with potential fault injection
155    let mut write_fault_kind: Option<&str> = None;
156    if let Some(data) = data_opt
157        && let Some(file_state) = inner.storage.files.get_mut(&file_id)
158    {
159        // Check for phantom write (write appears to succeed but doesn't persist)
160        if sim_random::<f64>() < config.phantom_write_probability {
161            tracing::info!(
162                "Phantom write injected for file {:?}, offset {}, len {}",
163                file_id,
164                offset,
165                data.len()
166            );
167            file_state.storage.record_phantom_write(offset, &data);
168            write_fault_kind = Some("phantom");
169        }
170        // Check for misdirected write
171        else if sim_random::<f64>() < config.misdirect_write_probability {
172            // Pick a random different offset
173            let max_offset = file_state.storage.size().saturating_sub(data.len() as u64);
174            let mistaken_offset = if max_offset > 0 {
175                sim_random_range(0..max_offset)
176            } else {
177                0
178            };
179            tracing::info!(
180                "Misdirected write injected for file {:?}: intended={}, actual={}",
181                file_id,
182                offset,
183                mistaken_offset
184            );
185            if let Err(e) =
186                file_state
187                    .storage
188                    .apply_misdirected_write(offset, mistaken_offset, &data)
189            {
190                tracing::warn!("Failed to apply misdirected write: {}", e);
191            }
192            write_fault_kind = Some("misdirected");
193        }
194        // Normal write (not synced - may be lost on crash)
195        else if let Err(e) = file_state.storage.write(offset, &data, false) {
196            tracing::warn!("Write failed for file {:?}: {}", file_id, e);
197        } else {
198            // Check for write corruption - mark sectors as faulted after successful write
199            if config.write_fault_probability > 0.0 {
200                let start_sector = (offset as usize) / crate::storage::SECTOR_SIZE;
201                let end_sector =
202                    (offset as usize + data.len()).div_ceil(crate::storage::SECTOR_SIZE);
203
204                for sector in start_sector..end_sector {
205                    if sim_random::<f64>() < config.write_fault_probability {
206                        file_state.storage.set_fault(sector);
207                        write_fault_kind = Some("corruption");
208                        tracing::info!(
209                            "Write fault injected for file {:?}, sector {}",
210                            file_id,
211                            sector
212                        );
213                    }
214                }
215            }
216        }
217    }
218    if let (Some(kind), Some(ip)) = (write_fault_kind, owner_ip) {
219        inner.emit_fault(SimFaultEvent::StorageWriteFault {
220            ip: ip.to_string(),
221            file_id: file_id.0,
222            kind: kind.to_string(),
223        });
224    }
225
226    // Wake the waker for this operation
227    if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
228        tracing::trace!("Waking write waker for file {:?}, op {}", file_id, op_seq);
229        waker.wake();
230    }
231}
232
233/// Handle sync operation completion.
234///
235/// Applies sync_failure_probability fault injection.
236fn handle_sync_complete(inner: &mut SimInner, file_id: FileId) {
237    let sync_failure_prob = inner
238        .storage
239        .files
240        .get(&file_id)
241        .map(|f| {
242            inner
243                .storage
244                .config_for(f.owner_ip)
245                .sync_failure_probability
246        })
247        .unwrap_or(0.0);
248
249    // Find and remove the oldest pending sync operation
250    let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::Sync) else {
251        tracing::warn!("SyncComplete for unknown file {:?}", file_id);
252        return;
253    };
254
255    // Check for sync failure
256    if sim_random::<f64>() < sync_failure_prob {
257        tracing::info!("Sync failure injected for file {:?}", file_id);
258        // Record the failure so SyncFuture can return an error
259        inner.storage.sync_failures.insert((file_id, op_seq));
260        // On sync failure, we don't call storage.sync()
261        // Data remains in pending state and may be lost on crash
262        let ip = inner.storage.files.get(&file_id).map(|f| f.owner_ip);
263        if let Some(ip) = ip {
264            inner.emit_fault(SimFaultEvent::StorageSyncFault {
265                ip: ip.to_string(),
266                file_id: file_id.0,
267            });
268        }
269    } else if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
270        // Successful sync - make all pending writes durable
271        file_state.storage.sync();
272    }
273
274    // Wake the waker for this operation
275    if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
276        tracing::trace!("Waking sync waker for file {:?}, op {}", file_id, op_seq);
277        waker.wake();
278    }
279}
280
281/// Handle open operation completion.
282fn handle_open_complete(inner: &mut SimInner, file_id: FileId) {
283    // Find and remove the oldest pending open operation
284    let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::Open) else {
285        // File might not have pending open op (it was already "open" on creation)
286        tracing::trace!("OpenComplete for file {:?} (no pending op)", file_id);
287        return;
288    };
289
290    // Wake the waker for this operation
291    if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
292        tracing::trace!("Waking open waker for file {:?}, op {}", file_id, op_seq);
293        waker.wake();
294    }
295}
296
297/// Handle set_len operation completion.
298fn handle_set_len_complete(inner: &mut SimInner, file_id: FileId, new_len: u64) {
299    // Find and remove the oldest pending set_len operation
300    let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::SetLen) else {
301        tracing::warn!("SetLenComplete for unknown file {:?}", file_id);
302        return;
303    };
304
305    // Resize the storage (preserves seed, written/fault bitmaps, and overlays)
306    if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
307        file_state.storage.resize(new_len);
308    }
309
310    // Wake the waker for this operation
311    if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
312        tracing::trace!(
313            "Waking set_len waker for file {:?}, op {}, new_len={}",
314            file_id,
315            op_seq,
316            new_len
317        );
318        waker.wake();
319    }
320}
321
322// =============================================================================
323// Storage Methods for SimWorld
324// =============================================================================
325
326impl SimWorld {
327    /// Access storage configuration for the simulation.
328    pub fn with_storage_config<F, R>(&self, f: F) -> R
329    where
330        F: FnOnce(&crate::storage::StorageConfiguration) -> R,
331    {
332        let inner = self.inner.borrow();
333        f(&inner.storage.config)
334    }
335
336    /// Open a file in the simulation.
337    ///
338    /// Creates a new file or opens an existing one based on the options.
339    /// Files are tagged with the `owner_ip` for per-process storage fault injection.
340    /// Schedules an `OpenComplete` event and returns the file ID.
341    pub(crate) fn open_file(
342        &self,
343        path: &str,
344        options: moonpool_core::OpenOptions,
345        initial_size: u64,
346        owner_ip: IpAddr,
347    ) -> Result<FileId, StorageError> {
348        use crate::storage::InMemoryStorage;
349
350        let mut inner = self.inner.borrow_mut();
351        let path_str = path.to_string();
352
353        // Check create_new semantics - fail if file exists
354        if options.create_new && inner.storage.path_to_file.contains_key(&path_str) {
355            return Err(StorageError::AlreadyExists { path: path_str });
356        }
357
358        // Check if file was deleted and create is not set
359        if inner.storage.deleted_paths.contains(&path_str) && !options.create {
360            return Err(StorageError::NotFound { path: path_str });
361        }
362
363        // If file already exists and we're opening it, return existing file ID
364        if let Some(&existing_id) = inner.storage.path_to_file.get(&path_str) {
365            if let Some(file_state) = inner.storage.files.get_mut(&existing_id) {
366                // If truncate is set, reset the storage
367                if options.truncate {
368                    let seed = sim_random::<u64>();
369                    file_state.storage = InMemoryStorage::new(0, seed);
370                    file_state.position = 0;
371                } else if options.append {
372                    // For append mode, seek to end
373                    file_state.position = file_state.storage.size();
374                } else {
375                    // For normal reopen, reset position to start
376                    file_state.position = 0;
377                }
378                // Update options for the new open
379                file_state.options = options;
380                file_state.is_closed = false;
381            }
382            return Ok(existing_id);
383        }
384
385        // File doesn't exist - check if we're allowed to create it
386        if !options.create && !options.create_new {
387            return Err(StorageError::NotFound { path: path_str });
388        }
389
390        // Create new file
391        let file_id = FileId(inner.storage.next_file_id);
392        inner.storage.next_file_id += 1;
393
394        // Remove from deleted paths if re-creating
395        inner.storage.deleted_paths.remove(&path_str);
396
397        // Create in-memory storage with deterministic seed
398        let seed = sim_random::<u64>();
399        let storage = InMemoryStorage::new(initial_size, seed);
400
401        let file_state = super::state::StorageFileState::new(
402            file_id,
403            path_str.clone(),
404            options,
405            storage,
406            owner_ip,
407        );
408
409        inner.storage.files.insert(file_id, file_state);
410        inner.storage.path_to_file.insert(path_str, file_id);
411
412        // Schedule OpenComplete event with minimal latency
413        let open_latency = Duration::from_micros(1);
414        let scheduled_time = inner.current_time + open_latency;
415        let sequence = inner.next_sequence;
416        inner.next_sequence += 1;
417        let event = Event::Storage {
418            file_id: file_id.0,
419            operation: StorageOperation::OpenComplete,
420        };
421        inner
422            .event_queue
423            .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
424
425        tracing::debug!("Opened file {:?} with id {:?}", path, file_id);
426        Ok(file_id)
427    }
428
429    /// Check if a file exists at the given path.
430    pub(crate) fn file_exists(&self, path: &str) -> bool {
431        let inner = self.inner.borrow();
432        let path_str = path.to_string();
433        inner.storage.path_to_file.contains_key(&path_str)
434            && !inner.storage.deleted_paths.contains(&path_str)
435    }
436
437    /// Delete a file at the given path.
438    pub(crate) fn delete_file(&self, path: &str) -> Result<(), StorageError> {
439        let mut inner = self.inner.borrow_mut();
440        let path_str = path.to_string();
441
442        if let Some(file_id) = inner.storage.path_to_file.remove(&path_str) {
443            // Mark file as closed and remove it
444            if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
445                file_state.is_closed = true;
446            }
447            inner.storage.files.remove(&file_id);
448            inner.storage.deleted_paths.insert(path_str);
449            tracing::debug!("Deleted file {:?}", path);
450            Ok(())
451        } else {
452            Err(StorageError::NotFound { path: path_str })
453        }
454    }
455
456    /// Rename a file from one path to another.
457    pub(crate) fn rename_file(&self, from: &str, to: &str) -> Result<(), StorageError> {
458        let mut inner = self.inner.borrow_mut();
459        let from_str = from.to_string();
460        let to_str = to.to_string();
461
462        if let Some(file_id) = inner.storage.path_to_file.remove(&from_str) {
463            // Update the path in the file state
464            if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
465                file_state.path = to_str.clone();
466            }
467            inner.storage.path_to_file.insert(to_str, file_id);
468            inner.storage.deleted_paths.remove(&from_str);
469            tracing::debug!("Renamed file {:?} to {:?}", from, to);
470            Ok(())
471        } else {
472            Err(StorageError::NotFound { path: from_str })
473        }
474    }
475
476    /// Schedule a read operation on a file.
477    ///
478    /// Returns an operation sequence number that can be used to check completion.
479    pub(crate) fn schedule_read(
480        &self,
481        file_id: FileId,
482        offset: u64,
483        len: usize,
484    ) -> Result<u64, StorageError> {
485        let mut inner = self.inner.borrow_mut();
486
487        let file_state = inner
488            .storage
489            .files
490            .get_mut(&file_id)
491            .ok_or(StorageError::InvalidFileHandle { file_id })?;
492
493        if file_state.is_closed {
494            return Err(StorageError::FileClosed { file_id });
495        }
496
497        let op_seq = file_state.next_op_seq;
498        file_state.next_op_seq += 1;
499
500        // Store the pending operation
501        file_state.pending_ops.insert(
502            op_seq,
503            PendingStorageOp {
504                op_type: PendingOpType::Read,
505                offset,
506                len,
507                data: None,
508            },
509        );
510
511        // Calculate latency using per-process config
512        let owner_ip = file_state.owner_ip;
513        let config = inner.storage.config_for(owner_ip);
514        let latency = Self::calculate_storage_latency(config, len, false);
515        let scheduled_time = inner.current_time + latency;
516        let sequence = inner.next_sequence;
517        inner.next_sequence += 1;
518
519        let event = Event::Storage {
520            file_id: file_id.0,
521            operation: StorageOperation::ReadComplete { len: len as u32 },
522        };
523        inner
524            .event_queue
525            .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
526
527        tracing::trace!(
528            "Scheduled read: file={:?}, offset={}, len={}, op_seq={}",
529            file_id,
530            offset,
531            len,
532            op_seq
533        );
534
535        Ok(op_seq)
536    }
537
538    /// Schedule a write operation on a file.
539    ///
540    /// Returns an operation sequence number that can be used to check completion.
541    pub(crate) fn schedule_write(
542        &self,
543        file_id: FileId,
544        offset: u64,
545        data: Vec<u8>,
546    ) -> Result<u64, StorageError> {
547        let mut inner = self.inner.borrow_mut();
548
549        let file_state = inner
550            .storage
551            .files
552            .get_mut(&file_id)
553            .ok_or(StorageError::InvalidFileHandle { file_id })?;
554
555        if file_state.is_closed {
556            return Err(StorageError::FileClosed { file_id });
557        }
558
559        let op_seq = file_state.next_op_seq;
560        file_state.next_op_seq += 1;
561        let len = data.len();
562
563        // Store the pending operation with the data
564        file_state.pending_ops.insert(
565            op_seq,
566            PendingStorageOp {
567                op_type: PendingOpType::Write,
568                offset,
569                len,
570                data: Some(data),
571            },
572        );
573
574        // Calculate latency using per-process config
575        let owner_ip = file_state.owner_ip;
576        let config = inner.storage.config_for(owner_ip);
577        let latency = Self::calculate_storage_latency(config, len, true);
578        let scheduled_time = inner.current_time + latency;
579        let sequence = inner.next_sequence;
580        inner.next_sequence += 1;
581
582        let event = Event::Storage {
583            file_id: file_id.0,
584            operation: StorageOperation::WriteComplete { len: len as u32 },
585        };
586        inner
587            .event_queue
588            .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
589
590        tracing::trace!(
591            "Scheduled write: file={:?}, offset={}, len={}, op_seq={}",
592            file_id,
593            offset,
594            len,
595            op_seq
596        );
597
598        Ok(op_seq)
599    }
600
601    /// Schedule a sync operation on a file.
602    ///
603    /// Returns an operation sequence number that can be used to check completion.
604    pub(crate) fn schedule_sync(&self, file_id: FileId) -> Result<u64, StorageError> {
605        let mut inner = self.inner.borrow_mut();
606
607        let file_state = inner
608            .storage
609            .files
610            .get_mut(&file_id)
611            .ok_or(StorageError::InvalidFileHandle { file_id })?;
612
613        if file_state.is_closed {
614            return Err(StorageError::FileClosed { file_id });
615        }
616
617        let op_seq = file_state.next_op_seq;
618        file_state.next_op_seq += 1;
619
620        // Store the pending operation
621        file_state.pending_ops.insert(
622            op_seq,
623            PendingStorageOp {
624                op_type: PendingOpType::Sync,
625                offset: 0,
626                len: 0,
627                data: None,
628            },
629        );
630
631        // Use sync latency from per-process config
632        let owner_ip = file_state.owner_ip;
633        let config = inner.storage.config_for(owner_ip);
634        let latency = crate::network::sample_duration(&config.sync_latency);
635        let scheduled_time = inner.current_time + latency;
636        let sequence = inner.next_sequence;
637        inner.next_sequence += 1;
638
639        let event = Event::Storage {
640            file_id: file_id.0,
641            operation: StorageOperation::SyncComplete,
642        };
643        inner
644            .event_queue
645            .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
646
647        tracing::trace!("Scheduled sync: file={:?}, op_seq={}", file_id, op_seq);
648
649        Ok(op_seq)
650    }
651
652    /// Schedule a set_len operation on a file.
653    ///
654    /// Returns an operation sequence number that can be used to check completion.
655    pub(crate) fn schedule_set_len(
656        &self,
657        file_id: FileId,
658        new_len: u64,
659    ) -> Result<u64, StorageError> {
660        let mut inner = self.inner.borrow_mut();
661
662        let file_state = inner
663            .storage
664            .files
665            .get_mut(&file_id)
666            .ok_or(StorageError::InvalidFileHandle { file_id })?;
667
668        if file_state.is_closed {
669            return Err(StorageError::FileClosed { file_id });
670        }
671
672        let op_seq = file_state.next_op_seq;
673        file_state.next_op_seq += 1;
674
675        // Store the pending operation
676        file_state.pending_ops.insert(
677            op_seq,
678            PendingStorageOp {
679                op_type: PendingOpType::SetLen,
680                offset: new_len,
681                len: 0,
682                data: None,
683            },
684        );
685
686        // Use write latency from per-process config
687        let owner_ip = file_state.owner_ip;
688        let config = inner.storage.config_for(owner_ip);
689        let latency = crate::network::sample_duration(&config.write_latency);
690        let scheduled_time = inner.current_time + latency;
691        let sequence = inner.next_sequence;
692        inner.next_sequence += 1;
693
694        let event = Event::Storage {
695            file_id: file_id.0,
696            operation: StorageOperation::SetLenComplete { new_len },
697        };
698        inner
699            .event_queue
700            .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
701
702        tracing::trace!(
703            "Scheduled set_len: file={:?}, new_len={}, op_seq={}",
704            file_id,
705            new_len,
706            op_seq
707        );
708
709        Ok(op_seq)
710    }
711
712    /// Check if a storage operation is complete.
713    pub(crate) fn is_storage_op_complete(&self, file_id: FileId, op_seq: u64) -> bool {
714        let inner = self.inner.borrow();
715        if let Some(file_state) = inner.storage.files.get(&file_id) {
716            // Operation is complete when it's no longer in pending_ops
717            !file_state.pending_ops.contains_key(&op_seq)
718        } else {
719            // File not found means operation is effectively "complete" (failed)
720            true
721        }
722    }
723
724    /// Check if a sync operation failed and clear the failure flag.
725    ///
726    /// Returns true if the sync failed due to fault injection.
727    pub(crate) fn take_sync_failure(&self, file_id: FileId, op_seq: u64) -> bool {
728        let mut inner = self.inner.borrow_mut();
729        inner.storage.sync_failures.remove(&(file_id, op_seq))
730    }
731
732    /// Register a waker for a storage operation.
733    pub(crate) fn register_storage_waker(&self, file_id: FileId, op_seq: u64, waker: Waker) {
734        let mut inner = self.inner.borrow_mut();
735        inner.wakers.storage_wakers.insert((file_id, op_seq), waker);
736    }
737
738    /// Read data from a file at the given offset.
739    ///
740    /// This is called after ReadComplete to actually fetch the data.
741    pub(crate) fn read_from_file(
742        &self,
743        file_id: FileId,
744        offset: u64,
745        buf: &mut [u8],
746    ) -> Result<usize, StorageError> {
747        let inner = self.inner.borrow();
748
749        let file_state = inner
750            .storage
751            .files
752            .get(&file_id)
753            .ok_or(StorageError::InvalidFileHandle { file_id })?;
754
755        if file_state.is_closed {
756            return Err(StorageError::FileClosed { file_id });
757        }
758
759        // Read from the in-memory storage
760        file_state
761            .storage
762            .read(offset, buf)
763            .map_err(|e| StorageError::Io {
764                file_id,
765                kind: e.kind(),
766                message: e.to_string(),
767            })?;
768
769        Ok(buf.len())
770    }
771
772    /// Get the current file position.
773    pub(crate) fn file_position(&self, file_id: FileId) -> Result<u64, StorageError> {
774        let inner = self.inner.borrow();
775        inner
776            .storage
777            .files
778            .get(&file_id)
779            .map(|f| f.position)
780            .ok_or(StorageError::InvalidFileHandle { file_id })
781    }
782
783    /// Set the current file position.
784    pub(crate) fn set_file_position(
785        &self,
786        file_id: FileId,
787        position: u64,
788    ) -> Result<(), StorageError> {
789        let mut inner = self.inner.borrow_mut();
790        if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
791            file_state.position = position;
792            Ok(())
793        } else {
794            Err(StorageError::InvalidFileHandle { file_id })
795        }
796    }
797
798    /// Get the size of a file.
799    pub(crate) fn file_size(&self, file_id: FileId) -> Result<u64, StorageError> {
800        let inner = self.inner.borrow();
801        inner
802            .storage
803            .files
804            .get(&file_id)
805            .map(|f| f.storage.size())
806            .ok_or(StorageError::InvalidFileHandle { file_id })
807    }
808
809    /// Calculate storage latency using FDB formula.
810    ///
811    /// Latency = base_latency + iops_overhead + transfer_time
812    fn calculate_storage_latency(
813        config: &crate::storage::StorageConfiguration,
814        size: usize,
815        is_write: bool,
816    ) -> Duration {
817        // Sample base latency from config range
818        let base_range = if is_write {
819            &config.write_latency
820        } else {
821            &config.read_latency
822        };
823        let base = crate::network::sample_duration(base_range);
824
825        // IOPS overhead: 1/iops seconds per operation
826        let iops_overhead = Duration::from_secs_f64(1.0 / config.iops as f64);
827
828        // Transfer time: size / bandwidth seconds
829        let transfer = Duration::from_secs_f64(size as f64 / config.bandwidth as f64);
830
831        base + iops_overhead + transfer
832    }
833
834    /// Simulate a crash affecting storage for a specific process.
835    ///
836    /// Only affects files owned by the given IP address:
837    /// 1. Calls `apply_crash()` on matching `InMemoryStorage` instances
838    /// 2. Clears pending operations (lost in crash)
839    /// 3. Optionally marks files as closed
840    /// 4. Wakes all storage wakers (operations will fail)
841    ///
842    /// Files owned by other IPs are unaffected.
843    #[instrument(skip(self))]
844    pub fn simulate_crash_for_process(&self, ip: IpAddr, close_files: bool) {
845        let mut inner = self.inner.borrow_mut();
846        let crash_probability = inner.storage.config_for(ip).crash_fault_probability;
847
848        // Collect all wakers to wake in one pass (to avoid borrow conflict)
849        let mut wakers_to_wake = Vec::new();
850        let file_ids: Vec<FileId> = inner
851            .storage
852            .files
853            .iter()
854            .filter(|(_, f)| f.owner_ip == ip)
855            .map(|(id, _)| *id)
856            .collect();
857
858        for file_id in &file_ids {
859            if let Some(file_state) = inner.storage.files.get_mut(file_id) {
860                // Apply crash to in-memory storage (may corrupt pending writes)
861                file_state.storage.apply_crash(crash_probability);
862
863                // Collect lost op sequence numbers
864                let lost_ops: Vec<u64> = file_state.pending_ops.keys().copied().collect();
865
866                // Clear pending ops - they're lost in crash
867                file_state.pending_ops.clear();
868
869                // Collect waker keys for later removal
870                for op_seq in lost_ops {
871                    wakers_to_wake.push((*file_id, op_seq));
872                }
873
874                // Optionally close files
875                if close_files {
876                    file_state.is_closed = true;
877                }
878            }
879        }
880
881        // Wake all collected wakers (after file iteration is complete)
882        for key in wakers_to_wake {
883            if let Some(waker) = inner.wakers.storage_wakers.remove(&key) {
884                waker.wake();
885            }
886        }
887
888        inner.emit_fault(SimFaultEvent::StorageCrash { ip: ip.to_string() });
889
890        tracing::info!(
891            "Storage crash simulated for {}: {} files affected, close_files={}",
892            ip,
893            file_ids.len(),
894            close_files
895        );
896    }
897
898    /// Wipe all storage for a specific process.
899    ///
900    /// Deletes all files owned by the given IP address. Used by `CrashAndWipe`
901    /// reboot to simulate total data loss. After wipe, the process can create
902    /// new files at the same paths.
903    ///
904    /// Files owned by other IPs are unaffected.
905    #[instrument(skip(self))]
906    pub fn wipe_storage_for_process(&self, ip: IpAddr) {
907        let mut inner = self.inner.borrow_mut();
908
909        // Collect files owned by this IP
910        let file_ids: Vec<(FileId, String)> = inner
911            .storage
912            .files
913            .iter()
914            .filter(|(_, f)| f.owner_ip == ip)
915            .map(|(id, f)| (*id, f.path.clone()))
916            .collect();
917
918        // Collect wakers to wake
919        let mut wakers_to_wake = Vec::new();
920
921        for (file_id, path) in &file_ids {
922            if let Some(file_state) = inner.storage.files.remove(file_id) {
923                for op_seq in file_state.pending_ops.keys() {
924                    wakers_to_wake.push((*file_id, *op_seq));
925                }
926            }
927            inner.storage.path_to_file.remove(path);
928            inner.storage.deleted_paths.insert(path.clone());
929        }
930
931        // Wake all collected wakers
932        for key in wakers_to_wake {
933            if let Some(waker) = inner.wakers.storage_wakers.remove(&key) {
934                waker.wake();
935            }
936        }
937
938        inner.emit_fault(SimFaultEvent::StorageWipe { ip: ip.to_string() });
939
940        tracing::info!("Storage wiped for {}: {} files deleted", ip, file_ids.len(),);
941    }
942
943    /// Set storage configuration for a specific process.
944    ///
945    /// Files owned by this IP will use this configuration for fault injection
946    /// and latency calculations. Takes effect immediately, even for files
947    /// already open.
948    #[instrument(skip(self, config))]
949    pub fn set_process_storage_config(
950        &self,
951        ip: IpAddr,
952        config: crate::storage::StorageConfiguration,
953    ) {
954        let mut inner = self.inner.borrow_mut();
955        inner.storage.per_process_configs.insert(ip, config);
956    }
957}