1use std::{
7 cell::RefCell,
8 collections::{BTreeMap, HashSet, VecDeque},
9 net::IpAddr,
10 rc::{Rc, Weak},
11 task::Waker,
12 time::Duration,
13};
14use tracing::instrument;
15
16use crate::{
17 SimulationError, SimulationResult,
18 network::{
19 NetworkConfiguration, PartitionStrategy,
20 sim::{ConnectionId, ListenerId, SimNetworkProvider},
21 },
22};
23
24use super::{
25 events::{ConnectionStateChange, Event, EventQueue, NetworkOperation, ScheduledEvent},
26 rng::{reset_sim_rng, set_sim_seed, sim_random, sim_random_range},
27 sleep::SleepFuture,
28 state::{
29 ClogState, CloseReason, ConnectionState, ListenerState, NetworkState, PartitionState,
30 StorageState,
31 },
32 wakers::WakerRegistry,
33};
34
35#[derive(Debug)]
37pub(crate) struct SimInner {
38 pub(crate) current_time: Duration,
39 pub(crate) timer_time: Duration,
42 pub(crate) event_queue: EventQueue,
43 pub(crate) next_sequence: u64,
44
45 pub(crate) network: NetworkState,
47
48 pub(crate) storage: StorageState,
50
51 pub(crate) wakers: WakerRegistry,
53
54 pub(crate) next_task_id: u64,
56 pub(crate) awakened_tasks: HashSet<u64>,
57
58 pub(crate) events_processed: u64,
60
61 pub(crate) last_bit_flip_time: Duration,
63}
64
65impl SimInner {
66 pub(crate) fn new() -> Self {
67 Self {
68 current_time: Duration::ZERO,
69 timer_time: Duration::ZERO,
70 event_queue: EventQueue::new(),
71 next_sequence: 0,
72 network: NetworkState::new(NetworkConfiguration::default()),
73 storage: StorageState::default(),
74 wakers: WakerRegistry::default(),
75 next_task_id: 0,
76 awakened_tasks: HashSet::new(),
77 events_processed: 0,
78 last_bit_flip_time: Duration::ZERO,
79 }
80 }
81
82 pub(crate) fn new_with_config(network_config: NetworkConfiguration) -> Self {
83 Self {
84 current_time: Duration::ZERO,
85 timer_time: Duration::ZERO,
86 event_queue: EventQueue::new(),
87 next_sequence: 0,
88 network: NetworkState::new(network_config),
89 storage: StorageState::default(),
90 wakers: WakerRegistry::default(),
91 next_task_id: 0,
92 awakened_tasks: HashSet::new(),
93 events_processed: 0,
94 last_bit_flip_time: Duration::ZERO,
95 }
96 }
97
98 pub(crate) fn calculate_flip_bit_count(random_value: u32, min_bits: u32, max_bits: u32) -> u32 {
108 if random_value == 0 {
109 return max_bits.min(32);
111 }
112
113 let bit_count = 1 + random_value.leading_zeros();
115
116 bit_count.clamp(min_bits, max_bits)
118 }
119}
120
121#[derive(Debug)]
127pub struct SimWorld {
128 pub(crate) inner: Rc<RefCell<SimInner>>,
129}
130
131impl SimWorld {
132 fn create(network_config: Option<NetworkConfiguration>, seed: u64) -> Self {
134 reset_sim_rng();
135 set_sim_seed(seed);
136 crate::chaos::assertions::reset_assertion_results();
137
138 let inner = match network_config {
139 Some(config) => SimInner::new_with_config(config),
140 None => SimInner::new(),
141 };
142
143 Self {
144 inner: Rc::new(RefCell::new(inner)),
145 }
146 }
147
148 pub fn new() -> Self {
153 Self::create(None, 0)
154 }
155
156 pub fn new_with_seed(seed: u64) -> Self {
165 Self::create(None, seed)
166 }
167
168 pub fn new_with_network_config(network_config: NetworkConfiguration) -> Self {
170 Self::create(Some(network_config), 0)
171 }
172
173 pub fn new_with_network_config_and_seed(
180 network_config: NetworkConfiguration,
181 seed: u64,
182 ) -> Self {
183 Self::create(Some(network_config), seed)
184 }
185
186 #[instrument(skip(self))]
191 pub fn step(&mut self) -> bool {
192 let mut inner = self.inner.borrow_mut();
193
194 if let Some(scheduled_event) = inner.event_queue.pop_earliest() {
195 inner.current_time = scheduled_event.time();
197
198 Self::clear_expired_clogs_with_inner(&mut inner);
200
201 Self::randomly_trigger_partitions_with_inner(&mut inner);
203
204 Self::process_event_with_inner(&mut inner, scheduled_event.into_event());
206
207 !inner.event_queue.is_empty()
209 } else {
210 false
212 }
213 }
214
215 #[instrument(skip(self))]
221 pub fn run_until_empty(&mut self) {
222 while self.step() {
223 if self.inner.borrow().events_processed.is_multiple_of(50) {
225 let has_workload_events = !self
226 .inner
227 .borrow()
228 .event_queue
229 .has_only_infrastructure_events();
230 if !has_workload_events {
231 tracing::debug!(
232 "Early termination: only infrastructure events remain in queue"
233 );
234 break;
235 }
236 }
237 }
238 }
239
240 pub fn current_time(&self) -> Duration {
242 self.inner.borrow().current_time
243 }
244
245 pub fn now(&self) -> Duration {
250 self.inner.borrow().current_time
251 }
252
253 pub fn timer(&self) -> Duration {
268 let mut inner = self.inner.borrow_mut();
269 let chaos = &inner.network.config.chaos;
270
271 if !chaos.clock_drift_enabled {
273 return inner.current_time;
274 }
275
276 let max_timer = inner.current_time + chaos.clock_drift_max;
280
281 if inner.timer_time < max_timer {
283 let random_factor = sim_random::<f64>(); let gap = (max_timer - inner.timer_time).as_secs_f64();
285 let delta = random_factor * gap / 2.0;
286 inner.timer_time += Duration::from_secs_f64(delta);
287 }
288
289 inner.timer_time = inner.timer_time.max(inner.current_time);
291
292 inner.timer_time
293 }
294
295 #[instrument(skip(self))]
297 pub fn schedule_event(&self, event: Event, delay: Duration) {
298 let mut inner = self.inner.borrow_mut();
299 let scheduled_time = inner.current_time + delay;
300 let sequence = inner.next_sequence;
301 inner.next_sequence += 1;
302
303 let scheduled_event = ScheduledEvent::new(scheduled_time, event, sequence);
304 inner.event_queue.schedule(scheduled_event);
305 }
306
307 pub fn schedule_event_at(&self, event: Event, time: Duration) {
309 let mut inner = self.inner.borrow_mut();
310 let sequence = inner.next_sequence;
311 inner.next_sequence += 1;
312
313 let scheduled_event = ScheduledEvent::new(time, event, sequence);
314 inner.event_queue.schedule(scheduled_event);
315 }
316
317 pub fn downgrade(&self) -> WeakSimWorld {
322 WeakSimWorld {
323 inner: Rc::downgrade(&self.inner),
324 }
325 }
326
327 pub fn has_pending_events(&self) -> bool {
329 !self.inner.borrow().event_queue.is_empty()
330 }
331
332 pub fn pending_event_count(&self) -> usize {
334 self.inner.borrow().event_queue.len()
335 }
336
337 pub fn network_provider(&self) -> SimNetworkProvider {
339 SimNetworkProvider::new(self.downgrade())
340 }
341
342 pub fn time_provider(&self) -> crate::providers::SimTimeProvider {
344 crate::providers::SimTimeProvider::new(self.downgrade())
345 }
346
347 pub fn task_provider(&self) -> crate::TokioTaskProvider {
349 crate::TokioTaskProvider
350 }
351
352 pub fn storage_provider(&self) -> crate::storage::SimStorageProvider {
354 crate::storage::SimStorageProvider::new(self.downgrade())
355 }
356
357 pub fn set_storage_config(&mut self, config: crate::storage::StorageConfiguration) {
361 self.inner.borrow_mut().storage.config = config;
362 }
363
364 pub fn with_network_config<F, R>(&self, f: F) -> R
372 where
373 F: FnOnce(&NetworkConfiguration) -> R,
374 {
375 let inner = self.inner.borrow();
376 f(&inner.network.config)
377 }
378
379 pub(crate) fn create_listener(&self, addr: String) -> SimulationResult<ListenerId> {
381 let mut inner = self.inner.borrow_mut();
382 let listener_id = ListenerId(inner.network.next_listener_id);
383 inner.network.next_listener_id += 1;
384
385 inner.network.listeners.insert(
386 listener_id,
387 ListenerState {
388 id: listener_id,
389 addr,
390 pending_connections: VecDeque::new(),
391 },
392 );
393
394 Ok(listener_id)
395 }
396
397 pub(crate) fn read_from_connection(
399 &self,
400 connection_id: ConnectionId,
401 buf: &mut [u8],
402 ) -> SimulationResult<usize> {
403 let mut inner = self.inner.borrow_mut();
404
405 if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
406 let mut bytes_read = 0;
407 while bytes_read < buf.len() && !connection.receive_buffer.is_empty() {
408 if let Some(byte) = connection.receive_buffer.pop_front() {
409 buf[bytes_read] = byte;
410 bytes_read += 1;
411 }
412 }
413 Ok(bytes_read)
414 } else {
415 Err(SimulationError::InvalidState(
416 "connection not found".to_string(),
417 ))
418 }
419 }
420
421 pub(crate) fn write_to_connection(
423 &self,
424 connection_id: ConnectionId,
425 data: &[u8],
426 ) -> SimulationResult<()> {
427 let mut inner = self.inner.borrow_mut();
428
429 if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
430 for &byte in data {
431 connection.receive_buffer.push_back(byte);
432 }
433 Ok(())
434 } else {
435 Err(SimulationError::InvalidState(
436 "connection not found".to_string(),
437 ))
438 }
439 }
440
441 pub(crate) fn buffer_send(
446 &self,
447 connection_id: ConnectionId,
448 data: Vec<u8>,
449 ) -> SimulationResult<()> {
450 tracing::debug!(
451 "buffer_send called for connection_id={} with {} bytes",
452 connection_id.0,
453 data.len()
454 );
455 let mut inner = self.inner.borrow_mut();
456
457 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
458 conn.send_buffer.push_back(data);
460 tracing::debug!(
461 "buffer_send: added data to send_buffer, new length: {}",
462 conn.send_buffer.len()
463 );
464
465 if !conn.send_in_progress {
467 tracing::debug!(
468 "buffer_send: sender not in progress, scheduling ProcessSendBuffer event"
469 );
470 conn.send_in_progress = true;
471
472 let scheduled_time = inner.current_time + std::time::Duration::ZERO;
474 let sequence = inner.next_sequence;
475 inner.next_sequence += 1;
476 let scheduled_event = ScheduledEvent::new(
477 scheduled_time,
478 Event::Network {
479 connection_id: connection_id.0,
480 operation: NetworkOperation::ProcessSendBuffer,
481 },
482 sequence,
483 );
484 inner.event_queue.schedule(scheduled_event);
485 tracing::debug!(
486 "buffer_send: scheduled ProcessSendBuffer event with sequence {}",
487 sequence
488 );
489 } else {
490 tracing::debug!(
491 "buffer_send: sender already in progress, not scheduling new event"
492 );
493 }
494
495 Ok(())
496 } else {
497 tracing::debug!(
498 "buffer_send: connection_id={} not found in connections table",
499 connection_id.0
500 );
501 Err(SimulationError::InvalidState(
502 "connection not found".to_string(),
503 ))
504 }
505 }
506
507 pub(crate) fn create_connection_pair(
515 &self,
516 client_addr: String,
517 server_addr: String,
518 ) -> SimulationResult<(ConnectionId, ConnectionId)> {
519 let mut inner = self.inner.borrow_mut();
520
521 let client_id = ConnectionId(inner.network.next_connection_id);
522 inner.network.next_connection_id += 1;
523
524 let server_id = ConnectionId(inner.network.next_connection_id);
525 inner.network.next_connection_id += 1;
526
527 let current_time = inner.current_time;
529
530 let client_ip = NetworkState::parse_ip_from_addr(&client_addr);
532 let server_ip = NetworkState::parse_ip_from_addr(&server_addr);
533
534 let ephemeral_peer_addr = match client_ip {
538 Some(std::net::IpAddr::V4(ipv4)) => {
539 let octets = ipv4.octets();
540 let ip_offset = sim_random_range(0u32..256) as u8;
541 let new_last_octet = octets[3].wrapping_add(ip_offset);
542 let ephemeral_ip =
543 std::net::Ipv4Addr::new(octets[0], octets[1], octets[2], new_last_octet);
544 let ephemeral_port = sim_random_range(40000u16..60000);
545 format!("{}:{}", ephemeral_ip, ephemeral_port)
546 }
547 Some(std::net::IpAddr::V6(ipv6)) => {
548 let segments = ipv6.segments();
550 let mut new_segments = segments;
551 let ip_offset = sim_random_range(0u16..256);
552 new_segments[7] = new_segments[7].wrapping_add(ip_offset);
553 let ephemeral_ip = std::net::Ipv6Addr::from(new_segments);
554 let ephemeral_port = sim_random_range(40000u16..60000);
555 format!("[{}]:{}", ephemeral_ip, ephemeral_port)
556 }
557 None => {
558 let ephemeral_port = sim_random_range(40000u16..60000);
560 format!("unknown:{}", ephemeral_port)
561 }
562 };
563
564 const DEFAULT_SEND_BUFFER_CAPACITY: usize = 64 * 1024; inner.network.connections.insert(
572 client_id,
573 ConnectionState {
574 id: client_id,
575 addr: client_addr,
576 local_ip: client_ip,
577 remote_ip: server_ip,
578 peer_address: server_addr.clone(),
579 receive_buffer: VecDeque::new(),
580 paired_connection: Some(server_id),
581 send_buffer: VecDeque::new(),
582 send_in_progress: false,
583 next_send_time: current_time,
584 is_closed: false,
585 send_closed: false,
586 recv_closed: false,
587 is_cut: false,
588 cut_expiry: None,
589 close_reason: CloseReason::None,
590 send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
591 send_delay: None,
592 recv_delay: None,
593 is_half_open: false,
594 half_open_error_at: None,
595 is_stable: false,
596 graceful_close_pending: false,
597 last_data_delivery_scheduled_at: None,
598 remote_fin_received: false,
599 },
600 );
601
602 inner.network.connections.insert(
604 server_id,
605 ConnectionState {
606 id: server_id,
607 addr: server_addr,
608 local_ip: server_ip,
609 remote_ip: client_ip,
610 peer_address: ephemeral_peer_addr,
611 receive_buffer: VecDeque::new(),
612 paired_connection: Some(client_id),
613 send_buffer: VecDeque::new(),
614 send_in_progress: false,
615 next_send_time: current_time,
616 is_closed: false,
617 send_closed: false,
618 recv_closed: false,
619 is_cut: false,
620 cut_expiry: None,
621 close_reason: CloseReason::None,
622 send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
623 send_delay: None,
624 recv_delay: None,
625 is_half_open: false,
626 half_open_error_at: None,
627 is_stable: false,
628 graceful_close_pending: false,
629 last_data_delivery_scheduled_at: None,
630 remote_fin_received: false,
631 },
632 );
633
634 Ok((client_id, server_id))
635 }
636
637 pub(crate) fn register_read_waker(
639 &self,
640 connection_id: ConnectionId,
641 waker: Waker,
642 ) -> SimulationResult<()> {
643 let mut inner = self.inner.borrow_mut();
644 let is_replacement = inner.wakers.read_wakers.contains_key(&connection_id);
645 inner.wakers.read_wakers.insert(connection_id, waker);
646 tracing::debug!(
647 "register_read_waker: connection_id={}, replacement={}, total_wakers={}",
648 connection_id.0,
649 is_replacement,
650 inner.wakers.read_wakers.len()
651 );
652 Ok(())
653 }
654
655 pub(crate) fn register_accept_waker(&self, addr: &str, waker: Waker) -> SimulationResult<()> {
657 let mut inner = self.inner.borrow_mut();
658 use std::collections::hash_map::DefaultHasher;
660 use std::hash::{Hash, Hasher};
661 let mut hasher = DefaultHasher::new();
662 addr.hash(&mut hasher);
663 let listener_key = ListenerId(hasher.finish());
664
665 inner.wakers.listener_wakers.insert(listener_key, waker);
666 Ok(())
667 }
668
669 pub(crate) fn store_pending_connection(
671 &self,
672 addr: &str,
673 connection_id: ConnectionId,
674 ) -> SimulationResult<()> {
675 let mut inner = self.inner.borrow_mut();
676 inner
677 .network
678 .pending_connections
679 .insert(addr.to_string(), connection_id);
680
681 use std::collections::hash_map::DefaultHasher;
683 use std::hash::{Hash, Hasher};
684 let mut hasher = DefaultHasher::new();
685 addr.hash(&mut hasher);
686 let listener_key = ListenerId(hasher.finish());
687
688 if let Some(waker) = inner.wakers.listener_wakers.remove(&listener_key) {
689 waker.wake();
690 }
691
692 Ok(())
693 }
694
695 pub(crate) fn get_pending_connection(
697 &self,
698 addr: &str,
699 ) -> SimulationResult<Option<ConnectionId>> {
700 let mut inner = self.inner.borrow_mut();
701 Ok(inner.network.pending_connections.remove(addr))
702 }
703
704 pub(crate) fn get_connection_peer_address(
713 &self,
714 connection_id: ConnectionId,
715 ) -> Option<String> {
716 let inner = self.inner.borrow();
717 inner
718 .network
719 .connections
720 .get(&connection_id)
721 .map(|conn| conn.peer_address.clone())
722 }
723
724 #[instrument(skip(self))]
729 pub fn sleep(&self, duration: Duration) -> SleepFuture {
730 let task_id = self.generate_task_id();
731
732 let actual_duration = self.apply_buggified_delay(duration);
734
735 self.schedule_event(Event::Timer { task_id }, actual_duration);
737
738 SleepFuture::new(self.downgrade(), task_id)
740 }
741
742 fn apply_buggified_delay(&self, duration: Duration) -> Duration {
744 let inner = self.inner.borrow();
745 let chaos = &inner.network.config.chaos;
746
747 if !chaos.buggified_delay_enabled || chaos.buggified_delay_max == Duration::ZERO {
748 return duration;
749 }
750
751 if sim_random::<f64>() < chaos.buggified_delay_probability {
753 let random_factor = sim_random::<f64>().powf(1000.0);
755 let extra_delay = chaos.buggified_delay_max.mul_f64(random_factor);
756 tracing::trace!(
757 extra_delay_ms = extra_delay.as_millis(),
758 "Buggified delay applied"
759 );
760 duration + extra_delay
761 } else {
762 duration
763 }
764 }
765
766 fn generate_task_id(&self) -> u64 {
768 let mut inner = self.inner.borrow_mut();
769 let task_id = inner.next_task_id;
770 inner.next_task_id += 1;
771 task_id
772 }
773
774 fn wake_all(wakers: &mut BTreeMap<ConnectionId, Vec<Waker>>, connection_id: ConnectionId) {
776 if let Some(waker_list) = wakers.remove(&connection_id) {
777 for waker in waker_list {
778 waker.wake();
779 }
780 }
781 }
782
783 pub(crate) fn is_task_awake(&self, task_id: u64) -> SimulationResult<bool> {
785 let inner = self.inner.borrow();
786 Ok(inner.awakened_tasks.contains(&task_id))
787 }
788
789 pub(crate) fn register_task_waker(&self, task_id: u64, waker: Waker) -> SimulationResult<()> {
791 let mut inner = self.inner.borrow_mut();
792 inner.wakers.task_wakers.insert(task_id, waker);
793 Ok(())
794 }
795
796 fn clear_expired_clogs_with_inner(inner: &mut SimInner) {
798 let now = inner.current_time;
799 let expired: Vec<ConnectionId> = inner
800 .network
801 .connection_clogs
802 .iter()
803 .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
804 .collect();
805
806 for id in expired {
807 inner.network.connection_clogs.remove(&id);
808 Self::wake_all(&mut inner.wakers.clog_wakers, id);
809 }
810 }
811
812 #[instrument(skip(inner))]
814 fn process_event_with_inner(inner: &mut SimInner, event: Event) {
815 inner.events_processed += 1;
816
817 match event {
818 Event::Timer { task_id } => Self::handle_timer_event(inner, task_id),
819 Event::Connection { id, state } => Self::handle_connection_event(inner, id, state),
820 Event::Network {
821 connection_id,
822 operation,
823 } => Self::handle_network_event(inner, connection_id, operation),
824 Event::Storage { file_id, operation } => {
825 super::storage_ops::handle_storage_event(inner, file_id, operation)
826 }
827 Event::Shutdown => Self::handle_shutdown_event(inner),
828 }
829 }
830
831 fn handle_timer_event(inner: &mut SimInner, task_id: u64) {
833 inner.awakened_tasks.insert(task_id);
834 if let Some(waker) = inner.wakers.task_wakers.remove(&task_id) {
835 waker.wake();
836 }
837 }
838
839 fn handle_connection_event(inner: &mut SimInner, id: u64, state: ConnectionStateChange) {
841 let connection_id = ConnectionId(id);
842
843 match state {
844 ConnectionStateChange::BindComplete | ConnectionStateChange::ConnectionReady => {
845 }
847 ConnectionStateChange::ClogClear => {
848 inner.network.connection_clogs.remove(&connection_id);
849 Self::wake_all(&mut inner.wakers.clog_wakers, connection_id);
850 }
851 ConnectionStateChange::ReadClogClear => {
852 inner.network.read_clogs.remove(&connection_id);
853 Self::wake_all(&mut inner.wakers.read_clog_wakers, connection_id);
854 }
855 ConnectionStateChange::PartitionRestore => {
856 Self::clear_expired_partitions(inner);
857 }
858 ConnectionStateChange::SendPartitionClear => {
859 Self::clear_expired_send_partitions(inner);
860 }
861 ConnectionStateChange::RecvPartitionClear => {
862 Self::clear_expired_recv_partitions(inner);
863 }
864 ConnectionStateChange::CutRestore => {
865 if let Some(conn) = inner.network.connections.get_mut(&connection_id)
866 && conn.is_cut
867 {
868 conn.is_cut = false;
869 conn.cut_expiry = None;
870 tracing::debug!("Connection {} restored via scheduled event", id);
871 Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
872 }
873 }
874 ConnectionStateChange::HalfOpenError => {
875 tracing::debug!("Connection {} half-open error time reached", id);
876 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
877 waker.wake();
878 }
879 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
880 }
881 }
882 }
883
884 fn clear_expired_partitions(inner: &mut SimInner) {
886 let now = inner.current_time;
887 let expired: Vec<_> = inner
888 .network
889 .ip_partitions
890 .iter()
891 .filter_map(|(pair, state)| (now >= state.expires_at).then_some(*pair))
892 .collect();
893
894 for pair in expired {
895 inner.network.ip_partitions.remove(&pair);
896 tracing::debug!("Restored IP partition {} -> {}", pair.0, pair.1);
897 }
898 }
899
900 fn clear_expired_send_partitions(inner: &mut SimInner) {
902 let now = inner.current_time;
903 let expired: Vec<_> = inner
904 .network
905 .send_partitions
906 .iter()
907 .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
908 .collect();
909
910 for ip in expired {
911 inner.network.send_partitions.remove(&ip);
912 tracing::debug!("Cleared send partition for {}", ip);
913 }
914 }
915
916 fn clear_expired_recv_partitions(inner: &mut SimInner) {
918 let now = inner.current_time;
919 let expired: Vec<_> = inner
920 .network
921 .recv_partitions
922 .iter()
923 .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
924 .collect();
925
926 for ip in expired {
927 inner.network.recv_partitions.remove(&ip);
928 tracing::debug!("Cleared receive partition for {}", ip);
929 }
930 }
931
932 fn handle_network_event(inner: &mut SimInner, conn_id: u64, operation: NetworkOperation) {
934 let connection_id = ConnectionId(conn_id);
935
936 match operation {
937 NetworkOperation::DataDelivery { data } => {
938 Self::handle_data_delivery(inner, connection_id, data);
939 }
940 NetworkOperation::ProcessSendBuffer => {
941 Self::handle_process_send_buffer(inner, connection_id);
942 }
943 NetworkOperation::FinDelivery => {
944 Self::handle_fin_delivery(inner, connection_id);
945 }
946 }
947 }
948
949 fn handle_data_delivery(inner: &mut SimInner, connection_id: ConnectionId, data: Vec<u8>) {
951 tracing::trace!(
952 "DataDelivery: {} bytes to connection {}",
953 data.len(),
954 connection_id.0
955 );
956
957 let is_stable = inner
959 .network
960 .connections
961 .get(&connection_id)
962 .is_some_and(|conn| conn.is_stable);
963
964 if !inner.network.connections.contains_key(&connection_id) {
965 tracing::warn!("DataDelivery: connection {} not found", connection_id.0);
966 return;
967 }
968
969 let data_to_deliver = if is_stable {
971 data
972 } else {
973 Self::maybe_corrupt_data(inner, &data)
974 };
975
976 let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
978 return;
979 };
980
981 for &byte in &data_to_deliver {
982 conn.receive_buffer.push_back(byte);
983 }
984
985 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
986 waker.wake();
987 }
988 }
989
990 fn handle_fin_delivery(inner: &mut SimInner, connection_id: ConnectionId) {
996 tracing::debug!(
997 "FinDelivery: FIN received on connection {}",
998 connection_id.0
999 );
1000
1001 let is_closed = inner
1003 .network
1004 .connections
1005 .get(&connection_id)
1006 .is_some_and(|conn| conn.is_closed);
1007
1008 if is_closed {
1009 tracing::debug!(
1010 "FinDelivery: connection {} already closed, ignoring stale FIN",
1011 connection_id.0
1012 );
1013 return;
1014 }
1015
1016 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1017 conn.remote_fin_received = true;
1018 }
1019
1020 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1021 waker.wake();
1022 }
1023 }
1024
1025 fn schedule_fin_delivery(
1030 inner: &mut SimInner,
1031 paired_id: Option<ConnectionId>,
1032 last_delivery_time: Option<Duration>,
1033 ) {
1034 let Some(peer_id) = paired_id else {
1035 return;
1036 };
1037
1038 let fin_time = match last_delivery_time {
1040 Some(t) if t >= inner.current_time => t + Duration::from_nanos(1),
1041 _ => inner.current_time + Duration::from_nanos(1),
1042 };
1043
1044 let sequence = inner.next_sequence;
1045 inner.next_sequence += 1;
1046
1047 tracing::debug!(
1048 "Scheduling FinDelivery to connection {} at {:?}",
1049 peer_id.0,
1050 fin_time
1051 );
1052
1053 inner.event_queue.schedule(ScheduledEvent::new(
1054 fin_time,
1055 Event::Network {
1056 connection_id: peer_id.0,
1057 operation: NetworkOperation::FinDelivery,
1058 },
1059 sequence,
1060 ));
1061 }
1062
1063 fn maybe_corrupt_data(inner: &mut SimInner, data: &[u8]) -> Vec<u8> {
1065 if data.is_empty() {
1066 return data.to_vec();
1067 }
1068
1069 let chaos = &inner.network.config.chaos;
1070 let now = inner.current_time;
1071 let cooldown_elapsed =
1072 now.saturating_sub(inner.last_bit_flip_time) >= chaos.bit_flip_cooldown;
1073
1074 if !cooldown_elapsed || !crate::buggify_with_prob!(chaos.bit_flip_probability) {
1075 return data.to_vec();
1076 }
1077
1078 let random_value = sim_random::<u32>();
1079 let flip_count = SimInner::calculate_flip_bit_count(
1080 random_value,
1081 chaos.bit_flip_min_bits,
1082 chaos.bit_flip_max_bits,
1083 );
1084
1085 let mut corrupted_data = data.to_vec();
1086 let mut flipped_positions = std::collections::HashSet::new();
1087
1088 for _ in 0..flip_count {
1089 let byte_idx = (sim_random::<u64>() as usize) % corrupted_data.len();
1090 let bit_idx = (sim_random::<u64>() as usize) % 8;
1091 let position = (byte_idx, bit_idx);
1092
1093 if !flipped_positions.contains(&position) {
1094 flipped_positions.insert(position);
1095 corrupted_data[byte_idx] ^= 1 << bit_idx;
1096 }
1097 }
1098
1099 inner.last_bit_flip_time = now;
1100 tracing::info!(
1101 "BitFlipInjected: bytes={} bits_flipped={} unique_positions={}",
1102 data.len(),
1103 flip_count,
1104 flipped_positions.len()
1105 );
1106
1107 corrupted_data
1108 }
1109
1110 fn handle_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1112 let is_partitioned = inner
1113 .network
1114 .is_connection_partitioned(connection_id, inner.current_time);
1115
1116 if is_partitioned {
1117 Self::handle_partitioned_send(inner, connection_id);
1118 } else {
1119 Self::handle_normal_send(inner, connection_id);
1120 }
1121 }
1122
1123 fn handle_partitioned_send(inner: &mut SimInner, connection_id: ConnectionId) {
1125 let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1126 return;
1127 };
1128
1129 if let Some(data) = conn.send_buffer.pop_front() {
1130 tracing::debug!(
1131 "Connection {} partitioned, failing send of {} bytes",
1132 connection_id.0,
1133 data.len()
1134 );
1135 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1136
1137 if !conn.send_buffer.is_empty() {
1138 Self::schedule_process_send_buffer(inner, connection_id);
1139 } else {
1140 conn.send_in_progress = false;
1141 if conn.graceful_close_pending {
1143 conn.graceful_close_pending = false;
1144 let peer_id = conn.paired_connection;
1145 let last_time = conn.last_data_delivery_scheduled_at;
1146 Self::schedule_fin_delivery(inner, peer_id, last_time);
1147 }
1148 }
1149 } else {
1150 conn.send_in_progress = false;
1151 if conn.graceful_close_pending {
1153 conn.graceful_close_pending = false;
1154 let peer_id = conn.paired_connection;
1155 let last_time = conn.last_data_delivery_scheduled_at;
1156 Self::schedule_fin_delivery(inner, peer_id, last_time);
1157 }
1158 }
1159 }
1160
1161 fn handle_normal_send(inner: &mut SimInner, connection_id: ConnectionId) {
1163 let Some(conn) = inner.network.connections.get(&connection_id) else {
1165 return;
1166 };
1167
1168 let paired_id = conn.paired_connection;
1169 let send_delay = conn.send_delay;
1170 let next_send_time = conn.next_send_time;
1171 let has_data = !conn.send_buffer.is_empty();
1172 let is_stable = conn.is_stable; let recv_delay = paired_id.and_then(|pid| {
1175 inner
1176 .network
1177 .connections
1178 .get(&pid)
1179 .and_then(|c| c.recv_delay)
1180 });
1181
1182 if !has_data {
1183 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1184 conn.send_in_progress = false;
1185 if conn.graceful_close_pending {
1187 conn.graceful_close_pending = false;
1188 let peer_id = conn.paired_connection;
1189 let last_time = conn.last_data_delivery_scheduled_at;
1190 Self::schedule_fin_delivery(inner, peer_id, last_time);
1191 }
1192 }
1193 return;
1194 }
1195
1196 let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1197 return;
1198 };
1199
1200 let Some(mut data) = conn.send_buffer.pop_front() else {
1201 conn.send_in_progress = false;
1202 return;
1203 };
1204
1205 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1206
1207 if !is_stable && crate::buggify!() && !data.is_empty() {
1209 let max_send = std::cmp::min(
1210 data.len(),
1211 inner.network.config.chaos.partial_write_max_bytes,
1212 );
1213 let truncate_to = sim_random_range(0..max_send + 1);
1214
1215 if truncate_to < data.len() {
1216 let remainder = data.split_off(truncate_to);
1217 conn.send_buffer.push_front(remainder);
1218 tracing::debug!(
1219 "BUGGIFY: Partial write on connection {} - sending {} bytes",
1220 connection_id.0,
1221 data.len()
1222 );
1223 }
1224 }
1225
1226 let has_more = !conn.send_buffer.is_empty();
1227 let base_delay = if has_more {
1228 Duration::from_nanos(1)
1229 } else {
1230 send_delay.unwrap_or_else(|| {
1231 crate::network::sample_duration(&inner.network.config.write_latency)
1232 })
1233 };
1234
1235 let earliest_time = std::cmp::max(inner.current_time + base_delay, next_send_time);
1236 conn.next_send_time = earliest_time + Duration::from_nanos(1);
1237
1238 if let Some(paired_id) = paired_id {
1240 let scheduled_time = earliest_time + recv_delay.unwrap_or(Duration::ZERO);
1241 let sequence = inner.next_sequence;
1242 inner.next_sequence += 1;
1243
1244 inner.event_queue.schedule(ScheduledEvent::new(
1245 scheduled_time,
1246 Event::Network {
1247 connection_id: paired_id.0,
1248 operation: NetworkOperation::DataDelivery { data },
1249 },
1250 sequence,
1251 ));
1252
1253 conn.last_data_delivery_scheduled_at = Some(scheduled_time);
1255 }
1256
1257 if !conn.send_buffer.is_empty() {
1259 Self::schedule_process_send_buffer(inner, connection_id);
1260 } else {
1261 conn.send_in_progress = false;
1262 if conn.graceful_close_pending {
1264 conn.graceful_close_pending = false;
1265 let peer_id = conn.paired_connection;
1266 let last_time = conn.last_data_delivery_scheduled_at;
1267 Self::schedule_fin_delivery(inner, peer_id, last_time);
1268 }
1269 }
1270 }
1271
1272 fn schedule_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1274 let sequence = inner.next_sequence;
1275 inner.next_sequence += 1;
1276
1277 inner.event_queue.schedule(ScheduledEvent::new(
1278 inner.current_time,
1279 Event::Network {
1280 connection_id: connection_id.0,
1281 operation: NetworkOperation::ProcessSendBuffer,
1282 },
1283 sequence,
1284 ));
1285 }
1286
1287 fn handle_shutdown_event(inner: &mut SimInner) {
1289 tracing::debug!("Processing Shutdown event - waking all pending tasks");
1290
1291 for (task_id, waker) in std::mem::take(&mut inner.wakers.task_wakers) {
1292 tracing::trace!("Waking task {}", task_id);
1293 waker.wake();
1294 }
1295
1296 for (_conn_id, waker) in std::mem::take(&mut inner.wakers.read_wakers) {
1297 waker.wake();
1298 }
1299
1300 tracing::debug!("Shutdown event processed");
1301 }
1302
1303 pub fn assertion_results(
1305 &self,
1306 ) -> std::collections::HashMap<String, crate::chaos::AssertionStats> {
1307 crate::chaos::get_assertion_results()
1308 }
1309
1310 pub fn reset_assertion_results(&self) {
1312 crate::chaos::reset_assertion_results();
1313 }
1314
1315 pub fn extract_metrics(&self) -> crate::runner::SimulationMetrics {
1317 let inner = self.inner.borrow();
1318
1319 crate::runner::SimulationMetrics {
1320 wall_time: std::time::Duration::ZERO,
1321 simulated_time: inner.current_time,
1322 events_processed: inner.events_processed,
1323 }
1324 }
1325
1326 pub fn should_clog_write(&self, connection_id: ConnectionId) -> bool {
1330 let inner = self.inner.borrow();
1331 let config = &inner.network.config;
1332
1333 if inner
1335 .network
1336 .connections
1337 .get(&connection_id)
1338 .is_some_and(|conn| conn.is_stable)
1339 {
1340 return false;
1341 }
1342
1343 if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1345 return inner.current_time < clog_state.expires_at;
1346 }
1347
1348 config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1350 }
1351
1352 pub fn clog_write(&self, connection_id: ConnectionId) {
1354 let mut inner = self.inner.borrow_mut();
1355 let config = &inner.network.config;
1356
1357 let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1358 let expires_at = inner.current_time + clog_duration;
1359 inner
1360 .network
1361 .connection_clogs
1362 .insert(connection_id, ClogState { expires_at });
1363
1364 let clear_event = Event::Connection {
1366 id: connection_id.0,
1367 state: ConnectionStateChange::ClogClear,
1368 };
1369 let sequence = inner.next_sequence;
1370 inner.next_sequence += 1;
1371 inner
1372 .event_queue
1373 .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1374 }
1375
1376 pub fn is_write_clogged(&self, connection_id: ConnectionId) -> bool {
1378 let inner = self.inner.borrow();
1379
1380 if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1381 inner.current_time < clog_state.expires_at
1382 } else {
1383 false
1384 }
1385 }
1386
1387 pub fn register_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1389 let mut inner = self.inner.borrow_mut();
1390 inner
1391 .wakers
1392 .clog_wakers
1393 .entry(connection_id)
1394 .or_default()
1395 .push(waker);
1396 }
1397
1398 pub fn should_clog_read(&self, connection_id: ConnectionId) -> bool {
1402 let inner = self.inner.borrow();
1403 let config = &inner.network.config;
1404
1405 if inner
1407 .network
1408 .connections
1409 .get(&connection_id)
1410 .is_some_and(|conn| conn.is_stable)
1411 {
1412 return false;
1413 }
1414
1415 if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1417 return inner.current_time < clog_state.expires_at;
1418 }
1419
1420 config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1422 }
1423
1424 pub fn clog_read(&self, connection_id: ConnectionId) {
1426 let mut inner = self.inner.borrow_mut();
1427 let config = &inner.network.config;
1428
1429 let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1430 let expires_at = inner.current_time + clog_duration;
1431 inner
1432 .network
1433 .read_clogs
1434 .insert(connection_id, ClogState { expires_at });
1435
1436 let clear_event = Event::Connection {
1438 id: connection_id.0,
1439 state: ConnectionStateChange::ReadClogClear,
1440 };
1441 let sequence = inner.next_sequence;
1442 inner.next_sequence += 1;
1443 inner
1444 .event_queue
1445 .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1446 }
1447
1448 pub fn is_read_clogged(&self, connection_id: ConnectionId) -> bool {
1450 let inner = self.inner.borrow();
1451
1452 if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1453 inner.current_time < clog_state.expires_at
1454 } else {
1455 false
1456 }
1457 }
1458
1459 pub fn register_read_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1461 let mut inner = self.inner.borrow_mut();
1462 inner
1463 .wakers
1464 .read_clog_wakers
1465 .entry(connection_id)
1466 .or_default()
1467 .push(waker);
1468 }
1469
1470 pub fn clear_expired_clogs(&self) {
1472 let mut inner = self.inner.borrow_mut();
1473 let now = inner.current_time;
1474 let expired: Vec<ConnectionId> = inner
1475 .network
1476 .connection_clogs
1477 .iter()
1478 .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
1479 .collect();
1480
1481 for id in expired {
1482 inner.network.connection_clogs.remove(&id);
1483 Self::wake_all(&mut inner.wakers.clog_wakers, id);
1484 }
1485 }
1486
1487 pub fn cut_connection(&self, connection_id: ConnectionId, duration: Duration) {
1500 let mut inner = self.inner.borrow_mut();
1501 let expires_at = inner.current_time + duration;
1502
1503 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1504 conn.is_cut = true;
1505 conn.cut_expiry = Some(expires_at);
1506 tracing::debug!("Connection {} cut until {:?}", connection_id.0, expires_at);
1507
1508 let restore_event = Event::Connection {
1510 id: connection_id.0,
1511 state: ConnectionStateChange::CutRestore,
1512 };
1513 let sequence = inner.next_sequence;
1514 inner.next_sequence += 1;
1515 inner
1516 .event_queue
1517 .schedule(ScheduledEvent::new(expires_at, restore_event, sequence));
1518 }
1519 }
1520
1521 pub fn is_connection_cut(&self, connection_id: ConnectionId) -> bool {
1526 let inner = self.inner.borrow();
1527 inner
1528 .network
1529 .connections
1530 .get(&connection_id)
1531 .is_some_and(|conn| {
1532 conn.is_cut
1533 && conn
1534 .cut_expiry
1535 .is_some_and(|expiry| inner.current_time < expiry)
1536 })
1537 }
1538
1539 pub fn restore_connection(&self, connection_id: ConnectionId) {
1543 let mut inner = self.inner.borrow_mut();
1544
1545 if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1546 && conn.is_cut
1547 {
1548 conn.is_cut = false;
1549 conn.cut_expiry = None;
1550 tracing::debug!("Connection {} restored", connection_id.0);
1551
1552 Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
1554 }
1555 }
1556
1557 pub fn register_cut_waker(&self, connection_id: ConnectionId, waker: Waker) {
1559 let mut inner = self.inner.borrow_mut();
1560 inner
1561 .wakers
1562 .cut_wakers
1563 .entry(connection_id)
1564 .or_default()
1565 .push(waker);
1566 }
1567
1568 pub fn send_buffer_capacity(&self, connection_id: ConnectionId) -> usize {
1572 let inner = self.inner.borrow();
1573 inner
1574 .network
1575 .connections
1576 .get(&connection_id)
1577 .map(|conn| conn.send_buffer_capacity)
1578 .unwrap_or(0)
1579 }
1580
1581 pub fn send_buffer_used(&self, connection_id: ConnectionId) -> usize {
1583 let inner = self.inner.borrow();
1584 inner
1585 .network
1586 .connections
1587 .get(&connection_id)
1588 .map(|conn| conn.send_buffer.iter().map(|v| v.len()).sum())
1589 .unwrap_or(0)
1590 }
1591
1592 pub fn available_send_buffer(&self, connection_id: ConnectionId) -> usize {
1594 let capacity = self.send_buffer_capacity(connection_id);
1595 let used = self.send_buffer_used(connection_id);
1596 capacity.saturating_sub(used)
1597 }
1598
1599 pub fn register_send_buffer_waker(&self, connection_id: ConnectionId, waker: Waker) {
1601 let mut inner = self.inner.borrow_mut();
1602 inner
1603 .wakers
1604 .send_buffer_wakers
1605 .entry(connection_id)
1606 .or_default()
1607 .push(waker);
1608 }
1609
1610 #[allow(dead_code)] fn wake_send_buffer_waiters(&self, connection_id: ConnectionId) {
1613 let mut inner = self.inner.borrow_mut();
1614 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1615 }
1616
1617 pub fn get_pair_latency(&self, src: IpAddr, dst: IpAddr) -> Option<Duration> {
1622 let inner = self.inner.borrow();
1623 inner.network.pair_latencies.get(&(src, dst)).copied()
1624 }
1625
1626 pub fn set_pair_latency_if_not_set(
1629 &self,
1630 src: IpAddr,
1631 dst: IpAddr,
1632 latency: Duration,
1633 ) -> Duration {
1634 let mut inner = self.inner.borrow_mut();
1635 *inner
1636 .network
1637 .pair_latencies
1638 .entry((src, dst))
1639 .or_insert_with(|| {
1640 tracing::debug!(
1641 "Setting base latency for IP pair {} -> {} to {:?}",
1642 src,
1643 dst,
1644 latency
1645 );
1646 latency
1647 })
1648 }
1649
1650 pub fn get_connection_base_latency(&self, connection_id: ConnectionId) -> Duration {
1653 let inner = self.inner.borrow();
1654 let (local_ip, remote_ip) = inner
1655 .network
1656 .connections
1657 .get(&connection_id)
1658 .and_then(|conn| Some((conn.local_ip?, conn.remote_ip?)))
1659 .unwrap_or({
1660 (
1661 IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1662 IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1663 )
1664 });
1665 drop(inner);
1666
1667 if let Some(latency) = self.get_pair_latency(local_ip, remote_ip) {
1669 return latency;
1670 }
1671
1672 let latency = self
1674 .with_network_config(|config| crate::network::sample_duration(&config.write_latency));
1675 self.set_pair_latency_if_not_set(local_ip, remote_ip, latency)
1676 }
1677
1678 pub fn get_send_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1683 let inner = self.inner.borrow();
1684 inner
1685 .network
1686 .connections
1687 .get(&connection_id)
1688 .and_then(|conn| conn.send_delay)
1689 }
1690
1691 pub fn get_recv_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1694 let inner = self.inner.borrow();
1695 inner
1696 .network
1697 .connections
1698 .get(&connection_id)
1699 .and_then(|conn| conn.recv_delay)
1700 }
1701
1702 pub fn set_asymmetric_delays(
1705 &self,
1706 connection_id: ConnectionId,
1707 send_delay: Option<Duration>,
1708 recv_delay: Option<Duration>,
1709 ) {
1710 let mut inner = self.inner.borrow_mut();
1711 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1712 conn.send_delay = send_delay;
1713 conn.recv_delay = recv_delay;
1714 tracing::debug!(
1715 "Connection {} asymmetric delays set: send={:?}, recv={:?}",
1716 connection_id.0,
1717 send_delay,
1718 recv_delay
1719 );
1720 }
1721 }
1722
1723 pub fn is_connection_closed(&self, connection_id: ConnectionId) -> bool {
1725 let inner = self.inner.borrow();
1726 inner
1727 .network
1728 .connections
1729 .get(&connection_id)
1730 .is_some_and(|conn| conn.is_closed)
1731 }
1732
1733 pub fn close_connection(&self, connection_id: ConnectionId) {
1737 self.close_connection_with_reason(connection_id, CloseReason::Graceful);
1738 }
1739
1740 pub fn close_connection_abort(&self, connection_id: ConnectionId) {
1744 self.close_connection_with_reason(connection_id, CloseReason::Aborted);
1745 }
1746
1747 pub fn get_close_reason(&self, connection_id: ConnectionId) -> CloseReason {
1749 let inner = self.inner.borrow();
1750 inner
1751 .network
1752 .connections
1753 .get(&connection_id)
1754 .map(|conn| conn.close_reason)
1755 .unwrap_or(CloseReason::None)
1756 }
1757
1758 fn close_connection_with_reason(&self, connection_id: ConnectionId, reason: CloseReason) {
1760 match reason {
1761 CloseReason::Graceful => self.close_connection_graceful(connection_id),
1762 CloseReason::Aborted => self.close_connection_aborted(connection_id),
1763 CloseReason::None => {}
1764 }
1765 }
1766
1767 fn close_connection_graceful(&self, connection_id: ConnectionId) {
1773 let mut inner = self.inner.borrow_mut();
1774
1775 let conn_info = inner.network.connections.get(&connection_id).map(|conn| {
1777 (
1778 conn.paired_connection,
1779 conn.send_closed,
1780 conn.is_closed,
1781 conn.send_in_progress,
1782 conn.send_buffer.is_empty(),
1783 conn.last_data_delivery_scheduled_at,
1784 )
1785 });
1786
1787 let Some((
1788 paired_id,
1789 was_send_closed,
1790 was_closed,
1791 send_in_progress,
1792 send_buffer_empty,
1793 last_delivery_time,
1794 )) = conn_info
1795 else {
1796 return;
1797 };
1798
1799 if was_closed || was_send_closed {
1801 tracing::debug!(
1802 "Connection {} already closed/send_closed, skipping graceful close",
1803 connection_id.0
1804 );
1805 return;
1806 }
1807
1808 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1810 conn.is_closed = true;
1811 conn.close_reason = CloseReason::Graceful;
1812 conn.send_closed = true;
1813 tracing::debug!(
1814 "Connection {} graceful close (FIN) - local write shut down",
1815 connection_id.0
1816 );
1817 }
1818
1819 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1821 waker.wake();
1822 }
1823
1824 if send_in_progress || !send_buffer_empty {
1827 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1829 conn.graceful_close_pending = true;
1830 tracing::debug!(
1831 "Connection {} graceful close deferred (send pipeline active)",
1832 connection_id.0
1833 );
1834 }
1835 } else {
1836 Self::schedule_fin_delivery(&mut inner, paired_id, last_delivery_time);
1838 }
1839 }
1840
1841 fn close_connection_aborted(&self, connection_id: ConnectionId) {
1846 let mut inner = self.inner.borrow_mut();
1847
1848 let paired_connection_id = inner
1849 .network
1850 .connections
1851 .get(&connection_id)
1852 .and_then(|conn| conn.paired_connection);
1853
1854 if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1855 && !conn.is_closed
1856 {
1857 conn.is_closed = true;
1858 conn.close_reason = CloseReason::Aborted;
1859 conn.graceful_close_pending = false;
1861 tracing::debug!(
1862 "Connection {} closed permanently (reason: Aborted)",
1863 connection_id.0
1864 );
1865 }
1866
1867 if let Some(paired_id) = paired_connection_id
1868 && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
1869 && !paired_conn.is_closed
1870 {
1871 paired_conn.is_closed = true;
1872 paired_conn.close_reason = CloseReason::Aborted;
1873 tracing::debug!(
1874 "Paired connection {} also closed (reason: Aborted)",
1875 paired_id.0
1876 );
1877 }
1878
1879 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1880 tracing::debug!(
1881 "Waking read waker for aborted connection {}",
1882 connection_id.0
1883 );
1884 waker.wake();
1885 }
1886
1887 if let Some(paired_id) = paired_connection_id
1888 && let Some(paired_waker) = inner.wakers.read_wakers.remove(&paired_id)
1889 {
1890 tracing::debug!(
1891 "Waking read waker for paired aborted connection {}",
1892 paired_id.0
1893 );
1894 paired_waker.wake();
1895 }
1896 }
1897
1898 pub fn close_connection_asymmetric(
1900 &self,
1901 connection_id: ConnectionId,
1902 close_send: bool,
1903 close_recv: bool,
1904 ) {
1905 let mut inner = self.inner.borrow_mut();
1906
1907 let paired_id = inner
1908 .network
1909 .connections
1910 .get(&connection_id)
1911 .and_then(|conn| conn.paired_connection);
1912
1913 if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1914 conn.send_closed = true;
1915 conn.send_buffer.clear();
1916 tracing::debug!(
1917 "Connection {} send side closed (asymmetric)",
1918 connection_id.0
1919 );
1920 }
1921
1922 if close_recv
1923 && let Some(paired) = paired_id
1924 && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
1925 {
1926 paired_conn.recv_closed = true;
1927 tracing::debug!(
1928 "Connection {} recv side closed (asymmetric via peer)",
1929 paired.0
1930 );
1931 }
1932
1933 if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1934 waker.wake();
1935 }
1936 if close_recv
1937 && let Some(paired) = paired_id
1938 && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
1939 {
1940 waker.wake();
1941 }
1942 }
1943
1944 pub fn roll_random_close(&self, connection_id: ConnectionId) -> Option<bool> {
1946 let mut inner = self.inner.borrow_mut();
1947 let config = &inner.network.config;
1948
1949 if inner
1951 .network
1952 .connections
1953 .get(&connection_id)
1954 .is_some_and(|conn| conn.is_stable)
1955 {
1956 return None;
1957 }
1958
1959 if config.chaos.random_close_probability <= 0.0 {
1960 return None;
1961 }
1962
1963 let current_time = inner.current_time;
1964 let time_since_last = current_time.saturating_sub(inner.network.last_random_close_time);
1965 if time_since_last < config.chaos.random_close_cooldown {
1966 return None;
1967 }
1968
1969 if !crate::buggify_with_prob!(config.chaos.random_close_probability) {
1970 return None;
1971 }
1972
1973 inner.network.last_random_close_time = current_time;
1974
1975 let paired_id = inner
1976 .network
1977 .connections
1978 .get(&connection_id)
1979 .and_then(|conn| conn.paired_connection);
1980
1981 let a = super::rng::sim_random_f64();
1982 let close_recv = a < 0.66;
1983 let close_send = a > 0.33;
1984
1985 tracing::info!(
1986 "Random connection failure triggered on connection {} (send={}, recv={}, a={:.3})",
1987 connection_id.0,
1988 close_send,
1989 close_recv,
1990 a
1991 );
1992
1993 if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1994 conn.send_closed = true;
1995 conn.send_buffer.clear();
1996 }
1997
1998 if close_recv
1999 && let Some(paired) = paired_id
2000 && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
2001 {
2002 paired_conn.recv_closed = true;
2003 }
2004
2005 if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
2006 waker.wake();
2007 }
2008 if close_recv
2009 && let Some(paired) = paired_id
2010 && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
2011 {
2012 waker.wake();
2013 }
2014
2015 let b = super::rng::sim_random_f64();
2016 let explicit = b < inner.network.config.chaos.random_close_explicit_ratio;
2017
2018 tracing::debug!(
2019 "Random close explicit={} (b={:.3}, ratio={:.2})",
2020 explicit,
2021 b,
2022 inner.network.config.chaos.random_close_explicit_ratio
2023 );
2024
2025 Some(explicit)
2026 }
2027
2028 pub fn is_send_closed(&self, connection_id: ConnectionId) -> bool {
2030 let inner = self.inner.borrow();
2031 inner
2032 .network
2033 .connections
2034 .get(&connection_id)
2035 .is_some_and(|conn| conn.send_closed || conn.is_closed)
2036 }
2037
2038 pub fn is_recv_closed(&self, connection_id: ConnectionId) -> bool {
2040 let inner = self.inner.borrow();
2041 inner
2042 .network
2043 .connections
2044 .get(&connection_id)
2045 .is_some_and(|conn| conn.recv_closed || conn.is_closed)
2046 }
2047
2048 pub fn is_remote_fin_received(&self, connection_id: ConnectionId) -> bool {
2053 let inner = self.inner.borrow();
2054 inner
2055 .network
2056 .connections
2057 .get(&connection_id)
2058 .is_some_and(|conn| conn.remote_fin_received)
2059 }
2060
2061 pub fn simulate_peer_crash(&self, connection_id: ConnectionId, error_delay: Duration) {
2074 let mut inner = self.inner.borrow_mut();
2075 let current_time = inner.current_time;
2076 let error_at = current_time + error_delay;
2077
2078 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2079 conn.is_half_open = true;
2080 conn.half_open_error_at = Some(error_at);
2081
2082 conn.paired_connection = None;
2085
2086 tracing::info!(
2087 "Connection {} now half-open, errors manifest at {:?}",
2088 connection_id.0,
2089 error_at
2090 );
2091 }
2092
2093 let wake_event = Event::Connection {
2095 id: connection_id.0,
2096 state: ConnectionStateChange::HalfOpenError,
2097 };
2098 let sequence = inner.next_sequence;
2099 inner.next_sequence += 1;
2100 let scheduled_event = ScheduledEvent::new(error_at, wake_event, sequence);
2101 inner.event_queue.schedule(scheduled_event);
2102 }
2103
2104 pub fn is_half_open(&self, connection_id: ConnectionId) -> bool {
2106 let inner = self.inner.borrow();
2107 inner
2108 .network
2109 .connections
2110 .get(&connection_id)
2111 .is_some_and(|conn| conn.is_half_open)
2112 }
2113
2114 pub fn should_half_open_error(&self, connection_id: ConnectionId) -> bool {
2116 let inner = self.inner.borrow();
2117 let current_time = inner.current_time;
2118 inner
2119 .network
2120 .connections
2121 .get(&connection_id)
2122 .is_some_and(|conn| {
2123 conn.is_half_open
2124 && conn
2125 .half_open_error_at
2126 .is_some_and(|error_at| current_time >= error_at)
2127 })
2128 }
2129
2130 pub fn mark_connection_stable(&self, connection_id: ConnectionId) {
2147 let mut inner = self.inner.borrow_mut();
2148 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2149 conn.is_stable = true;
2150 tracing::debug!("Connection {} marked as stable", connection_id.0);
2151
2152 if let Some(paired_id) = conn.paired_connection
2154 && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
2155 {
2156 paired_conn.is_stable = true;
2157 tracing::debug!("Paired connection {} also marked as stable", paired_id.0);
2158 }
2159 }
2160 }
2161
2162 pub fn is_connection_stable(&self, connection_id: ConnectionId) -> bool {
2164 let inner = self.inner.borrow();
2165 inner
2166 .network
2167 .connections
2168 .get(&connection_id)
2169 .is_some_and(|conn| conn.is_stable)
2170 }
2171
2172 pub fn partition_pair(
2176 &self,
2177 from_ip: std::net::IpAddr,
2178 to_ip: std::net::IpAddr,
2179 duration: Duration,
2180 ) -> SimulationResult<()> {
2181 let mut inner = self.inner.borrow_mut();
2182 let expires_at = inner.current_time + duration;
2183
2184 inner
2185 .network
2186 .ip_partitions
2187 .insert((from_ip, to_ip), PartitionState { expires_at });
2188
2189 let restore_event = Event::Connection {
2190 id: 0,
2191 state: ConnectionStateChange::PartitionRestore,
2192 };
2193 let sequence = inner.next_sequence;
2194 inner.next_sequence += 1;
2195 let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2196 inner.event_queue.schedule(scheduled_event);
2197
2198 tracing::debug!(
2199 "Partitioned {} -> {} until {:?}",
2200 from_ip,
2201 to_ip,
2202 expires_at
2203 );
2204 Ok(())
2205 }
2206
2207 pub fn partition_send_from(
2209 &self,
2210 ip: std::net::IpAddr,
2211 duration: Duration,
2212 ) -> SimulationResult<()> {
2213 let mut inner = self.inner.borrow_mut();
2214 let expires_at = inner.current_time + duration;
2215
2216 inner.network.send_partitions.insert(ip, expires_at);
2217
2218 let clear_event = Event::Connection {
2219 id: 0,
2220 state: ConnectionStateChange::SendPartitionClear,
2221 };
2222 let sequence = inner.next_sequence;
2223 inner.next_sequence += 1;
2224 let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2225 inner.event_queue.schedule(scheduled_event);
2226
2227 tracing::debug!("Partitioned sends from {} until {:?}", ip, expires_at);
2228 Ok(())
2229 }
2230
2231 pub fn partition_recv_to(
2233 &self,
2234 ip: std::net::IpAddr,
2235 duration: Duration,
2236 ) -> SimulationResult<()> {
2237 let mut inner = self.inner.borrow_mut();
2238 let expires_at = inner.current_time + duration;
2239
2240 inner.network.recv_partitions.insert(ip, expires_at);
2241
2242 let clear_event = Event::Connection {
2243 id: 0,
2244 state: ConnectionStateChange::RecvPartitionClear,
2245 };
2246 let sequence = inner.next_sequence;
2247 inner.next_sequence += 1;
2248 let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2249 inner.event_queue.schedule(scheduled_event);
2250
2251 tracing::debug!("Partitioned receives to {} until {:?}", ip, expires_at);
2252 Ok(())
2253 }
2254
2255 pub fn restore_partition(
2257 &self,
2258 from_ip: std::net::IpAddr,
2259 to_ip: std::net::IpAddr,
2260 ) -> SimulationResult<()> {
2261 let mut inner = self.inner.borrow_mut();
2262 inner.network.ip_partitions.remove(&(from_ip, to_ip));
2263 tracing::debug!("Restored partition {} -> {}", from_ip, to_ip);
2264 Ok(())
2265 }
2266
2267 pub fn is_partitioned(
2269 &self,
2270 from_ip: std::net::IpAddr,
2271 to_ip: std::net::IpAddr,
2272 ) -> SimulationResult<bool> {
2273 let inner = self.inner.borrow();
2274 Ok(inner
2275 .network
2276 .is_partitioned(from_ip, to_ip, inner.current_time))
2277 }
2278
2279 fn randomly_trigger_partitions_with_inner(inner: &mut SimInner) {
2286 let partition_config = &inner.network.config;
2287
2288 if partition_config.chaos.partition_probability == 0.0 {
2289 return;
2290 }
2291
2292 if sim_random::<f64>() >= partition_config.chaos.partition_probability {
2294 return;
2295 }
2296
2297 let unique_ips: HashSet<IpAddr> = inner
2299 .network
2300 .connections
2301 .values()
2302 .filter_map(|conn| conn.local_ip)
2303 .collect();
2304
2305 if unique_ips.len() < 2 {
2306 return; }
2308
2309 let ip_list: Vec<IpAddr> = unique_ips.into_iter().collect();
2310 let partition_duration =
2311 crate::network::sample_duration(&partition_config.chaos.partition_duration);
2312 let expires_at = inner.current_time + partition_duration;
2313
2314 let partitioned_ips: Vec<IpAddr> = match partition_config.chaos.partition_strategy {
2316 PartitionStrategy::Random => {
2317 ip_list
2319 .iter()
2320 .filter(|_| sim_random::<f64>() < 0.5)
2321 .copied()
2322 .collect()
2323 }
2324 PartitionStrategy::UniformSize => {
2325 let partition_size = sim_random_range(1..ip_list.len());
2327 let mut shuffled = ip_list.clone();
2329 for i in (1..shuffled.len()).rev() {
2331 let j = sim_random_range(0..i + 1);
2332 shuffled.swap(i, j);
2333 }
2334 shuffled.into_iter().take(partition_size).collect()
2335 }
2336 PartitionStrategy::IsolateSingle => {
2337 let idx = sim_random_range(0..ip_list.len());
2339 vec![ip_list[idx]]
2340 }
2341 };
2342
2343 if partitioned_ips.is_empty() || partitioned_ips.len() == ip_list.len() {
2345 return;
2346 }
2347
2348 let non_partitioned: Vec<IpAddr> = ip_list
2350 .iter()
2351 .filter(|ip| !partitioned_ips.contains(ip))
2352 .copied()
2353 .collect();
2354
2355 for &from_ip in &partitioned_ips {
2356 for &to_ip in &non_partitioned {
2357 if inner
2359 .network
2360 .is_partitioned(from_ip, to_ip, inner.current_time)
2361 {
2362 continue;
2363 }
2364
2365 inner
2367 .network
2368 .ip_partitions
2369 .insert((from_ip, to_ip), PartitionState { expires_at });
2370 inner
2371 .network
2372 .ip_partitions
2373 .insert((to_ip, from_ip), PartitionState { expires_at });
2374
2375 tracing::debug!(
2376 "Partition triggered: {} <-> {} until {:?} (strategy: {:?})",
2377 from_ip,
2378 to_ip,
2379 expires_at,
2380 partition_config.chaos.partition_strategy
2381 );
2382 }
2383 }
2384
2385 let restore_event = Event::Connection {
2387 id: 0,
2388 state: ConnectionStateChange::PartitionRestore,
2389 };
2390 let sequence = inner.next_sequence;
2391 inner.next_sequence += 1;
2392 let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2393 inner.event_queue.schedule(scheduled_event);
2394 }
2395}
2396
2397impl Default for SimWorld {
2398 fn default() -> Self {
2399 Self::new()
2400 }
2401}
2402
2403#[derive(Debug)]
2408pub struct WeakSimWorld {
2409 pub(crate) inner: Weak<RefCell<SimInner>>,
2410}
2411
2412macro_rules! weak_forward {
2414 (wrap $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*) -> $ret:ty) => {
2416 $(#[$meta])*
2417 pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<$ret> {
2418 Ok(self.upgrade()?.$method($($arg),*))
2419 }
2420 };
2421 (pass $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*) -> $ret:ty) => {
2423 $(#[$meta])*
2424 pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<$ret> {
2425 self.upgrade()?.$method($($arg),*)
2426 }
2427 };
2428 (unit $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*)) => {
2430 $(#[$meta])*
2431 pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<()> {
2432 self.upgrade()?.$method($($arg),*);
2433 Ok(())
2434 }
2435 };
2436}
2437
2438impl WeakSimWorld {
2439 pub fn upgrade(&self) -> SimulationResult<SimWorld> {
2441 self.inner
2442 .upgrade()
2443 .map(|inner| SimWorld { inner })
2444 .ok_or(SimulationError::SimulationShutdown)
2445 }
2446
2447 weak_forward!(wrap #[doc = "Returns the current simulation time."] current_time(&self) -> Duration);
2448 weak_forward!(wrap #[doc = "Returns the exact simulation time (equivalent to FDB's now())."] now(&self) -> Duration);
2449 weak_forward!(wrap #[doc = "Returns the drifted timer time (equivalent to FDB's timer())."] timer(&self) -> Duration);
2450 weak_forward!(unit #[doc = "Schedules an event to execute after the specified delay."] schedule_event(&self, event: Event, delay: Duration));
2451 weak_forward!(unit #[doc = "Schedules an event to execute at the specified absolute time."] schedule_event_at(&self, event: Event, time: Duration));
2452 weak_forward!(pass #[doc = "Read data from connection's receive buffer."] read_from_connection(&self, connection_id: ConnectionId, buf: &mut [u8]) -> usize);
2453 weak_forward!(pass #[doc = "Write data to connection's receive buffer."] write_to_connection(&self, connection_id: ConnectionId, data: &[u8]) -> ());
2454 weak_forward!(pass #[doc = "Buffer data for ordered sending on a connection."] buffer_send(&self, connection_id: ConnectionId, data: Vec<u8>) -> ());
2455 weak_forward!(wrap #[doc = "Get a network provider for the simulation."] network_provider(&self) -> SimNetworkProvider);
2456 weak_forward!(wrap #[doc = "Get a time provider for the simulation."] time_provider(&self) -> crate::providers::SimTimeProvider);
2457 weak_forward!(wrap #[doc = "Sleep for the specified duration in simulation time."] sleep(&self, duration: Duration) -> SleepFuture);
2458
2459 pub fn with_network_config<F, R>(&self, f: F) -> SimulationResult<R>
2461 where
2462 F: FnOnce(&NetworkConfiguration) -> R,
2463 {
2464 Ok(self.upgrade()?.with_network_config(f))
2465 }
2466}
2467
2468impl Clone for WeakSimWorld {
2469 fn clone(&self) -> Self {
2470 Self {
2471 inner: self.inner.clone(),
2472 }
2473 }
2474}
2475
2476#[cfg(test)]
2477mod tests {
2478 use super::*;
2479
2480 #[test]
2481 fn sim_world_basic_lifecycle() {
2482 let mut sim = SimWorld::new();
2483
2484 assert_eq!(sim.current_time(), Duration::ZERO);
2486 assert!(!sim.has_pending_events());
2487 assert_eq!(sim.pending_event_count(), 0);
2488
2489 sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
2491
2492 assert!(sim.has_pending_events());
2493 assert_eq!(sim.pending_event_count(), 1);
2494 assert_eq!(sim.current_time(), Duration::ZERO);
2495
2496 let has_more = sim.step();
2498 assert!(!has_more);
2499 assert_eq!(sim.current_time(), Duration::from_millis(100));
2500 assert!(!sim.has_pending_events());
2501 assert_eq!(sim.pending_event_count(), 0);
2502 }
2503
2504 #[test]
2505 fn sim_world_multiple_events() {
2506 let mut sim = SimWorld::new();
2507
2508 sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(300));
2510 sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
2511 sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200));
2512
2513 assert_eq!(sim.pending_event_count(), 3);
2514
2515 assert!(sim.step());
2517 assert_eq!(sim.current_time(), Duration::from_millis(100));
2518 assert_eq!(sim.pending_event_count(), 2);
2519
2520 assert!(sim.step());
2521 assert_eq!(sim.current_time(), Duration::from_millis(200));
2522 assert_eq!(sim.pending_event_count(), 1);
2523
2524 assert!(!sim.step());
2525 assert_eq!(sim.current_time(), Duration::from_millis(300));
2526 assert_eq!(sim.pending_event_count(), 0);
2527 }
2528
2529 #[test]
2530 fn sim_world_run_until_empty() {
2531 let mut sim = SimWorld::new();
2532
2533 sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
2535 sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200));
2536 sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(300));
2537
2538 sim.run_until_empty();
2540
2541 assert_eq!(sim.current_time(), Duration::from_millis(300));
2542 assert!(!sim.has_pending_events());
2543 }
2544
2545 #[test]
2546 fn sim_world_schedule_at_specific_time() {
2547 let mut sim = SimWorld::new();
2548
2549 sim.schedule_event_at(Event::Timer { task_id: 1 }, Duration::from_millis(500));
2551
2552 assert_eq!(sim.current_time(), Duration::ZERO);
2553
2554 sim.step();
2555
2556 assert_eq!(sim.current_time(), Duration::from_millis(500));
2557 }
2558
2559 #[test]
2560 fn weak_sim_world_lifecycle() {
2561 let sim = SimWorld::new();
2562 let weak = sim.downgrade();
2563
2564 assert_eq!(
2566 weak.current_time().expect("should get time"),
2567 Duration::ZERO
2568 );
2569
2570 weak.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100))
2572 .expect("should schedule event");
2573
2574 assert!(sim.has_pending_events());
2576
2577 drop(sim);
2579
2580 assert_eq!(
2582 weak.current_time(),
2583 Err(SimulationError::SimulationShutdown)
2584 );
2585 assert_eq!(
2586 weak.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200)),
2587 Err(SimulationError::SimulationShutdown)
2588 );
2589 }
2590
2591 #[test]
2592 fn deterministic_event_ordering() {
2593 let mut sim = SimWorld::new();
2594
2595 sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(100));
2597 sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
2598 sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(100));
2599
2600 assert!(sim.step());
2602 assert_eq!(sim.current_time(), Duration::from_millis(100));
2603 assert!(sim.step());
2604 assert_eq!(sim.current_time(), Duration::from_millis(100));
2605 assert!(!sim.step());
2606 assert_eq!(sim.current_time(), Duration::from_millis(100));
2607 }
2608}