1use std::{
7 cell::RefCell,
8 collections::{BTreeMap, HashSet, VecDeque},
9 net::IpAddr,
10 rc::{Rc, Weak},
11 task::Waker,
12 time::Duration,
13};
14use tracing::instrument;
15
16use crate::{
17 SimulationError, SimulationResult,
18 chaos::fault_events::{SIM_FAULT_TIMELINE, SimFaultEvent},
19 chaos::state_handle::StateHandle,
20 network::{
21 NetworkConfiguration, PartitionStrategy,
22 sim::{ConnectionId, ListenerId, SimNetworkProvider},
23 },
24};
25
26use super::{
27 events::{ConnectionStateChange, Event, EventQueue, NetworkOperation, ScheduledEvent},
28 rng::{reset_sim_rng, set_sim_seed, sim_random, sim_random_range},
29 sleep::SleepFuture,
30 state::{
31 ClogState, CloseReason, ConnectionState, ListenerState, NetworkState, PartitionState,
32 StorageState,
33 },
34 wakers::WakerRegistry,
35};
36
/// Mutable state of one simulation run; `SimWorld` shares it behind `Rc<RefCell<..>>`.
#[derive(Debug)]
pub(crate) struct SimInner {
    /// Current simulated time; `SimWorld::step` sets it from each processed event.
    pub(crate) current_time: Duration,
    /// Drifted clock reading handed out by `SimWorld::timer`, which never lets it fall
    /// behind `current_time`.
    pub(crate) timer_time: Duration,
    /// Pending events; `pop_earliest` yields the next one to process.
    pub(crate) event_queue: EventQueue,
    /// Sequence number attached to the next scheduled event; every scheduling site in
    /// this file increments it.
    pub(crate) next_sequence: u64,

    /// Network state: connections, listeners, partitions, clogs and the network configuration.
    pub(crate) network: NetworkState,

    /// Simulated storage state and its configuration.
    pub(crate) storage: StorageState,

    /// Registered wakers (task, read, listener, clog, cut and send-buffer wakers, among others).
    pub(crate) wakers: WakerRegistry,

    /// Next id handed out to a sleep/timer task.
    pub(crate) next_task_id: u64,
    /// Ids of timer tasks whose `Event::Timer` has fired.
    pub(crate) awakened_tasks: HashSet<u64>,

    /// Number of events dispatched so far.
    pub(crate) events_processed: u64,

    /// Simulated time of the most recent injected bit flip; used for the bit-flip cooldown.
    pub(crate) last_bit_flip_time: Duration,

    /// Copy of the event most recently processed by `step`; `None` once the queue is empty.
    pub(crate) last_processed_event: Option<Event>,

