Skip to main content

moonpool_sim/sim/
world.rs

1//! Core simulation world and coordination logic.
2//!
3//! This module provides the central SimWorld coordinator that manages time,
4//! event processing, and network simulation state.
5
6use std::{
7    cell::RefCell,
8    collections::{BTreeMap, HashSet, VecDeque},
9    net::IpAddr,
10    rc::{Rc, Weak},
11    task::Waker,
12    time::Duration,
13};
14use tracing::instrument;
15
16use crate::{
17    SimulationError, SimulationResult,
18    network::{
19        NetworkConfiguration, PartitionStrategy,
20        sim::{ConnectionId, ListenerId, SimNetworkProvider},
21    },
22};
23
24use super::{
25    events::{ConnectionStateChange, Event, EventQueue, NetworkOperation, ScheduledEvent},
26    rng::{reset_sim_rng, set_sim_seed, sim_random, sim_random_range},
27    sleep::SleepFuture,
28    state::{
29        ClogState, CloseReason, ConnectionState, ListenerState, NetworkState, PartitionState,
30        StorageState,
31    },
32    wakers::WakerRegistry,
33};
34
/// Internal simulation state holder.
///
/// [`SimWorld`] keeps this behind an `Rc<RefCell<_>>`; the event handlers in this
/// file operate on it through `&mut SimInner`.
#[derive(Debug)]
pub(crate) struct SimInner {
    /// Current logical simulation time (FDB's `now()`); set to each event's
    /// timestamp as `SimWorld::step` pops it.
    pub(crate) current_time: Duration,
    /// Drifted timer time (can be ahead of current_time)
    /// FDB ref: sim2.actor.cpp:1058-1064 - timer() drifts 0-0.1s ahead of now()
    pub(crate) timer_time: Duration,
    /// Scheduled events awaiting processing; `SimWorld::step` pops the earliest.
    pub(crate) event_queue: EventQueue,
    /// Sequence number attached to the next scheduled event; incremented on every
    /// schedule. Presumably used to order events with equal timestamps — TODO
    /// confirm in `EventQueue`.
    pub(crate) next_sequence: u64,

    // Network management
    /// Connections, listeners, pending connections, clogs, and the network
    /// configuration (including its chaos settings).
    pub(crate) network: NetworkState,

    // Storage management
    /// Storage state, including the storage configuration.
    pub(crate) storage: StorageState,

    // Async coordination
    /// Wakers registered for reads, accepts, clogs, and sleeping tasks.
    pub(crate) wakers: WakerRegistry,

    // Task management for sleep functionality
    /// Next id handed out to a sleeping task.
    pub(crate) next_task_id: u64,
    /// Ids of tasks whose timer event has already fired.
    pub(crate) awakened_tasks: HashSet<u64>,

    // Event processing metrics
    /// Total number of events processed; `run_until_empty` uses it to pace its
    /// early-termination check.
    pub(crate) events_processed: u64,

