1use std::{
7 cell::RefCell,
8 collections::{BTreeMap, HashSet, VecDeque},
9 net::IpAddr,
10 rc::{Rc, Weak},
11 task::Waker,
12 time::Duration,
13};
14use tracing::instrument;
15
16use crate::{
17 SimulationError, SimulationResult,
18 network::{
19 NetworkConfiguration, PartitionStrategy,
20 sim::{ConnectionId, ListenerId, SimNetworkProvider},
21 },
22};
23
24use super::{
25 events::{ConnectionStateChange, Event, EventQueue, NetworkOperation, ScheduledEvent},
26 rng::{reset_sim_rng, set_sim_seed, sim_random, sim_random_range},
27 sleep::SleepFuture,
28 state::{
29 ClogState, CloseReason, ConnectionState, ListenerState, NetworkState, PartitionState,
30 StorageState,
31 },
32 wakers::WakerRegistry,
33};
34
/// Mutable state of one simulation, shared through `SimWorld`'s `Rc<RefCell<..>>`.
#[derive(Debug)]
pub(crate) struct SimInner {
    /// Simulated clock; `SimWorld::step` sets it to the time of the event being processed.
    pub(crate) current_time: Duration,
    /// Drifting clock returned by `SimWorld::timer`; that method never lets it fall
    /// behind `current_time`.
    pub(crate) timer_time: Duration,
    /// Pending scheduled events; `SimWorld::step` pops the earliest one.
    pub(crate) event_queue: EventQueue,
    /// Sequence number handed to the next scheduled event (incremented on each schedule).
    pub(crate) next_sequence: u64,

    /// Simulated network state (connections, listeners, clogs, partitions, config).
    pub(crate) network: NetworkState,

    /// Simulated storage state and its configuration.
    pub(crate) storage: StorageState,

    /// Wakers registered by futures waiting on simulation events.
    pub(crate) wakers: WakerRegistry,

    /// Id handed to the next task created by `SimWorld::generate_task_id`.
    pub(crate) next_task_id: u64,
    /// Ids of tasks whose timer event has already been processed.
    pub(crate) awakened_tasks: HashSet<u64>,

    /// Number of events dispatched so far.
    pub(crate) events_processed: u64,

    /// Simulated time of the most recent injected bit flip (used for the cooldown check).
    pub(crate) last_bit_flip_time: Duration,

    /// Copy of the event handled by the latest `step`, or `None` if the queue was empty.
    pub(crate) last_processed_event: Option<Event>,
}
67
68impl SimInner {
69 pub(crate) fn new() -> Self {
70 Self {
71 current_time: Duration::ZERO,
72 timer_time: Duration::ZERO,
73 event_queue: EventQueue::new(),
74 next_sequence: 0,
75 network: NetworkState::new(NetworkConfiguration::default()),
76 storage: StorageState::default(),
77 wakers: WakerRegistry::default(),
78 next_task_id: 0,
79 awakened_tasks: HashSet::new(),
80 events_processed: 0,
81 last_bit_flip_time: Duration::ZERO,
82 last_processed_event: None,
83 }
84 }
85
86 pub(crate) fn new_with_config(network_config: NetworkConfiguration) -> Self {
87 Self {
88 current_time: Duration::ZERO,
89 timer_time: Duration::ZERO,
90 event_queue: EventQueue::new(),
91 next_sequence: 0,
92 network: NetworkState::new(network_config),
93 storage: StorageState::default(),
94 wakers: WakerRegistry::default(),
95 next_task_id: 0,
96 awakened_tasks: HashSet::new(),
97 events_processed: 0,
98 last_bit_flip_time: Duration::ZERO,
99 last_processed_event: None,
100 }
101 }
102
103 pub(crate) fn calculate_flip_bit_count(random_value: u32, min_bits: u32, max_bits: u32) -> u32 {
113 if random_value == 0 {
114 return max_bits.min(32);
116 }
117
118 let bit_count = 1 + random_value.leading_zeros();
120
121 bit_count.clamp(min_bits, max_bits)
123 }
124}
125
/// Handle to a simulation; owns the shared [`SimInner`] state.
///
/// Weak handles for providers and futures are produced by [`SimWorld::downgrade`].
#[derive(Debug)]
pub struct SimWorld {
    /// Shared, interior-mutable simulation state.
    pub(crate) inner: Rc<RefCell<SimInner>>,
}
135
136impl SimWorld {
137 fn create(network_config: Option<NetworkConfiguration>, seed: u64) -> Self {
139 reset_sim_rng();
140 set_sim_seed(seed);
141 crate::chaos::assertions::reset_assertion_results();
142
143 let inner = match network_config {
144 Some(config) => SimInner::new_with_config(config),
145 None => SimInner::new(),
146 };
147
148 Self {
149 inner: Rc::new(RefCell::new(inner)),
150 }
151 }
152
    /// Creates a simulation with the default network configuration and seed `0`.
    pub fn new() -> Self {
        Self::create(None, 0)
    }
160
    /// Creates a simulation with the default network configuration and the given RNG seed.
    pub fn new_with_seed(seed: u64) -> Self {
        Self::create(None, seed)
    }
172
    /// Creates a simulation with a custom network configuration and seed `0`.
    pub fn new_with_network_config(network_config: NetworkConfiguration) -> Self {
        Self::create(Some(network_config), 0)
    }
177
    /// Creates a simulation with both a custom network configuration and an RNG seed.
    pub fn new_with_network_config_and_seed(
        network_config: NetworkConfiguration,
        seed: u64,
    ) -> Self {
        Self::create(Some(network_config), seed)
    }
190
191 #[instrument(skip(self))]
196 pub fn step(&mut self) -> bool {
197 let mut inner = self.inner.borrow_mut();
198
199 if let Some(scheduled_event) = inner.event_queue.pop_earliest() {
200 inner.current_time = scheduled_event.time();
202
203 Self::clear_expired_clogs_with_inner(&mut inner);
205
206 Self::randomly_trigger_partitions_with_inner(&mut inner);
208
209 let event = scheduled_event.into_event();
211 inner.last_processed_event = Some(event.clone());
212
213 Self::process_event_with_inner(&mut inner, event);
215
216 !inner.event_queue.is_empty()
218 } else {
219 inner.last_processed_event = None;
220 false
222 }
223 }
224
225 #[instrument(skip(self))]
231 pub fn run_until_empty(&mut self) {
232 while self.step() {
233 if self.inner.borrow().events_processed.is_multiple_of(50) {
235 let has_workload_events = !self
236 .inner
237 .borrow()
238 .event_queue
239 .has_only_infrastructure_events();
240 if !has_workload_events {
241 tracing::debug!(
242 "Early termination: only infrastructure events remain in queue"
243 );
244 break;
245 }
246 }
247 }
248 }
249
    /// Returns the current simulated time.
    pub fn current_time(&self) -> Duration {
        self.inner.borrow().current_time
    }
254
    /// Returns the current simulated time (same value as [`SimWorld::current_time`]).
    pub fn now(&self) -> Duration {
        self.inner.borrow().current_time
    }
262
263 pub fn timer(&self) -> Duration {
278 let mut inner = self.inner.borrow_mut();
279 let chaos = &inner.network.config.chaos;
280
281 if !chaos.clock_drift_enabled {
283 return inner.current_time;
284 }
285
286 let max_timer = inner.current_time + chaos.clock_drift_max;
290
291 if inner.timer_time < max_timer {
293 let random_factor = sim_random::<f64>(); let gap = (max_timer - inner.timer_time).as_secs_f64();
295 let delta = random_factor * gap / 2.0;
296 inner.timer_time += Duration::from_secs_f64(delta);
297 }
298
299 inner.timer_time = inner.timer_time.max(inner.current_time);
301
302 inner.timer_time
303 }
304
305 #[instrument(skip(self))]
307 pub fn schedule_event(&self, event: Event, delay: Duration) {
308 let mut inner = self.inner.borrow_mut();
309 let scheduled_time = inner.current_time + delay;
310 let sequence = inner.next_sequence;
311 inner.next_sequence += 1;
312
313 let scheduled_event = ScheduledEvent::new(scheduled_time, event, sequence);
314 inner.event_queue.schedule(scheduled_event);
315 }
316
    /// Schedules `event` at the absolute simulated time `time`.
    ///
    /// Each scheduled event consumes the next sequence number.
    pub fn schedule_event_at(&self, event: Event, time: Duration) {
        let mut inner = self.inner.borrow_mut();
        let sequence = inner.next_sequence;
        inner.next_sequence += 1;

        let scheduled_event = ScheduledEvent::new(time, event, sequence);
        inner.event_queue.schedule(scheduled_event);
    }
326
    /// Returns a weak handle to this simulation that does not keep its state alive.
    pub fn downgrade(&self) -> WeakSimWorld {
        WeakSimWorld {
            inner: Rc::downgrade(&self.inner),
        }
    }
336
    /// Returns `true` if at least one event is still queued.
    pub fn has_pending_events(&self) -> bool {
        !self.inner.borrow().event_queue.is_empty()
    }
341
    /// Returns the number of events currently queued.
    pub fn pending_event_count(&self) -> usize {
        self.inner.borrow().event_queue.len()
    }
346
    /// Creates a network provider holding a weak handle to this simulation.
    pub fn network_provider(&self) -> SimNetworkProvider {
        SimNetworkProvider::new(self.downgrade())
    }
351
    /// Creates a time provider holding a weak handle to this simulation.
    pub fn time_provider(&self) -> crate::providers::SimTimeProvider {
        crate::providers::SimTimeProvider::new(self.downgrade())
    }
356
    /// Returns the task provider; it carries no reference to this simulation.
    pub fn task_provider(&self) -> crate::TokioTaskProvider {
        crate::TokioTaskProvider
    }
361
    /// Creates a storage provider holding a weak handle to this simulation.
    pub fn storage_provider(&self) -> crate::storage::SimStorageProvider {
        crate::storage::SimStorageProvider::new(self.downgrade())
    }
366
    /// Replaces the storage configuration of this simulation.
    pub fn set_storage_config(&mut self, config: crate::storage::StorageConfiguration) {
        self.inner.borrow_mut().storage.config = config;
    }
373
    /// Runs `f` with a shared reference to the network configuration and returns its result.
    ///
    /// The simulation state stays immutably borrowed for the duration of `f`.
    pub fn with_network_config<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&NetworkConfiguration) -> R,
    {
        let inner = self.inner.borrow();
        f(&inner.network.config)
    }
