1use std::{
7 cell::RefCell,
8 collections::{HashMap, HashSet, VecDeque},
9 net::IpAddr,
10 rc::{Rc, Weak},
11 task::Waker,
12 time::Duration,
13};
14use tracing::instrument;
15
16use crate::{
17 SimulationError, SimulationResult,
18 network::{
19 NetworkConfiguration,
20 sim::{ConnectionId, ListenerId, SimNetworkProvider},
21 },
22};
23
24use super::{
25 events::{ConnectionStateChange, Event, EventQueue, NetworkOperation, ScheduledEvent},
26 rng::{reset_sim_rng, set_sim_seed, sim_random, sim_random_range},
27 sleep::SleepFuture,
28 state::{ClogState, CloseReason, ConnectionState, ListenerState, NetworkState, PartitionState},
29 wakers::WakerRegistry,
30};
31
/// Mutable simulation state shared by a [`SimWorld`] and the weak handles derived from it.
///
/// `SimWorld` keeps it behind `Rc<RefCell<..>>`; every field is crate-internal.
#[derive(Debug)]
pub(crate) struct SimInner {
    /// Current simulated time; `SimWorld::step` sets it to each processed event's time.
    pub(crate) current_time: Duration,
    /// Drifted clock value handed out by `SimWorld::timer` when clock drift is enabled.
    pub(crate) timer_time: Duration,
    /// Events waiting to be processed.
    pub(crate) event_queue: EventQueue,
    /// Sequence number given to the next scheduled event (passed to `ScheduledEvent::new`
    /// alongside the time; presumably used to order same-time events — confirm in `EventQueue`).
    pub(crate) next_sequence: u64,

    /// Connections, listeners, clogs, partitions and the network configuration.
    pub(crate) network: NetworkState,

    /// Wakers of tasks blocked on simulated conditions (timers, reads, clogs, cuts, ...).
    pub(crate) wakers: WakerRegistry,

    /// Next id handed out to a timer task by `SimWorld::sleep`.
    pub(crate) next_task_id: u64,
    /// Ids of timer tasks whose `Timer` event has already fired.
    pub(crate) awakened_tasks: HashSet<u64>,

    /// Number of events processed so far.
    pub(crate) events_processed: u64,