    // Chaos tracking
    /// NOTE(review): presumably the simulation time of the most recent bit-flip
    /// fault; only its initialization is visible in this part of the file.
    pub(crate) last_bit_flip_time: Duration,
}
64
65impl SimInner {
66    pub(crate) fn new() -> Self {
67        Self {
68            current_time: Duration::ZERO,
69            timer_time: Duration::ZERO,
70            event_queue: EventQueue::new(),
71            next_sequence: 0,
72            network: NetworkState::new(NetworkConfiguration::default()),
73            storage: StorageState::default(),
74            wakers: WakerRegistry::default(),
75            next_task_id: 0,
76            awakened_tasks: HashSet::new(),
77            events_processed: 0,
78            last_bit_flip_time: Duration::ZERO,
79        }
80    }
81
82    pub(crate) fn new_with_config(network_config: NetworkConfiguration) -> Self {
83        Self {
84            current_time: Duration::ZERO,
85            timer_time: Duration::ZERO,
86            event_queue: EventQueue::new(),
87            next_sequence: 0,
88            network: NetworkState::new(network_config),
89            storage: StorageState::default(),
90            wakers: WakerRegistry::default(),
91            next_task_id: 0,
92            awakened_tasks: HashSet::new(),
93            events_processed: 0,
94            last_bit_flip_time: Duration::ZERO,
95        }
96    }
97
98    /// Calculate the number of bits to flip using a power-law distribution.
99    ///
100    /// Uses the formula: 32 - floor(log2(random_value))
101    /// This creates a power-law distribution biased toward fewer bits:
102    /// - 1-2 bits: very common
103    /// - 16 bits: uncommon
104    /// - 32 bits: very rare
105    ///
106    /// Matches FDB's approach in FlowTransport.actor.cpp:1297
107    pub(crate) fn calculate_flip_bit_count(random_value: u32, min_bits: u32, max_bits: u32) -> u32 {
108        if random_value == 0 {
109            // Handle edge case: treat 0 as if it were 1
110            return max_bits.min(32);
111        }
112
113        // Formula: 32 - floor(log2(x)) = 1 + leading_zeros(x)
114        let bit_count = 1 + random_value.leading_zeros();
115
116        // Clamp to configured range
117        bit_count.clamp(min_bits, max_bits)
118    }
119}
120
/// The central simulation coordinator that manages time and event processing.
///
/// `SimWorld` owns all mutable simulation state and provides the main interface
/// for scheduling events and advancing simulation time. It uses a centralized
/// ownership model with handle-based access to avoid borrow checker conflicts.
///
/// The network, time, and storage providers and the sleep futures created from a
/// `SimWorld` receive a weak handle (see [`SimWorld::downgrade`]) rather than a
/// strong reference.
#[derive(Debug)]
pub struct SimWorld {
    /// Shared, interior-mutable simulation state; weak handles to it are given
    /// out by [`SimWorld::downgrade`].
    pub(crate) inner: Rc<RefCell<SimInner>>,
}
130
131impl SimWorld {
132    /// Internal constructor that handles all initialization logic.
133    fn create(network_config: Option<NetworkConfiguration>, seed: u64) -> Self {
134        reset_sim_rng();
135        set_sim_seed(seed);
136        crate::chaos::assertions::reset_assertion_results();
137
138        let inner = match network_config {
139            Some(config) => SimInner::new_with_config(config),
140            None => SimInner::new(),
141        };
142
143        Self {
144            inner: Rc::new(RefCell::new(inner)),
145        }
146    }
147
148    /// Creates a new simulation world with default network configuration.
149    ///
150    /// Uses default seed (0) for reproducible testing. For custom seeds,
151    /// use [`SimWorld::new_with_seed`].
152    pub fn new() -> Self {
153        Self::create(None, 0)
154    }
155
156    /// Creates a new simulation world with a specific seed for deterministic randomness.
157    ///
158    /// This method ensures clean thread-local RNG state by resetting before
159    /// setting the seed, making it safe for consecutive simulations on the same thread.
160    ///
161    /// # Parameters
162    ///
163    /// * `seed` - The seed value for deterministic randomness
164    pub fn new_with_seed(seed: u64) -> Self {
165        Self::create(None, seed)
166    }
167
168    /// Creates a new simulation world with custom network configuration.
169    pub fn new_with_network_config(network_config: NetworkConfiguration) -> Self {
170        Self::create(Some(network_config), 0)
171    }
172
173    /// Creates a new simulation world with both custom network configuration and seed.
174    ///
175    /// # Parameters
176    ///
177    /// * `network_config` - Network configuration for latency and fault simulation
178    /// * `seed` - The seed value for deterministic randomness
179    pub fn new_with_network_config_and_seed(
180        network_config: NetworkConfiguration,
181        seed: u64,
182    ) -> Self {
183        Self::create(Some(network_config), seed)
184    }
185
186    /// Processes the next scheduled event and advances time.
187    ///
188    /// Returns `true` if more events are available for processing,
189    /// `false` if this was the last event or if no events are available.
190    #[instrument(skip(self))]
191    pub fn step(&mut self) -> bool {
192        let mut inner = self.inner.borrow_mut();
193
194        if let Some(scheduled_event) = inner.event_queue.pop_earliest() {
195            // Advance logical time to event timestamp
196            inner.current_time = scheduled_event.time();
197
198            // Phase 7: Clear expired clogs after time advancement
199            Self::clear_expired_clogs_with_inner(&mut inner);
200
201            // Trigger random partitions based on configuration
202            Self::randomly_trigger_partitions_with_inner(&mut inner);
203
204            // Process the event with the mutable reference
205            Self::process_event_with_inner(&mut inner, scheduled_event.into_event());
206
207            // Return true if more events are available
208            !inner.event_queue.is_empty()
209        } else {
210            // No more events to process
211            false
212        }
213    }
214
215    /// Processes all scheduled events until the queue is empty or only infrastructure events remain.
216    ///
217    /// This method processes all workload-related events but stops early if only infrastructure
218    /// events (like connection restoration) remain. This prevents infinite loops where
219    /// infrastructure events keep the simulation running indefinitely after workloads complete.
220    #[instrument(skip(self))]
221    pub fn run_until_empty(&mut self) {
222        while self.step() {
223            // Periodically check if we should stop early (every 50 events for performance)
224            if self.inner.borrow().events_processed.is_multiple_of(50) {
225                let has_workload_events = !self
226                    .inner
227                    .borrow()
228                    .event_queue
229                    .has_only_infrastructure_events();
230                if !has_workload_events {
231                    tracing::debug!(
232                        "Early termination: only infrastructure events remain in queue"
233                    );
234                    break;
235                }
236            }
237        }
238    }
239
240    /// Returns the current simulation time.
241    pub fn current_time(&self) -> Duration {
242        self.inner.borrow().current_time
243    }
244
245    /// Returns the exact simulation time (equivalent to FDB's now()).
246    ///
247    /// This is the canonical simulation time used for scheduling events.
248    /// Use this for precise time comparisons and scheduling.
249    pub fn now(&self) -> Duration {
250        self.inner.borrow().current_time
251    }
252
253    /// Returns the drifted timer time (equivalent to FDB's timer()).
254    ///
255    /// The timer can be up to `clock_drift_max` (default 100ms) ahead of `now()`.
256    /// This simulates real-world clock drift between processes, which is important
257    /// for testing time-sensitive code like:
258    /// - Timeout handling
259    /// - Lease expiration
260    /// - Distributed consensus (leader election)
261    /// - Cache invalidation
262    /// - Heartbeat detection
263    ///
264    /// FDB formula: `timerTime += random01() * (time + 0.1 - timerTime) / 2.0`
265    ///
266    /// FDB ref: sim2.actor.cpp:1058-1064
267    pub fn timer(&self) -> Duration {
268        let mut inner = self.inner.borrow_mut();
269        let chaos = &inner.network.config.chaos;
270
271        // If clock drift is disabled, return exact simulation time
272        if !chaos.clock_drift_enabled {
273            return inner.current_time;
274        }
275
276        // FDB formula: timerTime += random01() * (time + 0.1 - timerTime) / 2.0
277        // This smoothly interpolates timerTime toward (time + drift_max)
278        // The /2.0 creates a damped approach (never overshoots)
279        let max_timer = inner.current_time + chaos.clock_drift_max;
280
281        // Only advance if timer is behind max
282        if inner.timer_time < max_timer {
283            let random_factor = sim_random::<f64>(); // 0.0 to 1.0
284            let gap = (max_timer - inner.timer_time).as_secs_f64();
285            let delta = random_factor * gap / 2.0;
286            inner.timer_time += Duration::from_secs_f64(delta);
287        }
288
289        // Ensure timer never goes backwards relative to simulation time
290        inner.timer_time = inner.timer_time.max(inner.current_time);
291
292        inner.timer_time
293    }
294
295    /// Schedules an event to execute after the specified delay from the current time.
296    #[instrument(skip(self))]
297    pub fn schedule_event(&self, event: Event, delay: Duration) {
298        let mut inner = self.inner.borrow_mut();
299        let scheduled_time = inner.current_time + delay;
300        let sequence = inner.next_sequence;
301        inner.next_sequence += 1;
302
303        let scheduled_event = ScheduledEvent::new(scheduled_time, event, sequence);
304        inner.event_queue.schedule(scheduled_event);
305    }
306
307    /// Schedules an event to execute at the specified absolute time.
308    pub fn schedule_event_at(&self, event: Event, time: Duration) {
309        let mut inner = self.inner.borrow_mut();
310        let sequence = inner.next_sequence;
311        inner.next_sequence += 1;
312
313        let scheduled_event = ScheduledEvent::new(time, event, sequence);
314        inner.event_queue.schedule(scheduled_event);
315    }
316
317    /// Creates a weak reference to this simulation world.
318    ///
319    /// Weak references can be used to access the simulation without preventing
320    /// it from being dropped, enabling handle-based access patterns.
321    pub fn downgrade(&self) -> WeakSimWorld {
322        WeakSimWorld {
323            inner: Rc::downgrade(&self.inner),
324        }
325    }
326
327    /// Returns `true` if there are events waiting to be processed.
328    pub fn has_pending_events(&self) -> bool {
329        !self.inner.borrow().event_queue.is_empty()
330    }
331
332    /// Returns the number of events waiting to be processed.
333    pub fn pending_event_count(&self) -> usize {
334        self.inner.borrow().event_queue.len()
335    }
336
337    /// Create a network provider for this simulation
338    pub fn network_provider(&self) -> SimNetworkProvider {
339        SimNetworkProvider::new(self.downgrade())
340    }
341
342    /// Create a time provider for this simulation
343    pub fn time_provider(&self) -> crate::providers::SimTimeProvider {
344        crate::providers::SimTimeProvider::new(self.downgrade())
345    }
346
347    /// Create a task provider for this simulation
348    pub fn task_provider(&self) -> crate::TokioTaskProvider {
349        crate::TokioTaskProvider
350    }
351
352    /// Create a storage provider for this simulation.
353    pub fn storage_provider(&self) -> crate::storage::SimStorageProvider {
354        crate::storage::SimStorageProvider::new(self.downgrade())
355    }
356
357    /// Set storage configuration for this simulation.
358    ///
359    /// Must be called before any storage operations are performed.
360    pub fn set_storage_config(&mut self, config: crate::storage::StorageConfiguration) {
361        self.inner.borrow_mut().storage.config = config;
362    }
363
364    /// Access network configuration for latency calculations using thread-local RNG.
365    ///
366    /// This method provides access to the network configuration for calculating
367    /// latencies and other network parameters. Random values should be generated
368    /// using the thread-local RNG functions like `sim_random()`.
369    ///
370    /// Access the network configuration for this simulation.
371    pub fn with_network_config<F, R>(&self, f: F) -> R
372    where
373        F: FnOnce(&NetworkConfiguration) -> R,
374    {
375        let inner = self.inner.borrow();
376        f(&inner.network.config)
377    }
378
379    /// Create a listener in the simulation (used by SimNetworkProvider)
380    pub(crate) fn create_listener(&self, addr: String) -> SimulationResult<ListenerId> {
381        let mut inner = self.inner.borrow_mut();
382        let listener_id = ListenerId(inner.network.next_listener_id);
383        inner.network.next_listener_id += 1;
384
385        inner.network.listeners.insert(
386            listener_id,
387            ListenerState {
388                id: listener_id,
389                addr,
390                pending_connections: VecDeque::new(),
391            },
392        );
393
394        Ok(listener_id)
395    }
396
397    /// Read data from connection's receive buffer (used by SimTcpStream)
398    pub(crate) fn read_from_connection(
399        &self,
400        connection_id: ConnectionId,
401        buf: &mut [u8],
402    ) -> SimulationResult<usize> {
403        let mut inner = self.inner.borrow_mut();
404
405        if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
406            let mut bytes_read = 0;
407            while bytes_read < buf.len() && !connection.receive_buffer.is_empty() {
408                if let Some(byte) = connection.receive_buffer.pop_front() {
409                    buf[bytes_read] = byte;
410                    bytes_read += 1;
411                }
412            }
413            Ok(bytes_read)
414        } else {
415            Err(SimulationError::InvalidState(
416                "connection not found".to_string(),
417            ))
418        }
419    }
420
421    /// Write data to connection's receive buffer (used by SimTcpStream write operations)
422    pub(crate) fn write_to_connection(
423        &self,
424        connection_id: ConnectionId,
425        data: &[u8],
426    ) -> SimulationResult<()> {
427        let mut inner = self.inner.borrow_mut();
428
429        if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
430            for &byte in data {
431                connection.receive_buffer.push_back(byte);
432            }
433            Ok(())
434        } else {
435            Err(SimulationError::InvalidState(
436                "connection not found".to_string(),
437            ))
438        }
439    }
440
441    /// Buffer data for ordered sending on a TCP connection.
442    ///
443    /// This method implements the core TCP ordering guarantee by ensuring that all
444    /// write operations on a single connection are processed in FIFO order.
445    pub(crate) fn buffer_send(
446        &self,
447        connection_id: ConnectionId,
448        data: Vec<u8>,
449    ) -> SimulationResult<()> {
450        tracing::debug!(
451            "buffer_send called for connection_id={} with {} bytes",
452            connection_id.0,
453            data.len()
454        );
455        let mut inner = self.inner.borrow_mut();
456
457        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
458            // Always add data to send buffer for TCP ordering
459            conn.send_buffer.push_back(data);
460            tracing::debug!(
461                "buffer_send: added data to send_buffer, new length: {}",
462                conn.send_buffer.len()
463            );
464
465            // If sender is not already active, start processing the buffer
466            if !conn.send_in_progress {
467                tracing::debug!(
468                    "buffer_send: sender not in progress, scheduling ProcessSendBuffer event"
469                );
470                conn.send_in_progress = true;
471
472                // Schedule immediate processing of the buffer
473                let scheduled_time = inner.current_time + std::time::Duration::ZERO;
474                let sequence = inner.next_sequence;
475                inner.next_sequence += 1;
476                let scheduled_event = ScheduledEvent::new(
477                    scheduled_time,
478                    Event::Network {
479                        connection_id: connection_id.0,
480                        operation: NetworkOperation::ProcessSendBuffer,
481                    },
482                    sequence,
483                );
484                inner.event_queue.schedule(scheduled_event);
485                tracing::debug!(
486                    "buffer_send: scheduled ProcessSendBuffer event with sequence {}",
487                    sequence
488                );
489            } else {
490                tracing::debug!(
491                    "buffer_send: sender already in progress, not scheduling new event"
492                );
493            }
494
495            Ok(())
496        } else {
497            tracing::debug!(
498                "buffer_send: connection_id={} not found in connections table",
499                connection_id.0
500            );
501            Err(SimulationError::InvalidState(
502                "connection not found".to_string(),
503            ))
504        }
505    }
506
507    /// Create a bidirectional TCP connection pair for client-server communication.
508    ///
509    /// FDB Pattern (sim2.actor.cpp:1149-1175):
510    /// - Client connection stores server's real address as peer_address
511    /// - Server connection stores synthesized ephemeral address (random IP + port 40000-60000)
512    ///
513    /// This simulates real TCP behavior where servers see client ephemeral ports.
514    pub(crate) fn create_connection_pair(
515        &self,
516        client_addr: String,
517        server_addr: String,
518    ) -> SimulationResult<(ConnectionId, ConnectionId)> {
519        let mut inner = self.inner.borrow_mut();
520
521        let client_id = ConnectionId(inner.network.next_connection_id);
522        inner.network.next_connection_id += 1;
523
524        let server_id = ConnectionId(inner.network.next_connection_id);
525        inner.network.next_connection_id += 1;
526
527        // Capture current time to avoid borrow conflicts
528        let current_time = inner.current_time;
529
530        // Parse IP addresses for partition tracking
531        let client_ip = NetworkState::parse_ip_from_addr(&client_addr);
532        let server_ip = NetworkState::parse_ip_from_addr(&server_addr);
533
534        // FDB Pattern: Synthesize ephemeral address for server-side connection
535        // sim2.actor.cpp:1149-1175: randomInt(0,256) for IP offset, randomInt(40000,60000) for port
536        // Use thread-local sim_random_range for deterministic randomness
537        let ephemeral_peer_addr = match client_ip {
538            Some(std::net::IpAddr::V4(ipv4)) => {
539                let octets = ipv4.octets();
540                let ip_offset = sim_random_range(0u32..256) as u8;
541                let new_last_octet = octets[3].wrapping_add(ip_offset);
542                let ephemeral_ip =
543                    std::net::Ipv4Addr::new(octets[0], octets[1], octets[2], new_last_octet);
544                let ephemeral_port = sim_random_range(40000u16..60000);
545                format!("{}:{}", ephemeral_ip, ephemeral_port)
546            }
547            Some(std::net::IpAddr::V6(ipv6)) => {
548                // For IPv6, just modify the last segment
549                let segments = ipv6.segments();
550                let mut new_segments = segments;
551                let ip_offset = sim_random_range(0u16..256);
552                new_segments[7] = new_segments[7].wrapping_add(ip_offset);
553                let ephemeral_ip = std::net::Ipv6Addr::from(new_segments);
554                let ephemeral_port = sim_random_range(40000u16..60000);
555                format!("[{}]:{}", ephemeral_ip, ephemeral_port)
556            }
557            None => {
558                // Fallback: use client address with random port
559                let ephemeral_port = sim_random_range(40000u16..60000);
560                format!("unknown:{}", ephemeral_port)
561            }
562        };
563
564        // Calculate send buffer capacity based on BDP (Bandwidth-Delay Product)
565        // Using a default of 64KB which is typical for TCP socket buffers
566        // In real simulations this could be: max_latency_ms * bandwidth_bytes_per_ms
567        const DEFAULT_SEND_BUFFER_CAPACITY: usize = 64 * 1024; // 64KB
568
569        // Create paired connections
570        // Client stores server's real address as peer_address
571        inner.network.connections.insert(
572            client_id,
573            ConnectionState {
574                id: client_id,
575                addr: client_addr,
576                local_ip: client_ip,
577                remote_ip: server_ip,
578                peer_address: server_addr.clone(),
579                receive_buffer: VecDeque::new(),
580                paired_connection: Some(server_id),
581                send_buffer: VecDeque::new(),
582                send_in_progress: false,
583                next_send_time: current_time,
584                is_closed: false,
585                send_closed: false,
586                recv_closed: false,
587                is_cut: false,
588                cut_expiry: None,
589                close_reason: CloseReason::None,
590                send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
591                send_delay: None,
592                recv_delay: None,
593                is_half_open: false,
594                half_open_error_at: None,
595                is_stable: false,
596                graceful_close_pending: false,
597                last_data_delivery_scheduled_at: None,
598                remote_fin_received: false,
599            },
600        );
601
602        // Server stores synthesized ephemeral address as peer_address
603        inner.network.connections.insert(
604            server_id,
605            ConnectionState {
606                id: server_id,
607                addr: server_addr,
608                local_ip: server_ip,
609                remote_ip: client_ip,
610                peer_address: ephemeral_peer_addr,
611                receive_buffer: VecDeque::new(),
612                paired_connection: Some(client_id),
613                send_buffer: VecDeque::new(),
614                send_in_progress: false,
615                next_send_time: current_time,
616                is_closed: false,
617                send_closed: false,
618                recv_closed: false,
619                is_cut: false,
620                cut_expiry: None,
621                close_reason: CloseReason::None,
622                send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
623                send_delay: None,
624                recv_delay: None,
625                is_half_open: false,
626                half_open_error_at: None,
627                is_stable: false,
628                graceful_close_pending: false,
629                last_data_delivery_scheduled_at: None,
630                remote_fin_received: false,
631            },
632        );
633
634        Ok((client_id, server_id))
635    }
636
637    /// Register a waker for read operations
638    pub(crate) fn register_read_waker(
639        &self,
640        connection_id: ConnectionId,
641        waker: Waker,
642    ) -> SimulationResult<()> {
643        let mut inner = self.inner.borrow_mut();
644        let is_replacement = inner.wakers.read_wakers.contains_key(&connection_id);
645        inner.wakers.read_wakers.insert(connection_id, waker);
646        tracing::debug!(
647            "register_read_waker: connection_id={}, replacement={}, total_wakers={}",
648            connection_id.0,
649            is_replacement,
650            inner.wakers.read_wakers.len()
651        );
652        Ok(())
653    }
654
655    /// Register a waker for accept operations
656    pub(crate) fn register_accept_waker(&self, addr: &str, waker: Waker) -> SimulationResult<()> {
657        let mut inner = self.inner.borrow_mut();
658        // For simplicity, we'll use addr hash as listener ID for waker storage
659        use std::collections::hash_map::DefaultHasher;
660        use std::hash::{Hash, Hasher};
661        let mut hasher = DefaultHasher::new();
662        addr.hash(&mut hasher);
663        let listener_key = ListenerId(hasher.finish());
664
665        inner.wakers.listener_wakers.insert(listener_key, waker);
666        Ok(())
667    }
668
669    /// Store a pending connection for later accept() call
670    pub(crate) fn store_pending_connection(
671        &self,
672        addr: &str,
673        connection_id: ConnectionId,
674    ) -> SimulationResult<()> {
675        let mut inner = self.inner.borrow_mut();
676        inner
677            .network
678            .pending_connections
679            .insert(addr.to_string(), connection_id);
680
681        // Wake any accept() calls waiting for this connection
682        use std::collections::hash_map::DefaultHasher;
683        use std::hash::{Hash, Hasher};
684        let mut hasher = DefaultHasher::new();
685        addr.hash(&mut hasher);
686        let listener_key = ListenerId(hasher.finish());
687
688        if let Some(waker) = inner.wakers.listener_wakers.remove(&listener_key) {
689            waker.wake();
690        }
691
692        Ok(())
693    }
694
695    /// Get a pending connection for accept() call
696    pub(crate) fn get_pending_connection(
697        &self,
698        addr: &str,
699    ) -> SimulationResult<Option<ConnectionId>> {
700        let mut inner = self.inner.borrow_mut();
701        Ok(inner.network.pending_connections.remove(addr))
702    }
703
704    /// Get the peer address for a connection.
705    ///
706    /// FDB Pattern (sim2.actor.cpp):
707    /// - For client-side connections: returns server's listening address
708    /// - For server-side connections: returns synthesized ephemeral address
709    ///
710    /// The returned address may not be connectable for server-side connections,
711    /// matching real TCP behavior where servers see client ephemeral ports.
712    pub(crate) fn get_connection_peer_address(
713        &self,
714        connection_id: ConnectionId,
715    ) -> Option<String> {
716        let inner = self.inner.borrow();
717        inner
718            .network
719            .connections
720            .get(&connection_id)
721            .map(|conn| conn.peer_address.clone())
722    }
723
724    /// Sleep for the specified duration in simulation time.
725    ///
726    /// Returns a future that will complete when the simulation time has advanced
727    /// by the specified duration.
728    #[instrument(skip(self))]
729    pub fn sleep(&self, duration: Duration) -> SleepFuture {
730        let task_id = self.generate_task_id();
731
732        // Apply buggified delay if enabled
733        let actual_duration = self.apply_buggified_delay(duration);
734
735        // Schedule a wake event for this task
736        self.schedule_event(Event::Timer { task_id }, actual_duration);
737
738        // Return a future that will be woken when the event is processed
739        SleepFuture::new(self.downgrade(), task_id)
740    }
741
742    /// Apply buggified delay to a duration if chaos is enabled.
743    fn apply_buggified_delay(&self, duration: Duration) -> Duration {
744        let inner = self.inner.borrow();
745        let chaos = &inner.network.config.chaos;
746
747        if !chaos.buggified_delay_enabled || chaos.buggified_delay_max == Duration::ZERO {
748            return duration;
749        }
750
751        // 25% probability per FDB
752        if sim_random::<f64>() < chaos.buggified_delay_probability {
753            // Power-law distribution: pow(random01(), 1000) creates very skewed delays
754            let random_factor = sim_random::<f64>().powf(1000.0);
755            let extra_delay = chaos.buggified_delay_max.mul_f64(random_factor);
756            tracing::trace!(
757                extra_delay_ms = extra_delay.as_millis(),
758                "Buggified delay applied"
759            );
760            duration + extra_delay
761        } else {
762            duration
763        }
764    }
765
766    /// Generate a unique task ID for sleep operations.
767    fn generate_task_id(&self) -> u64 {
768        let mut inner = self.inner.borrow_mut();
769        let task_id = inner.next_task_id;
770        inner.next_task_id += 1;
771        task_id
772    }
773
774    /// Wake all tasks associated with a connection
775    fn wake_all(wakers: &mut BTreeMap<ConnectionId, Vec<Waker>>, connection_id: ConnectionId) {
776        if let Some(waker_list) = wakers.remove(&connection_id) {
777            for waker in waker_list {
778                waker.wake();
779            }
780        }
781    }
782
783    /// Check if a task has been awakened.
784    pub(crate) fn is_task_awake(&self, task_id: u64) -> SimulationResult<bool> {
785        let inner = self.inner.borrow();
786        Ok(inner.awakened_tasks.contains(&task_id))
787    }
788
789    /// Register a waker for a task.
790    pub(crate) fn register_task_waker(&self, task_id: u64, waker: Waker) -> SimulationResult<()> {
791        let mut inner = self.inner.borrow_mut();
792        inner.wakers.task_wakers.insert(task_id, waker);
793        Ok(())
794    }
795
796    /// Clear expired clogs and wake pending tasks (helper for use with SimInner)
797    fn clear_expired_clogs_with_inner(inner: &mut SimInner) {
798        let now = inner.current_time;
799        let expired: Vec<ConnectionId> = inner
800            .network
801            .connection_clogs
802            .iter()
803            .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
804            .collect();
805
806        for id in expired {
807            inner.network.connection_clogs.remove(&id);
808            Self::wake_all(&mut inner.wakers.clog_wakers, id);
809        }
810    }
811
812    /// Static event processor for simulation events.
813    #[instrument(skip(inner))]
814    fn process_event_with_inner(inner: &mut SimInner, event: Event) {
815        inner.events_processed += 1;
816
817        match event {
818            Event::Timer { task_id } => Self::handle_timer_event(inner, task_id),
819            Event::Connection { id, state } => Self::handle_connection_event(inner, id, state),
820            Event::Network {
821                connection_id,
822                operation,
823            } => Self::handle_network_event(inner, connection_id, operation),
824            Event::Storage { file_id, operation } => {
825                super::storage_ops::handle_storage_event(inner, file_id, operation)
826            }
827            Event::Shutdown => Self::handle_shutdown_event(inner),
828        }
829    }
830
831    /// Handle timer events - wake sleeping tasks.
832    fn handle_timer_event(inner: &mut SimInner, task_id: u64) {
833        inner.awakened_tasks.insert(task_id);
834        if let Some(waker) = inner.wakers.task_wakers.remove(&task_id) {
835            waker.wake();
836        }
837    }
838
839    /// Handle connection state change events.
840    fn handle_connection_event(inner: &mut SimInner, id: u64, state: ConnectionStateChange) {
841        let connection_id = ConnectionId(id);
842
843        match state {
844            ConnectionStateChange::BindComplete | ConnectionStateChange::ConnectionReady => {
845                // No action needed for these states
846            }
847            ConnectionStateChange::ClogClear => {
848                inner.network.connection_clogs.remove(&connection_id);
849                Self::wake_all(&mut inner.wakers.clog_wakers, connection_id);
850            }
851            ConnectionStateChange::ReadClogClear => {
852                inner.network.read_clogs.remove(&connection_id);
853                Self::wake_all(&mut inner.wakers.read_clog_wakers, connection_id);
854            }
855            ConnectionStateChange::PartitionRestore => {
856                Self::clear_expired_partitions(inner);
857            }
858            ConnectionStateChange::SendPartitionClear => {
859                Self::clear_expired_send_partitions(inner);
860            }
861            ConnectionStateChange::RecvPartitionClear => {
862                Self::clear_expired_recv_partitions(inner);
863            }
864            ConnectionStateChange::CutRestore => {
865                if let Some(conn) = inner.network.connections.get_mut(&connection_id)
866                    && conn.is_cut
867                {
868                    conn.is_cut = false;
869                    conn.cut_expiry = None;
870                    tracing::debug!("Connection {} restored via scheduled event", id);
871                    Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
872                }
873            }
874            ConnectionStateChange::HalfOpenError => {
875                tracing::debug!("Connection {} half-open error time reached", id);
876                if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
877                    waker.wake();
878                }
879                Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
880            }
881        }
882    }
883
884    /// Clear expired IP partitions.
885    fn clear_expired_partitions(inner: &mut SimInner) {
886        let now = inner.current_time;
887        let expired: Vec<_> = inner
888            .network
889            .ip_partitions
890            .iter()
891            .filter_map(|(pair, state)| (now >= state.expires_at).then_some(*pair))
892            .collect();
893
894        for pair in expired {
895            inner.network.ip_partitions.remove(&pair);
896            tracing::debug!("Restored IP partition {} -> {}", pair.0, pair.1);
897        }
898    }
899
900    /// Clear expired send partitions.
901    fn clear_expired_send_partitions(inner: &mut SimInner) {
902        let now = inner.current_time;
903        let expired: Vec<_> = inner
904            .network
905            .send_partitions
906            .iter()
907            .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
908            .collect();
909
910        for ip in expired {
911            inner.network.send_partitions.remove(&ip);
912            tracing::debug!("Cleared send partition for {}", ip);
913        }
914    }
915
916    /// Clear expired receive partitions.
917    fn clear_expired_recv_partitions(inner: &mut SimInner) {
918        let now = inner.current_time;
919        let expired: Vec<_> = inner
920            .network
921            .recv_partitions
922            .iter()
923            .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
924            .collect();
925
926        for ip in expired {
927            inner.network.recv_partitions.remove(&ip);
928            tracing::debug!("Cleared receive partition for {}", ip);
929        }
930    }
931
932    /// Handle network events (data delivery and send buffer processing).
933    fn handle_network_event(inner: &mut SimInner, conn_id: u64, operation: NetworkOperation) {
934        let connection_id = ConnectionId(conn_id);
935
936        match operation {
937            NetworkOperation::DataDelivery { data } => {
938                Self::handle_data_delivery(inner, connection_id, data);
939            }
940            NetworkOperation::ProcessSendBuffer => {
941                Self::handle_process_send_buffer(inner, connection_id);
942            }
943            NetworkOperation::FinDelivery => {
944                Self::handle_fin_delivery(inner, connection_id);
945            }
946        }
947    }
948
949    /// Handle data delivery to a connection's receive buffer.
950    fn handle_data_delivery(inner: &mut SimInner, connection_id: ConnectionId, data: Vec<u8>) {
951        tracing::trace!(
952            "DataDelivery: {} bytes to connection {}",
953            data.len(),
954            connection_id.0
955        );
956
957        // Check connection exists and if it's stable
958        let is_stable = inner
959            .network
960            .connections
961            .get(&connection_id)
962            .is_some_and(|conn| conn.is_stable);
963
964        if !inner.network.connections.contains_key(&connection_id) {
965            tracing::warn!("DataDelivery: connection {} not found", connection_id.0);
966            return;
967        }
968
969        // Apply bit flipping chaos (needs mutable inner access) - skip for stable connections
970        let data_to_deliver = if is_stable {
971            data
972        } else {
973            Self::maybe_corrupt_data(inner, &data)
974        };
975
976        // Now get connection reference and deliver data
977        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
978            return;
979        };
980
981        for &byte in &data_to_deliver {
982            conn.receive_buffer.push_back(byte);
983        }
984
985        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
986            waker.wake();
987        }
988    }
989
990    /// Handle FIN delivery to a connection's receive side (TCP half-close).
991    ///
992    /// Sets `remote_fin_received` on the receiving connection so that `poll_read`
993    /// returns EOF after the receive buffer is drained. Ignores FIN if the connection
994    /// was already aborted (stale event).
995    fn handle_fin_delivery(inner: &mut SimInner, connection_id: ConnectionId) {
996        tracing::debug!(
997            "FinDelivery: FIN received on connection {}",
998            connection_id.0
999        );
1000
1001        // If connection was aborted after FIN was scheduled, ignore the stale FIN
1002        let is_closed = inner
1003            .network
1004            .connections
1005            .get(&connection_id)
1006            .is_some_and(|conn| conn.is_closed);
1007
1008        if is_closed {
1009            tracing::debug!(
1010                "FinDelivery: connection {} already closed, ignoring stale FIN",
1011                connection_id.0
1012            );
1013            return;
1014        }
1015
1016        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1017            conn.remote_fin_received = true;
1018        }
1019
1020        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1021            waker.wake();
1022        }
1023    }
1024
1025    /// Schedule a FinDelivery event to the peer connection.
1026    ///
1027    /// The FIN is scheduled after the last DataDelivery event to ensure all data
1028    /// arrives at the peer before EOF is signaled.
1029    fn schedule_fin_delivery(
1030        inner: &mut SimInner,
1031        paired_id: Option<ConnectionId>,
1032        last_delivery_time: Option<Duration>,
1033    ) {
1034        let Some(peer_id) = paired_id else {
1035            return;
1036        };
1037
1038        // FIN must arrive after all DataDelivery events
1039        let fin_time = match last_delivery_time {
1040            Some(t) if t >= inner.current_time => t + Duration::from_nanos(1),
1041            _ => inner.current_time + Duration::from_nanos(1),
1042        };
1043
1044        let sequence = inner.next_sequence;
1045        inner.next_sequence += 1;
1046
1047        tracing::debug!(
1048            "Scheduling FinDelivery to connection {} at {:?}",
1049            peer_id.0,
1050            fin_time
1051        );
1052
1053        inner.event_queue.schedule(ScheduledEvent::new(
1054            fin_time,
1055            Event::Network {
1056                connection_id: peer_id.0,
1057                operation: NetworkOperation::FinDelivery,
1058            },
1059            sequence,
1060        ));
1061    }
1062
1063    /// Maybe corrupt data with bit flips based on chaos configuration.
1064    fn maybe_corrupt_data(inner: &mut SimInner, data: &[u8]) -> Vec<u8> {
1065        if data.is_empty() {
1066            return data.to_vec();
1067        }
1068
1069        let chaos = &inner.network.config.chaos;
1070        let now = inner.current_time;
1071        let cooldown_elapsed =
1072            now.saturating_sub(inner.last_bit_flip_time) >= chaos.bit_flip_cooldown;
1073
1074        if !cooldown_elapsed || !crate::buggify_with_prob!(chaos.bit_flip_probability) {
1075            return data.to_vec();
1076        }
1077
1078        let random_value = sim_random::<u32>();
1079        let flip_count = SimInner::calculate_flip_bit_count(
1080            random_value,
1081            chaos.bit_flip_min_bits,
1082            chaos.bit_flip_max_bits,
1083        );
1084
1085        let mut corrupted_data = data.to_vec();
1086        let mut flipped_positions = std::collections::HashSet::new();
1087
1088        for _ in 0..flip_count {
1089            let byte_idx = (sim_random::<u64>() as usize) % corrupted_data.len();
1090            let bit_idx = (sim_random::<u64>() as usize) % 8;
1091            let position = (byte_idx, bit_idx);
1092
1093            if !flipped_positions.contains(&position) {
1094                flipped_positions.insert(position);
1095                corrupted_data[byte_idx] ^= 1 << bit_idx;
1096            }
1097        }
1098
1099        inner.last_bit_flip_time = now;
1100        tracing::info!(
1101            "BitFlipInjected: bytes={} bits_flipped={} unique_positions={}",
1102            data.len(),
1103            flip_count,
1104            flipped_positions.len()
1105        );
1106
1107        corrupted_data
1108    }
1109
1110    /// Handle processing of a connection's send buffer.
1111    fn handle_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1112        let is_partitioned = inner
1113            .network
1114            .is_connection_partitioned(connection_id, inner.current_time);
1115
1116        if is_partitioned {
1117            Self::handle_partitioned_send(inner, connection_id);
1118        } else {
1119            Self::handle_normal_send(inner, connection_id);
1120        }
1121    }
1122
1123    /// Handle send when connection is partitioned.
1124    fn handle_partitioned_send(inner: &mut SimInner, connection_id: ConnectionId) {
1125        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1126            return;
1127        };
1128
1129        if let Some(data) = conn.send_buffer.pop_front() {
1130            tracing::debug!(
1131                "Connection {} partitioned, failing send of {} bytes",
1132                connection_id.0,
1133                data.len()
1134            );
1135            Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1136
1137            if !conn.send_buffer.is_empty() {
1138                Self::schedule_process_send_buffer(inner, connection_id);
1139            } else {
1140                conn.send_in_progress = false;
1141                // Check for pending graceful close when pipeline drains
1142                if conn.graceful_close_pending {
1143                    conn.graceful_close_pending = false;
1144                    let peer_id = conn.paired_connection;
1145                    let last_time = conn.last_data_delivery_scheduled_at;
1146                    Self::schedule_fin_delivery(inner, peer_id, last_time);
1147                }
1148            }
1149        } else {
1150            conn.send_in_progress = false;
1151            // Check for pending graceful close when pipeline drains
1152            if conn.graceful_close_pending {
1153                conn.graceful_close_pending = false;
1154                let peer_id = conn.paired_connection;
1155                let last_time = conn.last_data_delivery_scheduled_at;
1156                Self::schedule_fin_delivery(inner, peer_id, last_time);
1157            }
1158        }
1159    }
1160
    /// Handle normal (non-partitioned) send: move one chunk from this
    /// connection's send buffer toward its paired connection.
    ///
    /// Steps, in order:
    /// 1. Snapshot the connection fields needed later, plus the peer's
    ///    `recv_delay`, while only a shared borrow is held.
    /// 2. If the send buffer is empty, mark the pipeline idle, emit a FIN for
    ///    any pending graceful close, and return.
    /// 3. Pop the front chunk and wake tasks waiting for send-buffer space.
    /// 4. On non-stable connections, `buggify!` may truncate the chunk (partial
    ///    write). The remainder goes back to the front of the buffer.
    /// 5. Compute the send time. The base delay is 1ns when more data is queued,
    ///    otherwise the connection's `send_delay` override or a `write_latency`
    ///    sample. The send time is never earlier than `next_send_time`, and
    ///    `next_send_time` is then advanced 1ns past it, so successive chunks get
    ///    strictly increasing send times.
    /// 6. If there is a peer, schedule a `DataDelivery` at send time plus the
    ///    peer's `recv_delay`, and record that time for FIN ordering.
    /// 7. Reschedule another pass if data remains; otherwise mark the pipeline
    ///    idle and emit a pending FIN.
    ///
    /// NOTE(review): `sim_random_range` (and presumably `buggify!` and
    /// `sample_duration`) draw from the simulation RNG in a fixed order here.
    /// Reordering these statements would likely change seed-for-seed outcomes.
    fn handle_normal_send(inner: &mut SimInner, connection_id: ConnectionId) {
        // Extract connection info first (shared borrow only)
        let Some(conn) = inner.network.connections.get(&connection_id) else {
            return;
        };

        let paired_id = conn.paired_connection;
        let send_delay = conn.send_delay;
        let next_send_time = conn.next_send_time;
        let has_data = !conn.send_buffer.is_empty();
        let is_stable = conn.is_stable; // Stable connections skip partial-write chaos

        // Receive-side delay configured on the peer, added to the delivery time.
        let recv_delay = paired_id.and_then(|pid| {
            inner
                .network
                .connections
                .get(&pid)
                .and_then(|c| c.recv_delay)
        });

        if !has_data {
            if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
                conn.send_in_progress = false;
                // Check for pending graceful close when pipeline drains
                if conn.graceful_close_pending {
                    conn.graceful_close_pending = false;
                    let peer_id = conn.paired_connection;
                    let last_time = conn.last_data_delivery_scheduled_at;
                    Self::schedule_fin_delivery(inner, peer_id, last_time);
                }
            }
            return;
        }

        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
            return;
        };

        // Unreachable in practice: `has_data` saw a non-empty buffer and nothing
        // has popped from it since. Note this path does not flush a pending FIN.
        let Some(mut data) = conn.send_buffer.pop_front() else {
            conn.send_in_progress = false;
            return;
        };

        Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);

        // BUGGIFY: Simulate partial/short writes (skip for stable connections)
        // NOTE(review): `truncate_to` may be 0, which schedules an empty
        // DataDelivery and re-queues the whole chunk — confirm this is intended.
        if !is_stable && crate::buggify!() && !data.is_empty() {
            let max_send = std::cmp::min(
                data.len(),
                inner.network.config.chaos.partial_write_max_bytes,
            );
            let truncate_to = sim_random_range(0..max_send + 1);

            if truncate_to < data.len() {
                let remainder = data.split_off(truncate_to);
                conn.send_buffer.push_front(remainder);
                tracing::debug!(
                    "BUGGIFY: Partial write on connection {} - sending {} bytes",
                    connection_id.0,
                    data.len()
                );
            }
        }

        // Queued follow-up chunks go out back-to-back (1ns); the last chunk pays
        // the write latency.
        let has_more = !conn.send_buffer.is_empty();
        let base_delay = if has_more {
            Duration::from_nanos(1)
        } else {
            send_delay.unwrap_or_else(|| {
                crate::network::sample_duration(&inner.network.config.write_latency)
            })
        };

        // Never send before the previous chunk; reserve the next slot 1ns later.
        let earliest_time = std::cmp::max(inner.current_time + base_delay, next_send_time);
        conn.next_send_time = earliest_time + Duration::from_nanos(1);

        // Schedule data delivery to paired connection (data is dropped if unpaired)
        if let Some(paired_id) = paired_id {
            let scheduled_time = earliest_time + recv_delay.unwrap_or(Duration::ZERO);
            let sequence = inner.next_sequence;
            inner.next_sequence += 1;

            inner.event_queue.schedule(ScheduledEvent::new(
                scheduled_time,
                Event::Network {
                    connection_id: paired_id.0,
                    operation: NetworkOperation::DataDelivery { data },
                },
                sequence,
            ));

            // Track delivery time for FIN ordering (must be after last DataDelivery)
            conn.last_data_delivery_scheduled_at = Some(scheduled_time);
        }

        // Schedule next send if more data
        if !conn.send_buffer.is_empty() {
            Self::schedule_process_send_buffer(inner, connection_id);
        } else {
            conn.send_in_progress = false;
            // Check for pending graceful close when pipeline drains
            if conn.graceful_close_pending {
                conn.graceful_close_pending = false;
                let peer_id = conn.paired_connection;
                let last_time = conn.last_data_delivery_scheduled_at;
                Self::schedule_fin_delivery(inner, peer_id, last_time);
            }
        }
    }