    /// Optional state handle that receives fault events through `emit_fault`.
    pub(crate) state: Option<StateHandle>,
}
72
73impl SimInner {
74 pub(crate) fn new() -> Self {
75 Self {
76 current_time: Duration::ZERO,
77 timer_time: Duration::ZERO,
78 event_queue: EventQueue::new(),
79 next_sequence: 0,
80 network: NetworkState::new(NetworkConfiguration::default()),
81 storage: StorageState::default(),
82 wakers: WakerRegistry::default(),
83 next_task_id: 0,
84 awakened_tasks: HashSet::new(),
85 events_processed: 0,
86 last_bit_flip_time: Duration::ZERO,
87 last_processed_event: None,
88 state: None,
89 }
90 }
91
92 pub(crate) fn new_with_config(network_config: NetworkConfiguration) -> Self {
93 Self {
94 current_time: Duration::ZERO,
95 timer_time: Duration::ZERO,
96 event_queue: EventQueue::new(),
97 next_sequence: 0,
98 network: NetworkState::new(network_config),
99 storage: StorageState::default(),
100 wakers: WakerRegistry::default(),
101 next_task_id: 0,
102 awakened_tasks: HashSet::new(),
103 events_processed: 0,
104 last_bit_flip_time: Duration::ZERO,
105 last_processed_event: None,
106 state: None,
107 }
108 }
109
110 pub(crate) fn emit_fault(&self, event: SimFaultEvent) {
112 if let Some(ref state) = self.state {
113 let time_ms = self.current_time.as_millis() as u64;
114 state.emit_raw(SIM_FAULT_TIMELINE, event, time_ms, "sim");
115 }
116 }
117
118 pub(crate) fn calculate_flip_bit_count(random_value: u32, min_bits: u32, max_bits: u32) -> u32 {
128 if random_value == 0 {
129 return max_bits.min(32);
131 }
132
133 let bit_count = 1 + random_value.leading_zeros();
135
136 bit_count.clamp(min_bits, max_bits)
138 }
139}
140
/// Handle to a single-threaded simulation.
///
/// The state lives behind `Rc<RefCell<..>>`. Providers created from this world receive a
/// weak handle obtained through `downgrade`, so they do not keep the state alive.
#[derive(Debug)]
pub struct SimWorld {
    /// Shared, interior-mutable simulation state.
    pub(crate) inner: Rc<RefCell<SimInner>>,
}
150
151impl SimWorld {
152 fn create(network_config: Option<NetworkConfiguration>, seed: u64) -> Self {
154 reset_sim_rng();
155 set_sim_seed(seed);
156 crate::chaos::assertions::reset_assertion_results();
157
158 let inner = match network_config {
159 Some(config) => SimInner::new_with_config(config),
160 None => SimInner::new(),
161 };
162
163 Self {
164 inner: Rc::new(RefCell::new(inner)),
165 }
166 }
167
168 pub fn new() -> Self {
173 Self::create(None, 0)
174 }
175
176 pub fn new_with_seed(seed: u64) -> Self {
185 Self::create(None, seed)
186 }
187
188 pub fn new_with_network_config(network_config: NetworkConfiguration) -> Self {
190 Self::create(Some(network_config), 0)
191 }
192
193 pub fn new_with_network_config_and_seed(
200 network_config: NetworkConfiguration,
201 seed: u64,
202 ) -> Self {
203 Self::create(Some(network_config), seed)
204 }
205
206 pub fn set_state(&self, state: StateHandle) {
211 self.inner.borrow_mut().state = Some(state);
212 }
213
214 #[instrument(skip(self))]
219 pub fn step(&mut self) -> bool {
220 let mut inner = self.inner.borrow_mut();
221
222 if let Some(scheduled_event) = inner.event_queue.pop_earliest() {
223 inner.current_time = scheduled_event.time();
225
226 Self::clear_expired_clogs_with_inner(&mut inner);
228
229 Self::randomly_trigger_partitions_with_inner(&mut inner);
231
232 let event = scheduled_event.into_event();
234 inner.last_processed_event = Some(event.clone());
235
236 Self::process_event_with_inner(&mut inner, event);
238
239 !inner.event_queue.is_empty()
241 } else {
242 inner.last_processed_event = None;
243 false
245 }
246 }
247
248 #[instrument(skip(self))]
254 pub fn run_until_empty(&mut self) {
255 while self.step() {
256 if self.inner.borrow().events_processed.is_multiple_of(50) {
258 let has_workload_events = !self
259 .inner
260 .borrow()
261 .event_queue
262 .has_only_infrastructure_events();
263 if !has_workload_events {
264 tracing::debug!(
265 "Early termination: only infrastructure events remain in queue"
266 );
267 break;
268 }
269 }
270 }
271 }
272
273 pub fn current_time(&self) -> Duration {
275 self.inner.borrow().current_time
276 }
277
278 pub fn now(&self) -> Duration {
283 self.inner.borrow().current_time
284 }
285
286 pub fn timer(&self) -> Duration {
301 let mut inner = self.inner.borrow_mut();
302 let chaos = &inner.network.config.chaos;
303
304 if !chaos.clock_drift_enabled {
306 return inner.current_time;
307 }
308
309 let max_timer = inner.current_time + chaos.clock_drift_max;
313
314 if inner.timer_time < max_timer {
316 let random_factor = sim_random::<f64>(); let gap = (max_timer - inner.timer_time).as_secs_f64();
318 let delta = random_factor * gap / 2.0;
319 inner.timer_time += Duration::from_secs_f64(delta);
320 }
321
322 inner.timer_time = inner.timer_time.max(inner.current_time);
324
325 inner.timer_time
326 }
327
328 #[instrument(skip(self))]
330 pub fn schedule_event(&self, event: Event, delay: Duration) {
331 let mut inner = self.inner.borrow_mut();
332 let scheduled_time = inner.current_time + delay;
333 let sequence = inner.next_sequence;
334 inner.next_sequence += 1;
335
336 let scheduled_event = ScheduledEvent::new(scheduled_time, event, sequence);
337 inner.event_queue.schedule(scheduled_event);
338 }
339
340 pub fn schedule_event_at(&self, event: Event, time: Duration) {
342 let mut inner = self.inner.borrow_mut();
343 let sequence = inner.next_sequence;
344 inner.next_sequence += 1;
345
346 let scheduled_event = ScheduledEvent::new(time, event, sequence);
347 inner.event_queue.schedule(scheduled_event);
348 }
349
350 pub fn downgrade(&self) -> WeakSimWorld {
355 WeakSimWorld {
356 inner: Rc::downgrade(&self.inner),
357 }
358 }
359
360 pub fn has_pending_events(&self) -> bool {
362 !self.inner.borrow().event_queue.is_empty()
363 }
364
365 pub fn pending_event_count(&self) -> usize {
367 self.inner.borrow().event_queue.len()
368 }
369
370 pub fn network_provider(&self) -> SimNetworkProvider {
372 SimNetworkProvider::new(self.downgrade())
373 }
374
375 pub fn time_provider(&self) -> crate::providers::SimTimeProvider {
377 crate::providers::SimTimeProvider::new(self.downgrade())
378 }
379
    /// Returns the task provider used by simulated code.
    ///
    /// The value is `crate::TokioTaskProvider`, built without any handle to this world
    /// (the network, time and storage providers instead receive `self.downgrade()`).
    pub fn task_provider(&self) -> crate::TokioTaskProvider {
        crate::TokioTaskProvider
    }
384
385 pub fn storage_provider(&self, ip: std::net::IpAddr) -> crate::storage::SimStorageProvider {
387 crate::storage::SimStorageProvider::new(self.downgrade(), ip)
388 }
389
390 pub fn set_storage_config(&mut self, config: crate::storage::StorageConfiguration) {
394 self.inner.borrow_mut().storage.config = config;
395 }
396
397 pub fn with_network_config<F, R>(&self, f: F) -> R
405 where
406 F: FnOnce(&NetworkConfiguration) -> R,
407 {
408 let inner = self.inner.borrow();
409 f(&inner.network.config)
410 }
411
412 pub(crate) fn create_listener(&self, addr: String) -> SimulationResult<ListenerId> {
414 let mut inner = self.inner.borrow_mut();
415 let listener_id = ListenerId(inner.network.next_listener_id);
416 inner.network.next_listener_id += 1;
417
418 inner.network.listeners.insert(
419 listener_id,
420 ListenerState {
421 id: listener_id,
422 addr,
423 pending_connections: VecDeque::new(),
424 },
425 );
426
427 Ok(listener_id)
428 }
429
430 pub(crate) fn read_from_connection(
432 &self,
433 connection_id: ConnectionId,
434 buf: &mut [u8],
435 ) -> SimulationResult<usize> {
436 let mut inner = self.inner.borrow_mut();
437
438 if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
439 let mut bytes_read = 0;
440 while bytes_read < buf.len() && !connection.receive_buffer.is_empty() {
441 if let Some(byte) = connection.receive_buffer.pop_front() {
442 buf[bytes_read] = byte;
443 bytes_read += 1;
444 }
445 }
446 Ok(bytes_read)
447 } else {
448 Err(SimulationError::InvalidState(
449 "connection not found".to_string(),
450 ))
451 }
452 }
453
454 pub(crate) fn write_to_connection(
456 &self,
457 connection_id: ConnectionId,
458 data: &[u8],
459 ) -> SimulationResult<()> {
460 let mut inner = self.inner.borrow_mut();
461
462 if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
463 for &byte in data {
464 connection.receive_buffer.push_back(byte);
465 }
466 Ok(())
467 } else {
468 Err(SimulationError::InvalidState(
469 "connection not found".to_string(),
470 ))
471 }
472 }
473
474 pub(crate) fn buffer_send(
479 &self,
480 connection_id: ConnectionId,
481 data: Vec<u8>,
482 ) -> SimulationResult<()> {
483 tracing::debug!(
484 "buffer_send called for connection_id={} with {} bytes",
485 connection_id.0,
486 data.len()
487 );
488 let mut inner = self.inner.borrow_mut();
489
490 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
491 conn.send_buffer.push_back(data);
493 tracing::debug!(
494 "buffer_send: added data to send_buffer, new length: {}",
495 conn.send_buffer.len()
496 );
497
498 if !conn.send_in_progress {
500 tracing::debug!(
501 "buffer_send: sender not in progress, scheduling ProcessSendBuffer event"
502 );
503 conn.send_in_progress = true;
504
505 let scheduled_time = inner.current_time + std::time::Duration::ZERO;
507 let sequence = inner.next_sequence;
508 inner.next_sequence += 1;
509 let scheduled_event = ScheduledEvent::new(
510 scheduled_time,
511 Event::Network {
512 connection_id: connection_id.0,
513 operation: NetworkOperation::ProcessSendBuffer,
514 },
515 sequence,
516 );
517 inner.event_queue.schedule(scheduled_event);
518 tracing::debug!(
519 "buffer_send: scheduled ProcessSendBuffer event with sequence {}",
520 sequence
521 );
522 } else {
523 tracing::debug!(
524 "buffer_send: sender already in progress, not scheduling new event"
525 );
526 }
527
528 Ok(())
529 } else {
530 tracing::debug!(
531 "buffer_send: connection_id={} not found in connections table",
532 connection_id.0
533 );
534 Err(SimulationError::InvalidState(
535 "connection not found".to_string(),
536 ))
537 }
538 }
539
540 pub(crate) fn create_connection_pair(
548 &self,
549 client_addr: String,
550 server_addr: String,
551 ) -> SimulationResult<(ConnectionId, ConnectionId)> {
552 let mut inner = self.inner.borrow_mut();
553
554 let client_id = ConnectionId(inner.network.next_connection_id);
555 inner.network.next_connection_id += 1;
556
557 let server_id = ConnectionId(inner.network.next_connection_id);
558 inner.network.next_connection_id += 1;
559
560 let current_time = inner.current_time;
562
563 let client_ip = NetworkState::parse_ip_from_addr(&client_addr);
565 let server_ip = NetworkState::parse_ip_from_addr(&server_addr);
566
567 let ephemeral_peer_addr = match client_ip {
571 Some(std::net::IpAddr::V4(ipv4)) => {
572 let octets = ipv4.octets();
573 let ip_offset = sim_random_range(0u32..256) as u8;
574 let new_last_octet = octets[3].wrapping_add(ip_offset);
575 let ephemeral_ip =
576 std::net::Ipv4Addr::new(octets[0], octets[1], octets[2], new_last_octet);
577 let ephemeral_port = sim_random_range(40000u16..60000);
578 format!("{}:{}", ephemeral_ip, ephemeral_port)
579 }
580 Some(std::net::IpAddr::V6(ipv6)) => {
581 let segments = ipv6.segments();
583 let mut new_segments = segments;
584 let ip_offset = sim_random_range(0u16..256);
585 new_segments[7] = new_segments[7].wrapping_add(ip_offset);
586 let ephemeral_ip = std::net::Ipv6Addr::from(new_segments);
587 let ephemeral_port = sim_random_range(40000u16..60000);
588 format!("[{}]:{}", ephemeral_ip, ephemeral_port)
589 }
590 None => {
591 let ephemeral_port = sim_random_range(40000u16..60000);
593 format!("unknown:{}", ephemeral_port)
594 }
595 };
596
597 const DEFAULT_SEND_BUFFER_CAPACITY: usize = 64 * 1024; inner.network.connections.insert(
605 client_id,
606 ConnectionState {
607 id: client_id,
608 addr: client_addr,
609 local_ip: client_ip,
610 remote_ip: server_ip,
611 peer_address: server_addr.clone(),
612 receive_buffer: VecDeque::new(),
613 paired_connection: Some(server_id),
614 send_buffer: VecDeque::new(),
615 send_in_progress: false,
616 next_send_time: current_time,
617 is_closed: false,
618 send_closed: false,
619 recv_closed: false,
620 is_cut: false,
621 cut_expiry: None,
622 close_reason: CloseReason::None,
623 send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
624 send_delay: None,
625 recv_delay: None,
626 is_half_open: false,
627 half_open_error_at: None,
628 is_stable: false,
629 graceful_close_pending: false,
630 last_data_delivery_scheduled_at: None,
631 remote_fin_received: false,
632 },
633 );
634
635 inner.network.connections.insert(
637 server_id,
638 ConnectionState {
639 id: server_id,
640 addr: server_addr,
641 local_ip: server_ip,
642 remote_ip: client_ip,
643 peer_address: ephemeral_peer_addr,
644 receive_buffer: VecDeque::new(),
645 paired_connection: Some(client_id),
646 send_buffer: VecDeque::new(),
647 send_in_progress: false,
648 next_send_time: current_time,
649 is_closed: false,
650 send_closed: false,
651 recv_closed: false,
652 is_cut: false,
653 cut_expiry: None,
654 close_reason: CloseReason::None,
655 send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
656 send_delay: None,
657 recv_delay: None,
658 is_half_open: false,
659 half_open_error_at: None,
660 is_stable: false,
661 graceful_close_pending: false,
662 last_data_delivery_scheduled_at: None,
663 remote_fin_received: false,
664 },
665 );
666
667 Ok((client_id, server_id))
668 }
669
670 pub(crate) fn register_read_waker(
672 &self,
673 connection_id: ConnectionId,
674 waker: Waker,
675 ) -> SimulationResult<()> {
676 let mut inner = self.inner.borrow_mut();
677 let is_replacement = inner.wakers.read_wakers.contains_key(&connection_id);
678 inner.wakers.read_wakers.insert(connection_id, waker);
679 tracing::debug!(
680 "register_read_waker: connection_id={}, replacement={}, total_wakers={}",
681 connection_id.0,
682 is_replacement,
683 inner.wakers.read_wakers.len()
684 );
685 Ok(())
686 }
687
688 pub(crate) fn register_accept_waker(&self, addr: &str, waker: Waker) -> SimulationResult<()> {
690 let mut inner = self.inner.borrow_mut();
691 use std::collections::hash_map::DefaultHasher;
693 use std::hash::{Hash, Hasher};
694 let mut hasher = DefaultHasher::new();
695 addr.hash(&mut hasher);
696 let listener_key = ListenerId(hasher.finish());
697
698 inner.wakers.listener_wakers.insert(listener_key, waker);
699 Ok(())
700 }
701
702 pub(crate) fn store_pending_connection(
704 &self,
705 addr: &str,
706 connection_id: ConnectionId,
707 ) -> SimulationResult<()> {
708 let mut inner = self.inner.borrow_mut();
709 inner
710 .network
711 .pending_connections
712 .insert(addr.to_string(), connection_id);
713
714 use std::collections::hash_map::DefaultHasher;
716 use std::hash::{Hash, Hasher};
717 let mut hasher = DefaultHasher::new();
718 addr.hash(&mut hasher);
719 let listener_key = ListenerId(hasher.finish());
720
721 if let Some(waker) = inner.wakers.listener_wakers.remove(&listener_key) {
722 waker.wake();
723 }
724
725 Ok(())
726 }
727
728 pub(crate) fn pending_connection(&self, addr: &str) -> SimulationResult<Option<ConnectionId>> {
730 let mut inner = self.inner.borrow_mut();
731 Ok(inner.network.pending_connections.remove(addr))
732 }
733
734 pub(crate) fn connection_peer_address(&self, connection_id: ConnectionId) -> Option<String> {
743 let inner = self.inner.borrow();
744 inner
745 .network
746 .connections
747 .get(&connection_id)
748 .map(|conn| conn.peer_address.clone())
749 }
750
751 #[instrument(skip(self))]
756 pub fn sleep(&self, duration: Duration) -> SleepFuture {
757 let task_id = self.generate_task_id();
758
759 let actual_duration = self.apply_buggified_delay(duration);
761
762 self.schedule_event(Event::Timer { task_id }, actual_duration);
764
765 SleepFuture::new(self.downgrade(), task_id)
767 }
768
769 fn apply_buggified_delay(&self, duration: Duration) -> Duration {
771 let inner = self.inner.borrow();
772 let chaos = &inner.network.config.chaos;
773
774 if !chaos.buggified_delay_enabled || chaos.buggified_delay_max == Duration::ZERO {
775 return duration;
776 }
777
778 if sim_random::<f64>() < chaos.buggified_delay_probability {
780 let random_factor = sim_random::<f64>().powf(1000.0);
782 let extra_delay = chaos.buggified_delay_max.mul_f64(random_factor);
783 tracing::trace!(
784 extra_delay_ms = extra_delay.as_millis(),
785 "Buggified delay applied"
786 );
787 duration + extra_delay
788 } else {
789 duration
790 }
791 }
792
793 fn generate_task_id(&self) -> u64 {
795 let mut inner = self.inner.borrow_mut();
796 let task_id = inner.next_task_id;
797 inner.next_task_id += 1;
798 task_id
799 }
800
801 fn wake_all(wakers: &mut BTreeMap<ConnectionId, Vec<Waker>>, connection_id: ConnectionId) {
803 if let Some(waker_list) = wakers.remove(&connection_id) {
804 for waker in waker_list {
805 waker.wake();
806 }
807 }
808 }
809
810 pub(crate) fn is_task_awake(&self, task_id: u64) -> SimulationResult<bool> {
812 let inner = self.inner.borrow();
813 Ok(inner.awakened_tasks.contains(&task_id))
814 }
815
816 pub(crate) fn register_task_waker(&self, task_id: u64, waker: Waker) -> SimulationResult<()> {
818 let mut inner = self.inner.borrow_mut();
819 inner.wakers.task_wakers.insert(task_id, waker);
820 Ok(())
821 }
822
823 fn clear_expired_clogs_with_inner(inner: &mut SimInner) {
825 let now = inner.current_time;
826 let expired: Vec<ConnectionId> = inner
827 .network
828 .connection_clogs
829 .iter()
830 .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
831 .collect();
832
833 for id in expired {
834 inner.network.connection_clogs.remove(&id);
835 Self::wake_all(&mut inner.wakers.clog_wakers, id);
836 }
837 }
838
839 #[instrument(skip(inner))]
841 fn process_event_with_inner(inner: &mut SimInner, event: Event) {
842 inner.events_processed += 1;
843
844 match event {
845 Event::Timer { task_id } => Self::handle_timer_event(inner, task_id),
846 Event::Connection { id, state } => Self::handle_connection_event(inner, id, state),
847 Event::Network {
848 connection_id,
849 operation,
850 } => Self::handle_network_event(inner, connection_id, operation),
851 Event::Storage { file_id, operation } => {
852 super::storage_ops::handle_storage_event(inner, file_id, operation)
853 }
854 Event::Shutdown => Self::handle_shutdown_event(inner),
855 Event::ProcessRestart { ip }
856 | Event::ProcessGracefulShutdown { ip, .. }
857 | Event::ProcessForceKill { ip, .. } => {
858 tracing::debug!("Process lifecycle event for IP {}", ip);
861 }
862 }
863 }
864
865 fn handle_timer_event(inner: &mut SimInner, task_id: u64) {
867 inner.awakened_tasks.insert(task_id);
868 if let Some(waker) = inner.wakers.task_wakers.remove(&task_id) {
869 waker.wake();
870 }
871 }
872
873 fn handle_connection_event(inner: &mut SimInner, id: u64, state: ConnectionStateChange) {
875 let connection_id = ConnectionId(id);
876
877 match state {
878 ConnectionStateChange::BindComplete | ConnectionStateChange::ConnectionReady => {
879 }
881 ConnectionStateChange::ClogClear => {
882 inner.network.connection_clogs.remove(&connection_id);
883 Self::wake_all(&mut inner.wakers.clog_wakers, connection_id);
884 }
885 ConnectionStateChange::ReadClogClear => {
886 inner.network.read_clogs.remove(&connection_id);
887 Self::wake_all(&mut inner.wakers.read_clog_wakers, connection_id);
888 }
889 ConnectionStateChange::PartitionRestore => {
890 Self::clear_expired_partitions(inner);
891 }
892 ConnectionStateChange::SendPartitionClear => {
893 Self::clear_expired_send_partitions(inner);
894 }
895 ConnectionStateChange::RecvPartitionClear => {
896 Self::clear_expired_recv_partitions(inner);
897 }
898 ConnectionStateChange::CutRestore => {
899 if let Some(conn) = inner.network.connections.get_mut(&connection_id)
900 && conn.is_cut
901 {
902 conn.is_cut = false;
903 conn.cut_expiry = None;
904 inner.emit_fault(SimFaultEvent::CutRestored { connection_id: id });
905 tracing::debug!("Connection {} restored via scheduled event", id);
906 Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
907 }
908 }
909 ConnectionStateChange::HalfOpenError => {
910 inner.emit_fault(SimFaultEvent::HalfOpenError { connection_id: id });
911 tracing::debug!("Connection {} half-open error time reached", id);
912 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
913 waker.wake();
914 }
915 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
916 }
917 }
918 }
919
920 fn clear_expired_partitions(inner: &mut SimInner) {
922 let now = inner.current_time;
923 let expired: Vec<_> = inner
924 .network
925 .ip_partitions
926 .iter()
927 .filter_map(|(pair, state)| (now >= state.expires_at).then_some(*pair))
928 .collect();
929
930 for pair in expired {
931 inner.network.ip_partitions.remove(&pair);
932 tracing::debug!("Restored IP partition {} -> {}", pair.0, pair.1);
933 }
934 }
935
936 fn clear_expired_send_partitions(inner: &mut SimInner) {
938 let now = inner.current_time;
939 let expired: Vec<_> = inner
940 .network
941 .send_partitions
942 .iter()
943 .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
944 .collect();
945
946 for ip in expired {
947 inner.network.send_partitions.remove(&ip);
948 tracing::debug!("Cleared send partition for {}", ip);
949 }
950 }
951
952 fn clear_expired_recv_partitions(inner: &mut SimInner) {
954 let now = inner.current_time;
955 let expired: Vec<_> = inner
956 .network
957 .recv_partitions
958 .iter()
959 .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
960 .collect();
961
962 for ip in expired {
963 inner.network.recv_partitions.remove(&ip);
964 tracing::debug!("Cleared receive partition for {}", ip);
965 }
966 }
967
968 fn handle_network_event(inner: &mut SimInner, conn_id: u64, operation: NetworkOperation) {
970 let connection_id = ConnectionId(conn_id);
971
972 match operation {
973 NetworkOperation::DataDelivery { data } => {
974 Self::handle_data_delivery(inner, connection_id, data);
975 }
976 NetworkOperation::ProcessSendBuffer => {
977 Self::handle_process_send_buffer(inner, connection_id);
978 }
979 NetworkOperation::FinDelivery => {
980 Self::handle_fin_delivery(inner, connection_id);
981 }
982 }
983 }
984
985 fn handle_data_delivery(inner: &mut SimInner, connection_id: ConnectionId, data: Vec<u8>) {
987 tracing::trace!(
988 "DataDelivery: {} bytes to connection {}",
989 data.len(),
990 connection_id.0
991 );
992
993 let is_stable = inner
995 .network
996 .connections
997 .get(&connection_id)
998 .is_some_and(|conn| conn.is_stable);
999
1000 if !inner.network.connections.contains_key(&connection_id) {
1001 tracing::warn!("DataDelivery: connection {} not found", connection_id.0);
1002 return;
1003 }
1004
1005 let data_to_deliver = if is_stable {
1007 data
1008 } else {
1009 Self::maybe_corrupt_data(inner, connection_id, &data)
1010 };
1011
1012 let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1014 return;
1015 };
1016
1017 for &byte in &data_to_deliver {
1018 conn.receive_buffer.push_back(byte);
1019 }
1020
1021 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1022 waker.wake();
1023 }
1024 }
1025
1026 fn handle_fin_delivery(inner: &mut SimInner, connection_id: ConnectionId) {
1032 tracing::debug!(
1033 "FinDelivery: FIN received on connection {}",
1034 connection_id.0
1035 );
1036
1037 let is_closed = inner
1039 .network
1040 .connections
1041 .get(&connection_id)
1042 .is_some_and(|conn| conn.is_closed);
1043
1044 if is_closed {
1045 tracing::debug!(
1046 "FinDelivery: connection {} already closed, ignoring stale FIN",
1047 connection_id.0
1048 );
1049 return;
1050 }
1051
1052 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1053 conn.remote_fin_received = true;
1054 }
1055
1056 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1057 waker.wake();
1058 }
1059 }
1060
1061 fn schedule_fin_delivery(
1066 inner: &mut SimInner,
1067 paired_id: Option<ConnectionId>,
1068 last_delivery_time: Option<Duration>,
1069 ) {
1070 let Some(peer_id) = paired_id else {
1071 return;
1072 };
1073
1074 let fin_time = match last_delivery_time {
1076 Some(t) if t >= inner.current_time => t + Duration::from_nanos(1),
1077 _ => inner.current_time + Duration::from_nanos(1),
1078 };
1079
1080 let sequence = inner.next_sequence;
1081 inner.next_sequence += 1;
1082
1083 tracing::debug!(
1084 "Scheduling FinDelivery to connection {} at {:?}",
1085 peer_id.0,
1086 fin_time
1087 );
1088
1089 inner.event_queue.schedule(ScheduledEvent::new(
1090 fin_time,
1091 Event::Network {
1092 connection_id: peer_id.0,
1093 operation: NetworkOperation::FinDelivery,
1094 },
1095 sequence,
1096 ));
1097 }
1098
1099 fn maybe_corrupt_data(
1101 inner: &mut SimInner,
1102 connection_id: ConnectionId,
1103 data: &[u8],
1104 ) -> Vec<u8> {
1105 if data.is_empty() {
1106 return data.to_vec();
1107 }
1108
1109 let chaos = &inner.network.config.chaos;
1110 let now = inner.current_time;
1111 let cooldown_elapsed =
1112 now.saturating_sub(inner.last_bit_flip_time) >= chaos.bit_flip_cooldown;
1113
1114 if !cooldown_elapsed || !crate::buggify_with_prob!(chaos.bit_flip_probability) {
1115 return data.to_vec();
1116 }
1117
1118 let random_value = sim_random::<u32>();
1119 let flip_count = SimInner::calculate_flip_bit_count(
1120 random_value,
1121 chaos.bit_flip_min_bits,
1122 chaos.bit_flip_max_bits,
1123 );
1124
1125 let mut corrupted_data = data.to_vec();
1126 let mut flipped_positions = std::collections::HashSet::new();
1127
1128 for _ in 0..flip_count {
1129 let byte_idx = (sim_random::<u64>() as usize) % corrupted_data.len();
1130 let bit_idx = (sim_random::<u64>() as usize) % 8;
1131 let position = (byte_idx, bit_idx);
1132
1133 if !flipped_positions.contains(&position) {
1134 flipped_positions.insert(position);
1135 corrupted_data[byte_idx] ^= 1 << bit_idx;
1136 }
1137 }
1138
1139 inner.last_bit_flip_time = now;
1140 tracing::info!(
1141 "BitFlipInjected: bytes={} bits_flipped={} unique_positions={}",
1142 data.len(),
1143 flip_count,
1144 flipped_positions.len()
1145 );
1146
1147 inner.emit_fault(SimFaultEvent::BitFlip {
1148 connection_id: connection_id.0,
1149 flip_count: flipped_positions.len(),
1150 });
1151
1152 corrupted_data
1153 }
1154
1155 fn handle_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1157 let is_partitioned = inner
1158 .network
1159 .is_connection_partitioned(connection_id, inner.current_time);
1160
1161 if is_partitioned {
1162 Self::handle_partitioned_send(inner, connection_id);
1163 } else {
1164 Self::handle_normal_send(inner, connection_id);
1165 }
1166 }
1167
1168 fn handle_partitioned_send(inner: &mut SimInner, connection_id: ConnectionId) {
1170 let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1171 return;
1172 };
1173
1174 if let Some(data) = conn.send_buffer.pop_front() {
1175 tracing::debug!(
1176 "Connection {} partitioned, failing send of {} bytes",
1177 connection_id.0,
1178 data.len()
1179 );
1180 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1181
1182 if !conn.send_buffer.is_empty() {
1183 Self::schedule_process_send_buffer(inner, connection_id);
1184 } else {
1185 conn.send_in_progress = false;
1186 if conn.graceful_close_pending {
1188 conn.graceful_close_pending = false;
1189 let peer_id = conn.paired_connection;
1190 let last_time = conn.last_data_delivery_scheduled_at;
1191 Self::schedule_fin_delivery(inner, peer_id, last_time);
1192 }
1193 }
1194 } else {
1195 conn.send_in_progress = false;
1196 if conn.graceful_close_pending {
1198 conn.graceful_close_pending = false;
1199 let peer_id = conn.paired_connection;
1200 let last_time = conn.last_data_delivery_scheduled_at;
1201 Self::schedule_fin_delivery(inner, peer_id, last_time);
1202 }
1203 }
1204 }
1205
1206 fn handle_normal_send(inner: &mut SimInner, connection_id: ConnectionId) {
1208 let Some(conn) = inner.network.connections.get(&connection_id) else {
1210 return;
1211 };
1212
1213 let paired_id = conn.paired_connection;
1214 let send_delay = conn.send_delay;
1215 let next_send_time = conn.next_send_time;
1216 let has_data = !conn.send_buffer.is_empty();
1217 let is_stable = conn.is_stable; let recv_delay = paired_id.and_then(|pid| {
1220 inner
1221 .network
1222 .connections
1223 .get(&pid)
1224 .and_then(|c| c.recv_delay)
1225 });
1226
1227 if !has_data {
1228 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1229 conn.send_in_progress = false;
1230 if conn.graceful_close_pending {
1232 conn.graceful_close_pending = false;
1233 let peer_id = conn.paired_connection;
1234 let last_time = conn.last_data_delivery_scheduled_at;
1235 Self::schedule_fin_delivery(inner, peer_id, last_time);
1236 }
1237 }
1238 return;
1239 }
1240
1241 let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1242 return;
1243 };
1244
1245 let Some(mut data) = conn.send_buffer.pop_front() else {
1246 conn.send_in_progress = false;
1247 return;
1248 };
1249
1250 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1251
1252 if !is_stable && crate::buggify!() && !data.is_empty() {
1254 let max_send = std::cmp::min(
1255 data.len(),
1256 inner.network.config.chaos.partial_write_max_bytes,
1257 );
1258 let truncate_to = sim_random_range(0..max_send + 1);
1259
1260 if truncate_to < data.len() {
1261 let remainder = data.split_off(truncate_to);
1262 conn.send_buffer.push_front(remainder);
1263 tracing::debug!(
1264 "BUGGIFY: Partial write on connection {} - sending {} bytes",
1265 connection_id.0,
1266 data.len()
1267 );
1268 }
1269 }
1270
1271 let has_more = !conn.send_buffer.is_empty();
1272 let base_delay = if has_more {
1273 Duration::from_nanos(1)
1274 } else {
1275 send_delay.unwrap_or_else(|| {
1276 crate::network::sample_duration(&inner.network.config.write_latency)
1277 })
1278 };
1279
1280 let earliest_time = std::cmp::max(inner.current_time + base_delay, next_send_time);
1281 conn.next_send_time = earliest_time + Duration::from_nanos(1);
1282
1283 if let Some(paired_id) = paired_id {
1285 let scheduled_time = earliest_time + recv_delay.unwrap_or(Duration::ZERO);
1286 let sequence = inner.next_sequence;
1287 inner.next_sequence += 1;
1288
1289 inner.event_queue.schedule(ScheduledEvent::new(
1290 scheduled_time,
1291 Event::Network {
1292 connection_id: paired_id.0,
1293 operation: NetworkOperation::DataDelivery { data },
1294 },
1295 sequence,
1296 ));
1297
1298 conn.last_data_delivery_scheduled_at = Some(scheduled_time);
1300 }
1301
1302 if !conn.send_buffer.is_empty() {
1304 Self::schedule_process_send_buffer(inner, connection_id);
1305 } else {
1306 conn.send_in_progress = false;
1307 if conn.graceful_close_pending {
1309 conn.graceful_close_pending = false;
1310 let peer_id = conn.paired_connection;
1311 let last_time = conn.last_data_delivery_scheduled_at;
1312 Self::schedule_fin_delivery(inner, peer_id, last_time);
1313 }
1314 }
1315 }
1316
1317 fn schedule_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1319 let sequence = inner.next_sequence;
1320 inner.next_sequence += 1;
1321
1322 inner.event_queue.schedule(ScheduledEvent::new(
1323 inner.current_time,
1324 Event::Network {
1325 connection_id: connection_id.0,
1326 operation: NetworkOperation::ProcessSendBuffer,
1327 },
1328 sequence,
1329 ));
1330 }
1331
1332 fn handle_shutdown_event(inner: &mut SimInner) {
1334 tracing::debug!("Processing Shutdown event - waking all pending tasks");
1335
1336 for (task_id, waker) in std::mem::take(&mut inner.wakers.task_wakers) {
1337 tracing::trace!("Waking task {}", task_id);
1338 waker.wake();
1339 }
1340
1341 for (_conn_id, waker) in std::mem::take(&mut inner.wakers.read_wakers) {
1342 waker.wake();
1343 }
1344
1345 tracing::debug!("Shutdown event processed");
1346 }
1347
1348 pub fn assertion_results(
1350 &self,
1351 ) -> std::collections::HashMap<String, crate::chaos::AssertionStats> {
1352 crate::chaos::assertion_results()
1353 }
1354
1355 pub fn reset_assertion_results(&self) {
1357 crate::chaos::reset_assertion_results();
1358 }
1359
1360 pub fn abort_all_connections_for_ip(&self, ip: std::net::IpAddr) {
1366 let connection_ids: Vec<ConnectionId> = {
1367 let inner = self.inner.borrow();
1368 inner
1369 .network
1370 .connections
1371 .iter()
1372 .filter_map(|(id, conn)| {
1373 if conn.local_ip == Some(ip) || conn.remote_ip == Some(ip) {
1374 Some(*id)
1375 } else {
1376 None
1377 }
1378 })
1379 .collect()
1380 };
1381
1382 let count = connection_ids.len();
1383 for conn_id in connection_ids {
1384 self.close_connection_abort(conn_id);
1385 }
1386
1387 if count > 0 {
1388 tracing::debug!("Aborted {} connections for rebooted IP {}", count, ip);
1389 }
1390 }
1391
1392 pub fn schedule_process_restart(
1396 &self,
1397 ip: std::net::IpAddr,
1398 recovery_delay: std::time::Duration,
1399 ) {
1400 self.schedule_event(Event::ProcessRestart { ip }, recovery_delay);
1401 tracing::debug!(
1402 "Scheduled process restart for IP {} in {:?}",
1403 ip,
1404 recovery_delay
1405 );
1406 }
1407
1408 pub fn last_processed_event(&self) -> Option<Event> {
1413 self.inner.borrow().last_processed_event.clone()
1414 }
1415
1416 pub fn extract_metrics(&self) -> crate::runner::SimulationMetrics {
1418 let inner = self.inner.borrow();
1419
1420 crate::runner::SimulationMetrics {
1421 wall_time: std::time::Duration::ZERO,
1422 simulated_time: inner.current_time,
1423 events_processed: inner.events_processed,
1424 }
1425 }
1426
1427 pub fn should_clog_write(&self, connection_id: ConnectionId) -> bool {
1431 let inner = self.inner.borrow();
1432 let config = &inner.network.config;
1433
1434 if inner
1436 .network
1437 .connections
1438 .get(&connection_id)
1439 .is_some_and(|conn| conn.is_stable)
1440 {
1441 return false;
1442 }
1443
1444 if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1446 return inner.current_time < clog_state.expires_at;
1447 }
1448
1449 config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1451 }
1452
1453 pub fn clog_write(&self, connection_id: ConnectionId) {
1455 let mut inner = self.inner.borrow_mut();
1456 let config = &inner.network.config;
1457
1458 let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1459 let expires_at = inner.current_time + clog_duration;
1460 inner
1461 .network
1462 .connection_clogs
1463 .insert(connection_id, ClogState { expires_at });
1464
1465 let clear_event = Event::Connection {
1467 id: connection_id.0,
1468 state: ConnectionStateChange::ClogClear,
1469 };
1470 let sequence = inner.next_sequence;
1471 inner.next_sequence += 1;
1472 inner
1473 .event_queue
1474 .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1475 }
1476
1477 pub fn is_write_clogged(&self, connection_id: ConnectionId) -> bool {
1479 let inner = self.inner.borrow();
1480
1481 if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1482 inner.current_time < clog_state.expires_at
1483 } else {
1484 false
1485 }
1486 }
1487
1488 pub fn register_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1490 let mut inner = self.inner.borrow_mut();
1491 inner
1492 .wakers
1493 .clog_wakers
1494 .entry(connection_id)
1495 .or_default()
1496 .push(waker);
1497 }
1498
1499 pub fn should_clog_read(&self, connection_id: ConnectionId) -> bool {
1503 let inner = self.inner.borrow();
1504 let config = &inner.network.config;
1505
1506 if inner
1508 .network
1509 .connections
1510 .get(&connection_id)
1511 .is_some_and(|conn| conn.is_stable)
1512 {
1513 return false;
1514 }
1515
1516 if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1518 return inner.current_time < clog_state.expires_at;
1519 }
1520
1521 config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1523 }
1524
1525 pub fn clog_read(&self, connection_id: ConnectionId) {
1527 let mut inner = self.inner.borrow_mut();
1528 let config = &inner.network.config;
1529
1530 let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1531 let expires_at = inner.current_time + clog_duration;
1532 inner
1533 .network
1534 .read_clogs
1535 .insert(connection_id, ClogState { expires_at });
1536
1537 let clear_event = Event::Connection {
1539 id: connection_id.0,
1540 state: ConnectionStateChange::ReadClogClear,
1541 };
1542 let sequence = inner.next_sequence;
1543 inner.next_sequence += 1;
1544 inner
1545 .event_queue
1546 .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1547 }
1548
1549 pub fn is_read_clogged(&self, connection_id: ConnectionId) -> bool {
1551 let inner = self.inner.borrow();
1552
1553 if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1554 inner.current_time < clog_state.expires_at
1555 } else {
1556 false
1557 }
1558 }
1559
1560 pub fn register_read_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1562 let mut inner = self.inner.borrow_mut();
1563 inner
1564 .wakers
1565 .read_clog_wakers
1566 .entry(connection_id)
1567 .or_default()
1568 .push(waker);
1569 }
1570
1571 pub fn clear_expired_clogs(&self) {
1573 let mut inner = self.inner.borrow_mut();
1574 let now = inner.current_time;
1575 let expired: Vec<ConnectionId> = inner
1576 .network
1577 .connection_clogs
1578 .iter()
1579 .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
1580 .collect();
1581
1582 for id in expired {
1583 inner.network.connection_clogs.remove(&id);
1584 Self::wake_all(&mut inner.wakers.clog_wakers, id);
1585 }
1586 }
1587
1588 pub fn cut_connection(&self, connection_id: ConnectionId, duration: Duration) {
1601 let mut inner = self.inner.borrow_mut();
1602 let expires_at = inner.current_time + duration;
1603
1604 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1605 conn.is_cut = true;
1606 conn.cut_expiry = Some(expires_at);
1607
1608 inner.emit_fault(SimFaultEvent::ConnectionCut {
1609 connection_id: connection_id.0,
1610 duration_ms: duration.as_millis() as u64,
1611 });
1612
1613 tracing::debug!("Connection {} cut until {:?}", connection_id.0, expires_at);
1614
1615 let restore_event = Event::Connection {
1617 id: connection_id.0,
1618 state: ConnectionStateChange::CutRestore,
1619 };
1620 let sequence = inner.next_sequence;
1621 inner.next_sequence += 1;
1622 inner
1623 .event_queue
1624 .schedule(ScheduledEvent::new(expires_at, restore_event, sequence));
1625 }
1626 }
1627
1628 pub fn is_connection_cut(&self, connection_id: ConnectionId) -> bool {
1633 let inner = self.inner.borrow();
1634 inner
1635 .network
1636 .connections
1637 .get(&connection_id)
1638 .is_some_and(|conn| {
1639 conn.is_cut
1640 && conn
1641 .cut_expiry
1642 .is_some_and(|expiry| inner.current_time < expiry)
1643 })
1644 }
1645
1646 pub fn restore_connection(&self, connection_id: ConnectionId) {
1650 let mut inner = self.inner.borrow_mut();
1651
1652 if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1653 && conn.is_cut
1654 {
1655 conn.is_cut = false;
1656 conn.cut_expiry = None;
1657 tracing::debug!("Connection {} restored", connection_id.0);
1658
1659 Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
1661 }
1662 }
1663
1664 pub fn register_cut_waker(&self, connection_id: ConnectionId, waker: Waker) {
1666 let mut inner = self.inner.borrow_mut();
1667 inner
1668 .wakers
1669 .cut_wakers
1670 .entry(connection_id)
1671 .or_default()
1672 .push(waker);
1673 }
1674
1675 pub fn send_buffer_capacity(&self, connection_id: ConnectionId) -> usize {
1679 let inner = self.inner.borrow();
1680 inner
1681 .network
1682 .connections
1683 .get(&connection_id)
1684 .map(|conn| conn.send_buffer_capacity)
1685 .unwrap_or(0)
1686 }
1687
1688 pub fn send_buffer_used(&self, connection_id: ConnectionId) -> usize {
1690 let inner = self.inner.borrow();
1691 inner
1692 .network
1693 .connections
1694 .get(&connection_id)
1695 .map(|conn| conn.send_buffer.iter().map(|v| v.len()).sum())
1696 .unwrap_or(0)
1697 }
1698
1699 pub fn available_send_buffer(&self, connection_id: ConnectionId) -> usize {
1701 let capacity = self.send_buffer_capacity(connection_id);
1702 let used = self.send_buffer_used(connection_id);
1703 capacity.saturating_sub(used)
1704 }
1705
1706 pub fn register_send_buffer_waker(&self, connection_id: ConnectionId, waker: Waker) {
1708 let mut inner = self.inner.borrow_mut();
1709 inner
1710 .wakers
1711 .send_buffer_wakers
1712 .entry(connection_id)
1713 .or_default()
1714 .push(waker);
1715 }
1716
1717 #[allow(dead_code)] fn wake_send_buffer_waiters(&self, connection_id: ConnectionId) {
1720 let mut inner = self.inner.borrow_mut();
1721 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1722 }
1723
1724 pub fn pair_latency(&self, src: IpAddr, dst: IpAddr) -> Option<Duration> {
1729 let inner = self.inner.borrow();
1730 inner.network.pair_latencies.get(&(src, dst)).copied()
1731 }
1732
1733 pub fn set_pair_latency_if_not_set(
1736 &self,
1737 src: IpAddr,
1738 dst: IpAddr,
1739 latency: Duration,
1740 ) -> Duration {
1741 let mut inner = self.inner.borrow_mut();
1742 *inner
1743 .network
1744 .pair_latencies
1745 .entry((src, dst))
1746 .or_insert_with(|| {
1747 tracing::debug!(
1748 "Setting base latency for IP pair {} -> {} to {:?}",
1749 src,
1750 dst,
1751 latency
1752 );
1753 latency
1754 })
1755 }
1756
1757 pub fn connection_base_latency(&self, connection_id: ConnectionId) -> Duration {
1760 let inner = self.inner.borrow();
1761 let (local_ip, remote_ip) = inner
1762 .network
1763 .connections
1764 .get(&connection_id)
1765 .and_then(|conn| Some((conn.local_ip?, conn.remote_ip?)))
1766 .unwrap_or({
1767 (
1768 IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1769 IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1770 )
1771 });
1772 drop(inner);
1773
1774 if let Some(latency) = self.pair_latency(local_ip, remote_ip) {
1776 return latency;
1777 }
1778
1779 let latency = self
1781 .with_network_config(|config| crate::network::sample_duration(&config.write_latency));
1782 self.set_pair_latency_if_not_set(local_ip, remote_ip, latency)
1783 }
1784
1785 pub fn send_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1790 let inner = self.inner.borrow();
1791 inner
1792 .network
1793 .connections
1794 .get(&connection_id)
1795 .and_then(|conn| conn.send_delay)
1796 }
1797
1798 pub fn recv_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1801 let inner = self.inner.borrow();
1802 inner
1803 .network
1804 .connections
1805 .get(&connection_id)
1806 .and_then(|conn| conn.recv_delay)
1807 }
1808
1809 pub fn set_asymmetric_delays(
1812 &self,
1813 connection_id: ConnectionId,
1814 send_delay: Option<Duration>,
1815 recv_delay: Option<Duration>,
1816 ) {
1817 let mut inner = self.inner.borrow_mut();
1818 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1819 conn.send_delay = send_delay;
1820 conn.recv_delay = recv_delay;
1821 tracing::debug!(
1822 "Connection {} asymmetric delays set: send={:?}, recv={:?}",
1823 connection_id.0,
1824 send_delay,
1825 recv_delay
1826 );
1827 }
1828 }
1829
1830 pub fn is_connection_closed(&self, connection_id: ConnectionId) -> bool {
1832 let inner = self.inner.borrow();
1833 inner
1834 .network
1835 .connections
1836 .get(&connection_id)
1837 .is_some_and(|conn| conn.is_closed)
1838 }
1839
1840 pub fn close_connection(&self, connection_id: ConnectionId) {
1844 self.close_connection_with_reason(connection_id, CloseReason::Graceful);
1845 }
1846
1847 pub fn close_connection_abort(&self, connection_id: ConnectionId) {
1851 self.close_connection_with_reason(connection_id, CloseReason::Aborted);
1852 }
1853
1854 pub fn close_reason(&self, connection_id: ConnectionId) -> CloseReason {
1856 let inner = self.inner.borrow();
1857 inner
1858 .network
1859 .connections
1860 .get(&connection_id)
1861 .map(|conn| conn.close_reason)
1862 .unwrap_or(CloseReason::None)
1863 }
1864
1865 fn close_connection_with_reason(&self, connection_id: ConnectionId, reason: CloseReason) {
1867 match reason {
1868 CloseReason::Graceful => self.close_connection_graceful(connection_id),
1869 CloseReason::Aborted => self.close_connection_aborted(connection_id),
1870 CloseReason::None => {}
1871 }
1872 }
1873
1874 fn close_connection_graceful(&self, connection_id: ConnectionId) {
1880 let mut inner = self.inner.borrow_mut();
1881
1882 let conn_info = inner.network.connections.get(&connection_id).map(|conn| {
1884 (
1885 conn.paired_connection,
1886 conn.send_closed,
1887 conn.is_closed,
1888 conn.send_in_progress,
1889 conn.send_buffer.is_empty(),
1890 conn.last_data_delivery_scheduled_at,
1891 )
1892 });
1893
1894 let Some((
1895 paired_id,
1896 was_send_closed,
1897 was_closed,
1898 send_in_progress,
1899 send_buffer_empty,
1900 last_delivery_time,
1901 )) = conn_info
1902 else {
1903 return;
1904 };
1905
1906 if was_closed || was_send_closed {
1908 tracing::debug!(
1909 "Connection {} already closed/send_closed, skipping graceful close",
1910 connection_id.0
1911 );
1912 return;
1913 }
1914
1915 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1917 conn.is_closed = true;
1918 conn.close_reason = CloseReason::Graceful;
1919 conn.send_closed = true;
1920 tracing::debug!(
1921 "Connection {} graceful close (FIN) - local write shut down",
1922 connection_id.0
1923 );
1924 }
1925
1926 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1928 waker.wake();
1929 }
1930
1931 if send_in_progress || !send_buffer_empty {
1934 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1936 conn.graceful_close_pending = true;
1937 tracing::debug!(
1938 "Connection {} graceful close deferred (send pipeline active)",
1939 connection_id.0
1940 );
1941 }
1942 } else {
1943 Self::schedule_fin_delivery(&mut inner, paired_id, last_delivery_time);
1945 }
1946 }
1947
1948 fn close_connection_aborted(&self, connection_id: ConnectionId) {
1953 let mut inner = self.inner.borrow_mut();
1954
1955 let paired_connection_id = inner
1956 .network
1957 .connections
1958 .get(&connection_id)
1959 .and_then(|conn| conn.paired_connection);
1960
1961 if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1962 && !conn.is_closed
1963 {
1964 conn.is_closed = true;
1965 conn.close_reason = CloseReason::Aborted;
1966 conn.graceful_close_pending = false;
1968 tracing::debug!(
1969 "Connection {} closed permanently (reason: Aborted)",
1970 connection_id.0
1971 );
1972 }
1973
1974 if let Some(paired_id) = paired_connection_id
1975 && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
1976 && !paired_conn.is_closed
1977 {
1978 paired_conn.is_closed = true;
1979 paired_conn.close_reason = CloseReason::Aborted;
1980 tracing::debug!(
1981 "Paired connection {} also closed (reason: Aborted)",
1982 paired_id.0
1983 );
1984 }
1985
1986 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1987 tracing::debug!(
1988 "Waking read waker for aborted connection {}",
1989 connection_id.0
1990 );
1991 waker.wake();
1992 }
1993
1994 if let Some(paired_id) = paired_connection_id
1995 && let Some(paired_waker) = inner.wakers.read_wakers.remove(&paired_id)
1996 {
1997 tracing::debug!(
1998 "Waking read waker for paired aborted connection {}",
1999 paired_id.0
2000 );
2001 paired_waker.wake();
2002 }
2003 }
2004
2005 pub fn close_connection_asymmetric(
2007 &self,
2008 connection_id: ConnectionId,
2009 close_send: bool,
2010 close_recv: bool,
2011 ) {
2012 let mut inner = self.inner.borrow_mut();
2013
2014 let paired_id = inner
2015 .network
2016 .connections
2017 .get(&connection_id)
2018 .and_then(|conn| conn.paired_connection);
2019
2020 if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2021 conn.send_closed = true;
2022 conn.send_buffer.clear();
2023 tracing::debug!(
2024 "Connection {} send side closed (asymmetric)",
2025 connection_id.0
2026 );
2027 }
2028
2029 if close_recv
2030 && let Some(paired) = paired_id
2031 && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
2032 {
2033 paired_conn.recv_closed = true;
2034 tracing::debug!(
2035 "Connection {} recv side closed (asymmetric via peer)",
2036 paired.0
2037 );
2038 }
2039
2040 if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
2041 waker.wake();
2042 }
2043 if close_recv
2044 && let Some(paired) = paired_id
2045 && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
2046 {
2047 waker.wake();
2048 }
2049 }
2050
    /// Roll whether chaos should randomly break `connection_id` right now.
    ///
    /// Returns `None`, with no state change, when any of these hold:
    /// - the connection is marked stable;
    /// - `chaos.random_close_probability` is not positive;
    /// - less than `chaos.random_close_cooldown` has elapsed since the last
    ///   random close;
    /// - the `buggify_with_prob!` roll fails.
    ///
    /// Otherwise the method:
    /// 1. records the close time and emits a `RandomClose` fault event;
    /// 2. draws `a` in [0, 1) and closes one or both directions. `a < 0.66`
    ///    closes the peer's receive side; `a > 0.33` closes this connection's
    ///    send side and clears its send buffer. Values in between close both.
    /// 3. wakes the pending readers of the affected sides;
    /// 4. draws `b` and returns `Some(b < chaos.random_close_explicit_ratio)`.
    ///    NOTE(review): the meaning of "explicit" is defined by callers,
    ///    presumably whether to surface an explicit error; confirm.
    ///
    /// It does not check that the connection exists before acting.
    ///
    /// NOTE(review): RNG draws happen in a fixed order (buggify roll, then `a`,
    /// then `b`). Reordering statements here would change seeded runs.
    pub fn roll_random_close(&self, connection_id: ConnectionId) -> Option<bool> {
        let mut inner = self.inner.borrow_mut();
        let config = &inner.network.config;

        // Stable connections are exempt from chaos.
        if inner
            .network
            .connections
            .get(&connection_id)
            .is_some_and(|conn| conn.is_stable)
        {
            return None;
        }

        if config.chaos.random_close_probability <= 0.0 {
            return None;
        }

        // Enforce a minimum gap between random closes.
        let current_time = inner.current_time;
        let time_since_last = current_time.saturating_sub(inner.network.last_random_close_time);
        if time_since_last < config.chaos.random_close_cooldown {
            return None;
        }

        if !crate::buggify_with_prob!(config.chaos.random_close_probability) {
            return None;
        }

        inner.network.last_random_close_time = current_time;

        inner.emit_fault(SimFaultEvent::RandomClose {
            connection_id: connection_id.0,
        });

        let paired_id = inner
            .network
            .connections
            .get(&connection_id)
            .and_then(|conn| conn.paired_connection);

        // One draw decides which direction(s) fail; the ranges overlap in
        // (0.33, 0.66), where both directions close.
        let a = super::rng::sim_random_f64();
        let close_recv = a < 0.66;
        let close_send = a > 0.33;

        tracing::info!(
            "Random connection failure triggered on connection {} (send={}, recv={}, a={:.3})",
            connection_id.0,
            close_send,
            close_recv,
            a
        );

        if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
            conn.send_closed = true;
            conn.send_buffer.clear();
        }

        // The receive side that stops is the peer's.
        if close_recv
            && let Some(paired) = paired_id
            && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
        {
            paired_conn.recv_closed = true;
        }

        if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
            waker.wake();
        }
        if close_recv
            && let Some(paired) = paired_id
            && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
        {
            waker.wake();
        }

        let b = super::rng::sim_random_f64();
        let explicit = b < inner.network.config.chaos.random_close_explicit_ratio;

        tracing::debug!(
            "Random close explicit={} (b={:.3}, ratio={:.2})",
            explicit,
            b,
            inner.network.config.chaos.random_close_explicit_ratio
        );

        Some(explicit)
    }