    /// Simulated time of the most recent bit-flip injection (drives the flip cooldown).
    pub(crate) last_bit_flip_time: Duration,
}
58
59impl SimInner {
60 pub(crate) fn new() -> Self {
61 Self {
62 current_time: Duration::ZERO,
63 timer_time: Duration::ZERO,
64 event_queue: EventQueue::new(),
65 next_sequence: 0,
66 network: NetworkState::new(NetworkConfiguration::default()),
67 wakers: WakerRegistry::default(),
68 next_task_id: 0,
69 awakened_tasks: HashSet::new(),
70 events_processed: 0,
71 last_bit_flip_time: Duration::ZERO,
72 }
73 }
74
75 pub(crate) fn new_with_config(network_config: NetworkConfiguration) -> Self {
76 Self {
77 current_time: Duration::ZERO,
78 timer_time: Duration::ZERO,
79 event_queue: EventQueue::new(),
80 next_sequence: 0,
81 network: NetworkState::new(network_config),
82 wakers: WakerRegistry::default(),
83 next_task_id: 0,
84 awakened_tasks: HashSet::new(),
85 events_processed: 0,
86 last_bit_flip_time: Duration::ZERO,
87 }
88 }
89
90 pub(crate) fn calculate_flip_bit_count(random_value: u32, min_bits: u32, max_bits: u32) -> u32 {
100 if random_value == 0 {
101 return max_bits.min(32);
103 }
104
105 let bit_count = 1 + random_value.leading_zeros();
107
108 bit_count.clamp(min_bits, max_bits)
110 }
111}
112
/// Handle to a seeded simulation world.
///
/// The state lives in a shared `Rc<RefCell<SimInner>>`; [`SimWorld::downgrade`] hands
/// out weak references to it, which the network/time providers and sleep futures use.
#[derive(Debug)]
pub struct SimWorld {
    /// Shared simulation state.
    pub(crate) inner: Rc<RefCell<SimInner>>,
}
122
123impl SimWorld {
124 fn create(network_config: Option<NetworkConfiguration>, seed: u64) -> Self {
126 reset_sim_rng();
127 set_sim_seed(seed);
128 crate::chaos::assertions::reset_assertion_results();
129
130 let inner = match network_config {
131 Some(config) => SimInner::new_with_config(config),
132 None => SimInner::new(),
133 };
134
135 Self {
136 inner: Rc::new(RefCell::new(inner)),
137 }
138 }
139
140 pub fn new() -> Self {
145 Self::create(None, 0)
146 }
147
148 pub fn new_with_seed(seed: u64) -> Self {
157 Self::create(None, seed)
158 }
159
160 pub fn new_with_network_config(network_config: NetworkConfiguration) -> Self {
162 Self::create(Some(network_config), 0)
163 }
164
165 pub fn new_with_network_config_and_seed(
172 network_config: NetworkConfiguration,
173 seed: u64,
174 ) -> Self {
175 Self::create(Some(network_config), seed)
176 }
177
178 #[instrument(skip(self))]
183 pub fn step(&mut self) -> bool {
184 let mut inner = self.inner.borrow_mut();
185
186 if let Some(scheduled_event) = inner.event_queue.pop_earliest() {
187 inner.current_time = scheduled_event.time();
189
190 Self::clear_expired_clogs_with_inner(&mut inner);
192
193 Self::randomly_trigger_partitions_with_inner(&mut inner);
195
196 Self::process_event_with_inner(&mut inner, scheduled_event.into_event());
198
199 !inner.event_queue.is_empty()
201 } else {
202 false
204 }
205 }
206
207 #[instrument(skip(self))]
213 pub fn run_until_empty(&mut self) {
214 while self.step() {
215 if self.inner.borrow().events_processed.is_multiple_of(50) {
217 let has_workload_events = !self
218 .inner
219 .borrow()
220 .event_queue
221 .has_only_infrastructure_events();
222 if !has_workload_events {
223 tracing::debug!(
224 "Early termination: only infrastructure events remain in queue"
225 );
226 break;
227 }
228 }
229 }
230 }
231
232 pub fn current_time(&self) -> Duration {
234 self.inner.borrow().current_time
235 }
236
237 pub fn now(&self) -> Duration {
242 self.inner.borrow().current_time
243 }
244
245 pub fn timer(&self) -> Duration {
260 let mut inner = self.inner.borrow_mut();
261 let chaos = &inner.network.config.chaos;
262
263 if !chaos.clock_drift_enabled {
265 return inner.current_time;
266 }
267
268 let max_timer = inner.current_time + chaos.clock_drift_max;
272
273 if inner.timer_time < max_timer {
275 let random_factor = sim_random::<f64>(); let gap = (max_timer - inner.timer_time).as_secs_f64();
277 let delta = random_factor * gap / 2.0;
278 inner.timer_time += Duration::from_secs_f64(delta);
279 }
280
281 inner.timer_time = inner.timer_time.max(inner.current_time);
283
284 inner.timer_time
285 }
286
287 #[instrument(skip(self))]
289 pub fn schedule_event(&self, event: Event, delay: Duration) {
290 let mut inner = self.inner.borrow_mut();
291 let scheduled_time = inner.current_time + delay;
292 let sequence = inner.next_sequence;
293 inner.next_sequence += 1;
294
295 let scheduled_event = ScheduledEvent::new(scheduled_time, event, sequence);
296 inner.event_queue.schedule(scheduled_event);
297 }
298
299 pub fn schedule_event_at(&self, event: Event, time: Duration) {
301 let mut inner = self.inner.borrow_mut();
302 let sequence = inner.next_sequence;
303 inner.next_sequence += 1;
304
305 let scheduled_event = ScheduledEvent::new(time, event, sequence);
306 inner.event_queue.schedule(scheduled_event);
307 }
308
309 pub fn downgrade(&self) -> WeakSimWorld {
314 WeakSimWorld {
315 inner: Rc::downgrade(&self.inner),
316 }
317 }
318
319 pub fn has_pending_events(&self) -> bool {
321 !self.inner.borrow().event_queue.is_empty()
322 }
323
324 pub fn pending_event_count(&self) -> usize {
326 self.inner.borrow().event_queue.len()
327 }
328
329 pub fn network_provider(&self) -> SimNetworkProvider {
331 SimNetworkProvider::new(self.downgrade())
332 }
333
334 pub fn time_provider(&self) -> crate::providers::SimTimeProvider {
336 crate::providers::SimTimeProvider::new(self.downgrade())
337 }
338
339 pub fn task_provider(&self) -> crate::TokioTaskProvider {
341 crate::TokioTaskProvider
342 }
343
344 pub fn with_network_config<F, R>(&self, f: F) -> R
352 where
353 F: FnOnce(&NetworkConfiguration) -> R,
354 {
355 let inner = self.inner.borrow();
356 f(&inner.network.config)
357 }
358
359 pub(crate) fn create_listener(&self, addr: String) -> SimulationResult<ListenerId> {
361 let mut inner = self.inner.borrow_mut();
362 let listener_id = ListenerId(inner.network.next_listener_id);
363 inner.network.next_listener_id += 1;
364
365 inner.network.listeners.insert(
366 listener_id,
367 ListenerState {
368 id: listener_id,
369 addr,
370 pending_connections: VecDeque::new(),
371 },
372 );
373
374 Ok(listener_id)
375 }
376
377 pub(crate) fn read_from_connection(
379 &self,
380 connection_id: ConnectionId,
381 buf: &mut [u8],
382 ) -> SimulationResult<usize> {
383 let mut inner = self.inner.borrow_mut();
384
385 if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
386 let mut bytes_read = 0;
387 while bytes_read < buf.len() && !connection.receive_buffer.is_empty() {
388 if let Some(byte) = connection.receive_buffer.pop_front() {
389 buf[bytes_read] = byte;
390 bytes_read += 1;
391 }
392 }
393 Ok(bytes_read)
394 } else {
395 Err(SimulationError::InvalidState(
396 "connection not found".to_string(),
397 ))
398 }
399 }
400
401 pub(crate) fn write_to_connection(
403 &self,
404 connection_id: ConnectionId,
405 data: &[u8],
406 ) -> SimulationResult<()> {
407 let mut inner = self.inner.borrow_mut();
408
409 if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
410 for &byte in data {
411 connection.receive_buffer.push_back(byte);
412 }
413 Ok(())
414 } else {
415 Err(SimulationError::InvalidState(
416 "connection not found".to_string(),
417 ))
418 }
419 }
420
421 pub(crate) fn buffer_send(
426 &self,
427 connection_id: ConnectionId,
428 data: Vec<u8>,
429 ) -> SimulationResult<()> {
430 tracing::debug!(
431 "buffer_send called for connection_id={} with {} bytes",
432 connection_id.0,
433 data.len()
434 );
435 let mut inner = self.inner.borrow_mut();
436
437 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
438 conn.send_buffer.push_back(data);
440 tracing::debug!(
441 "buffer_send: added data to send_buffer, new length: {}",
442 conn.send_buffer.len()
443 );
444
445 if !conn.send_in_progress {
447 tracing::debug!(
448 "buffer_send: sender not in progress, scheduling ProcessSendBuffer event"
449 );
450 conn.send_in_progress = true;
451
452 let scheduled_time = inner.current_time + std::time::Duration::ZERO;
454 let sequence = inner.next_sequence;
455 inner.next_sequence += 1;
456 let scheduled_event = ScheduledEvent::new(
457 scheduled_time,
458 Event::Network {
459 connection_id: connection_id.0,
460 operation: NetworkOperation::ProcessSendBuffer,
461 },
462 sequence,
463 );
464 inner.event_queue.schedule(scheduled_event);
465 tracing::debug!(
466 "buffer_send: scheduled ProcessSendBuffer event with sequence {}",
467 sequence
468 );
469 } else {
470 tracing::debug!(
471 "buffer_send: sender already in progress, not scheduling new event"
472 );
473 }
474
475 Ok(())
476 } else {
477 tracing::debug!(
478 "buffer_send: connection_id={} not found in connections table",
479 connection_id.0
480 );
481 Err(SimulationError::InvalidState(
482 "connection not found".to_string(),
483 ))
484 }
485 }
486
487 pub(crate) fn create_connection_pair(
495 &self,
496 client_addr: String,
497 server_addr: String,
498 ) -> SimulationResult<(ConnectionId, ConnectionId)> {
499 let mut inner = self.inner.borrow_mut();
500
501 let client_id = ConnectionId(inner.network.next_connection_id);
502 inner.network.next_connection_id += 1;
503
504 let server_id = ConnectionId(inner.network.next_connection_id);
505 inner.network.next_connection_id += 1;
506
507 let current_time = inner.current_time;
509
510 let client_ip = NetworkState::parse_ip_from_addr(&client_addr);
512 let server_ip = NetworkState::parse_ip_from_addr(&server_addr);
513
514 let ephemeral_peer_addr = match client_ip {
518 Some(std::net::IpAddr::V4(ipv4)) => {
519 let octets = ipv4.octets();
520 let ip_offset = sim_random_range(0u32..256) as u8;
521 let new_last_octet = octets[3].wrapping_add(ip_offset);
522 let ephemeral_ip =
523 std::net::Ipv4Addr::new(octets[0], octets[1], octets[2], new_last_octet);
524 let ephemeral_port = sim_random_range(40000u16..60000);
525 format!("{}:{}", ephemeral_ip, ephemeral_port)
526 }
527 Some(std::net::IpAddr::V6(ipv6)) => {
528 let segments = ipv6.segments();
530 let mut new_segments = segments;
531 let ip_offset = sim_random_range(0u16..256);
532 new_segments[7] = new_segments[7].wrapping_add(ip_offset);
533 let ephemeral_ip = std::net::Ipv6Addr::from(new_segments);
534 let ephemeral_port = sim_random_range(40000u16..60000);
535 format!("[{}]:{}", ephemeral_ip, ephemeral_port)
536 }
537 None => {
538 let ephemeral_port = sim_random_range(40000u16..60000);
540 format!("unknown:{}", ephemeral_port)
541 }
542 };
543
544 const DEFAULT_SEND_BUFFER_CAPACITY: usize = 64 * 1024; inner.network.connections.insert(
552 client_id,
553 ConnectionState {
554 id: client_id,
555 addr: client_addr,
556 local_ip: client_ip,
557 remote_ip: server_ip,
558 peer_address: server_addr.clone(),
559 receive_buffer: VecDeque::new(),
560 paired_connection: Some(server_id),
561 send_buffer: VecDeque::new(),
562 send_in_progress: false,
563 next_send_time: current_time,
564 is_closed: false,
565 send_closed: false,
566 recv_closed: false,
567 is_cut: false,
568 cut_expiry: None,
569 close_reason: CloseReason::None,
570 send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
571 send_delay: None,
572 recv_delay: None,
573 is_half_open: false,
574 half_open_error_at: None,
575 },
576 );
577
578 inner.network.connections.insert(
580 server_id,
581 ConnectionState {
582 id: server_id,
583 addr: server_addr,
584 local_ip: server_ip,
585 remote_ip: client_ip,
586 peer_address: ephemeral_peer_addr,
587 receive_buffer: VecDeque::new(),
588 paired_connection: Some(client_id),
589 send_buffer: VecDeque::new(),
590 send_in_progress: false,
591 next_send_time: current_time,
592 is_closed: false,
593 send_closed: false,
594 recv_closed: false,
595 is_cut: false,
596 cut_expiry: None,
597 close_reason: CloseReason::None,
598 send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
599 send_delay: None,
600 recv_delay: None,
601 is_half_open: false,
602 half_open_error_at: None,
603 },
604 );
605
606 Ok((client_id, server_id))
607 }
608
609 pub(crate) fn register_read_waker(
611 &self,
612 connection_id: ConnectionId,
613 waker: Waker,
614 ) -> SimulationResult<()> {
615 let mut inner = self.inner.borrow_mut();
616 let is_replacement = inner.wakers.read_wakers.contains_key(&connection_id);
617 inner.wakers.read_wakers.insert(connection_id, waker);
618 tracing::debug!(
619 "register_read_waker: connection_id={}, replacement={}, total_wakers={}",
620 connection_id.0,
621 is_replacement,
622 inner.wakers.read_wakers.len()
623 );
624 Ok(())
625 }
626
627 pub(crate) fn register_accept_waker(&self, addr: &str, waker: Waker) -> SimulationResult<()> {
629 let mut inner = self.inner.borrow_mut();
630 use std::collections::hash_map::DefaultHasher;
632 use std::hash::{Hash, Hasher};
633 let mut hasher = DefaultHasher::new();
634 addr.hash(&mut hasher);
635 let listener_key = ListenerId(hasher.finish());
636
637 inner.wakers.listener_wakers.insert(listener_key, waker);
638 Ok(())
639 }
640
641 pub(crate) fn store_pending_connection(
643 &self,
644 addr: &str,
645 connection_id: ConnectionId,
646 ) -> SimulationResult<()> {
647 let mut inner = self.inner.borrow_mut();
648 inner
649 .network
650 .pending_connections
651 .insert(addr.to_string(), connection_id);
652
653 use std::collections::hash_map::DefaultHasher;
655 use std::hash::{Hash, Hasher};
656 let mut hasher = DefaultHasher::new();
657 addr.hash(&mut hasher);
658 let listener_key = ListenerId(hasher.finish());
659
660 if let Some(waker) = inner.wakers.listener_wakers.remove(&listener_key) {
661 waker.wake();
662 }
663
664 Ok(())
665 }
666
667 pub(crate) fn get_pending_connection(
669 &self,
670 addr: &str,
671 ) -> SimulationResult<Option<ConnectionId>> {
672 let mut inner = self.inner.borrow_mut();
673 Ok(inner.network.pending_connections.remove(addr))
674 }
675
676 pub(crate) fn get_connection_peer_address(
685 &self,
686 connection_id: ConnectionId,
687 ) -> Option<String> {
688 let inner = self.inner.borrow();
689 inner
690 .network
691 .connections
692 .get(&connection_id)
693 .map(|conn| conn.peer_address.clone())
694 }
695
696 #[instrument(skip(self))]
701 pub fn sleep(&self, duration: Duration) -> SleepFuture {
702 let task_id = self.generate_task_id();
703
704 let actual_duration = self.apply_buggified_delay(duration);
706
707 self.schedule_event(Event::Timer { task_id }, actual_duration);
709
710 SleepFuture::new(self.downgrade(), task_id)
712 }
713
714 fn apply_buggified_delay(&self, duration: Duration) -> Duration {
716 let inner = self.inner.borrow();
717 let chaos = &inner.network.config.chaos;
718
719 if !chaos.buggified_delay_enabled || chaos.buggified_delay_max == Duration::ZERO {
720 return duration;
721 }
722
723 if sim_random::<f64>() < chaos.buggified_delay_probability {
725 let random_factor = sim_random::<f64>().powf(1000.0);
727 let extra_delay = chaos.buggified_delay_max.mul_f64(random_factor);
728 tracing::trace!(
729 extra_delay_ms = extra_delay.as_millis(),
730 "Buggified delay applied"
731 );
732 duration + extra_delay
733 } else {
734 duration
735 }
736 }
737
738 fn generate_task_id(&self) -> u64 {
740 let mut inner = self.inner.borrow_mut();
741 let task_id = inner.next_task_id;
742 inner.next_task_id += 1;
743 task_id
744 }
745
746 fn wake_all(wakers: &mut HashMap<ConnectionId, Vec<Waker>>, connection_id: ConnectionId) {
748 if let Some(waker_list) = wakers.remove(&connection_id) {
749 for waker in waker_list {
750 waker.wake();
751 }
752 }
753 }
754
755 pub(crate) fn is_task_awake(&self, task_id: u64) -> SimulationResult<bool> {
757 let inner = self.inner.borrow();
758 Ok(inner.awakened_tasks.contains(&task_id))
759 }
760
761 pub(crate) fn register_task_waker(&self, task_id: u64, waker: Waker) -> SimulationResult<()> {
763 let mut inner = self.inner.borrow_mut();
764 inner.wakers.task_wakers.insert(task_id, waker);
765 Ok(())
766 }
767
768 fn clear_expired_clogs_with_inner(inner: &mut SimInner) {
770 let now = inner.current_time;
771 let expired: Vec<ConnectionId> = inner
772 .network
773 .connection_clogs
774 .iter()
775 .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
776 .collect();
777
778 for id in expired {
779 inner.network.connection_clogs.remove(&id);
780 Self::wake_all(&mut inner.wakers.clog_wakers, id);
781 }
782 }
783
784 #[instrument(skip(inner))]
786 fn process_event_with_inner(inner: &mut SimInner, event: Event) {
787 inner.events_processed += 1;
788
789 match event {
790 Event::Timer { task_id } => Self::handle_timer_event(inner, task_id),
791 Event::Connection { id, state } => Self::handle_connection_event(inner, id, state),
792 Event::Network {
793 connection_id,
794 operation,
795 } => Self::handle_network_event(inner, connection_id, operation),
796 Event::Shutdown => Self::handle_shutdown_event(inner),
797 }
798 }
799
800 fn handle_timer_event(inner: &mut SimInner, task_id: u64) {
802 inner.awakened_tasks.insert(task_id);
803 if let Some(waker) = inner.wakers.task_wakers.remove(&task_id) {
804 waker.wake();
805 }
806 }
807
808 fn handle_connection_event(inner: &mut SimInner, id: u64, state: ConnectionStateChange) {
810 let connection_id = ConnectionId(id);
811
812 match state {
813 ConnectionStateChange::BindComplete | ConnectionStateChange::ConnectionReady => {
814 }
816 ConnectionStateChange::ClogClear => {
817 inner.network.connection_clogs.remove(&connection_id);
818 Self::wake_all(&mut inner.wakers.clog_wakers, connection_id);
819 }
820 ConnectionStateChange::ReadClogClear => {
821 inner.network.read_clogs.remove(&connection_id);
822 Self::wake_all(&mut inner.wakers.read_clog_wakers, connection_id);
823 }
824 ConnectionStateChange::PartitionRestore => {
825 Self::clear_expired_partitions(inner);
826 }
827 ConnectionStateChange::SendPartitionClear => {
828 Self::clear_expired_send_partitions(inner);
829 }
830 ConnectionStateChange::RecvPartitionClear => {
831 Self::clear_expired_recv_partitions(inner);
832 }
833 ConnectionStateChange::CutRestore => {
834 if let Some(conn) = inner.network.connections.get_mut(&connection_id)
835 && conn.is_cut
836 {
837 conn.is_cut = false;
838 conn.cut_expiry = None;
839 tracing::debug!("Connection {} restored via scheduled event", id);
840 Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
841 }
842 }
843 ConnectionStateChange::HalfOpenError => {
844 tracing::debug!("Connection {} half-open error time reached", id);
845 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
846 waker.wake();
847 }
848 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
849 }
850 }
851 }
852
853 fn clear_expired_partitions(inner: &mut SimInner) {
855 let now = inner.current_time;
856 let expired: Vec<_> = inner
857 .network
858 .ip_partitions
859 .iter()
860 .filter_map(|(pair, state)| (now >= state.expires_at).then_some(*pair))
861 .collect();
862
863 for pair in expired {
864 inner.network.ip_partitions.remove(&pair);
865 tracing::debug!("Restored IP partition {} -> {}", pair.0, pair.1);
866 }
867 }
868
869 fn clear_expired_send_partitions(inner: &mut SimInner) {
871 let now = inner.current_time;
872 let expired: Vec<_> = inner
873 .network
874 .send_partitions
875 .iter()
876 .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
877 .collect();
878
879 for ip in expired {
880 inner.network.send_partitions.remove(&ip);
881 tracing::debug!("Cleared send partition for {}", ip);
882 }
883 }
884
885 fn clear_expired_recv_partitions(inner: &mut SimInner) {
887 let now = inner.current_time;
888 let expired: Vec<_> = inner
889 .network
890 .recv_partitions
891 .iter()
892 .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
893 .collect();
894
895 for ip in expired {
896 inner.network.recv_partitions.remove(&ip);
897 tracing::debug!("Cleared receive partition for {}", ip);
898 }
899 }
900
901 fn handle_network_event(inner: &mut SimInner, conn_id: u64, operation: NetworkOperation) {
903 let connection_id = ConnectionId(conn_id);
904
905 match operation {
906 NetworkOperation::DataDelivery { data } => {
907 Self::handle_data_delivery(inner, connection_id, data);
908 }
909 NetworkOperation::ProcessSendBuffer => {
910 Self::handle_process_send_buffer(inner, connection_id);
911 }
912 }
913 }
914
915 fn handle_data_delivery(inner: &mut SimInner, connection_id: ConnectionId, data: Vec<u8>) {
917 tracing::trace!(
918 "DataDelivery: {} bytes to connection {}",
919 data.len(),
920 connection_id.0
921 );
922
923 let packet_loss_prob = inner.network.config.chaos.packet_loss_probability;
925 if packet_loss_prob > 0.0 && crate::buggify_with_prob!(packet_loss_prob) {
926 tracing::info!(
927 "PacketLoss: connection={} bytes={} dropped",
928 connection_id.0,
929 data.len()
930 );
931 return;
932 }
933
934 if !inner.network.connections.contains_key(&connection_id) {
936 tracing::warn!("DataDelivery: connection {} not found", connection_id.0);
937 return;
938 }
939
940 let data_to_deliver = Self::maybe_corrupt_data(inner, &data);
942
943 let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
945 return;
946 };
947
948 for &byte in &data_to_deliver {
949 conn.receive_buffer.push_back(byte);
950 }
951
952 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
953 waker.wake();
954 }
955 }
956
957 fn maybe_corrupt_data(inner: &mut SimInner, data: &[u8]) -> Vec<u8> {
959 if data.is_empty() {
960 return data.to_vec();
961 }
962
963 let chaos = &inner.network.config.chaos;
964 let now = inner.current_time;
965 let cooldown_elapsed =
966 now.saturating_sub(inner.last_bit_flip_time) >= chaos.bit_flip_cooldown;
967
968 if !cooldown_elapsed || !crate::buggify_with_prob!(chaos.bit_flip_probability) {
969 return data.to_vec();
970 }
971
972 let random_value = sim_random::<u32>();
973 let flip_count = SimInner::calculate_flip_bit_count(
974 random_value,
975 chaos.bit_flip_min_bits,
976 chaos.bit_flip_max_bits,
977 );
978
979 let mut corrupted_data = data.to_vec();
980 let mut flipped_positions = std::collections::HashSet::new();
981
982 for _ in 0..flip_count {
983 let byte_idx = (sim_random::<u64>() as usize) % corrupted_data.len();
984 let bit_idx = (sim_random::<u64>() as usize) % 8;
985 let position = (byte_idx, bit_idx);
986
987 if !flipped_positions.contains(&position) {
988 flipped_positions.insert(position);
989 corrupted_data[byte_idx] ^= 1 << bit_idx;
990 }
991 }
992
993 inner.last_bit_flip_time = now;
994 tracing::info!(
995 "BitFlipInjected: bytes={} bits_flipped={} unique_positions={}",
996 data.len(),
997 flip_count,
998 flipped_positions.len()
999 );
1000
1001 corrupted_data
1002 }
1003
1004 fn handle_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1006 let is_partitioned = inner
1007 .network
1008 .is_connection_partitioned(connection_id, inner.current_time);
1009
1010 if is_partitioned {
1011 Self::handle_partitioned_send(inner, connection_id);
1012 } else {
1013 Self::handle_normal_send(inner, connection_id);
1014 }
1015 }
1016
1017 fn handle_partitioned_send(inner: &mut SimInner, connection_id: ConnectionId) {
1019 let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1020 return;
1021 };
1022
1023 if let Some(data) = conn.send_buffer.pop_front() {
1024 tracing::debug!(
1025 "Connection {} partitioned, failing send of {} bytes",
1026 connection_id.0,
1027 data.len()
1028 );
1029 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1030
1031 if !conn.send_buffer.is_empty() {
1032 Self::schedule_process_send_buffer(inner, connection_id);
1033 } else {
1034 conn.send_in_progress = false;
1035 }
1036 } else {
1037 conn.send_in_progress = false;
1038 }
1039 }
1040
1041 fn handle_normal_send(inner: &mut SimInner, connection_id: ConnectionId) {
1043 let Some(conn) = inner.network.connections.get(&connection_id) else {
1045 return;
1046 };
1047
1048 let paired_id = conn.paired_connection;
1049 let send_delay = conn.send_delay;
1050 let next_send_time = conn.next_send_time;
1051 let has_data = !conn.send_buffer.is_empty();
1052
1053 let recv_delay = paired_id.and_then(|pid| {
1054 inner
1055 .network
1056 .connections
1057 .get(&pid)
1058 .and_then(|c| c.recv_delay)
1059 });
1060
1061 if !has_data {
1062 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1063 conn.send_in_progress = false;
1064 }
1065 return;
1066 }
1067
1068 let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1069 return;
1070 };
1071
1072 let Some(mut data) = conn.send_buffer.pop_front() else {
1073 conn.send_in_progress = false;
1074 return;
1075 };
1076
1077 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1078
1079 if crate::buggify!() && !data.is_empty() {
1081 let max_send = std::cmp::min(
1082 data.len(),
1083 inner.network.config.chaos.partial_write_max_bytes,
1084 );
1085 let truncate_to = sim_random_range(0..max_send + 1);
1086
1087 if truncate_to < data.len() {
1088 let remainder = data.split_off(truncate_to);
1089 conn.send_buffer.push_front(remainder);
1090 tracing::debug!(
1091 "BUGGIFY: Partial write on connection {} - sending {} bytes",
1092 connection_id.0,
1093 data.len()
1094 );
1095 }
1096 }
1097
1098 let has_more = !conn.send_buffer.is_empty();
1099 let base_delay = if has_more {
1100 Duration::from_nanos(1)
1101 } else {
1102 send_delay.unwrap_or_else(|| {
1103 crate::network::sample_duration(&inner.network.config.write_latency)
1104 })
1105 };
1106
1107 let earliest_time = std::cmp::max(inner.current_time + base_delay, next_send_time);
1108 conn.next_send_time = earliest_time + Duration::from_nanos(1);
1109
1110 if let Some(paired_id) = paired_id {
1112 let scheduled_time = earliest_time + recv_delay.unwrap_or(Duration::ZERO);
1113 let sequence = inner.next_sequence;
1114 inner.next_sequence += 1;
1115
1116 inner.event_queue.schedule(ScheduledEvent::new(
1117 scheduled_time,
1118 Event::Network {
1119 connection_id: paired_id.0,
1120 operation: NetworkOperation::DataDelivery { data },
1121 },
1122 sequence,
1123 ));
1124 }
1125
1126 if !conn.send_buffer.is_empty() {
1128 Self::schedule_process_send_buffer(inner, connection_id);
1129 } else {
1130 conn.send_in_progress = false;
1131 }
1132 }
1133
1134 fn schedule_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1136 let sequence = inner.next_sequence;
1137 inner.next_sequence += 1;
1138
1139 inner.event_queue.schedule(ScheduledEvent::new(
1140 inner.current_time,
1141 Event::Network {
1142 connection_id: connection_id.0,
1143 operation: NetworkOperation::ProcessSendBuffer,
1144 },
1145 sequence,
1146 ));
1147 }
1148
1149 fn handle_shutdown_event(inner: &mut SimInner) {
1151 tracing::debug!("Processing Shutdown event - waking all pending tasks");
1152
1153 for (task_id, waker) in inner.wakers.task_wakers.drain() {
1154 tracing::trace!("Waking task {}", task_id);
1155 waker.wake();
1156 }
1157
1158 for (_conn_id, waker) in inner.wakers.read_wakers.drain() {
1159 waker.wake();
1160 }
1161
1162 tracing::debug!("Shutdown event processed");
1163 }
1164
1165 pub fn assertion_results(
1167 &self,
1168 ) -> std::collections::HashMap<String, crate::chaos::AssertionStats> {
1169 crate::chaos::get_assertion_results()
1170 }
1171
1172 pub fn reset_assertion_results(&self) {
1174 crate::chaos::reset_assertion_results();
1175 }
1176
1177 pub fn extract_metrics(&self) -> crate::runner::SimulationMetrics {
1179 let inner = self.inner.borrow();
1180
1181 crate::runner::SimulationMetrics {
1182 wall_time: std::time::Duration::ZERO,
1183 simulated_time: inner.current_time,
1184 events_processed: inner.events_processed,
1185 }
1186 }
1187
1188 pub fn should_clog_write(&self, connection_id: ConnectionId) -> bool {
1192 let inner = self.inner.borrow();
1193 let config = &inner.network.config;
1194
1195 if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1197 return inner.current_time < clog_state.expires_at;
1198 }
1199
1200 config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1202 }
1203
1204 pub fn clog_write(&self, connection_id: ConnectionId) {
1206 let mut inner = self.inner.borrow_mut();
1207 let config = &inner.network.config;
1208
1209 let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1210 let expires_at = inner.current_time + clog_duration;
1211 inner
1212 .network
1213 .connection_clogs
1214 .insert(connection_id, ClogState { expires_at });
1215
1216 let clear_event = Event::Connection {
1218 id: connection_id.0,
1219 state: ConnectionStateChange::ClogClear,
1220 };
1221 let sequence = inner.next_sequence;
1222 inner.next_sequence += 1;
1223 inner
1224 .event_queue
1225 .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1226 }
1227
1228 pub fn is_write_clogged(&self, connection_id: ConnectionId) -> bool {
1230 let inner = self.inner.borrow();
1231
1232 if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1233 inner.current_time < clog_state.expires_at
1234 } else {
1235 false
1236 }
1237 }
1238
1239 pub fn register_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1241 let mut inner = self.inner.borrow_mut();
1242 inner
1243 .wakers
1244 .clog_wakers
1245 .entry(connection_id)
1246 .or_default()
1247 .push(waker);
1248 }
1249
1250 pub fn should_clog_read(&self, connection_id: ConnectionId) -> bool {
1254 let inner = self.inner.borrow();
1255 let config = &inner.network.config;
1256
1257 if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1259 return inner.current_time < clog_state.expires_at;
1260 }
1261
1262 config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1264 }
1265
1266 pub fn clog_read(&self, connection_id: ConnectionId) {
1268 let mut inner = self.inner.borrow_mut();
1269 let config = &inner.network.config;
1270
1271 let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1272 let expires_at = inner.current_time + clog_duration;
1273 inner
1274 .network
1275 .read_clogs
1276 .insert(connection_id, ClogState { expires_at });
1277
1278 let clear_event = Event::Connection {
1280 id: connection_id.0,
1281 state: ConnectionStateChange::ReadClogClear,
1282 };
1283 let sequence = inner.next_sequence;
1284 inner.next_sequence += 1;
1285 inner
1286 .event_queue
1287 .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1288 }
1289
1290 pub fn is_read_clogged(&self, connection_id: ConnectionId) -> bool {
1292 let inner = self.inner.borrow();
1293
1294 if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1295 inner.current_time < clog_state.expires_at
1296 } else {
1297 false
1298 }
1299 }
1300
1301 pub fn register_read_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1303 let mut inner = self.inner.borrow_mut();
1304 inner
1305 .wakers
1306 .read_clog_wakers
1307 .entry(connection_id)
1308 .or_default()
1309 .push(waker);
1310 }
1311
1312 pub fn clear_expired_clogs(&self) {
1314 let mut inner = self.inner.borrow_mut();
1315 let now = inner.current_time;
1316 let expired: Vec<ConnectionId> = inner
1317 .network
1318 .connection_clogs
1319 .iter()
1320 .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
1321 .collect();
1322
1323 for id in expired {
1324 inner.network.connection_clogs.remove(&id);
1325 Self::wake_all(&mut inner.wakers.clog_wakers, id);
1326 }
1327 }
1328
1329 pub fn cut_connection(&self, connection_id: ConnectionId, duration: Duration) {
1342 let mut inner = self.inner.borrow_mut();
1343 let expires_at = inner.current_time + duration;
1344
1345 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1346 conn.is_cut = true;
1347 conn.cut_expiry = Some(expires_at);
1348 tracing::debug!("Connection {} cut until {:?}", connection_id.0, expires_at);
1349
1350 let restore_event = Event::Connection {
1352 id: connection_id.0,
1353 state: ConnectionStateChange::CutRestore,
1354 };
1355 let sequence = inner.next_sequence;
1356 inner.next_sequence += 1;
1357 inner
1358 .event_queue
1359 .schedule(ScheduledEvent::new(expires_at, restore_event, sequence));
1360 }
1361 }
1362
1363 pub fn is_connection_cut(&self, connection_id: ConnectionId) -> bool {
1368 let inner = self.inner.borrow();
1369 inner
1370 .network
1371 .connections
1372 .get(&connection_id)
1373 .is_some_and(|conn| {
1374 conn.is_cut
1375 && conn
1376 .cut_expiry
1377 .is_some_and(|expiry| inner.current_time < expiry)
1378 })
1379 }
1380
1381 pub fn restore_connection(&self, connection_id: ConnectionId) {
1385 let mut inner = self.inner.borrow_mut();
1386
1387 if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1388 && conn.is_cut
1389 {
1390 conn.is_cut = false;
1391 conn.cut_expiry = None;
1392 tracing::debug!("Connection {} restored", connection_id.0);
1393
1394 Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
1396 }
1397 }
1398
1399 pub fn register_cut_waker(&self, connection_id: ConnectionId, waker: Waker) {
1401 let mut inner = self.inner.borrow_mut();
1402 inner
1403 .wakers
1404 .cut_wakers
1405 .entry(connection_id)
1406 .or_default()
1407 .push(waker);
1408 }
1409
1410 pub fn send_buffer_capacity(&self, connection_id: ConnectionId) -> usize {
1414 let inner = self.inner.borrow();
1415 inner
1416 .network
1417 .connections
1418 .get(&connection_id)
1419 .map(|conn| conn.send_buffer_capacity)
1420 .unwrap_or(0)
1421 }
1422
1423 pub fn send_buffer_used(&self, connection_id: ConnectionId) -> usize {
1425 let inner = self.inner.borrow();
1426 inner
1427 .network
1428 .connections
1429 .get(&connection_id)
1430 .map(|conn| conn.send_buffer.iter().map(|v| v.len()).sum())
1431 .unwrap_or(0)
1432 }
1433
1434 pub fn available_send_buffer(&self, connection_id: ConnectionId) -> usize {
1436 let capacity = self.send_buffer_capacity(connection_id);
1437 let used = self.send_buffer_used(connection_id);
1438 capacity.saturating_sub(used)
1439 }
1440
1441 pub fn register_send_buffer_waker(&self, connection_id: ConnectionId, waker: Waker) {
1443 let mut inner = self.inner.borrow_mut();
1444 inner
1445 .wakers
1446 .send_buffer_wakers
1447 .entry(connection_id)
1448 .or_default()
1449 .push(waker);
1450 }
1451
1452 #[allow(dead_code)] fn wake_send_buffer_waiters(&self, connection_id: ConnectionId) {
1455 let mut inner = self.inner.borrow_mut();
1456 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1457 }
1458
1459 pub fn get_pair_latency(&self, src: IpAddr, dst: IpAddr) -> Option<Duration> {
1464 let inner = self.inner.borrow();
1465 inner.network.pair_latencies.get(&(src, dst)).copied()
1466 }
1467
1468 pub fn set_pair_latency_if_not_set(
1471 &self,
1472 src: IpAddr,
1473 dst: IpAddr,
1474 latency: Duration,
1475 ) -> Duration {
1476 let mut inner = self.inner.borrow_mut();
1477 *inner
1478 .network
1479 .pair_latencies
1480 .entry((src, dst))
1481 .or_insert_with(|| {
1482 tracing::debug!(
1483 "Setting base latency for IP pair {} -> {} to {:?}",
1484 src,
1485 dst,
1486 latency
1487 );
1488 latency
1489 })
1490 }
1491
1492 pub fn get_connection_base_latency(&self, connection_id: ConnectionId) -> Duration {
1495 let inner = self.inner.borrow();
1496 let (local_ip, remote_ip) = inner
1497 .network
1498 .connections
1499 .get(&connection_id)
1500 .and_then(|conn| Some((conn.local_ip?, conn.remote_ip?)))
1501 .unwrap_or({
1502 (
1503 IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1504 IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1505 )
1506 });
1507 drop(inner);
1508
1509 if let Some(latency) = self.get_pair_latency(local_ip, remote_ip) {
1511 return latency;
1512 }
1513
1514 let latency = self
1516 .with_network_config(|config| crate::network::sample_duration(&config.write_latency));
1517 self.set_pair_latency_if_not_set(local_ip, remote_ip, latency)
1518 }
1519
1520 pub fn get_send_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1525 let inner = self.inner.borrow();
1526 inner
1527 .network
1528 .connections
1529 .get(&connection_id)
1530 .and_then(|conn| conn.send_delay)
1531 }
1532
1533 pub fn get_recv_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1536 let inner = self.inner.borrow();
1537 inner
1538 .network
1539 .connections
1540 .get(&connection_id)
1541 .and_then(|conn| conn.recv_delay)
1542 }
1543
1544 pub fn set_asymmetric_delays(
1547 &self,
1548 connection_id: ConnectionId,
1549 send_delay: Option<Duration>,
1550 recv_delay: Option<Duration>,
1551 ) {
1552 let mut inner = self.inner.borrow_mut();
1553 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1554 conn.send_delay = send_delay;
1555 conn.recv_delay = recv_delay;
1556 tracing::debug!(
1557 "Connection {} asymmetric delays set: send={:?}, recv={:?}",
1558 connection_id.0,
1559 send_delay,
1560 recv_delay
1561 );
1562 }
1563 }
1564
1565 pub fn is_connection_closed(&self, connection_id: ConnectionId) -> bool {
1567 let inner = self.inner.borrow();
1568 inner
1569 .network
1570 .connections
1571 .get(&connection_id)
1572 .is_some_and(|conn| conn.is_closed)
1573 }
1574
1575 pub fn close_connection(&self, connection_id: ConnectionId) {
1579 self.close_connection_with_reason(connection_id, CloseReason::Graceful);
1580 }
1581
1582 pub fn close_connection_abort(&self, connection_id: ConnectionId) {
1586 self.close_connection_with_reason(connection_id, CloseReason::Aborted);
1587 }
1588
1589 pub fn get_close_reason(&self, connection_id: ConnectionId) -> CloseReason {
1591 let inner = self.inner.borrow();
1592 inner
1593 .network
1594 .connections
1595 .get(&connection_id)
1596 .map(|conn| conn.close_reason)
1597 .unwrap_or(CloseReason::None)
1598 }
1599
    /// Permanently closes `connection_id` and its paired connection (if any)
    /// with `reason`.
    ///
    /// Each side is updated only if it is not already closed, so an existing
    /// close reason is never overwritten. The peer is handled independently of
    /// whether this side was already closed. After the state changes, any read
    /// waker registered for either connection is removed and woken.
    ///
    /// NOTE(review): only `read_wakers` are woken here; waiters in other
    /// registries (e.g. send-buffer wakers) are not. Confirm that is intended.
    fn close_connection_with_reason(&self, connection_id: ConnectionId, reason: CloseReason) {
        let mut inner = self.inner.borrow_mut();

        // Read the peer id up front, before taking a mutable reference to this
        // connection.
        let paired_connection_id = inner
            .network
            .connections
            .get(&connection_id)
            .and_then(|conn| conn.paired_connection);

        // Close this side unless it is already closed.
        if let Some(conn) = inner.network.connections.get_mut(&connection_id)
            && !conn.is_closed
        {
            conn.is_closed = true;
            conn.close_reason = reason;
            tracing::debug!(
                "Connection {} closed permanently (reason: {:?})",
                connection_id.0,
                reason
            );
        }

        // Close the peer with the same reason unless it is already closed.
        if let Some(paired_id) = paired_connection_id
            && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
            && !paired_conn.is_closed
        {
            paired_conn.is_closed = true;
            paired_conn.close_reason = reason;
            tracing::debug!(
                "Paired connection {} also closed (reason: {:?})",
                paired_id.0,
                reason
            );
        }

        // Wake readers only after both sides are updated, so a woken reader
        // observes the closed state.
        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
            tracing::debug!(
                "Waking read waker for closed connection {}",
                connection_id.0
            );
            waker.wake();
        }

        if let Some(paired_id) = paired_connection_id
            && let Some(paired_waker) = inner.wakers.read_wakers.remove(&paired_id)
        {
            tracing::debug!(
                "Waking read waker for paired closed connection {}",
                paired_id.0
            );
            paired_waker.wake();
        }
    }