1271
1272    /// Schedule a ProcessSendBuffer event for the given connection.
1273    fn schedule_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1274        let sequence = inner.next_sequence;
1275        inner.next_sequence += 1;
1276
1277        inner.event_queue.schedule(ScheduledEvent::new(
1278            inner.current_time,
1279            Event::Network {
1280                connection_id: connection_id.0,
1281                operation: NetworkOperation::ProcessSendBuffer,
1282            },
1283            sequence,
1284        ));
1285    }
1286
1287    /// Handle shutdown event - wake all pending tasks.
1288    fn handle_shutdown_event(inner: &mut SimInner) {
1289        tracing::debug!("Processing Shutdown event - waking all pending tasks");
1290
1291        for (task_id, waker) in std::mem::take(&mut inner.wakers.task_wakers) {
1292            tracing::trace!("Waking task {}", task_id);
1293            waker.wake();
1294        }
1295
1296        for (_conn_id, waker) in std::mem::take(&mut inner.wakers.read_wakers) {
1297            waker.wake();
1298        }
1299
1300        tracing::debug!("Shutdown event processed");
1301    }
1302
1303    /// Get current assertion results for all tracked assertions.
1304    pub fn assertion_results(
1305        &self,
1306    ) -> std::collections::HashMap<String, crate::chaos::AssertionStats> {
1307        crate::chaos::get_assertion_results()
1308    }
1309
1310    /// Reset assertion statistics to empty state.
1311    pub fn reset_assertion_results(&self) {
1312        crate::chaos::reset_assertion_results();
1313    }
1314
1315    /// Extract simulation metrics for reporting.
1316    pub fn extract_metrics(&self) -> crate::runner::SimulationMetrics {
1317        let inner = self.inner.borrow();
1318
1319        crate::runner::SimulationMetrics {
1320            wall_time: std::time::Duration::ZERO,
1321            simulated_time: inner.current_time,
1322            events_processed: inner.events_processed,
1323        }
1324    }
1325
    // Write clogging methods (probabilistic, time-bounded write stalls)