2138
2139 pub fn is_send_closed(&self, connection_id: ConnectionId) -> bool {
2141 let inner = self.inner.borrow();
2142 inner
2143 .network
2144 .connections
2145 .get(&connection_id)
2146 .is_some_and(|conn| conn.send_closed || conn.is_closed)
2147 }
2148
2149 pub fn is_recv_closed(&self, connection_id: ConnectionId) -> bool {
2151 let inner = self.inner.borrow();
2152 inner
2153 .network
2154 .connections
2155 .get(&connection_id)
2156 .is_some_and(|conn| conn.recv_closed || conn.is_closed)
2157 }
2158
2159 pub fn is_remote_fin_received(&self, connection_id: ConnectionId) -> bool {
2164 let inner = self.inner.borrow();
2165 inner
2166 .network
2167 .connections
2168 .get(&connection_id)
2169 .is_some_and(|conn| conn.remote_fin_received)
2170 }
2171
2172 pub fn simulate_peer_crash(&self, connection_id: ConnectionId, error_delay: Duration) {
2185 let mut inner = self.inner.borrow_mut();
2186 let current_time = inner.current_time;
2187 let error_at = current_time + error_delay;
2188
2189 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2190 conn.is_half_open = true;
2191 conn.half_open_error_at = Some(error_at);
2192
2193 conn.paired_connection = None;
2196
2197 inner.emit_fault(SimFaultEvent::PeerCrash {
2198 connection_id: connection_id.0,
2199 });
2200
2201 tracing::info!(
2202 "Connection {} now half-open, errors manifest at {:?}",
2203 connection_id.0,
2204 error_at
2205 );
2206 }
2207
2208 let wake_event = Event::Connection {
2210 id: connection_id.0,
2211 state: ConnectionStateChange::HalfOpenError,
2212 };
2213 let sequence = inner.next_sequence;
2214 inner.next_sequence += 1;
2215 let scheduled_event = ScheduledEvent::new(error_at, wake_event, sequence);
2216 inner.event_queue.schedule(scheduled_event);
2217 }
2218
2219 pub fn is_half_open(&self, connection_id: ConnectionId) -> bool {
2221 let inner = self.inner.borrow();
2222 inner
2223 .network
2224 .connections
2225 .get(&connection_id)
2226 .is_some_and(|conn| conn.is_half_open)
2227 }
2228
2229 pub fn should_half_open_error(&self, connection_id: ConnectionId) -> bool {
2231 let inner = self.inner.borrow();
2232 let current_time = inner.current_time;
2233 inner
2234 .network
2235 .connections
2236 .get(&connection_id)
2237 .is_some_and(|conn| {
2238 conn.is_half_open
2239 && conn
2240 .half_open_error_at
2241 .is_some_and(|error_at| current_time >= error_at)
2242 })
2243 }
2244
2245 pub fn mark_connection_stable(&self, connection_id: ConnectionId) {
2262 let mut inner = self.inner.borrow_mut();
2263 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2264 conn.is_stable = true;
2265 tracing::debug!("Connection {} marked as stable", connection_id.0);
2266
2267 if let Some(paired_id) = conn.paired_connection
2269 && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
2270 {
2271 paired_conn.is_stable = true;
2272 tracing::debug!("Paired connection {} also marked as stable", paired_id.0);
2273 }
2274 }
2275 }
2276
2277 pub fn is_connection_stable(&self, connection_id: ConnectionId) -> bool {
2279 let inner = self.inner.borrow();
2280 inner
2281 .network
2282 .connections
2283 .get(&connection_id)
2284 .is_some_and(|conn| conn.is_stable)
2285 }
2286
2287 pub fn partition_pair(
2291 &self,
2292 from_ip: std::net::IpAddr,
2293 to_ip: std::net::IpAddr,
2294 duration: Duration,
2295 ) -> SimulationResult<()> {
2296 let mut inner = self.inner.borrow_mut();
2297 let expires_at = inner.current_time + duration;
2298
2299 inner
2300 .network
2301 .ip_partitions
2302 .insert((from_ip, to_ip), PartitionState { expires_at });
2303
2304 let restore_event = Event::Connection {
2305 id: 0,
2306 state: ConnectionStateChange::PartitionRestore,
2307 };
2308 let sequence = inner.next_sequence;
2309 inner.next_sequence += 1;
2310 let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2311 inner.event_queue.schedule(scheduled_event);
2312
2313 inner.emit_fault(SimFaultEvent::PartitionCreated {
2314 from: from_ip.to_string(),
2315 to: to_ip.to_string(),
2316 });
2317
2318 tracing::debug!(
2319 "Partitioned {} -> {} until {:?}",
2320 from_ip,
2321 to_ip,
2322 expires_at
2323 );
2324 Ok(())
2325 }
2326
2327 pub fn partition_send_from(
2329 &self,
2330 ip: std::net::IpAddr,
2331 duration: Duration,
2332 ) -> SimulationResult<()> {
2333 let mut inner = self.inner.borrow_mut();
2334 let expires_at = inner.current_time + duration;
2335
2336 inner.network.send_partitions.insert(ip, expires_at);
2337
2338 let clear_event = Event::Connection {
2339 id: 0,
2340 state: ConnectionStateChange::SendPartitionClear,
2341 };
2342 let sequence = inner.next_sequence;
2343 inner.next_sequence += 1;
2344 let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2345 inner.event_queue.schedule(scheduled_event);
2346
2347 inner.emit_fault(SimFaultEvent::SendPartitionCreated { ip: ip.to_string() });
2348 tracing::debug!("Partitioned sends from {} until {:?}", ip, expires_at);
2349 Ok(())
2350 }
2351
2352 pub fn partition_recv_to(
2354 &self,
2355 ip: std::net::IpAddr,
2356 duration: Duration,
2357 ) -> SimulationResult<()> {
2358 let mut inner = self.inner.borrow_mut();
2359 let expires_at = inner.current_time + duration;
2360
2361 inner.network.recv_partitions.insert(ip, expires_at);
2362
2363 let clear_event = Event::Connection {
2364 id: 0,
2365 state: ConnectionStateChange::RecvPartitionClear,
2366 };
2367 let sequence = inner.next_sequence;
2368 inner.next_sequence += 1;
2369 let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2370 inner.event_queue.schedule(scheduled_event);
2371
2372 inner.emit_fault(SimFaultEvent::RecvPartitionCreated { ip: ip.to_string() });
2373 tracing::debug!("Partitioned receives to {} until {:?}", ip, expires_at);
2374 Ok(())
2375 }
2376
2377 pub fn restore_partition(
2379 &self,
2380 from_ip: std::net::IpAddr,
2381 to_ip: std::net::IpAddr,
2382 ) -> SimulationResult<()> {
2383 let mut inner = self.inner.borrow_mut();
2384 inner.network.ip_partitions.remove(&(from_ip, to_ip));
2385 inner.emit_fault(SimFaultEvent::PartitionHealed {
2386 from: from_ip.to_string(),
2387 to: to_ip.to_string(),
2388 });
2389 tracing::debug!("Restored partition {} -> {}", from_ip, to_ip);
2390 Ok(())
2391 }
2392
2393 pub fn is_partitioned(
2395 &self,
2396 from_ip: std::net::IpAddr,
2397 to_ip: std::net::IpAddr,
2398 ) -> SimulationResult<bool> {
2399 let inner = self.inner.borrow();
2400 Ok(inner
2401 .network
2402 .is_partitioned(from_ip, to_ip, inner.current_time))
2403 }
2404
2405 fn randomly_trigger_partitions_with_inner(inner: &mut SimInner) {
2412 let partition_config = &inner.network.config;
2413
2414 if partition_config.chaos.partition_probability == 0.0 {
2415 return;
2416 }
2417
2418 if sim_random::<f64>() >= partition_config.chaos.partition_probability {
2420 return;
2421 }
2422
2423 let unique_ips: HashSet<IpAddr> = inner
2425 .network
2426 .connections
2427 .values()
2428 .filter_map(|conn| conn.local_ip)
2429 .collect();
2430
2431 if unique_ips.len() < 2 {
2432 return; }
2434
2435 let ip_list: Vec<IpAddr> = unique_ips.into_iter().collect();
2436 let partition_duration =
2437 crate::network::sample_duration(&partition_config.chaos.partition_duration);
2438 let expires_at = inner.current_time + partition_duration;
2439
2440 let partitioned_ips: Vec<IpAddr> = match partition_config.chaos.partition_strategy {
2442 PartitionStrategy::Random => {
2443 ip_list
2445 .iter()
2446 .filter(|_| sim_random::<f64>() < 0.5)
2447 .copied()
2448 .collect()
2449 }
2450 PartitionStrategy::UniformSize => {
2451 let partition_size = sim_random_range(1..ip_list.len());
2453 let mut shuffled = ip_list.clone();
2455 for i in (1..shuffled.len()).rev() {
2457 let j = sim_random_range(0..i + 1);
2458 shuffled.swap(i, j);
2459 }
2460 shuffled.into_iter().take(partition_size).collect()
2461 }
2462 PartitionStrategy::IsolateSingle => {
2463 let idx = sim_random_range(0..ip_list.len());
2465 vec![ip_list[idx]]
2466 }
2467 };
2468
2469 if partitioned_ips.is_empty() || partitioned_ips.len() == ip_list.len() {
2471 return;
2472 }
2473
2474 let non_partitioned: Vec<IpAddr> = ip_list
2476 .iter()
2477 .filter(|ip| !partitioned_ips.contains(ip))
2478 .copied()
2479 .collect();
2480
2481 for &from_ip in &partitioned_ips {
2482 for &to_ip in &non_partitioned {
2483 if inner
2485 .network
2486 .is_partitioned(from_ip, to_ip, inner.current_time)
2487 {
2488 continue;
2489 }
2490
2491 inner
2493 .network
2494 .ip_partitions
2495 .insert((from_ip, to_ip), PartitionState { expires_at });
2496 inner
2497 .network
2498 .ip_partitions
2499 .insert((to_ip, from_ip), PartitionState { expires_at });
2500
2501 inner.emit_fault(SimFaultEvent::PartitionCreated {
2502 from: from_ip.to_string(),
2503 to: to_ip.to_string(),
2504 });
2505
2506 tracing::debug!(
2507 "Partition triggered: {} <-> {} until {:?} (strategy: {:?})",
2508 from_ip,
2509 to_ip,
2510 expires_at,
2511 partition_config.chaos.partition_strategy
2512 );
2513 }
2514 }
2515
2516 let restore_event = Event::Connection {
2518 id: 0,
2519 state: ConnectionStateChange::PartitionRestore,
2520 };
2521 let sequence = inner.next_sequence;
2522 inner.next_sequence += 1;
2523 let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2524 inner.event_queue.schedule(scheduled_event);
2525 }
2526}
2527
2528impl Default for SimWorld {
2529 fn default() -> Self {
2530 Self::new()
2531 }
2532}
2533
/// Non-owning handle to a simulation world.
///
/// It holds only a `Weak` reference to the shared simulation state, so keeping
/// a `WeakSimWorld` around does not keep the simulation alive. Its methods
/// upgrade the reference first and return an error once the simulation has
/// been dropped.
#[derive(Debug)]
pub struct WeakSimWorld {
    /// Weak pointer to the shared simulation state.
    pub(crate) inner: Weak<RefCell<SimInner>>,
}
2542
/// Generates a `WeakSimWorld` method that upgrades the weak handle and forwards
/// the call to the `SimWorld` method of the same name.
///
/// - `unit`: the target returns `()`; the forwarder returns `Ok(())`.
/// - `wrap`: the target returns a plain value, which is wrapped in `Ok`.
/// - `pass`: the target already returns a `SimulationResult`, which is
///   returned as-is.
///
/// Any attributes (e.g. `#[doc = ...]`) are copied onto the generated method.
/// Every variant propagates the error from `upgrade` via `?`.
macro_rules! weak_forward {
    (unit $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*)) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<()> {
            let world = self.upgrade()?;
            world.$name($($param),*);
            Ok(())
        }
    };
    (wrap $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*) -> $out:ty) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<$out> {
            let world = self.upgrade()?;
            Ok(world.$name($($param),*))
        }
    };
    (pass $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*) -> $out:ty) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<$out> {
            let world = self.upgrade()?;
            world.$name($($param),*)
        }
    };
}
2568
2569impl WeakSimWorld {
2570 pub fn upgrade(&self) -> SimulationResult<SimWorld> {
2572 self.inner
2573 .upgrade()
2574 .map(|inner| SimWorld { inner })
2575 .ok_or(SimulationError::SimulationShutdown)
2576 }
2577
2578 weak_forward!(wrap #[doc = "Returns the current simulation time."] current_time(&self) -> Duration);
2579 weak_forward!(wrap #[doc = "Returns the exact simulation time (equivalent to FDB's now())."] now(&self) -> Duration);
2580 weak_forward!(wrap #[doc = "Returns the drifted timer time (equivalent to FDB's timer())."] timer(&self) -> Duration);
2581 weak_forward!(unit #[doc = "Schedules an event to execute after the specified delay."] schedule_event(&self, event: Event, delay: Duration));
2582 weak_forward!(unit #[doc = "Schedules an event to execute at the specified absolute time."] schedule_event_at(&self, event: Event, time: Duration));
2583 weak_forward!(pass #[doc = "Read data from connection's receive buffer."] read_from_connection(&self, connection_id: ConnectionId, buf: &mut [u8]) -> usize);
2584 weak_forward!(pass #[doc = "Write data to connection's receive buffer."] write_to_connection(&self, connection_id: ConnectionId, data: &[u8]) -> ());
2585 weak_forward!(pass #[doc = "Buffer data for ordered sending on a connection."] buffer_send(&self, connection_id: ConnectionId, data: Vec<u8>) -> ());
2586 weak_forward!(wrap #[doc = "Get a network provider for the simulation."] network_provider(&self) -> SimNetworkProvider);
2587 weak_forward!(wrap #[doc = "Get a time provider for the simulation."] time_provider(&self) -> crate::providers::SimTimeProvider);
2588 weak_forward!(wrap #[doc = "Sleep for the specified duration in simulation time."] sleep(&self, duration: Duration) -> SleepFuture);
2589
2590 pub fn with_network_config<F, R>(&self, f: F) -> SimulationResult<R>
2592 where
2593 F: FnOnce(&NetworkConfiguration) -> R,
2594 {
2595 Ok(self.upgrade()?.with_network_config(f))
2596 }
2597}
2598
2599impl Clone for WeakSimWorld {
2600 fn clone(&self) -> Self {
2601 Self {
2602 inner: self.inner.clone(),
2603 }
2604 }
2605}
2606
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a literal IP address used by the partition tests.
    fn ip(addr: &str) -> std::net::IpAddr {
        addr.parse().expect("valid ip")
    }

    #[test]
    fn sim_world_basic_lifecycle() {
        let mut world = SimWorld::new();

        // A fresh world starts at t=0 with nothing queued.
        assert_eq!(world.current_time(), Duration::ZERO);
        assert!(!world.has_pending_events());
        assert_eq!(world.pending_event_count(), 0);

        // Scheduling queues the event without advancing the clock.
        world.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
        assert!(world.has_pending_events());
        assert_eq!(world.pending_event_count(), 1);
        assert_eq!(world.current_time(), Duration::ZERO);

        // Stepping consumes the only event and moves the clock to its deadline.
        let more_pending = world.step();
        assert!(!more_pending);
        assert_eq!(world.current_time(), Duration::from_millis(100));
        assert!(!world.has_pending_events());
        assert_eq!(world.pending_event_count(), 0);
    }

    #[test]
    fn sim_world_multiple_events() {
        let mut world = SimWorld::new();

        // Scheduled deliberately out of chronological order.
        for (task_id, delay_ms) in [(3, 300), (1, 100), (2, 200)] {
            world.schedule_event(Event::Timer { task_id }, Duration::from_millis(delay_ms));
        }
        assert_eq!(world.pending_event_count(), 3);

        // Each step is expected to pop the earliest remaining event.
        for (expect_more, at_ms, remaining) in [(true, 100, 2), (true, 200, 1), (false, 300, 0)] {
            assert_eq!(world.step(), expect_more);
            assert_eq!(world.current_time(), Duration::from_millis(at_ms));
            assert_eq!(world.pending_event_count(), remaining);
        }
    }

    #[test]
    fn sim_world_run_until_empty() {
        let mut world = SimWorld::new();

        for (task_id, delay_ms) in [(1, 100), (2, 200), (3, 300)] {
            world.schedule_event(Event::Timer { task_id }, Duration::from_millis(delay_ms));
        }

        world.run_until_empty();

        // The clock ends at the latest scheduled deadline with the queue drained.
        assert_eq!(world.current_time(), Duration::from_millis(300));
        assert!(!world.has_pending_events());
    }

    #[test]
    fn sim_world_schedule_at_specific_time() {
        let mut world = SimWorld::new();
        let deadline = Duration::from_millis(500);

        world.schedule_event_at(Event::Timer { task_id: 1 }, deadline);
        assert_eq!(world.current_time(), Duration::ZERO);

        world.step();
        assert_eq!(world.current_time(), deadline);
    }

    #[test]
    fn weak_sim_world_lifecycle() {
        let world = SimWorld::new();
        let handle = world.downgrade();

        // While the SimWorld is alive, the weak handle forwards calls.
        let now = handle.current_time().expect("should get time");
        assert_eq!(now, Duration::ZERO);

        handle
            .schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100))
            .expect("should schedule event");
        assert!(world.has_pending_events());

        // Once the SimWorld is dropped, every forwarded call reports shutdown.
        drop(world);

        assert_eq!(handle.current_time(), Err(SimulationError::SimulationShutdown));
        assert_eq!(
            handle.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200)),
            Err(SimulationError::SimulationShutdown)
        );
    }

    #[test]
    fn deterministic_event_ordering() {
        let mut world = SimWorld::new();
        let same_instant = Duration::from_millis(100);

        // Three events share one timestamp; they are scheduled as tasks 2, 1, 3.
        for task_id in [2, 1, 3] {
            world.schedule_event(Event::Timer { task_id }, Duration::from_millis(100));
        }

        // NOTE(review): this only checks that all three fire at the same instant;
        // it does not assert which task is processed first.
        for expect_more in [true, true, false] {
            assert_eq!(world.step(), expect_more);
            assert_eq!(world.current_time(), same_instant);
        }
    }

    #[test]
    fn emit_fault_without_state_is_noop() {
        // With no StateHandle attached, emitting a fault should not panic.
        let bare = SimInner::new();
        assert!(bare.state.is_none());
        bare.emit_fault(SimFaultEvent::StorageCrash {
            ip: "10.0.1.1".to_string(),
        });
    }

    #[test]
    fn emit_fault_with_state_writes_to_timeline() {
        let mut sim_inner = SimInner::new();
        let state = StateHandle::new();
        sim_inner.state = Some(state.clone());
        sim_inner.current_time = Duration::from_millis(500);

        sim_inner.emit_fault(SimFaultEvent::StorageCrash {
            ip: "10.0.1.1".to_string(),
        });

        let timeline = state
            .timeline::<SimFaultEvent>(SIM_FAULT_TIMELINE)
            .expect("timeline should exist");
        assert_eq!(timeline.len(), 1);

        // The entry carries the sim clock (in ms) and the "sim" source tag.
        let newest = timeline.last().expect("should have entry");
        assert_eq!(newest.time_ms, 500);
        assert_eq!(newest.source, "sim");
        assert!(matches!(newest.event, SimFaultEvent::StorageCrash { .. }));
    }

    #[test]
    fn partition_pair_emits_fault_event() {
        let world = SimWorld::new();
        let state = StateHandle::new();
        world.set_state(state.clone());

        let (a, b) = (ip("10.0.1.1"), ip("10.0.1.2"));
        world
            .partition_pair(a, b, Duration::from_secs(10))
            .expect("partition should succeed");

        let timeline = state
            .timeline::<SimFaultEvent>(SIM_FAULT_TIMELINE)
            .expect("timeline should exist");
        assert_eq!(timeline.len(), 1);
        assert!(matches!(
            &timeline.all()[0].event,
            SimFaultEvent::PartitionCreated { from, to }
                if from == "10.0.1.1" && to == "10.0.1.2"
        ));
    }

    #[test]
    fn restore_partition_emits_fault_event() {
        let world = SimWorld::new();
        let state = StateHandle::new();
        world.set_state(state.clone());

        let (a, b) = (ip("10.0.1.1"), ip("10.0.1.2"));
        world
            .partition_pair(a, b, Duration::from_secs(10))
            .expect("partition");
        world.restore_partition(a, b).expect("restore");

        let timeline = state
            .timeline::<SimFaultEvent>(SIM_FAULT_TIMELINE)
            .expect("timeline should exist");
        assert_eq!(timeline.len(), 2);

        // Creation is recorded first, then the heal.
        let entries = timeline.all();
        assert!(matches!(
            &entries[0].event,
            SimFaultEvent::PartitionCreated { .. }
        ));
        assert!(matches!(
            &entries[1].event,
            SimFaultEvent::PartitionHealed { .. }
        ));
    }
}