Skip to main content

moonpool_sim/sim/
world.rs

1//! Core simulation world and coordination logic.
2//!
3//! This module provides the central SimWorld coordinator that manages time,
4//! event processing, and network simulation state.
5
6use std::{
7    cell::RefCell,
8    collections::{BTreeMap, HashSet, VecDeque},
9    net::IpAddr,
10    rc::{Rc, Weak},
11    task::Waker,
12    time::Duration,
13};
14use tracing::instrument;
15
16use crate::{
17    SimulationError, SimulationResult,
18    network::{
19        NetworkConfiguration, PartitionStrategy,
20        sim::{ConnectionId, ListenerId, SimNetworkProvider},
21    },
22};
23
24use super::{
25    events::{ConnectionStateChange, Event, EventQueue, NetworkOperation, ScheduledEvent},
26    rng::{reset_sim_rng, set_sim_seed, sim_random, sim_random_range},
27    sleep::SleepFuture,
28    state::{
29        ClogState, CloseReason, ConnectionState, ListenerState, NetworkState, PartitionState,
30        StorageState,
31    },
32    wakers::WakerRegistry,
33};
34
35/// Internal simulation state holder
36#[derive(Debug)]
37pub(crate) struct SimInner {
38    pub(crate) current_time: Duration,
39    /// Drifted timer time (can be ahead of current_time)
40    /// FDB ref: sim2.actor.cpp:1058-1064 - timer() drifts 0-0.1s ahead of now()
41    pub(crate) timer_time: Duration,
42    pub(crate) event_queue: EventQueue,
43    pub(crate) next_sequence: u64,
44
45    // Network management
46    pub(crate) network: NetworkState,
47
48    // Storage management
49    pub(crate) storage: StorageState,
50
51    // Async coordination
52    pub(crate) wakers: WakerRegistry,
53
54    // Task management for sleep functionality
55    pub(crate) next_task_id: u64,
56    pub(crate) awakened_tasks: HashSet<u64>,
57
58    // Event processing metrics
59    pub(crate) events_processed: u64,
60
61    // Chaos tracking
62    pub(crate) last_bit_flip_time: Duration,
63
64    // Last event processed by step() — used by orchestrator to detect ProcessRestart
65    pub(crate) last_processed_event: Option<Event>,
66}
67
68impl SimInner {
69    pub(crate) fn new() -> Self {
70        Self {
71            current_time: Duration::ZERO,
72            timer_time: Duration::ZERO,
73            event_queue: EventQueue::new(),
74            next_sequence: 0,
75            network: NetworkState::new(NetworkConfiguration::default()),
76            storage: StorageState::default(),
77            wakers: WakerRegistry::default(),
78            next_task_id: 0,
79            awakened_tasks: HashSet::new(),
80            events_processed: 0,
81            last_bit_flip_time: Duration::ZERO,
82            last_processed_event: None,
83        }
84    }
85
86    pub(crate) fn new_with_config(network_config: NetworkConfiguration) -> Self {
87        Self {
88            current_time: Duration::ZERO,
89            timer_time: Duration::ZERO,
90            event_queue: EventQueue::new(),
91            next_sequence: 0,
92            network: NetworkState::new(network_config),
93            storage: StorageState::default(),
94            wakers: WakerRegistry::default(),
95            next_task_id: 0,
96            awakened_tasks: HashSet::new(),
97            events_processed: 0,
98            last_bit_flip_time: Duration::ZERO,
99            last_processed_event: None,
100        }
101    }
102
103    /// Calculate the number of bits to flip using a power-law distribution.
104    ///
105    /// Uses the formula: 32 - floor(log2(random_value))
106    /// This creates a power-law distribution biased toward fewer bits:
107    /// - 1-2 bits: very common
108    /// - 16 bits: uncommon
109    /// - 32 bits: very rare
110    ///
111    /// Matches FDB's approach in FlowTransport.actor.cpp:1297
112    pub(crate) fn calculate_flip_bit_count(random_value: u32, min_bits: u32, max_bits: u32) -> u32 {
113        if random_value == 0 {
114            // Handle edge case: treat 0 as if it were 1
115            return max_bits.min(32);
116        }
117
118        // Formula: 32 - floor(log2(x)) = 1 + leading_zeros(x)
119        let bit_count = 1 + random_value.leading_zeros();
120
121        // Clamp to configured range
122        bit_count.clamp(min_bits, max_bits)
123    }
124}
125
126/// The central simulation coordinator that manages time and event processing.
127///
128/// `SimWorld` owns all mutable simulation state and provides the main interface
129/// for scheduling events and advancing simulation time. It uses a centralized
130/// ownership model with handle-based access to avoid borrow checker conflicts.
131#[derive(Debug)]
132pub struct SimWorld {
133    pub(crate) inner: Rc<RefCell<SimInner>>,
134}
135
136impl SimWorld {
137    /// Internal constructor that handles all initialization logic.
138    fn create(network_config: Option<NetworkConfiguration>, seed: u64) -> Self {
139        reset_sim_rng();
140        set_sim_seed(seed);
141        crate::chaos::assertions::reset_assertion_results();
142
143        let inner = match network_config {
144            Some(config) => SimInner::new_with_config(config),
145            None => SimInner::new(),
146        };
147
148        Self {
149            inner: Rc::new(RefCell::new(inner)),
150        }
151    }
152
153    /// Creates a new simulation world with default network configuration.
154    ///
155    /// Uses default seed (0) for reproducible testing. For custom seeds,
156    /// use [`SimWorld::new_with_seed`].
157    pub fn new() -> Self {
158        Self::create(None, 0)
159    }
160
161    /// Creates a new simulation world with a specific seed for deterministic randomness.
162    ///
163    /// This method ensures clean thread-local RNG state by resetting before
164    /// setting the seed, making it safe for consecutive simulations on the same thread.
165    ///
166    /// # Parameters
167    ///
168    /// * `seed` - The seed value for deterministic randomness
169    pub fn new_with_seed(seed: u64) -> Self {
170        Self::create(None, seed)
171    }
172
173    /// Creates a new simulation world with custom network configuration.
174    pub fn new_with_network_config(network_config: NetworkConfiguration) -> Self {
175        Self::create(Some(network_config), 0)
176    }
177
178    /// Creates a new simulation world with both custom network configuration and seed.
179    ///
180    /// # Parameters
181    ///
182    /// * `network_config` - Network configuration for latency and fault simulation
183    /// * `seed` - The seed value for deterministic randomness
184    pub fn new_with_network_config_and_seed(
185        network_config: NetworkConfiguration,
186        seed: u64,
187    ) -> Self {
188        Self::create(Some(network_config), seed)
189    }
190
191    /// Processes the next scheduled event and advances time.
192    ///
193    /// Returns `true` if more events are available for processing,
194    /// `false` if this was the last event or if no events are available.
195    #[instrument(skip(self))]
196    pub fn step(&mut self) -> bool {
197        let mut inner = self.inner.borrow_mut();
198
199        if let Some(scheduled_event) = inner.event_queue.pop_earliest() {
200            // Advance logical time to event timestamp
201            inner.current_time = scheduled_event.time();
202
203            // Phase 7: Clear expired clogs after time advancement
204            Self::clear_expired_clogs_with_inner(&mut inner);
205
206            // Trigger random partitions based on configuration
207            Self::randomly_trigger_partitions_with_inner(&mut inner);
208
209            // Store the event for orchestrator inspection (e.g., ProcessRestart)
210            let event = scheduled_event.into_event();
211            inner.last_processed_event = Some(event.clone());
212
213            // Process the event with the mutable reference
214            Self::process_event_with_inner(&mut inner, event);
215
216            // Return true if more events are available
217            !inner.event_queue.is_empty()
218        } else {
219            inner.last_processed_event = None;
220            // No more events to process
221            false
222        }
223    }
224
225    /// Processes all scheduled events until the queue is empty or only infrastructure events remain.
226    ///
227    /// This method processes all workload-related events but stops early if only infrastructure
228    /// events (like connection restoration) remain. This prevents infinite loops where
229    /// infrastructure events keep the simulation running indefinitely after workloads complete.
230    #[instrument(skip(self))]
231    pub fn run_until_empty(&mut self) {
232        while self.step() {
233            // Periodically check if we should stop early (every 50 events for performance)
234            if self.inner.borrow().events_processed.is_multiple_of(50) {
235                let has_workload_events = !self
236                    .inner
237                    .borrow()
238                    .event_queue
239                    .has_only_infrastructure_events();
240                if !has_workload_events {
241                    tracing::debug!(
242                        "Early termination: only infrastructure events remain in queue"
243                    );
244                    break;
245                }
246            }
247        }
248    }
249
250    /// Returns the current simulation time.
251    pub fn current_time(&self) -> Duration {
252        self.inner.borrow().current_time
253    }
254
255    /// Returns the exact simulation time (equivalent to FDB's now()).
256    ///
257    /// This is the canonical simulation time used for scheduling events.
258    /// Use this for precise time comparisons and scheduling.
259    pub fn now(&self) -> Duration {
260        self.inner.borrow().current_time
261    }
262
263    /// Returns the drifted timer time (equivalent to FDB's timer()).
264    ///
265    /// The timer can be up to `clock_drift_max` (default 100ms) ahead of `now()`.
266    /// This simulates real-world clock drift between processes, which is important
267    /// for testing time-sensitive code like:
268    /// - Timeout handling
269    /// - Lease expiration
270    /// - Distributed consensus (leader election)
271    /// - Cache invalidation
272    /// - Heartbeat detection
273    ///
274    /// FDB formula: `timerTime += random01() * (time + 0.1 - timerTime) / 2.0`
275    ///
276    /// FDB ref: sim2.actor.cpp:1058-1064
277    pub fn timer(&self) -> Duration {
278        let mut inner = self.inner.borrow_mut();
279        let chaos = &inner.network.config.chaos;
280
281        // If clock drift is disabled, return exact simulation time
282        if !chaos.clock_drift_enabled {
283            return inner.current_time;
284        }
285
286        // FDB formula: timerTime += random01() * (time + 0.1 - timerTime) / 2.0
287        // This smoothly interpolates timerTime toward (time + drift_max)
288        // The /2.0 creates a damped approach (never overshoots)
289        let max_timer = inner.current_time + chaos.clock_drift_max;
290
291        // Only advance if timer is behind max
292        if inner.timer_time < max_timer {
293            let random_factor = sim_random::<f64>(); // 0.0 to 1.0
294            let gap = (max_timer - inner.timer_time).as_secs_f64();
295            let delta = random_factor * gap / 2.0;
296            inner.timer_time += Duration::from_secs_f64(delta);
297        }
298
299        // Ensure timer never goes backwards relative to simulation time
300        inner.timer_time = inner.timer_time.max(inner.current_time);
301
302        inner.timer_time
303    }
304
305    /// Schedules an event to execute after the specified delay from the current time.
306    #[instrument(skip(self))]
307    pub fn schedule_event(&self, event: Event, delay: Duration) {
308        let mut inner = self.inner.borrow_mut();
309        let scheduled_time = inner.current_time + delay;
310        let sequence = inner.next_sequence;
311        inner.next_sequence += 1;
312
313        let scheduled_event = ScheduledEvent::new(scheduled_time, event, sequence);
314        inner.event_queue.schedule(scheduled_event);
315    }
316
317    /// Schedules an event to execute at the specified absolute time.
318    pub fn schedule_event_at(&self, event: Event, time: Duration) {
319        let mut inner = self.inner.borrow_mut();
320        let sequence = inner.next_sequence;
321        inner.next_sequence += 1;
322
323        let scheduled_event = ScheduledEvent::new(time, event, sequence);
324        inner.event_queue.schedule(scheduled_event);
325    }
326
327    /// Creates a weak reference to this simulation world.
328    ///
329    /// Weak references can be used to access the simulation without preventing
330    /// it from being dropped, enabling handle-based access patterns.
331    pub fn downgrade(&self) -> WeakSimWorld {
332        WeakSimWorld {
333            inner: Rc::downgrade(&self.inner),
334        }
335    }
336
337    /// Returns `true` if there are events waiting to be processed.
338    pub fn has_pending_events(&self) -> bool {
339        !self.inner.borrow().event_queue.is_empty()
340    }
341
342    /// Returns the number of events waiting to be processed.
343    pub fn pending_event_count(&self) -> usize {
344        self.inner.borrow().event_queue.len()
345    }
346
347    /// Create a network provider for this simulation
348    pub fn network_provider(&self) -> SimNetworkProvider {
349        SimNetworkProvider::new(self.downgrade())
350    }
351
352    /// Create a time provider for this simulation
353    pub fn time_provider(&self) -> crate::providers::SimTimeProvider {
354        crate::providers::SimTimeProvider::new(self.downgrade())
355    }
356
357    /// Create a task provider for this simulation
358    pub fn task_provider(&self) -> crate::TokioTaskProvider {
359        crate::TokioTaskProvider
360    }
361
362    /// Create a storage provider for this simulation.
363    pub fn storage_provider(&self) -> crate::storage::SimStorageProvider {
364        crate::storage::SimStorageProvider::new(self.downgrade())
365    }
366
367    /// Set storage configuration for this simulation.
368    ///
369    /// Must be called before any storage operations are performed.
370    pub fn set_storage_config(&mut self, config: crate::storage::StorageConfiguration) {
371        self.inner.borrow_mut().storage.config = config;
372    }
373
374    /// Access network configuration for latency calculations using thread-local RNG.
375    ///
376    /// This method provides access to the network configuration for calculating
377    /// latencies and other network parameters. Random values should be generated
378    /// using the thread-local RNG functions like `sim_random()`.
379    ///
380    /// Access the network configuration for this simulation.
381    pub fn with_network_config<F, R>(&self, f: F) -> R
382    where
383        F: FnOnce(&NetworkConfiguration) -> R,
384    {
385        let inner = self.inner.borrow();
386        f(&inner.network.config)
387    }
388
389    /// Create a listener in the simulation (used by SimNetworkProvider)
390    pub(crate) fn create_listener(&self, addr: String) -> SimulationResult<ListenerId> {
391        let mut inner = self.inner.borrow_mut();
392        let listener_id = ListenerId(inner.network.next_listener_id);
393        inner.network.next_listener_id += 1;
394
395        inner.network.listeners.insert(
396            listener_id,
397            ListenerState {
398                id: listener_id,
399                addr,
400                pending_connections: VecDeque::new(),
401            },
402        );
403
404        Ok(listener_id)
405    }
406
407    /// Read data from connection's receive buffer (used by SimTcpStream)
408    pub(crate) fn read_from_connection(
409        &self,
410        connection_id: ConnectionId,
411        buf: &mut [u8],
412    ) -> SimulationResult<usize> {
413        let mut inner = self.inner.borrow_mut();
414
415        if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
416            let mut bytes_read = 0;
417            while bytes_read < buf.len() && !connection.receive_buffer.is_empty() {
418                if let Some(byte) = connection.receive_buffer.pop_front() {
419                    buf[bytes_read] = byte;
420                    bytes_read += 1;
421                }
422            }
423            Ok(bytes_read)
424        } else {
425            Err(SimulationError::InvalidState(
426                "connection not found".to_string(),
427            ))
428        }
429    }
430
431    /// Write data to connection's receive buffer (used by SimTcpStream write operations)
432    pub(crate) fn write_to_connection(
433        &self,
434        connection_id: ConnectionId,
435        data: &[u8],
436    ) -> SimulationResult<()> {
437        let mut inner = self.inner.borrow_mut();
438
439        if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
440            for &byte in data {
441                connection.receive_buffer.push_back(byte);
442            }
443            Ok(())
444        } else {
445            Err(SimulationError::InvalidState(
446                "connection not found".to_string(),
447            ))
448        }
449    }
450
451    /// Buffer data for ordered sending on a TCP connection.
452    ///
453    /// This method implements the core TCP ordering guarantee by ensuring that all
454    /// write operations on a single connection are processed in FIFO order.
455    pub(crate) fn buffer_send(
456        &self,
457        connection_id: ConnectionId,
458        data: Vec<u8>,
459    ) -> SimulationResult<()> {
460        tracing::debug!(
461            "buffer_send called for connection_id={} with {} bytes",
462            connection_id.0,
463            data.len()
464        );
465        let mut inner = self.inner.borrow_mut();
466
467        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
468            // Always add data to send buffer for TCP ordering
469            conn.send_buffer.push_back(data);
470            tracing::debug!(
471                "buffer_send: added data to send_buffer, new length: {}",
472                conn.send_buffer.len()
473            );
474
475            // If sender is not already active, start processing the buffer
476            if !conn.send_in_progress {
477                tracing::debug!(
478                    "buffer_send: sender not in progress, scheduling ProcessSendBuffer event"
479                );
480                conn.send_in_progress = true;
481
482                // Schedule immediate processing of the buffer
483                let scheduled_time = inner.current_time + std::time::Duration::ZERO;
484                let sequence = inner.next_sequence;
485                inner.next_sequence += 1;
486                let scheduled_event = ScheduledEvent::new(
487                    scheduled_time,
488                    Event::Network {
489                        connection_id: connection_id.0,
490                        operation: NetworkOperation::ProcessSendBuffer,
491                    },
492                    sequence,
493                );
494                inner.event_queue.schedule(scheduled_event);
495                tracing::debug!(
496                    "buffer_send: scheduled ProcessSendBuffer event with sequence {}",
497                    sequence
498                );
499            } else {
500                tracing::debug!(
501                    "buffer_send: sender already in progress, not scheduling new event"
502                );
503            }
504
505            Ok(())
506        } else {
507            tracing::debug!(
508                "buffer_send: connection_id={} not found in connections table",
509                connection_id.0
510            );
511            Err(SimulationError::InvalidState(
512                "connection not found".to_string(),
513            ))
514        }
515    }
516
517    /// Create a bidirectional TCP connection pair for client-server communication.
518    ///
519    /// FDB Pattern (sim2.actor.cpp:1149-1175):
520    /// - Client connection stores server's real address as peer_address
521    /// - Server connection stores synthesized ephemeral address (random IP + port 40000-60000)
522    ///
523    /// This simulates real TCP behavior where servers see client ephemeral ports.
524    pub(crate) fn create_connection_pair(
525        &self,
526        client_addr: String,
527        server_addr: String,
528    ) -> SimulationResult<(ConnectionId, ConnectionId)> {
529        let mut inner = self.inner.borrow_mut();
530
531        let client_id = ConnectionId(inner.network.next_connection_id);
532        inner.network.next_connection_id += 1;
533
534        let server_id = ConnectionId(inner.network.next_connection_id);
535        inner.network.next_connection_id += 1;
536
537        // Capture current time to avoid borrow conflicts
538        let current_time = inner.current_time;
539
540        // Parse IP addresses for partition tracking
541        let client_ip = NetworkState::parse_ip_from_addr(&client_addr);
542        let server_ip = NetworkState::parse_ip_from_addr(&server_addr);
543
544        // FDB Pattern: Synthesize ephemeral address for server-side connection
545        // sim2.actor.cpp:1149-1175: randomInt(0,256) for IP offset, randomInt(40000,60000) for port
546        // Use thread-local sim_random_range for deterministic randomness
547        let ephemeral_peer_addr = match client_ip {
548            Some(std::net::IpAddr::V4(ipv4)) => {
549                let octets = ipv4.octets();
550                let ip_offset = sim_random_range(0u32..256) as u8;
551                let new_last_octet = octets[3].wrapping_add(ip_offset);
552                let ephemeral_ip =
553                    std::net::Ipv4Addr::new(octets[0], octets[1], octets[2], new_last_octet);
554                let ephemeral_port = sim_random_range(40000u16..60000);
555                format!("{}:{}", ephemeral_ip, ephemeral_port)
556            }
557            Some(std::net::IpAddr::V6(ipv6)) => {
558                // For IPv6, just modify the last segment
559                let segments = ipv6.segments();
560                let mut new_segments = segments;
561                let ip_offset = sim_random_range(0u16..256);
562                new_segments[7] = new_segments[7].wrapping_add(ip_offset);
563                let ephemeral_ip = std::net::Ipv6Addr::from(new_segments);
564                let ephemeral_port = sim_random_range(40000u16..60000);
565                format!("[{}]:{}", ephemeral_ip, ephemeral_port)
566            }
567            None => {
568                // Fallback: use client address with random port
569                let ephemeral_port = sim_random_range(40000u16..60000);
570                format!("unknown:{}", ephemeral_port)
571            }
572        };
573
574        // Calculate send buffer capacity based on BDP (Bandwidth-Delay Product)
575        // Using a default of 64KB which is typical for TCP socket buffers
576        // In real simulations this could be: max_latency_ms * bandwidth_bytes_per_ms
577        const DEFAULT_SEND_BUFFER_CAPACITY: usize = 64 * 1024; // 64KB
578
579        // Create paired connections
580        // Client stores server's real address as peer_address
581        inner.network.connections.insert(
582            client_id,
583            ConnectionState {
584                id: client_id,
585                addr: client_addr,
586                local_ip: client_ip,
587                remote_ip: server_ip,
588                peer_address: server_addr.clone(),
589                receive_buffer: VecDeque::new(),
590                paired_connection: Some(server_id),
591                send_buffer: VecDeque::new(),
592                send_in_progress: false,
593                next_send_time: current_time,
594                is_closed: false,
595                send_closed: false,
596                recv_closed: false,
597                is_cut: false,
598                cut_expiry: None,
599                close_reason: CloseReason::None,
600                send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
601                send_delay: None,
602                recv_delay: None,
603                is_half_open: false,
604                half_open_error_at: None,
605                is_stable: false,
606                graceful_close_pending: false,
607                last_data_delivery_scheduled_at: None,
608                remote_fin_received: false,
609            },
610        );
611
612        // Server stores synthesized ephemeral address as peer_address
613        inner.network.connections.insert(
614            server_id,
615            ConnectionState {
616                id: server_id,
617                addr: server_addr,
618                local_ip: server_ip,
619                remote_ip: client_ip,
620                peer_address: ephemeral_peer_addr,
621                receive_buffer: VecDeque::new(),
622                paired_connection: Some(client_id),
623                send_buffer: VecDeque::new(),
624                send_in_progress: false,
625                next_send_time: current_time,
626                is_closed: false,
627                send_closed: false,
628                recv_closed: false,
629                is_cut: false,
630                cut_expiry: None,
631                close_reason: CloseReason::None,
632                send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
633                send_delay: None,
634                recv_delay: None,
635                is_half_open: false,
636                half_open_error_at: None,
637                is_stable: false,
638                graceful_close_pending: false,
639                last_data_delivery_scheduled_at: None,
640                remote_fin_received: false,
641            },
642        );
643
644        Ok((client_id, server_id))
645    }
646
647    /// Register a waker for read operations
648    pub(crate) fn register_read_waker(
649        &self,
650        connection_id: ConnectionId,
651        waker: Waker,
652    ) -> SimulationResult<()> {
653        let mut inner = self.inner.borrow_mut();
654        let is_replacement = inner.wakers.read_wakers.contains_key(&connection_id);
655        inner.wakers.read_wakers.insert(connection_id, waker);
656        tracing::debug!(
657            "register_read_waker: connection_id={}, replacement={}, total_wakers={}",
658            connection_id.0,
659            is_replacement,
660            inner.wakers.read_wakers.len()
661        );
662        Ok(())
663    }
664
665    /// Register a waker for accept operations
666    pub(crate) fn register_accept_waker(&self, addr: &str, waker: Waker) -> SimulationResult<()> {
667        let mut inner = self.inner.borrow_mut();
668        // For simplicity, we'll use addr hash as listener ID for waker storage
669        use std::collections::hash_map::DefaultHasher;
670        use std::hash::{Hash, Hasher};
671        let mut hasher = DefaultHasher::new();
672        addr.hash(&mut hasher);
673        let listener_key = ListenerId(hasher.finish());
674
675        inner.wakers.listener_wakers.insert(listener_key, waker);
676        Ok(())
677    }
678
679    /// Store a pending connection for later accept() call
680    pub(crate) fn store_pending_connection(
681        &self,
682        addr: &str,
683        connection_id: ConnectionId,
684    ) -> SimulationResult<()> {
685        let mut inner = self.inner.borrow_mut();
686        inner
687            .network
688            .pending_connections
689            .insert(addr.to_string(), connection_id);
690
691        // Wake any accept() calls waiting for this connection
692        use std::collections::hash_map::DefaultHasher;
693        use std::hash::{Hash, Hasher};
694        let mut hasher = DefaultHasher::new();
695        addr.hash(&mut hasher);
696        let listener_key = ListenerId(hasher.finish());
697
698        if let Some(waker) = inner.wakers.listener_wakers.remove(&listener_key) {
699            waker.wake();
700        }
701
702        Ok(())
703    }
704
705    /// Get a pending connection for accept() call
706    pub(crate) fn get_pending_connection(
707        &self,
708        addr: &str,
709    ) -> SimulationResult<Option<ConnectionId>> {
710        let mut inner = self.inner.borrow_mut();
711        Ok(inner.network.pending_connections.remove(addr))
712    }
713
714    /// Get the peer address for a connection.
715    ///
716    /// FDB Pattern (sim2.actor.cpp):
717    /// - For client-side connections: returns server's listening address
718    /// - For server-side connections: returns synthesized ephemeral address
719    ///
720    /// The returned address may not be connectable for server-side connections,
721    /// matching real TCP behavior where servers see client ephemeral ports.
722    pub(crate) fn get_connection_peer_address(
723        &self,
724        connection_id: ConnectionId,
725    ) -> Option<String> {
726        let inner = self.inner.borrow();
727        inner
728            .network
729            .connections
730            .get(&connection_id)
731            .map(|conn| conn.peer_address.clone())
732    }
733
734    /// Sleep for the specified duration in simulation time.
735    ///
736    /// Returns a future that will complete when the simulation time has advanced
737    /// by the specified duration.
738    #[instrument(skip(self))]
739    pub fn sleep(&self, duration: Duration) -> SleepFuture {
740        let task_id = self.generate_task_id();
741
742        // Apply buggified delay if enabled
743        let actual_duration = self.apply_buggified_delay(duration);
744
745        // Schedule a wake event for this task
746        self.schedule_event(Event::Timer { task_id }, actual_duration);
747
748        // Return a future that will be woken when the event is processed
749        SleepFuture::new(self.downgrade(), task_id)
750    }
751
752    /// Apply buggified delay to a duration if chaos is enabled.
753    fn apply_buggified_delay(&self, duration: Duration) -> Duration {
754        let inner = self.inner.borrow();
755        let chaos = &inner.network.config.chaos;
756
757        if !chaos.buggified_delay_enabled || chaos.buggified_delay_max == Duration::ZERO {
758            return duration;
759        }
760
761        // 25% probability per FDB
762        if sim_random::<f64>() < chaos.buggified_delay_probability {
763            // Power-law distribution: pow(random01(), 1000) creates very skewed delays
764            let random_factor = sim_random::<f64>().powf(1000.0);
765            let extra_delay = chaos.buggified_delay_max.mul_f64(random_factor);
766            tracing::trace!(
767                extra_delay_ms = extra_delay.as_millis(),
768                "Buggified delay applied"
769            );
770            duration + extra_delay
771        } else {
772            duration
773        }
774    }
775
776    /// Generate a unique task ID for sleep operations.
777    fn generate_task_id(&self) -> u64 {
778        let mut inner = self.inner.borrow_mut();
779        let task_id = inner.next_task_id;
780        inner.next_task_id += 1;
781        task_id
782    }
783
784    /// Wake all tasks associated with a connection
785    fn wake_all(wakers: &mut BTreeMap<ConnectionId, Vec<Waker>>, connection_id: ConnectionId) {
786        if let Some(waker_list) = wakers.remove(&connection_id) {
787            for waker in waker_list {
788                waker.wake();
789            }
790        }
791    }
792
793    /// Check if a task has been awakened.
794    pub(crate) fn is_task_awake(&self, task_id: u64) -> SimulationResult<bool> {
795        let inner = self.inner.borrow();
796        Ok(inner.awakened_tasks.contains(&task_id))
797    }
798
799    /// Register a waker for a task.
800    pub(crate) fn register_task_waker(&self, task_id: u64, waker: Waker) -> SimulationResult<()> {
801        let mut inner = self.inner.borrow_mut();
802        inner.wakers.task_wakers.insert(task_id, waker);
803        Ok(())
804    }
805
806    /// Clear expired clogs and wake pending tasks (helper for use with SimInner)
807    fn clear_expired_clogs_with_inner(inner: &mut SimInner) {
808        let now = inner.current_time;
809        let expired: Vec<ConnectionId> = inner
810            .network
811            .connection_clogs
812            .iter()
813            .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
814            .collect();
815
816        for id in expired {
817            inner.network.connection_clogs.remove(&id);
818            Self::wake_all(&mut inner.wakers.clog_wakers, id);
819        }
820    }
821
822    /// Static event processor for simulation events.
823    #[instrument(skip(inner))]
824    fn process_event_with_inner(inner: &mut SimInner, event: Event) {
825        inner.events_processed += 1;
826
827        match event {
828            Event::Timer { task_id } => Self::handle_timer_event(inner, task_id),
829            Event::Connection { id, state } => Self::handle_connection_event(inner, id, state),
830            Event::Network {
831                connection_id,
832                operation,
833            } => Self::handle_network_event(inner, connection_id, operation),
834            Event::Storage { file_id, operation } => {
835                super::storage_ops::handle_storage_event(inner, file_id, operation)
836            }
837            Event::Shutdown => Self::handle_shutdown_event(inner),
838            Event::ProcessRestart { ip }
839            | Event::ProcessGracefulShutdown { ip, .. }
840            | Event::ProcessForceKill { ip, .. } => {
841                // Process lifecycle events are handled by the orchestrator, not SimWorld.
842                // We just log them here; the orchestrator reads the event after step().
843                tracing::debug!("Process lifecycle event for IP {}", ip);
844            }
845        }
846    }
847
848    /// Handle timer events - wake sleeping tasks.
849    fn handle_timer_event(inner: &mut SimInner, task_id: u64) {
850        inner.awakened_tasks.insert(task_id);
851        if let Some(waker) = inner.wakers.task_wakers.remove(&task_id) {
852            waker.wake();
853        }
854    }
855
856    /// Handle connection state change events.
857    fn handle_connection_event(inner: &mut SimInner, id: u64, state: ConnectionStateChange) {
858        let connection_id = ConnectionId(id);
859
860        match state {
861            ConnectionStateChange::BindComplete | ConnectionStateChange::ConnectionReady => {
862                // No action needed for these states
863            }
864            ConnectionStateChange::ClogClear => {
865                inner.network.connection_clogs.remove(&connection_id);
866                Self::wake_all(&mut inner.wakers.clog_wakers, connection_id);
867            }
868            ConnectionStateChange::ReadClogClear => {
869                inner.network.read_clogs.remove(&connection_id);
870                Self::wake_all(&mut inner.wakers.read_clog_wakers, connection_id);
871            }
872            ConnectionStateChange::PartitionRestore => {
873                Self::clear_expired_partitions(inner);
874            }
875            ConnectionStateChange::SendPartitionClear => {
876                Self::clear_expired_send_partitions(inner);
877            }
878            ConnectionStateChange::RecvPartitionClear => {
879                Self::clear_expired_recv_partitions(inner);
880            }
881            ConnectionStateChange::CutRestore => {
882                if let Some(conn) = inner.network.connections.get_mut(&connection_id)
883                    && conn.is_cut
884                {
885                    conn.is_cut = false;
886                    conn.cut_expiry = None;
887                    tracing::debug!("Connection {} restored via scheduled event", id);
888                    Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
889                }
890            }
891            ConnectionStateChange::HalfOpenError => {
892                tracing::debug!("Connection {} half-open error time reached", id);
893                if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
894                    waker.wake();
895                }
896                Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
897            }
898        }
899    }
900
901    /// Clear expired IP partitions.
902    fn clear_expired_partitions(inner: &mut SimInner) {
903        let now = inner.current_time;
904        let expired: Vec<_> = inner
905            .network
906            .ip_partitions
907            .iter()
908            .filter_map(|(pair, state)| (now >= state.expires_at).then_some(*pair))
909            .collect();
910
911        for pair in expired {
912            inner.network.ip_partitions.remove(&pair);
913            tracing::debug!("Restored IP partition {} -> {}", pair.0, pair.1);
914        }
915    }
916
917    /// Clear expired send partitions.
918    fn clear_expired_send_partitions(inner: &mut SimInner) {
919        let now = inner.current_time;
920        let expired: Vec<_> = inner
921            .network
922            .send_partitions
923            .iter()
924            .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
925            .collect();
926
927        for ip in expired {
928            inner.network.send_partitions.remove(&ip);
929            tracing::debug!("Cleared send partition for {}", ip);
930        }
931    }
932
933    /// Clear expired receive partitions.
934    fn clear_expired_recv_partitions(inner: &mut SimInner) {
935        let now = inner.current_time;
936        let expired: Vec<_> = inner
937            .network
938            .recv_partitions
939            .iter()
940            .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
941            .collect();
942
943        for ip in expired {
944            inner.network.recv_partitions.remove(&ip);
945            tracing::debug!("Cleared receive partition for {}", ip);
946        }
947    }
948
949    /// Handle network events (data delivery and send buffer processing).
950    fn handle_network_event(inner: &mut SimInner, conn_id: u64, operation: NetworkOperation) {
951        let connection_id = ConnectionId(conn_id);
952
953        match operation {
954            NetworkOperation::DataDelivery { data } => {
955                Self::handle_data_delivery(inner, connection_id, data);
956            }
957            NetworkOperation::ProcessSendBuffer => {
958                Self::handle_process_send_buffer(inner, connection_id);
959            }
960            NetworkOperation::FinDelivery => {
961                Self::handle_fin_delivery(inner, connection_id);
962            }
963        }
964    }
965
966    /// Handle data delivery to a connection's receive buffer.
967    fn handle_data_delivery(inner: &mut SimInner, connection_id: ConnectionId, data: Vec<u8>) {
968        tracing::trace!(
969            "DataDelivery: {} bytes to connection {}",
970            data.len(),
971            connection_id.0
972        );
973
974        // Check connection exists and if it's stable
975        let is_stable = inner
976            .network
977            .connections
978            .get(&connection_id)
979            .is_some_and(|conn| conn.is_stable);
980
981        if !inner.network.connections.contains_key(&connection_id) {
982            tracing::warn!("DataDelivery: connection {} not found", connection_id.0);
983            return;
984        }
985
986        // Apply bit flipping chaos (needs mutable inner access) - skip for stable connections
987        let data_to_deliver = if is_stable {
988            data
989        } else {
990            Self::maybe_corrupt_data(inner, &data)
991        };
992
993        // Now get connection reference and deliver data
994        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
995            return;
996        };
997
998        for &byte in &data_to_deliver {
999            conn.receive_buffer.push_back(byte);
1000        }
1001
1002        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1003            waker.wake();
1004        }
1005    }
1006
1007    /// Handle FIN delivery to a connection's receive side (TCP half-close).
1008    ///
1009    /// Sets `remote_fin_received` on the receiving connection so that `poll_read`
1010    /// returns EOF after the receive buffer is drained. Ignores FIN if the connection
1011    /// was already aborted (stale event).
1012    fn handle_fin_delivery(inner: &mut SimInner, connection_id: ConnectionId) {
1013        tracing::debug!(
1014            "FinDelivery: FIN received on connection {}",
1015            connection_id.0
1016        );
1017
1018        // If connection was aborted after FIN was scheduled, ignore the stale FIN
1019        let is_closed = inner
1020            .network
1021            .connections
1022            .get(&connection_id)
1023            .is_some_and(|conn| conn.is_closed);
1024
1025        if is_closed {
1026            tracing::debug!(
1027                "FinDelivery: connection {} already closed, ignoring stale FIN",
1028                connection_id.0
1029            );
1030            return;
1031        }
1032
1033        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1034            conn.remote_fin_received = true;
1035        }
1036
1037        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1038            waker.wake();
1039        }
1040    }
1041
1042    /// Schedule a FinDelivery event to the peer connection.
1043    ///
1044    /// The FIN is scheduled after the last DataDelivery event to ensure all data
1045    /// arrives at the peer before EOF is signaled.
1046    fn schedule_fin_delivery(
1047        inner: &mut SimInner,
1048        paired_id: Option<ConnectionId>,
1049        last_delivery_time: Option<Duration>,
1050    ) {
1051        let Some(peer_id) = paired_id else {
1052            return;
1053        };
1054
1055        // FIN must arrive after all DataDelivery events
1056        let fin_time = match last_delivery_time {
1057            Some(t) if t >= inner.current_time => t + Duration::from_nanos(1),
1058            _ => inner.current_time + Duration::from_nanos(1),
1059        };
1060
1061        let sequence = inner.next_sequence;
1062        inner.next_sequence += 1;
1063
1064        tracing::debug!(
1065            "Scheduling FinDelivery to connection {} at {:?}",
1066            peer_id.0,
1067            fin_time
1068        );
1069
1070        inner.event_queue.schedule(ScheduledEvent::new(
1071            fin_time,
1072            Event::Network {
1073                connection_id: peer_id.0,
1074                operation: NetworkOperation::FinDelivery,
1075            },
1076            sequence,
1077        ));
1078    }
1079
1080    /// Maybe corrupt data with bit flips based on chaos configuration.
1081    fn maybe_corrupt_data(inner: &mut SimInner, data: &[u8]) -> Vec<u8> {
1082        if data.is_empty() {
1083            return data.to_vec();
1084        }
1085
1086        let chaos = &inner.network.config.chaos;
1087        let now = inner.current_time;
1088        let cooldown_elapsed =
1089            now.saturating_sub(inner.last_bit_flip_time) >= chaos.bit_flip_cooldown;
1090
1091        if !cooldown_elapsed || !crate::buggify_with_prob!(chaos.bit_flip_probability) {
1092            return data.to_vec();
1093        }
1094
1095        let random_value = sim_random::<u32>();
1096        let flip_count = SimInner::calculate_flip_bit_count(
1097            random_value,
1098            chaos.bit_flip_min_bits,
1099            chaos.bit_flip_max_bits,
1100        );
1101
1102        let mut corrupted_data = data.to_vec();
1103        let mut flipped_positions = std::collections::HashSet::new();
1104
1105        for _ in 0..flip_count {
1106            let byte_idx = (sim_random::<u64>() as usize) % corrupted_data.len();
1107            let bit_idx = (sim_random::<u64>() as usize) % 8;
1108            let position = (byte_idx, bit_idx);
1109
1110            if !flipped_positions.contains(&position) {
1111                flipped_positions.insert(position);
1112                corrupted_data[byte_idx] ^= 1 << bit_idx;
1113            }
1114        }
1115
1116        inner.last_bit_flip_time = now;
1117        tracing::info!(
1118            "BitFlipInjected: bytes={} bits_flipped={} unique_positions={}",
1119            data.len(),
1120            flip_count,
1121            flipped_positions.len()
1122        );
1123
1124        corrupted_data
1125    }
1126
1127    /// Handle processing of a connection's send buffer.
1128    fn handle_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1129        let is_partitioned = inner
1130            .network
1131            .is_connection_partitioned(connection_id, inner.current_time);
1132
1133        if is_partitioned {
1134            Self::handle_partitioned_send(inner, connection_id);
1135        } else {
1136            Self::handle_normal_send(inner, connection_id);
1137        }
1138    }
1139
1140    /// Handle send when connection is partitioned.
1141    fn handle_partitioned_send(inner: &mut SimInner, connection_id: ConnectionId) {
1142        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1143            return;
1144        };
1145
1146        if let Some(data) = conn.send_buffer.pop_front() {
1147            tracing::debug!(
1148                "Connection {} partitioned, failing send of {} bytes",
1149                connection_id.0,
1150                data.len()
1151            );
1152            Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1153
1154            if !conn.send_buffer.is_empty() {
1155                Self::schedule_process_send_buffer(inner, connection_id);
1156            } else {
1157                conn.send_in_progress = false;
1158                // Check for pending graceful close when pipeline drains
1159                if conn.graceful_close_pending {
1160                    conn.graceful_close_pending = false;
1161                    let peer_id = conn.paired_connection;
1162                    let last_time = conn.last_data_delivery_scheduled_at;
1163                    Self::schedule_fin_delivery(inner, peer_id, last_time);
1164                }
1165            }
1166        } else {
1167            conn.send_in_progress = false;
1168            // Check for pending graceful close when pipeline drains
1169            if conn.graceful_close_pending {
1170                conn.graceful_close_pending = false;
1171                let peer_id = conn.paired_connection;
1172                let last_time = conn.last_data_delivery_scheduled_at;
1173                Self::schedule_fin_delivery(inner, peer_id, last_time);
1174            }
1175        }
1176    }
1177
    /// Handle normal (non-partitioned) send.
    ///
    /// Pops one chunk from the connection's send buffer, schedules a
    /// `DataDelivery` event carrying it to the paired connection (if any), and
    /// then either schedules another `ProcessSendBuffer` event (more chunks
    /// queued) or marks the send pipeline idle. When the pipeline goes idle and a
    /// graceful close is pending, a FIN is scheduled to the peer.
    ///
    /// Returns early without side effects when `connection_id` is unknown.
    fn handle_normal_send(inner: &mut SimInner, connection_id: ConnectionId) {
        // Extract connection info first
        // (copied out of the shared borrow so `inner` can be re-borrowed below)
        let Some(conn) = inner.network.connections.get(&connection_id) else {
            return;
        };

        let paired_id = conn.paired_connection;
        let send_delay = conn.send_delay;
        let next_send_time = conn.next_send_time;
        let has_data = !conn.send_buffer.is_empty();
        let is_stable = conn.is_stable; // For stable connection checks

        // Receive-side delay configured on the peer connection, if there is a
        // peer and it has one.
        let recv_delay = paired_id.and_then(|pid| {
            inner
                .network
                .connections
                .get(&pid)
                .and_then(|c| c.recv_delay)
        });

        // Nothing queued: mark the pipeline idle and flush a pending close.
        if !has_data {
            if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
                conn.send_in_progress = false;
                // Check for pending graceful close when pipeline drains
                if conn.graceful_close_pending {
                    conn.graceful_close_pending = false;
                    let peer_id = conn.paired_connection;
                    let last_time = conn.last_data_delivery_scheduled_at;
                    Self::schedule_fin_delivery(inner, peer_id, last_time);
                }
            }
            return;
        }

        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
            return;
        };

        // `has_data` was true above, so this fallback is defensive only.
        let Some(mut data) = conn.send_buffer.pop_front() else {
            conn.send_in_progress = false;
            return;
        };

        // A chunk left the buffer: wake tasks registered in the send-buffer
        // waker list (presumably writers waiting for buffer space — TODO confirm).
        Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);

        // BUGGIFY: Simulate partial/short writes (skip for stable connections)
        if !is_stable && crate::buggify!() && !data.is_empty() {
            let max_send = std::cmp::min(
                data.len(),
                inner.network.config.chaos.partial_write_max_bytes,
            );
            // `truncate_to` is drawn from 0..=max_send, so zero bytes may be sent.
            let truncate_to = sim_random_range(0..max_send + 1);

            if truncate_to < data.len() {
                // Put the unsent tail back at the front so it is sent next.
                let remainder = data.split_off(truncate_to);
                conn.send_buffer.push_front(remainder);
                tracing::debug!(
                    "BUGGIFY: Partial write on connection {} - sending {} bytes",
                    connection_id.0,
                    data.len()
                );
            }
        }

        // With more chunks queued the delay is a single nanosecond; for the last
        // chunk it is the connection's own send delay, or a sample from the
        // configured write latency when none is set.
        let has_more = !conn.send_buffer.is_empty();
        let base_delay = if has_more {
            Duration::from_nanos(1)
        } else {
            send_delay.unwrap_or_else(|| {
                crate::network::sample_duration(&inner.network.config.write_latency)
            })
        };

        // Never send earlier than the previously reserved slot, then reserve the
        // next slot one nanosecond later (presumably to keep deliveries strictly
        // ordered — TODO confirm).
        let earliest_time = std::cmp::max(inner.current_time + base_delay, next_send_time);
        conn.next_send_time = earliest_time + Duration::from_nanos(1);

        // Schedule data delivery to paired connection
        if let Some(paired_id) = paired_id {
            // The peer's receive delay, when present, is added on top.
            let scheduled_time = earliest_time + recv_delay.unwrap_or(Duration::ZERO);
            let sequence = inner.next_sequence;
            inner.next_sequence += 1;

            inner.event_queue.schedule(ScheduledEvent::new(
                scheduled_time,
                Event::Network {
                    connection_id: paired_id.0,
                    operation: NetworkOperation::DataDelivery { data },
                },
                sequence,
            ));

            // Track delivery time for FIN ordering (must be after last DataDelivery)
            conn.last_data_delivery_scheduled_at = Some(scheduled_time);
        }

        // Schedule next send if more data
        if !conn.send_buffer.is_empty() {
            Self::schedule_process_send_buffer(inner, connection_id);
        } else {
            conn.send_in_progress = false;
            // Check for pending graceful close when pipeline drains
            if conn.graceful_close_pending {
                conn.graceful_close_pending = false;
                let peer_id = conn.paired_connection;
                let last_time = conn.last_data_delivery_scheduled_at;
                Self::schedule_fin_delivery(inner, peer_id, last_time);
            }
        }
    }
