Skip to main content

moonpool_sim/sim/
world.rs

1//! Core simulation world and coordination logic.
2//!
3//! This module provides the central SimWorld coordinator that manages time,
4//! event processing, and network simulation state.
5
6use std::{
7    cell::RefCell,
8    collections::{BTreeMap, HashSet, VecDeque},
9    net::IpAddr,
10    rc::{Rc, Weak},
11    task::Waker,
12    time::Duration,
13};
14use tracing::instrument;
15
16use crate::{
17    SimulationError, SimulationResult,
18    chaos::fault_events::{SIM_FAULT_TIMELINE, SimFaultEvent},
19    chaos::state_handle::StateHandle,
20    network::{
21        NetworkConfiguration, PartitionStrategy,
22        sim::{ConnectionId, ListenerId, SimNetworkProvider},
23    },
24};
25
26use super::{
27    events::{ConnectionStateChange, Event, EventQueue, NetworkOperation, ScheduledEvent},
28    rng::{reset_sim_rng, set_sim_seed, sim_random, sim_random_range},
29    sleep::SleepFuture,
30    state::{
31        ClogState, CloseReason, ConnectionState, ListenerState, NetworkState, PartitionState,
32        StorageState,
33    },
34    wakers::WakerRegistry,
35};
36
/// Internal simulation state holder.
///
/// Groups the logical clocks, the event queue, network and storage state,
/// waker registries and bookkeeping counters. `SimWorld` keeps a single
/// instance behind `Rc<RefCell<_>>`.
#[derive(Debug)]
pub(crate) struct SimInner {
    /// Current logical simulation time; `SimWorld::step` sets it to the
    /// timestamp of each event it pops.
    pub(crate) current_time: Duration,
    /// Drifted timer time (can be ahead of current_time)
    /// FDB ref: sim2.actor.cpp:1058-1064 - timer() drifts 0-0.1s ahead of now()
    pub(crate) timer_time: Duration,
    /// Queue of scheduled events awaiting processing.
    pub(crate) event_queue: EventQueue,
    /// Sequence number handed to the next scheduled event; incremented on
    /// every scheduling call.
    pub(crate) next_sequence: u64,

    // Network management
    /// Connections, listeners, pending connections and network configuration.
    pub(crate) network: NetworkState,

    // Storage management
    /// Storage simulation state, including the default storage configuration.
    pub(crate) storage: StorageState,

    // Async coordination
    /// Wakers registered by pending reads, accepts, clogged connections and sleeping tasks.
    pub(crate) wakers: WakerRegistry,

    // Task management for sleep functionality
    /// Identifier handed to the next sleeping task.
    pub(crate) next_task_id: u64,
    /// Task ids consulted by `SimWorld::is_task_awake`.
    pub(crate) awakened_tasks: HashSet<u64>,

    // Event processing metrics
    /// Running count of processed events; `SimWorld::run_until_empty` uses it
    /// to decide when to run its early-termination check.
    pub(crate) events_processed: u64,

    // Chaos tracking
    /// Simulation time recorded for bit-flip chaos; starts at zero.
    pub(crate) last_bit_flip_time: Duration,

    // Last event processed by step() — used by orchestrator to detect ProcessRestart
    /// `Some` after `step()` handled an event, `None` after `step()` found the queue empty.
    pub(crate) last_processed_event: Option<Event>,