388
389 pub(crate) fn create_listener(&self, addr: String) -> SimulationResult<ListenerId> {
391 let mut inner = self.inner.borrow_mut();
392 let listener_id = ListenerId(inner.network.next_listener_id);
393 inner.network.next_listener_id += 1;
394
395 inner.network.listeners.insert(
396 listener_id,
397 ListenerState {
398 id: listener_id,
399 addr,
400 pending_connections: VecDeque::new(),
401 },
402 );
403
404 Ok(listener_id)
405 }
406
407 pub(crate) fn read_from_connection(
409 &self,
410 connection_id: ConnectionId,
411 buf: &mut [u8],
412 ) -> SimulationResult<usize> {
413 let mut inner = self.inner.borrow_mut();
414
415 if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
416 let mut bytes_read = 0;
417 while bytes_read < buf.len() && !connection.receive_buffer.is_empty() {
418 if let Some(byte) = connection.receive_buffer.pop_front() {
419 buf[bytes_read] = byte;
420 bytes_read += 1;
421 }
422 }
423 Ok(bytes_read)
424 } else {
425 Err(SimulationError::InvalidState(
426 "connection not found".to_string(),
427 ))
428 }
429 }
430
431 pub(crate) fn write_to_connection(
433 &self,
434 connection_id: ConnectionId,
435 data: &[u8],
436 ) -> SimulationResult<()> {
437 let mut inner = self.inner.borrow_mut();
438
439 if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
440 for &byte in data {
441 connection.receive_buffer.push_back(byte);
442 }
443 Ok(())
444 } else {
445 Err(SimulationError::InvalidState(
446 "connection not found".to_string(),
447 ))
448 }
449 }
450
451 pub(crate) fn buffer_send(
456 &self,
457 connection_id: ConnectionId,
458 data: Vec<u8>,
459 ) -> SimulationResult<()> {
460 tracing::debug!(
461 "buffer_send called for connection_id={} with {} bytes",
462 connection_id.0,
463 data.len()
464 );
465 let mut inner = self.inner.borrow_mut();
466
467 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
468 conn.send_buffer.push_back(data);
470 tracing::debug!(
471 "buffer_send: added data to send_buffer, new length: {}",
472 conn.send_buffer.len()
473 );
474
475 if !conn.send_in_progress {
477 tracing::debug!(
478 "buffer_send: sender not in progress, scheduling ProcessSendBuffer event"
479 );
480 conn.send_in_progress = true;
481
482 let scheduled_time = inner.current_time + std::time::Duration::ZERO;
484 let sequence = inner.next_sequence;
485 inner.next_sequence += 1;
486 let scheduled_event = ScheduledEvent::new(
487 scheduled_time,
488 Event::Network {
489 connection_id: connection_id.0,
490 operation: NetworkOperation::ProcessSendBuffer,
491 },
492 sequence,
493 );
494 inner.event_queue.schedule(scheduled_event);
495 tracing::debug!(
496 "buffer_send: scheduled ProcessSendBuffer event with sequence {}",
497 sequence
498 );
499 } else {
500 tracing::debug!(
501 "buffer_send: sender already in progress, not scheduling new event"
502 );
503 }
504
505 Ok(())
506 } else {
507 tracing::debug!(
508 "buffer_send: connection_id={} not found in connections table",
509 connection_id.0
510 );
511 Err(SimulationError::InvalidState(
512 "connection not found".to_string(),
513 ))
514 }
515 }
516
517 pub(crate) fn create_connection_pair(
525 &self,
526 client_addr: String,
527 server_addr: String,
528 ) -> SimulationResult<(ConnectionId, ConnectionId)> {
529 let mut inner = self.inner.borrow_mut();
530
531 let client_id = ConnectionId(inner.network.next_connection_id);
532 inner.network.next_connection_id += 1;
533
534 let server_id = ConnectionId(inner.network.next_connection_id);
535 inner.network.next_connection_id += 1;
536
537 let current_time = inner.current_time;
539
540 let client_ip = NetworkState::parse_ip_from_addr(&client_addr);
542 let server_ip = NetworkState::parse_ip_from_addr(&server_addr);
543
544 let ephemeral_peer_addr = match client_ip {
548 Some(std::net::IpAddr::V4(ipv4)) => {
549 let octets = ipv4.octets();
550 let ip_offset = sim_random_range(0u32..256) as u8;
551 let new_last_octet = octets[3].wrapping_add(ip_offset);
552 let ephemeral_ip =
553 std::net::Ipv4Addr::new(octets[0], octets[1], octets[2], new_last_octet);
554 let ephemeral_port = sim_random_range(40000u16..60000);
555 format!("{}:{}", ephemeral_ip, ephemeral_port)
556 }
557 Some(std::net::IpAddr::V6(ipv6)) => {
558 let segments = ipv6.segments();
560 let mut new_segments = segments;
561 let ip_offset = sim_random_range(0u16..256);
562 new_segments[7] = new_segments[7].wrapping_add(ip_offset);
563 let ephemeral_ip = std::net::Ipv6Addr::from(new_segments);
564 let ephemeral_port = sim_random_range(40000u16..60000);
565 format!("[{}]:{}", ephemeral_ip, ephemeral_port)
566 }
567 None => {
568 let ephemeral_port = sim_random_range(40000u16..60000);
570 format!("unknown:{}", ephemeral_port)
571 }
572 };
573
574 const DEFAULT_SEND_BUFFER_CAPACITY: usize = 64 * 1024; inner.network.connections.insert(
582 client_id,
583 ConnectionState {
584 id: client_id,
585 addr: client_addr,
586 local_ip: client_ip,
587 remote_ip: server_ip,
588 peer_address: server_addr.clone(),
589 receive_buffer: VecDeque::new(),
590 paired_connection: Some(server_id),
591 send_buffer: VecDeque::new(),
592 send_in_progress: false,
593 next_send_time: current_time,
594 is_closed: false,
595 send_closed: false,
596 recv_closed: false,
597 is_cut: false,
598 cut_expiry: None,
599 close_reason: CloseReason::None,
600 send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
601 send_delay: None,
602 recv_delay: None,
603 is_half_open: false,
604 half_open_error_at: None,
605 is_stable: false,
606 graceful_close_pending: false,
607 last_data_delivery_scheduled_at: None,
608 remote_fin_received: false,
609 },
610 );
611
612 inner.network.connections.insert(
614 server_id,
615 ConnectionState {
616 id: server_id,
617 addr: server_addr,
618 local_ip: server_ip,
619 remote_ip: client_ip,
620 peer_address: ephemeral_peer_addr,
621 receive_buffer: VecDeque::new(),
622 paired_connection: Some(client_id),
623 send_buffer: VecDeque::new(),
624 send_in_progress: false,
625 next_send_time: current_time,
626 is_closed: false,
627 send_closed: false,
628 recv_closed: false,
629 is_cut: false,
630 cut_expiry: None,
631 close_reason: CloseReason::None,
632 send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
633 send_delay: None,
634 recv_delay: None,
635 is_half_open: false,
636 half_open_error_at: None,
637 is_stable: false,
638 graceful_close_pending: false,
639 last_data_delivery_scheduled_at: None,
640 remote_fin_received: false,
641 },
642 );
643
644 Ok((client_id, server_id))
645 }
646
647 pub(crate) fn register_read_waker(
649 &self,
650 connection_id: ConnectionId,
651 waker: Waker,
652 ) -> SimulationResult<()> {
653 let mut inner = self.inner.borrow_mut();
654 let is_replacement = inner.wakers.read_wakers.contains_key(&connection_id);
655 inner.wakers.read_wakers.insert(connection_id, waker);
656 tracing::debug!(
657 "register_read_waker: connection_id={}, replacement={}, total_wakers={}",
658 connection_id.0,
659 is_replacement,
660 inner.wakers.read_wakers.len()
661 );
662 Ok(())
663 }
664
665 pub(crate) fn register_accept_waker(&self, addr: &str, waker: Waker) -> SimulationResult<()> {
667 let mut inner = self.inner.borrow_mut();
668 use std::collections::hash_map::DefaultHasher;
670 use std::hash::{Hash, Hasher};
671 let mut hasher = DefaultHasher::new();
672 addr.hash(&mut hasher);
673 let listener_key = ListenerId(hasher.finish());
674
675 inner.wakers.listener_wakers.insert(listener_key, waker);
676 Ok(())
677 }
678
679 pub(crate) fn store_pending_connection(
681 &self,
682 addr: &str,
683 connection_id: ConnectionId,
684 ) -> SimulationResult<()> {
685 let mut inner = self.inner.borrow_mut();
686 inner
687 .network
688 .pending_connections
689 .insert(addr.to_string(), connection_id);
690
691 use std::collections::hash_map::DefaultHasher;
693 use std::hash::{Hash, Hasher};
694 let mut hasher = DefaultHasher::new();
695 addr.hash(&mut hasher);
696 let listener_key = ListenerId(hasher.finish());
697
698 if let Some(waker) = inner.wakers.listener_wakers.remove(&listener_key) {
699 waker.wake();
700 }
701
702 Ok(())
703 }
704
    /// Removes and returns the pending connection stored for `addr`, if any.
    ///
    /// Always returns `Ok`; `Ok(None)` means nothing was pending for that address.
    pub(crate) fn get_pending_connection(
        &self,
        addr: &str,
    ) -> SimulationResult<Option<ConnectionId>> {
        let mut inner = self.inner.borrow_mut();
        Ok(inner.network.pending_connections.remove(addr))
    }