1288
1289    /// Schedule a ProcessSendBuffer event for the given connection.
1290    fn schedule_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1291        let sequence = inner.next_sequence;
1292        inner.next_sequence += 1;
1293
1294        inner.event_queue.schedule(ScheduledEvent::new(
1295            inner.current_time,
1296            Event::Network {
1297                connection_id: connection_id.0,
1298                operation: NetworkOperation::ProcessSendBuffer,
1299            },
1300            sequence,
1301        ));
1302    }
1303
1304    /// Handle shutdown event - wake all pending tasks.
1305    fn handle_shutdown_event(inner: &mut SimInner) {
1306        tracing::debug!("Processing Shutdown event - waking all pending tasks");
1307
1308        for (task_id, waker) in std::mem::take(&mut inner.wakers.task_wakers) {
1309            tracing::trace!("Waking task {}", task_id);
1310            waker.wake();
1311        }
1312
1313        for (_conn_id, waker) in std::mem::take(&mut inner.wakers.read_wakers) {
1314            waker.wake();
1315        }
1316
1317        tracing::debug!("Shutdown event processed");
1318    }
1319
1320    /// Get current assertion results for all tracked assertions.
1321    pub fn assertion_results(
1322        &self,
1323    ) -> std::collections::HashMap<String, crate::chaos::AssertionStats> {
1324        crate::chaos::get_assertion_results()
1325    }
1326
1327    /// Reset assertion statistics to empty state.
1328    pub fn reset_assertion_results(&self) {
1329        crate::chaos::reset_assertion_results();
1330    }
1331
1332    /// Abort all connections involving a specific IP address.
1333    ///
1334    /// This is used during process reboot to immediately kill all network
1335    /// connections for the rebooted process. Both local and remote connections
1336    /// are aborted (RST semantics — peer sees ECONNRESET).
1337    pub fn abort_all_connections_for_ip(&self, ip: std::net::IpAddr) {
1338        let connection_ids: Vec<ConnectionId> = {
1339            let inner = self.inner.borrow();
1340            inner
1341                .network
1342                .connections
1343                .iter()
1344                .filter_map(|(id, conn)| {
1345                    if conn.local_ip == Some(ip) || conn.remote_ip == Some(ip) {
1346                        Some(*id)
1347                    } else {
1348                        None
1349                    }
1350                })
1351                .collect()
1352        };
1353
1354        let count = connection_ids.len();
1355        for conn_id in connection_ids {
1356            self.close_connection_abort(conn_id);
1357        }
1358
1359        if count > 0 {
1360            tracing::debug!("Aborted {} connections for rebooted IP {}", count, ip);
1361        }
1362    }
1363
1364    /// Schedule a `ProcessRestart` event after a recovery delay.
1365    ///
1366    /// Called after a process is killed to schedule its restart.
1367    pub fn schedule_process_restart(
1368        &self,
1369        ip: std::net::IpAddr,
1370        recovery_delay: std::time::Duration,
1371    ) {
1372        self.schedule_event(Event::ProcessRestart { ip }, recovery_delay);
1373        tracing::debug!(
1374            "Scheduled process restart for IP {} in {:?}",
1375            ip,
1376            recovery_delay
1377        );
1378    }
1379
1380    /// Returns the last event processed by `step()`, if any.
1381    ///
1382    /// This is used by the orchestrator to detect `ProcessRestart` events
1383    /// and handle them (respawn the process).
1384    pub fn last_processed_event(&self) -> Option<Event> {
1385        self.inner.borrow().last_processed_event.clone()
1386    }
1387
1388    /// Extract simulation metrics (simulated time, events processed).
1389    pub fn extract_metrics(&self) -> crate::runner::SimulationMetrics {
1390        let inner = self.inner.borrow();
1391
1392        crate::runner::SimulationMetrics {
1393            wall_time: std::time::Duration::ZERO,
1394            simulated_time: inner.current_time,
1395            events_processed: inner.events_processed,
1396        }
1397    }
1398
1399    // Phase 7: Simple write clogging methods
1400
1401    /// Check if a write should be clogged based on probability
1402    pub fn should_clog_write(&self, connection_id: ConnectionId) -> bool {
1403        let inner = self.inner.borrow();
1404        let config = &inner.network.config;
1405
1406        // Skip stable connections (FDB: stableConnection exempt from chaos)
1407        if inner
1408            .network
1409            .connections
1410            .get(&connection_id)
1411            .is_some_and(|conn| conn.is_stable)
1412        {
1413            return false;
1414        }
1415
1416        // Skip if already clogged
1417        if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1418            return inner.current_time < clog_state.expires_at;
1419        }
1420
1421        // Check probability
1422        config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1423    }
1424
1425    /// Clog a connection's write operations
1426    pub fn clog_write(&self, connection_id: ConnectionId) {
1427        let mut inner = self.inner.borrow_mut();
1428        let config = &inner.network.config;
1429
1430        let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1431        let expires_at = inner.current_time + clog_duration;
1432        inner
1433            .network
1434            .connection_clogs
1435            .insert(connection_id, ClogState { expires_at });
1436
1437        // Schedule an event to clear this clog
1438        let clear_event = Event::Connection {
1439            id: connection_id.0,
1440            state: ConnectionStateChange::ClogClear,
1441        };
1442        let sequence = inner.next_sequence;
1443        inner.next_sequence += 1;
1444        inner
1445            .event_queue
1446            .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1447    }
1448
1449    /// Check if a connection's writes are currently clogged
1450    pub fn is_write_clogged(&self, connection_id: ConnectionId) -> bool {
1451        let inner = self.inner.borrow();
1452
1453        if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1454            inner.current_time < clog_state.expires_at
1455        } else {
1456            false
1457        }
1458    }
1459
1460    /// Register a waker for when write clog clears
1461    pub fn register_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1462        let mut inner = self.inner.borrow_mut();
1463        inner
1464            .wakers
1465            .clog_wakers
1466            .entry(connection_id)
1467            .or_default()
1468            .push(waker);
1469    }
1470
1471    // Read clogging methods (symmetric with write clogging)
1472
1473    /// Check if a read should be clogged based on probability
1474    pub fn should_clog_read(&self, connection_id: ConnectionId) -> bool {
1475        let inner = self.inner.borrow();
1476        let config = &inner.network.config;
1477
1478        // Skip stable connections (FDB: stableConnection exempt from chaos)
1479        if inner
1480            .network
1481            .connections
1482            .get(&connection_id)
1483            .is_some_and(|conn| conn.is_stable)
1484        {
1485            return false;
1486        }
1487
1488        // Skip if already clogged
1489        if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1490            return inner.current_time < clog_state.expires_at;
1491        }
1492
1493        // Check probability (same as write clogging)
1494        config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1495    }
1496
1497    /// Clog a connection's read operations
1498    pub fn clog_read(&self, connection_id: ConnectionId) {
1499        let mut inner = self.inner.borrow_mut();
1500        let config = &inner.network.config;
1501
1502        let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1503        let expires_at = inner.current_time + clog_duration;
1504        inner
1505            .network
1506            .read_clogs
1507            .insert(connection_id, ClogState { expires_at });
1508
1509        // Schedule an event to clear this read clog
1510        let clear_event = Event::Connection {
1511            id: connection_id.0,
1512            state: ConnectionStateChange::ReadClogClear,
1513        };
1514        let sequence = inner.next_sequence;
1515        inner.next_sequence += 1;
1516        inner
1517            .event_queue
1518            .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1519    }
1520
1521    /// Check if a connection's reads are currently clogged
1522    pub fn is_read_clogged(&self, connection_id: ConnectionId) -> bool {
1523        let inner = self.inner.borrow();
1524
1525        if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1526            inner.current_time < clog_state.expires_at
1527        } else {
1528            false
1529        }
1530    }
1531
1532    /// Register a waker for when read clog clears
1533    pub fn register_read_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1534        let mut inner = self.inner.borrow_mut();
1535        inner
1536            .wakers
1537            .read_clog_wakers
1538            .entry(connection_id)
1539            .or_default()
1540            .push(waker);
1541    }
1542
1543    /// Clear expired clogs and wake pending tasks
1544    pub fn clear_expired_clogs(&self) {
1545        let mut inner = self.inner.borrow_mut();
1546        let now = inner.current_time;
1547        let expired: Vec<ConnectionId> = inner
1548            .network
1549            .connection_clogs
1550            .iter()
1551            .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
1552            .collect();
1553
1554        for id in expired {
1555            inner.network.connection_clogs.remove(&id);
1556            Self::wake_all(&mut inner.wakers.clog_wakers, id);
1557        }
1558    }
1559
1560    // Connection Cut Methods (temporary network outage simulation)
1561
1562    /// Temporarily cut a connection for the specified duration.
1563    ///
1564    /// Unlike `close_connection`, a cut connection will be automatically restored
1565    /// after the duration expires. This simulates temporary network outages where
1566    /// the underlying connection remains but is temporarily unavailable.
1567    ///
1568    /// During a cut:
1569    /// - `poll_read` returns `Poll::Pending` (waits for restore)
1570    /// - `poll_write` returns `Poll::Pending` (waits for restore)
1571    /// - Buffered data is preserved
1572    pub fn cut_connection(&self, connection_id: ConnectionId, duration: Duration) {
1573        let mut inner = self.inner.borrow_mut();
1574        let expires_at = inner.current_time + duration;
1575
1576        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1577            conn.is_cut = true;
1578            conn.cut_expiry = Some(expires_at);
1579            tracing::debug!("Connection {} cut until {:?}", connection_id.0, expires_at);
1580
1581            // Schedule restoration event
1582            let restore_event = Event::Connection {
1583                id: connection_id.0,
1584                state: ConnectionStateChange::CutRestore,
1585            };
1586            let sequence = inner.next_sequence;
1587            inner.next_sequence += 1;
1588            inner
1589                .event_queue
1590                .schedule(ScheduledEvent::new(expires_at, restore_event, sequence));
1591        }
1592    }
1593
1594    /// Check if a connection is temporarily cut.
1595    ///
1596    /// A cut connection is temporarily unavailable but will be restored.
1597    /// This is different from `is_connection_closed` which indicates permanent closure.
1598    pub fn is_connection_cut(&self, connection_id: ConnectionId) -> bool {
1599        let inner = self.inner.borrow();
1600        inner
1601            .network
1602            .connections
1603            .get(&connection_id)
1604            .is_some_and(|conn| {
1605                conn.is_cut
1606                    && conn
1607                        .cut_expiry
1608                        .is_some_and(|expiry| inner.current_time < expiry)
1609            })
1610    }
1611
1612    /// Restore a cut connection immediately.
1613    ///
1614    /// This cancels the cut state and wakes any tasks waiting for restoration.
1615    pub fn restore_connection(&self, connection_id: ConnectionId) {
1616        let mut inner = self.inner.borrow_mut();
1617
1618        if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1619            && conn.is_cut
1620        {
1621            conn.is_cut = false;
1622            conn.cut_expiry = None;
1623            tracing::debug!("Connection {} restored", connection_id.0);
1624
1625            // Wake any tasks waiting for restoration
1626            Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
1627        }
1628    }
1629
1630    /// Register a waker for when a cut connection is restored.
1631    pub fn register_cut_waker(&self, connection_id: ConnectionId, waker: Waker) {
1632        let mut inner = self.inner.borrow_mut();
1633        inner
1634            .wakers
1635            .cut_wakers
1636            .entry(connection_id)
1637            .or_default()
1638            .push(waker);
1639    }
1640
1641    // Send buffer management methods
1642
1643    /// Get the send buffer capacity for a connection.
1644    pub fn send_buffer_capacity(&self, connection_id: ConnectionId) -> usize {
1645        let inner = self.inner.borrow();
1646        inner
1647            .network
1648            .connections
1649            .get(&connection_id)
1650            .map(|conn| conn.send_buffer_capacity)
1651            .unwrap_or(0)
1652    }
1653
1654    /// Get the current send buffer usage for a connection.
1655    pub fn send_buffer_used(&self, connection_id: ConnectionId) -> usize {
1656        let inner = self.inner.borrow();
1657        inner
1658            .network
1659            .connections
1660            .get(&connection_id)
1661            .map(|conn| conn.send_buffer.iter().map(|v| v.len()).sum())
1662            .unwrap_or(0)
1663    }
1664
1665    /// Get the available send buffer space for a connection.
1666    pub fn available_send_buffer(&self, connection_id: ConnectionId) -> usize {
1667        let capacity = self.send_buffer_capacity(connection_id);
1668        let used = self.send_buffer_used(connection_id);
1669        capacity.saturating_sub(used)
1670    }
1671
1672    /// Register a waker for when send buffer space becomes available.
1673    pub fn register_send_buffer_waker(&self, connection_id: ConnectionId, waker: Waker) {
1674        let mut inner = self.inner.borrow_mut();
1675        inner
1676            .wakers
1677            .send_buffer_wakers
1678            .entry(connection_id)
1679            .or_default()
1680            .push(waker);
1681    }
1682
1683    /// Wake any tasks waiting for send buffer space on a connection.
1684    #[allow(dead_code)] // May be used for external buffer management
1685    fn wake_send_buffer_waiters(&self, connection_id: ConnectionId) {
1686        let mut inner = self.inner.borrow_mut();
1687        Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1688    }
1689
1690    // Per-IP-pair base latency methods
1691
1692    /// Get the base latency for a connection pair.
1693    /// Returns the latency if already set, otherwise None.
1694    pub fn get_pair_latency(&self, src: IpAddr, dst: IpAddr) -> Option<Duration> {
1695        let inner = self.inner.borrow();
1696        inner.network.pair_latencies.get(&(src, dst)).copied()
1697    }
1698
1699    /// Set the base latency for a connection pair if not already set.
1700    /// Returns the latency (existing or newly set).
1701    pub fn set_pair_latency_if_not_set(
1702        &self,
1703        src: IpAddr,
1704        dst: IpAddr,
1705        latency: Duration,
1706    ) -> Duration {
1707        let mut inner = self.inner.borrow_mut();
1708        *inner
1709            .network
1710            .pair_latencies
1711            .entry((src, dst))
1712            .or_insert_with(|| {
1713                tracing::debug!(
1714                    "Setting base latency for IP pair {} -> {} to {:?}",
1715                    src,
1716                    dst,
1717                    latency
1718                );
1719                latency
1720            })
1721    }
1722
1723    /// Get the base latency for a connection based on its IP pair.
1724    /// If not set, samples from config and sets it.
1725    pub fn get_connection_base_latency(&self, connection_id: ConnectionId) -> Duration {
1726        let inner = self.inner.borrow();
1727        let (local_ip, remote_ip) = inner
1728            .network
1729            .connections
1730            .get(&connection_id)
1731            .and_then(|conn| Some((conn.local_ip?, conn.remote_ip?)))
1732            .unwrap_or({
1733                (
1734                    IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1735                    IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1736                )
1737            });
1738        drop(inner);
1739
1740        // Check if latency is already set
1741        if let Some(latency) = self.get_pair_latency(local_ip, remote_ip) {
1742            return latency;
1743        }
1744
1745        // Sample a new latency from config and set it
1746        let latency = self
1747            .with_network_config(|config| crate::network::sample_duration(&config.write_latency));
1748        self.set_pair_latency_if_not_set(local_ip, remote_ip, latency)
1749    }
1750
1751    // Per-connection asymmetric delay methods
1752
1753    /// Get the send delay for a connection.
1754    /// Returns the per-connection override if set, otherwise None.
1755    pub fn get_send_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1756        let inner = self.inner.borrow();
1757        inner
1758            .network
1759            .connections
1760            .get(&connection_id)
1761            .and_then(|conn| conn.send_delay)
1762    }
1763
1764    /// Get the receive delay for a connection.
1765    /// Returns the per-connection override if set, otherwise None.
1766    pub fn get_recv_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1767        let inner = self.inner.borrow();
1768        inner
1769            .network
1770            .connections
1771            .get(&connection_id)
1772            .and_then(|conn| conn.recv_delay)
1773    }
1774
1775    /// Set asymmetric delays for a connection.
1776    /// If a delay is None, the global configuration is used instead.
1777    pub fn set_asymmetric_delays(
1778        &self,
1779        connection_id: ConnectionId,
1780        send_delay: Option<Duration>,
1781        recv_delay: Option<Duration>,
1782    ) {
1783        let mut inner = self.inner.borrow_mut();
1784        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1785            conn.send_delay = send_delay;
1786            conn.recv_delay = recv_delay;
1787            tracing::debug!(
1788                "Connection {} asymmetric delays set: send={:?}, recv={:?}",
1789                connection_id.0,
1790                send_delay,
1791                recv_delay
1792            );
1793        }
1794    }
1795
1796    /// Check if a connection is permanently closed
1797    pub fn is_connection_closed(&self, connection_id: ConnectionId) -> bool {
1798        let inner = self.inner.borrow();
1799        inner
1800            .network
1801            .connections
1802            .get(&connection_id)
1803            .is_some_and(|conn| conn.is_closed)
1804    }
1805
1806    /// Close a connection gracefully (FIN semantics).
1807    ///
1808    /// The peer will receive EOF on read operations.
1809    pub fn close_connection(&self, connection_id: ConnectionId) {
1810        self.close_connection_with_reason(connection_id, CloseReason::Graceful);
1811    }
1812
1813    /// Close a connection abruptly (RST semantics).
1814    ///
1815    /// The peer will receive ECONNRESET on both read and write operations.
1816    pub fn close_connection_abort(&self, connection_id: ConnectionId) {
1817        self.close_connection_with_reason(connection_id, CloseReason::Aborted);
1818    }
1819
1820    /// Get the close reason for a connection.
1821    pub fn get_close_reason(&self, connection_id: ConnectionId) -> CloseReason {
1822        let inner = self.inner.borrow();
1823        inner
1824            .network
1825            .connections
1826            .get(&connection_id)
1827            .map(|conn| conn.close_reason)
1828            .unwrap_or(CloseReason::None)
1829    }
1830
1831    /// Close a connection with a specific close reason.
1832    fn close_connection_with_reason(&self, connection_id: ConnectionId, reason: CloseReason) {
1833        match reason {
1834            CloseReason::Graceful => self.close_connection_graceful(connection_id),
1835            CloseReason::Aborted => self.close_connection_aborted(connection_id),
1836            CloseReason::None => {}
1837        }
1838    }
1839
1840    /// Graceful close (FIN semantics) — TCP half-close.
1841    ///
1842    /// Marks the local write side as closed. The peer can still read all buffered
1843    /// and in-flight data. A `FinDelivery` event is scheduled after the last
1844    /// `DataDelivery` to signal EOF to the peer.
1845    fn close_connection_graceful(&self, connection_id: ConnectionId) {
1846        let mut inner = self.inner.borrow_mut();
1847
1848        // Extract connection info
1849        let conn_info = inner.network.connections.get(&connection_id).map(|conn| {
1850            (
1851                conn.paired_connection,
1852                conn.send_closed,
1853                conn.is_closed,
1854                conn.send_in_progress,
1855                conn.send_buffer.is_empty(),
1856                conn.last_data_delivery_scheduled_at,
1857            )
1858        });
1859
1860        let Some((
1861            paired_id,
1862            was_send_closed,
1863            was_closed,
1864            send_in_progress,
1865            send_buffer_empty,
1866            last_delivery_time,
1867        )) = conn_info
1868        else {
1869            return;
1870        };
1871
1872        // Idempotent: if already closed or send_closed (by chaos or previous close), skip
1873        if was_closed || was_send_closed {
1874            tracing::debug!(
1875                "Connection {} already closed/send_closed, skipping graceful close",
1876                connection_id.0
1877            );
1878            return;
1879        }
1880
1881        // Mark local side as closed with send side shut down
1882        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1883            conn.is_closed = true;
1884            conn.close_reason = CloseReason::Graceful;
1885            conn.send_closed = true;
1886            tracing::debug!(
1887                "Connection {} graceful close (FIN) - local write shut down",
1888                connection_id.0
1889            );
1890        }
1891
1892        // Wake local read waker (so local poll_read returns EOF if needed)
1893        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1894            waker.wake();
1895        }
1896
1897        // Do NOT set is_closed on the peer — they can still read buffered data.
1898        // Instead, schedule a FIN delivery after all data has been delivered.
1899        if send_in_progress || !send_buffer_empty {
1900            // Pipeline has data — defer FIN until pipeline drains
1901            if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1902                conn.graceful_close_pending = true;
1903                tracing::debug!(
1904                    "Connection {} graceful close deferred (send pipeline active)",
1905                    connection_id.0
1906                );
1907            }
1908        } else {
1909            // Pipeline is idle — schedule FIN delivery now
1910            Self::schedule_fin_delivery(&mut inner, paired_id, last_delivery_time);
1911        }
1912    }
1913
1914    /// Aborted close (RST semantics) — immediate connection kill.
1915    ///
1916    /// Immediately sets `is_closed` on both endpoints. The peer will receive
1917    /// ECONNRESET on both read and write operations.
1918    fn close_connection_aborted(&self, connection_id: ConnectionId) {
1919        let mut inner = self.inner.borrow_mut();
1920
1921        let paired_connection_id = inner
1922            .network
1923            .connections
1924            .get(&connection_id)
1925            .and_then(|conn| conn.paired_connection);
1926
1927        if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1928            && !conn.is_closed
1929        {
1930            conn.is_closed = true;
1931            conn.close_reason = CloseReason::Aborted;
1932            // Cancel any pending graceful close
1933            conn.graceful_close_pending = false;
1934            tracing::debug!(
1935                "Connection {} closed permanently (reason: Aborted)",
1936                connection_id.0
1937            );
1938        }
1939
1940        if let Some(paired_id) = paired_connection_id
1941            && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
1942            && !paired_conn.is_closed
1943        {
1944            paired_conn.is_closed = true;
1945            paired_conn.close_reason = CloseReason::Aborted;
1946            tracing::debug!(
1947                "Paired connection {} also closed (reason: Aborted)",
1948                paired_id.0
1949            );
1950        }
1951
1952        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1953            tracing::debug!(
1954                "Waking read waker for aborted connection {}",
1955                connection_id.0
1956            );
1957            waker.wake();
1958        }
1959
1960        if let Some(paired_id) = paired_connection_id
1961            && let Some(paired_waker) = inner.wakers.read_wakers.remove(&paired_id)
1962        {
1963            tracing::debug!(
1964                "Waking read waker for paired aborted connection {}",
1965                paired_id.0
1966            );
1967            paired_waker.wake();
1968        }
1969    }
1970
1971    /// Close connection asymmetrically (FDB rollRandomClose pattern)
1972    pub fn close_connection_asymmetric(
1973        &self,
1974        connection_id: ConnectionId,
1975        close_send: bool,
1976        close_recv: bool,
1977    ) {
1978        let mut inner = self.inner.borrow_mut();
1979
1980        let paired_id = inner
1981            .network
1982            .connections
1983            .get(&connection_id)
1984            .and_then(|conn| conn.paired_connection);
1985
1986        if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1987            conn.send_closed = true;
1988            conn.send_buffer.clear();
1989            tracing::debug!(
1990                "Connection {} send side closed (asymmetric)",
1991                connection_id.0
1992            );
1993        }
1994
1995        if close_recv
1996            && let Some(paired) = paired_id
1997            && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
1998        {
1999            paired_conn.recv_closed = true;
2000            tracing::debug!(
2001                "Connection {} recv side closed (asymmetric via peer)",
2002                paired.0
2003            );
2004        }
2005
2006        if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
2007            waker.wake();
2008        }
2009        if close_recv
2010            && let Some(paired) = paired_id
2011            && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
2012        {
2013            waker.wake();
2014        }
2015    }
2016
2017    /// Roll random close chaos injection (FDB rollRandomClose pattern)
2018    pub fn roll_random_close(&self, connection_id: ConnectionId) -> Option<bool> {
2019        let mut inner = self.inner.borrow_mut();
2020        let config = &inner.network.config;
2021
2022        // Skip stable connections (FDB: stableConnection exempt from chaos)
2023        if inner
2024            .network
2025            .connections
2026            .get(&connection_id)
2027            .is_some_and(|conn| conn.is_stable)
2028        {
2029            return None;
2030        }
2031
2032        if config.chaos.random_close_probability <= 0.0 {
2033            return None;
2034        }
2035
2036        let current_time = inner.current_time;
2037        let time_since_last = current_time.saturating_sub(inner.network.last_random_close_time);
2038        if time_since_last < config.chaos.random_close_cooldown {
2039            return None;
2040        }
2041
2042        if !crate::buggify_with_prob!(config.chaos.random_close_probability) {
2043            return None;
2044        }
2045
2046        inner.network.last_random_close_time = current_time;
2047
2048        let paired_id = inner
2049            .network
2050            .connections
2051            .get(&connection_id)
2052            .and_then(|conn| conn.paired_connection);
2053
2054        let a = super::rng::sim_random_f64();
2055        let close_recv = a < 0.66;
2056        let close_send = a > 0.33;
2057
2058        tracing::info!(
2059            "Random connection failure triggered on connection {} (send={}, recv={}, a={:.3})",
2060            connection_id.0,
2061            close_send,
2062            close_recv,
2063            a
2064        );
2065
2066        if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2067            conn.send_closed = true;
2068            conn.send_buffer.clear();
2069        }
2070
2071        if close_recv
2072            && let Some(paired) = paired_id
2073            && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
2074        {
2075            paired_conn.recv_closed = true;
2076        }
2077
2078        if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
2079            waker.wake();
2080        }
2081        if close_recv
2082            && let Some(paired) = paired_id
2083            && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
2084        {
2085            waker.wake();
2086        }
2087
2088        let b = super::rng::sim_random_f64();
2089        let explicit = b < inner.network.config.chaos.random_close_explicit_ratio;
2090
2091        tracing::debug!(
2092            "Random close explicit={} (b={:.3}, ratio={:.2})",
2093            explicit,
2094            b,
2095            inner.network.config.chaos.random_close_explicit_ratio
2096        );
2097
2098        Some(explicit)
2099    }
2100
2101    /// Check if a connection's send side is closed
2102    pub fn is_send_closed(&self, connection_id: ConnectionId) -> bool {
2103        let inner = self.inner.borrow();
2104        inner
2105            .network
2106            .connections
2107            .get(&connection_id)
2108            .is_some_and(|conn| conn.send_closed || conn.is_closed)
2109    }
2110
2111    /// Check if a connection's receive side is closed
2112    pub fn is_recv_closed(&self, connection_id: ConnectionId) -> bool {
2113        let inner = self.inner.borrow();
2114        inner
2115            .network
2116            .connections
2117            .get(&connection_id)
2118            .is_some_and(|conn| conn.recv_closed || conn.is_closed)
2119    }
2120
2121    /// Check if a FIN has been received from the remote peer (graceful close).
2122    ///
2123    /// When true, `poll_read` should return EOF after draining the receive buffer.
2124    /// Distinct from `is_recv_closed` which is used for chaos/asymmetric closure.
2125    pub fn is_remote_fin_received(&self, connection_id: ConnectionId) -> bool {
2126        let inner = self.inner.borrow();
2127        inner
2128            .network
2129            .connections
2130            .get(&connection_id)
2131            .is_some_and(|conn| conn.remote_fin_received)
2132    }
2133
2134    // Half-Open Connection Simulation Methods
2135
2136    /// Simulate a peer crash on a connection.
2137    ///
2138    /// This puts the connection in a half-open state where:
2139    /// - The local side still thinks it's connected
2140    /// - Writes succeed but data is silently discarded (peer is gone)
2141    /// - Reads block waiting for data that will never come
2142    /// - After `error_delay`, both read and write return ECONNRESET
2143    ///
2144    /// This simulates real-world scenarios where a remote peer crashes
2145    /// or becomes unreachable, but the local TCP stack hasn't detected it yet.
2146    pub fn simulate_peer_crash(&self, connection_id: ConnectionId, error_delay: Duration) {
2147        let mut inner = self.inner.borrow_mut();
2148        let current_time = inner.current_time;
2149        let error_at = current_time + error_delay;
2150
2151        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2152            conn.is_half_open = true;
2153            conn.half_open_error_at = Some(error_at);
2154
2155            // Clear the paired connection to simulate peer being gone
2156            // Data sent will go nowhere
2157            conn.paired_connection = None;
2158
2159            tracing::info!(
2160                "Connection {} now half-open, errors manifest at {:?}",
2161                connection_id.0,
2162                error_at
2163            );
2164        }
2165
2166        // Schedule an event to wake any waiting readers when error time arrives
2167        let wake_event = Event::Connection {
2168            id: connection_id.0,
2169            state: ConnectionStateChange::HalfOpenError,
2170        };
2171        let sequence = inner.next_sequence;
2172        inner.next_sequence += 1;
2173        let scheduled_event = ScheduledEvent::new(error_at, wake_event, sequence);
2174        inner.event_queue.schedule(scheduled_event);
2175    }
2176
2177    /// Check if a connection is in half-open state
2178    pub fn is_half_open(&self, connection_id: ConnectionId) -> bool {
2179        let inner = self.inner.borrow();
2180        inner
2181            .network
2182            .connections
2183            .get(&connection_id)
2184            .is_some_and(|conn| conn.is_half_open)
2185    }
2186
2187    /// Check if a half-open connection should return errors now
2188    pub fn should_half_open_error(&self, connection_id: ConnectionId) -> bool {
2189        let inner = self.inner.borrow();
2190        let current_time = inner.current_time;
2191        inner
2192            .network
2193            .connections
2194            .get(&connection_id)
2195            .is_some_and(|conn| {
2196                conn.is_half_open
2197                    && conn
2198                        .half_open_error_at
2199                        .is_some_and(|error_at| current_time >= error_at)
2200            })
2201    }
2202
2203    // Stable Connection Methods
2204
2205    /// Mark a connection as stable, exempting it from chaos injection.
2206    ///
2207    /// Stable connections are exempt from:
2208    /// - Random close (`roll_random_close`)
2209    /// - Write clogging
2210    /// - Read clogging
2211    /// - Bit flip corruption
2212    /// - Partial write truncation
2213    ///
2214    /// FDB ref: sim2.actor.cpp:357-362 (`stableConnection` flag)
2215    ///
2216    /// # Real-World Scenario
2217    /// Use this for parent-child process connections or supervision channels
2218    /// that should remain reliable even during chaos testing.
2219    pub fn mark_connection_stable(&self, connection_id: ConnectionId) {
2220        let mut inner = self.inner.borrow_mut();
2221        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2222            conn.is_stable = true;
2223            tracing::debug!("Connection {} marked as stable", connection_id.0);
2224
2225            // Also mark the paired connection as stable
2226            if let Some(paired_id) = conn.paired_connection
2227                && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
2228            {
2229                paired_conn.is_stable = true;
2230                tracing::debug!("Paired connection {} also marked as stable", paired_id.0);
2231            }
2232        }
2233    }
2234
2235    /// Check if a connection is marked as stable.
2236    pub fn is_connection_stable(&self, connection_id: ConnectionId) -> bool {
2237        let inner = self.inner.borrow();
2238        inner
2239            .network
2240            .connections
2241            .get(&connection_id)
2242            .is_some_and(|conn| conn.is_stable)
2243    }
2244
2245    // Network Partition Control Methods
2246
2247    /// Partition communication between two IP addresses for a specified duration
2248    pub fn partition_pair(
2249        &self,
2250        from_ip: std::net::IpAddr,
2251        to_ip: std::net::IpAddr,
2252        duration: Duration,
2253    ) -> SimulationResult<()> {
2254        let mut inner = self.inner.borrow_mut();
2255        let expires_at = inner.current_time + duration;
2256
2257        inner
2258            .network
2259            .ip_partitions
2260            .insert((from_ip, to_ip), PartitionState { expires_at });
2261
2262        let restore_event = Event::Connection {
2263            id: 0,
2264            state: ConnectionStateChange::PartitionRestore,
2265        };
2266        let sequence = inner.next_sequence;
2267        inner.next_sequence += 1;
2268        let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2269        inner.event_queue.schedule(scheduled_event);
2270
2271        tracing::debug!(
2272            "Partitioned {} -> {} until {:?}",
2273            from_ip,
2274            to_ip,
2275            expires_at
2276        );
2277        Ok(())
2278    }
2279
2280    /// Block all outgoing communication from an IP address
2281    pub fn partition_send_from(
2282        &self,
2283        ip: std::net::IpAddr,
2284        duration: Duration,
2285    ) -> SimulationResult<()> {
2286        let mut inner = self.inner.borrow_mut();
2287        let expires_at = inner.current_time + duration;
2288
2289        inner.network.send_partitions.insert(ip, expires_at);
2290
2291        let clear_event = Event::Connection {
2292            id: 0,
2293            state: ConnectionStateChange::SendPartitionClear,
2294        };
2295        let sequence = inner.next_sequence;
2296        inner.next_sequence += 1;
2297        let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2298        inner.event_queue.schedule(scheduled_event);
2299
2300        tracing::debug!("Partitioned sends from {} until {:?}", ip, expires_at);
2301        Ok(())
2302    }
2303
2304    /// Block all incoming communication to an IP address
2305    pub fn partition_recv_to(
2306        &self,
2307        ip: std::net::IpAddr,
2308        duration: Duration,
2309    ) -> SimulationResult<()> {
2310        let mut inner = self.inner.borrow_mut();
2311        let expires_at = inner.current_time + duration;
2312
2313        inner.network.recv_partitions.insert(ip, expires_at);
2314
2315        let clear_event = Event::Connection {
2316            id: 0,
2317            state: ConnectionStateChange::RecvPartitionClear,
2318        };
2319        let sequence = inner.next_sequence;
2320        inner.next_sequence += 1;
2321        let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2322        inner.event_queue.schedule(scheduled_event);
2323
2324        tracing::debug!("Partitioned receives to {} until {:?}", ip, expires_at);
2325        Ok(())
2326    }
2327
2328    /// Immediately restore communication between two IP addresses
2329    pub fn restore_partition(
2330        &self,
2331        from_ip: std::net::IpAddr,
2332        to_ip: std::net::IpAddr,
2333    ) -> SimulationResult<()> {
2334        let mut inner = self.inner.borrow_mut();
2335        inner.network.ip_partitions.remove(&(from_ip, to_ip));
2336        tracing::debug!("Restored partition {} -> {}", from_ip, to_ip);
2337        Ok(())
2338    }
2339
2340    /// Check if communication between two IP addresses is currently partitioned
2341    pub fn is_partitioned(
2342        &self,
2343        from_ip: std::net::IpAddr,
2344        to_ip: std::net::IpAddr,
2345    ) -> SimulationResult<bool> {
2346        let inner = self.inner.borrow();
2347        Ok(inner
2348            .network
2349            .is_partitioned(from_ip, to_ip, inner.current_time))
2350    }
2351
2352    /// Helper method for use with SimInner - randomly trigger partitions
2353    ///
2354    /// Supports different partition strategies based on configuration:
2355    /// - Random: randomly partition individual IP pairs
2356    /// - UniformSize: create uniform-sized partition groups
2357    /// - IsolateSingle: isolate exactly one node from the rest
2358    fn randomly_trigger_partitions_with_inner(inner: &mut SimInner) {
2359        let partition_config = &inner.network.config;
2360
2361        if partition_config.chaos.partition_probability == 0.0 {
2362            return;
2363        }
2364
2365        // Check if we should trigger a partition this step
2366        if sim_random::<f64>() >= partition_config.chaos.partition_probability {
2367            return;
2368        }
2369
2370        // Collect unique IPs from connections
2371        let unique_ips: HashSet<IpAddr> = inner
2372            .network
2373            .connections
2374            .values()
2375            .filter_map(|conn| conn.local_ip)
2376            .collect();
2377
2378        if unique_ips.len() < 2 {
2379            return; // Need at least 2 IPs to partition
2380        }
2381
2382        let ip_list: Vec<IpAddr> = unique_ips.into_iter().collect();
2383        let partition_duration =
2384            crate::network::sample_duration(&partition_config.chaos.partition_duration);
2385        let expires_at = inner.current_time + partition_duration;
2386
2387        // Select IPs to partition based on strategy
2388        let partitioned_ips: Vec<IpAddr> = match partition_config.chaos.partition_strategy {
2389            PartitionStrategy::Random => {
2390                // Original behavior: randomly decide for each IP
2391                ip_list
2392                    .iter()
2393                    .filter(|_| sim_random::<f64>() < 0.5)
2394                    .copied()
2395                    .collect()
2396            }
2397            PartitionStrategy::UniformSize => {
2398                // TigerBeetle-style: random partition size from 1 to n-1
2399                let partition_size = sim_random_range(1..ip_list.len());
2400                // Shuffle and take first N IPs
2401                let mut shuffled = ip_list.clone();
2402                // Simple Fisher-Yates shuffle
2403                for i in (1..shuffled.len()).rev() {
2404                    let j = sim_random_range(0..i + 1);
2405                    shuffled.swap(i, j);
2406                }
2407                shuffled.into_iter().take(partition_size).collect()
2408            }
2409            PartitionStrategy::IsolateSingle => {
2410                // Isolate exactly one node
2411                let idx = sim_random_range(0..ip_list.len());
2412                vec![ip_list[idx]]
2413            }
2414        };
2415
2416        // Don't partition if we selected all IPs or none
2417        if partitioned_ips.is_empty() || partitioned_ips.len() == ip_list.len() {
2418            return;
2419        }
2420
2421        // Create bi-directional partitions between partitioned and non-partitioned groups
2422        let non_partitioned: Vec<IpAddr> = ip_list
2423            .iter()
2424            .filter(|ip| !partitioned_ips.contains(ip))
2425            .copied()
2426            .collect();
2427
2428        for &from_ip in &partitioned_ips {
2429            for &to_ip in &non_partitioned {
2430                // Skip if already partitioned
2431                if inner
2432                    .network
2433                    .is_partitioned(from_ip, to_ip, inner.current_time)
2434                {
2435                    continue;
2436                }
2437
2438                // Partition in both directions
2439                inner
2440                    .network
2441                    .ip_partitions
2442                    .insert((from_ip, to_ip), PartitionState { expires_at });
2443                inner
2444                    .network
2445                    .ip_partitions
2446                    .insert((to_ip, from_ip), PartitionState { expires_at });
2447
2448                tracing::debug!(
2449                    "Partition triggered: {} <-> {} until {:?} (strategy: {:?})",
2450                    from_ip,
2451                    to_ip,
2452                    expires_at,
2453                    partition_config.chaos.partition_strategy
2454                );
2455            }
2456        }
2457
2458        // Schedule restoration event
2459        let restore_event = Event::Connection {
2460            id: 0,
2461            state: ConnectionStateChange::PartitionRestore,
2462        };
2463        let sequence = inner.next_sequence;
2464        inner.next_sequence += 1;
2465        let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2466        inner.event_queue.schedule(scheduled_event);
2467    }
2468}
2469
2470impl Default for SimWorld {
2471    fn default() -> Self {
2472        Self::new()
2473    }
2474}
2475
/// A weak reference to a simulation world.
///
/// This provides handle-based access to the simulation without holding
/// a strong reference that would prevent cleanup.
///
/// Call [`WeakSimWorld::upgrade`] to obtain a usable [`SimWorld`]; it yields
/// [`SimulationError::SimulationShutdown`] once the shared state is gone.
#[derive(Debug)]
pub struct WeakSimWorld {
    /// Non-owning pointer to the shared simulation state.
    pub(crate) inner: Weak<RefCell<SimInner>>,
}
2484
/// Generates `WeakSimWorld` methods that upgrade the handle and delegate to
/// the `SimWorld` method of the same name.
///
/// Each invocation begins with a mode keyword that selects how the delegate's
/// result is turned into a `SimulationResult`:
///
/// - `wrap`: the delegate returns a plain value, which is wrapped in `Ok`.
/// - `pass`: the delegate already returns a `SimulationResult`; it is
///   forwarded untouched.
/// - `unit`: the delegate's result is discarded and `Ok(())` is returned.
///
/// In every mode a failed `upgrade()` is propagated to the caller via `?`.
macro_rules! weak_forward {
    // `wrap`: plain return value, lifted into `Ok`.
    (wrap $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*) -> $out:ty) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<$out> {
            // Call on the upgraded temporary so it lives only for this statement.
            let value = self.upgrade()?.$name($($param),*);
            Ok(value)
        }
    };
    // `pass`: delegate is already fallible, hand its result straight back.
    (pass $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*) -> $out:ty) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<$out> {
            self.upgrade()?.$name($($param),*)
        }
    };
    // `unit`: run the delegate for its effect only, then report success.
    (unit $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*)) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<()> {
            self.upgrade()?.$name($($param),*);
            Ok(())
        }
    };
}
2510
2511impl WeakSimWorld {
2512    /// Attempts to upgrade this weak reference to a strong reference.
2513    pub fn upgrade(&self) -> SimulationResult<SimWorld> {
2514        self.inner
2515            .upgrade()
2516            .map(|inner| SimWorld { inner })
2517            .ok_or(SimulationError::SimulationShutdown)
2518    }
2519
2520    weak_forward!(wrap #[doc = "Returns the current simulation time."] current_time(&self) -> Duration);
2521    weak_forward!(wrap #[doc = "Returns the exact simulation time (equivalent to FDB's now())."] now(&self) -> Duration);
2522    weak_forward!(wrap #[doc = "Returns the drifted timer time (equivalent to FDB's timer())."] timer(&self) -> Duration);
2523    weak_forward!(unit #[doc = "Schedules an event to execute after the specified delay."] schedule_event(&self, event: Event, delay: Duration));
2524    weak_forward!(unit #[doc = "Schedules an event to execute at the specified absolute time."] schedule_event_at(&self, event: Event, time: Duration));
2525    weak_forward!(pass #[doc = "Read data from connection's receive buffer."] read_from_connection(&self, connection_id: ConnectionId, buf: &mut [u8]) -> usize);
2526    weak_forward!(pass #[doc = "Write data to connection's receive buffer."] write_to_connection(&self, connection_id: ConnectionId, data: &[u8]) -> ());
2527    weak_forward!(pass #[doc = "Buffer data for ordered sending on a connection."] buffer_send(&self, connection_id: ConnectionId, data: Vec<u8>) -> ());
2528    weak_forward!(wrap #[doc = "Get a network provider for the simulation."] network_provider(&self) -> SimNetworkProvider);
2529    weak_forward!(wrap #[doc = "Get a time provider for the simulation."] time_provider(&self) -> crate::providers::SimTimeProvider);
2530    weak_forward!(wrap #[doc = "Sleep for the specified duration in simulation time."] sleep(&self, duration: Duration) -> SleepFuture);
2531
2532    /// Access network configuration for latency calculations.
2533    pub fn with_network_config<F, R>(&self, f: F) -> SimulationResult<R>
2534    where
2535        F: FnOnce(&NetworkConfiguration) -> R,
2536    {
2537        Ok(self.upgrade()?.with_network_config(f))
2538    }
2539}
2540
2541impl Clone for WeakSimWorld {
2542    fn clone(&self) -> Self {
2543        Self {
2544            inner: self.inner.clone(),
2545        }
2546    }
2547}
2548
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a millisecond-granularity [`Duration`].
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    /// A fresh world is empty; one scheduled timer is consumed by one step.
    #[test]
    fn sim_world_basic_lifecycle() {
        let mut world = SimWorld::new();

        // Nothing has happened yet.
        assert_eq!(world.current_time(), Duration::ZERO);
        assert!(!world.has_pending_events());
        assert_eq!(world.pending_event_count(), 0);

        // Queue a single timer; time must not move until it is processed.
        world.schedule_event(Event::Timer { task_id: 1 }, ms(100));

        assert!(world.has_pending_events());
        assert_eq!(world.pending_event_count(), 1);
        assert_eq!(world.current_time(), Duration::ZERO);

        // Stepping consumes the only event and advances the clock to it.
        let has_more = world.step();
        assert!(!has_more);
        assert_eq!(world.current_time(), ms(100));
        assert!(!world.has_pending_events());
        assert_eq!(world.pending_event_count(), 0);
    }

    /// Events scheduled out of order are processed in time order.
    #[test]
    fn sim_world_multiple_events() {
        let mut world = SimWorld::new();

        // Deliberately scheduled out of chronological order.
        for (task_id, delay_ms) in [(3, 300), (1, 100), (2, 200)] {
            world.schedule_event(Event::Timer { task_id }, ms(delay_ms));
        }

        assert_eq!(world.pending_event_count(), 3);

        // (expected clock in ms, events left afterwards, step() result)
        let expectations = [(100, 2, true), (200, 1, true), (300, 0, false)];
        for (clock_ms, remaining, more) in expectations {
            assert_eq!(world.step(), more);
            assert_eq!(world.current_time(), ms(clock_ms));
            assert_eq!(world.pending_event_count(), remaining);
        }
    }

    /// `run_until_empty` drains the queue and leaves the clock at the last event.
    #[test]
    fn sim_world_run_until_empty() {
        let mut world = SimWorld::new();

        for (task_id, delay_ms) in [(1, 100), (2, 200), (3, 300)] {
            world.schedule_event(Event::Timer { task_id }, ms(delay_ms));
        }

        world.run_until_empty();

        assert_eq!(world.current_time(), ms(300));
        assert!(!world.has_pending_events());
    }

    /// An event scheduled at an absolute time moves the clock exactly there.
    #[test]
    fn sim_world_schedule_at_specific_time() {
        let mut world = SimWorld::new();
        let target = ms(500);

        world.schedule_event_at(Event::Timer { task_id: 1 }, target);

        // Scheduling alone does not advance time.
        assert_eq!(world.current_time(), Duration::ZERO);

        world.step();

        assert_eq!(world.current_time(), target);
    }

    /// A weak handle works while the world lives and errors after it is dropped.
    #[test]
    fn weak_sim_world_lifecycle() {
        let world = SimWorld::new();
        let handle = world.downgrade();

        // While the world is alive the handle forwards calls.
        let observed = handle.current_time().expect("should get time");
        assert_eq!(observed, Duration::ZERO);

        handle
            .schedule_event(Event::Timer { task_id: 1 }, ms(100))
            .expect("should schedule event");

        // The event landed in the real world's queue.
        assert!(world.has_pending_events());

        // Release the only strong reference.
        drop(world);

        // Every forwarded call now reports shutdown.
        let shutdown_time = handle.current_time();
        assert_eq!(shutdown_time, Err(SimulationError::SimulationShutdown));

        let shutdown_schedule = handle.schedule_event(Event::Timer { task_id: 2 }, ms(200));
        assert_eq!(shutdown_schedule, Err(SimulationError::SimulationShutdown));
    }

    /// Events sharing a timestamp are all processed without moving the clock further.
    #[test]
    fn deterministic_event_ordering() {
        let mut world = SimWorld::new();
        let shared_time = ms(100);

        // Three timers that all fire at the same instant.
        for task_id in [2, 1, 3] {
            world.schedule_event(Event::Timer { task_id }, shared_time);
        }

        // Each step handles one of them; only the last reports an empty queue.
        for more in [true, true, false] {
            assert_eq!(world.step(), more);
            assert_eq!(world.current_time(), shared_time);
        }
    }
}