    // Optional state handle for emitting fault events to the timeline
    /// Set through `SimWorld::set_state`; `emit_fault` is a no-op while this is `None`.
    pub(crate) state: Option<StateHandle>,
}
72
73impl SimInner {
74    pub(crate) fn new() -> Self {
75        Self {
76            current_time: Duration::ZERO,
77            timer_time: Duration::ZERO,
78            event_queue: EventQueue::new(),
79            next_sequence: 0,
80            network: NetworkState::new(NetworkConfiguration::default()),
81            storage: StorageState::default(),
82            wakers: WakerRegistry::default(),
83            next_task_id: 0,
84            awakened_tasks: HashSet::new(),
85            events_processed: 0,
86            last_bit_flip_time: Duration::ZERO,
87            last_processed_event: None,
88            state: None,
89        }
90    }
91
    /// Builds a fresh simulation state around the supplied network configuration.
    ///
    /// Both clocks start at `Duration::ZERO`, every counter starts at zero, the
    /// event queue and awakened-task set are empty, storage and waker state use
    /// their `Default` values, and no state handle is attached.
    pub(crate) fn new_with_config(network_config: NetworkConfiguration) -> Self {
        Self {
            current_time: Duration::ZERO,
            timer_time: Duration::ZERO,
            event_queue: EventQueue::new(),
            next_sequence: 0,
            network: NetworkState::new(network_config),
            storage: StorageState::default(),
            wakers: WakerRegistry::default(),
            next_task_id: 0,
            awakened_tasks: HashSet::new(),
            events_processed: 0,
            last_bit_flip_time: Duration::ZERO,
            last_processed_event: None,
            state: None,
        }
    }
109
110    /// Emit a fault event to the timeline, if state is attached.
111    pub(crate) fn emit_fault(&self, event: SimFaultEvent) {
112        if let Some(ref state) = self.state {
113            let time_ms = self.current_time.as_millis() as u64;
114            state.emit_raw(SIM_FAULT_TIMELINE, event, time_ms, "sim");
115        }
116    }
117
118    /// Calculate the number of bits to flip using a power-law distribution.
119    ///
120    /// Uses the formula: 32 - floor(log2(random_value))
121    /// This creates a power-law distribution biased toward fewer bits:
122    /// - 1-2 bits: very common
123    /// - 16 bits: uncommon
124    /// - 32 bits: very rare
125    ///
126    /// Matches FDB's approach in FlowTransport.actor.cpp:1297
127    pub(crate) fn calculate_flip_bit_count(random_value: u32, min_bits: u32, max_bits: u32) -> u32 {
128        if random_value == 0 {
129            // Handle edge case: treat 0 as if it were 1
130            return max_bits.min(32);
131        }
132
133        // Formula: 32 - floor(log2(x)) = 1 + leading_zeros(x)
134        let bit_count = 1 + random_value.leading_zeros();
135
136        // Clamp to configured range
137        bit_count.clamp(min_bits, max_bits)
138    }
139}
140
/// The central simulation coordinator that manages time and event processing.
///
/// `SimWorld` owns all mutable simulation state and provides the main interface
/// for scheduling events and advancing simulation time. It uses a centralized
/// ownership model with handle-based access to avoid borrow checker conflicts.
#[derive(Debug)]
pub struct SimWorld {
    /// Shared, interior-mutable simulation state; `downgrade` hands out weak
    /// references to this same allocation.
    pub(crate) inner: Rc<RefCell<SimInner>>,
}
150
151impl SimWorld {
152    /// Internal constructor that handles all initialization logic.
153    fn create(network_config: Option<NetworkConfiguration>, seed: u64) -> Self {
154        reset_sim_rng();
155        set_sim_seed(seed);
156        crate::chaos::assertions::reset_assertion_results();
157
158        let inner = match network_config {
159            Some(config) => SimInner::new_with_config(config),
160            None => SimInner::new(),
161        };
162
163        Self {
164            inner: Rc::new(RefCell::new(inner)),
165        }
166    }
167
    /// Creates a new simulation world with default network configuration.
    ///
    /// Uses default seed (0) for reproducible testing. For custom seeds,
    /// use [`SimWorld::new_with_seed`].
    ///
    /// Like every constructor, this resets the simulation RNG and the recorded
    /// assertion results before building the world.
    pub fn new() -> Self {
        Self::create(None, 0)
    }
175
    /// Creates a new simulation world with a specific seed for deterministic randomness.
    ///
    /// This method ensures clean thread-local RNG state by resetting before
    /// setting the seed, making it safe for consecutive simulations on the same thread.
    /// The default network configuration is used.
    ///
    /// # Parameters
    ///
    /// * `seed` - The seed value for deterministic randomness
    pub fn new_with_seed(seed: u64) -> Self {
        Self::create(None, seed)
    }
187
    /// Creates a new simulation world with custom network configuration.
    ///
    /// Uses seed `0`; call [`SimWorld::new_with_network_config_and_seed`] to
    /// pick a different seed.
    pub fn new_with_network_config(network_config: NetworkConfiguration) -> Self {
        Self::create(Some(network_config), 0)
    }
192
    /// Creates a new simulation world with both custom network configuration and seed.
    ///
    /// The simulation RNG is reset and re-seeded before the world is built.
    ///
    /// # Parameters
    ///
    /// * `network_config` - Network configuration for latency and fault simulation
    /// * `seed` - The seed value for deterministic randomness
    pub fn new_with_network_config_and_seed(
        network_config: NetworkConfiguration,
        seed: u64,
    ) -> Self {
        Self::create(Some(network_config), seed)
    }
205
    /// Attach a state handle for fault event emission.
    ///
    /// Once set, the simulator emits [`SimFaultEvent`]s to the `"sim:faults"` timeline
    /// whenever faults are injected.
    ///
    /// Calling this again replaces the previously attached handle.
    pub fn set_state(&self, state: StateHandle) {
        self.inner.borrow_mut().state = Some(state);
    }
213
214    /// Processes the next scheduled event and advances time.
215    ///
216    /// Returns `true` if more events are available for processing,
217    /// `false` if this was the last event or if no events are available.
218    #[instrument(skip(self))]
219    pub fn step(&mut self) -> bool {
220        let mut inner = self.inner.borrow_mut();
221
222        if let Some(scheduled_event) = inner.event_queue.pop_earliest() {
223            // Advance logical time to event timestamp
224            inner.current_time = scheduled_event.time();
225
226            // Phase 7: Clear expired clogs after time advancement
227            Self::clear_expired_clogs_with_inner(&mut inner);
228
229            // Trigger random partitions based on configuration
230            Self::randomly_trigger_partitions_with_inner(&mut inner);
231
232            // Store the event for orchestrator inspection (e.g., ProcessRestart)
233            let event = scheduled_event.into_event();
234            inner.last_processed_event = Some(event.clone());
235
236            // Process the event with the mutable reference
237            Self::process_event_with_inner(&mut inner, event);
238
239            // Return true if more events are available
240            !inner.event_queue.is_empty()
241        } else {
242            inner.last_processed_event = None;
243            // No more events to process
244            false
245        }
246    }
247
248    /// Processes all scheduled events until the queue is empty or only infrastructure events remain.
249    ///
250    /// This method processes all workload-related events but stops early if only infrastructure
251    /// events (like connection restoration) remain. This prevents infinite loops where
252    /// infrastructure events keep the simulation running indefinitely after workloads complete.
253    #[instrument(skip(self))]
254    pub fn run_until_empty(&mut self) {
255        while self.step() {
256            // Periodically check if we should stop early (every 50 events for performance)
257            if self.inner.borrow().events_processed.is_multiple_of(50) {
258                let has_workload_events = !self
259                    .inner
260                    .borrow()
261                    .event_queue
262                    .has_only_infrastructure_events();
263                if !has_workload_events {
264                    tracing::debug!(
265                        "Early termination: only infrastructure events remain in queue"
266                    );
267                    break;
268                }
269            }
270        }
271    }
272
    /// Returns the current simulation time.
    ///
    /// This is the logical clock advanced by `step()`; it returns the same
    /// value as [`SimWorld::now`].
    pub fn current_time(&self) -> Duration {
        self.inner.borrow().current_time
    }
277
    /// Returns the exact simulation time (equivalent to FDB's now()).
    ///
    /// This is the canonical simulation time used for scheduling events.
    /// Use this for precise time comparisons and scheduling.
    ///
    /// Unlike [`SimWorld::timer`], this value carries no clock drift.
    pub fn now(&self) -> Duration {
        self.inner.borrow().current_time
    }
285
286    /// Returns the drifted timer time (equivalent to FDB's timer()).
287    ///
288    /// The timer can be up to `clock_drift_max` (default 100ms) ahead of `now()`.
289    /// This simulates real-world clock drift between processes, which is important
290    /// for testing time-sensitive code like:
291    /// - Timeout handling
292    /// - Lease expiration
293    /// - Distributed consensus (leader election)
294    /// - Cache invalidation
295    /// - Heartbeat detection
296    ///
297    /// FDB formula: `timerTime += random01() * (time + 0.1 - timerTime) / 2.0`
298    ///
299    /// FDB ref: sim2.actor.cpp:1058-1064
300    pub fn timer(&self) -> Duration {
301        let mut inner = self.inner.borrow_mut();
302        let chaos = &inner.network.config.chaos;
303
304        // If clock drift is disabled, return exact simulation time
305        if !chaos.clock_drift_enabled {
306            return inner.current_time;
307        }
308
309        // FDB formula: timerTime += random01() * (time + 0.1 - timerTime) / 2.0
310        // This smoothly interpolates timerTime toward (time + drift_max)
311        // The /2.0 creates a damped approach (never overshoots)
312        let max_timer = inner.current_time + chaos.clock_drift_max;
313
314        // Only advance if timer is behind max
315        if inner.timer_time < max_timer {
316            let random_factor = sim_random::<f64>(); // 0.0 to 1.0
317            let gap = (max_timer - inner.timer_time).as_secs_f64();
318            let delta = random_factor * gap / 2.0;
319            inner.timer_time += Duration::from_secs_f64(delta);
320        }
321
322        // Ensure timer never goes backwards relative to simulation time
323        inner.timer_time = inner.timer_time.max(inner.current_time);
324
325        inner.timer_time
326    }
327
328    /// Schedules an event to execute after the specified delay from the current time.
329    #[instrument(skip(self))]
330    pub fn schedule_event(&self, event: Event, delay: Duration) {
331        let mut inner = self.inner.borrow_mut();
332        let scheduled_time = inner.current_time + delay;
333        let sequence = inner.next_sequence;
334        inner.next_sequence += 1;
335
336        let scheduled_event = ScheduledEvent::new(scheduled_time, event, sequence);
337        inner.event_queue.schedule(scheduled_event);
338    }
339
340    /// Schedules an event to execute at the specified absolute time.
341    pub fn schedule_event_at(&self, event: Event, time: Duration) {
342        let mut inner = self.inner.borrow_mut();
343        let sequence = inner.next_sequence;
344        inner.next_sequence += 1;
345
346        let scheduled_event = ScheduledEvent::new(time, event, sequence);
347        inner.event_queue.schedule(scheduled_event);
348    }
349
    /// Creates a weak reference to this simulation world.
    ///
    /// Weak references can be used to access the simulation without preventing
    /// it from being dropped, enabling handle-based access patterns.
    ///
    /// The providers created by this type are all built from such a handle.
    pub fn downgrade(&self) -> WeakSimWorld {
        WeakSimWorld {
            inner: Rc::downgrade(&self.inner),
        }
    }
359
    /// Returns `true` if there are events waiting to be processed.
    ///
    /// Every queued event counts, whatever its kind.
    pub fn has_pending_events(&self) -> bool {
        !self.inner.borrow().event_queue.is_empty()
    }
364
    /// Returns the number of events waiting to be processed.
    ///
    /// This is the current length of the event queue.
    pub fn pending_event_count(&self) -> usize {
        self.inner.borrow().event_queue.len()
    }
369
    /// Create a network provider for this simulation.
    ///
    /// The provider is built from a weak handle, so it does not keep the
    /// simulation alive.
    pub fn network_provider(&self) -> SimNetworkProvider {
        SimNetworkProvider::new(self.downgrade())
    }
374
    /// Create a time provider for this simulation.
    ///
    /// The provider is built from a weak handle, so it does not keep the
    /// simulation alive.
    pub fn time_provider(&self) -> crate::providers::SimTimeProvider {
        crate::providers::SimTimeProvider::new(self.downgrade())
    }
379
    /// Create a task provider for this simulation.
    ///
    /// Returns a `TokioTaskProvider` value; no simulation state is captured.
    pub fn task_provider(&self) -> crate::TokioTaskProvider {
        crate::TokioTaskProvider
    }
384
    /// Create a storage provider for this simulation scoped to a process IP.
    ///
    /// The provider is built from a weak handle plus `ip`, so it does not keep
    /// the simulation alive.
    pub fn storage_provider(&self, ip: std::net::IpAddr) -> crate::storage::SimStorageProvider {
        crate::storage::SimStorageProvider::new(self.downgrade(), ip)
    }
389
    /// Set the default storage configuration for this simulation.
    ///
    /// Used as fallback when no per-process config is set for a given IP.
    /// Overwrites the previously stored default.
    pub fn set_storage_config(&mut self, config: crate::storage::StorageConfiguration) {
        self.inner.borrow_mut().storage.config = config;
    }
396
397    /// Access network configuration for latency calculations using thread-local RNG.
398    ///
399    /// This method provides access to the network configuration for calculating
400    /// latencies and other network parameters. Random values should be generated
401    /// using the thread-local RNG functions like `sim_random()`.
402    ///
403    /// Access the network configuration for this simulation.
404    pub fn with_network_config<F, R>(&self, f: F) -> R
405    where
406        F: FnOnce(&NetworkConfiguration) -> R,
407    {
408        let inner = self.inner.borrow();
409        f(&inner.network.config)
410    }
411
412    /// Create a listener in the simulation (used by SimNetworkProvider)
413    pub(crate) fn create_listener(&self, addr: String) -> SimulationResult<ListenerId> {
414        let mut inner = self.inner.borrow_mut();
415        let listener_id = ListenerId(inner.network.next_listener_id);
416        inner.network.next_listener_id += 1;
417
418        inner.network.listeners.insert(
419            listener_id,
420            ListenerState {
421                id: listener_id,
422                addr,
423                pending_connections: VecDeque::new(),
424            },
425        );
426
427        Ok(listener_id)
428    }
429
430    /// Read data from connection's receive buffer (used by SimTcpStream)
431    pub(crate) fn read_from_connection(
432        &self,
433        connection_id: ConnectionId,
434        buf: &mut [u8],
435    ) -> SimulationResult<usize> {
436        let mut inner = self.inner.borrow_mut();
437
438        if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
439            let mut bytes_read = 0;
440            while bytes_read < buf.len() && !connection.receive_buffer.is_empty() {
441                if let Some(byte) = connection.receive_buffer.pop_front() {
442                    buf[bytes_read] = byte;
443                    bytes_read += 1;
444                }
445            }
446            Ok(bytes_read)
447        } else {
448            Err(SimulationError::InvalidState(
449                "connection not found".to_string(),
450            ))
451        }
452    }
453
454    /// Write data to connection's receive buffer (used by SimTcpStream write operations)
455    pub(crate) fn write_to_connection(
456        &self,
457        connection_id: ConnectionId,
458        data: &[u8],
459    ) -> SimulationResult<()> {
460        let mut inner = self.inner.borrow_mut();
461
462        if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
463            for &byte in data {
464                connection.receive_buffer.push_back(byte);
465            }
466            Ok(())
467        } else {
468            Err(SimulationError::InvalidState(
469                "connection not found".to_string(),
470            ))
471        }
472    }
473
474    /// Buffer data for ordered sending on a TCP connection.
475    ///
476    /// This method implements the core TCP ordering guarantee by ensuring that all
477    /// write operations on a single connection are processed in FIFO order.
478    pub(crate) fn buffer_send(
479        &self,
480        connection_id: ConnectionId,
481        data: Vec<u8>,
482    ) -> SimulationResult<()> {
483        tracing::debug!(
484            "buffer_send called for connection_id={} with {} bytes",
485            connection_id.0,
486            data.len()
487        );
488        let mut inner = self.inner.borrow_mut();
489
490        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
491            // Always add data to send buffer for TCP ordering
492            conn.send_buffer.push_back(data);
493            tracing::debug!(
494                "buffer_send: added data to send_buffer, new length: {}",
495                conn.send_buffer.len()
496            );
497
498            // If sender is not already active, start processing the buffer
499            if !conn.send_in_progress {
500                tracing::debug!(
501                    "buffer_send: sender not in progress, scheduling ProcessSendBuffer event"
502                );
503                conn.send_in_progress = true;
504
505                // Schedule immediate processing of the buffer
506                let scheduled_time = inner.current_time + std::time::Duration::ZERO;
507                let sequence = inner.next_sequence;
508                inner.next_sequence += 1;
509                let scheduled_event = ScheduledEvent::new(
510                    scheduled_time,
511                    Event::Network {
512                        connection_id: connection_id.0,
513                        operation: NetworkOperation::ProcessSendBuffer,
514                    },
515                    sequence,
516                );
517                inner.event_queue.schedule(scheduled_event);
518                tracing::debug!(
519                    "buffer_send: scheduled ProcessSendBuffer event with sequence {}",
520                    sequence
521                );
522            } else {
523                tracing::debug!(
524                    "buffer_send: sender already in progress, not scheduling new event"
525                );
526            }
527
528            Ok(())
529        } else {
530            tracing::debug!(
531                "buffer_send: connection_id={} not found in connections table",
532                connection_id.0
533            );
534            Err(SimulationError::InvalidState(
535                "connection not found".to_string(),
536            ))
537        }
538    }
539
540    /// Create a bidirectional TCP connection pair for client-server communication.
541    ///
542    /// FDB Pattern (sim2.actor.cpp:1149-1175):
543    /// - Client connection stores server's real address as peer_address
544    /// - Server connection stores synthesized ephemeral address (random IP + port 40000-60000)
545    ///
546    /// This simulates real TCP behavior where servers see client ephemeral ports.
547    pub(crate) fn create_connection_pair(
548        &self,
549        client_addr: String,
550        server_addr: String,
551    ) -> SimulationResult<(ConnectionId, ConnectionId)> {
552        let mut inner = self.inner.borrow_mut();
553
554        let client_id = ConnectionId(inner.network.next_connection_id);
555        inner.network.next_connection_id += 1;
556
557        let server_id = ConnectionId(inner.network.next_connection_id);
558        inner.network.next_connection_id += 1;
559
560        // Capture current time to avoid borrow conflicts
561        let current_time = inner.current_time;
562
563        // Parse IP addresses for partition tracking
564        let client_ip = NetworkState::parse_ip_from_addr(&client_addr);
565        let server_ip = NetworkState::parse_ip_from_addr(&server_addr);
566
567        // FDB Pattern: Synthesize ephemeral address for server-side connection
568        // sim2.actor.cpp:1149-1175: randomInt(0,256) for IP offset, randomInt(40000,60000) for port
569        // Use thread-local sim_random_range for deterministic randomness
570        let ephemeral_peer_addr = match client_ip {
571            Some(std::net::IpAddr::V4(ipv4)) => {
572                let octets = ipv4.octets();
573                let ip_offset = sim_random_range(0u32..256) as u8;
574                let new_last_octet = octets[3].wrapping_add(ip_offset);
575                let ephemeral_ip =
576                    std::net::Ipv4Addr::new(octets[0], octets[1], octets[2], new_last_octet);
577                let ephemeral_port = sim_random_range(40000u16..60000);
578                format!("{}:{}", ephemeral_ip, ephemeral_port)
579            }
580            Some(std::net::IpAddr::V6(ipv6)) => {
581                // For IPv6, just modify the last segment
582                let segments = ipv6.segments();
583                let mut new_segments = segments;
584                let ip_offset = sim_random_range(0u16..256);
585                new_segments[7] = new_segments[7].wrapping_add(ip_offset);
586                let ephemeral_ip = std::net::Ipv6Addr::from(new_segments);
587                let ephemeral_port = sim_random_range(40000u16..60000);
588                format!("[{}]:{}", ephemeral_ip, ephemeral_port)
589            }
590            None => {
591                // Fallback: use client address with random port
592                let ephemeral_port = sim_random_range(40000u16..60000);
593                format!("unknown:{}", ephemeral_port)
594            }
595        };
596
597        // Calculate send buffer capacity based on BDP (Bandwidth-Delay Product)
598        // Using a default of 64KB which is typical for TCP socket buffers
599        // In real simulations this could be: max_latency_ms * bandwidth_bytes_per_ms
600        const DEFAULT_SEND_BUFFER_CAPACITY: usize = 64 * 1024; // 64KB
601
602        // Create paired connections
603        // Client stores server's real address as peer_address
604        inner.network.connections.insert(
605            client_id,
606            ConnectionState {
607                id: client_id,
608                addr: client_addr,
609                local_ip: client_ip,
610                remote_ip: server_ip,
611                peer_address: server_addr.clone(),
612                receive_buffer: VecDeque::new(),
613                paired_connection: Some(server_id),
614                send_buffer: VecDeque::new(),
615                send_in_progress: false,
616                next_send_time: current_time,
617                is_closed: false,
618                send_closed: false,
619                recv_closed: false,
620                is_cut: false,
621                cut_expiry: None,
622                close_reason: CloseReason::None,
623                send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
624                send_delay: None,
625                recv_delay: None,
626                is_half_open: false,
627                half_open_error_at: None,
628                is_stable: false,
629                graceful_close_pending: false,
630                last_data_delivery_scheduled_at: None,
631                remote_fin_received: false,
632            },
633        );
634
635        // Server stores synthesized ephemeral address as peer_address
636        inner.network.connections.insert(
637            server_id,
638            ConnectionState {
639                id: server_id,
640                addr: server_addr,
641                local_ip: server_ip,
642                remote_ip: client_ip,
643                peer_address: ephemeral_peer_addr,
644                receive_buffer: VecDeque::new(),
645                paired_connection: Some(client_id),
646                send_buffer: VecDeque::new(),
647                send_in_progress: false,
648                next_send_time: current_time,
649                is_closed: false,
650                send_closed: false,
651                recv_closed: false,
652                is_cut: false,
653                cut_expiry: None,
654                close_reason: CloseReason::None,
655                send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
656                send_delay: None,
657                recv_delay: None,
658                is_half_open: false,
659                half_open_error_at: None,
660                is_stable: false,
661                graceful_close_pending: false,
662                last_data_delivery_scheduled_at: None,
663                remote_fin_received: false,
664            },
665        );
666
667        Ok((client_id, server_id))
668    }
669
670    /// Register a waker for read operations
671    pub(crate) fn register_read_waker(
672        &self,
673        connection_id: ConnectionId,
674        waker: Waker,
675    ) -> SimulationResult<()> {
676        let mut inner = self.inner.borrow_mut();
677        let is_replacement = inner.wakers.read_wakers.contains_key(&connection_id);
678        inner.wakers.read_wakers.insert(connection_id, waker);
679        tracing::debug!(
680            "register_read_waker: connection_id={}, replacement={}, total_wakers={}",
681            connection_id.0,
682            is_replacement,
683            inner.wakers.read_wakers.len()
684        );
685        Ok(())
686    }
687
688    /// Register a waker for accept operations
689    pub(crate) fn register_accept_waker(&self, addr: &str, waker: Waker) -> SimulationResult<()> {
690        let mut inner = self.inner.borrow_mut();
691        // For simplicity, we'll use addr hash as listener ID for waker storage
692        use std::collections::hash_map::DefaultHasher;
693        use std::hash::{Hash, Hasher};
694        let mut hasher = DefaultHasher::new();
695        addr.hash(&mut hasher);
696        let listener_key = ListenerId(hasher.finish());
697
698        inner.wakers.listener_wakers.insert(listener_key, waker);
699        Ok(())
700    }
701
702    /// Store a pending connection for later accept() call
703    pub(crate) fn store_pending_connection(
704        &self,
705        addr: &str,
706        connection_id: ConnectionId,
707    ) -> SimulationResult<()> {
708        let mut inner = self.inner.borrow_mut();
709        inner
710            .network
711            .pending_connections
712            .insert(addr.to_string(), connection_id);
713
714        // Wake any accept() calls waiting for this connection
715        use std::collections::hash_map::DefaultHasher;
716        use std::hash::{Hash, Hasher};
717        let mut hasher = DefaultHasher::new();
718        addr.hash(&mut hasher);
719        let listener_key = ListenerId(hasher.finish());
720
721        if let Some(waker) = inner.wakers.listener_wakers.remove(&listener_key) {
722            waker.wake();
723        }
724
725        Ok(())
726    }
727
    /// Get a pending connection for accept() call.
    ///
    /// The entry stored for `addr` is removed, so each pending connection is
    /// handed out at most once. Returns `Ok(None)` when nothing is pending.
    pub(crate) fn pending_connection(&self, addr: &str) -> SimulationResult<Option<ConnectionId>> {
        let mut inner = self.inner.borrow_mut();
        Ok(inner.network.pending_connections.remove(addr))
    }
