1use std::net::IpAddr;
8use std::task::Waker;
9use std::time::Duration;
10use tracing::instrument;
11
12use crate::storage::StorageError;
13
14use crate::chaos::fault_events::SimFaultEvent;
15
16use super::{
17 events::{Event, ScheduledEvent, StorageOperation},
18 rng::{sim_random, sim_random_range},
19 state::{FileId, PendingOpType, PendingStorageOp},
20 world::{SimInner, SimWorld},
21};
22
23fn take_pending_op(
31 inner: &mut SimInner,
32 file_id: FileId,
33 op_type: PendingOpType,
34) -> Option<(u64, PendingStorageOp)> {
35 let file_state = inner.storage.files.get_mut(&file_id)?;
36
37 let op_seq = file_state
38 .pending_ops
39 .iter()
40 .find(|(_, op)| op.op_type == op_type)
41 .map(|(&seq, _)| seq)?;
42
43 let op = file_state.pending_ops.remove(&op_seq)?;
44 Some((op_seq, op))
45}
46
47pub(crate) fn handle_storage_event(
52 inner: &mut SimInner,
53 file_id: u64,
54 operation: StorageOperation,
55) {
56 let file_id = FileId(file_id);
57
58 match operation {
59 StorageOperation::ReadComplete { len: _ } => {
60 handle_read_complete(inner, file_id);
61 }
62 StorageOperation::WriteComplete { len: _ } => {
63 handle_write_complete(inner, file_id);
64 }
65 StorageOperation::SyncComplete => {
66 handle_sync_complete(inner, file_id);
67 }
68 StorageOperation::OpenComplete => {
69 handle_open_complete(inner, file_id);
70 }
71 StorageOperation::SetLenComplete { new_len } => {
72 handle_set_len_complete(inner, file_id, new_len);
73 }
74 }
75}
76
77fn handle_read_complete(inner: &mut SimInner, file_id: FileId) {
79 let read_fault_probability = inner
80 .storage
81 .files
82 .get(&file_id)
83 .map(|f| inner.storage.config_for(f.owner_ip).read_fault_probability)
84 .unwrap_or(0.0);
85
86 let Some((op_seq, op)) = take_pending_op(inner, file_id, PendingOpType::Read) else {
88 tracing::warn!("ReadComplete for unknown file {:?}", file_id);
89 return;
90 };
91
92 let (offset, len) = (op.offset, op.len);
93
94 let mut read_faulted = false;
96 if read_fault_probability > 0.0
97 && let Some(file_state) = inner.storage.files.get_mut(&file_id)
98 {
99 let start_sector = (offset as usize) / crate::storage::SECTOR_SIZE;
100 let end_sector = (offset as usize + len).div_ceil(crate::storage::SECTOR_SIZE);
101
102 for sector in start_sector..end_sector {
103 if sim_random::<f64>() < read_fault_probability {
104 file_state.storage.set_fault(sector);
105 read_faulted = true;
106 tracing::info!(
107 "Read fault injected for file {:?}, sector {}",
108 file_id,
109 sector
110 );
111 }
112 }
113 }
114 if read_faulted {
115 let ip = inner.storage.files.get(&file_id).map(|f| f.owner_ip);
116 if let Some(ip) = ip {
117 inner.emit_fault(SimFaultEvent::StorageReadFault {
118 ip: ip.to_string(),
119 file_id: file_id.0,
120 });
121 }
122 }
123
124 if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
126 tracing::trace!("Waking read waker for file {:?}, op {}", file_id, op_seq);
127 waker.wake();
128 }
129}
130
131fn handle_write_complete(inner: &mut SimInner, file_id: FileId) {
137 let config = inner
138 .storage
139 .files
140 .get(&file_id)
141 .map(|f| inner.storage.config_for(f.owner_ip).clone())
142 .unwrap_or_default();
143
144 let owner_ip = inner.storage.files.get(&file_id).map(|f| f.owner_ip);
145
146 let Some((op_seq, op)) = take_pending_op(inner, file_id, PendingOpType::Write) else {
148 tracing::warn!("WriteComplete for unknown file {:?}", file_id);
149 return;
150 };
151
152 let (offset, data_opt) = (op.offset, op.data);
153
154 let mut write_fault_kind: Option<&str> = None;
156 if let Some(data) = data_opt
157 && let Some(file_state) = inner.storage.files.get_mut(&file_id)
158 {
159 if sim_random::<f64>() < config.phantom_write_probability {
161 tracing::info!(
162 "Phantom write injected for file {:?}, offset {}, len {}",
163 file_id,
164 offset,
165 data.len()
166 );
167 file_state.storage.record_phantom_write(offset, &data);
168 write_fault_kind = Some("phantom");
169 }
170 else if sim_random::<f64>() < config.misdirect_write_probability {
172 let max_offset = file_state.storage.size().saturating_sub(data.len() as u64);
174 let mistaken_offset = if max_offset > 0 {
175 sim_random_range(0..max_offset)
176 } else {
177 0
178 };
179 tracing::info!(
180 "Misdirected write injected for file {:?}: intended={}, actual={}",
181 file_id,
182 offset,
183 mistaken_offset
184 );
185 if let Err(e) =
186 file_state
187 .storage
188 .apply_misdirected_write(offset, mistaken_offset, &data)
189 {
190 tracing::warn!("Failed to apply misdirected write: {}", e);
191 }
192 write_fault_kind = Some("misdirected");
193 }
194 else if let Err(e) = file_state.storage.write(offset, &data, false) {
196 tracing::warn!("Write failed for file {:?}: {}", file_id, e);
197 } else {
198 if config.write_fault_probability > 0.0 {
200 let start_sector = (offset as usize) / crate::storage::SECTOR_SIZE;
201 let end_sector =
202 (offset as usize + data.len()).div_ceil(crate::storage::SECTOR_SIZE);
203
204 for sector in start_sector..end_sector {
205 if sim_random::<f64>() < config.write_fault_probability {
206 file_state.storage.set_fault(sector);
207 write_fault_kind = Some("corruption");
208 tracing::info!(
209 "Write fault injected for file {:?}, sector {}",
210 file_id,
211 sector
212 );
213 }
214 }
215 }
216 }
217 }
218 if let (Some(kind), Some(ip)) = (write_fault_kind, owner_ip) {
219 inner.emit_fault(SimFaultEvent::StorageWriteFault {
220 ip: ip.to_string(),
221 file_id: file_id.0,
222 kind: kind.to_string(),
223 });
224 }
225
226 if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
228 tracing::trace!("Waking write waker for file {:?}, op {}", file_id, op_seq);
229 waker.wake();
230 }
231}
232
233fn handle_sync_complete(inner: &mut SimInner, file_id: FileId) {
237 let sync_failure_prob = inner
238 .storage
239 .files
240 .get(&file_id)
241 .map(|f| {
242 inner
243 .storage
244 .config_for(f.owner_ip)
245 .sync_failure_probability
246 })
247 .unwrap_or(0.0);
248
249 let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::Sync) else {
251 tracing::warn!("SyncComplete for unknown file {:?}", file_id);
252 return;
253 };
254
255 if sim_random::<f64>() < sync_failure_prob {
257 tracing::info!("Sync failure injected for file {:?}", file_id);
258 inner.storage.sync_failures.insert((file_id, op_seq));
260 let ip = inner.storage.files.get(&file_id).map(|f| f.owner_ip);
263 if let Some(ip) = ip {
264 inner.emit_fault(SimFaultEvent::StorageSyncFault {
265 ip: ip.to_string(),
266 file_id: file_id.0,
267 });
268 }
269 } else if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
270 file_state.storage.sync();
272 }
273
274 if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
276 tracing::trace!("Waking sync waker for file {:?}, op {}", file_id, op_seq);
277 waker.wake();
278 }
279}
280
281fn handle_open_complete(inner: &mut SimInner, file_id: FileId) {
283 let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::Open) else {
285 tracing::trace!("OpenComplete for file {:?} (no pending op)", file_id);
287 return;
288 };
289
290 if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
292 tracing::trace!("Waking open waker for file {:?}, op {}", file_id, op_seq);
293 waker.wake();
294 }
295}
296
297fn handle_set_len_complete(inner: &mut SimInner, file_id: FileId, new_len: u64) {
299 let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::SetLen) else {
301 tracing::warn!("SetLenComplete for unknown file {:?}", file_id);
302 return;
303 };
304
305 if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
307 file_state.storage.resize(new_len);
308 }
309
310 if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
312 tracing::trace!(
313 "Waking set_len waker for file {:?}, op {}, new_len={}",
314 file_id,
315 op_seq,
316 new_len
317 );
318 waker.wake();
319 }
320}
321
322impl SimWorld {
327 pub fn with_storage_config<F, R>(&self, f: F) -> R
329 where
330 F: FnOnce(&crate::storage::StorageConfiguration) -> R,
331 {
332 let inner = self.inner.borrow();
333 f(&inner.storage.config)
334 }
335
336 pub(crate) fn open_file(
342 &self,
343 path: &str,
344 options: moonpool_core::OpenOptions,
345 initial_size: u64,
346 owner_ip: IpAddr,
347 ) -> Result<FileId, StorageError> {
348 use crate::storage::InMemoryStorage;
349
350 let mut inner = self.inner.borrow_mut();
351 let path_str = path.to_string();
352
353 if options.create_new && inner.storage.path_to_file.contains_key(&path_str) {
355 return Err(StorageError::AlreadyExists { path: path_str });
356 }
357
358 if inner.storage.deleted_paths.contains(&path_str) && !options.create {
360 return Err(StorageError::NotFound { path: path_str });
361 }
362
363 if let Some(&existing_id) = inner.storage.path_to_file.get(&path_str) {
365 if let Some(file_state) = inner.storage.files.get_mut(&existing_id) {
366 if options.truncate {
368 let seed = sim_random::<u64>();
369 file_state.storage = InMemoryStorage::new(0, seed);
370 file_state.position = 0;
371 } else if options.append {
372 file_state.position = file_state.storage.size();
374 } else {
375 file_state.position = 0;
377 }
378 file_state.options = options;
380 file_state.is_closed = false;
381 }
382 return Ok(existing_id);
383 }
384
385 if !options.create && !options.create_new {
387 return Err(StorageError::NotFound { path: path_str });
388 }
389
390 let file_id = FileId(inner.storage.next_file_id);
392 inner.storage.next_file_id += 1;
393
394 inner.storage.deleted_paths.remove(&path_str);
396
397 let seed = sim_random::<u64>();
399 let storage = InMemoryStorage::new(initial_size, seed);
400
401 let file_state = super::state::StorageFileState::new(
402 file_id,
403 path_str.clone(),
404 options,
405 storage,
406 owner_ip,
407 );
408
409 inner.storage.files.insert(file_id, file_state);
410 inner.storage.path_to_file.insert(path_str, file_id);
411
412 let open_latency = Duration::from_micros(1);
414 let scheduled_time = inner.current_time + open_latency;
415 let sequence = inner.next_sequence;
416 inner.next_sequence += 1;
417 let event = Event::Storage {
418 file_id: file_id.0,
419 operation: StorageOperation::OpenComplete,
420 };
421 inner
422 .event_queue
423 .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
424
425 tracing::debug!("Opened file {:?} with id {:?}", path, file_id);
426 Ok(file_id)
427 }
428
429 pub(crate) fn file_exists(&self, path: &str) -> bool {
431 let inner = self.inner.borrow();
432 let path_str = path.to_string();
433 inner.storage.path_to_file.contains_key(&path_str)
434 && !inner.storage.deleted_paths.contains(&path_str)
435 }
436
437 pub(crate) fn delete_file(&self, path: &str) -> Result<(), StorageError> {
439 let mut inner = self.inner.borrow_mut();
440 let path_str = path.to_string();
441
442 if let Some(file_id) = inner.storage.path_to_file.remove(&path_str) {
443 if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
445 file_state.is_closed = true;
446 }
447 inner.storage.files.remove(&file_id);
448 inner.storage.deleted_paths.insert(path_str);
449 tracing::debug!("Deleted file {:?}", path);
450 Ok(())
451 } else {
452 Err(StorageError::NotFound { path: path_str })
453 }
454 }
455
456 pub(crate) fn rename_file(&self, from: &str, to: &str) -> Result<(), StorageError> {
458 let mut inner = self.inner.borrow_mut();
459 let from_str = from.to_string();
460 let to_str = to.to_string();
461
462 if let Some(file_id) = inner.storage.path_to_file.remove(&from_str) {
463 if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
465 file_state.path = to_str.clone();
466 }
467 inner.storage.path_to_file.insert(to_str, file_id);
468 inner.storage.deleted_paths.remove(&from_str);
469 tracing::debug!("Renamed file {:?} to {:?}", from, to);
470 Ok(())
471 } else {
472 Err(StorageError::NotFound { path: from_str })
473 }
474 }
475
476 pub(crate) fn schedule_read(
480 &self,
481 file_id: FileId,
482 offset: u64,
483 len: usize,
484 ) -> Result<u64, StorageError> {
485 let mut inner = self.inner.borrow_mut();
486
487 let file_state = inner
488 .storage
489 .files
490 .get_mut(&file_id)
491 .ok_or(StorageError::InvalidFileHandle { file_id })?;
492
493 if file_state.is_closed {
494 return Err(StorageError::FileClosed { file_id });
495 }
496
497 let op_seq = file_state.next_op_seq;
498 file_state.next_op_seq += 1;
499
500 file_state.pending_ops.insert(
502 op_seq,
503 PendingStorageOp {
504 op_type: PendingOpType::Read,
505 offset,
506 len,
507 data: None,
508 },
509 );
510
511 let owner_ip = file_state.owner_ip;
513 let config = inner.storage.config_for(owner_ip);
514 let latency = Self::calculate_storage_latency(config, len, false);
515 let scheduled_time = inner.current_time + latency;
516 let sequence = inner.next_sequence;
517 inner.next_sequence += 1;
518
519 let event = Event::Storage {
520 file_id: file_id.0,
521 operation: StorageOperation::ReadComplete { len: len as u32 },
522 };
523 inner
524 .event_queue
525 .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
526
527 tracing::trace!(
528 "Scheduled read: file={:?}, offset={}, len={}, op_seq={}",
529 file_id,
530 offset,
531 len,
532 op_seq
533 );
534
535 Ok(op_seq)
536 }
537
538 pub(crate) fn schedule_write(
542 &self,
543 file_id: FileId,
544 offset: u64,
545 data: Vec<u8>,
546 ) -> Result<u64, StorageError> {
547 let mut inner = self.inner.borrow_mut();
548
549 let file_state = inner
550 .storage
551 .files
552 .get_mut(&file_id)
553 .ok_or(StorageError::InvalidFileHandle { file_id })?;
554
555 if file_state.is_closed {
556 return Err(StorageError::FileClosed { file_id });
557 }
558
559 let op_seq = file_state.next_op_seq;
560 file_state.next_op_seq += 1;
561 let len = data.len();
562
563 file_state.pending_ops.insert(
565 op_seq,
566 PendingStorageOp {
567 op_type: PendingOpType::Write,
568 offset,
569 len,
570 data: Some(data),
571 },
572 );
573
574 let owner_ip = file_state.owner_ip;
576 let config = inner.storage.config_for(owner_ip);
577 let latency = Self::calculate_storage_latency(config, len, true);
578 let scheduled_time = inner.current_time + latency;
579 let sequence = inner.next_sequence;
580 inner.next_sequence += 1;
581
582 let event = Event::Storage {
583 file_id: file_id.0,
584 operation: StorageOperation::WriteComplete { len: len as u32 },
585 };
586 inner
587 .event_queue
588 .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
589
590 tracing::trace!(
591 "Scheduled write: file={:?}, offset={}, len={}, op_seq={}",
592 file_id,
593 offset,
594 len,
595 op_seq
596 );
597
598 Ok(op_seq)
599 }
600
601 pub(crate) fn schedule_sync(&self, file_id: FileId) -> Result<u64, StorageError> {
605 let mut inner = self.inner.borrow_mut();
606
607 let file_state = inner
608 .storage
609 .files
610 .get_mut(&file_id)
611 .ok_or(StorageError::InvalidFileHandle { file_id })?;
612
613 if file_state.is_closed {
614 return Err(StorageError::FileClosed { file_id });
615 }
616
617 let op_seq = file_state.next_op_seq;
618 file_state.next_op_seq += 1;
619
620 file_state.pending_ops.insert(
622 op_seq,
623 PendingStorageOp {
624 op_type: PendingOpType::Sync,
625 offset: 0,
626 len: 0,
627 data: None,
628 },
629 );
630
631 let owner_ip = file_state.owner_ip;
633 let config = inner.storage.config_for(owner_ip);
634 let latency = crate::network::sample_duration(&config.sync_latency);
635 let scheduled_time = inner.current_time + latency;
636 let sequence = inner.next_sequence;
637 inner.next_sequence += 1;
638
639 let event = Event::Storage {
640 file_id: file_id.0,
641 operation: StorageOperation::SyncComplete,
642 };
643 inner
644 .event_queue
645 .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
646
647 tracing::trace!("Scheduled sync: file={:?}, op_seq={}", file_id, op_seq);
648
649 Ok(op_seq)
650 }
651
652 pub(crate) fn schedule_set_len(
656 &self,
657 file_id: FileId,
658 new_len: u64,
659 ) -> Result<u64, StorageError> {
660 let mut inner = self.inner.borrow_mut();
661
662 let file_state = inner
663 .storage
664 .files
665 .get_mut(&file_id)
666 .ok_or(StorageError::InvalidFileHandle { file_id })?;
667
668 if file_state.is_closed {
669 return Err(StorageError::FileClosed { file_id });
670 }
671
672 let op_seq = file_state.next_op_seq;
673 file_state.next_op_seq += 1;
674
675 file_state.pending_ops.insert(
677 op_seq,
678 PendingStorageOp {
679 op_type: PendingOpType::SetLen,
680 offset: new_len,
681 len: 0,
682 data: None,
683 },
684 );
685
686 let owner_ip = file_state.owner_ip;
688 let config = inner.storage.config_for(owner_ip);
689 let latency = crate::network::sample_duration(&config.write_latency);
690 let scheduled_time = inner.current_time + latency;
691 let sequence = inner.next_sequence;
692 inner.next_sequence += 1;
693
694 let event = Event::Storage {
695 file_id: file_id.0,
696 operation: StorageOperation::SetLenComplete { new_len },
697 };
698 inner
699 .event_queue
700 .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
701
702 tracing::trace!(
703 "Scheduled set_len: file={:?}, new_len={}, op_seq={}",
704 file_id,
705 new_len,
706 op_seq
707 );
708
709 Ok(op_seq)
710 }
711
712 pub(crate) fn is_storage_op_complete(&self, file_id: FileId, op_seq: u64) -> bool {
714 let inner = self.inner.borrow();
715 if let Some(file_state) = inner.storage.files.get(&file_id) {
716 !file_state.pending_ops.contains_key(&op_seq)
718 } else {
719 true
721 }
722 }
723
724 pub(crate) fn take_sync_failure(&self, file_id: FileId, op_seq: u64) -> bool {
728 let mut inner = self.inner.borrow_mut();
729 inner.storage.sync_failures.remove(&(file_id, op_seq))
730 }
731
732 pub(crate) fn register_storage_waker(&self, file_id: FileId, op_seq: u64, waker: Waker) {
734 let mut inner = self.inner.borrow_mut();
735 inner.wakers.storage_wakers.insert((file_id, op_seq), waker);
736 }
737
738 pub(crate) fn read_from_file(
742 &self,
743 file_id: FileId,
744 offset: u64,
745 buf: &mut [u8],
746 ) -> Result<usize, StorageError> {
747 let inner = self.inner.borrow();
748
749 let file_state = inner
750 .storage
751 .files
752 .get(&file_id)
753 .ok_or(StorageError::InvalidFileHandle { file_id })?;
754
755 if file_state.is_closed {
756 return Err(StorageError::FileClosed { file_id });
757 }
758
759 file_state
761 .storage
762 .read(offset, buf)
763 .map_err(|e| StorageError::Io {
764 file_id,
765 kind: e.kind(),
766 message: e.to_string(),
767 })?;
768
769 Ok(buf.len())
770 }
771
772 pub(crate) fn file_position(&self, file_id: FileId) -> Result<u64, StorageError> {
774 let inner = self.inner.borrow();
775 inner
776 .storage
777 .files
778 .get(&file_id)
779 .map(|f| f.position)
780 .ok_or(StorageError::InvalidFileHandle { file_id })
781 }
782
783 pub(crate) fn set_file_position(
785 &self,
786 file_id: FileId,
787 position: u64,
788 ) -> Result<(), StorageError> {
789 let mut inner = self.inner.borrow_mut();
790 if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
791 file_state.position = position;
792 Ok(())
793 } else {
794 Err(StorageError::InvalidFileHandle { file_id })
795 }
796 }
797
798 pub(crate) fn file_size(&self, file_id: FileId) -> Result<u64, StorageError> {
800 let inner = self.inner.borrow();
801 inner
802 .storage
803 .files
804 .get(&file_id)
805 .map(|f| f.storage.size())
806 .ok_or(StorageError::InvalidFileHandle { file_id })
807 }
808
809 fn calculate_storage_latency(
813 config: &crate::storage::StorageConfiguration,
814 size: usize,
815 is_write: bool,
816 ) -> Duration {
817 let base_range = if is_write {
819 &config.write_latency
820 } else {
821 &config.read_latency
822 };
823 let base = crate::network::sample_duration(base_range);
824
825 let iops_overhead = Duration::from_secs_f64(1.0 / config.iops as f64);
827
828 let transfer = Duration::from_secs_f64(size as f64 / config.bandwidth as f64);
830
831 base + iops_overhead + transfer
832 }
833
834 #[instrument(skip(self))]
844 pub fn simulate_crash_for_process(&self, ip: IpAddr, close_files: bool) {
845 let mut inner = self.inner.borrow_mut();
846 let crash_probability = inner.storage.config_for(ip).crash_fault_probability;
847
848 let mut wakers_to_wake = Vec::new();
850 let file_ids: Vec<FileId> = inner
851 .storage
852 .files
853 .iter()
854 .filter(|(_, f)| f.owner_ip == ip)
855 .map(|(id, _)| *id)
856 .collect();
857
858 for file_id in &file_ids {
859 if let Some(file_state) = inner.storage.files.get_mut(file_id) {
860 file_state.storage.apply_crash(crash_probability);
862
863 let lost_ops: Vec<u64> = file_state.pending_ops.keys().copied().collect();
865
866 file_state.pending_ops.clear();
868
869 for op_seq in lost_ops {
871 wakers_to_wake.push((*file_id, op_seq));
872 }
873
874 if close_files {
876 file_state.is_closed = true;
877 }
878 }
879 }
880
881 for key in wakers_to_wake {
883 if let Some(waker) = inner.wakers.storage_wakers.remove(&key) {
884 waker.wake();
885 }
886 }
887
888 inner.emit_fault(SimFaultEvent::StorageCrash { ip: ip.to_string() });
889
890 tracing::info!(
891 "Storage crash simulated for {}: {} files affected, close_files={}",
892 ip,
893 file_ids.len(),
894 close_files
895 );
896 }
897
898 #[instrument(skip(self))]
906 pub fn wipe_storage_for_process(&self, ip: IpAddr) {
907 let mut inner = self.inner.borrow_mut();
908
909 let file_ids: Vec<(FileId, String)> = inner
911 .storage
912 .files
913 .iter()
914 .filter(|(_, f)| f.owner_ip == ip)
915 .map(|(id, f)| (*id, f.path.clone()))
916 .collect();
917
918 let mut wakers_to_wake = Vec::new();
920
921 for (file_id, path) in &file_ids {
922 if let Some(file_state) = inner.storage.files.remove(file_id) {
923 for op_seq in file_state.pending_ops.keys() {
924 wakers_to_wake.push((*file_id, *op_seq));
925 }
926 }
927 inner.storage.path_to_file.remove(path);
928 inner.storage.deleted_paths.insert(path.clone());
929 }
930
931 for key in wakers_to_wake {
933 if let Some(waker) = inner.wakers.storage_wakers.remove(&key) {
934 waker.wake();
935 }
936 }
937
938 inner.emit_fault(SimFaultEvent::StorageWipe { ip: ip.to_string() });
939
940 tracing::info!("Storage wiped for {}: {} files deleted", ip, file_ids.len(),);
941 }
942
943 #[instrument(skip(self, config))]
949 pub fn set_process_storage_config(
950 &self,
951 ip: IpAddr,
952 config: crate::storage::StorageConfiguration,
953 ) {
954 let mut inner = self.inner.borrow_mut();
955 inner.storage.per_process_configs.insert(ip, config);
956 }
957}