1653
1654 pub fn close_connection_asymmetric(
1656 &self,
1657 connection_id: ConnectionId,
1658 close_send: bool,
1659 close_recv: bool,
1660 ) {
1661 let mut inner = self.inner.borrow_mut();
1662
1663 let paired_id = inner
1664 .network
1665 .connections
1666 .get(&connection_id)
1667 .and_then(|conn| conn.paired_connection);
1668
1669 if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1670 conn.send_closed = true;
1671 conn.send_buffer.clear();
1672 tracing::debug!(
1673 "Connection {} send side closed (asymmetric)",
1674 connection_id.0
1675 );
1676 }
1677
1678 if close_recv
1679 && let Some(paired) = paired_id
1680 && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
1681 {
1682 paired_conn.recv_closed = true;
1683 tracing::debug!(
1684 "Connection {} recv side closed (asymmetric via peer)",
1685 paired.0
1686 );
1687 }
1688
1689 if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1690 waker.wake();
1691 }
1692 if close_recv
1693 && let Some(paired) = paired_id
1694 && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
1695 {
1696 waker.wake();
1697 }
1698 }
1699
    /// Rolls for a random connection failure on `connection_id`.
    ///
    /// Returns `None` without changing any connection state when:
    /// - `chaos.random_close_probability <= 0.0`, or
    /// - less than `chaos.random_close_cooldown` has elapsed since the last
    ///   random close, or
    /// - the `buggify_with_prob!` roll fails.
    ///
    /// Otherwise the current time is recorded as the last random close, and a
    /// value `a` is drawn from the simulation RNG:
    /// - `a < 0.66` closes the paired connection's receive side;
    /// - `a > 0.33` closes this connection's send side and clears its send
    ///   buffer;
    /// - so both sides close when `0.33 < a < 0.66`.
    ///
    /// Read wakers of the affected connections are then woken. Finally a
    /// second value `b` is drawn, and `Some(b < chaos.random_close_explicit_ratio)`
    /// is returned. NOTE(review): presumably this flag tells the caller whether
    /// to surface the failure explicitly; confirm with callers.
    ///
    /// The order of RNG draws is part of the simulation's determinism and must
    /// not be rearranged.
    pub fn roll_random_close(&self, connection_id: ConnectionId) -> Option<bool> {
        let mut inner = self.inner.borrow_mut();
        let config = &inner.network.config;

        if config.chaos.random_close_probability <= 0.0 {
            return None;
        }

        // Cooldown is checked before the probability roll.
        let current_time = inner.current_time;
        let time_since_last = current_time.saturating_sub(inner.network.last_random_close_time);
        if time_since_last < config.chaos.random_close_cooldown {
            return None;
        }

        if !crate::buggify_with_prob!(config.chaos.random_close_probability) {
            return None;
        }

        inner.network.last_random_close_time = current_time;

        let paired_id = inner
            .network
            .connections
            .get(&connection_id)
            .and_then(|conn| conn.paired_connection);

        // Overlapping thresholds: the ranges for recv-only, both, and send-only
        // are each roughly one third.
        let a = super::rng::sim_random_f64();
        let close_recv = a < 0.66;
        let close_send = a > 0.33;

        tracing::info!(
            "Random connection failure triggered on connection {} (send={}, recv={}, a={:.3})",
            connection_id.0,
            close_send,
            close_recv,
            a
        );

        if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
            conn.send_closed = true;
            conn.send_buffer.clear();
        }

        // The receive-side close is applied to the peer, not to this connection.
        if close_recv
            && let Some(paired) = paired_id
            && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
        {
            paired_conn.recv_closed = true;
        }

        if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
            waker.wake();
        }
        if close_recv
            && let Some(paired) = paired_id
            && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
        {
            waker.wake();
        }

        let b = super::rng::sim_random_f64();
        let explicit = b < inner.network.config.chaos.random_close_explicit_ratio;

        tracing::debug!(
            "Random close explicit={} (b={:.3}, ratio={:.2})",
            explicit,
            b,
            inner.network.config.chaos.random_close_explicit_ratio
        );

        Some(explicit)
    }