733
734    /// Get the peer address for a connection.
735    ///
736    /// FDB Pattern (sim2.actor.cpp):
737    /// - For client-side connections: returns server's listening address
738    /// - For server-side connections: returns synthesized ephemeral address
739    ///
740    /// The returned address may not be connectable for server-side connections,
741    /// matching real TCP behavior where servers see client ephemeral ports.
742    pub(crate) fn connection_peer_address(&self, connection_id: ConnectionId) -> Option<String> {
743        let inner = self.inner.borrow();
744        inner
745            .network
746            .connections
747            .get(&connection_id)
748            .map(|conn| conn.peer_address.clone())
749    }
750
    /// Sleep for the specified duration in simulation time.
    ///
    /// Returns a future that will complete when the simulation time has advanced
    /// by the specified duration.
    ///
    /// A fresh task id is allocated for every call, and a `Timer` event carrying
    /// that id is scheduled. When buggified delays are enabled the scheduled
    /// delay may be longer than `duration`.
    #[instrument(skip(self))]
    pub fn sleep(&self, duration: Duration) -> SleepFuture {
        let task_id = self.generate_task_id();

        // Apply buggified delay if enabled
        let actual_duration = self.apply_buggified_delay(duration);

        // Schedule a wake event for this task
        self.schedule_event(Event::Timer { task_id }, actual_duration);

        // Return a future that will be woken when the event is processed
        SleepFuture::new(self.downgrade(), task_id)
    }
