1use std::task::Waker;
8use std::time::Duration;
9
10use crate::{SimulationError, SimulationResult};
11
12use super::{
13 events::{Event, ScheduledEvent, StorageOperation},
14 rng::{sim_random, sim_random_range},
15 state::{FileId, PendingOpType, PendingStorageOp},
16 world::{SimInner, SimWorld},
17};
18
19fn take_pending_op(
27 inner: &mut SimInner,
28 file_id: FileId,
29 op_type: PendingOpType,
30) -> Option<(u64, PendingStorageOp)> {
31 let file_state = inner.storage.files.get_mut(&file_id)?;
32
33 let op_seq = file_state
34 .pending_ops
35 .iter()
36 .find(|(_, op)| op.op_type == op_type)
37 .map(|(&seq, _)| seq)?;
38
39 let op = file_state.pending_ops.remove(&op_seq)?;
40 Some((op_seq, op))
41}
42
43pub(crate) fn handle_storage_event(
48 inner: &mut SimInner,
49 file_id: u64,
50 operation: StorageOperation,
51) {
52 let file_id = FileId(file_id);
53
54 match operation {
55 StorageOperation::ReadComplete { len: _ } => {
56 handle_read_complete(inner, file_id);
57 }
58 StorageOperation::WriteComplete { len: _ } => {
59 handle_write_complete(inner, file_id);
60 }
61 StorageOperation::SyncComplete => {
62 handle_sync_complete(inner, file_id);
63 }
64 StorageOperation::OpenComplete => {
65 handle_open_complete(inner, file_id);
66 }
67 StorageOperation::SetLenComplete { new_len } => {
68 handle_set_len_complete(inner, file_id, new_len);
69 }
70 }
71}
72
73fn handle_read_complete(inner: &mut SimInner, file_id: FileId) {
75 let read_fault_probability = inner.storage.config.read_fault_probability;
76
77 let Some((op_seq, op)) = take_pending_op(inner, file_id, PendingOpType::Read) else {
79 tracing::warn!("ReadComplete for unknown file {:?}", file_id);
80 return;
81 };
82
83 let (offset, len) = (op.offset, op.len);
84
85 if read_fault_probability > 0.0
87 && let Some(file_state) = inner.storage.files.get_mut(&file_id)
88 {
89 let start_sector = (offset as usize) / crate::storage::SECTOR_SIZE;
90 let end_sector = (offset as usize + len).div_ceil(crate::storage::SECTOR_SIZE);
91
92 for sector in start_sector..end_sector {
93 if sim_random::<f64>() < read_fault_probability {
94 file_state.storage.set_fault(sector);
95 tracing::info!(
96 "Read fault injected for file {:?}, sector {}",
97 file_id,
98 sector
99 );
100 }
101 }
102 }
103
104 if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
106 tracing::trace!("Waking read waker for file {:?}, op {}", file_id, op_seq);
107 waker.wake();
108 }
109}
110
111fn handle_write_complete(inner: &mut SimInner, file_id: FileId) {
117 let config = inner.storage.config.clone();
118
119 let Some((op_seq, op)) = take_pending_op(inner, file_id, PendingOpType::Write) else {
121 tracing::warn!("WriteComplete for unknown file {:?}", file_id);
122 return;
123 };
124
125 let (offset, data_opt) = (op.offset, op.data);
126
127 if let Some(data) = data_opt
129 && let Some(file_state) = inner.storage.files.get_mut(&file_id)
130 {
131 if sim_random::<f64>() < config.phantom_write_probability {
133 tracing::info!(
134 "Phantom write injected for file {:?}, offset {}, len {}",
135 file_id,
136 offset,
137 data.len()
138 );
139 file_state.storage.record_phantom_write(offset, &data);
140 }
141 else if sim_random::<f64>() < config.misdirect_write_probability {
143 let max_offset = file_state.storage.size().saturating_sub(data.len() as u64);
145 let mistaken_offset = if max_offset > 0 {
146 sim_random_range(0..max_offset)
147 } else {
148 0
149 };
150 tracing::info!(
151 "Misdirected write injected for file {:?}: intended={}, actual={}",
152 file_id,
153 offset,
154 mistaken_offset
155 );
156 if let Err(e) =
157 file_state
158 .storage
159 .apply_misdirected_write(offset, mistaken_offset, &data)
160 {
161 tracing::warn!("Failed to apply misdirected write: {}", e);
162 }
163 }
164 else if let Err(e) = file_state.storage.write(offset, &data, false) {
166 tracing::warn!("Write failed for file {:?}: {}", file_id, e);
167 } else {
168 if config.write_fault_probability > 0.0 {
170 let start_sector = (offset as usize) / crate::storage::SECTOR_SIZE;
171 let end_sector =
172 (offset as usize + data.len()).div_ceil(crate::storage::SECTOR_SIZE);
173
174 for sector in start_sector..end_sector {
175 if sim_random::<f64>() < config.write_fault_probability {
176 file_state.storage.set_fault(sector);
177 tracing::info!(
178 "Write fault injected for file {:?}, sector {}",
179 file_id,
180 sector
181 );
182 }
183 }
184 }
185 }
186 }
187
188 if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
190 tracing::trace!("Waking write waker for file {:?}, op {}", file_id, op_seq);
191 waker.wake();
192 }
193}
194
195fn handle_sync_complete(inner: &mut SimInner, file_id: FileId) {
199 let sync_failure_prob = inner.storage.config.sync_failure_probability;
200
201 let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::Sync) else {
203 tracing::warn!("SyncComplete for unknown file {:?}", file_id);
204 return;
205 };
206
207 if sim_random::<f64>() < sync_failure_prob {
209 tracing::info!("Sync failure injected for file {:?}", file_id);
210 inner.storage.sync_failures.insert((file_id, op_seq));
212 } else if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
215 file_state.storage.sync();
217 }
218
219 if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
221 tracing::trace!("Waking sync waker for file {:?}, op {}", file_id, op_seq);
222 waker.wake();
223 }
224}
225
226fn handle_open_complete(inner: &mut SimInner, file_id: FileId) {
228 let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::Open) else {
230 tracing::trace!("OpenComplete for file {:?} (no pending op)", file_id);
232 return;
233 };
234
235 if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
237 tracing::trace!("Waking open waker for file {:?}, op {}", file_id, op_seq);
238 waker.wake();
239 }
240}
241
242fn handle_set_len_complete(inner: &mut SimInner, file_id: FileId, new_len: u64) {
244 let Some((op_seq, _)) = take_pending_op(inner, file_id, PendingOpType::SetLen) else {
246 tracing::warn!("SetLenComplete for unknown file {:?}", file_id);
247 return;
248 };
249
250 if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
252 file_state.storage.resize(new_len);
253 }
254
255 if let Some(waker) = inner.wakers.storage_wakers.remove(&(file_id, op_seq)) {
257 tracing::trace!(
258 "Waking set_len waker for file {:?}, op {}, new_len={}",
259 file_id,
260 op_seq,
261 new_len
262 );
263 waker.wake();
264 }
265}
266
267impl SimWorld {
272 pub fn with_storage_config<F, R>(&self, f: F) -> R
274 where
275 F: FnOnce(&crate::storage::StorageConfiguration) -> R,
276 {
277 let inner = self.inner.borrow();
278 f(&inner.storage.config)
279 }
280
281 pub(crate) fn open_file(
286 &self,
287 path: &str,
288 options: moonpool_core::OpenOptions,
289 initial_size: u64,
290 ) -> SimulationResult<FileId> {
291 use crate::storage::InMemoryStorage;
292
293 let mut inner = self.inner.borrow_mut();
294 let path_str = path.to_string();
295
296 if options.create_new && inner.storage.path_to_file.contains_key(&path_str) {
298 return Err(SimulationError::IoError(
299 "File already exists (create_new)".to_string(),
300 ));
301 }
302
303 if inner.storage.deleted_paths.contains(&path_str) && !options.create {
305 return Err(SimulationError::IoError("File not found".to_string()));
306 }
307
308 if let Some(&existing_id) = inner.storage.path_to_file.get(&path_str) {
310 if let Some(file_state) = inner.storage.files.get_mut(&existing_id) {
311 if options.truncate {
313 let seed = sim_random::<u64>();
314 file_state.storage = InMemoryStorage::new(0, seed);
315 file_state.position = 0;
316 } else if options.append {
317 file_state.position = file_state.storage.size();
319 } else {
320 file_state.position = 0;
322 }
323 file_state.options = options;
325 file_state.is_closed = false;
326 }
327 return Ok(existing_id);
328 }
329
330 if !options.create && !options.create_new {
332 return Err(SimulationError::IoError("File not found".to_string()));
333 }
334
335 let file_id = FileId(inner.storage.next_file_id);
337 inner.storage.next_file_id += 1;
338
339 inner.storage.deleted_paths.remove(&path_str);
341
342 let seed = sim_random::<u64>();
344 let storage = InMemoryStorage::new(initial_size, seed);
345
346 let file_state =
347 super::state::StorageFileState::new(file_id, path_str.clone(), options, storage);
348
349 inner.storage.files.insert(file_id, file_state);
350 inner.storage.path_to_file.insert(path_str, file_id);
351
352 let open_latency = Duration::from_micros(1);
354 let scheduled_time = inner.current_time + open_latency;
355 let sequence = inner.next_sequence;
356 inner.next_sequence += 1;
357 let event = Event::Storage {
358 file_id: file_id.0,
359 operation: StorageOperation::OpenComplete,
360 };
361 inner
362 .event_queue
363 .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
364
365 tracing::debug!("Opened file {:?} with id {:?}", path, file_id);
366 Ok(file_id)
367 }
368
369 pub(crate) fn file_exists(&self, path: &str) -> bool {
371 let inner = self.inner.borrow();
372 let path_str = path.to_string();
373 inner.storage.path_to_file.contains_key(&path_str)
374 && !inner.storage.deleted_paths.contains(&path_str)
375 }
376
377 pub(crate) fn delete_file(&self, path: &str) -> SimulationResult<()> {
379 let mut inner = self.inner.borrow_mut();
380 let path_str = path.to_string();
381
382 if let Some(file_id) = inner.storage.path_to_file.remove(&path_str) {
383 if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
385 file_state.is_closed = true;
386 }
387 inner.storage.files.remove(&file_id);
388 inner.storage.deleted_paths.insert(path_str);
389 tracing::debug!("Deleted file {:?}", path);
390 Ok(())
391 } else {
392 Err(SimulationError::IoError("File not found".to_string()))
393 }
394 }
395
396 pub(crate) fn rename_file(&self, from: &str, to: &str) -> SimulationResult<()> {
398 let mut inner = self.inner.borrow_mut();
399 let from_str = from.to_string();
400 let to_str = to.to_string();
401
402 if let Some(file_id) = inner.storage.path_to_file.remove(&from_str) {
403 if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
405 file_state.path = to_str.clone();
406 }
407 inner.storage.path_to_file.insert(to_str, file_id);
408 inner.storage.deleted_paths.remove(&from_str);
409 tracing::debug!("Renamed file {:?} to {:?}", from, to);
410 Ok(())
411 } else {
412 Err(SimulationError::IoError("File not found".to_string()))
413 }
414 }
415
416 pub(crate) fn schedule_read(
420 &self,
421 file_id: FileId,
422 offset: u64,
423 len: usize,
424 ) -> SimulationResult<u64> {
425 let mut inner = self.inner.borrow_mut();
426
427 let file_state = inner
428 .storage
429 .files
430 .get_mut(&file_id)
431 .ok_or_else(|| SimulationError::IoError("File not found".to_string()))?;
432
433 if file_state.is_closed {
434 return Err(SimulationError::IoError("File is closed".to_string()));
435 }
436
437 let op_seq = file_state.next_op_seq;
438 file_state.next_op_seq += 1;
439
440 file_state.pending_ops.insert(
442 op_seq,
443 PendingStorageOp {
444 op_type: PendingOpType::Read,
445 offset,
446 len,
447 data: None,
448 },
449 );
450
451 let latency = Self::calculate_storage_latency(&inner.storage.config, len, false);
453 let scheduled_time = inner.current_time + latency;
454 let sequence = inner.next_sequence;
455 inner.next_sequence += 1;
456
457 let event = Event::Storage {
458 file_id: file_id.0,
459 operation: StorageOperation::ReadComplete { len: len as u32 },
460 };
461 inner
462 .event_queue
463 .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
464
465 tracing::trace!(
466 "Scheduled read: file={:?}, offset={}, len={}, op_seq={}",
467 file_id,
468 offset,
469 len,
470 op_seq
471 );
472
473 Ok(op_seq)
474 }
475
476 pub(crate) fn schedule_write(
480 &self,
481 file_id: FileId,
482 offset: u64,
483 data: Vec<u8>,
484 ) -> SimulationResult<u64> {
485 let mut inner = self.inner.borrow_mut();
486
487 let file_state = inner
488 .storage
489 .files
490 .get_mut(&file_id)
491 .ok_or_else(|| SimulationError::IoError("File not found".to_string()))?;
492
493 if file_state.is_closed {
494 return Err(SimulationError::IoError("File is closed".to_string()));
495 }
496
497 let op_seq = file_state.next_op_seq;
498 file_state.next_op_seq += 1;
499 let len = data.len();
500
501 file_state.pending_ops.insert(
503 op_seq,
504 PendingStorageOp {
505 op_type: PendingOpType::Write,
506 offset,
507 len,
508 data: Some(data),
509 },
510 );
511
512 let latency = Self::calculate_storage_latency(&inner.storage.config, len, true);
514 let scheduled_time = inner.current_time + latency;
515 let sequence = inner.next_sequence;
516 inner.next_sequence += 1;
517
518 let event = Event::Storage {
519 file_id: file_id.0,
520 operation: StorageOperation::WriteComplete { len: len as u32 },
521 };
522 inner
523 .event_queue
524 .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
525
526 tracing::trace!(
527 "Scheduled write: file={:?}, offset={}, len={}, op_seq={}",
528 file_id,
529 offset,
530 len,
531 op_seq
532 );
533
534 Ok(op_seq)
535 }
536
537 pub(crate) fn schedule_sync(&self, file_id: FileId) -> SimulationResult<u64> {
541 let mut inner = self.inner.borrow_mut();
542
543 let file_state = inner
544 .storage
545 .files
546 .get_mut(&file_id)
547 .ok_or_else(|| SimulationError::IoError("File not found".to_string()))?;
548
549 if file_state.is_closed {
550 return Err(SimulationError::IoError("File is closed".to_string()));
551 }
552
553 let op_seq = file_state.next_op_seq;
554 file_state.next_op_seq += 1;
555
556 file_state.pending_ops.insert(
558 op_seq,
559 PendingStorageOp {
560 op_type: PendingOpType::Sync,
561 offset: 0,
562 len: 0,
563 data: None,
564 },
565 );
566
567 let latency = crate::network::sample_duration(&inner.storage.config.sync_latency);
569 let scheduled_time = inner.current_time + latency;
570 let sequence = inner.next_sequence;
571 inner.next_sequence += 1;
572
573 let event = Event::Storage {
574 file_id: file_id.0,
575 operation: StorageOperation::SyncComplete,
576 };
577 inner
578 .event_queue
579 .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
580
581 tracing::trace!("Scheduled sync: file={:?}, op_seq={}", file_id, op_seq);
582
583 Ok(op_seq)
584 }
585
586 pub(crate) fn schedule_set_len(&self, file_id: FileId, new_len: u64) -> SimulationResult<u64> {
590 let mut inner = self.inner.borrow_mut();
591
592 let file_state = inner
593 .storage
594 .files
595 .get_mut(&file_id)
596 .ok_or_else(|| SimulationError::IoError("File not found".to_string()))?;
597
598 if file_state.is_closed {
599 return Err(SimulationError::IoError("File is closed".to_string()));
600 }
601
602 let op_seq = file_state.next_op_seq;
603 file_state.next_op_seq += 1;
604
605 file_state.pending_ops.insert(
607 op_seq,
608 PendingStorageOp {
609 op_type: PendingOpType::SetLen,
610 offset: new_len,
611 len: 0,
612 data: None,
613 },
614 );
615
616 let latency = crate::network::sample_duration(&inner.storage.config.write_latency);
618 let scheduled_time = inner.current_time + latency;
619 let sequence = inner.next_sequence;
620 inner.next_sequence += 1;
621
622 let event = Event::Storage {
623 file_id: file_id.0,
624 operation: StorageOperation::SetLenComplete { new_len },
625 };
626 inner
627 .event_queue
628 .schedule(ScheduledEvent::new(scheduled_time, event, sequence));
629
630 tracing::trace!(
631 "Scheduled set_len: file={:?}, new_len={}, op_seq={}",
632 file_id,
633 new_len,
634 op_seq
635 );
636
637 Ok(op_seq)
638 }
639
640 pub(crate) fn is_storage_op_complete(&self, file_id: FileId, op_seq: u64) -> bool {
642 let inner = self.inner.borrow();
643 if let Some(file_state) = inner.storage.files.get(&file_id) {
644 !file_state.pending_ops.contains_key(&op_seq)
646 } else {
647 true
649 }
650 }
651
652 pub(crate) fn take_sync_failure(&self, file_id: FileId, op_seq: u64) -> bool {
656 let mut inner = self.inner.borrow_mut();
657 inner.storage.sync_failures.remove(&(file_id, op_seq))
658 }
659
660 pub(crate) fn register_storage_waker(&self, file_id: FileId, op_seq: u64, waker: Waker) {
662 let mut inner = self.inner.borrow_mut();
663 inner.wakers.storage_wakers.insert((file_id, op_seq), waker);
664 }
665
666 pub(crate) fn read_from_file(
670 &self,
671 file_id: FileId,
672 offset: u64,
673 buf: &mut [u8],
674 ) -> SimulationResult<usize> {
675 let inner = self.inner.borrow();
676
677 let file_state = inner
678 .storage
679 .files
680 .get(&file_id)
681 .ok_or_else(|| SimulationError::IoError("File not found".to_string()))?;
682
683 if file_state.is_closed {
684 return Err(SimulationError::IoError("File is closed".to_string()));
685 }
686
687 file_state
689 .storage
690 .read(offset, buf)
691 .map_err(|e| SimulationError::IoError(e.to_string()))?;
692
693 Ok(buf.len())
694 }
695
696 pub(crate) fn get_file_position(&self, file_id: FileId) -> SimulationResult<u64> {
698 let inner = self.inner.borrow();
699 inner
700 .storage
701 .files
702 .get(&file_id)
703 .map(|f| f.position)
704 .ok_or_else(|| SimulationError::IoError("File not found".to_string()))
705 }
706
707 pub(crate) fn set_file_position(&self, file_id: FileId, position: u64) -> SimulationResult<()> {
709 let mut inner = self.inner.borrow_mut();
710 if let Some(file_state) = inner.storage.files.get_mut(&file_id) {
711 file_state.position = position;
712 Ok(())
713 } else {
714 Err(SimulationError::IoError("File not found".to_string()))
715 }
716 }
717
718 pub(crate) fn get_file_size(&self, file_id: FileId) -> SimulationResult<u64> {
720 let inner = self.inner.borrow();
721 inner
722 .storage
723 .files
724 .get(&file_id)
725 .map(|f| f.storage.size())
726 .ok_or_else(|| SimulationError::IoError("File not found".to_string()))
727 }
728
729 fn calculate_storage_latency(
733 config: &crate::storage::StorageConfiguration,
734 size: usize,
735 is_write: bool,
736 ) -> Duration {
737 let base_range = if is_write {
739 &config.write_latency
740 } else {
741 &config.read_latency
742 };
743 let base = crate::network::sample_duration(base_range);
744
745 let iops_overhead = Duration::from_secs_f64(1.0 / config.iops as f64);
747
748 let transfer = Duration::from_secs_f64(size as f64 / config.bandwidth as f64);
750
751 base + iops_overhead + transfer
752 }
753
754 pub fn simulate_crash(&self, close_files: bool) {
762 let mut inner = self.inner.borrow_mut();
763 let crash_probability = inner.storage.config.crash_fault_probability;
764
765 let mut wakers_to_wake = Vec::new();
767 let file_ids: Vec<FileId> = inner.storage.files.keys().copied().collect();
768
769 for file_id in &file_ids {
770 if let Some(file_state) = inner.storage.files.get_mut(file_id) {
771 file_state.storage.apply_crash(crash_probability);
773
774 let lost_ops: Vec<u64> = file_state.pending_ops.keys().copied().collect();
776
777 file_state.pending_ops.clear();
779
780 for op_seq in lost_ops {
782 wakers_to_wake.push((*file_id, op_seq));
783 }
784
785 if close_files {
787 file_state.is_closed = true;
788 }
789 }
790 }
791
792 for key in wakers_to_wake {
794 if let Some(waker) = inner.wakers.storage_wakers.remove(&key) {
795 waker.wake();
796 }
797 }
798
799 tracing::info!(
800 "Storage crash simulated: {} files affected, close_files={}",
801 file_ids.len(),
802 close_files
803 );
804 }
805}