1773
1774 pub fn is_send_closed(&self, connection_id: ConnectionId) -> bool {
1776 let inner = self.inner.borrow();
1777 inner
1778 .network
1779 .connections
1780 .get(&connection_id)
1781 .is_some_and(|conn| conn.send_closed || conn.is_closed)
1782 }
1783
1784 pub fn is_recv_closed(&self, connection_id: ConnectionId) -> bool {
1786 let inner = self.inner.borrow();
1787 inner
1788 .network
1789 .connections
1790 .get(&connection_id)
1791 .is_some_and(|conn| conn.recv_closed || conn.is_closed)
1792 }
1793
1794 pub fn simulate_peer_crash(&self, connection_id: ConnectionId, error_delay: Duration) {
1807 let mut inner = self.inner.borrow_mut();
1808 let current_time = inner.current_time;
1809 let error_at = current_time + error_delay;
1810
1811 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1812 conn.is_half_open = true;
1813 conn.half_open_error_at = Some(error_at);
1814
1815 conn.paired_connection = None;
1818
1819 tracing::info!(
1820 "Connection {} now half-open, errors manifest at {:?}",
1821 connection_id.0,
1822 error_at
1823 );
1824 }
1825
1826 let wake_event = Event::Connection {
1828 id: connection_id.0,
1829 state: ConnectionStateChange::HalfOpenError,
1830 };
1831 let sequence = inner.next_sequence;
1832 inner.next_sequence += 1;
1833 let scheduled_event = ScheduledEvent::new(error_at, wake_event, sequence);
1834 inner.event_queue.schedule(scheduled_event);
1835 }
1836
1837 pub fn is_half_open(&self, connection_id: ConnectionId) -> bool {
1839 let inner = self.inner.borrow();
1840 inner
1841 .network
1842 .connections
1843 .get(&connection_id)
1844 .is_some_and(|conn| conn.is_half_open)
1845 }
1846
1847 pub fn should_half_open_error(&self, connection_id: ConnectionId) -> bool {
1849 let inner = self.inner.borrow();
1850 let current_time = inner.current_time;
1851 inner
1852 .network
1853 .connections
1854 .get(&connection_id)
1855 .is_some_and(|conn| {
1856 conn.is_half_open
1857 && conn
1858 .half_open_error_at
1859 .is_some_and(|error_at| current_time >= error_at)
1860 })
1861 }
1862
1863 pub fn partition_pair(
1867 &self,
1868 from_ip: std::net::IpAddr,
1869 to_ip: std::net::IpAddr,
1870 duration: Duration,
1871 ) -> SimulationResult<()> {
1872 let mut inner = self.inner.borrow_mut();
1873 let expires_at = inner.current_time + duration;
1874
1875 inner
1876 .network
1877 .ip_partitions
1878 .insert((from_ip, to_ip), PartitionState { expires_at });
1879
1880 let restore_event = Event::Connection {
1881 id: 0,
1882 state: ConnectionStateChange::PartitionRestore,
1883 };
1884 let sequence = inner.next_sequence;
1885 inner.next_sequence += 1;
1886 let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
1887 inner.event_queue.schedule(scheduled_event);
1888
1889 tracing::debug!(
1890 "Partitioned {} -> {} until {:?}",
1891 from_ip,
1892 to_ip,
1893 expires_at
1894 );
1895 Ok(())
1896 }
1897
1898 pub fn partition_send_from(
1900 &self,
1901 ip: std::net::IpAddr,
1902 duration: Duration,
1903 ) -> SimulationResult<()> {
1904 let mut inner = self.inner.borrow_mut();
1905 let expires_at = inner.current_time + duration;
1906
1907 inner.network.send_partitions.insert(ip, expires_at);
1908
1909 let clear_event = Event::Connection {
1910 id: 0,
1911 state: ConnectionStateChange::SendPartitionClear,
1912 };
1913 let sequence = inner.next_sequence;
1914 inner.next_sequence += 1;
1915 let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
1916 inner.event_queue.schedule(scheduled_event);
1917
1918 tracing::debug!("Partitioned sends from {} until {:?}", ip, expires_at);
1919 Ok(())
1920 }
1921
1922 pub fn partition_recv_to(
1924 &self,
1925 ip: std::net::IpAddr,
1926 duration: Duration,
1927 ) -> SimulationResult<()> {
1928 let mut inner = self.inner.borrow_mut();
1929 let expires_at = inner.current_time + duration;
1930
1931 inner.network.recv_partitions.insert(ip, expires_at);
1932
1933 let clear_event = Event::Connection {
1934 id: 0,
1935 state: ConnectionStateChange::RecvPartitionClear,
1936 };
1937 let sequence = inner.next_sequence;
1938 inner.next_sequence += 1;
1939 let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
1940 inner.event_queue.schedule(scheduled_event);
1941
1942 tracing::debug!("Partitioned receives to {} until {:?}", ip, expires_at);
1943 Ok(())
1944 }
1945
1946 pub fn restore_partition(
1948 &self,
1949 from_ip: std::net::IpAddr,
1950 to_ip: std::net::IpAddr,
1951 ) -> SimulationResult<()> {
1952 let mut inner = self.inner.borrow_mut();
1953 inner.network.ip_partitions.remove(&(from_ip, to_ip));
1954 tracing::debug!("Restored partition {} -> {}", from_ip, to_ip);
1955 Ok(())
1956 }
1957
1958 pub fn is_partitioned(
1960 &self,
1961 from_ip: std::net::IpAddr,
1962 to_ip: std::net::IpAddr,
1963 ) -> SimulationResult<bool> {
1964 let inner = self.inner.borrow();
1965 Ok(inner
1966 .network
1967 .is_partitioned(from_ip, to_ip, inner.current_time))
1968 }
1969
1970 fn randomly_trigger_partitions_with_inner(inner: &mut SimInner) {
1972 let partition_config = &inner.network.config;
1973
1974 if partition_config.chaos.partition_probability == 0.0 {
1975 return;
1976 }
1977
1978 let ip_pairs: Vec<(std::net::IpAddr, std::net::IpAddr)> = inner
1979 .network
1980 .connections
1981 .values()
1982 .filter_map(|conn| {
1983 if let (Some(local), Some(remote)) = (conn.local_ip, conn.remote_ip) {
1984 Some((local, remote))
1985 } else {
1986 None
1987 }
1988 })
1989 .collect();
1990
1991 for (from_ip, to_ip) in ip_pairs {
1992 if inner
1993 .network
1994 .is_partitioned(from_ip, to_ip, inner.current_time)
1995 {
1996 continue;
1997 }
1998
1999 if sim_random::<f64>() < partition_config.chaos.partition_probability {
2000 let partition_duration =
2001 crate::network::sample_duration(&partition_config.chaos.partition_duration);
2002 let expires_at = inner.current_time + partition_duration;
2003
2004 inner
2005 .network
2006 .ip_partitions
2007 .insert((from_ip, to_ip), PartitionState { expires_at });
2008
2009 let restore_event = Event::Connection {
2010 id: 0,
2011 state: ConnectionStateChange::PartitionRestore,
2012 };
2013 let sequence = inner.next_sequence;
2014 inner.next_sequence += 1;
2015 let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2016 inner.event_queue.schedule(scheduled_event);
2017
2018 tracing::debug!(
2019 "Randomly triggered partition {} -> {} until {:?}, restoration event scheduled with sequence {}",
2020 from_ip,
2021 to_ip,
2022 expires_at,
2023 sequence
2024 );
2025 }
2026 }
2027 }
2028}
2029
2030impl Default for SimWorld {
2031 fn default() -> Self {
2032 Self::new()
2033 }
2034}
2035
2036#[derive(Debug)]
2041pub struct WeakSimWorld {
2042 pub(crate) inner: Weak<RefCell<SimInner>>,
2043}
2044
/// Generates a method on a weak handle that upgrades it and forwards to the
/// strong handle's method of the same name.
///
/// Each invocation starts with a mode keyword:
/// - `unit`: the target's return value is discarded and `Ok(())` is returned.
/// - `wrap`: the target returns a plain value, which is wrapped in `Ok`.
/// - `pass`: the target already returns `SimulationResult<_>`, which is
///   returned unchanged.
///
/// In every mode, a failed `upgrade()` short-circuits with its error.
/// Attributes (typically `#[doc = "..."]`) are copied onto the generated
/// method.
macro_rules! weak_forward {
    (unit $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*)) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<()> {
            self.upgrade()?.$name($($param),*);
            Ok(())
        }
    };
    (wrap $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*) -> $out:ty) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<$out> {
            Ok(self.upgrade()?.$name($($param),*))
        }
    };
    (pass $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*) -> $out:ty) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<$out> {
            self.upgrade()?.$name($($param),*)
        }
    };
}
2070
2071impl WeakSimWorld {
2072 pub fn upgrade(&self) -> SimulationResult<SimWorld> {
2074 self.inner
2075 .upgrade()
2076 .map(|inner| SimWorld { inner })
2077 .ok_or(SimulationError::SimulationShutdown)
2078 }
2079
2080 weak_forward!(wrap #[doc = "Returns the current simulation time."] current_time(&self) -> Duration);
2081 weak_forward!(wrap #[doc = "Returns the exact simulation time (equivalent to FDB's now())."] now(&self) -> Duration);
2082 weak_forward!(wrap #[doc = "Returns the drifted timer time (equivalent to FDB's timer())."] timer(&self) -> Duration);
2083 weak_forward!(unit #[doc = "Schedules an event to execute after the specified delay."] schedule_event(&self, event: Event, delay: Duration));
2084 weak_forward!(unit #[doc = "Schedules an event to execute at the specified absolute time."] schedule_event_at(&self, event: Event, time: Duration));
2085 weak_forward!(pass #[doc = "Read data from connection's receive buffer."] read_from_connection(&self, connection_id: ConnectionId, buf: &mut [u8]) -> usize);
2086 weak_forward!(pass #[doc = "Write data to connection's receive buffer."] write_to_connection(&self, connection_id: ConnectionId, data: &[u8]) -> ());
2087 weak_forward!(pass #[doc = "Buffer data for ordered sending on a connection."] buffer_send(&self, connection_id: ConnectionId, data: Vec<u8>) -> ());
2088 weak_forward!(wrap #[doc = "Get a network provider for the simulation."] network_provider(&self) -> SimNetworkProvider);
2089 weak_forward!(wrap #[doc = "Get a time provider for the simulation."] time_provider(&self) -> crate::providers::SimTimeProvider);
2090 weak_forward!(wrap #[doc = "Sleep for the specified duration in simulation time."] sleep(&self, duration: Duration) -> SleepFuture);
2091
2092 pub fn with_network_config<F, R>(&self, f: F) -> SimulationResult<R>
2094 where
2095 F: FnOnce(&NetworkConfiguration) -> R,
2096 {
2097 Ok(self.upgrade()?.with_network_config(f))
2098 }
2099}
2100
2101impl Clone for WeakSimWorld {
2102 fn clone(&self) -> Self {
2103 Self {
2104 inner: self.inner.clone(),
2105 }
2106 }
2107}
2108
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the millisecond durations used throughout these tests.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    #[test]
    fn sim_world_basic_lifecycle() {
        let mut sim = SimWorld::new();

        // A fresh world starts at time zero with an empty queue.
        assert_eq!(sim.current_time(), Duration::ZERO);
        assert!(!sim.has_pending_events());
        assert_eq!(sim.pending_event_count(), 0);

        // Scheduling an event does not advance time.
        sim.schedule_event(Event::Timer { task_id: 1 }, ms(100));
        assert!(sim.has_pending_events());
        assert_eq!(sim.pending_event_count(), 1);
        assert_eq!(sim.current_time(), Duration::ZERO);

        // Stepping the only event jumps to its time and drains the queue.
        let has_more = sim.step();
        assert!(!has_more);
        assert_eq!(sim.current_time(), ms(100));
        assert!(!sim.has_pending_events());
        assert_eq!(sim.pending_event_count(), 0);
    }

    #[test]
    fn sim_world_multiple_events() {
        let mut sim = SimWorld::new();

        // Scheduled deliberately out of time order.
        for (task_id, delay) in [(3, 300), (1, 100), (2, 200)] {
            sim.schedule_event(Event::Timer { task_id }, ms(delay));
        }
        assert_eq!(sim.pending_event_count(), 3);

        // Each step must process the earliest remaining event.
        // Tuple: (step() result, time after step, events left).
        for (more, at, remaining) in [(true, 100, 2), (true, 200, 1), (false, 300, 0)] {
            assert_eq!(sim.step(), more);
            assert_eq!(sim.current_time(), ms(at));
            assert_eq!(sim.pending_event_count(), remaining);
        }
    }

    #[test]
    fn sim_world_run_until_empty() {
        let mut sim = SimWorld::new();
        for (task_id, delay) in [(1, 100), (2, 200), (3, 300)] {
            sim.schedule_event(Event::Timer { task_id }, ms(delay));
        }

        sim.run_until_empty();

        // Time ends at the last event and nothing is left queued.
        assert_eq!(sim.current_time(), ms(300));
        assert!(!sim.has_pending_events());
    }

    #[test]
    fn sim_world_schedule_at_specific_time() {
        let mut sim = SimWorld::new();

        sim.schedule_event_at(Event::Timer { task_id: 1 }, ms(500));
        assert_eq!(sim.current_time(), Duration::ZERO);

        sim.step();
        assert_eq!(sim.current_time(), ms(500));
    }

    #[test]
    fn weak_sim_world_lifecycle() {
        let sim = SimWorld::new();
        let weak = sim.downgrade();

        // While a strong handle exists, the weak handle forwards calls.
        let observed = weak.current_time().expect("should get time");
        assert_eq!(observed, Duration::ZERO);
        weak.schedule_event(Event::Timer { task_id: 1 }, ms(100))
            .expect("should schedule event");
        assert!(sim.has_pending_events());

        // After the last strong handle is dropped, every call reports shutdown.
        drop(sim);
        assert_eq!(
            weak.current_time(),
            Err(SimulationError::SimulationShutdown)
        );
        assert_eq!(
            weak.schedule_event(Event::Timer { task_id: 2 }, ms(200)),
            Err(SimulationError::SimulationShutdown)
        );
    }

    /// Events sharing a timestamp are all processed at that timestamp.
    /// NOTE(review): this only checks time and step results; it does not
    /// observe which task fires first.
    #[test]
    fn deterministic_event_ordering() {
        let mut sim = SimWorld::new();
        for task_id in [2, 1, 3] {
            sim.schedule_event(Event::Timer { task_id }, ms(100));
        }

        for expected_more in [true, true, false] {
            assert_eq!(sim.step(), expected_more);
            assert_eq!(sim.current_time(), ms(100));
        }
    }
}