1327
1328    /// Check if a write should be clogged based on probability
1329    pub fn should_clog_write(&self, connection_id: ConnectionId) -> bool {
1330        let inner = self.inner.borrow();
1331        let config = &inner.network.config;
1332
1333        // Skip stable connections (FDB: stableConnection exempt from chaos)
1334        if inner
1335            .network
1336            .connections
1337            .get(&connection_id)
1338            .is_some_and(|conn| conn.is_stable)
1339        {
1340            return false;
1341        }
1342
1343        // Skip if already clogged
1344        if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1345            return inner.current_time < clog_state.expires_at;
1346        }
1347
1348        // Check probability
1349        config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1350    }
1351
1352    /// Clog a connection's write operations
1353    pub fn clog_write(&self, connection_id: ConnectionId) {
1354        let mut inner = self.inner.borrow_mut();
1355        let config = &inner.network.config;
1356
1357        let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1358        let expires_at = inner.current_time + clog_duration;
1359        inner
1360            .network
1361            .connection_clogs
1362            .insert(connection_id, ClogState { expires_at });
1363
1364        // Schedule an event to clear this clog
1365        let clear_event = Event::Connection {
1366            id: connection_id.0,
1367            state: ConnectionStateChange::ClogClear,
1368        };
1369        let sequence = inner.next_sequence;
1370        inner.next_sequence += 1;
1371        inner
1372            .event_queue
1373            .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1374    }
1375
1376    /// Check if a connection's writes are currently clogged
1377    pub fn is_write_clogged(&self, connection_id: ConnectionId) -> bool {
1378        let inner = self.inner.borrow();
1379
1380        if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1381            inner.current_time < clog_state.expires_at
1382        } else {
1383            false
1384        }
1385    }
1386
1387    /// Register a waker for when write clog clears
1388    pub fn register_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1389        let mut inner = self.inner.borrow_mut();
1390        inner
1391            .wakers
1392            .clog_wakers
1393            .entry(connection_id)
1394            .or_default()
1395            .push(waker);
1396    }
1397
1398    // Read clogging methods (symmetric with write clogging)
1399
1400    /// Check if a read should be clogged based on probability
1401    pub fn should_clog_read(&self, connection_id: ConnectionId) -> bool {
1402        let inner = self.inner.borrow();
1403        let config = &inner.network.config;
1404
1405        // Skip stable connections (FDB: stableConnection exempt from chaos)
1406        if inner
1407            .network
1408            .connections
1409            .get(&connection_id)
1410            .is_some_and(|conn| conn.is_stable)
1411        {
1412            return false;
1413        }
1414
1415        // Skip if already clogged
1416        if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1417            return inner.current_time < clog_state.expires_at;
1418        }
1419
1420        // Check probability (same as write clogging)
1421        config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1422    }
1423
1424    /// Clog a connection's read operations
1425    pub fn clog_read(&self, connection_id: ConnectionId) {
1426        let mut inner = self.inner.borrow_mut();
1427        let config = &inner.network.config;
1428
1429        let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1430        let expires_at = inner.current_time + clog_duration;
1431        inner
1432            .network
1433            .read_clogs
1434            .insert(connection_id, ClogState { expires_at });
1435
1436        // Schedule an event to clear this read clog
1437        let clear_event = Event::Connection {
1438            id: connection_id.0,
1439            state: ConnectionStateChange::ReadClogClear,
1440        };
1441        let sequence = inner.next_sequence;
1442        inner.next_sequence += 1;
1443        inner
1444            .event_queue
1445            .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1446    }
1447
1448    /// Check if a connection's reads are currently clogged
1449    pub fn is_read_clogged(&self, connection_id: ConnectionId) -> bool {
1450        let inner = self.inner.borrow();
1451
1452        if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1453            inner.current_time < clog_state.expires_at
1454        } else {
1455            false
1456        }
1457    }
1458
1459    /// Register a waker for when read clog clears
1460    pub fn register_read_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1461        let mut inner = self.inner.borrow_mut();
1462        inner
1463            .wakers
1464            .read_clog_wakers
1465            .entry(connection_id)
1466            .or_default()
1467            .push(waker);
1468    }
1469
1470    /// Clear expired clogs and wake pending tasks
1471    pub fn clear_expired_clogs(&self) {
1472        let mut inner = self.inner.borrow_mut();
1473        let now = inner.current_time;
1474        let expired: Vec<ConnectionId> = inner
1475            .network
1476            .connection_clogs
1477            .iter()
1478            .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
1479            .collect();
1480
1481        for id in expired {
1482            inner.network.connection_clogs.remove(&id);
1483            Self::wake_all(&mut inner.wakers.clog_wakers, id);
1484        }
1485    }
1486
1487    // Connection Cut Methods (temporary network outage simulation)
1488
1489    /// Temporarily cut a connection for the specified duration.
1490    ///
1491    /// Unlike `close_connection`, a cut connection will be automatically restored
1492    /// after the duration expires. This simulates temporary network outages where
1493    /// the underlying connection remains but is temporarily unavailable.
1494    ///
1495    /// During a cut:
1496    /// - `poll_read` returns `Poll::Pending` (waits for restore)
1497    /// - `poll_write` returns `Poll::Pending` (waits for restore)
1498    /// - Buffered data is preserved
1499    pub fn cut_connection(&self, connection_id: ConnectionId, duration: Duration) {
1500        let mut inner = self.inner.borrow_mut();
1501        let expires_at = inner.current_time + duration;
1502
1503        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1504            conn.is_cut = true;
1505            conn.cut_expiry = Some(expires_at);
1506            tracing::debug!("Connection {} cut until {:?}", connection_id.0, expires_at);
1507
1508            // Schedule restoration event
1509            let restore_event = Event::Connection {
1510                id: connection_id.0,
1511                state: ConnectionStateChange::CutRestore,
1512            };
1513            let sequence = inner.next_sequence;
1514            inner.next_sequence += 1;
1515            inner
1516                .event_queue
1517                .schedule(ScheduledEvent::new(expires_at, restore_event, sequence));
1518        }
1519    }
1520
1521    /// Check if a connection is temporarily cut.
1522    ///
1523    /// A cut connection is temporarily unavailable but will be restored.
1524    /// This is different from `is_connection_closed` which indicates permanent closure.
1525    pub fn is_connection_cut(&self, connection_id: ConnectionId) -> bool {
1526        let inner = self.inner.borrow();
1527        inner
1528            .network
1529            .connections
1530            .get(&connection_id)
1531            .is_some_and(|conn| {
1532                conn.is_cut
1533                    && conn
1534                        .cut_expiry
1535                        .is_some_and(|expiry| inner.current_time < expiry)
1536            })
1537    }
1538
1539    /// Restore a cut connection immediately.
1540    ///
1541    /// This cancels the cut state and wakes any tasks waiting for restoration.
1542    pub fn restore_connection(&self, connection_id: ConnectionId) {
1543        let mut inner = self.inner.borrow_mut();
1544
1545        if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1546            && conn.is_cut
1547        {
1548            conn.is_cut = false;
1549            conn.cut_expiry = None;
1550            tracing::debug!("Connection {} restored", connection_id.0);
1551
1552            // Wake any tasks waiting for restoration
1553            Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
1554        }
1555    }
1556
1557    /// Register a waker for when a cut connection is restored.
1558    pub fn register_cut_waker(&self, connection_id: ConnectionId, waker: Waker) {
1559        let mut inner = self.inner.borrow_mut();
1560        inner
1561            .wakers
1562            .cut_wakers
1563            .entry(connection_id)
1564            .or_default()
1565            .push(waker);
1566    }
1567
1568    // Send buffer management methods
1569
1570    /// Get the send buffer capacity for a connection.
1571    pub fn send_buffer_capacity(&self, connection_id: ConnectionId) -> usize {
1572        let inner = self.inner.borrow();
1573        inner
1574            .network
1575            .connections
1576            .get(&connection_id)
1577            .map(|conn| conn.send_buffer_capacity)
1578            .unwrap_or(0)
1579    }
1580
1581    /// Get the current send buffer usage for a connection.
1582    pub fn send_buffer_used(&self, connection_id: ConnectionId) -> usize {
1583        let inner = self.inner.borrow();
1584        inner
1585            .network
1586            .connections
1587            .get(&connection_id)
1588            .map(|conn| conn.send_buffer.iter().map(|v| v.len()).sum())
1589            .unwrap_or(0)
1590    }
1591
1592    /// Get the available send buffer space for a connection.
1593    pub fn available_send_buffer(&self, connection_id: ConnectionId) -> usize {
1594        let capacity = self.send_buffer_capacity(connection_id);
1595        let used = self.send_buffer_used(connection_id);
1596        capacity.saturating_sub(used)
1597    }
1598
1599    /// Register a waker for when send buffer space becomes available.
1600    pub fn register_send_buffer_waker(&self, connection_id: ConnectionId, waker: Waker) {
1601        let mut inner = self.inner.borrow_mut();
1602        inner
1603            .wakers
1604            .send_buffer_wakers
1605            .entry(connection_id)
1606            .or_default()
1607            .push(waker);
1608    }
1609
1610    /// Wake any tasks waiting for send buffer space on a connection.
1611    #[allow(dead_code)] // May be used for external buffer management
1612    fn wake_send_buffer_waiters(&self, connection_id: ConnectionId) {
1613        let mut inner = self.inner.borrow_mut();
1614        Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1615    }
1616
1617    // Per-IP-pair base latency methods
1618
1619    /// Get the base latency for a connection pair.
1620    /// Returns the latency if already set, otherwise None.
1621    pub fn get_pair_latency(&self, src: IpAddr, dst: IpAddr) -> Option<Duration> {
1622        let inner = self.inner.borrow();
1623        inner.network.pair_latencies.get(&(src, dst)).copied()
1624    }
1625
1626    /// Set the base latency for a connection pair if not already set.
1627    /// Returns the latency (existing or newly set).
1628    pub fn set_pair_latency_if_not_set(
1629        &self,
1630        src: IpAddr,
1631        dst: IpAddr,
1632        latency: Duration,
1633    ) -> Duration {
1634        let mut inner = self.inner.borrow_mut();
1635        *inner
1636            .network
1637            .pair_latencies
1638            .entry((src, dst))
1639            .or_insert_with(|| {
1640                tracing::debug!(
1641                    "Setting base latency for IP pair {} -> {} to {:?}",
1642                    src,
1643                    dst,
1644                    latency
1645                );
1646                latency
1647            })
1648    }
1649
1650    /// Get the base latency for a connection based on its IP pair.
1651    /// If not set, samples from config and sets it.
1652    pub fn get_connection_base_latency(&self, connection_id: ConnectionId) -> Duration {
1653        let inner = self.inner.borrow();
1654        let (local_ip, remote_ip) = inner
1655            .network
1656            .connections
1657            .get(&connection_id)
1658            .and_then(|conn| Some((conn.local_ip?, conn.remote_ip?)))
1659            .unwrap_or({
1660                (
1661                    IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1662                    IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1663                )
1664            });
1665        drop(inner);
1666
1667        // Check if latency is already set
1668        if let Some(latency) = self.get_pair_latency(local_ip, remote_ip) {
1669            return latency;
1670        }
1671
1672        // Sample a new latency from config and set it
1673        let latency = self
1674            .with_network_config(|config| crate::network::sample_duration(&config.write_latency));
1675        self.set_pair_latency_if_not_set(local_ip, remote_ip, latency)
1676    }
1677
1678    // Per-connection asymmetric delay methods
1679
1680    /// Get the send delay for a connection.
1681    /// Returns the per-connection override if set, otherwise None.
1682    pub fn get_send_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1683        let inner = self.inner.borrow();
1684        inner
1685            .network
1686            .connections
1687            .get(&connection_id)
1688            .and_then(|conn| conn.send_delay)
1689    }
1690
1691    /// Get the receive delay for a connection.
1692    /// Returns the per-connection override if set, otherwise None.
1693    pub fn get_recv_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1694        let inner = self.inner.borrow();
1695        inner
1696            .network
1697            .connections
1698            .get(&connection_id)
1699            .and_then(|conn| conn.recv_delay)
1700    }
1701
1702    /// Set asymmetric delays for a connection.
1703    /// If a delay is None, the global configuration is used instead.
1704    pub fn set_asymmetric_delays(
1705        &self,
1706        connection_id: ConnectionId,
1707        send_delay: Option<Duration>,
1708        recv_delay: Option<Duration>,
1709    ) {
1710        let mut inner = self.inner.borrow_mut();
1711        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1712            conn.send_delay = send_delay;
1713            conn.recv_delay = recv_delay;
1714            tracing::debug!(
1715                "Connection {} asymmetric delays set: send={:?}, recv={:?}",
1716                connection_id.0,
1717                send_delay,
1718                recv_delay
1719            );
1720        }
1721    }
1722
1723    /// Check if a connection is permanently closed
1724    pub fn is_connection_closed(&self, connection_id: ConnectionId) -> bool {
1725        let inner = self.inner.borrow();
1726        inner
1727            .network
1728            .connections
1729            .get(&connection_id)
1730            .is_some_and(|conn| conn.is_closed)
1731    }
1732
1733    /// Close a connection gracefully (FIN semantics).
1734    ///
1735    /// The peer will receive EOF on read operations.
1736    pub fn close_connection(&self, connection_id: ConnectionId) {
1737        self.close_connection_with_reason(connection_id, CloseReason::Graceful);
1738    }
1739
1740    /// Close a connection abruptly (RST semantics).
1741    ///
1742    /// The peer will receive ECONNRESET on both read and write operations.
1743    pub fn close_connection_abort(&self, connection_id: ConnectionId) {
1744        self.close_connection_with_reason(connection_id, CloseReason::Aborted);
1745    }
1746
1747    /// Get the close reason for a connection.
1748    pub fn get_close_reason(&self, connection_id: ConnectionId) -> CloseReason {
1749        let inner = self.inner.borrow();
1750        inner
1751            .network
1752            .connections
1753            .get(&connection_id)
1754            .map(|conn| conn.close_reason)
1755            .unwrap_or(CloseReason::None)
1756    }
1757
1758    /// Close a connection with a specific close reason.
1759    fn close_connection_with_reason(&self, connection_id: ConnectionId, reason: CloseReason) {
1760        match reason {
1761            CloseReason::Graceful => self.close_connection_graceful(connection_id),
1762            CloseReason::Aborted => self.close_connection_aborted(connection_id),
1763            CloseReason::None => {}
1764        }
1765    }
1766
    /// Graceful close (FIN semantics) — TCP half-close.
    ///
    /// Marks the local write side as closed. The peer can still read all buffered
    /// and in-flight data. A `FinDelivery` event is scheduled after the last
    /// `DataDelivery` to signal EOF to the peer.
    ///
    /// Outline:
    /// 1. Snapshot the fields needed below; unknown connections are ignored.
    /// 2. If the connection is already `is_closed` or `send_closed` (by a previous
    ///    close or by chaos), do nothing, which makes the call idempotent.
    /// 3. Locally set `is_closed`, `close_reason = Graceful` and `send_closed`,
    ///    then wake the local read waker.
    /// 4. If the send pipeline still has work (`send_in_progress` or a non-empty
    ///    send buffer), only set `graceful_close_pending`. Otherwise, pass the
    ///    paired endpoint id and `last_data_delivery_scheduled_at` to
    ///    `schedule_fin_delivery` right away.
    ///
    /// The peer's `is_closed` flag is never touched here.
    ///
    /// NOTE(review): the deferred path presumably relies on the send pipeline
    /// checking `graceful_close_pending` once it drains; that code is outside
    /// this view, so confirm.
    fn close_connection_graceful(&self, connection_id: ConnectionId) {
        let mut inner = self.inner.borrow_mut();

        // Extract connection info
        // (copied out by value so no borrow of the connection is held while it
        // is mutated below)
        let conn_info = inner.network.connections.get(&connection_id).map(|conn| {
            (
                conn.paired_connection,
                conn.send_closed,
                conn.is_closed,
                conn.send_in_progress,
                conn.send_buffer.is_empty(),
                conn.last_data_delivery_scheduled_at,
            )
        });

        // Unknown connection: nothing to close.
        let Some((
            paired_id,
            was_send_closed,
            was_closed,
            send_in_progress,
            send_buffer_empty,
            last_delivery_time,
        )) = conn_info
        else {
            return;
        };

        // Idempotent: if already closed or send_closed (by chaos or previous close), skip
        if was_closed || was_send_closed {
            tracing::debug!(
                "Connection {} already closed/send_closed, skipping graceful close",
                connection_id.0
            );
            return;
        }

        // Mark local side as closed with send side shut down
        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
            conn.is_closed = true;
            conn.close_reason = CloseReason::Graceful;
            conn.send_closed = true;
            tracing::debug!(
                "Connection {} graceful close (FIN) - local write shut down",
                connection_id.0
            );
        }

        // Wake local read waker (so local poll_read returns EOF if needed)
        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
            waker.wake();
        }

        // Do NOT set is_closed on the peer — they can still read buffered data.
        // Instead, schedule a FIN delivery after all data has been delivered.
        if send_in_progress || !send_buffer_empty {
            // Pipeline has data — defer FIN until pipeline drains
            if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
                conn.graceful_close_pending = true;
                tracing::debug!(
                    "Connection {} graceful close deferred (send pipeline active)",
                    connection_id.0
                );
            }
        } else {
            // Pipeline is idle — schedule FIN delivery now
            // (targets the paired endpoint, after the last scheduled data delivery)
            Self::schedule_fin_delivery(&mut inner, paired_id, last_delivery_time);
        }
    }