713
    /// Returns a copy of the peer address recorded for `connection_id`,
    /// or `None` if the connection is unknown.
    pub(crate) fn get_connection_peer_address(
        &self,
        connection_id: ConnectionId,
    ) -> Option<String> {
        let inner = self.inner.borrow();
        inner
            .network
            .connections
            .get(&connection_id)
            .map(|conn| conn.peer_address.clone())
    }
733
    /// Returns a future that completes once a timer event for a fresh task id fires.
    ///
    /// The timer is scheduled `duration` from now, possibly lengthened by
    /// `apply_buggified_delay`.
    #[instrument(skip(self))]
    pub fn sleep(&self, duration: Duration) -> SleepFuture {
        // The task id is allocated before the delay is computed.
        let task_id = self.generate_task_id();

        let actual_duration = self.apply_buggified_delay(duration);

        self.schedule_event(Event::Timer { task_id }, actual_duration);

        SleepFuture::new(self.downgrade(), task_id)
    }
751
752 fn apply_buggified_delay(&self, duration: Duration) -> Duration {
754 let inner = self.inner.borrow();
755 let chaos = &inner.network.config.chaos;
756
757 if !chaos.buggified_delay_enabled || chaos.buggified_delay_max == Duration::ZERO {
758 return duration;
759 }
760
761 if sim_random::<f64>() < chaos.buggified_delay_probability {
763 let random_factor = sim_random::<f64>().powf(1000.0);
765 let extra_delay = chaos.buggified_delay_max.mul_f64(random_factor);
766 tracing::trace!(
767 extra_delay_ms = extra_delay.as_millis(),
768 "Buggified delay applied"
769 );
770 duration + extra_delay
771 } else {
772 duration
773 }
774 }
775
    /// Allocates and returns the next task id (ids count up from the stored counter).
    fn generate_task_id(&self) -> u64 {
        let mut inner = self.inner.borrow_mut();
        let task_id = inner.next_task_id;
        inner.next_task_id += 1;
        task_id
    }
783
784 fn wake_all(wakers: &mut BTreeMap<ConnectionId, Vec<Waker>>, connection_id: ConnectionId) {
786 if let Some(waker_list) = wakers.remove(&connection_id) {
787 for waker in waker_list {
788 waker.wake();
789 }
790 }
791 }
792
    /// Reports whether the timer event for `task_id` has already been processed.
    ///
    /// Always returns `Ok`.
    pub(crate) fn is_task_awake(&self, task_id: u64) -> SimulationResult<bool> {
        let inner = self.inner.borrow();
        Ok(inner.awakened_tasks.contains(&task_id))
    }
798
    /// Stores `waker` for `task_id`; it is woken when that task's timer event fires
    /// or on shutdown. Always returns `Ok`.
    pub(crate) fn register_task_waker(&self, task_id: u64, waker: Waker) -> SimulationResult<()> {
        let mut inner = self.inner.borrow_mut();
        inner.wakers.task_wakers.insert(task_id, waker);
        Ok(())
    }
805
806 fn clear_expired_clogs_with_inner(inner: &mut SimInner) {
808 let now = inner.current_time;
809 let expired: Vec<ConnectionId> = inner
810 .network
811 .connection_clogs
812 .iter()
813 .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
814 .collect();
815
816 for id in expired {
817 inner.network.connection_clogs.remove(&id);
818 Self::wake_all(&mut inner.wakers.clog_wakers, id);
819 }
820 }
821
    /// Counts the event as processed and dispatches it to the matching handler.
    #[instrument(skip(inner))]
    fn process_event_with_inner(inner: &mut SimInner, event: Event) {
        inner.events_processed += 1;

        match event {
            Event::Timer { task_id } => Self::handle_timer_event(inner, task_id),
            Event::Connection { id, state } => Self::handle_connection_event(inner, id, state),
            Event::Network {
                connection_id,
                operation,
            } => Self::handle_network_event(inner, connection_id, operation),
            Event::Storage { file_id, operation } => {
                super::storage_ops::handle_storage_event(inner, file_id, operation)
            }
            Event::Shutdown => Self::handle_shutdown_event(inner),
            // Process lifecycle events are only logged here; no state is changed.
            Event::ProcessRestart { ip }
            | Event::ProcessGracefulShutdown { ip, .. }
            | Event::ProcessForceKill { ip, .. } => {
                tracing::debug!("Process lifecycle event for IP {}", ip);
            }
        }
    }
847
    /// Marks `task_id` as awakened and wakes its registered waker, if any.
    fn handle_timer_event(inner: &mut SimInner, task_id: u64) {
        // Recorded even when no waker is registered, so a later
        // `is_task_awake` check still reports the task as awake.
        inner.awakened_tasks.insert(task_id);
        if let Some(waker) = inner.wakers.task_wakers.remove(&task_id) {
            waker.wake();
        }
    }
855
    /// Applies a connection state change event for connection `id`.
    fn handle_connection_event(inner: &mut SimInner, id: u64, state: ConnectionStateChange) {
        let connection_id = ConnectionId(id);

        match state {
            // Nothing to do for these two changes.
            ConnectionStateChange::BindComplete | ConnectionStateChange::ConnectionReady => {
            }
            // Remove the write clog and wake tasks blocked on it.
            ConnectionStateChange::ClogClear => {
                inner.network.connection_clogs.remove(&connection_id);
                Self::wake_all(&mut inner.wakers.clog_wakers, connection_id);
            }
            // Remove the read clog and wake tasks blocked on it.
            ConnectionStateChange::ReadClogClear => {
                inner.network.read_clogs.remove(&connection_id);
                Self::wake_all(&mut inner.wakers.read_clog_wakers, connection_id);
            }
            // The three partition events sweep all expired entries of their kind,
            // not only the one belonging to this connection.
            ConnectionStateChange::PartitionRestore => {
                Self::clear_expired_partitions(inner);
            }
            ConnectionStateChange::SendPartitionClear => {
                Self::clear_expired_send_partitions(inner);
            }
            ConnectionStateChange::RecvPartitionClear => {
                Self::clear_expired_recv_partitions(inner);
            }
            // Only acts when the connection exists and is still cut.
            ConnectionStateChange::CutRestore => {
                if let Some(conn) = inner.network.connections.get_mut(&connection_id)
                    && conn.is_cut
                {
                    conn.is_cut = false;
                    conn.cut_expiry = None;
                    tracing::debug!("Connection {} restored via scheduled event", id);
                    Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
                }
            }
            // Wake the reader and any send-buffer waiters of this connection.
            ConnectionStateChange::HalfOpenError => {
                tracing::debug!("Connection {} half-open error time reached", id);
                if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
                    waker.wake();
                }
                Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
            }
        }
    }
900
901 fn clear_expired_partitions(inner: &mut SimInner) {
903 let now = inner.current_time;
904 let expired: Vec<_> = inner
905 .network
906 .ip_partitions
907 .iter()
908 .filter_map(|(pair, state)| (now >= state.expires_at).then_some(*pair))
909 .collect();
910
911 for pair in expired {
912 inner.network.ip_partitions.remove(&pair);
913 tracing::debug!("Restored IP partition {} -> {}", pair.0, pair.1);
914 }
915 }
916
917 fn clear_expired_send_partitions(inner: &mut SimInner) {
919 let now = inner.current_time;
920 let expired: Vec<_> = inner
921 .network
922 .send_partitions
923 .iter()
924 .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
925 .collect();
926
927 for ip in expired {
928 inner.network.send_partitions.remove(&ip);
929 tracing::debug!("Cleared send partition for {}", ip);
930 }
931 }
932
933 fn clear_expired_recv_partitions(inner: &mut SimInner) {
935 let now = inner.current_time;
936 let expired: Vec<_> = inner
937 .network
938 .recv_partitions
939 .iter()
940 .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
941 .collect();
942
943 for ip in expired {
944 inner.network.recv_partitions.remove(&ip);
945 tracing::debug!("Cleared receive partition for {}", ip);
946 }
947 }
948
    /// Dispatches a network operation for connection `conn_id` to its handler.
    fn handle_network_event(inner: &mut SimInner, conn_id: u64, operation: NetworkOperation) {
        let connection_id = ConnectionId(conn_id);

        match operation {
            NetworkOperation::DataDelivery { data } => {
                Self::handle_data_delivery(inner, connection_id, data);
            }
            NetworkOperation::ProcessSendBuffer => {
                Self::handle_process_send_buffer(inner, connection_id);
            }
            NetworkOperation::FinDelivery => {
                Self::handle_fin_delivery(inner, connection_id);
            }
        }
    }