768
769    /// Apply buggified delay to a duration if chaos is enabled.
770    fn apply_buggified_delay(&self, duration: Duration) -> Duration {
771        let inner = self.inner.borrow();
772        let chaos = &inner.network.config.chaos;
773
774        if !chaos.buggified_delay_enabled || chaos.buggified_delay_max == Duration::ZERO {
775            return duration;
776        }
777
778        // 25% probability per FDB
779        if sim_random::<f64>() < chaos.buggified_delay_probability {
780            // Power-law distribution: pow(random01(), 1000) creates very skewed delays
781            let random_factor = sim_random::<f64>().powf(1000.0);
782            let extra_delay = chaos.buggified_delay_max.mul_f64(random_factor);
783            tracing::trace!(
784                extra_delay_ms = extra_delay.as_millis(),
785                "Buggified delay applied"
786            );
787            duration + extra_delay
788        } else {
789            duration
790        }
791    }
792
793    /// Generate a unique task ID for sleep operations.
794    fn generate_task_id(&self) -> u64 {
795        let mut inner = self.inner.borrow_mut();
796        let task_id = inner.next_task_id;
797        inner.next_task_id += 1;
798        task_id
799    }
800
801    /// Wake all tasks associated with a connection
802    fn wake_all(wakers: &mut BTreeMap<ConnectionId, Vec<Waker>>, connection_id: ConnectionId) {
803        if let Some(waker_list) = wakers.remove(&connection_id) {
804            for waker in waker_list {
805                waker.wake();
806            }
807        }
808    }
809
810    /// Check if a task has been awakened.
811    pub(crate) fn is_task_awake(&self, task_id: u64) -> SimulationResult<bool> {
812        let inner = self.inner.borrow();
813        Ok(inner.awakened_tasks.contains(&task_id))
814    }
815
816    /// Register a waker for a task.
817    pub(crate) fn register_task_waker(&self, task_id: u64, waker: Waker) -> SimulationResult<()> {
818        let mut inner = self.inner.borrow_mut();
819        inner.wakers.task_wakers.insert(task_id, waker);
820        Ok(())
821    }
822
823    /// Clear expired clogs and wake pending tasks (helper for use with SimInner)
824    fn clear_expired_clogs_with_inner(inner: &mut SimInner) {
825        let now = inner.current_time;
826        let expired: Vec<ConnectionId> = inner
827            .network
828            .connection_clogs
829            .iter()
830            .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
831            .collect();
832
833        for id in expired {
834            inner.network.connection_clogs.remove(&id);
835            Self::wake_all(&mut inner.wakers.clog_wakers, id);
836        }
837    }
838
839    /// Static event processor for simulation events.
840    #[instrument(skip(inner))]
841    fn process_event_with_inner(inner: &mut SimInner, event: Event) {
842        inner.events_processed += 1;
843
844        match event {
845            Event::Timer { task_id } => Self::handle_timer_event(inner, task_id),
846            Event::Connection { id, state } => Self::handle_connection_event(inner, id, state),
847            Event::Network {
848                connection_id,
849                operation,
850            } => Self::handle_network_event(inner, connection_id, operation),
851            Event::Storage { file_id, operation } => {
852                super::storage_ops::handle_storage_event(inner, file_id, operation)
853            }
854            Event::Shutdown => Self::handle_shutdown_event(inner),
855            Event::ProcessRestart { ip }
856            | Event::ProcessGracefulShutdown { ip, .. }
857            | Event::ProcessForceKill { ip, .. } => {
858                // Process lifecycle events are handled by the orchestrator, not SimWorld.
859                // We just log them here; the orchestrator reads the event after step().
860                tracing::debug!("Process lifecycle event for IP {}", ip);
861            }
862        }
863    }
864
865    /// Handle timer events - wake sleeping tasks.
866    fn handle_timer_event(inner: &mut SimInner, task_id: u64) {
867        inner.awakened_tasks.insert(task_id);
868        if let Some(waker) = inner.wakers.task_wakers.remove(&task_id) {
869            waker.wake();
870        }
871    }
872
873    /// Handle connection state change events.
874    fn handle_connection_event(inner: &mut SimInner, id: u64, state: ConnectionStateChange) {
875        let connection_id = ConnectionId(id);
876
877        match state {
878            ConnectionStateChange::BindComplete | ConnectionStateChange::ConnectionReady => {
879                // No action needed for these states
880            }
881            ConnectionStateChange::ClogClear => {
882                inner.network.connection_clogs.remove(&connection_id);
883                Self::wake_all(&mut inner.wakers.clog_wakers, connection_id);
884            }
885            ConnectionStateChange::ReadClogClear => {
886                inner.network.read_clogs.remove(&connection_id);
887                Self::wake_all(&mut inner.wakers.read_clog_wakers, connection_id);
888            }
889            ConnectionStateChange::PartitionRestore => {
890                Self::clear_expired_partitions(inner);
891            }
892            ConnectionStateChange::SendPartitionClear => {
893                Self::clear_expired_send_partitions(inner);
894            }
895            ConnectionStateChange::RecvPartitionClear => {
896                Self::clear_expired_recv_partitions(inner);
897            }
898            ConnectionStateChange::CutRestore => {
899                if let Some(conn) = inner.network.connections.get_mut(&connection_id)
900                    && conn.is_cut
901                {
902                    conn.is_cut = false;
903                    conn.cut_expiry = None;
904                    inner.emit_fault(SimFaultEvent::CutRestored { connection_id: id });
905                    tracing::debug!("Connection {} restored via scheduled event", id);
906                    Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
907                }
908            }
909            ConnectionStateChange::HalfOpenError => {
910                inner.emit_fault(SimFaultEvent::HalfOpenError { connection_id: id });
911                tracing::debug!("Connection {} half-open error time reached", id);
912                if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
913                    waker.wake();
914                }
915                Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
916            }
917        }
918    }
919
920    /// Clear expired IP partitions.
921    fn clear_expired_partitions(inner: &mut SimInner) {
922        let now = inner.current_time;
923        let expired: Vec<_> = inner
924            .network
925            .ip_partitions
926            .iter()
927            .filter_map(|(pair, state)| (now >= state.expires_at).then_some(*pair))
928            .collect();
929
930        for pair in expired {
931            inner.network.ip_partitions.remove(&pair);
932            tracing::debug!("Restored IP partition {} -> {}", pair.0, pair.1);
933        }
934    }
935
936    /// Clear expired send partitions.
937    fn clear_expired_send_partitions(inner: &mut SimInner) {
938        let now = inner.current_time;
939        let expired: Vec<_> = inner
940            .network
941            .send_partitions
942            .iter()
943            .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
944            .collect();
945
946        for ip in expired {
947            inner.network.send_partitions.remove(&ip);
948            tracing::debug!("Cleared send partition for {}", ip);
949        }
950    }
951
952    /// Clear expired receive partitions.
953    fn clear_expired_recv_partitions(inner: &mut SimInner) {
954        let now = inner.current_time;
955        let expired: Vec<_> = inner
956            .network
957            .recv_partitions
958            .iter()
959            .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
960            .collect();
961
962        for ip in expired {
963            inner.network.recv_partitions.remove(&ip);
964            tracing::debug!("Cleared receive partition for {}", ip);
965        }
966    }
967
968    /// Handle network events (data delivery and send buffer processing).
969    fn handle_network_event(inner: &mut SimInner, conn_id: u64, operation: NetworkOperation) {
970        let connection_id = ConnectionId(conn_id);
971
972        match operation {
973            NetworkOperation::DataDelivery { data } => {
974                Self::handle_data_delivery(inner, connection_id, data);
975            }
976            NetworkOperation::ProcessSendBuffer => {
977                Self::handle_process_send_buffer(inner, connection_id);
978            }
979            NetworkOperation::FinDelivery => {
980                Self::handle_fin_delivery(inner, connection_id);
981            }
982        }
983    }
984
985    /// Handle data delivery to a connection's receive buffer.
986    fn handle_data_delivery(inner: &mut SimInner, connection_id: ConnectionId, data: Vec<u8>) {
987        tracing::trace!(
988            "DataDelivery: {} bytes to connection {}",
989            data.len(),
990            connection_id.0
991        );
992
993        // Check connection exists and if it's stable
994        let is_stable = inner
995            .network
996            .connections
997            .get(&connection_id)
998            .is_some_and(|conn| conn.is_stable);
999
1000        if !inner.network.connections.contains_key(&connection_id) {
1001            tracing::warn!("DataDelivery: connection {} not found", connection_id.0);
1002            return;
1003        }
1004
1005        // Apply bit flipping chaos (needs mutable inner access) - skip for stable connections
1006        let data_to_deliver = if is_stable {
1007            data
1008        } else {
1009            Self::maybe_corrupt_data(inner, connection_id, &data)
1010        };
1011
1012        // Now get connection reference and deliver data
1013        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1014            return;
1015        };
1016
1017        for &byte in &data_to_deliver {
1018            conn.receive_buffer.push_back(byte);
1019        }
1020
1021        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1022            waker.wake();
1023        }
1024    }
1025
1026    /// Handle FIN delivery to a connection's receive side (TCP half-close).
1027    ///
1028    /// Sets `remote_fin_received` on the receiving connection so that `poll_read`
1029    /// returns EOF after the receive buffer is drained. Ignores FIN if the connection
1030    /// was already aborted (stale event).
1031    fn handle_fin_delivery(inner: &mut SimInner, connection_id: ConnectionId) {
1032        tracing::debug!(
1033            "FinDelivery: FIN received on connection {}",
1034            connection_id.0
1035        );
1036
1037        // If connection was aborted after FIN was scheduled, ignore the stale FIN
1038        let is_closed = inner
1039            .network
1040            .connections
1041            .get(&connection_id)
1042            .is_some_and(|conn| conn.is_closed);
1043
1044        if is_closed {
1045            tracing::debug!(
1046                "FinDelivery: connection {} already closed, ignoring stale FIN",
1047                connection_id.0
1048            );
1049            return;
1050        }
1051
1052        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1053            conn.remote_fin_received = true;
1054        }
1055
1056        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1057            waker.wake();
1058        }
1059    }
1060
1061    /// Schedule a FinDelivery event to the peer connection.
1062    ///
1063    /// The FIN is scheduled after the last DataDelivery event to ensure all data
1064    /// arrives at the peer before EOF is signaled.
1065    fn schedule_fin_delivery(
1066        inner: &mut SimInner,
1067        paired_id: Option<ConnectionId>,
1068        last_delivery_time: Option<Duration>,
1069    ) {
1070        let Some(peer_id) = paired_id else {
1071            return;
1072        };
1073
1074        // FIN must arrive after all DataDelivery events
1075        let fin_time = match last_delivery_time {
1076            Some(t) if t >= inner.current_time => t + Duration::from_nanos(1),
1077            _ => inner.current_time + Duration::from_nanos(1),
1078        };
1079
1080        let sequence = inner.next_sequence;
1081        inner.next_sequence += 1;
1082
1083        tracing::debug!(
1084            "Scheduling FinDelivery to connection {} at {:?}",
1085            peer_id.0,
1086            fin_time
1087        );
1088
1089        inner.event_queue.schedule(ScheduledEvent::new(
1090            fin_time,
1091            Event::Network {
1092                connection_id: peer_id.0,
1093                operation: NetworkOperation::FinDelivery,
1094            },
1095            sequence,
1096        ));
1097    }
1098
1099    /// Maybe corrupt data with bit flips based on chaos configuration.
1100    fn maybe_corrupt_data(
1101        inner: &mut SimInner,
1102        connection_id: ConnectionId,
1103        data: &[u8],
1104    ) -> Vec<u8> {
1105        if data.is_empty() {
1106            return data.to_vec();
1107        }
1108
1109        let chaos = &inner.network.config.chaos;
1110        let now = inner.current_time;
1111        let cooldown_elapsed =
1112            now.saturating_sub(inner.last_bit_flip_time) >= chaos.bit_flip_cooldown;
1113
1114        if !cooldown_elapsed || !crate::buggify_with_prob!(chaos.bit_flip_probability) {
1115            return data.to_vec();
1116        }
1117
1118        let random_value = sim_random::<u32>();
1119        let flip_count = SimInner::calculate_flip_bit_count(
1120            random_value,
1121            chaos.bit_flip_min_bits,
1122            chaos.bit_flip_max_bits,
1123        );
1124
1125        let mut corrupted_data = data.to_vec();
1126        let mut flipped_positions = std::collections::HashSet::new();
1127
1128        for _ in 0..flip_count {
1129            let byte_idx = (sim_random::<u64>() as usize) % corrupted_data.len();
1130            let bit_idx = (sim_random::<u64>() as usize) % 8;
1131            let position = (byte_idx, bit_idx);
1132
1133            if !flipped_positions.contains(&position) {
1134                flipped_positions.insert(position);
1135                corrupted_data[byte_idx] ^= 1 << bit_idx;
1136            }
1137        }
1138
1139        inner.last_bit_flip_time = now;
1140        tracing::info!(
1141            "BitFlipInjected: bytes={} bits_flipped={} unique_positions={}",
1142            data.len(),
1143            flip_count,
1144            flipped_positions.len()
1145        );
1146
1147        inner.emit_fault(SimFaultEvent::BitFlip {
1148            connection_id: connection_id.0,
1149            flip_count: flipped_positions.len(),
1150        });
1151
1152        corrupted_data
1153    }
1154
1155    /// Handle processing of a connection's send buffer.
1156    fn handle_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1157        let is_partitioned = inner
1158            .network
1159            .is_connection_partitioned(connection_id, inner.current_time);
1160
1161        if is_partitioned {
1162            Self::handle_partitioned_send(inner, connection_id);
1163        } else {
1164            Self::handle_normal_send(inner, connection_id);
1165        }
1166    }
1167
1168    /// Handle send when connection is partitioned.
1169    fn handle_partitioned_send(inner: &mut SimInner, connection_id: ConnectionId) {
1170        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1171            return;
1172        };
1173
1174        if let Some(data) = conn.send_buffer.pop_front() {
1175            tracing::debug!(
1176                "Connection {} partitioned, failing send of {} bytes",
1177                connection_id.0,
1178                data.len()
1179            );
1180            Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1181
1182            if !conn.send_buffer.is_empty() {
1183                Self::schedule_process_send_buffer(inner, connection_id);
1184            } else {
1185                conn.send_in_progress = false;
1186                // Check for pending graceful close when pipeline drains
1187                if conn.graceful_close_pending {
1188                    conn.graceful_close_pending = false;
1189                    let peer_id = conn.paired_connection;
1190                    let last_time = conn.last_data_delivery_scheduled_at;
1191                    Self::schedule_fin_delivery(inner, peer_id, last_time);
1192                }
1193            }
1194        } else {
1195            conn.send_in_progress = false;
1196            // Check for pending graceful close when pipeline drains
1197            if conn.graceful_close_pending {
1198                conn.graceful_close_pending = false;
1199                let peer_id = conn.paired_connection;
1200                let last_time = conn.last_data_delivery_scheduled_at;
1201                Self::schedule_fin_delivery(inner, peer_id, last_time);
1202            }
1203        }
1204    }
1205
1206    /// Handle normal (non-partitioned) send.
1207    fn handle_normal_send(inner: &mut SimInner, connection_id: ConnectionId) {
1208        // Extract connection info first
1209        let Some(conn) = inner.network.connections.get(&connection_id) else {
1210            return;
1211        };
1212
1213        let paired_id = conn.paired_connection;
1214        let send_delay = conn.send_delay;
1215        let next_send_time = conn.next_send_time;
1216        let has_data = !conn.send_buffer.is_empty();
1217        let is_stable = conn.is_stable; // For stable connection checks
1218
1219        let recv_delay = paired_id.and_then(|pid| {
1220            inner
1221                .network
1222                .connections
1223                .get(&pid)
1224                .and_then(|c| c.recv_delay)
1225        });
1226
1227        if !has_data {
1228            if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1229                conn.send_in_progress = false;
1230                // Check for pending graceful close when pipeline drains
1231                if conn.graceful_close_pending {
1232                    conn.graceful_close_pending = false;
1233                    let peer_id = conn.paired_connection;
1234                    let last_time = conn.last_data_delivery_scheduled_at;
1235                    Self::schedule_fin_delivery(inner, peer_id, last_time);
1236                }
1237            }
1238            return;
1239        }
1240
1241        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1242            return;
1243        };
1244
1245        let Some(mut data) = conn.send_buffer.pop_front() else {
1246            conn.send_in_progress = false;
1247            return;
1248        };
1249
1250        Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1251
1252        // BUGGIFY: Simulate partial/short writes (skip for stable connections)
1253        if !is_stable && crate::buggify!() && !data.is_empty() {
1254            let max_send = std::cmp::min(
1255                data.len(),
1256                inner.network.config.chaos.partial_write_max_bytes,
1257            );
1258            let truncate_to = sim_random_range(0..max_send + 1);
1259
1260            if truncate_to < data.len() {
1261                let remainder = data.split_off(truncate_to);
1262                conn.send_buffer.push_front(remainder);
1263                tracing::debug!(
1264                    "BUGGIFY: Partial write on connection {} - sending {} bytes",
1265                    connection_id.0,
1266                    data.len()
1267                );
1268            }
1269        }
1270
1271        let has_more = !conn.send_buffer.is_empty();
1272        let base_delay = if has_more {
1273            Duration::from_nanos(1)
1274        } else {
1275            send_delay.unwrap_or_else(|| {
1276                crate::network::sample_duration(&inner.network.config.write_latency)
1277            })
1278        };
1279
1280        let earliest_time = std::cmp::max(inner.current_time + base_delay, next_send_time);
1281        conn.next_send_time = earliest_time + Duration::from_nanos(1);
1282
1283        // Schedule data delivery to paired connection
1284        if let Some(paired_id) = paired_id {
1285            let scheduled_time = earliest_time + recv_delay.unwrap_or(Duration::ZERO);
1286            let sequence = inner.next_sequence;
1287            inner.next_sequence += 1;
1288
1289            inner.event_queue.schedule(ScheduledEvent::new(
1290                scheduled_time,
1291                Event::Network {
1292                    connection_id: paired_id.0,
1293                    operation: NetworkOperation::DataDelivery { data },
1294                },
1295                sequence,
1296            ));
1297
1298            // Track delivery time for FIN ordering (must be after last DataDelivery)
1299            conn.last_data_delivery_scheduled_at = Some(scheduled_time);
1300        }
1301
1302        // Schedule next send if more data
1303        if !conn.send_buffer.is_empty() {
1304            Self::schedule_process_send_buffer(inner, connection_id);
1305        } else {
1306            conn.send_in_progress = false;
1307            // Check for pending graceful close when pipeline drains
1308            if conn.graceful_close_pending {
1309                conn.graceful_close_pending = false;
1310                let peer_id = conn.paired_connection;
1311                let last_time = conn.last_data_delivery_scheduled_at;
1312                Self::schedule_fin_delivery(inner, peer_id, last_time);
1313            }
1314        }
1315    }
1316
1317    /// Schedule a ProcessSendBuffer event for the given connection.
1318    fn schedule_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1319        let sequence = inner.next_sequence;
1320        inner.next_sequence += 1;
1321
1322        inner.event_queue.schedule(ScheduledEvent::new(
1323            inner.current_time,
1324            Event::Network {
1325                connection_id: connection_id.0,
1326                operation: NetworkOperation::ProcessSendBuffer,
1327            },
1328            sequence,
1329        ));
1330    }
1331
1332    /// Handle shutdown event - wake all pending tasks.
1333    fn handle_shutdown_event(inner: &mut SimInner) {
1334        tracing::debug!("Processing Shutdown event - waking all pending tasks");
1335
1336        for (task_id, waker) in std::mem::take(&mut inner.wakers.task_wakers) {
1337            tracing::trace!("Waking task {}", task_id);
1338            waker.wake();
1339        }
1340
1341        for (_conn_id, waker) in std::mem::take(&mut inner.wakers.read_wakers) {
1342            waker.wake();
1343        }
1344
1345        tracing::debug!("Shutdown event processed");
1346    }
1347
1348    /// Get current assertion results for all tracked assertions.
1349    pub fn assertion_results(
1350        &self,
1351    ) -> std::collections::HashMap<String, crate::chaos::AssertionStats> {
1352        crate::chaos::assertion_results()
1353    }
1354
1355    /// Reset assertion statistics to empty state.
1356    pub fn reset_assertion_results(&self) {
1357        crate::chaos::reset_assertion_results();
1358    }
1359
1360    /// Abort all connections involving a specific IP address.
1361    ///
1362    /// This is used during process reboot to immediately kill all network
1363    /// connections for the rebooted process. Both local and remote connections
1364    /// are aborted (RST semantics — peer sees ECONNRESET).
1365    pub fn abort_all_connections_for_ip(&self, ip: std::net::IpAddr) {
1366        let connection_ids: Vec<ConnectionId> = {
1367            let inner = self.inner.borrow();
1368            inner
1369                .network
1370                .connections
1371                .iter()
1372                .filter_map(|(id, conn)| {
1373                    if conn.local_ip == Some(ip) || conn.remote_ip == Some(ip) {
1374                        Some(*id)
1375                    } else {
1376                        None
1377                    }
1378                })
1379                .collect()
1380        };
1381
1382        let count = connection_ids.len();
1383        for conn_id in connection_ids {
1384            self.close_connection_abort(conn_id);
1385        }
1386
1387        if count > 0 {
1388            tracing::debug!("Aborted {} connections for rebooted IP {}", count, ip);
1389        }
1390    }
1391
1392    /// Schedule a `ProcessRestart` event after a recovery delay.
1393    ///
1394    /// Called after a process is killed to schedule its restart.
1395    pub fn schedule_process_restart(
1396        &self,
1397        ip: std::net::IpAddr,
1398        recovery_delay: std::time::Duration,
1399    ) {
1400        self.schedule_event(Event::ProcessRestart { ip }, recovery_delay);
1401        tracing::debug!(
1402            "Scheduled process restart for IP {} in {:?}",
1403            ip,
1404            recovery_delay
1405        );
1406    }
1407
1408    /// Returns the last event processed by `step()`, if any.
1409    ///
1410    /// This is used by the orchestrator to detect `ProcessRestart` events
1411    /// and handle them (respawn the process).
1412    pub fn last_processed_event(&self) -> Option<Event> {
1413        self.inner.borrow().last_processed_event.clone()
1414    }
1415
1416    /// Extract simulation metrics (simulated time, events processed).
1417    pub fn extract_metrics(&self) -> crate::runner::SimulationMetrics {
1418        let inner = self.inner.borrow();
1419
1420        crate::runner::SimulationMetrics {
1421            wall_time: std::time::Duration::ZERO,
1422            simulated_time: inner.current_time,
1423            events_processed: inner.events_processed,
1424        }
1425    }
1426
1427    // Phase 7: Simple write clogging methods
1428
1429    /// Check if a write should be clogged based on probability
1430    pub fn should_clog_write(&self, connection_id: ConnectionId) -> bool {
1431        let inner = self.inner.borrow();
1432        let config = &inner.network.config;
1433
1434        // Skip stable connections (FDB: stableConnection exempt from chaos)
1435        if inner
1436            .network
1437            .connections
1438            .get(&connection_id)
1439            .is_some_and(|conn| conn.is_stable)
1440        {
1441            return false;
1442        }
1443
1444        // Skip if already clogged
1445        if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1446            return inner.current_time < clog_state.expires_at;
1447        }
1448
1449        // Check probability
1450        config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1451    }
1452
1453    /// Clog a connection's write operations
1454    pub fn clog_write(&self, connection_id: ConnectionId) {
1455        let mut inner = self.inner.borrow_mut();
1456        let config = &inner.network.config;
1457
1458        let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1459        let expires_at = inner.current_time + clog_duration;
1460        inner
1461            .network
1462            .connection_clogs
1463            .insert(connection_id, ClogState { expires_at });
1464
1465        // Schedule an event to clear this clog
1466        let clear_event = Event::Connection {
1467            id: connection_id.0,
1468            state: ConnectionStateChange::ClogClear,
1469        };
1470        let sequence = inner.next_sequence;
1471        inner.next_sequence += 1;
1472        inner
1473            .event_queue
1474            .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1475    }
1476
1477    /// Check if a connection's writes are currently clogged
1478    pub fn is_write_clogged(&self, connection_id: ConnectionId) -> bool {
1479        let inner = self.inner.borrow();
1480
1481        if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1482            inner.current_time < clog_state.expires_at
1483        } else {
1484            false
1485        }
1486    }
1487
1488    /// Register a waker for when write clog clears
1489    pub fn register_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1490        let mut inner = self.inner.borrow_mut();
1491        inner
1492            .wakers
1493            .clog_wakers
1494            .entry(connection_id)
1495            .or_default()
1496            .push(waker);
1497    }
1498
1499    // Read clogging methods (symmetric with write clogging)
1500
1501    /// Check if a read should be clogged based on probability
1502    pub fn should_clog_read(&self, connection_id: ConnectionId) -> bool {
1503        let inner = self.inner.borrow();
1504        let config = &inner.network.config;
1505
1506        // Skip stable connections (FDB: stableConnection exempt from chaos)
1507        if inner
1508            .network
1509            .connections
1510            .get(&connection_id)
1511            .is_some_and(|conn| conn.is_stable)
1512        {
1513            return false;
1514        }
1515
1516        // Skip if already clogged
1517        if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1518            return inner.current_time < clog_state.expires_at;
1519        }
1520
1521        // Check probability (same as write clogging)
1522        config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1523    }
1524
1525    /// Clog a connection's read operations
1526    pub fn clog_read(&self, connection_id: ConnectionId) {
1527        let mut inner = self.inner.borrow_mut();
1528        let config = &inner.network.config;
1529
1530        let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1531        let expires_at = inner.current_time + clog_duration;
1532        inner
1533            .network
1534            .read_clogs
1535            .insert(connection_id, ClogState { expires_at });
1536
1537        // Schedule an event to clear this read clog
1538        let clear_event = Event::Connection {
1539            id: connection_id.0,
1540            state: ConnectionStateChange::ReadClogClear,
1541        };
1542        let sequence = inner.next_sequence;
1543        inner.next_sequence += 1;
1544        inner
1545            .event_queue
1546            .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1547    }
1548
1549    /// Check if a connection's reads are currently clogged
1550    pub fn is_read_clogged(&self, connection_id: ConnectionId) -> bool {
1551        let inner = self.inner.borrow();
1552
1553        if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1554            inner.current_time < clog_state.expires_at
1555        } else {
1556            false
1557        }
1558    }
1559
1560    /// Register a waker for when read clog clears
1561    pub fn register_read_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1562        let mut inner = self.inner.borrow_mut();
1563        inner
1564            .wakers
1565            .read_clog_wakers
1566            .entry(connection_id)
1567            .or_default()
1568            .push(waker);
1569    }
1570
1571    /// Clear expired clogs and wake pending tasks
1572    pub fn clear_expired_clogs(&self) {
1573        let mut inner = self.inner.borrow_mut();
1574        let now = inner.current_time;
1575        let expired: Vec<ConnectionId> = inner
1576            .network
1577            .connection_clogs
1578            .iter()
1579            .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
1580            .collect();
1581
1582        for id in expired {
1583            inner.network.connection_clogs.remove(&id);
1584            Self::wake_all(&mut inner.wakers.clog_wakers, id);
1585        }
1586    }
1587
1588    // Connection Cut Methods (temporary network outage simulation)
1589
1590    /// Temporarily cut a connection for the specified duration.
1591    ///
1592    /// Unlike `close_connection`, a cut connection will be automatically restored
1593    /// after the duration expires. This simulates temporary network outages where
1594    /// the underlying connection remains but is temporarily unavailable.
1595    ///
1596    /// During a cut:
1597    /// - `poll_read` returns `Poll::Pending` (waits for restore)
1598    /// - `poll_write` returns `Poll::Pending` (waits for restore)
1599    /// - Buffered data is preserved
1600    pub fn cut_connection(&self, connection_id: ConnectionId, duration: Duration) {
1601        let mut inner = self.inner.borrow_mut();
1602        let expires_at = inner.current_time + duration;
1603
1604        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1605            conn.is_cut = true;
1606            conn.cut_expiry = Some(expires_at);
1607
1608            inner.emit_fault(SimFaultEvent::ConnectionCut {
1609                connection_id: connection_id.0,
1610                duration_ms: duration.as_millis() as u64,
1611            });
1612
1613            tracing::debug!("Connection {} cut until {:?}", connection_id.0, expires_at);
1614
1615            // Schedule restoration event
1616            let restore_event = Event::Connection {
1617                id: connection_id.0,
1618                state: ConnectionStateChange::CutRestore,
1619            };
1620            let sequence = inner.next_sequence;
1621            inner.next_sequence += 1;
1622            inner
1623                .event_queue
1624                .schedule(ScheduledEvent::new(expires_at, restore_event, sequence));
1625        }
1626    }
1627
1628    /// Check if a connection is temporarily cut.
1629    ///
1630    /// A cut connection is temporarily unavailable but will be restored.
1631    /// This is different from `is_connection_closed` which indicates permanent closure.
1632    pub fn is_connection_cut(&self, connection_id: ConnectionId) -> bool {
1633        let inner = self.inner.borrow();
1634        inner
1635            .network
1636            .connections
1637            .get(&connection_id)
1638            .is_some_and(|conn| {
1639                conn.is_cut
1640                    && conn
1641                        .cut_expiry
1642                        .is_some_and(|expiry| inner.current_time < expiry)
1643            })
1644    }
1645
1646    /// Restore a cut connection immediately.
1647    ///
1648    /// This cancels the cut state and wakes any tasks waiting for restoration.
1649    pub fn restore_connection(&self, connection_id: ConnectionId) {
1650        let mut inner = self.inner.borrow_mut();
1651
1652        if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1653            && conn.is_cut
1654        {
1655            conn.is_cut = false;
1656            conn.cut_expiry = None;
1657            tracing::debug!("Connection {} restored", connection_id.0);
1658
1659            // Wake any tasks waiting for restoration
1660            Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
1661        }
1662    }
1663
1664    /// Register a waker for when a cut connection is restored.
1665    pub fn register_cut_waker(&self, connection_id: ConnectionId, waker: Waker) {
1666        let mut inner = self.inner.borrow_mut();
1667        inner
1668            .wakers
1669            .cut_wakers
1670            .entry(connection_id)
1671            .or_default()
1672            .push(waker);
1673    }
1674
1675    // Send buffer management methods
1676
1677    /// Get the send buffer capacity for a connection.
1678    pub fn send_buffer_capacity(&self, connection_id: ConnectionId) -> usize {
1679        let inner = self.inner.borrow();
1680        inner
1681            .network
1682            .connections
1683            .get(&connection_id)
1684            .map(|conn| conn.send_buffer_capacity)
1685            .unwrap_or(0)
1686    }
1687
1688    /// Get the current send buffer usage for a connection.
1689    pub fn send_buffer_used(&self, connection_id: ConnectionId) -> usize {
1690        let inner = self.inner.borrow();
1691        inner
1692            .network
1693            .connections
1694            .get(&connection_id)
1695            .map(|conn| conn.send_buffer.iter().map(|v| v.len()).sum())
1696            .unwrap_or(0)
1697    }
1698
1699    /// Get the available send buffer space for a connection.
1700    pub fn available_send_buffer(&self, connection_id: ConnectionId) -> usize {
1701        let capacity = self.send_buffer_capacity(connection_id);
1702        let used = self.send_buffer_used(connection_id);
1703        capacity.saturating_sub(used)
1704    }
1705
1706    /// Register a waker for when send buffer space becomes available.
1707    pub fn register_send_buffer_waker(&self, connection_id: ConnectionId, waker: Waker) {
1708        let mut inner = self.inner.borrow_mut();
1709        inner
1710            .wakers
1711            .send_buffer_wakers
1712            .entry(connection_id)
1713            .or_default()
1714            .push(waker);
1715    }
1716
1717    /// Wake any tasks waiting for send buffer space on a connection.
1718    #[allow(dead_code)] // May be used for external buffer management
1719    fn wake_send_buffer_waiters(&self, connection_id: ConnectionId) {
1720        let mut inner = self.inner.borrow_mut();
1721        Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1722    }
1723
1724    // Per-IP-pair base latency methods
1725
1726    /// Get the base latency for a connection pair.
1727    /// Returns the latency if already set, otherwise None.
1728    pub fn pair_latency(&self, src: IpAddr, dst: IpAddr) -> Option<Duration> {
1729        let inner = self.inner.borrow();
1730        inner.network.pair_latencies.get(&(src, dst)).copied()
1731    }
1732
1733    /// Set the base latency for a connection pair if not already set.
1734    /// Returns the latency (existing or newly set).
1735    pub fn set_pair_latency_if_not_set(
1736        &self,
1737        src: IpAddr,
1738        dst: IpAddr,
1739        latency: Duration,
1740    ) -> Duration {
1741        let mut inner = self.inner.borrow_mut();
1742        *inner
1743            .network
1744            .pair_latencies
1745            .entry((src, dst))
1746            .or_insert_with(|| {
1747                tracing::debug!(
1748                    "Setting base latency for IP pair {} -> {} to {:?}",
1749                    src,
1750                    dst,
1751                    latency
1752                );
1753                latency
1754            })
1755    }
1756
1757    /// Get the base latency for a connection based on its IP pair.
1758    /// If not set, samples from config and sets it.
1759    pub fn connection_base_latency(&self, connection_id: ConnectionId) -> Duration {
1760        let inner = self.inner.borrow();
1761        let (local_ip, remote_ip) = inner
1762            .network
1763            .connections
1764            .get(&connection_id)
1765            .and_then(|conn| Some((conn.local_ip?, conn.remote_ip?)))
1766            .unwrap_or({
1767                (
1768                    IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1769                    IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1770                )
1771            });
1772        drop(inner);
1773
1774        // Check if latency is already set
1775        if let Some(latency) = self.pair_latency(local_ip, remote_ip) {
1776            return latency;
1777        }
1778
1779        // Sample a new latency from config and set it
1780        let latency = self
1781            .with_network_config(|config| crate::network::sample_duration(&config.write_latency));
1782        self.set_pair_latency_if_not_set(local_ip, remote_ip, latency)
1783    }
1784
1785    // Per-connection asymmetric delay methods
1786
1787    /// Get the send delay for a connection.
1788    /// Returns the per-connection override if set, otherwise None.
1789    pub fn send_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1790        let inner = self.inner.borrow();
1791        inner
1792            .network
1793            .connections
1794            .get(&connection_id)
1795            .and_then(|conn| conn.send_delay)
1796    }
1797
1798    /// Get the receive delay for a connection.
1799    /// Returns the per-connection override if set, otherwise None.
1800    pub fn recv_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1801        let inner = self.inner.borrow();
1802        inner
1803            .network
1804            .connections
1805            .get(&connection_id)
1806            .and_then(|conn| conn.recv_delay)
1807    }
1808
1809    /// Set asymmetric delays for a connection.
1810    /// If a delay is None, the global configuration is used instead.
1811    pub fn set_asymmetric_delays(
1812        &self,
1813        connection_id: ConnectionId,
1814        send_delay: Option<Duration>,
1815        recv_delay: Option<Duration>,
1816    ) {
1817        let mut inner = self.inner.borrow_mut();
1818        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1819            conn.send_delay = send_delay;
1820            conn.recv_delay = recv_delay;
1821            tracing::debug!(
1822                "Connection {} asymmetric delays set: send={:?}, recv={:?}",
1823                connection_id.0,
1824                send_delay,
1825                recv_delay
1826            );
1827        }
1828    }
1829
1830    /// Check if a connection is permanently closed
1831    pub fn is_connection_closed(&self, connection_id: ConnectionId) -> bool {
1832        let inner = self.inner.borrow();
1833        inner
1834            .network
1835            .connections
1836            .get(&connection_id)
1837            .is_some_and(|conn| conn.is_closed)
1838    }
1839
1840    /// Close a connection gracefully (FIN semantics).
1841    ///
1842    /// The peer will receive EOF on read operations.
1843    pub fn close_connection(&self, connection_id: ConnectionId) {
1844        self.close_connection_with_reason(connection_id, CloseReason::Graceful);
1845    }
1846
1847    /// Close a connection abruptly (RST semantics).
1848    ///
1849    /// The peer will receive ECONNRESET on both read and write operations.
1850    pub fn close_connection_abort(&self, connection_id: ConnectionId) {
1851        self.close_connection_with_reason(connection_id, CloseReason::Aborted);
1852    }
1853
1854    /// Get the close reason for a connection.
1855    pub fn close_reason(&self, connection_id: ConnectionId) -> CloseReason {
1856        let inner = self.inner.borrow();
1857        inner
1858            .network
1859            .connections
1860            .get(&connection_id)
1861            .map(|conn| conn.close_reason)
1862            .unwrap_or(CloseReason::None)
1863    }
1864
1865    /// Close a connection with a specific close reason.
1866    fn close_connection_with_reason(&self, connection_id: ConnectionId, reason: CloseReason) {
1867        match reason {
1868            CloseReason::Graceful => self.close_connection_graceful(connection_id),
1869            CloseReason::Aborted => self.close_connection_aborted(connection_id),
1870            CloseReason::None => {}
1871        }
1872    }
1873
1874    /// Graceful close (FIN semantics) — TCP half-close.
1875    ///
1876    /// Marks the local write side as closed. The peer can still read all buffered
1877    /// and in-flight data. A `FinDelivery` event is scheduled after the last
1878    /// `DataDelivery` to signal EOF to the peer.
1879    fn close_connection_graceful(&self, connection_id: ConnectionId) {
1880        let mut inner = self.inner.borrow_mut();
1881
1882        // Extract connection info
1883        let conn_info = inner.network.connections.get(&connection_id).map(|conn| {
1884            (
1885                conn.paired_connection,
1886                conn.send_closed,
1887                conn.is_closed,
1888                conn.send_in_progress,
1889                conn.send_buffer.is_empty(),
1890                conn.last_data_delivery_scheduled_at,
1891            )
1892        });
1893
1894        let Some((
1895            paired_id,
1896            was_send_closed,
1897            was_closed,
1898            send_in_progress,
1899            send_buffer_empty,
1900            last_delivery_time,
1901        )) = conn_info
1902        else {
1903            return;
1904        };
1905
1906        // Idempotent: if already closed or send_closed (by chaos or previous close), skip
1907        if was_closed || was_send_closed {
1908            tracing::debug!(
1909                "Connection {} already closed/send_closed, skipping graceful close",
1910                connection_id.0
1911            );
1912            return;
1913        }
1914
1915        // Mark local side as closed with send side shut down
1916        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1917            conn.is_closed = true;
1918            conn.close_reason = CloseReason::Graceful;
1919            conn.send_closed = true;
1920            tracing::debug!(
1921                "Connection {} graceful close (FIN) - local write shut down",
1922                connection_id.0
1923            );
1924        }
1925
1926        // Wake local read waker (so local poll_read returns EOF if needed)
1927        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1928            waker.wake();
1929        }
1930
1931        // Do NOT set is_closed on the peer — they can still read buffered data.
1932        // Instead, schedule a FIN delivery after all data has been delivered.
1933        if send_in_progress || !send_buffer_empty {
1934            // Pipeline has data — defer FIN until pipeline drains
1935            if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1936                conn.graceful_close_pending = true;
1937                tracing::debug!(
1938                    "Connection {} graceful close deferred (send pipeline active)",
1939                    connection_id.0
1940                );
1941            }
1942        } else {
1943            // Pipeline is idle — schedule FIN delivery now
1944            Self::schedule_fin_delivery(&mut inner, paired_id, last_delivery_time);
1945        }
1946    }
1947
1948    /// Aborted close (RST semantics) — immediate connection kill.
1949    ///
1950    /// Immediately sets `is_closed` on both endpoints. The peer will receive
1951    /// ECONNRESET on both read and write operations.
1952    fn close_connection_aborted(&self, connection_id: ConnectionId) {
1953        let mut inner = self.inner.borrow_mut();
1954
1955        let paired_connection_id = inner
1956            .network
1957            .connections
1958            .get(&connection_id)
1959            .and_then(|conn| conn.paired_connection);
1960
1961        if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1962            && !conn.is_closed
1963        {
1964            conn.is_closed = true;
1965            conn.close_reason = CloseReason::Aborted;
1966            // Cancel any pending graceful close
1967            conn.graceful_close_pending = false;
1968            tracing::debug!(
1969                "Connection {} closed permanently (reason: Aborted)",
1970                connection_id.0
1971            );
1972        }
1973
1974        if let Some(paired_id) = paired_connection_id
1975            && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
1976            && !paired_conn.is_closed
1977        {
1978            paired_conn.is_closed = true;
1979            paired_conn.close_reason = CloseReason::Aborted;
1980            tracing::debug!(
1981                "Paired connection {} also closed (reason: Aborted)",
1982                paired_id.0
1983            );
1984        }
1985
1986        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1987            tracing::debug!(
1988                "Waking read waker for aborted connection {}",
1989                connection_id.0
1990            );
1991            waker.wake();
1992        }
1993
1994        if let Some(paired_id) = paired_connection_id
1995            && let Some(paired_waker) = inner.wakers.read_wakers.remove(&paired_id)
1996        {
1997            tracing::debug!(
1998                "Waking read waker for paired aborted connection {}",
1999                paired_id.0
2000            );
2001            paired_waker.wake();
2002        }
2003    }
2004
2005    /// Close connection asymmetrically (FDB rollRandomClose pattern)
2006    pub fn close_connection_asymmetric(
2007        &self,
2008        connection_id: ConnectionId,
2009        close_send: bool,
2010        close_recv: bool,
2011    ) {
2012        let mut inner = self.inner.borrow_mut();
2013
2014        let paired_id = inner
2015            .network
2016            .connections
2017            .get(&connection_id)
2018            .and_then(|conn| conn.paired_connection);
2019
2020        if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2021            conn.send_closed = true;
2022            conn.send_buffer.clear();
2023            tracing::debug!(
2024                "Connection {} send side closed (asymmetric)",
2025                connection_id.0
2026            );
2027        }
2028
2029        if close_recv
2030            && let Some(paired) = paired_id
2031            && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
2032        {
2033            paired_conn.recv_closed = true;
2034            tracing::debug!(
2035                "Connection {} recv side closed (asymmetric via peer)",
2036                paired.0
2037            );
2038        }
2039
2040        if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
2041            waker.wake();
2042        }
2043        if close_recv
2044            && let Some(paired) = paired_id
2045            && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
2046        {
2047            waker.wake();
2048        }
2049    }
2050
    /// Roll random close chaos injection (FDB rollRandomClose pattern)
    ///
    /// Returns `None` when no close is injected, which happens when:
    /// - the connection is marked stable,
    /// - `random_close_probability` is `<= 0.0`,
    /// - less than `random_close_cooldown` has elapsed since the last injected
    ///   random close, or
    /// - the `buggify_with_prob!` roll does not fire.
    ///
    /// Otherwise a `RandomClose` fault is recorded, the send and/or receive side
    /// is closed based on a random draw, affected read wakers are woken, and
    /// `Some(explicit)` is returned, where `explicit` is true when a second
    /// random draw falls below `random_close_explicit_ratio`.
    ///
    /// NOTE(review): how callers act on `explicit` is not visible in this
    /// function — presumably it selects an immediate error; confirm at call sites.
    pub fn roll_random_close(&self, connection_id: ConnectionId) -> Option<bool> {
        let mut inner = self.inner.borrow_mut();
        let config = &inner.network.config;

        // Skip stable connections (FDB: stableConnection exempt from chaos)
        if inner
            .network
            .connections
            .get(&connection_id)
            .is_some_and(|conn| conn.is_stable)
        {
            return None;
        }

        // Feature disabled: bail out before touching the RNG.
        if config.chaos.random_close_probability <= 0.0 {
            return None;
        }

        // Rate limit: at most one injected close per cooldown window.
        let current_time = inner.current_time;
        let time_since_last = current_time.saturating_sub(inner.network.last_random_close_time);
        if time_since_last < config.chaos.random_close_cooldown {
            return None;
        }

        // Probabilistic gate for this call.
        if !crate::buggify_with_prob!(config.chaos.random_close_probability) {
            return None;
        }

        // From here on a close is injected; restart the cooldown window.
        inner.network.last_random_close_time = current_time;

        inner.emit_fault(SimFaultEvent::RandomClose {
            connection_id: connection_id.0,
        });

        let paired_id = inner
            .network
            .connections
            .get(&connection_id)
            .and_then(|conn| conn.paired_connection);

        // One draw selects the direction(s):
        //   a <= 0.33        -> recv only
        //   0.33 < a < 0.66  -> both
        //   a >= 0.66        -> send only
        let a = super::rng::sim_random_f64();
        let close_recv = a < 0.66;
        let close_send = a > 0.33;

        tracing::info!(
            "Random connection failure triggered on connection {} (send={}, recv={}, a={:.3})",
            connection_id.0,
            close_send,
            close_recv,
            a
        );

        // Send side: flag this connection and discard anything still queued.
        if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
            conn.send_closed = true;
            conn.send_buffer.clear();
        }

        // Receive side: the flag is set on the paired connection.
        if close_recv
            && let Some(paired) = paired_id
            && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
        {
            paired_conn.recv_closed = true;
        }

        // Wake readers only after all flags are in place.
        if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
            waker.wake();
        }
        if close_recv
            && let Some(paired) = paired_id
            && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
        {
            waker.wake();
        }

        // Second draw decides the returned `explicit` flag.
        let b = super::rng::sim_random_f64();
        let explicit = b < inner.network.config.chaos.random_close_explicit_ratio;

        tracing::debug!(
            "Random close explicit={} (b={:.3}, ratio={:.2})",
            explicit,
            b,
            inner.network.config.chaos.random_close_explicit_ratio
        );

        Some(explicit)
    }