1840
1841    /// Aborted close (RST semantics) — immediate connection kill.
1842    ///
1843    /// Immediately sets `is_closed` on both endpoints. The peer will receive
1844    /// ECONNRESET on both read and write operations.
1845    fn close_connection_aborted(&self, connection_id: ConnectionId) {
1846        let mut inner = self.inner.borrow_mut();
1847
1848        let paired_connection_id = inner
1849            .network
1850            .connections
1851            .get(&connection_id)
1852            .and_then(|conn| conn.paired_connection);
1853
1854        if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1855            && !conn.is_closed
1856        {
1857            conn.is_closed = true;
1858            conn.close_reason = CloseReason::Aborted;
1859            // Cancel any pending graceful close
1860            conn.graceful_close_pending = false;
1861            tracing::debug!(
1862                "Connection {} closed permanently (reason: Aborted)",
1863                connection_id.0
1864            );
1865        }
1866
1867        if let Some(paired_id) = paired_connection_id
1868            && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
1869            && !paired_conn.is_closed
1870        {
1871            paired_conn.is_closed = true;
1872            paired_conn.close_reason = CloseReason::Aborted;
1873            tracing::debug!(
1874                "Paired connection {} also closed (reason: Aborted)",
1875                paired_id.0
1876            );
1877        }
1878
1879        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1880            tracing::debug!(
1881                "Waking read waker for aborted connection {}",
1882                connection_id.0
1883            );
1884            waker.wake();
1885        }
1886
1887        if let Some(paired_id) = paired_connection_id
1888            && let Some(paired_waker) = inner.wakers.read_wakers.remove(&paired_id)
1889        {
1890            tracing::debug!(
1891                "Waking read waker for paired aborted connection {}",
1892                paired_id.0
1893            );
1894            paired_waker.wake();
1895        }
1896    }
1897
1898    /// Close connection asymmetrically (FDB rollRandomClose pattern)
1899    pub fn close_connection_asymmetric(
1900        &self,
1901        connection_id: ConnectionId,
1902        close_send: bool,
1903        close_recv: bool,
1904    ) {
1905        let mut inner = self.inner.borrow_mut();
1906
1907        let paired_id = inner
1908            .network
1909            .connections
1910            .get(&connection_id)
1911            .and_then(|conn| conn.paired_connection);
1912
1913        if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1914            conn.send_closed = true;
1915            conn.send_buffer.clear();
1916            tracing::debug!(
1917                "Connection {} send side closed (asymmetric)",
1918                connection_id.0
1919            );
1920        }
1921
1922        if close_recv
1923            && let Some(paired) = paired_id
1924            && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
1925        {
1926            paired_conn.recv_closed = true;
1927            tracing::debug!(
1928                "Connection {} recv side closed (asymmetric via peer)",
1929                paired.0
1930            );
1931        }
1932
1933        if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1934            waker.wake();
1935        }
1936        if close_recv
1937            && let Some(paired) = paired_id
1938            && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
1939        {
1940            waker.wake();
1941        }
1942    }
1943
    /// Roll random close chaos injection (FDB rollRandomClose pattern)
    ///
    /// Returns `None` (no chaos injected) when any of these holds:
    /// - the connection is marked stable (an id that is not found is NOT treated
    ///   as stable);
    /// - `random_close_probability <= 0.0`;
    /// - less than `random_close_cooldown` has elapsed since the last random close.
    ///   The cooldown timestamp lives on the network state, so it is global
    ///   across connections;
    /// - the `buggify_with_prob!` roll does not fire.
    ///
    /// Otherwise the current time is recorded as the last random-close time and
    /// a draw `a` from `sim_random_f64` selects the directions to cut:
    /// - `a <= 0.33`: receive side only (on the paired endpoint);
    /// - `0.33 < a < 0.66`: both directions;
    /// - `a >= 0.66`: local send side only (its send buffer is discarded).
    ///
    /// Read wakers of the affected endpoints are woken. A second draw `b` then
    /// picks the return value: `Some(true)` when `b < random_close_explicit_ratio`,
    /// otherwise `Some(false)`.
    ///
    /// NOTE(review): how the caller interprets the "explicit" flag is not visible
    /// here. The RNG is consumed in a fixed order (presumably the buggify roll,
    /// then `a`, then `b`); keep that order so runs stay reproducible per seed.
    pub fn roll_random_close(&self, connection_id: ConnectionId) -> Option<bool> {
        let mut inner = self.inner.borrow_mut();
        // Immutable borrow of `inner`; it is last used before `inner` is mutated below.
        let config = &inner.network.config;

        // Skip stable connections (FDB: stableConnection exempt from chaos)
        if inner
            .network
            .connections
            .get(&connection_id)
            .is_some_and(|conn| conn.is_stable)
        {
            return None;
        }

        if config.chaos.random_close_probability <= 0.0 {
            return None;
        }

        // Global cooldown between random closes.
        let current_time = inner.current_time;
        let time_since_last = current_time.saturating_sub(inner.network.last_random_close_time);
        if time_since_last < config.chaos.random_close_cooldown {
            return None;
        }

        if !crate::buggify_with_prob!(config.chaos.random_close_probability) {
            return None;
        }

        // Chaos fires: start a new cooldown window.
        inner.network.last_random_close_time = current_time;

        let paired_id = inner
            .network
            .connections
            .get(&connection_id)
            .and_then(|conn| conn.paired_connection);

        // Overlapping thresholds: low `a` cuts recv only, high `a` cuts send only,
        // and the middle band cuts both.
        let a = super::rng::sim_random_f64();
        let close_recv = a < 0.66;
        let close_send = a > 0.33;

        tracing::info!(
            "Random connection failure triggered on connection {} (send={}, recv={}, a={:.3})",
            connection_id.0,
            close_send,
            close_recv,
            a
        );

        // Send side: shut down locally and drop anything still queued.
        if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
            conn.send_closed = true;
            conn.send_buffer.clear();
        }

        // Receive side: cut on the paired endpoint.
        if close_recv
            && let Some(paired) = paired_id
            && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
        {
            paired_conn.recv_closed = true;
        }

        // Wake readers on the affected endpoints so they re-poll.
        if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
            waker.wake();
        }
        if close_recv
            && let Some(paired) = paired_id
            && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
        {
            waker.wake();
        }

        // Second draw decides the returned flag.
        let b = super::rng::sim_random_f64();
        let explicit = b < inner.network.config.chaos.random_close_explicit_ratio;

        tracing::debug!(
            "Random close explicit={} (b={:.3}, ratio={:.2})",
            explicit,
            b,
            inner.network.config.chaos.random_close_explicit_ratio
        );

        Some(explicit)
    }