965
966 fn handle_data_delivery(inner: &mut SimInner, connection_id: ConnectionId, data: Vec<u8>) {
968 tracing::trace!(
969 "DataDelivery: {} bytes to connection {}",
970 data.len(),
971 connection_id.0
972 );
973
974 let is_stable = inner
976 .network
977 .connections
978 .get(&connection_id)
979 .is_some_and(|conn| conn.is_stable);
980
981 if !inner.network.connections.contains_key(&connection_id) {
982 tracing::warn!("DataDelivery: connection {} not found", connection_id.0);
983 return;
984 }
985
986 let data_to_deliver = if is_stable {
988 data
989 } else {
990 Self::maybe_corrupt_data(inner, &data)
991 };
992
993 let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
995 return;
996 };
997
998 for &byte in &data_to_deliver {
999 conn.receive_buffer.push_back(byte);
1000 }
1001
1002 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1003 waker.wake();
1004 }
1005 }
1006
1007 fn handle_fin_delivery(inner: &mut SimInner, connection_id: ConnectionId) {
1013 tracing::debug!(
1014 "FinDelivery: FIN received on connection {}",
1015 connection_id.0
1016 );
1017
1018 let is_closed = inner
1020 .network
1021 .connections
1022 .get(&connection_id)
1023 .is_some_and(|conn| conn.is_closed);
1024
1025 if is_closed {
1026 tracing::debug!(
1027 "FinDelivery: connection {} already closed, ignoring stale FIN",
1028 connection_id.0
1029 );
1030 return;
1031 }
1032
1033 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1034 conn.remote_fin_received = true;
1035 }
1036
1037 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1038 waker.wake();
1039 }
1040 }
1041
1042 fn schedule_fin_delivery(
1047 inner: &mut SimInner,
1048 paired_id: Option<ConnectionId>,
1049 last_delivery_time: Option<Duration>,
1050 ) {
1051 let Some(peer_id) = paired_id else {
1052 return;
1053 };
1054
1055 let fin_time = match last_delivery_time {
1057 Some(t) if t >= inner.current_time => t + Duration::from_nanos(1),
1058 _ => inner.current_time + Duration::from_nanos(1),
1059 };
1060
1061 let sequence = inner.next_sequence;
1062 inner.next_sequence += 1;
1063
1064 tracing::debug!(
1065 "Scheduling FinDelivery to connection {} at {:?}",
1066 peer_id.0,
1067 fin_time
1068 );
1069
1070 inner.event_queue.schedule(ScheduledEvent::new(
1071 fin_time,
1072 Event::Network {
1073 connection_id: peer_id.0,
1074 operation: NetworkOperation::FinDelivery,
1075 },
1076 sequence,
1077 ));
1078 }
1079
1080 fn maybe_corrupt_data(inner: &mut SimInner, data: &[u8]) -> Vec<u8> {
1082 if data.is_empty() {
1083 return data.to_vec();
1084 }
1085
1086 let chaos = &inner.network.config.chaos;
1087 let now = inner.current_time;
1088 let cooldown_elapsed =
1089 now.saturating_sub(inner.last_bit_flip_time) >= chaos.bit_flip_cooldown;
1090
1091 if !cooldown_elapsed || !crate::buggify_with_prob!(chaos.bit_flip_probability) {
1092 return data.to_vec();
1093 }
1094
1095 let random_value = sim_random::<u32>();
1096 let flip_count = SimInner::calculate_flip_bit_count(
1097 random_value,
1098 chaos.bit_flip_min_bits,
1099 chaos.bit_flip_max_bits,
1100 );
1101
1102 let mut corrupted_data = data.to_vec();
1103 let mut flipped_positions = std::collections::HashSet::new();
1104
1105 for _ in 0..flip_count {
1106 let byte_idx = (sim_random::<u64>() as usize) % corrupted_data.len();
1107 let bit_idx = (sim_random::<u64>() as usize) % 8;
1108 let position = (byte_idx, bit_idx);
1109
1110 if !flipped_positions.contains(&position) {
1111 flipped_positions.insert(position);
1112 corrupted_data[byte_idx] ^= 1 << bit_idx;
1113 }
1114 }
1115
1116 inner.last_bit_flip_time = now;
1117 tracing::info!(
1118 "BitFlipInjected: bytes={} bits_flipped={} unique_positions={}",
1119 data.len(),
1120 flip_count,
1121 flipped_positions.len()
1122 );
1123
1124 corrupted_data
1125 }
1126
1127 fn handle_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1129 let is_partitioned = inner
1130 .network
1131 .is_connection_partitioned(connection_id, inner.current_time);
1132
1133 if is_partitioned {
1134 Self::handle_partitioned_send(inner, connection_id);
1135 } else {
1136 Self::handle_normal_send(inner, connection_id);
1137 }
1138 }
1139
1140 fn handle_partitioned_send(inner: &mut SimInner, connection_id: ConnectionId) {
1142 let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1143 return;
1144 };
1145
1146 if let Some(data) = conn.send_buffer.pop_front() {
1147 tracing::debug!(
1148 "Connection {} partitioned, failing send of {} bytes",
1149 connection_id.0,
1150 data.len()
1151 );
1152 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1153
1154 if !conn.send_buffer.is_empty() {
1155 Self::schedule_process_send_buffer(inner, connection_id);
1156 } else {
1157 conn.send_in_progress = false;
1158 if conn.graceful_close_pending {
1160 conn.graceful_close_pending = false;
1161 let peer_id = conn.paired_connection;
1162 let last_time = conn.last_data_delivery_scheduled_at;
1163 Self::schedule_fin_delivery(inner, peer_id, last_time);
1164 }
1165 }
1166 } else {
1167 conn.send_in_progress = false;
1168 if conn.graceful_close_pending {
1170 conn.graceful_close_pending = false;
1171 let peer_id = conn.paired_connection;
1172 let last_time = conn.last_data_delivery_scheduled_at;
1173 Self::schedule_fin_delivery(inner, peer_id, last_time);
1174 }
1175 }
1176 }
1177
    /// Handles a send-buffer pass for a connection that is not partitioned.
    ///
    /// Pops one chunk from the send buffer, optionally truncates it (partial
    /// write injection on non-stable connections), and schedules its delivery to
    /// the paired connection. Schedules another pass while chunks remain;
    /// otherwise marks the send finished and, if a graceful close is pending,
    /// schedules the FIN for the peer.
    fn handle_normal_send(inner: &mut SimInner, connection_id: ConnectionId) {
        let Some(conn) = inner.network.connections.get(&connection_id) else {
            return;
        };

        // Snapshot what is needed from the sender before borrowing mutably below.
        let paired_id = conn.paired_connection;
        let send_delay = conn.send_delay;
        let next_send_time = conn.next_send_time;
        let has_data = !conn.send_buffer.is_empty();
        let is_stable = conn.is_stable;
        // Receive-side delay configured on the peer, if the peer exists.
        let recv_delay = paired_id.and_then(|pid| {
            inner
                .network
                .connections
                .get(&pid)
                .and_then(|c| c.recv_delay)
        });

        // Nothing queued: finish the send cycle and emit a pending FIN.
        if !has_data {
            if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
                conn.send_in_progress = false;
                if conn.graceful_close_pending {
                    conn.graceful_close_pending = false;
                    let peer_id = conn.paired_connection;
                    let last_time = conn.last_data_delivery_scheduled_at;
                    Self::schedule_fin_delivery(inner, peer_id, last_time);
                }
            }
            return;
        }

        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
            return;
        };

        let Some(mut data) = conn.send_buffer.pop_front() else {
            conn.send_in_progress = false;
            return;
        };

        // A chunk left the buffer, so tasks waiting on send-buffer space are woken.
        Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);

        // Partial write injection: send only a random prefix and put the rest
        // back at the front of the buffer. The prefix may be empty.
        if !is_stable && crate::buggify!() && !data.is_empty() {
            let max_send = std::cmp::min(
                data.len(),
                inner.network.config.chaos.partial_write_max_bytes,
            );
            let truncate_to = sim_random_range(0..max_send + 1);

            if truncate_to < data.len() {
                let remainder = data.split_off(truncate_to);
                conn.send_buffer.push_front(remainder);
                tracing::debug!(
                    "BUGGIFY: Partial write on connection {} - sending {} bytes",
                    connection_id.0,
                    data.len()
                );
            }
        }

        // While more chunks are queued a fixed 1ns delay is used; the configured
        // or sampled write latency applies only to the last chunk.
        let has_more = !conn.send_buffer.is_empty();
        let base_delay = if has_more {
            Duration::from_nanos(1)
        } else {
            send_delay.unwrap_or_else(|| {
                crate::network::sample_duration(&inner.network.config.write_latency)
            })
        };

        // Never schedule earlier than the previous send slot; the next slot is
        // pushed 1ns past this one.
        let earliest_time = std::cmp::max(inner.current_time + base_delay, next_send_time);
        conn.next_send_time = earliest_time + Duration::from_nanos(1);

        if let Some(paired_id) = paired_id {
            let scheduled_time = earliest_time + recv_delay.unwrap_or(Duration::ZERO);
            let sequence = inner.next_sequence;
            inner.next_sequence += 1;

            inner.event_queue.schedule(ScheduledEvent::new(
                scheduled_time,
                Event::Network {
                    connection_id: paired_id.0,
                    operation: NetworkOperation::DataDelivery { data },
                },
                sequence,
            ));

            // Remembered so a later FIN can be scheduled after this delivery.
            conn.last_data_delivery_scheduled_at = Some(scheduled_time);
        }

        if !conn.send_buffer.is_empty() {
            Self::schedule_process_send_buffer(inner, connection_id);
        } else {
            conn.send_in_progress = false;
            if conn.graceful_close_pending {
                conn.graceful_close_pending = false;
                let peer_id = conn.paired_connection;
                let last_time = conn.last_data_delivery_scheduled_at;
                Self::schedule_fin_delivery(inner, peer_id, last_time);
            }
        }
    }