2138
2139    /// Check if a connection's send side is closed
2140    pub fn is_send_closed(&self, connection_id: ConnectionId) -> bool {
2141        let inner = self.inner.borrow();
2142        inner
2143            .network
2144            .connections
2145            .get(&connection_id)
2146            .is_some_and(|conn| conn.send_closed || conn.is_closed)
2147    }
2148
2149    /// Check if a connection's receive side is closed
2150    pub fn is_recv_closed(&self, connection_id: ConnectionId) -> bool {
2151        let inner = self.inner.borrow();
2152        inner
2153            .network
2154            .connections
2155            .get(&connection_id)
2156            .is_some_and(|conn| conn.recv_closed || conn.is_closed)
2157    }
2158
2159    /// Check if a FIN has been received from the remote peer (graceful close).
2160    ///
2161    /// When true, `poll_read` should return EOF after draining the receive buffer.
2162    /// Distinct from `is_recv_closed` which is used for chaos/asymmetric closure.
2163    pub fn is_remote_fin_received(&self, connection_id: ConnectionId) -> bool {
2164        let inner = self.inner.borrow();
2165        inner
2166            .network
2167            .connections
2168            .get(&connection_id)
2169            .is_some_and(|conn| conn.remote_fin_received)
2170    }
2171
2172    // Half-Open Connection Simulation Methods
2173
2174    /// Simulate a peer crash on a connection.
2175    ///
2176    /// This puts the connection in a half-open state where:
2177    /// - The local side still thinks it's connected
2178    /// - Writes succeed but data is silently discarded (peer is gone)
2179    /// - Reads block waiting for data that will never come
2180    /// - After `error_delay`, both read and write return ECONNRESET
2181    ///
2182    /// This simulates real-world scenarios where a remote peer crashes
2183    /// or becomes unreachable, but the local TCP stack hasn't detected it yet.
2184    pub fn simulate_peer_crash(&self, connection_id: ConnectionId, error_delay: Duration) {
2185        let mut inner = self.inner.borrow_mut();
2186        let current_time = inner.current_time;
2187        let error_at = current_time + error_delay;
2188
2189        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2190            conn.is_half_open = true;
2191            conn.half_open_error_at = Some(error_at);
2192
2193            // Clear the paired connection to simulate peer being gone
2194            // Data sent will go nowhere
2195            conn.paired_connection = None;
2196
2197            inner.emit_fault(SimFaultEvent::PeerCrash {
2198                connection_id: connection_id.0,
2199            });
2200
2201            tracing::info!(
2202                "Connection {} now half-open, errors manifest at {:?}",
2203                connection_id.0,
2204                error_at
2205            );
2206        }
2207
2208        // Schedule an event to wake any waiting readers when error time arrives
2209        let wake_event = Event::Connection {
2210            id: connection_id.0,
2211            state: ConnectionStateChange::HalfOpenError,
2212        };
2213        let sequence = inner.next_sequence;
2214        inner.next_sequence += 1;
2215        let scheduled_event = ScheduledEvent::new(error_at, wake_event, sequence);
2216        inner.event_queue.schedule(scheduled_event);
2217    }
2218
2219    /// Check if a connection is in half-open state
2220    pub fn is_half_open(&self, connection_id: ConnectionId) -> bool {
2221        let inner = self.inner.borrow();
2222        inner
2223            .network
2224            .connections
2225            .get(&connection_id)
2226            .is_some_and(|conn| conn.is_half_open)
2227    }
2228
2229    /// Check if a half-open connection should return errors now
2230    pub fn should_half_open_error(&self, connection_id: ConnectionId) -> bool {
2231        let inner = self.inner.borrow();
2232        let current_time = inner.current_time;
2233        inner
2234            .network
2235            .connections
2236            .get(&connection_id)
2237            .is_some_and(|conn| {
2238                conn.is_half_open
2239                    && conn
2240                        .half_open_error_at
2241                        .is_some_and(|error_at| current_time >= error_at)
2242            })
2243    }
2244
2245    // Stable Connection Methods
2246
2247    /// Mark a connection as stable, exempting it from chaos injection.
2248    ///
2249    /// Stable connections are exempt from:
2250    /// - Random close (`roll_random_close`)
2251    /// - Write clogging
2252    /// - Read clogging
2253    /// - Bit flip corruption
2254    /// - Partial write truncation
2255    ///
2256    /// FDB ref: sim2.actor.cpp:357-362 (`stableConnection` flag)
2257    ///
2258    /// # Real-World Scenario
2259    /// Use this for parent-child process connections or supervision channels
2260    /// that should remain reliable even during chaos testing.
2261    pub fn mark_connection_stable(&self, connection_id: ConnectionId) {
2262        let mut inner = self.inner.borrow_mut();
2263        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2264            conn.is_stable = true;
2265            tracing::debug!("Connection {} marked as stable", connection_id.0);
2266
2267            // Also mark the paired connection as stable
2268            if let Some(paired_id) = conn.paired_connection
2269                && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
2270            {
2271                paired_conn.is_stable = true;
2272                tracing::debug!("Paired connection {} also marked as stable", paired_id.0);
2273            }
2274        }
2275    }
2276
2277    /// Check if a connection is marked as stable.
2278    pub fn is_connection_stable(&self, connection_id: ConnectionId) -> bool {
2279        let inner = self.inner.borrow();
2280        inner
2281            .network
2282            .connections
2283            .get(&connection_id)
2284            .is_some_and(|conn| conn.is_stable)
2285    }
2286
2287    // Network Partition Control Methods
2288
2289    /// Partition communication between two IP addresses for a specified duration
2290    pub fn partition_pair(
2291        &self,
2292        from_ip: std::net::IpAddr,
2293        to_ip: std::net::IpAddr,
2294        duration: Duration,
2295    ) -> SimulationResult<()> {
2296        let mut inner = self.inner.borrow_mut();
2297        let expires_at = inner.current_time + duration;
2298
2299        inner
2300            .network
2301            .ip_partitions
2302            .insert((from_ip, to_ip), PartitionState { expires_at });
2303
2304        let restore_event = Event::Connection {
2305            id: 0,
2306            state: ConnectionStateChange::PartitionRestore,
2307        };
2308        let sequence = inner.next_sequence;
2309        inner.next_sequence += 1;
2310        let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2311        inner.event_queue.schedule(scheduled_event);
2312
2313        inner.emit_fault(SimFaultEvent::PartitionCreated {
2314            from: from_ip.to_string(),
2315            to: to_ip.to_string(),
2316        });
2317
2318        tracing::debug!(
2319            "Partitioned {} -> {} until {:?}",
2320            from_ip,
2321            to_ip,
2322            expires_at
2323        );
2324        Ok(())
2325    }
2326
2327    /// Block all outgoing communication from an IP address
2328    pub fn partition_send_from(
2329        &self,
2330        ip: std::net::IpAddr,
2331        duration: Duration,
2332    ) -> SimulationResult<()> {
2333        let mut inner = self.inner.borrow_mut();
2334        let expires_at = inner.current_time + duration;
2335
2336        inner.network.send_partitions.insert(ip, expires_at);
2337
2338        let clear_event = Event::Connection {
2339            id: 0,
2340            state: ConnectionStateChange::SendPartitionClear,
2341        };
2342        let sequence = inner.next_sequence;
2343        inner.next_sequence += 1;
2344        let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2345        inner.event_queue.schedule(scheduled_event);
2346
2347        inner.emit_fault(SimFaultEvent::SendPartitionCreated { ip: ip.to_string() });
2348        tracing::debug!("Partitioned sends from {} until {:?}", ip, expires_at);
2349        Ok(())
2350    }
2351
2352    /// Block all incoming communication to an IP address
2353    pub fn partition_recv_to(
2354        &self,
2355        ip: std::net::IpAddr,
2356        duration: Duration,
2357    ) -> SimulationResult<()> {
2358        let mut inner = self.inner.borrow_mut();
2359        let expires_at = inner.current_time + duration;
2360
2361        inner.network.recv_partitions.insert(ip, expires_at);
2362
2363        let clear_event = Event::Connection {
2364            id: 0,
2365            state: ConnectionStateChange::RecvPartitionClear,
2366        };
2367        let sequence = inner.next_sequence;
2368        inner.next_sequence += 1;
2369        let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2370        inner.event_queue.schedule(scheduled_event);
2371
2372        inner.emit_fault(SimFaultEvent::RecvPartitionCreated { ip: ip.to_string() });
2373        tracing::debug!("Partitioned receives to {} until {:?}", ip, expires_at);
2374        Ok(())
2375    }
2376
2377    /// Immediately restore communication between two IP addresses
2378    pub fn restore_partition(
2379        &self,
2380        from_ip: std::net::IpAddr,
2381        to_ip: std::net::IpAddr,
2382    ) -> SimulationResult<()> {
2383        let mut inner = self.inner.borrow_mut();
2384        inner.network.ip_partitions.remove(&(from_ip, to_ip));
2385        inner.emit_fault(SimFaultEvent::PartitionHealed {
2386            from: from_ip.to_string(),
2387            to: to_ip.to_string(),
2388        });
2389        tracing::debug!("Restored partition {} -> {}", from_ip, to_ip);
2390        Ok(())
2391    }
2392
2393    /// Check if communication between two IP addresses is currently partitioned
2394    pub fn is_partitioned(
2395        &self,
2396        from_ip: std::net::IpAddr,
2397        to_ip: std::net::IpAddr,
2398    ) -> SimulationResult<bool> {
2399        let inner = self.inner.borrow();
2400        Ok(inner
2401            .network
2402            .is_partitioned(from_ip, to_ip, inner.current_time))
2403    }
2404
2405    /// Helper method for use with SimInner - randomly trigger partitions
2406    ///
2407    /// Supports different partition strategies based on configuration:
2408    /// - Random: randomly partition individual IP pairs
2409    /// - UniformSize: create uniform-sized partition groups
2410    /// - IsolateSingle: isolate exactly one node from the rest
2411    fn randomly_trigger_partitions_with_inner(inner: &mut SimInner) {
2412        let partition_config = &inner.network.config;
2413
2414        if partition_config.chaos.partition_probability == 0.0 {
2415            return;
2416        }
2417
2418        // Check if we should trigger a partition this step
2419        if sim_random::<f64>() >= partition_config.chaos.partition_probability {
2420            return;
2421        }
2422
2423        // Collect unique IPs from connections
2424        let unique_ips: HashSet<IpAddr> = inner
2425            .network
2426            .connections
2427            .values()
2428            .filter_map(|conn| conn.local_ip)
2429            .collect();
2430
2431        if unique_ips.len() < 2 {
2432            return; // Need at least 2 IPs to partition
2433        }
2434
2435        let ip_list: Vec<IpAddr> = unique_ips.into_iter().collect();
2436        let partition_duration =
2437            crate::network::sample_duration(&partition_config.chaos.partition_duration);
2438        let expires_at = inner.current_time + partition_duration;
2439
2440        // Select IPs to partition based on strategy
2441        let partitioned_ips: Vec<IpAddr> = match partition_config.chaos.partition_strategy {
2442            PartitionStrategy::Random => {
2443                // Original behavior: randomly decide for each IP
2444                ip_list
2445                    .iter()
2446                    .filter(|_| sim_random::<f64>() < 0.5)
2447                    .copied()
2448                    .collect()
2449            }
2450            PartitionStrategy::UniformSize => {
2451                // TigerBeetle-style: random partition size from 1 to n-1
2452                let partition_size = sim_random_range(1..ip_list.len());
2453                // Shuffle and take first N IPs
2454                let mut shuffled = ip_list.clone();
2455                // Simple Fisher-Yates shuffle
2456                for i in (1..shuffled.len()).rev() {
2457                    let j = sim_random_range(0..i + 1);
2458                    shuffled.swap(i, j);
2459                }
2460                shuffled.into_iter().take(partition_size).collect()
2461            }
2462            PartitionStrategy::IsolateSingle => {
2463                // Isolate exactly one node
2464                let idx = sim_random_range(0..ip_list.len());
2465                vec![ip_list[idx]]
2466            }
2467        };
2468
2469        // Don't partition if we selected all IPs or none
2470        if partitioned_ips.is_empty() || partitioned_ips.len() == ip_list.len() {
2471            return;
2472        }
2473
2474        // Create bi-directional partitions between partitioned and non-partitioned groups
2475        let non_partitioned: Vec<IpAddr> = ip_list
2476            .iter()
2477            .filter(|ip| !partitioned_ips.contains(ip))
2478            .copied()
2479            .collect();
2480
2481        for &from_ip in &partitioned_ips {
2482            for &to_ip in &non_partitioned {
2483                // Skip if already partitioned
2484                if inner
2485                    .network
2486                    .is_partitioned(from_ip, to_ip, inner.current_time)
2487                {
2488                    continue;
2489                }
2490
2491                // Partition in both directions
2492                inner
2493                    .network
2494                    .ip_partitions
2495                    .insert((from_ip, to_ip), PartitionState { expires_at });
2496                inner
2497                    .network
2498                    .ip_partitions
2499                    .insert((to_ip, from_ip), PartitionState { expires_at });
2500
2501                inner.emit_fault(SimFaultEvent::PartitionCreated {
2502                    from: from_ip.to_string(),
2503                    to: to_ip.to_string(),
2504                });
2505
2506                tracing::debug!(
2507                    "Partition triggered: {} <-> {} until {:?} (strategy: {:?})",
2508                    from_ip,
2509                    to_ip,
2510                    expires_at,
2511                    partition_config.chaos.partition_strategy
2512                );
2513            }
2514        }
2515
2516        // Schedule restoration event
2517        let restore_event = Event::Connection {
2518            id: 0,
2519            state: ConnectionStateChange::PartitionRestore,
2520        };
2521        let sequence = inner.next_sequence;
2522        inner.next_sequence += 1;
2523        let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2524        inner.event_queue.schedule(scheduled_event);
2525    }
2526}
2527
2528impl Default for SimWorld {
2529    fn default() -> Self {
2530        Self::new()
2531    }
2532}
2533
2534/// A weak reference to a simulation world.
2535///
2536/// This provides handle-based access to the simulation without holding
2537/// a strong reference that would prevent cleanup.
2538#[derive(Debug)]
2539pub struct WeakSimWorld {
2540    pub(crate) inner: Weak<RefCell<SimInner>>,
2541}
2542
/// Generates a `WeakSimWorld` method that upgrades the handle and forwards the
/// call to the `SimWorld` method of the same name.
///
/// Every generated method returns `SimulationResult<_>`; a failed upgrade is
/// propagated with `?`. The leading keyword selects how the result is shaped:
/// - `wrap`: the target returns a plain value, which is wrapped in `Ok`
/// - `pass`: the target already returns `SimulationResult`, passed through
/// - `unit`: the target's return value is discarded and `Ok(())` is returned
macro_rules! weak_forward {
    // Target returns a bare value: wrap it in Ok(..)
    (wrap $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*) -> $out:ty) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<$out> {
            let sim = self.upgrade()?;
            Ok(sim.$name($($param),*))
        }
    };
    // Target already yields a SimulationResult: hand it back unchanged
    (pass $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*) -> $out:ty) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<$out> {
            let sim = self.upgrade()?;
            sim.$name($($param),*)
        }
    };
    // Target's result is not needed: call it, then report Ok(())
    (unit $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*)) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<()> {
            let sim = self.upgrade()?;
            sim.$name($($param),*);
            Ok(())
        }
    };
}
2568
2569impl WeakSimWorld {
2570    /// Attempts to upgrade this weak reference to a strong reference.
2571    pub fn upgrade(&self) -> SimulationResult<SimWorld> {
2572        self.inner
2573            .upgrade()
2574            .map(|inner| SimWorld { inner })
2575            .ok_or(SimulationError::SimulationShutdown)
2576    }
2577
2578    weak_forward!(wrap #[doc = "Returns the current simulation time."] current_time(&self) -> Duration);
2579    weak_forward!(wrap #[doc = "Returns the exact simulation time (equivalent to FDB's now())."] now(&self) -> Duration);
2580    weak_forward!(wrap #[doc = "Returns the drifted timer time (equivalent to FDB's timer())."] timer(&self) -> Duration);
2581    weak_forward!(unit #[doc = "Schedules an event to execute after the specified delay."] schedule_event(&self, event: Event, delay: Duration));
2582    weak_forward!(unit #[doc = "Schedules an event to execute at the specified absolute time."] schedule_event_at(&self, event: Event, time: Duration));
2583    weak_forward!(pass #[doc = "Read data from connection's receive buffer."] read_from_connection(&self, connection_id: ConnectionId, buf: &mut [u8]) -> usize);
2584    weak_forward!(pass #[doc = "Write data to connection's receive buffer."] write_to_connection(&self, connection_id: ConnectionId, data: &[u8]) -> ());
2585    weak_forward!(pass #[doc = "Buffer data for ordered sending on a connection."] buffer_send(&self, connection_id: ConnectionId, data: Vec<u8>) -> ());
2586    weak_forward!(wrap #[doc = "Get a network provider for the simulation."] network_provider(&self) -> SimNetworkProvider);
2587    weak_forward!(wrap #[doc = "Get a time provider for the simulation."] time_provider(&self) -> crate::providers::SimTimeProvider);
2588    weak_forward!(wrap #[doc = "Sleep for the specified duration in simulation time."] sleep(&self, duration: Duration) -> SleepFuture);
2589
2590    /// Access network configuration for latency calculations.
2591    pub fn with_network_config<F, R>(&self, f: F) -> SimulationResult<R>
2592    where
2593        F: FnOnce(&NetworkConfiguration) -> R,
2594    {
2595        Ok(self.upgrade()?.with_network_config(f))
2596    }
2597}
2598
2599impl Clone for WeakSimWorld {
2600    fn clone(&self) -> Self {
2601        Self {
2602            inner: self.inner.clone(),
2603        }
2604    }
2605}
2606
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `Duration` of `millis` milliseconds.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    /// Parses an IP address literal used by the partition tests.
    fn ip(text: &str) -> std::net::IpAddr {
        text.parse().expect("valid ip")
    }

    /// A new world is empty at time zero; one step consumes the single
    /// scheduled timer and moves the clock to its deadline.
    #[test]
    fn sim_world_basic_lifecycle() {
        let mut world = SimWorld::new();

        // Freshly created: no time elapsed, nothing queued
        assert_eq!(world.current_time(), Duration::ZERO);
        assert!(!world.has_pending_events());
        assert_eq!(world.pending_event_count(), 0);

        world.schedule_event(Event::Timer { task_id: 1 }, ms(100));

        // Scheduling queues the event but does not advance time
        assert!(world.has_pending_events());
        assert_eq!(world.pending_event_count(), 1);
        assert_eq!(world.current_time(), Duration::ZERO);

        // The only event is consumed by a single step
        let more_remaining = world.step();
        assert!(!more_remaining);
        assert_eq!(world.current_time(), ms(100));
        assert!(!world.has_pending_events());
        assert_eq!(world.pending_event_count(), 0);
    }

    /// Events scheduled out of order are processed by ascending deadline.
    #[test]
    fn sim_world_multiple_events() {
        let mut world = SimWorld::new();

        // Deliberately not in chronological order
        for (task_id, delay) in [(3, 300), (1, 100), (2, 200)] {
            world.schedule_event(Event::Timer { task_id }, ms(delay));
        }

        assert_eq!(world.pending_event_count(), 3);

        // Each step advances to the next deadline and shrinks the queue by one
        for (deadline, remaining) in [(100, 2), (200, 1), (300, 0)] {
            assert_eq!(world.step(), remaining != 0);
            assert_eq!(world.current_time(), ms(deadline));
            assert_eq!(world.pending_event_count(), remaining);
        }
    }

    /// `run_until_empty` drains the queue and stops at the last deadline.
    #[test]
    fn sim_world_run_until_empty() {
        let mut world = SimWorld::new();

        for (task_id, delay) in [(1, 100), (2, 200), (3, 300)] {
            world.schedule_event(Event::Timer { task_id }, ms(delay));
        }

        world.run_until_empty();

        assert_eq!(world.current_time(), ms(300));
        assert!(!world.has_pending_events());
    }

    /// An event scheduled at an absolute time fires at exactly that time.
    #[test]
    fn sim_world_schedule_at_specific_time() {
        let mut world = SimWorld::new();

        world.schedule_event_at(Event::Timer { task_id: 1 }, ms(500));
        assert_eq!(world.current_time(), Duration::ZERO);

        world.step();
        assert_eq!(world.current_time(), ms(500));
    }

    /// A weak handle works while the world is alive and reports
    /// `SimulationShutdown` once the world has been dropped.
    #[test]
    fn weak_sim_world_lifecycle() {
        let world = SimWorld::new();
        let handle = world.downgrade();

        // While the world lives, the handle can read from it...
        let observed = handle.current_time().expect("should get time");
        assert_eq!(observed, Duration::ZERO);

        // ...and schedule into it
        handle
            .schedule_event(Event::Timer { task_id: 1 }, ms(100))
            .expect("should schedule event");
        assert!(world.has_pending_events());

        // After the world is gone every forwarded call must fail
        drop(world);

        let time_after_drop = handle.current_time();
        assert_eq!(time_after_drop, Err(SimulationError::SimulationShutdown));

        let schedule_after_drop = handle.schedule_event(Event::Timer { task_id: 2 }, ms(200));
        assert_eq!(
            schedule_after_drop,
            Err(SimulationError::SimulationShutdown)
        );
    }

    /// Events sharing a deadline are all processed at that same time.
    #[test]
    fn deterministic_event_ordering() {
        let mut world = SimWorld::new();

        // Three timers, identical deadline
        for task_id in [2, 1, 3] {
            world.schedule_event(Event::Timer { task_id }, ms(100));
        }

        // Same time throughout; only the final step reports an empty queue
        for has_more in [true, true, false] {
            assert_eq!(world.step(), has_more);
            assert_eq!(world.current_time(), ms(100));
        }
    }

    /// Emitting a fault with no state handle attached must not panic.
    #[test]
    fn emit_fault_without_state_is_noop() {
        let bare = SimInner::new();
        assert!(bare.state.is_none());

        let crash = SimFaultEvent::StorageCrash {
            ip: "10.0.1.1".to_string(),
        };
        bare.emit_fault(crash);
    }

    /// With a state handle attached, a fault is recorded on the fault
    /// timeline with the current time in milliseconds and source "sim".
    #[test]
    fn emit_fault_with_state_writes_to_timeline() {
        let recorder = StateHandle::new();
        let mut sim_inner = SimInner::new();
        sim_inner.state = Some(recorder.clone());
        sim_inner.current_time = ms(500);

        sim_inner.emit_fault(SimFaultEvent::StorageCrash {
            ip: "10.0.1.1".to_string(),
        });

        let timeline = recorder
            .timeline::<SimFaultEvent>(SIM_FAULT_TIMELINE)
            .expect("timeline should exist");
        assert_eq!(timeline.len(), 1);

        let recorded = timeline.last().expect("should have entry");
        assert_eq!(recorded.time_ms, 500);
        assert_eq!(recorded.source, "sim");
        assert!(matches!(
            recorded.event,
            SimFaultEvent::StorageCrash { .. }
        ));
    }

    /// `partition_pair` records exactly one `PartitionCreated` fault carrying
    /// both endpoints.
    #[test]
    fn partition_pair_emits_fault_event() {
        let world = SimWorld::new();
        let recorder = StateHandle::new();
        world.set_state(recorder.clone());

        let src = ip("10.0.1.1");
        let dst = ip("10.0.1.2");
        world
            .partition_pair(src, dst, Duration::from_secs(10))
            .expect("partition should succeed");

        let timeline = recorder
            .timeline::<SimFaultEvent>(SIM_FAULT_TIMELINE)
            .expect("timeline should exist");
        assert_eq!(timeline.len(), 1);
        assert!(matches!(
            &timeline.all()[0].event,
            SimFaultEvent::PartitionCreated { from, to }
            if from == "10.0.1.1" && to == "10.0.1.2"
        ));
    }

    /// Partitioning and then restoring a pair records `PartitionCreated`
    /// followed by `PartitionHealed`.
    #[test]
    fn restore_partition_emits_fault_event() {
        let world = SimWorld::new();
        let recorder = StateHandle::new();
        world.set_state(recorder.clone());

        let src = ip("10.0.1.1");
        let dst = ip("10.0.1.2");
        world
            .partition_pair(src, dst, Duration::from_secs(10))
            .expect("partition");
        world.restore_partition(src, dst).expect("restore");

        let timeline = recorder
            .timeline::<SimFaultEvent>(SIM_FAULT_TIMELINE)
            .expect("timeline should exist");
        assert_eq!(timeline.len(), 2);
        assert!(matches!(
            &timeline.all()[0].event,
            SimFaultEvent::PartitionCreated { .. }
        ));
        assert!(matches!(
            &timeline.all()[1].event,
            SimFaultEvent::PartitionHealed { .. }
        ));
    }
}