2027
2028    /// Check if a connection's send side is closed
2029    pub fn is_send_closed(&self, connection_id: ConnectionId) -> bool {
2030        let inner = self.inner.borrow();
2031        inner
2032            .network
2033            .connections
2034            .get(&connection_id)
2035            .is_some_and(|conn| conn.send_closed || conn.is_closed)
2036    }
2037
2038    /// Check if a connection's receive side is closed
2039    pub fn is_recv_closed(&self, connection_id: ConnectionId) -> bool {
2040        let inner = self.inner.borrow();
2041        inner
2042            .network
2043            .connections
2044            .get(&connection_id)
2045            .is_some_and(|conn| conn.recv_closed || conn.is_closed)
2046    }
2047
2048    /// Check if a FIN has been received from the remote peer (graceful close).
2049    ///
2050    /// When true, `poll_read` should return EOF after draining the receive buffer.
2051    /// Distinct from `is_recv_closed` which is used for chaos/asymmetric closure.
2052    pub fn is_remote_fin_received(&self, connection_id: ConnectionId) -> bool {
2053        let inner = self.inner.borrow();
2054        inner
2055            .network
2056            .connections
2057            .get(&connection_id)
2058            .is_some_and(|conn| conn.remote_fin_received)
2059    }
2060
2061    // Half-Open Connection Simulation Methods
2062
2063    /// Simulate a peer crash on a connection.
2064    ///
2065    /// This puts the connection in a half-open state where:
2066    /// - The local side still thinks it's connected
2067    /// - Writes succeed but data is silently discarded (peer is gone)
2068    /// - Reads block waiting for data that will never come
2069    /// - After `error_delay`, both read and write return ECONNRESET
2070    ///
2071    /// This simulates real-world scenarios where a remote peer crashes
2072    /// or becomes unreachable, but the local TCP stack hasn't detected it yet.
2073    pub fn simulate_peer_crash(&self, connection_id: ConnectionId, error_delay: Duration) {
2074        let mut inner = self.inner.borrow_mut();
2075        let current_time = inner.current_time;
2076        let error_at = current_time + error_delay;
2077
2078        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2079            conn.is_half_open = true;
2080            conn.half_open_error_at = Some(error_at);
2081
2082            // Clear the paired connection to simulate peer being gone
2083            // Data sent will go nowhere
2084            conn.paired_connection = None;
2085
2086            tracing::info!(
2087                "Connection {} now half-open, errors manifest at {:?}",
2088                connection_id.0,
2089                error_at
2090            );
2091        }
2092
2093        // Schedule an event to wake any waiting readers when error time arrives
2094        let wake_event = Event::Connection {
2095            id: connection_id.0,
2096            state: ConnectionStateChange::HalfOpenError,
2097        };
2098        let sequence = inner.next_sequence;
2099        inner.next_sequence += 1;
2100        let scheduled_event = ScheduledEvent::new(error_at, wake_event, sequence);
2101        inner.event_queue.schedule(scheduled_event);
2102    }
2103
2104    /// Check if a connection is in half-open state
2105    pub fn is_half_open(&self, connection_id: ConnectionId) -> bool {
2106        let inner = self.inner.borrow();
2107        inner
2108            .network
2109            .connections
2110            .get(&connection_id)
2111            .is_some_and(|conn| conn.is_half_open)
2112    }
2113
2114    /// Check if a half-open connection should return errors now
2115    pub fn should_half_open_error(&self, connection_id: ConnectionId) -> bool {
2116        let inner = self.inner.borrow();
2117        let current_time = inner.current_time;
2118        inner
2119            .network
2120            .connections
2121            .get(&connection_id)
2122            .is_some_and(|conn| {
2123                conn.is_half_open
2124                    && conn
2125                        .half_open_error_at
2126                        .is_some_and(|error_at| current_time >= error_at)
2127            })
2128    }
2129
2130    // Stable Connection Methods
2131
2132    /// Mark a connection as stable, exempting it from chaos injection.
2133    ///
2134    /// Stable connections are exempt from:
2135    /// - Random close (`roll_random_close`)
2136    /// - Write clogging
2137    /// - Read clogging
2138    /// - Bit flip corruption
2139    /// - Partial write truncation
2140    ///
2141    /// FDB ref: sim2.actor.cpp:357-362 (`stableConnection` flag)
2142    ///
2143    /// # Real-World Scenario
2144    /// Use this for parent-child process connections or supervision channels
2145    /// that should remain reliable even during chaos testing.
2146    pub fn mark_connection_stable(&self, connection_id: ConnectionId) {
2147        let mut inner = self.inner.borrow_mut();
2148        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
2149            conn.is_stable = true;
2150            tracing::debug!("Connection {} marked as stable", connection_id.0);
2151
2152            // Also mark the paired connection as stable
2153            if let Some(paired_id) = conn.paired_connection
2154                && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
2155            {
2156                paired_conn.is_stable = true;
2157                tracing::debug!("Paired connection {} also marked as stable", paired_id.0);
2158            }
2159        }
2160    }
2161
2162    /// Check if a connection is marked as stable.
2163    pub fn is_connection_stable(&self, connection_id: ConnectionId) -> bool {
2164        let inner = self.inner.borrow();
2165        inner
2166            .network
2167            .connections
2168            .get(&connection_id)
2169            .is_some_and(|conn| conn.is_stable)
2170    }
2171
2172    // Network Partition Control Methods
2173
2174    /// Partition communication between two IP addresses for a specified duration
2175    pub fn partition_pair(
2176        &self,
2177        from_ip: std::net::IpAddr,
2178        to_ip: std::net::IpAddr,
2179        duration: Duration,
2180    ) -> SimulationResult<()> {
2181        let mut inner = self.inner.borrow_mut();
2182        let expires_at = inner.current_time + duration;
2183
2184        inner
2185            .network
2186            .ip_partitions
2187            .insert((from_ip, to_ip), PartitionState { expires_at });
2188
2189        let restore_event = Event::Connection {
2190            id: 0,
2191            state: ConnectionStateChange::PartitionRestore,
2192        };
2193        let sequence = inner.next_sequence;
2194        inner.next_sequence += 1;
2195        let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2196        inner.event_queue.schedule(scheduled_event);
2197
2198        tracing::debug!(
2199            "Partitioned {} -> {} until {:?}",
2200            from_ip,
2201            to_ip,
2202            expires_at
2203        );
2204        Ok(())
2205    }
2206
2207    /// Block all outgoing communication from an IP address
2208    pub fn partition_send_from(
2209        &self,
2210        ip: std::net::IpAddr,
2211        duration: Duration,
2212    ) -> SimulationResult<()> {
2213        let mut inner = self.inner.borrow_mut();
2214        let expires_at = inner.current_time + duration;
2215
2216        inner.network.send_partitions.insert(ip, expires_at);
2217
2218        let clear_event = Event::Connection {
2219            id: 0,
2220            state: ConnectionStateChange::SendPartitionClear,
2221        };
2222        let sequence = inner.next_sequence;
2223        inner.next_sequence += 1;
2224        let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2225        inner.event_queue.schedule(scheduled_event);
2226
2227        tracing::debug!("Partitioned sends from {} until {:?}", ip, expires_at);
2228        Ok(())
2229    }
2230
2231    /// Block all incoming communication to an IP address
2232    pub fn partition_recv_to(
2233        &self,
2234        ip: std::net::IpAddr,
2235        duration: Duration,
2236    ) -> SimulationResult<()> {
2237        let mut inner = self.inner.borrow_mut();
2238        let expires_at = inner.current_time + duration;
2239
2240        inner.network.recv_partitions.insert(ip, expires_at);
2241
2242        let clear_event = Event::Connection {
2243            id: 0,
2244            state: ConnectionStateChange::RecvPartitionClear,
2245        };
2246        let sequence = inner.next_sequence;
2247        inner.next_sequence += 1;
2248        let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
2249        inner.event_queue.schedule(scheduled_event);
2250
2251        tracing::debug!("Partitioned receives to {} until {:?}", ip, expires_at);
2252        Ok(())
2253    }
2254
2255    /// Immediately restore communication between two IP addresses
2256    pub fn restore_partition(
2257        &self,
2258        from_ip: std::net::IpAddr,
2259        to_ip: std::net::IpAddr,
2260    ) -> SimulationResult<()> {
2261        let mut inner = self.inner.borrow_mut();
2262        inner.network.ip_partitions.remove(&(from_ip, to_ip));
2263        tracing::debug!("Restored partition {} -> {}", from_ip, to_ip);
2264        Ok(())
2265    }
2266
2267    /// Check if communication between two IP addresses is currently partitioned
2268    pub fn is_partitioned(
2269        &self,
2270        from_ip: std::net::IpAddr,
2271        to_ip: std::net::IpAddr,
2272    ) -> SimulationResult<bool> {
2273        let inner = self.inner.borrow();
2274        Ok(inner
2275            .network
2276            .is_partitioned(from_ip, to_ip, inner.current_time))
2277    }
2278
2279    /// Helper method for use with SimInner - randomly trigger partitions
2280    ///
2281    /// Supports different partition strategies based on configuration:
2282    /// - Random: randomly partition individual IP pairs
2283    /// - UniformSize: create uniform-sized partition groups
2284    /// - IsolateSingle: isolate exactly one node from the rest
2285    fn randomly_trigger_partitions_with_inner(inner: &mut SimInner) {
2286        let partition_config = &inner.network.config;
2287
2288        if partition_config.chaos.partition_probability == 0.0 {
2289            return;
2290        }
2291
2292        // Check if we should trigger a partition this step
2293        if sim_random::<f64>() >= partition_config.chaos.partition_probability {
2294            return;
2295        }
2296
2297        // Collect unique IPs from connections
2298        let unique_ips: HashSet<IpAddr> = inner
2299            .network
2300            .connections
2301            .values()
2302            .filter_map(|conn| conn.local_ip)
2303            .collect();
2304
2305        if unique_ips.len() < 2 {
2306            return; // Need at least 2 IPs to partition
2307        }
2308
2309        let ip_list: Vec<IpAddr> = unique_ips.into_iter().collect();
2310        let partition_duration =
2311            crate::network::sample_duration(&partition_config.chaos.partition_duration);
2312        let expires_at = inner.current_time + partition_duration;
2313
2314        // Select IPs to partition based on strategy
2315        let partitioned_ips: Vec<IpAddr> = match partition_config.chaos.partition_strategy {
2316            PartitionStrategy::Random => {
2317                // Original behavior: randomly decide for each IP
2318                ip_list
2319                    .iter()
2320                    .filter(|_| sim_random::<f64>() < 0.5)
2321                    .copied()
2322                    .collect()
2323            }
2324            PartitionStrategy::UniformSize => {
2325                // TigerBeetle-style: random partition size from 1 to n-1
2326                let partition_size = sim_random_range(1..ip_list.len());
2327                // Shuffle and take first N IPs
2328                let mut shuffled = ip_list.clone();
2329                // Simple Fisher-Yates shuffle
2330                for i in (1..shuffled.len()).rev() {
2331                    let j = sim_random_range(0..i + 1);
2332                    shuffled.swap(i, j);
2333                }
2334                shuffled.into_iter().take(partition_size).collect()
2335            }
2336            PartitionStrategy::IsolateSingle => {
2337                // Isolate exactly one node
2338                let idx = sim_random_range(0..ip_list.len());
2339                vec![ip_list[idx]]
2340            }
2341        };
2342
2343        // Don't partition if we selected all IPs or none
2344        if partitioned_ips.is_empty() || partitioned_ips.len() == ip_list.len() {
2345            return;
2346        }
2347
2348        // Create bi-directional partitions between partitioned and non-partitioned groups
2349        let non_partitioned: Vec<IpAddr> = ip_list
2350            .iter()
2351            .filter(|ip| !partitioned_ips.contains(ip))
2352            .copied()
2353            .collect();
2354
2355        for &from_ip in &partitioned_ips {
2356            for &to_ip in &non_partitioned {
2357                // Skip if already partitioned
2358                if inner
2359                    .network
2360                    .is_partitioned(from_ip, to_ip, inner.current_time)
2361                {
2362                    continue;
2363                }
2364
2365                // Partition in both directions
2366                inner
2367                    .network
2368                    .ip_partitions
2369                    .insert((from_ip, to_ip), PartitionState { expires_at });
2370                inner
2371                    .network
2372                    .ip_partitions
2373                    .insert((to_ip, from_ip), PartitionState { expires_at });
2374
2375                tracing::debug!(
2376                    "Partition triggered: {} <-> {} until {:?} (strategy: {:?})",
2377                    from_ip,
2378                    to_ip,
2379                    expires_at,
2380                    partition_config.chaos.partition_strategy
2381                );
2382            }
2383        }
2384
2385        // Schedule restoration event
2386        let restore_event = Event::Connection {
2387            id: 0,
2388            state: ConnectionStateChange::PartitionRestore,
2389        };
2390        let sequence = inner.next_sequence;
2391        inner.next_sequence += 1;
2392        let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2393        inner.event_queue.schedule(scheduled_event);
2394    }
2395}
2396
2397impl Default for SimWorld {
2398    fn default() -> Self {
2399        Self::new()
2400    }
2401}
2402
/// A weak reference to a simulation world.
///
/// This provides handle-based access to the simulation without holding
/// a strong reference that would prevent cleanup.
///
/// Call [`WeakSimWorld::upgrade`] to obtain a [`SimWorld`]. The upgrade fails
/// with `SimulationError::SimulationShutdown` once every strong handle to the
/// shared state has been dropped.
#[derive(Debug)]
pub struct WeakSimWorld {
    /// Non-owning pointer to the shared simulation state.
    pub(crate) inner: Weak<RefCell<SimInner>>,
}
2411
/// Generates a `WeakSimWorld` method that upgrades the weak handle and forwards
/// the call to the `SimWorld` method of the same name.
///
/// The leading keyword picks how the target's return value is adapted:
/// * `unit`: the target returns `()`; the generated method returns `Ok(())`.
/// * `pass`: the target already returns `SimulationResult<T>`, which is returned as-is.
/// * `wrap`: the target returns a plain `T`, which is wrapped in `Ok`.
///
/// In every form, a failed upgrade short-circuits with the upgrade error.
macro_rules! weak_forward {
    (unit $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*)) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<()> {
            let world = self.upgrade()?;
            world.$name($($param),*);
            Ok(())
        }
    };
    (pass $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*) -> $out:ty) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<$out> {
            let world = self.upgrade()?;
            world.$name($($param),*)
        }
    };
    (wrap $(#[$attr:meta])* $name:ident(&self $(, $param:ident : $param_ty:ty)*) -> $out:ty) => {
        $(#[$attr])*
        pub fn $name(&self $(, $param: $param_ty)*) -> SimulationResult<$out> {
            let world = self.upgrade()?;
            Ok(world.$name($($param),*))
        }
    };
}
2437
2438impl WeakSimWorld {
2439    /// Attempts to upgrade this weak reference to a strong reference.
2440    pub fn upgrade(&self) -> SimulationResult<SimWorld> {
2441        self.inner
2442            .upgrade()
2443            .map(|inner| SimWorld { inner })
2444            .ok_or(SimulationError::SimulationShutdown)
2445    }
2446
2447    weak_forward!(wrap #[doc = "Returns the current simulation time."] current_time(&self) -> Duration);
2448    weak_forward!(wrap #[doc = "Returns the exact simulation time (equivalent to FDB's now())."] now(&self) -> Duration);
2449    weak_forward!(wrap #[doc = "Returns the drifted timer time (equivalent to FDB's timer())."] timer(&self) -> Duration);
2450    weak_forward!(unit #[doc = "Schedules an event to execute after the specified delay."] schedule_event(&self, event: Event, delay: Duration));
2451    weak_forward!(unit #[doc = "Schedules an event to execute at the specified absolute time."] schedule_event_at(&self, event: Event, time: Duration));
2452    weak_forward!(pass #[doc = "Read data from connection's receive buffer."] read_from_connection(&self, connection_id: ConnectionId, buf: &mut [u8]) -> usize);
2453    weak_forward!(pass #[doc = "Write data to connection's receive buffer."] write_to_connection(&self, connection_id: ConnectionId, data: &[u8]) -> ());
2454    weak_forward!(pass #[doc = "Buffer data for ordered sending on a connection."] buffer_send(&self, connection_id: ConnectionId, data: Vec<u8>) -> ());
2455    weak_forward!(wrap #[doc = "Get a network provider for the simulation."] network_provider(&self) -> SimNetworkProvider);
2456    weak_forward!(wrap #[doc = "Get a time provider for the simulation."] time_provider(&self) -> crate::providers::SimTimeProvider);
2457    weak_forward!(wrap #[doc = "Sleep for the specified duration in simulation time."] sleep(&self, duration: Duration) -> SleepFuture);
2458
2459    /// Access network configuration for latency calculations.
2460    pub fn with_network_config<F, R>(&self, f: F) -> SimulationResult<R>
2461    where
2462        F: FnOnce(&NetworkConfiguration) -> R,
2463    {
2464        Ok(self.upgrade()?.with_network_config(f))
2465    }
2466}
2467
2468impl Clone for WeakSimWorld {
2469    fn clone(&self) -> Self {
2470        Self {
2471            inner: self.inner.clone(),
2472        }
2473    }
2474}
2475
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sim_world_basic_lifecycle() {
        let mut sim = SimWorld::new();

        // Initial state
        assert_eq!(sim.current_time(), Duration::ZERO);
        assert!(!sim.has_pending_events());
        assert_eq!(sim.pending_event_count(), 0);

        // Schedule an event
        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));

        assert!(sim.has_pending_events());
        assert_eq!(sim.pending_event_count(), 1);
        assert_eq!(sim.current_time(), Duration::ZERO);

        // Process the event
        let has_more = sim.step();
        assert!(!has_more);
        assert_eq!(sim.current_time(), Duration::from_millis(100));
        assert!(!sim.has_pending_events());
        assert_eq!(sim.pending_event_count(), 0);
    }

    #[test]
    fn sim_world_default_starts_empty() {
        // `Default` delegates to `SimWorld::new`, so it must start at t=0 with
        // an empty event queue.
        let sim = SimWorld::default();
        assert!(!sim.has_pending_events());
        assert_eq!(sim.downgrade().current_time(), Ok(Duration::ZERO));
    }

    #[test]
    fn sim_world_multiple_events() {
        let mut sim = SimWorld::new();

        // Schedule multiple events
        sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(300));
        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
        sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200));

        assert_eq!(sim.pending_event_count(), 3);

        // Process events - should happen in time order
        assert!(sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(100));
        assert_eq!(sim.pending_event_count(), 2);

        assert!(sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(200));
        assert_eq!(sim.pending_event_count(), 1);

        assert!(!sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(300));
        assert_eq!(sim.pending_event_count(), 0);
    }

    #[test]
    fn sim_world_run_until_empty() {
        let mut sim = SimWorld::new();

        // Schedule multiple events
        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
        sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200));
        sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(300));

        // Run until all events are processed
        sim.run_until_empty();

        assert_eq!(sim.current_time(), Duration::from_millis(300));
        assert!(!sim.has_pending_events());
    }

    #[test]
    fn sim_world_schedule_at_specific_time() {
        let mut sim = SimWorld::new();

        // Schedule event at specific time
        sim.schedule_event_at(Event::Timer { task_id: 1 }, Duration::from_millis(500));

        assert_eq!(sim.current_time(), Duration::ZERO);
        assert!(sim.has_pending_events());

        // The only scheduled event fires, so `step` reports nothing left.
        assert!(!sim.step());

        assert_eq!(sim.current_time(), Duration::from_millis(500));
        assert!(!sim.has_pending_events());
    }

    #[test]
    fn weak_sim_world_lifecycle() {
        let sim = SimWorld::new();
        let weak = sim.downgrade();

        // Can upgrade and use weak reference
        assert_eq!(
            weak.current_time().expect("should get time"),
            Duration::ZERO
        );

        // Schedule event through weak reference
        weak.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100))
            .expect("should schedule event");

        // Verify event was scheduled
        assert!(sim.has_pending_events());

        // Drop the original simulation
        drop(sim);

        // Weak reference should now fail
        assert!(weak.upgrade().is_err());
        assert_eq!(
            weak.current_time(),
            Err(SimulationError::SimulationShutdown)
        );
        assert_eq!(
            weak.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200)),
            Err(SimulationError::SimulationShutdown)
        );
    }

    #[test]
    fn weak_sim_world_clone_shares_world() {
        let sim = SimWorld::new();
        let weak = sim.downgrade();
        let cloned = weak.clone();

        // A clone points at the same simulation as the original handle.
        cloned
            .schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(10))
            .expect("should schedule event");
        assert!(sim.has_pending_events());

        // Dropping the simulation invalidates every handle, clones included.
        drop(sim);
        assert_eq!(
            weak.current_time(),
            Err(SimulationError::SimulationShutdown)
        );
        assert_eq!(
            cloned.current_time(),
            Err(SimulationError::SimulationShutdown)
        );
    }

    #[test]
    fn deterministic_event_ordering() {
        let mut sim = SimWorld::new();

        // Schedule events at the same time
        sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(100));
        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
        sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(100));

        // The public API cannot observe which task fired, so this checks that
        // same-time events are drained one per step without advancing the clock.
        assert!(sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(100));
        assert_eq!(sim.pending_event_count(), 2);

        assert!(sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(100));
        assert_eq!(sim.pending_event_count(), 1);

        assert!(!sim.step());
        assert_eq!(sim.current_time(), Duration::from_millis(100));
        assert_eq!(sim.pending_event_count(), 0);
    }
}