1288
1289 fn schedule_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1291 let sequence = inner.next_sequence;
1292 inner.next_sequence += 1;
1293
1294 inner.event_queue.schedule(ScheduledEvent::new(
1295 inner.current_time,
1296 Event::Network {
1297 connection_id: connection_id.0,
1298 operation: NetworkOperation::ProcessSendBuffer,
1299 },
1300 sequence,
1301 ));
1302 }
1303
1304 fn handle_shutdown_event(inner: &mut SimInner) {
1306 tracing::debug!("Processing Shutdown event - waking all pending tasks");
1307
1308 for (task_id, waker) in std::mem::take(&mut inner.wakers.task_wakers) {
1309 tracing::trace!("Waking task {}", task_id);
1310 waker.wake();
1311 }
1312
1313 for (_conn_id, waker) in std::mem::take(&mut inner.wakers.read_wakers) {
1314 waker.wake();
1315 }
1316
1317 tracing::debug!("Shutdown event processed");
1318 }
1319
    /// Returns the assertion statistics reported by `crate::chaos::get_assertion_results`,
    /// keyed by `String`.
    pub fn assertion_results(
        &self,
    ) -> std::collections::HashMap<String, crate::chaos::AssertionStats> {
        crate::chaos::get_assertion_results()
    }
1326
    /// Clears recorded assertion results via `crate::chaos::reset_assertion_results`.
    pub fn reset_assertion_results(&self) {
        crate::chaos::reset_assertion_results();
    }
1331
1332 pub fn abort_all_connections_for_ip(&self, ip: std::net::IpAddr) {
1338 let connection_ids: Vec<ConnectionId> = {
1339 let inner = self.inner.borrow();
1340 inner
1341 .network
1342 .connections
1343 .iter()
1344 .filter_map(|(id, conn)| {
1345 if conn.local_ip == Some(ip) || conn.remote_ip == Some(ip) {
1346 Some(*id)
1347 } else {
1348 None
1349 }
1350 })
1351 .collect()
1352 };
1353
1354 let count = connection_ids.len();
1355 for conn_id in connection_ids {
1356 self.close_connection_abort(conn_id);
1357 }
1358
1359 if count > 0 {
1360 tracing::debug!("Aborted {} connections for rebooted IP {}", count, ip);
1361 }
1362 }
1363
1364 pub fn schedule_process_restart(
1368 &self,
1369 ip: std::net::IpAddr,
1370 recovery_delay: std::time::Duration,
1371 ) {
1372 self.schedule_event(Event::ProcessRestart { ip }, recovery_delay);
1373 tracing::debug!(
1374 "Scheduled process restart for IP {} in {:?}",
1375 ip,
1376 recovery_delay
1377 );
1378 }
1379
    /// Returns a clone of the stored `last_processed_event`, or `None` when no
    /// event has been recorded.
    pub fn last_processed_event(&self) -> Option<Event> {
        self.inner.borrow().last_processed_event.clone()
    }
1387
1388 pub fn extract_metrics(&self) -> crate::runner::SimulationMetrics {
1390 let inner = self.inner.borrow();
1391
1392 crate::runner::SimulationMetrics {
1393 wall_time: std::time::Duration::ZERO,
1394 simulated_time: inner.current_time,
1395 events_processed: inner.events_processed,
1396 }
1397 }
1398
1399 pub fn should_clog_write(&self, connection_id: ConnectionId) -> bool {
1403 let inner = self.inner.borrow();
1404 let config = &inner.network.config;
1405
1406 if inner
1408 .network
1409 .connections
1410 .get(&connection_id)
1411 .is_some_and(|conn| conn.is_stable)
1412 {
1413 return false;
1414 }
1415
1416 if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1418 return inner.current_time < clog_state.expires_at;
1419 }
1420
1421 config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1423 }
1424
1425 pub fn clog_write(&self, connection_id: ConnectionId) {
1427 let mut inner = self.inner.borrow_mut();
1428 let config = &inner.network.config;
1429
1430 let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1431 let expires_at = inner.current_time + clog_duration;
1432 inner
1433 .network
1434 .connection_clogs
1435 .insert(connection_id, ClogState { expires_at });
1436
1437 let clear_event = Event::Connection {
1439 id: connection_id.0,
1440 state: ConnectionStateChange::ClogClear,
1441 };
1442 let sequence = inner.next_sequence;
1443 inner.next_sequence += 1;
1444 inner
1445 .event_queue
1446 .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1447 }
1448
1449 pub fn is_write_clogged(&self, connection_id: ConnectionId) -> bool {
1451 let inner = self.inner.borrow();
1452
1453 if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1454 inner.current_time < clog_state.expires_at
1455 } else {
1456 false
1457 }
1458 }
1459
1460 pub fn register_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1462 let mut inner = self.inner.borrow_mut();
1463 inner
1464 .wakers
1465 .clog_wakers
1466 .entry(connection_id)
1467 .or_default()
1468 .push(waker);
1469 }
1470
1471 pub fn should_clog_read(&self, connection_id: ConnectionId) -> bool {
1475 let inner = self.inner.borrow();
1476 let config = &inner.network.config;
1477
1478 if inner
1480 .network
1481 .connections
1482 .get(&connection_id)
1483 .is_some_and(|conn| conn.is_stable)
1484 {
1485 return false;
1486 }
1487
1488 if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1490 return inner.current_time < clog_state.expires_at;
1491 }
1492
1493 config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1495 }
1496
1497 pub fn clog_read(&self, connection_id: ConnectionId) {
1499 let mut inner = self.inner.borrow_mut();
1500 let config = &inner.network.config;
1501
1502 let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1503 let expires_at = inner.current_time + clog_duration;
1504 inner
1505 .network
1506 .read_clogs
1507 .insert(connection_id, ClogState { expires_at });
1508
1509 let clear_event = Event::Connection {
1511 id: connection_id.0,
1512 state: ConnectionStateChange::ReadClogClear,
1513 };
1514 let sequence = inner.next_sequence;
1515 inner.next_sequence += 1;
1516 inner
1517 .event_queue
1518 .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1519 }
1520
1521 pub fn is_read_clogged(&self, connection_id: ConnectionId) -> bool {
1523 let inner = self.inner.borrow();
1524
1525 if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1526 inner.current_time < clog_state.expires_at
1527 } else {
1528 false
1529 }
1530 }
1531
1532 pub fn register_read_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1534 let mut inner = self.inner.borrow_mut();
1535 inner
1536 .wakers
1537 .read_clog_wakers
1538 .entry(connection_id)
1539 .or_default()
1540 .push(waker);
1541 }
1542
1543 pub fn clear_expired_clogs(&self) {
1545 let mut inner = self.inner.borrow_mut();
1546 let now = inner.current_time;
1547 let expired: Vec<ConnectionId> = inner
1548 .network
1549 .connection_clogs
1550 .iter()
1551 .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
1552 .collect();
1553
1554 for id in expired {
1555 inner.network.connection_clogs.remove(&id);
1556 Self::wake_all(&mut inner.wakers.clog_wakers, id);
1557 }
1558 }
1559
1560 pub fn cut_connection(&self, connection_id: ConnectionId, duration: Duration) {
1573 let mut inner = self.inner.borrow_mut();
1574 let expires_at = inner.current_time + duration;
1575
1576 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1577 conn.is_cut = true;
1578 conn.cut_expiry = Some(expires_at);
1579 tracing::debug!("Connection {} cut until {:?}", connection_id.0, expires_at);
1580
1581 let restore_event = Event::Connection {
1583 id: connection_id.0,
1584 state: ConnectionStateChange::CutRestore,
1585 };
1586 let sequence = inner.next_sequence;
1587 inner.next_sequence += 1;
1588 inner
1589 .event_queue
1590 .schedule(ScheduledEvent::new(expires_at, restore_event, sequence));
1591 }
1592 }
1593
1594 pub fn is_connection_cut(&self, connection_id: ConnectionId) -> bool {
1599 let inner = self.inner.borrow();
1600 inner
1601 .network
1602 .connections
1603 .get(&connection_id)
1604 .is_some_and(|conn| {
1605 conn.is_cut
1606 && conn
1607 .cut_expiry
1608 .is_some_and(|expiry| inner.current_time < expiry)
1609 })
1610 }
1611
1612 pub fn restore_connection(&self, connection_id: ConnectionId) {
1616 let mut inner = self.inner.borrow_mut();
1617
1618 if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1619 && conn.is_cut
1620 {
1621 conn.is_cut = false;
1622 conn.cut_expiry = None;
1623 tracing::debug!("Connection {} restored", connection_id.0);
1624
1625 Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
1627 }
1628 }
1629
1630 pub fn register_cut_waker(&self, connection_id: ConnectionId, waker: Waker) {
1632 let mut inner = self.inner.borrow_mut();
1633 inner
1634 .wakers
1635 .cut_wakers
1636 .entry(connection_id)
1637 .or_default()
1638 .push(waker);
1639 }
1640
1641 pub fn send_buffer_capacity(&self, connection_id: ConnectionId) -> usize {
1645 let inner = self.inner.borrow();
1646 inner
1647 .network
1648 .connections
1649 .get(&connection_id)
1650 .map(|conn| conn.send_buffer_capacity)
1651 .unwrap_or(0)
1652 }
1653
1654 pub fn send_buffer_used(&self, connection_id: ConnectionId) -> usize {
1656 let inner = self.inner.borrow();
1657 inner
1658 .network
1659 .connections
1660 .get(&connection_id)
1661 .map(|conn| conn.send_buffer.iter().map(|v| v.len()).sum())
1662 .unwrap_or(0)
1663 }
1664
1665 pub fn available_send_buffer(&self, connection_id: ConnectionId) -> usize {
1667 let capacity = self.send_buffer_capacity(connection_id);
1668 let used = self.send_buffer_used(connection_id);
1669 capacity.saturating_sub(used)
1670 }
1671
1672 pub fn register_send_buffer_waker(&self, connection_id: ConnectionId, waker: Waker) {
1674 let mut inner = self.inner.borrow_mut();
1675 inner
1676 .wakers
1677 .send_buffer_wakers
1678 .entry(connection_id)
1679 .or_default()
1680 .push(waker);
1681 }
1682
1683 #[allow(dead_code)] fn wake_send_buffer_waiters(&self, connection_id: ConnectionId) {
1686 let mut inner = self.inner.borrow_mut();
1687 Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1688 }
1689
1690 pub fn get_pair_latency(&self, src: IpAddr, dst: IpAddr) -> Option<Duration> {
1695 let inner = self.inner.borrow();
1696 inner.network.pair_latencies.get(&(src, dst)).copied()
1697 }
1698
1699 pub fn set_pair_latency_if_not_set(
1702 &self,
1703 src: IpAddr,
1704 dst: IpAddr,
1705 latency: Duration,
1706 ) -> Duration {
1707 let mut inner = self.inner.borrow_mut();
1708 *inner
1709 .network
1710 .pair_latencies
1711 .entry((src, dst))
1712 .or_insert_with(|| {
1713 tracing::debug!(
1714 "Setting base latency for IP pair {} -> {} to {:?}",
1715 src,
1716 dst,
1717 latency
1718 );
1719 latency
1720 })
1721 }
1722
1723 pub fn get_connection_base_latency(&self, connection_id: ConnectionId) -> Duration {
1726 let inner = self.inner.borrow();
1727 let (local_ip, remote_ip) = inner
1728 .network
1729 .connections
1730 .get(&connection_id)
1731 .and_then(|conn| Some((conn.local_ip?, conn.remote_ip?)))
1732 .unwrap_or({
1733 (
1734 IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1735 IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1736 )
1737 });
1738 drop(inner);
1739
1740 if let Some(latency) = self.get_pair_latency(local_ip, remote_ip) {
1742 return latency;
1743 }
1744
1745 let latency = self
1747 .with_network_config(|config| crate::network::sample_duration(&config.write_latency));
1748 self.set_pair_latency_if_not_set(local_ip, remote_ip, latency)
1749 }
1750
1751 pub fn get_send_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1756 let inner = self.inner.borrow();
1757 inner
1758 .network
1759 .connections
1760 .get(&connection_id)
1761 .and_then(|conn| conn.send_delay)
1762 }
1763
1764 pub fn get_recv_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1767 let inner = self.inner.borrow();
1768 inner
1769 .network
1770 .connections
1771 .get(&connection_id)
1772 .and_then(|conn| conn.recv_delay)
1773 }
1774
1775 pub fn set_asymmetric_delays(
1778 &self,
1779 connection_id: ConnectionId,
1780 send_delay: Option<Duration>,
1781 recv_delay: Option<Duration>,
1782 ) {
1783 let mut inner = self.inner.borrow_mut();
1784 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1785 conn.send_delay = send_delay;
1786 conn.recv_delay = recv_delay;
1787 tracing::debug!(
1788 "Connection {} asymmetric delays set: send={:?}, recv={:?}",
1789 connection_id.0,
1790 send_delay,
1791 recv_delay
1792 );
1793 }
1794 }
1795
1796 pub fn is_connection_closed(&self, connection_id: ConnectionId) -> bool {
1798 let inner = self.inner.borrow();
1799 inner
1800 .network
1801 .connections
1802 .get(&connection_id)
1803 .is_some_and(|conn| conn.is_closed)
1804 }
1805
    /// Closes `connection_id` with `CloseReason::Graceful` by delegating to
    /// `close_connection_with_reason`.
    pub fn close_connection(&self, connection_id: ConnectionId) {
        self.close_connection_with_reason(connection_id, CloseReason::Graceful);
    }
1812
    /// Closes `connection_id` with `CloseReason::Aborted` by delegating to
    /// `close_connection_with_reason`.
    pub fn close_connection_abort(&self, connection_id: ConnectionId) {
        self.close_connection_with_reason(connection_id, CloseReason::Aborted);
    }
1819
1820 pub fn get_close_reason(&self, connection_id: ConnectionId) -> CloseReason {
1822 let inner = self.inner.borrow();
1823 inner
1824 .network
1825 .connections
1826 .get(&connection_id)
1827 .map(|conn| conn.close_reason)
1828 .unwrap_or(CloseReason::None)
1829 }
1830
    /// Dispatches a close request to the handler matching `reason`.
    ///
    /// `Graceful` and `Aborted` are forwarded to their dedicated handlers;
    /// `CloseReason::None` is a no-op.
    fn close_connection_with_reason(&self, connection_id: ConnectionId, reason: CloseReason) {
        match reason {
            CloseReason::Graceful => self.close_connection_graceful(connection_id),
            CloseReason::Aborted => self.close_connection_aborted(connection_id),
            CloseReason::None => {}
        }
    }
1839
1840 fn close_connection_graceful(&self, connection_id: ConnectionId) {
1846 let mut inner = self.inner.borrow_mut();
1847
1848 let conn_info = inner.network.connections.get(&connection_id).map(|conn| {
1850 (
1851 conn.paired_connection,
1852 conn.send_closed,
1853 conn.is_closed,
1854 conn.send_in_progress,
1855 conn.send_buffer.is_empty(),
1856 conn.last_data_delivery_scheduled_at,
1857 )
1858 });
1859
1860 let Some((
1861 paired_id,
1862 was_send_closed,
1863 was_closed,
1864 send_in_progress,
1865 send_buffer_empty,
1866 last_delivery_time,
1867 )) = conn_info
1868 else {
1869 return;
1870 };
1871
1872 if was_closed || was_send_closed {
1874 tracing::debug!(
1875 "Connection {} already closed/send_closed, skipping graceful close",
1876 connection_id.0
1877 );
1878 return;
1879 }
1880
1881 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1883 conn.is_closed = true;
1884 conn.close_reason = CloseReason::Graceful;
1885 conn.send_closed = true;
1886 tracing::debug!(
1887 "Connection {} graceful close (FIN) - local write shut down",
1888 connection_id.0
1889 );
1890 }
1891
1892 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1894 waker.wake();
1895 }
1896
1897 if send_in_progress || !send_buffer_empty {
1900 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1902 conn.graceful_close_pending = true;
1903 tracing::debug!(
1904 "Connection {} graceful close deferred (send pipeline active)",
1905 connection_id.0
1906 );
1907 }
1908 } else {
1909 Self::schedule_fin_delivery(&mut inner, paired_id, last_delivery_time);
1911 }
1912 }
1913
1914 fn close_connection_aborted(&self, connection_id: ConnectionId) {
1919 let mut inner = self.inner.borrow_mut();
1920
1921 let paired_connection_id = inner
1922 .network
1923 .connections
1924 .get(&connection_id)
1925 .and_then(|conn| conn.paired_connection);
1926
1927 if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1928 && !conn.is_closed
1929 {
1930 conn.is_closed = true;
1931 conn.close_reason = CloseReason::Aborted;
1932 conn.graceful_close_pending = false;
1934 tracing::debug!(
1935 "Connection {} closed permanently (reason: Aborted)",
1936 connection_id.0
1937 );
1938 }
1939
1940 if let Some(paired_id) = paired_connection_id
1941 && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
1942 && !paired_conn.is_closed
1943 {
1944 paired_conn.is_closed = true;
1945 paired_conn.close_reason = CloseReason::Aborted;
1946 tracing::debug!(
1947 "Paired connection {} also closed (reason: Aborted)",
1948 paired_id.0
1949 );
1950 }
1951
1952 if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1953 tracing::debug!(
1954 "Waking read waker for aborted connection {}",
1955 connection_id.0
1956 );
1957 waker.wake();
1958 }
1959
1960 if let Some(paired_id) = paired_connection_id
1961 && let Some(paired_waker) = inner.wakers.read_wakers.remove(&paired_id)
1962 {
1963 tracing::debug!(
1964 "Waking read waker for paired aborted connection {}",
1965 paired_id.0
1966 );
1967 paired_waker.wake();
1968 }
1969 }
1970
1971 pub fn close_connection_asymmetric(
1973 &self,
1974 connection_id: ConnectionId,
1975 close_send: bool,
1976 close_recv: bool,
1977 ) {
1978 let mut inner = self.inner.borrow_mut();
1979
1980 let paired_id = inner
1981 .network
1982 .connections
1983 .get(&connection_id)
1984 .and_then(|conn| conn.paired_connection);
1985
1986 if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1987 conn.send_closed = true;
1988 conn.send_buffer.clear();
1989 tracing::debug!(
1990 "Connection {} send side closed (asymmetric)",
1991 connection_id.0
1992 );
1993 }
1994
1995 if close_recv
1996 && let Some(paired) = paired_id
1997 && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
1998 {
1999 paired_conn.recv_closed = true;
2000 tracing::debug!(
2001 "Connection {} recv side closed (asymmetric via peer)",
2002 paired.0
2003 );
2004 }
2005
2006 if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
2007 waker.wake();
2008 }
2009 if close_recv
2010 && let Some(paired) = paired_id
2011 && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
2012 {
2013 waker.wake();
2014 }
2015 }
2016
2017 pub fn roll_random_close(&self, connection_id: ConnectionId) -> Option<bool> {
2019 let mut inner = self.inner.borrow_mut();
2020 let config = &inner.network.config;
2021
2022 if inner
2024 .network
2025 .connections
2026 .get(&connection_id)
2027 .is_some_and(|conn| conn.is_stable)
2028 {
2029 return None;
2030 }
2031
2032 if config.chaos.random_close_probability <= 0.0 {
2033 return None;
2034 }
2035
2036 let current_time = inner.current_time;
2037 let time_since_last = current_time.saturating_sub(inner.network.last_random_close_time);
2038 if time_since_last < config.chaos.random_close_cooldown {
2039 return None;
2040 }
2041
2042 if !crate::buggify_with_prob!(config.chaos.random_close_probability) {
2043 return None;
2044 }
2045
2046 inner.network.last_random_close_time = current_time;
2047
2048 let paired_id = inner
2049 .network
2050 .connections
2051 .get(&connection_id)
2052 .and_then(|conn| conn.paired_connection);
2053
2054 let a = super::rng::sim_random_f64();
2055 let close_recv = a < 0.66;
2056 let close_send = a > 0.33;
2057
2058 tracing::info!(
2059 "Random connection failure triggered on connection {} (send={}, recv={}, a={:.3})",
2060 connection_id.0,
2061 close_send,
2062 close_recv,
2063 a
2064 );
2065
2066 if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2067 conn.send_closed = true;
2068 conn.send_buffer.clear();
2069 }
2070
2071 if close_recv
2072 && let Some(paired) = paired_id
2073 && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
2074 {
2075 paired_conn.recv_closed = true;
2076 }
2077
2078 if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
2079 waker.wake();
2080 }
2081 if close_recv
2082 && let Some(paired) = paired_id
2083 && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
2084 {
2085 waker.wake();
2086 }
2087
2088 let b = super::rng::sim_random_f64();
2089 let explicit = b < inner.network.config.chaos.random_close_explicit_ratio;
2090
2091 tracing::debug!(
2092 "Random close explicit={} (b={:.3}, ratio={:.2})",
2093 explicit,
2094 b,
2095 inner.network.config.chaos.random_close_explicit_ratio
2096 );
2097
2098 Some(explicit)
2099 }
2100
2101 pub fn is_send_closed(&self, connection_id: ConnectionId) -> bool {
2103 let inner = self.inner.borrow();
2104 inner
2105 .network
2106 .connections
2107 .get(&connection_id)
2108 .is_some_and(|conn| conn.send_closed || conn.is_closed)
2109 }
2110
2111 pub fn is_recv_closed(&self, connection_id: ConnectionId) -> bool {
2113 let inner = self.inner.borrow();
2114 inner
2115 .network
2116 .connections
2117 .get(&connection_id)
2118 .is_some_and(|conn| conn.recv_closed || conn.is_closed)
2119 }
2120
2121 pub fn is_remote_fin_received(&self, connection_id: ConnectionId) -> bool {
2126 let inner = self.inner.borrow();
2127 inner
2128 .network
2129 .connections
2130 .get(&connection_id)
2131 .is_some_and(|conn| conn.remote_fin_received)
2132 }
2133
2134 pub fn simulate_peer_crash(&self, connection_id: ConnectionId, error_delay: Duration) {
2147 let mut inner = self.inner.borrow_mut();
2148 let current_time = inner.current_time;
2149 let error_at = current_time + error_delay;
2150
2151 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2152 conn.is_half_open = true;
2153 conn.half_open_error_at = Some(error_at);
2154
2155 conn.paired_connection = None;
2158
2159 tracing::info!(
2160 "Connection {} now half-open, errors manifest at {:?}",
2161 connection_id.0,
2162 error_at
2163 );
2164 }
2165
2166 let wake_event = Event::Connection {
2168 id: connection_id.0,
2169 state: ConnectionStateChange::HalfOpenError,
2170 };
2171 let sequence = inner.next_sequence;
2172 inner.next_sequence += 1;
2173 let scheduled_event = ScheduledEvent::new(error_at, wake_event, sequence);
2174 inner.event_queue.schedule(scheduled_event);
2175 }
2176
2177 pub fn is_half_open(&self, connection_id: ConnectionId) -> bool {
2179 let inner = self.inner.borrow();
2180 inner
2181 .network
2182 .connections
2183 .get(&connection_id)
2184 .is_some_and(|conn| conn.is_half_open)
2185 }
2186
2187 pub fn should_half_open_error(&self, connection_id: ConnectionId) -> bool {
2189 let inner = self.inner.borrow();
2190 let current_time = inner.current_time;
2191 inner
2192 .network
2193 .connections
2194 .get(&connection_id)
2195 .is_some_and(|conn| {
2196 conn.is_half_open
2197 && conn
2198 .half_open_error_at
2199 .is_some_and(|error_at| current_time >= error_at)
2200 })
2201 }
2202
2203 pub fn mark_connection_stable(&self, connection_id: ConnectionId) {
2220 let mut inner = self.inner.borrow_mut();
2221 if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2222 conn.is_stable = true;
2223 tracing::debug!("Connection {} marked as stable", connection_id.0);
2224
2225 if let Some(paired_id) = conn.paired_connection
2227 && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
2228 {
2229 paired_conn.is_stable = true;
2230 tracing::debug!("Paired connection {} also marked as stable", paired_id.0);
2231 }
2232 }
2233 }
2234
2235 pub fn is_connection_stable(&self, connection_id: ConnectionId) -> bool {
2237 let inner = self.inner.borrow();
2238 inner
2239 .network
2240 .connections
2241 .get(&connection_id)
2242 .is_some_and(|conn| conn.is_stable)
2243 }
2244
2245 pub fn partition_pair(
2249 &self,
2250 from_ip: std::net::IpAddr,
2251 to_ip: std::net::IpAddr,
2252 duration: Duration,
2253 ) -> SimulationResult<()> {
2254 let mut inner = self.inner.borrow_mut();
2255 let expires_at = inner.current_time + duration;
2256
2257 inner
2258 .network
2259 .ip_partitions
2260 .insert((from_ip, to_ip), PartitionState { expires_at });
2261
2262 let restore_event = Event::Connection {
2263 id: 0,
2264 state: ConnectionStateChange::PartitionRestore,
2265 };
2266 let sequence = inner.next_sequence;
2267 inner.next_sequence += 1;
2268 let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2269 inner.event_queue.schedule(scheduled_event);
2270
2271 tracing::debug!(
2272 "Partitioned {} -> {} until {:?}",
2273 from_ip,
2274 to_ip,
2275 expires_at
2276 );
2277 Ok(())
2278 }
2279
2280 pub fn partition_send_from(
2282 &self,
2283 ip: std::net::IpAddr,
2284 duration: Duration,
2285 ) -> SimulationResult<()> {
2286 let mut inner = self.inner.borrow_mut();
2287 let expires_at = inner.current_time + duration;
2288
2289 inner.network.send_partitions.insert(ip, expires_at);
2290
2291 let clear_event = Event::Connection {
2292 id: 0,
2293 state: ConnectionStateChange::SendPartitionClear,
2294 };
2295 let sequence = inner.next_sequence;
2296 inner.next_sequence += 1;
2297 let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2298 inner.event_queue.schedule(scheduled_event);
2299
2300 tracing::debug!("Partitioned sends from {} until {:?}", ip, expires_at);
2301 Ok(())
2302 }
2303
2304 pub fn partition_recv_to(
2306 &self,
2307 ip: std::net::IpAddr,
2308 duration: Duration,
2309 ) -> SimulationResult<()> {
2310 let mut inner = self.inner.borrow_mut();
2311 let expires_at = inner.current_time + duration;
2312
2313 inner.network.recv_partitions.insert(ip, expires_at);
2314
2315 let clear_event = Event::Connection {
2316 id: 0,
2317 state: ConnectionStateChange::RecvPartitionClear,
2318 };
2319 let sequence = inner.next_sequence;
2320 inner.next_sequence += 1;
2321 let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2322 inner.event_queue.schedule(scheduled_event);
2323
2324 tracing::debug!("Partitioned receives to {} until {:?}", ip, expires_at);
2325 Ok(())
2326 }
2327
2328 pub fn restore_partition(
2330 &self,
2331 from_ip: std::net::IpAddr,
2332 to_ip: std::net::IpAddr,
2333 ) -> SimulationResult<()> {
2334 let mut inner = self.inner.borrow_mut();
2335 inner.network.ip_partitions.remove(&(from_ip, to_ip));
2336 tracing::debug!("Restored partition {} -> {}", from_ip, to_ip);
2337 Ok(())
2338 }
2339
2340 pub fn is_partitioned(
2342 &self,
2343 from_ip: std::net::IpAddr,
2344 to_ip: std::net::IpAddr,
2345 ) -> SimulationResult<bool> {
2346 let inner = self.inner.borrow();
2347 Ok(inner
2348 .network
2349 .is_partitioned(from_ip, to_ip, inner.current_time))
2350 }
2351
2352 fn randomly_trigger_partitions_with_inner(inner: &mut SimInner) {
2359 let partition_config = &inner.network.config;
2360
2361 if partition_config.chaos.partition_probability == 0.0 {
2362 return;
2363 }
2364
2365 if sim_random::<f64>() >= partition_config.chaos.partition_probability {
2367 return;
2368 }
2369
2370 let unique_ips: HashSet<IpAddr> = inner
2372 .network
2373 .connections
2374 .values()
2375 .filter_map(|conn| conn.local_ip)
2376 .collect();
2377
2378 if unique_ips.len() < 2 {
2379 return; }
2381
2382 let ip_list: Vec<IpAddr> = unique_ips.into_iter().collect();
2383 let partition_duration =
2384 crate::network::sample_duration(&partition_config.chaos.partition_duration);
2385 let expires_at = inner.current_time + partition_duration;
2386
2387 let partitioned_ips: Vec<IpAddr> = match partition_config.chaos.partition_strategy {
2389 PartitionStrategy::Random => {
2390 ip_list
2392 .iter()
2393 .filter(|_| sim_random::<f64>() < 0.5)
2394 .copied()
2395 .collect()
2396 }
2397 PartitionStrategy::UniformSize => {
2398 let partition_size = sim_random_range(1..ip_list.len());
2400 let mut shuffled = ip_list.clone();
2402 for i in (1..shuffled.len()).rev() {
2404 let j = sim_random_range(0..i + 1);
2405 shuffled.swap(i, j);
2406 }
2407 shuffled.into_iter().take(partition_size).collect()
2408 }
2409 PartitionStrategy::IsolateSingle => {
2410 let idx = sim_random_range(0..ip_list.len());
2412 vec![ip_list[idx]]
2413 }
2414 };
2415
2416 if partitioned_ips.is_empty() || partitioned_ips.len() == ip_list.len() {
2418 return;
2419 }
2420
2421 let non_partitioned: Vec<IpAddr> = ip_list
2423 .iter()
2424 .filter(|ip| !partitioned_ips.contains(ip))
2425 .copied()
2426 .collect();
2427
2428 for &from_ip in &partitioned_ips {
2429 for &to_ip in &non_partitioned {
2430 if inner
2432 .network
2433 .is_partitioned(from_ip, to_ip, inner.current_time)
2434 {
2435 continue;
2436 }
2437
2438 inner
2440 .network
2441 .ip_partitions
2442 .insert((from_ip, to_ip), PartitionState { expires_at });
2443 inner
2444 .network
2445 .ip_partitions
2446 .insert((to_ip, from_ip), PartitionState { expires_at });
2447
2448 tracing::debug!(
2449 "Partition triggered: {} <-> {} until {:?} (strategy: {:?})",
2450 from_ip,
2451 to_ip,
2452 expires_at,
2453 partition_config.chaos.partition_strategy
2454 );
2455 }
2456 }
2457
2458 let restore_event = Event::Connection {
2460 id: 0,
2461 state: ConnectionStateChange::PartitionRestore,
2462 };
2463 let sequence = inner.next_sequence;
2464 inner.next_sequence += 1;
2465 let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2466 inner.event_queue.schedule(scheduled_event);
2467 }
2468}
2469
/// `SimWorld::default()` is equivalent to `SimWorld::new()`.
impl Default for SimWorld {
    fn default() -> Self {
        Self::new()
    }
}
2475
/// A non-owning handle to a simulation.
///
/// Holds a `Weak` reference to the shared simulation state, so it does not
/// keep the simulation alive on its own.
#[derive(Debug)]
pub struct WeakSimWorld {
    /// Weak pointer to the shared simulation state.
    pub(crate) inner: Weak<RefCell<SimInner>>,
}
2484
// Generates a `WeakSimWorld` method that upgrades the weak handle and forwards
// the call to the method of the same name on the upgraded `SimWorld`. A failed
// upgrade propagates through `?`.
//
// Three forms are supported:
// * `wrap`: the target returns a plain `$ret`; the result is wrapped in `Ok`.
// * `pass`: the target's return value is returned as-is and must already be a
//   `SimulationResult<$ret>`.
// * `unit`: the target's return value is discarded and `Ok(())` is returned.
macro_rules! weak_forward {
    // Forward and wrap the plain return value in `Ok`.
    (wrap $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*) -> $ret:ty) => {
        $(#[$meta])*
        pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<$ret> {
            Ok(self.upgrade()?.$method($($arg),*))
        }
    };
    // Forward and return the target's own result unchanged.
    (pass $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*) -> $ret:ty) => {
        $(#[$meta])*
        pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<$ret> {
            self.upgrade()?.$method($($arg),*)
        }
    };
    // Forward, discard the return value, and report success.
    (unit $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*)) => {
        $(#[$meta])*
        pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<()> {
            self.upgrade()?.$method($($arg),*);
            Ok(())
        }
    };
}
2510
2511impl WeakSimWorld {
2512 pub fn upgrade(&self) -> SimulationResult<SimWorld> {
2514 self.inner
2515 .upgrade()
2516 .map(|inner| SimWorld { inner })
2517 .ok_or(SimulationError::SimulationShutdown)
2518 }
2519
2520 weak_forward!(wrap #[doc = "Returns the current simulation time."] current_time(&self) -> Duration);
2521 weak_forward!(wrap #[doc = "Returns the exact simulation time (equivalent to FDB's now())."] now(&self) -> Duration);
2522 weak_forward!(wrap #[doc = "Returns the drifted timer time (equivalent to FDB's timer())."] timer(&self) -> Duration);
2523 weak_forward!(unit #[doc = "Schedules an event to execute after the specified delay."] schedule_event(&self, event: Event, delay: Duration));
2524 weak_forward!(unit #[doc = "Schedules an event to execute at the specified absolute time."] schedule_event_at(&self, event: Event, time: Duration));
2525 weak_forward!(pass #[doc = "Read data from connection's receive buffer."] read_from_connection(&self, connection_id: ConnectionId, buf: &mut [u8]) -> usize);
2526 weak_forward!(pass #[doc = "Write data to connection's receive buffer."] write_to_connection(&self, connection_id: ConnectionId, data: &[u8]) -> ());
2527 weak_forward!(pass #[doc = "Buffer data for ordered sending on a connection."] buffer_send(&self, connection_id: ConnectionId, data: Vec<u8>) -> ());
2528 weak_forward!(wrap #[doc = "Get a network provider for the simulation."] network_provider(&self) -> SimNetworkProvider);
2529 weak_forward!(wrap #[doc = "Get a time provider for the simulation."] time_provider(&self) -> crate::providers::SimTimeProvider);
2530 weak_forward!(wrap #[doc = "Sleep for the specified duration in simulation time."] sleep(&self, duration: Duration) -> SleepFuture);
2531
2532 pub fn with_network_config<F, R>(&self, f: F) -> SimulationResult<R>
2534 where
2535 F: FnOnce(&NetworkConfiguration) -> R,
2536 {
2537 Ok(self.upgrade()?.with_network_config(f))
2538 }
2539}
2540
2541impl Clone for WeakSimWorld {
2542 fn clone(&self) -> Self {
2543 Self {
2544 inner: self.inner.clone(),
2545 }
2546 }
2547}
2548
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh world starts at time zero with an empty queue; a scheduled
    /// timer stays pending (without moving the clock) until `step()` consumes it.
    #[test]
    fn sim_world_basic_lifecycle() {
        let mut sim = SimWorld::new();

        // Freshly created: clock at zero, nothing queued.
        assert_eq!(sim.current_time(), Duration::ZERO);
        assert!(!sim.has_pending_events());
        assert_eq!(sim.pending_event_count(), 0);

        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));

        // Scheduling alone must not advance simulated time.
        assert!(sim.has_pending_events());
        assert_eq!(sim.pending_event_count(), 1);
        assert_eq!(sim.current_time(), Duration::ZERO);

        // Processing the only event jumps the clock to its due time and
        // reports that nothing is left.
        let has_more = sim.step();
        assert!(!has_more);
        assert_eq!(sim.current_time(), Duration::from_millis(100));
        assert!(!sim.has_pending_events());
        assert_eq!(sim.pending_event_count(), 0);
    }

    /// Events scheduled out of order are processed by ascending due time,
    /// one per `step()` call.
    #[test]
    fn sim_world_multiple_events() {
        let mut sim = SimWorld::new();

        // Deliberately inserted out of chronological order.
        sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(300));
        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
        sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200));

        assert_eq!(sim.pending_event_count(), 3);

        // Earliest event first.
        assert!(sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(100));
        assert_eq!(sim.pending_event_count(), 2);

        assert!(sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(200));
        assert_eq!(sim.pending_event_count(), 1);

        // Last event: `step()` reports that the queue is now empty.
        assert!(!sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(300));
        assert_eq!(sim.pending_event_count(), 0);
    }

    /// `run_until_empty` drains the whole queue and leaves the clock at the
    /// due time of the latest event.
    #[test]
    fn sim_world_run_until_empty() {
        let mut sim = SimWorld::new();

        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
        sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200));
        sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(300));

        sim.run_until_empty();

        assert_eq!(sim.current_time(), Duration::from_millis(300));
        assert!(!sim.has_pending_events());
        assert_eq!(sim.pending_event_count(), 0);
    }

    /// `schedule_event_at` queues an event for an absolute simulation time;
    /// the clock only moves once the event is processed.
    #[test]
    fn sim_world_schedule_at_specific_time() {
        let mut sim = SimWorld::new();

        sim.schedule_event_at(Event::Timer { task_id: 1 }, Duration::from_millis(500));

        // Queued, but time has not moved yet.
        assert_eq!(sim.pending_event_count(), 1);
        assert_eq!(sim.current_time(), Duration::ZERO);

        // It is the only event, so `step()` must report that nothing remains.
        assert!(!sim.step());

        assert_eq!(sim.current_time(), Duration::from_millis(500));
        assert!(!sim.has_pending_events());
    }

    /// A weak handle works while the `SimWorld` is alive and reports
    /// `SimulationShutdown` for every call after it has been dropped.
    #[test]
    fn weak_sim_world_lifecycle() {
        let sim = SimWorld::new();
        let weak = sim.downgrade();

        // While the world is alive the weak handle forwards normally.
        assert_eq!(
            weak.current_time().expect("should get time"),
            Duration::ZERO
        );

        weak.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100))
            .expect("should schedule event");

        // The event scheduled through the weak handle is visible on the world.
        assert!(sim.has_pending_events());

        drop(sim);

        // After the drop, every operation fails with the shutdown error.
        assert_eq!(
            weak.current_time(),
            Err(SimulationError::SimulationShutdown)
        );
        assert_eq!(
            weak.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200)),
            Err(SimulationError::SimulationShutdown)
        );
    }

    /// Several events due at the same instant are processed one per `step()`
    /// without moving the clock past that instant.
    ///
    /// NOTE(review): this only observes the clock and the queue length; it
    /// does not check which `task_id` is dequeued first, so the ordering
    /// promised by the test name is not actually asserted here.
    #[test]
    fn deterministic_event_ordering() {
        let mut sim = SimWorld::new();

        // Three timers sharing the same due time.
        sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(100));
        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
        sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(100));

        assert_eq!(sim.pending_event_count(), 3);

        assert!(sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(100));
        assert!(sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(100));
        assert!(!sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(100));
        assert!(!sim.has_pending_events());
    }
}