moonpool_sim/sim/
world.rs

1//! Core simulation world and coordination logic.
2//!
3//! This module provides the central SimWorld coordinator that manages time,
4//! event processing, and network simulation state.
5
6use std::{
7    cell::RefCell,
8    collections::{HashMap, HashSet, VecDeque},
9    net::IpAddr,
10    rc::{Rc, Weak},
11    task::Waker,
12    time::Duration,
13};
14use tracing::instrument;
15
16use crate::{
17    SimulationError, SimulationResult,
18    network::{
19        NetworkConfiguration,
20        sim::{ConnectionId, ListenerId, SimNetworkProvider},
21    },
22};
23
24use super::{
25    events::{ConnectionStateChange, Event, EventQueue, NetworkOperation, ScheduledEvent},
26    rng::{reset_sim_rng, set_sim_seed, sim_random, sim_random_range},
27    sleep::SleepFuture,
28    state::{ClogState, CloseReason, ConnectionState, ListenerState, NetworkState, PartitionState},
29    wakers::WakerRegistry,
30};
31
32/// Internal simulation state holder
33#[derive(Debug)]
34pub(crate) struct SimInner {
35    pub(crate) current_time: Duration,
36    /// Drifted timer time (can be ahead of current_time)
37    /// FDB ref: sim2.actor.cpp:1058-1064 - timer() drifts 0-0.1s ahead of now()
38    pub(crate) timer_time: Duration,
39    pub(crate) event_queue: EventQueue,
40    pub(crate) next_sequence: u64,
41
42    // Network management
43    pub(crate) network: NetworkState,
44
45    // Async coordination
46    pub(crate) wakers: WakerRegistry,
47
48    // Task management for sleep functionality
49    pub(crate) next_task_id: u64,
50    pub(crate) awakened_tasks: HashSet<u64>,
51
52    // Event processing metrics
53    pub(crate) events_processed: u64,
54
55    // Chaos tracking
56    pub(crate) last_bit_flip_time: Duration,
57}
58
59impl SimInner {
60    pub(crate) fn new() -> Self {
61        Self {
62            current_time: Duration::ZERO,
63            timer_time: Duration::ZERO,
64            event_queue: EventQueue::new(),
65            next_sequence: 0,
66            network: NetworkState::new(NetworkConfiguration::default()),
67            wakers: WakerRegistry::default(),
68            next_task_id: 0,
69            awakened_tasks: HashSet::new(),
70            events_processed: 0,
71            last_bit_flip_time: Duration::ZERO,
72        }
73    }
74
75    pub(crate) fn new_with_config(network_config: NetworkConfiguration) -> Self {
76        Self {
77            current_time: Duration::ZERO,
78            timer_time: Duration::ZERO,
79            event_queue: EventQueue::new(),
80            next_sequence: 0,
81            network: NetworkState::new(network_config),
82            wakers: WakerRegistry::default(),
83            next_task_id: 0,
84            awakened_tasks: HashSet::new(),
85            events_processed: 0,
86            last_bit_flip_time: Duration::ZERO,
87        }
88    }
89
90    /// Calculate the number of bits to flip using a power-law distribution.
91    ///
92    /// Uses the formula: 32 - floor(log2(random_value))
93    /// This creates a power-law distribution biased toward fewer bits:
94    /// - 1-2 bits: very common
95    /// - 16 bits: uncommon
96    /// - 32 bits: very rare
97    ///
98    /// Matches FDB's approach in FlowTransport.actor.cpp:1297
99    pub(crate) fn calculate_flip_bit_count(random_value: u32, min_bits: u32, max_bits: u32) -> u32 {
100        if random_value == 0 {
101            // Handle edge case: treat 0 as if it were 1
102            return max_bits.min(32);
103        }
104
105        // Formula: 32 - floor(log2(x)) = 1 + leading_zeros(x)
106        let bit_count = 1 + random_value.leading_zeros();
107
108        // Clamp to configured range
109        bit_count.clamp(min_bits, max_bits)
110    }
111}
112
113/// The central simulation coordinator that manages time and event processing.
114///
115/// `SimWorld` owns all mutable simulation state and provides the main interface
116/// for scheduling events and advancing simulation time. It uses a centralized
117/// ownership model with handle-based access to avoid borrow checker conflicts.
118#[derive(Debug)]
119pub struct SimWorld {
120    pub(crate) inner: Rc<RefCell<SimInner>>,
121}
122
123impl SimWorld {
124    /// Internal constructor that handles all initialization logic.
125    fn create(network_config: Option<NetworkConfiguration>, seed: u64) -> Self {
126        reset_sim_rng();
127        set_sim_seed(seed);
128        crate::chaos::assertions::reset_assertion_results();
129
130        let inner = match network_config {
131            Some(config) => SimInner::new_with_config(config),
132            None => SimInner::new(),
133        };
134
135        Self {
136            inner: Rc::new(RefCell::new(inner)),
137        }
138    }
139
140    /// Creates a new simulation world with default network configuration.
141    ///
142    /// Uses default seed (0) for reproducible testing. For custom seeds,
143    /// use [`SimWorld::new_with_seed`].
144    pub fn new() -> Self {
145        Self::create(None, 0)
146    }
147
148    /// Creates a new simulation world with a specific seed for deterministic randomness.
149    ///
150    /// This method ensures clean thread-local RNG state by resetting before
151    /// setting the seed, making it safe for consecutive simulations on the same thread.
152    ///
153    /// # Parameters
154    ///
155    /// * `seed` - The seed value for deterministic randomness
156    pub fn new_with_seed(seed: u64) -> Self {
157        Self::create(None, seed)
158    }
159
160    /// Creates a new simulation world with custom network configuration.
161    pub fn new_with_network_config(network_config: NetworkConfiguration) -> Self {
162        Self::create(Some(network_config), 0)
163    }
164
165    /// Creates a new simulation world with both custom network configuration and seed.
166    ///
167    /// # Parameters
168    ///
169    /// * `network_config` - Network configuration for latency and fault simulation
170    /// * `seed` - The seed value for deterministic randomness
171    pub fn new_with_network_config_and_seed(
172        network_config: NetworkConfiguration,
173        seed: u64,
174    ) -> Self {
175        Self::create(Some(network_config), seed)
176    }
177
178    /// Processes the next scheduled event and advances time.
179    ///
180    /// Returns `true` if more events are available for processing,
181    /// `false` if this was the last event or if no events are available.
182    #[instrument(skip(self))]
183    pub fn step(&mut self) -> bool {
184        let mut inner = self.inner.borrow_mut();
185
186        if let Some(scheduled_event) = inner.event_queue.pop_earliest() {
187            // Advance logical time to event timestamp
188            inner.current_time = scheduled_event.time();
189
190            // Phase 7: Clear expired clogs after time advancement
191            Self::clear_expired_clogs_with_inner(&mut inner);
192
193            // Trigger random partitions based on configuration
194            Self::randomly_trigger_partitions_with_inner(&mut inner);
195
196            // Process the event with the mutable reference
197            Self::process_event_with_inner(&mut inner, scheduled_event.into_event());
198
199            // Return true if more events are available
200            !inner.event_queue.is_empty()
201        } else {
202            // No more events to process
203            false
204        }
205    }
206
207    /// Processes all scheduled events until the queue is empty or only infrastructure events remain.
208    ///
209    /// This method processes all workload-related events but stops early if only infrastructure
210    /// events (like connection restoration) remain. This prevents infinite loops where
211    /// infrastructure events keep the simulation running indefinitely after workloads complete.
212    #[instrument(skip(self))]
213    pub fn run_until_empty(&mut self) {
214        while self.step() {
215            // Periodically check if we should stop early (every 50 events for performance)
216            if self.inner.borrow().events_processed.is_multiple_of(50) {
217                let has_workload_events = !self
218                    .inner
219                    .borrow()
220                    .event_queue
221                    .has_only_infrastructure_events();
222                if !has_workload_events {
223                    tracing::debug!(
224                        "Early termination: only infrastructure events remain in queue"
225                    );
226                    break;
227                }
228            }
229        }
230    }
231
232    /// Returns the current simulation time.
233    pub fn current_time(&self) -> Duration {
234        self.inner.borrow().current_time
235    }
236
237    /// Returns the exact simulation time (equivalent to FDB's now()).
238    ///
239    /// This is the canonical simulation time used for scheduling events.
240    /// Use this for precise time comparisons and scheduling.
241    pub fn now(&self) -> Duration {
242        self.inner.borrow().current_time
243    }
244
245    /// Returns the drifted timer time (equivalent to FDB's timer()).
246    ///
247    /// The timer can be up to `clock_drift_max` (default 100ms) ahead of `now()`.
248    /// This simulates real-world clock drift between processes, which is important
249    /// for testing time-sensitive code like:
250    /// - Timeout handling
251    /// - Lease expiration
252    /// - Distributed consensus (leader election)
253    /// - Cache invalidation
254    /// - Heartbeat detection
255    ///
256    /// FDB formula: `timerTime += random01() * (time + 0.1 - timerTime) / 2.0`
257    ///
258    /// FDB ref: sim2.actor.cpp:1058-1064
259    pub fn timer(&self) -> Duration {
260        let mut inner = self.inner.borrow_mut();
261        let chaos = &inner.network.config.chaos;
262
263        // If clock drift is disabled, return exact simulation time
264        if !chaos.clock_drift_enabled {
265            return inner.current_time;
266        }
267
268        // FDB formula: timerTime += random01() * (time + 0.1 - timerTime) / 2.0
269        // This smoothly interpolates timerTime toward (time + drift_max)
270        // The /2.0 creates a damped approach (never overshoots)
271        let max_timer = inner.current_time + chaos.clock_drift_max;
272
273        // Only advance if timer is behind max
274        if inner.timer_time < max_timer {
275            let random_factor = sim_random::<f64>(); // 0.0 to 1.0
276            let gap = (max_timer - inner.timer_time).as_secs_f64();
277            let delta = random_factor * gap / 2.0;
278            inner.timer_time += Duration::from_secs_f64(delta);
279        }
280
281        // Ensure timer never goes backwards relative to simulation time
282        inner.timer_time = inner.timer_time.max(inner.current_time);
283
284        inner.timer_time
285    }
286
287    /// Schedules an event to execute after the specified delay from the current time.
288    #[instrument(skip(self))]
289    pub fn schedule_event(&self, event: Event, delay: Duration) {
290        let mut inner = self.inner.borrow_mut();
291        let scheduled_time = inner.current_time + delay;
292        let sequence = inner.next_sequence;
293        inner.next_sequence += 1;
294
295        let scheduled_event = ScheduledEvent::new(scheduled_time, event, sequence);
296        inner.event_queue.schedule(scheduled_event);
297    }
298
299    /// Schedules an event to execute at the specified absolute time.
300    pub fn schedule_event_at(&self, event: Event, time: Duration) {
301        let mut inner = self.inner.borrow_mut();
302        let sequence = inner.next_sequence;
303        inner.next_sequence += 1;
304
305        let scheduled_event = ScheduledEvent::new(time, event, sequence);
306        inner.event_queue.schedule(scheduled_event);
307    }
308
309    /// Creates a weak reference to this simulation world.
310    ///
311    /// Weak references can be used to access the simulation without preventing
312    /// it from being dropped, enabling handle-based access patterns.
313    pub fn downgrade(&self) -> WeakSimWorld {
314        WeakSimWorld {
315            inner: Rc::downgrade(&self.inner),
316        }
317    }
318
319    /// Returns `true` if there are events waiting to be processed.
320    pub fn has_pending_events(&self) -> bool {
321        !self.inner.borrow().event_queue.is_empty()
322    }
323
324    /// Returns the number of events waiting to be processed.
325    pub fn pending_event_count(&self) -> usize {
326        self.inner.borrow().event_queue.len()
327    }
328
329    /// Create a network provider for this simulation
330    pub fn network_provider(&self) -> SimNetworkProvider {
331        SimNetworkProvider::new(self.downgrade())
332    }
333
334    /// Create a time provider for this simulation
335    pub fn time_provider(&self) -> crate::providers::SimTimeProvider {
336        crate::providers::SimTimeProvider::new(self.downgrade())
337    }
338
339    /// Create a task provider for this simulation
340    pub fn task_provider(&self) -> crate::TokioTaskProvider {
341        crate::TokioTaskProvider
342    }
343
344    /// Access network configuration for latency calculations using thread-local RNG.
345    ///
346    /// This method provides access to the network configuration for calculating
347    /// latencies and other network parameters. Random values should be generated
348    /// using the thread-local RNG functions like `sim_random()`.
349    ///
350    /// Access the network configuration for this simulation.
351    pub fn with_network_config<F, R>(&self, f: F) -> R
352    where
353        F: FnOnce(&NetworkConfiguration) -> R,
354    {
355        let inner = self.inner.borrow();
356        f(&inner.network.config)
357    }
358
359    /// Create a listener in the simulation (used by SimNetworkProvider)
360    pub(crate) fn create_listener(&self, addr: String) -> SimulationResult<ListenerId> {
361        let mut inner = self.inner.borrow_mut();
362        let listener_id = ListenerId(inner.network.next_listener_id);
363        inner.network.next_listener_id += 1;
364
365        inner.network.listeners.insert(
366            listener_id,
367            ListenerState {
368                id: listener_id,
369                addr,
370                pending_connections: VecDeque::new(),
371            },
372        );
373
374        Ok(listener_id)
375    }
376
377    /// Read data from connection's receive buffer (used by SimTcpStream)
378    pub(crate) fn read_from_connection(
379        &self,
380        connection_id: ConnectionId,
381        buf: &mut [u8],
382    ) -> SimulationResult<usize> {
383        let mut inner = self.inner.borrow_mut();
384
385        if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
386            let mut bytes_read = 0;
387            while bytes_read < buf.len() && !connection.receive_buffer.is_empty() {
388                if let Some(byte) = connection.receive_buffer.pop_front() {
389                    buf[bytes_read] = byte;
390                    bytes_read += 1;
391                }
392            }
393            Ok(bytes_read)
394        } else {
395            Err(SimulationError::InvalidState(
396                "connection not found".to_string(),
397            ))
398        }
399    }
400
401    /// Write data to connection's receive buffer (used by SimTcpStream write operations)
402    pub(crate) fn write_to_connection(
403        &self,
404        connection_id: ConnectionId,
405        data: &[u8],
406    ) -> SimulationResult<()> {
407        let mut inner = self.inner.borrow_mut();
408
409        if let Some(connection) = inner.network.connections.get_mut(&connection_id) {
410            for &byte in data {
411                connection.receive_buffer.push_back(byte);
412            }
413            Ok(())
414        } else {
415            Err(SimulationError::InvalidState(
416                "connection not found".to_string(),
417            ))
418        }
419    }
420
421    /// Buffer data for ordered sending on a TCP connection.
422    ///
423    /// This method implements the core TCP ordering guarantee by ensuring that all
424    /// write operations on a single connection are processed in FIFO order.
425    pub(crate) fn buffer_send(
426        &self,
427        connection_id: ConnectionId,
428        data: Vec<u8>,
429    ) -> SimulationResult<()> {
430        tracing::debug!(
431            "buffer_send called for connection_id={} with {} bytes",
432            connection_id.0,
433            data.len()
434        );
435        let mut inner = self.inner.borrow_mut();
436
437        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
438            // Always add data to send buffer for TCP ordering
439            conn.send_buffer.push_back(data);
440            tracing::debug!(
441                "buffer_send: added data to send_buffer, new length: {}",
442                conn.send_buffer.len()
443            );
444
445            // If sender is not already active, start processing the buffer
446            if !conn.send_in_progress {
447                tracing::debug!(
448                    "buffer_send: sender not in progress, scheduling ProcessSendBuffer event"
449                );
450                conn.send_in_progress = true;
451
452                // Schedule immediate processing of the buffer
453                let scheduled_time = inner.current_time + std::time::Duration::ZERO;
454                let sequence = inner.next_sequence;
455                inner.next_sequence += 1;
456                let scheduled_event = ScheduledEvent::new(
457                    scheduled_time,
458                    Event::Network {
459                        connection_id: connection_id.0,
460                        operation: NetworkOperation::ProcessSendBuffer,
461                    },
462                    sequence,
463                );
464                inner.event_queue.schedule(scheduled_event);
465                tracing::debug!(
466                    "buffer_send: scheduled ProcessSendBuffer event with sequence {}",
467                    sequence
468                );
469            } else {
470                tracing::debug!(
471                    "buffer_send: sender already in progress, not scheduling new event"
472                );
473            }
474
475            Ok(())
476        } else {
477            tracing::debug!(
478                "buffer_send: connection_id={} not found in connections table",
479                connection_id.0
480            );
481            Err(SimulationError::InvalidState(
482                "connection not found".to_string(),
483            ))
484        }
485    }
486
487    /// Create a bidirectional TCP connection pair for client-server communication.
488    ///
489    /// FDB Pattern (sim2.actor.cpp:1149-1175):
490    /// - Client connection stores server's real address as peer_address
491    /// - Server connection stores synthesized ephemeral address (random IP + port 40000-60000)
492    ///
493    /// This simulates real TCP behavior where servers see client ephemeral ports.
494    pub(crate) fn create_connection_pair(
495        &self,
496        client_addr: String,
497        server_addr: String,
498    ) -> SimulationResult<(ConnectionId, ConnectionId)> {
499        let mut inner = self.inner.borrow_mut();
500
501        let client_id = ConnectionId(inner.network.next_connection_id);
502        inner.network.next_connection_id += 1;
503
504        let server_id = ConnectionId(inner.network.next_connection_id);
505        inner.network.next_connection_id += 1;
506
507        // Capture current time to avoid borrow conflicts
508        let current_time = inner.current_time;
509
510        // Parse IP addresses for partition tracking
511        let client_ip = NetworkState::parse_ip_from_addr(&client_addr);
512        let server_ip = NetworkState::parse_ip_from_addr(&server_addr);
513
514        // FDB Pattern: Synthesize ephemeral address for server-side connection
515        // sim2.actor.cpp:1149-1175: randomInt(0,256) for IP offset, randomInt(40000,60000) for port
516        // Use thread-local sim_random_range for deterministic randomness
517        let ephemeral_peer_addr = match client_ip {
518            Some(std::net::IpAddr::V4(ipv4)) => {
519                let octets = ipv4.octets();
520                let ip_offset = sim_random_range(0u32..256) as u8;
521                let new_last_octet = octets[3].wrapping_add(ip_offset);
522                let ephemeral_ip =
523                    std::net::Ipv4Addr::new(octets[0], octets[1], octets[2], new_last_octet);
524                let ephemeral_port = sim_random_range(40000u16..60000);
525                format!("{}:{}", ephemeral_ip, ephemeral_port)
526            }
527            Some(std::net::IpAddr::V6(ipv6)) => {
528                // For IPv6, just modify the last segment
529                let segments = ipv6.segments();
530                let mut new_segments = segments;
531                let ip_offset = sim_random_range(0u16..256);
532                new_segments[7] = new_segments[7].wrapping_add(ip_offset);
533                let ephemeral_ip = std::net::Ipv6Addr::from(new_segments);
534                let ephemeral_port = sim_random_range(40000u16..60000);
535                format!("[{}]:{}", ephemeral_ip, ephemeral_port)
536            }
537            None => {
538                // Fallback: use client address with random port
539                let ephemeral_port = sim_random_range(40000u16..60000);
540                format!("unknown:{}", ephemeral_port)
541            }
542        };
543
544        // Calculate send buffer capacity based on BDP (Bandwidth-Delay Product)
545        // Using a default of 64KB which is typical for TCP socket buffers
546        // In real simulations this could be: max_latency_ms * bandwidth_bytes_per_ms
547        const DEFAULT_SEND_BUFFER_CAPACITY: usize = 64 * 1024; // 64KB
548
549        // Create paired connections
550        // Client stores server's real address as peer_address
551        inner.network.connections.insert(
552            client_id,
553            ConnectionState {
554                id: client_id,
555                addr: client_addr,
556                local_ip: client_ip,
557                remote_ip: server_ip,
558                peer_address: server_addr.clone(),
559                receive_buffer: VecDeque::new(),
560                paired_connection: Some(server_id),
561                send_buffer: VecDeque::new(),
562                send_in_progress: false,
563                next_send_time: current_time,
564                is_closed: false,
565                send_closed: false,
566                recv_closed: false,
567                is_cut: false,
568                cut_expiry: None,
569                close_reason: CloseReason::None,
570                send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
571                send_delay: None,
572                recv_delay: None,
573                is_half_open: false,
574                half_open_error_at: None,
575            },
576        );
577
578        // Server stores synthesized ephemeral address as peer_address
579        inner.network.connections.insert(
580            server_id,
581            ConnectionState {
582                id: server_id,
583                addr: server_addr,
584                local_ip: server_ip,
585                remote_ip: client_ip,
586                peer_address: ephemeral_peer_addr,
587                receive_buffer: VecDeque::new(),
588                paired_connection: Some(client_id),
589                send_buffer: VecDeque::new(),
590                send_in_progress: false,
591                next_send_time: current_time,
592                is_closed: false,
593                send_closed: false,
594                recv_closed: false,
595                is_cut: false,
596                cut_expiry: None,
597                close_reason: CloseReason::None,
598                send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
599                send_delay: None,
600                recv_delay: None,
601                is_half_open: false,
602                half_open_error_at: None,
603            },
604        );
605
606        Ok((client_id, server_id))
607    }
608
609    /// Register a waker for read operations
610    pub(crate) fn register_read_waker(
611        &self,
612        connection_id: ConnectionId,
613        waker: Waker,
614    ) -> SimulationResult<()> {
615        let mut inner = self.inner.borrow_mut();
616        let is_replacement = inner.wakers.read_wakers.contains_key(&connection_id);
617        inner.wakers.read_wakers.insert(connection_id, waker);
618        tracing::debug!(
619            "register_read_waker: connection_id={}, replacement={}, total_wakers={}",
620            connection_id.0,
621            is_replacement,
622            inner.wakers.read_wakers.len()
623        );
624        Ok(())
625    }
626
627    /// Register a waker for accept operations
628    pub(crate) fn register_accept_waker(&self, addr: &str, waker: Waker) -> SimulationResult<()> {
629        let mut inner = self.inner.borrow_mut();
630        // For simplicity, we'll use addr hash as listener ID for waker storage
631        use std::collections::hash_map::DefaultHasher;
632        use std::hash::{Hash, Hasher};
633        let mut hasher = DefaultHasher::new();
634        addr.hash(&mut hasher);
635        let listener_key = ListenerId(hasher.finish());
636
637        inner.wakers.listener_wakers.insert(listener_key, waker);
638        Ok(())
639    }
640
641    /// Store a pending connection for later accept() call
642    pub(crate) fn store_pending_connection(
643        &self,
644        addr: &str,
645        connection_id: ConnectionId,
646    ) -> SimulationResult<()> {
647        let mut inner = self.inner.borrow_mut();
648        inner
649            .network
650            .pending_connections
651            .insert(addr.to_string(), connection_id);
652
653        // Wake any accept() calls waiting for this connection
654        use std::collections::hash_map::DefaultHasher;
655        use std::hash::{Hash, Hasher};
656        let mut hasher = DefaultHasher::new();
657        addr.hash(&mut hasher);
658        let listener_key = ListenerId(hasher.finish());
659
660        if let Some(waker) = inner.wakers.listener_wakers.remove(&listener_key) {
661            waker.wake();
662        }
663
664        Ok(())
665    }
666
667    /// Get a pending connection for accept() call
668    pub(crate) fn get_pending_connection(
669        &self,
670        addr: &str,
671    ) -> SimulationResult<Option<ConnectionId>> {
672        let mut inner = self.inner.borrow_mut();
673        Ok(inner.network.pending_connections.remove(addr))
674    }
675
676    /// Get the peer address for a connection.
677    ///
678    /// FDB Pattern (sim2.actor.cpp):
679    /// - For client-side connections: returns server's listening address
680    /// - For server-side connections: returns synthesized ephemeral address
681    ///
682    /// The returned address may not be connectable for server-side connections,
683    /// matching real TCP behavior where servers see client ephemeral ports.
684    pub(crate) fn get_connection_peer_address(
685        &self,
686        connection_id: ConnectionId,
687    ) -> Option<String> {
688        let inner = self.inner.borrow();
689        inner
690            .network
691            .connections
692            .get(&connection_id)
693            .map(|conn| conn.peer_address.clone())
694    }
695
696    /// Sleep for the specified duration in simulation time.
697    ///
698    /// Returns a future that will complete when the simulation time has advanced
699    /// by the specified duration.
700    #[instrument(skip(self))]
701    pub fn sleep(&self, duration: Duration) -> SleepFuture {
702        let task_id = self.generate_task_id();
703
704        // Apply buggified delay if enabled
705        let actual_duration = self.apply_buggified_delay(duration);
706
707        // Schedule a wake event for this task
708        self.schedule_event(Event::Timer { task_id }, actual_duration);
709
710        // Return a future that will be woken when the event is processed
711        SleepFuture::new(self.downgrade(), task_id)
712    }
713
714    /// Apply buggified delay to a duration if chaos is enabled.
715    fn apply_buggified_delay(&self, duration: Duration) -> Duration {
716        let inner = self.inner.borrow();
717        let chaos = &inner.network.config.chaos;
718
719        if !chaos.buggified_delay_enabled || chaos.buggified_delay_max == Duration::ZERO {
720            return duration;
721        }
722
723        // 25% probability per FDB
724        if sim_random::<f64>() < chaos.buggified_delay_probability {
725            // Power-law distribution: pow(random01(), 1000) creates very skewed delays
726            let random_factor = sim_random::<f64>().powf(1000.0);
727            let extra_delay = chaos.buggified_delay_max.mul_f64(random_factor);
728            tracing::trace!(
729                extra_delay_ms = extra_delay.as_millis(),
730                "Buggified delay applied"
731            );
732            duration + extra_delay
733        } else {
734            duration
735        }
736    }
737
738    /// Generate a unique task ID for sleep operations.
739    fn generate_task_id(&self) -> u64 {
740        let mut inner = self.inner.borrow_mut();
741        let task_id = inner.next_task_id;
742        inner.next_task_id += 1;
743        task_id
744    }
745
746    /// Wake all tasks associated with a connection
747    fn wake_all(wakers: &mut HashMap<ConnectionId, Vec<Waker>>, connection_id: ConnectionId) {
748        if let Some(waker_list) = wakers.remove(&connection_id) {
749            for waker in waker_list {
750                waker.wake();
751            }
752        }
753    }
754
755    /// Check if a task has been awakened.
756    pub(crate) fn is_task_awake(&self, task_id: u64) -> SimulationResult<bool> {
757        let inner = self.inner.borrow();
758        Ok(inner.awakened_tasks.contains(&task_id))
759    }
760
761    /// Register a waker for a task.
762    pub(crate) fn register_task_waker(&self, task_id: u64, waker: Waker) -> SimulationResult<()> {
763        let mut inner = self.inner.borrow_mut();
764        inner.wakers.task_wakers.insert(task_id, waker);
765        Ok(())
766    }
767
768    /// Clear expired clogs and wake pending tasks (helper for use with SimInner)
769    fn clear_expired_clogs_with_inner(inner: &mut SimInner) {
770        let now = inner.current_time;
771        let expired: Vec<ConnectionId> = inner
772            .network
773            .connection_clogs
774            .iter()
775            .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
776            .collect();
777
778        for id in expired {
779            inner.network.connection_clogs.remove(&id);
780            Self::wake_all(&mut inner.wakers.clog_wakers, id);
781        }
782    }
783
784    /// Static event processor for simulation events.
785    #[instrument(skip(inner))]
786    fn process_event_with_inner(inner: &mut SimInner, event: Event) {
787        inner.events_processed += 1;
788
789        match event {
790            Event::Timer { task_id } => Self::handle_timer_event(inner, task_id),
791            Event::Connection { id, state } => Self::handle_connection_event(inner, id, state),
792            Event::Network {
793                connection_id,
794                operation,
795            } => Self::handle_network_event(inner, connection_id, operation),
796            Event::Shutdown => Self::handle_shutdown_event(inner),
797        }
798    }
799
800    /// Handle timer events - wake sleeping tasks.
801    fn handle_timer_event(inner: &mut SimInner, task_id: u64) {
802        inner.awakened_tasks.insert(task_id);
803        if let Some(waker) = inner.wakers.task_wakers.remove(&task_id) {
804            waker.wake();
805        }
806    }
807
808    /// Handle connection state change events.
809    fn handle_connection_event(inner: &mut SimInner, id: u64, state: ConnectionStateChange) {
810        let connection_id = ConnectionId(id);
811
812        match state {
813            ConnectionStateChange::BindComplete | ConnectionStateChange::ConnectionReady => {
814                // No action needed for these states
815            }
816            ConnectionStateChange::ClogClear => {
817                inner.network.connection_clogs.remove(&connection_id);
818                Self::wake_all(&mut inner.wakers.clog_wakers, connection_id);
819            }
820            ConnectionStateChange::ReadClogClear => {
821                inner.network.read_clogs.remove(&connection_id);
822                Self::wake_all(&mut inner.wakers.read_clog_wakers, connection_id);
823            }
824            ConnectionStateChange::PartitionRestore => {
825                Self::clear_expired_partitions(inner);
826            }
827            ConnectionStateChange::SendPartitionClear => {
828                Self::clear_expired_send_partitions(inner);
829            }
830            ConnectionStateChange::RecvPartitionClear => {
831                Self::clear_expired_recv_partitions(inner);
832            }
833            ConnectionStateChange::CutRestore => {
834                if let Some(conn) = inner.network.connections.get_mut(&connection_id)
835                    && conn.is_cut
836                {
837                    conn.is_cut = false;
838                    conn.cut_expiry = None;
839                    tracing::debug!("Connection {} restored via scheduled event", id);
840                    Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
841                }
842            }
843            ConnectionStateChange::HalfOpenError => {
844                tracing::debug!("Connection {} half-open error time reached", id);
845                if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
846                    waker.wake();
847                }
848                Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
849            }
850        }
851    }
852
853    /// Clear expired IP partitions.
854    fn clear_expired_partitions(inner: &mut SimInner) {
855        let now = inner.current_time;
856        let expired: Vec<_> = inner
857            .network
858            .ip_partitions
859            .iter()
860            .filter_map(|(pair, state)| (now >= state.expires_at).then_some(*pair))
861            .collect();
862
863        for pair in expired {
864            inner.network.ip_partitions.remove(&pair);
865            tracing::debug!("Restored IP partition {} -> {}", pair.0, pair.1);
866        }
867    }
868
869    /// Clear expired send partitions.
870    fn clear_expired_send_partitions(inner: &mut SimInner) {
871        let now = inner.current_time;
872        let expired: Vec<_> = inner
873            .network
874            .send_partitions
875            .iter()
876            .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
877            .collect();
878
879        for ip in expired {
880            inner.network.send_partitions.remove(&ip);
881            tracing::debug!("Cleared send partition for {}", ip);
882        }
883    }
884
885    /// Clear expired receive partitions.
886    fn clear_expired_recv_partitions(inner: &mut SimInner) {
887        let now = inner.current_time;
888        let expired: Vec<_> = inner
889            .network
890            .recv_partitions
891            .iter()
892            .filter_map(|(ip, &expires_at)| (now >= expires_at).then_some(*ip))
893            .collect();
894
895        for ip in expired {
896            inner.network.recv_partitions.remove(&ip);
897            tracing::debug!("Cleared receive partition for {}", ip);
898        }
899    }
900
901    /// Handle network events (data delivery and send buffer processing).
902    fn handle_network_event(inner: &mut SimInner, conn_id: u64, operation: NetworkOperation) {
903        let connection_id = ConnectionId(conn_id);
904
905        match operation {
906            NetworkOperation::DataDelivery { data } => {
907                Self::handle_data_delivery(inner, connection_id, data);
908            }
909            NetworkOperation::ProcessSendBuffer => {
910                Self::handle_process_send_buffer(inner, connection_id);
911            }
912        }
913    }
914
915    /// Handle data delivery to a connection's receive buffer.
916    fn handle_data_delivery(inner: &mut SimInner, connection_id: ConnectionId, data: Vec<u8>) {
917        tracing::trace!(
918            "DataDelivery: {} bytes to connection {}",
919            data.len(),
920            connection_id.0
921        );
922
923        // Check for packet loss
924        let packet_loss_prob = inner.network.config.chaos.packet_loss_probability;
925        if packet_loss_prob > 0.0 && crate::buggify_with_prob!(packet_loss_prob) {
926            tracing::info!(
927                "PacketLoss: connection={} bytes={} dropped",
928                connection_id.0,
929                data.len()
930            );
931            return;
932        }
933
934        // Check connection exists before potentially corrupting data
935        if !inner.network.connections.contains_key(&connection_id) {
936            tracing::warn!("DataDelivery: connection {} not found", connection_id.0);
937            return;
938        }
939
940        // Apply bit flipping chaos (needs mutable inner access)
941        let data_to_deliver = Self::maybe_corrupt_data(inner, &data);
942
943        // Now get connection reference and deliver data
944        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
945            return;
946        };
947
948        for &byte in &data_to_deliver {
949            conn.receive_buffer.push_back(byte);
950        }
951
952        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
953            waker.wake();
954        }
955    }
956
957    /// Maybe corrupt data with bit flips based on chaos configuration.
958    fn maybe_corrupt_data(inner: &mut SimInner, data: &[u8]) -> Vec<u8> {
959        if data.is_empty() {
960            return data.to_vec();
961        }
962
963        let chaos = &inner.network.config.chaos;
964        let now = inner.current_time;
965        let cooldown_elapsed =
966            now.saturating_sub(inner.last_bit_flip_time) >= chaos.bit_flip_cooldown;
967
968        if !cooldown_elapsed || !crate::buggify_with_prob!(chaos.bit_flip_probability) {
969            return data.to_vec();
970        }
971
972        let random_value = sim_random::<u32>();
973        let flip_count = SimInner::calculate_flip_bit_count(
974            random_value,
975            chaos.bit_flip_min_bits,
976            chaos.bit_flip_max_bits,
977        );
978
979        let mut corrupted_data = data.to_vec();
980        let mut flipped_positions = std::collections::HashSet::new();
981
982        for _ in 0..flip_count {
983            let byte_idx = (sim_random::<u64>() as usize) % corrupted_data.len();
984            let bit_idx = (sim_random::<u64>() as usize) % 8;
985            let position = (byte_idx, bit_idx);
986
987            if !flipped_positions.contains(&position) {
988                flipped_positions.insert(position);
989                corrupted_data[byte_idx] ^= 1 << bit_idx;
990            }
991        }
992
993        inner.last_bit_flip_time = now;
994        tracing::info!(
995            "BitFlipInjected: bytes={} bits_flipped={} unique_positions={}",
996            data.len(),
997            flip_count,
998            flipped_positions.len()
999        );
1000
1001        corrupted_data
1002    }
1003
1004    /// Handle processing of a connection's send buffer.
1005    fn handle_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1006        let is_partitioned = inner
1007            .network
1008            .is_connection_partitioned(connection_id, inner.current_time);
1009
1010        if is_partitioned {
1011            Self::handle_partitioned_send(inner, connection_id);
1012        } else {
1013            Self::handle_normal_send(inner, connection_id);
1014        }
1015    }
1016
1017    /// Handle send when connection is partitioned.
1018    fn handle_partitioned_send(inner: &mut SimInner, connection_id: ConnectionId) {
1019        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1020            return;
1021        };
1022
1023        if let Some(data) = conn.send_buffer.pop_front() {
1024            tracing::debug!(
1025                "Connection {} partitioned, failing send of {} bytes",
1026                connection_id.0,
1027                data.len()
1028            );
1029            Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1030
1031            if !conn.send_buffer.is_empty() {
1032                Self::schedule_process_send_buffer(inner, connection_id);
1033            } else {
1034                conn.send_in_progress = false;
1035            }
1036        } else {
1037            conn.send_in_progress = false;
1038        }
1039    }
1040
1041    /// Handle normal (non-partitioned) send.
1042    fn handle_normal_send(inner: &mut SimInner, connection_id: ConnectionId) {
1043        // Extract connection info first
1044        let Some(conn) = inner.network.connections.get(&connection_id) else {
1045            return;
1046        };
1047
1048        let paired_id = conn.paired_connection;
1049        let send_delay = conn.send_delay;
1050        let next_send_time = conn.next_send_time;
1051        let has_data = !conn.send_buffer.is_empty();
1052
1053        let recv_delay = paired_id.and_then(|pid| {
1054            inner
1055                .network
1056                .connections
1057                .get(&pid)
1058                .and_then(|c| c.recv_delay)
1059        });
1060
1061        if !has_data {
1062            if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1063                conn.send_in_progress = false;
1064            }
1065            return;
1066        }
1067
1068        let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
1069            return;
1070        };
1071
1072        let Some(mut data) = conn.send_buffer.pop_front() else {
1073            conn.send_in_progress = false;
1074            return;
1075        };
1076
1077        Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1078
1079        // BUGGIFY: Simulate partial/short writes
1080        if crate::buggify!() && !data.is_empty() {
1081            let max_send = std::cmp::min(
1082                data.len(),
1083                inner.network.config.chaos.partial_write_max_bytes,
1084            );
1085            let truncate_to = sim_random_range(0..max_send + 1);
1086
1087            if truncate_to < data.len() {
1088                let remainder = data.split_off(truncate_to);
1089                conn.send_buffer.push_front(remainder);
1090                tracing::debug!(
1091                    "BUGGIFY: Partial write on connection {} - sending {} bytes",
1092                    connection_id.0,
1093                    data.len()
1094                );
1095            }
1096        }
1097
1098        let has_more = !conn.send_buffer.is_empty();
1099        let base_delay = if has_more {
1100            Duration::from_nanos(1)
1101        } else {
1102            send_delay.unwrap_or_else(|| {
1103                crate::network::sample_duration(&inner.network.config.write_latency)
1104            })
1105        };
1106
1107        let earliest_time = std::cmp::max(inner.current_time + base_delay, next_send_time);
1108        conn.next_send_time = earliest_time + Duration::from_nanos(1);
1109
1110        // Schedule data delivery to paired connection
1111        if let Some(paired_id) = paired_id {
1112            let scheduled_time = earliest_time + recv_delay.unwrap_or(Duration::ZERO);
1113            let sequence = inner.next_sequence;
1114            inner.next_sequence += 1;
1115
1116            inner.event_queue.schedule(ScheduledEvent::new(
1117                scheduled_time,
1118                Event::Network {
1119                    connection_id: paired_id.0,
1120                    operation: NetworkOperation::DataDelivery { data },
1121                },
1122                sequence,
1123            ));
1124        }
1125
1126        // Schedule next send if more data
1127        if !conn.send_buffer.is_empty() {
1128            Self::schedule_process_send_buffer(inner, connection_id);
1129        } else {
1130            conn.send_in_progress = false;
1131        }
1132    }
1133
1134    /// Schedule a ProcessSendBuffer event for the given connection.
1135    fn schedule_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
1136        let sequence = inner.next_sequence;
1137        inner.next_sequence += 1;
1138
1139        inner.event_queue.schedule(ScheduledEvent::new(
1140            inner.current_time,
1141            Event::Network {
1142                connection_id: connection_id.0,
1143                operation: NetworkOperation::ProcessSendBuffer,
1144            },
1145            sequence,
1146        ));
1147    }
1148
1149    /// Handle shutdown event - wake all pending tasks.
1150    fn handle_shutdown_event(inner: &mut SimInner) {
1151        tracing::debug!("Processing Shutdown event - waking all pending tasks");
1152
1153        for (task_id, waker) in inner.wakers.task_wakers.drain() {
1154            tracing::trace!("Waking task {}", task_id);
1155            waker.wake();
1156        }
1157
1158        for (_conn_id, waker) in inner.wakers.read_wakers.drain() {
1159            waker.wake();
1160        }
1161
1162        tracing::debug!("Shutdown event processed");
1163    }
1164
1165    /// Get current assertion results for all tracked assertions.
1166    pub fn assertion_results(
1167        &self,
1168    ) -> std::collections::HashMap<String, crate::chaos::AssertionStats> {
1169        crate::chaos::get_assertion_results()
1170    }
1171
1172    /// Reset assertion statistics to empty state.
1173    pub fn reset_assertion_results(&self) {
1174        crate::chaos::reset_assertion_results();
1175    }
1176
1177    /// Extract simulation metrics for reporting.
1178    pub fn extract_metrics(&self) -> crate::runner::SimulationMetrics {
1179        let inner = self.inner.borrow();
1180
1181        crate::runner::SimulationMetrics {
1182            wall_time: std::time::Duration::ZERO,
1183            simulated_time: inner.current_time,
1184            events_processed: inner.events_processed,
1185        }
1186    }
1187
1188    // Phase 7: Simple write clogging methods
1189
1190    /// Check if a write should be clogged based on probability
1191    pub fn should_clog_write(&self, connection_id: ConnectionId) -> bool {
1192        let inner = self.inner.borrow();
1193        let config = &inner.network.config;
1194
1195        // Skip if already clogged
1196        if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1197            return inner.current_time < clog_state.expires_at;
1198        }
1199
1200        // Check probability
1201        config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1202    }
1203
1204    /// Clog a connection's write operations
1205    pub fn clog_write(&self, connection_id: ConnectionId) {
1206        let mut inner = self.inner.borrow_mut();
1207        let config = &inner.network.config;
1208
1209        let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1210        let expires_at = inner.current_time + clog_duration;
1211        inner
1212            .network
1213            .connection_clogs
1214            .insert(connection_id, ClogState { expires_at });
1215
1216        // Schedule an event to clear this clog
1217        let clear_event = Event::Connection {
1218            id: connection_id.0,
1219            state: ConnectionStateChange::ClogClear,
1220        };
1221        let sequence = inner.next_sequence;
1222        inner.next_sequence += 1;
1223        inner
1224            .event_queue
1225            .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1226    }
1227
1228    /// Check if a connection's writes are currently clogged
1229    pub fn is_write_clogged(&self, connection_id: ConnectionId) -> bool {
1230        let inner = self.inner.borrow();
1231
1232        if let Some(clog_state) = inner.network.connection_clogs.get(&connection_id) {
1233            inner.current_time < clog_state.expires_at
1234        } else {
1235            false
1236        }
1237    }
1238
1239    /// Register a waker for when write clog clears
1240    pub fn register_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1241        let mut inner = self.inner.borrow_mut();
1242        inner
1243            .wakers
1244            .clog_wakers
1245            .entry(connection_id)
1246            .or_default()
1247            .push(waker);
1248    }
1249
1250    // Read clogging methods (symmetric with write clogging)
1251
1252    /// Check if a read should be clogged based on probability
1253    pub fn should_clog_read(&self, connection_id: ConnectionId) -> bool {
1254        let inner = self.inner.borrow();
1255        let config = &inner.network.config;
1256
1257        // Skip if already clogged
1258        if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1259            return inner.current_time < clog_state.expires_at;
1260        }
1261
1262        // Check probability (same as write clogging)
1263        config.chaos.clog_probability > 0.0 && sim_random::<f64>() < config.chaos.clog_probability
1264    }
1265
1266    /// Clog a connection's read operations
1267    pub fn clog_read(&self, connection_id: ConnectionId) {
1268        let mut inner = self.inner.borrow_mut();
1269        let config = &inner.network.config;
1270
1271        let clog_duration = crate::network::sample_duration(&config.chaos.clog_duration);
1272        let expires_at = inner.current_time + clog_duration;
1273        inner
1274            .network
1275            .read_clogs
1276            .insert(connection_id, ClogState { expires_at });
1277
1278        // Schedule an event to clear this read clog
1279        let clear_event = Event::Connection {
1280            id: connection_id.0,
1281            state: ConnectionStateChange::ReadClogClear,
1282        };
1283        let sequence = inner.next_sequence;
1284        inner.next_sequence += 1;
1285        inner
1286            .event_queue
1287            .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
1288    }
1289
1290    /// Check if a connection's reads are currently clogged
1291    pub fn is_read_clogged(&self, connection_id: ConnectionId) -> bool {
1292        let inner = self.inner.borrow();
1293
1294        if let Some(clog_state) = inner.network.read_clogs.get(&connection_id) {
1295            inner.current_time < clog_state.expires_at
1296        } else {
1297            false
1298        }
1299    }
1300
1301    /// Register a waker for when read clog clears
1302    pub fn register_read_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
1303        let mut inner = self.inner.borrow_mut();
1304        inner
1305            .wakers
1306            .read_clog_wakers
1307            .entry(connection_id)
1308            .or_default()
1309            .push(waker);
1310    }
1311
1312    /// Clear expired clogs and wake pending tasks
1313    pub fn clear_expired_clogs(&self) {
1314        let mut inner = self.inner.borrow_mut();
1315        let now = inner.current_time;
1316        let expired: Vec<ConnectionId> = inner
1317            .network
1318            .connection_clogs
1319            .iter()
1320            .filter_map(|(id, state)| (now >= state.expires_at).then_some(*id))
1321            .collect();
1322
1323        for id in expired {
1324            inner.network.connection_clogs.remove(&id);
1325            Self::wake_all(&mut inner.wakers.clog_wakers, id);
1326        }
1327    }
1328
1329    // Connection Cut Methods (temporary network outage simulation)
1330
1331    /// Temporarily cut a connection for the specified duration.
1332    ///
1333    /// Unlike `close_connection`, a cut connection will be automatically restored
1334    /// after the duration expires. This simulates temporary network outages where
1335    /// the underlying connection remains but is temporarily unavailable.
1336    ///
1337    /// During a cut:
1338    /// - `poll_read` returns `Poll::Pending` (waits for restore)
1339    /// - `poll_write` returns `Poll::Pending` (waits for restore)
1340    /// - Buffered data is preserved
1341    pub fn cut_connection(&self, connection_id: ConnectionId, duration: Duration) {
1342        let mut inner = self.inner.borrow_mut();
1343        let expires_at = inner.current_time + duration;
1344
1345        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1346            conn.is_cut = true;
1347            conn.cut_expiry = Some(expires_at);
1348            tracing::debug!("Connection {} cut until {:?}", connection_id.0, expires_at);
1349
1350            // Schedule restoration event
1351            let restore_event = Event::Connection {
1352                id: connection_id.0,
1353                state: ConnectionStateChange::CutRestore,
1354            };
1355            let sequence = inner.next_sequence;
1356            inner.next_sequence += 1;
1357            inner
1358                .event_queue
1359                .schedule(ScheduledEvent::new(expires_at, restore_event, sequence));
1360        }
1361    }
1362
1363    /// Check if a connection is temporarily cut.
1364    ///
1365    /// A cut connection is temporarily unavailable but will be restored.
1366    /// This is different from `is_connection_closed` which indicates permanent closure.
1367    pub fn is_connection_cut(&self, connection_id: ConnectionId) -> bool {
1368        let inner = self.inner.borrow();
1369        inner
1370            .network
1371            .connections
1372            .get(&connection_id)
1373            .is_some_and(|conn| {
1374                conn.is_cut
1375                    && conn
1376                        .cut_expiry
1377                        .is_some_and(|expiry| inner.current_time < expiry)
1378            })
1379    }
1380
1381    /// Restore a cut connection immediately.
1382    ///
1383    /// This cancels the cut state and wakes any tasks waiting for restoration.
1384    pub fn restore_connection(&self, connection_id: ConnectionId) {
1385        let mut inner = self.inner.borrow_mut();
1386
1387        if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1388            && conn.is_cut
1389        {
1390            conn.is_cut = false;
1391            conn.cut_expiry = None;
1392            tracing::debug!("Connection {} restored", connection_id.0);
1393
1394            // Wake any tasks waiting for restoration
1395            Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
1396        }
1397    }
1398
1399    /// Register a waker for when a cut connection is restored.
1400    pub fn register_cut_waker(&self, connection_id: ConnectionId, waker: Waker) {
1401        let mut inner = self.inner.borrow_mut();
1402        inner
1403            .wakers
1404            .cut_wakers
1405            .entry(connection_id)
1406            .or_default()
1407            .push(waker);
1408    }
1409
1410    // Send buffer management methods
1411
1412    /// Get the send buffer capacity for a connection.
1413    pub fn send_buffer_capacity(&self, connection_id: ConnectionId) -> usize {
1414        let inner = self.inner.borrow();
1415        inner
1416            .network
1417            .connections
1418            .get(&connection_id)
1419            .map(|conn| conn.send_buffer_capacity)
1420            .unwrap_or(0)
1421    }
1422
1423    /// Get the current send buffer usage for a connection.
1424    pub fn send_buffer_used(&self, connection_id: ConnectionId) -> usize {
1425        let inner = self.inner.borrow();
1426        inner
1427            .network
1428            .connections
1429            .get(&connection_id)
1430            .map(|conn| conn.send_buffer.iter().map(|v| v.len()).sum())
1431            .unwrap_or(0)
1432    }
1433
1434    /// Get the available send buffer space for a connection.
1435    pub fn available_send_buffer(&self, connection_id: ConnectionId) -> usize {
1436        let capacity = self.send_buffer_capacity(connection_id);
1437        let used = self.send_buffer_used(connection_id);
1438        capacity.saturating_sub(used)
1439    }
1440
1441    /// Register a waker for when send buffer space becomes available.
1442    pub fn register_send_buffer_waker(&self, connection_id: ConnectionId, waker: Waker) {
1443        let mut inner = self.inner.borrow_mut();
1444        inner
1445            .wakers
1446            .send_buffer_wakers
1447            .entry(connection_id)
1448            .or_default()
1449            .push(waker);
1450    }
1451
1452    /// Wake any tasks waiting for send buffer space on a connection.
1453    #[allow(dead_code)] // May be used for external buffer management
1454    fn wake_send_buffer_waiters(&self, connection_id: ConnectionId) {
1455        let mut inner = self.inner.borrow_mut();
1456        Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
1457    }
1458
1459    // Per-IP-pair base latency methods
1460
1461    /// Get the base latency for a connection pair.
1462    /// Returns the latency if already set, otherwise None.
1463    pub fn get_pair_latency(&self, src: IpAddr, dst: IpAddr) -> Option<Duration> {
1464        let inner = self.inner.borrow();
1465        inner.network.pair_latencies.get(&(src, dst)).copied()
1466    }
1467
1468    /// Set the base latency for a connection pair if not already set.
1469    /// Returns the latency (existing or newly set).
1470    pub fn set_pair_latency_if_not_set(
1471        &self,
1472        src: IpAddr,
1473        dst: IpAddr,
1474        latency: Duration,
1475    ) -> Duration {
1476        let mut inner = self.inner.borrow_mut();
1477        *inner
1478            .network
1479            .pair_latencies
1480            .entry((src, dst))
1481            .or_insert_with(|| {
1482                tracing::debug!(
1483                    "Setting base latency for IP pair {} -> {} to {:?}",
1484                    src,
1485                    dst,
1486                    latency
1487                );
1488                latency
1489            })
1490    }
1491
1492    /// Get the base latency for a connection based on its IP pair.
1493    /// If not set, samples from config and sets it.
1494    pub fn get_connection_base_latency(&self, connection_id: ConnectionId) -> Duration {
1495        let inner = self.inner.borrow();
1496        let (local_ip, remote_ip) = inner
1497            .network
1498            .connections
1499            .get(&connection_id)
1500            .and_then(|conn| Some((conn.local_ip?, conn.remote_ip?)))
1501            .unwrap_or({
1502                (
1503                    IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1504                    IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
1505                )
1506            });
1507        drop(inner);
1508
1509        // Check if latency is already set
1510        if let Some(latency) = self.get_pair_latency(local_ip, remote_ip) {
1511            return latency;
1512        }
1513
1514        // Sample a new latency from config and set it
1515        let latency = self
1516            .with_network_config(|config| crate::network::sample_duration(&config.write_latency));
1517        self.set_pair_latency_if_not_set(local_ip, remote_ip, latency)
1518    }
1519
1520    // Per-connection asymmetric delay methods
1521
1522    /// Get the send delay for a connection.
1523    /// Returns the per-connection override if set, otherwise None.
1524    pub fn get_send_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1525        let inner = self.inner.borrow();
1526        inner
1527            .network
1528            .connections
1529            .get(&connection_id)
1530            .and_then(|conn| conn.send_delay)
1531    }
1532
1533    /// Get the receive delay for a connection.
1534    /// Returns the per-connection override if set, otherwise None.
1535    pub fn get_recv_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
1536        let inner = self.inner.borrow();
1537        inner
1538            .network
1539            .connections
1540            .get(&connection_id)
1541            .and_then(|conn| conn.recv_delay)
1542    }
1543
1544    /// Set asymmetric delays for a connection.
1545    /// If a delay is None, the global configuration is used instead.
1546    pub fn set_asymmetric_delays(
1547        &self,
1548        connection_id: ConnectionId,
1549        send_delay: Option<Duration>,
1550        recv_delay: Option<Duration>,
1551    ) {
1552        let mut inner = self.inner.borrow_mut();
1553        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1554            conn.send_delay = send_delay;
1555            conn.recv_delay = recv_delay;
1556            tracing::debug!(
1557                "Connection {} asymmetric delays set: send={:?}, recv={:?}",
1558                connection_id.0,
1559                send_delay,
1560                recv_delay
1561            );
1562        }
1563    }
1564
1565    /// Check if a connection is permanently closed
1566    pub fn is_connection_closed(&self, connection_id: ConnectionId) -> bool {
1567        let inner = self.inner.borrow();
1568        inner
1569            .network
1570            .connections
1571            .get(&connection_id)
1572            .is_some_and(|conn| conn.is_closed)
1573    }
1574
1575    /// Close a connection gracefully (FIN semantics).
1576    ///
1577    /// The peer will receive EOF on read operations.
1578    pub fn close_connection(&self, connection_id: ConnectionId) {
1579        self.close_connection_with_reason(connection_id, CloseReason::Graceful);
1580    }
1581
1582    /// Close a connection abruptly (RST semantics).
1583    ///
1584    /// The peer will receive ECONNRESET on both read and write operations.
1585    pub fn close_connection_abort(&self, connection_id: ConnectionId) {
1586        self.close_connection_with_reason(connection_id, CloseReason::Aborted);
1587    }
1588
1589    /// Get the close reason for a connection.
1590    pub fn get_close_reason(&self, connection_id: ConnectionId) -> CloseReason {
1591        let inner = self.inner.borrow();
1592        inner
1593            .network
1594            .connections
1595            .get(&connection_id)
1596            .map(|conn| conn.close_reason)
1597            .unwrap_or(CloseReason::None)
1598    }
1599
1600    /// Close a connection with a specific close reason.
1601    fn close_connection_with_reason(&self, connection_id: ConnectionId, reason: CloseReason) {
1602        let mut inner = self.inner.borrow_mut();
1603
1604        let paired_connection_id = inner
1605            .network
1606            .connections
1607            .get(&connection_id)
1608            .and_then(|conn| conn.paired_connection);
1609
1610        if let Some(conn) = inner.network.connections.get_mut(&connection_id)
1611            && !conn.is_closed
1612        {
1613            conn.is_closed = true;
1614            conn.close_reason = reason;
1615            tracing::debug!(
1616                "Connection {} closed permanently (reason: {:?})",
1617                connection_id.0,
1618                reason
1619            );
1620        }
1621
1622        if let Some(paired_id) = paired_connection_id
1623            && let Some(paired_conn) = inner.network.connections.get_mut(&paired_id)
1624            && !paired_conn.is_closed
1625        {
1626            paired_conn.is_closed = true;
1627            paired_conn.close_reason = reason;
1628            tracing::debug!(
1629                "Paired connection {} also closed (reason: {:?})",
1630                paired_id.0,
1631                reason
1632            );
1633        }
1634
1635        if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1636            tracing::debug!(
1637                "Waking read waker for closed connection {}",
1638                connection_id.0
1639            );
1640            waker.wake();
1641        }
1642
1643        if let Some(paired_id) = paired_connection_id
1644            && let Some(paired_waker) = inner.wakers.read_wakers.remove(&paired_id)
1645        {
1646            tracing::debug!(
1647                "Waking read waker for paired closed connection {}",
1648                paired_id.0
1649            );
1650            paired_waker.wake();
1651        }
1652    }
1653
1654    /// Close connection asymmetrically (FDB rollRandomClose pattern)
1655    pub fn close_connection_asymmetric(
1656        &self,
1657        connection_id: ConnectionId,
1658        close_send: bool,
1659        close_recv: bool,
1660    ) {
1661        let mut inner = self.inner.borrow_mut();
1662
1663        let paired_id = inner
1664            .network
1665            .connections
1666            .get(&connection_id)
1667            .and_then(|conn| conn.paired_connection);
1668
1669        if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1670            conn.send_closed = true;
1671            conn.send_buffer.clear();
1672            tracing::debug!(
1673                "Connection {} send side closed (asymmetric)",
1674                connection_id.0
1675            );
1676        }
1677
1678        if close_recv
1679            && let Some(paired) = paired_id
1680            && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
1681        {
1682            paired_conn.recv_closed = true;
1683            tracing::debug!(
1684                "Connection {} recv side closed (asymmetric via peer)",
1685                paired.0
1686            );
1687        }
1688
1689        if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1690            waker.wake();
1691        }
1692        if close_recv
1693            && let Some(paired) = paired_id
1694            && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
1695        {
1696            waker.wake();
1697        }
1698    }
1699
1700    /// Roll random close chaos injection (FDB rollRandomClose pattern)
1701    pub fn roll_random_close(&self, connection_id: ConnectionId) -> Option<bool> {
1702        let mut inner = self.inner.borrow_mut();
1703        let config = &inner.network.config;
1704
1705        if config.chaos.random_close_probability <= 0.0 {
1706            return None;
1707        }
1708
1709        let current_time = inner.current_time;
1710        let time_since_last = current_time.saturating_sub(inner.network.last_random_close_time);
1711        if time_since_last < config.chaos.random_close_cooldown {
1712            return None;
1713        }
1714
1715        if !crate::buggify_with_prob!(config.chaos.random_close_probability) {
1716            return None;
1717        }
1718
1719        inner.network.last_random_close_time = current_time;
1720
1721        let paired_id = inner
1722            .network
1723            .connections
1724            .get(&connection_id)
1725            .and_then(|conn| conn.paired_connection);
1726
1727        let a = super::rng::sim_random_f64();
1728        let close_recv = a < 0.66;
1729        let close_send = a > 0.33;
1730
1731        tracing::info!(
1732            "Random connection failure triggered on connection {} (send={}, recv={}, a={:.3})",
1733            connection_id.0,
1734            close_send,
1735            close_recv,
1736            a
1737        );
1738
1739        if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1740            conn.send_closed = true;
1741            conn.send_buffer.clear();
1742        }
1743
1744        if close_recv
1745            && let Some(paired) = paired_id
1746            && let Some(paired_conn) = inner.network.connections.get_mut(&paired)
1747        {
1748            paired_conn.recv_closed = true;
1749        }
1750
1751        if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
1752            waker.wake();
1753        }
1754        if close_recv
1755            && let Some(paired) = paired_id
1756            && let Some(waker) = inner.wakers.read_wakers.remove(&paired)
1757        {
1758            waker.wake();
1759        }
1760
1761        let b = super::rng::sim_random_f64();
1762        let explicit = b < inner.network.config.chaos.random_close_explicit_ratio;
1763
1764        tracing::debug!(
1765            "Random close explicit={} (b={:.3}, ratio={:.2})",
1766            explicit,
1767            b,
1768            inner.network.config.chaos.random_close_explicit_ratio
1769        );
1770
1771        Some(explicit)
1772    }
1773
1774    /// Check if a connection's send side is closed
1775    pub fn is_send_closed(&self, connection_id: ConnectionId) -> bool {
1776        let inner = self.inner.borrow();
1777        inner
1778            .network
1779            .connections
1780            .get(&connection_id)
1781            .is_some_and(|conn| conn.send_closed || conn.is_closed)
1782    }
1783
1784    /// Check if a connection's receive side is closed
1785    pub fn is_recv_closed(&self, connection_id: ConnectionId) -> bool {
1786        let inner = self.inner.borrow();
1787        inner
1788            .network
1789            .connections
1790            .get(&connection_id)
1791            .is_some_and(|conn| conn.recv_closed || conn.is_closed)
1792    }
1793
1794    // Half-Open Connection Simulation Methods
1795
1796    /// Simulate a peer crash on a connection.
1797    ///
1798    /// This puts the connection in a half-open state where:
1799    /// - The local side still thinks it's connected
1800    /// - Writes succeed but data is silently discarded (peer is gone)
1801    /// - Reads block waiting for data that will never come
1802    /// - After `error_delay`, both read and write return ECONNRESET
1803    ///
1804    /// This simulates real-world scenarios where a remote peer crashes
1805    /// or becomes unreachable, but the local TCP stack hasn't detected it yet.
1806    pub fn simulate_peer_crash(&self, connection_id: ConnectionId, error_delay: Duration) {
1807        let mut inner = self.inner.borrow_mut();
1808        let current_time = inner.current_time;
1809        let error_at = current_time + error_delay;
1810
1811        if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
1812            conn.is_half_open = true;
1813            conn.half_open_error_at = Some(error_at);
1814
1815            // Clear the paired connection to simulate peer being gone
1816            // Data sent will go nowhere
1817            conn.paired_connection = None;
1818
1819            tracing::info!(
1820                "Connection {} now half-open, errors manifest at {:?}",
1821                connection_id.0,
1822                error_at
1823            );
1824        }
1825
1826        // Schedule an event to wake any waiting readers when error time arrives
1827        let wake_event = Event::Connection {
1828            id: connection_id.0,
1829            state: ConnectionStateChange::HalfOpenError,
1830        };
1831        let sequence = inner.next_sequence;
1832        inner.next_sequence += 1;
1833        let scheduled_event = ScheduledEvent::new(error_at, wake_event, sequence);
1834        inner.event_queue.schedule(scheduled_event);
1835    }
1836
1837    /// Check if a connection is in half-open state
1838    pub fn is_half_open(&self, connection_id: ConnectionId) -> bool {
1839        let inner = self.inner.borrow();
1840        inner
1841            .network
1842            .connections
1843            .get(&connection_id)
1844            .is_some_and(|conn| conn.is_half_open)
1845    }
1846
1847    /// Check if a half-open connection should return errors now
1848    pub fn should_half_open_error(&self, connection_id: ConnectionId) -> bool {
1849        let inner = self.inner.borrow();
1850        let current_time = inner.current_time;
1851        inner
1852            .network
1853            .connections
1854            .get(&connection_id)
1855            .is_some_and(|conn| {
1856                conn.is_half_open
1857                    && conn
1858                        .half_open_error_at
1859                        .is_some_and(|error_at| current_time >= error_at)
1860            })
1861    }
1862
1863    // Network Partition Control Methods
1864
1865    /// Partition communication between two IP addresses for a specified duration
1866    pub fn partition_pair(
1867        &self,
1868        from_ip: std::net::IpAddr,
1869        to_ip: std::net::IpAddr,
1870        duration: Duration,
1871    ) -> SimulationResult<()> {
1872        let mut inner = self.inner.borrow_mut();
1873        let expires_at = inner.current_time + duration;
1874
1875        inner
1876            .network
1877            .ip_partitions
1878            .insert((from_ip, to_ip), PartitionState { expires_at });
1879
1880        let restore_event = Event::Connection {
1881            id: 0,
1882            state: ConnectionStateChange::PartitionRestore,
1883        };
1884        let sequence = inner.next_sequence;
1885        inner.next_sequence += 1;
1886        let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
1887        inner.event_queue.schedule(scheduled_event);
1888
1889        tracing::debug!(
1890            "Partitioned {} -> {} until {:?}",
1891            from_ip,
1892            to_ip,
1893            expires_at
1894        );
1895        Ok(())
1896    }
1897
1898    /// Block all outgoing communication from an IP address
1899    pub fn partition_send_from(
1900        &self,
1901        ip: std::net::IpAddr,
1902        duration: Duration,
1903    ) -> SimulationResult<()> {
1904        let mut inner = self.inner.borrow_mut();
1905        let expires_at = inner.current_time + duration;
1906
1907        inner.network.send_partitions.insert(ip, expires_at);
1908
1909        let clear_event = Event::Connection {
1910            id: 0,
1911            state: ConnectionStateChange::SendPartitionClear,
1912        };
1913        let sequence = inner.next_sequence;
1914        inner.next_sequence += 1;
1915        let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
1916        inner.event_queue.schedule(scheduled_event);
1917
1918        tracing::debug!("Partitioned sends from {} until {:?}", ip, expires_at);
1919        Ok(())
1920    }
1921
1922    /// Block all incoming communication to an IP address
1923    pub fn partition_recv_to(
1924        &self,
1925        ip: std::net::IpAddr,
1926        duration: Duration,
1927    ) -> SimulationResult<()> {
1928        let mut inner = self.inner.borrow_mut();
1929        let expires_at = inner.current_time + duration;
1930
1931        inner.network.recv_partitions.insert(ip, expires_at);
1932
1933        let clear_event = Event::Connection {
1934            id: 0,
1935            state: ConnectionStateChange::RecvPartitionClear,
1936        };
1937        let sequence = inner.next_sequence;
1938        inner.next_sequence += 1;
1939        let scheduled_event = ScheduledEvent::new(expires_at, clear_event, sequence);
1940        inner.event_queue.schedule(scheduled_event);
1941
1942        tracing::debug!("Partitioned receives to {} until {:?}", ip, expires_at);
1943        Ok(())
1944    }
1945
1946    /// Immediately restore communication between two IP addresses
1947    pub fn restore_partition(
1948        &self,
1949        from_ip: std::net::IpAddr,
1950        to_ip: std::net::IpAddr,
1951    ) -> SimulationResult<()> {
1952        let mut inner = self.inner.borrow_mut();
1953        inner.network.ip_partitions.remove(&(from_ip, to_ip));
1954        tracing::debug!("Restored partition {} -> {}", from_ip, to_ip);
1955        Ok(())
1956    }
1957
1958    /// Check if communication between two IP addresses is currently partitioned
1959    pub fn is_partitioned(
1960        &self,
1961        from_ip: std::net::IpAddr,
1962        to_ip: std::net::IpAddr,
1963    ) -> SimulationResult<bool> {
1964        let inner = self.inner.borrow();
1965        Ok(inner
1966            .network
1967            .is_partitioned(from_ip, to_ip, inner.current_time))
1968    }
1969
1970    /// Helper method for use with SimInner - randomly trigger partitions
1971    fn randomly_trigger_partitions_with_inner(inner: &mut SimInner) {
1972        let partition_config = &inner.network.config;
1973
1974        if partition_config.chaos.partition_probability == 0.0 {
1975            return;
1976        }
1977
1978        let ip_pairs: Vec<(std::net::IpAddr, std::net::IpAddr)> = inner
1979            .network
1980            .connections
1981            .values()
1982            .filter_map(|conn| {
1983                if let (Some(local), Some(remote)) = (conn.local_ip, conn.remote_ip) {
1984                    Some((local, remote))
1985                } else {
1986                    None
1987                }
1988            })
1989            .collect();
1990
1991        for (from_ip, to_ip) in ip_pairs {
1992            if inner
1993                .network
1994                .is_partitioned(from_ip, to_ip, inner.current_time)
1995            {
1996                continue;
1997            }
1998
1999            if sim_random::<f64>() < partition_config.chaos.partition_probability {
2000                let partition_duration =
2001                    crate::network::sample_duration(&partition_config.chaos.partition_duration);
2002                let expires_at = inner.current_time + partition_duration;
2003
2004                inner
2005                    .network
2006                    .ip_partitions
2007                    .insert((from_ip, to_ip), PartitionState { expires_at });
2008
2009                let restore_event = Event::Connection {
2010                    id: 0,
2011                    state: ConnectionStateChange::PartitionRestore,
2012                };
2013                let sequence = inner.next_sequence;
2014                inner.next_sequence += 1;
2015                let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
2016                inner.event_queue.schedule(scheduled_event);
2017
2018                tracing::debug!(
2019                    "Randomly triggered partition {} -> {} until {:?}, restoration event scheduled with sequence {}",
2020                    from_ip,
2021                    to_ip,
2022                    expires_at,
2023                    sequence
2024                );
2025            }
2026        }
2027    }
2028}
2029
2030impl Default for SimWorld {
2031    fn default() -> Self {
2032        Self::new()
2033    }
2034}
2035
2036/// A weak reference to a simulation world.
2037///
2038/// This provides handle-based access to the simulation without holding
2039/// a strong reference that would prevent cleanup.
2040#[derive(Debug)]
2041pub struct WeakSimWorld {
2042    pub(crate) inner: Weak<RefCell<SimInner>>,
2043}
2044
2045/// Macro to generate WeakSimWorld forwarding methods that wrap SimWorld results.
2046macro_rules! weak_forward {
2047    // For methods returning T that need Ok() wrapping
2048    (wrap $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*) -> $ret:ty) => {
2049        $(#[$meta])*
2050        pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<$ret> {
2051            Ok(self.upgrade()?.$method($($arg),*))
2052        }
2053    };
2054    // For methods already returning SimulationResult
2055    (pass $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*) -> $ret:ty) => {
2056        $(#[$meta])*
2057        pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<$ret> {
2058            self.upgrade()?.$method($($arg),*)
2059        }
2060    };
2061    // For methods returning () that need Ok(()) wrapping
2062    (unit $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*)) => {
2063        $(#[$meta])*
2064        pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<()> {
2065            self.upgrade()?.$method($($arg),*);
2066            Ok(())
2067        }
2068    };
2069}
2070
2071impl WeakSimWorld {
2072    /// Attempts to upgrade this weak reference to a strong reference.
2073    pub fn upgrade(&self) -> SimulationResult<SimWorld> {
2074        self.inner
2075            .upgrade()
2076            .map(|inner| SimWorld { inner })
2077            .ok_or(SimulationError::SimulationShutdown)
2078    }
2079
2080    weak_forward!(wrap #[doc = "Returns the current simulation time."] current_time(&self) -> Duration);
2081    weak_forward!(wrap #[doc = "Returns the exact simulation time (equivalent to FDB's now())."] now(&self) -> Duration);
2082    weak_forward!(wrap #[doc = "Returns the drifted timer time (equivalent to FDB's timer())."] timer(&self) -> Duration);
2083    weak_forward!(unit #[doc = "Schedules an event to execute after the specified delay."] schedule_event(&self, event: Event, delay: Duration));
2084    weak_forward!(unit #[doc = "Schedules an event to execute at the specified absolute time."] schedule_event_at(&self, event: Event, time: Duration));
2085    weak_forward!(pass #[doc = "Read data from connection's receive buffer."] read_from_connection(&self, connection_id: ConnectionId, buf: &mut [u8]) -> usize);
2086    weak_forward!(pass #[doc = "Write data to connection's receive buffer."] write_to_connection(&self, connection_id: ConnectionId, data: &[u8]) -> ());
2087    weak_forward!(pass #[doc = "Buffer data for ordered sending on a connection."] buffer_send(&self, connection_id: ConnectionId, data: Vec<u8>) -> ());
2088    weak_forward!(wrap #[doc = "Get a network provider for the simulation."] network_provider(&self) -> SimNetworkProvider);
2089    weak_forward!(wrap #[doc = "Get a time provider for the simulation."] time_provider(&self) -> crate::providers::SimTimeProvider);
2090    weak_forward!(wrap #[doc = "Sleep for the specified duration in simulation time."] sleep(&self, duration: Duration) -> SleepFuture);
2091
2092    /// Access network configuration for latency calculations.
2093    pub fn with_network_config<F, R>(&self, f: F) -> SimulationResult<R>
2094    where
2095        F: FnOnce(&NetworkConfiguration) -> R,
2096    {
2097        Ok(self.upgrade()?.with_network_config(f))
2098    }
2099}
2100
2101impl Clone for WeakSimWorld {
2102    fn clone(&self) -> Self {
2103        Self {
2104            inner: self.inner.clone(),
2105        }
2106    }
2107}
2108
2109#[cfg(test)]
2110mod tests {
2111    use super::*;
2112
2113    #[test]
2114    fn sim_world_basic_lifecycle() {
2115        let mut sim = SimWorld::new();
2116
2117        // Initial state
2118        assert_eq!(sim.current_time(), Duration::ZERO);
2119        assert!(!sim.has_pending_events());
2120        assert_eq!(sim.pending_event_count(), 0);
2121
2122        // Schedule an event
2123        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
2124
2125        assert!(sim.has_pending_events());
2126        assert_eq!(sim.pending_event_count(), 1);
2127        assert_eq!(sim.current_time(), Duration::ZERO);
2128
2129        // Process the event
2130        let has_more = sim.step();
2131        assert!(!has_more);
2132        assert_eq!(sim.current_time(), Duration::from_millis(100));
2133        assert!(!sim.has_pending_events());
2134        assert_eq!(sim.pending_event_count(), 0);
2135    }
2136
2137    #[test]
2138    fn sim_world_multiple_events() {
2139        let mut sim = SimWorld::new();
2140
2141        // Schedule multiple events
2142        sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(300));
2143        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
2144        sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200));
2145
2146        assert_eq!(sim.pending_event_count(), 3);
2147
2148        // Process events - should happen in time order
2149        assert!(sim.step());
2150        assert_eq!(sim.current_time(), Duration::from_millis(100));
2151        assert_eq!(sim.pending_event_count(), 2);
2152
2153        assert!(sim.step());
2154        assert_eq!(sim.current_time(), Duration::from_millis(200));
2155        assert_eq!(sim.pending_event_count(), 1);
2156
2157        assert!(!sim.step());
2158        assert_eq!(sim.current_time(), Duration::from_millis(300));
2159        assert_eq!(sim.pending_event_count(), 0);
2160    }
2161
2162    #[test]
2163    fn sim_world_run_until_empty() {
2164        let mut sim = SimWorld::new();
2165
2166        // Schedule multiple events
2167        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
2168        sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200));
2169        sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(300));
2170
2171        // Run until all events are processed
2172        sim.run_until_empty();
2173
2174        assert_eq!(sim.current_time(), Duration::from_millis(300));
2175        assert!(!sim.has_pending_events());
2176    }
2177
2178    #[test]
2179    fn sim_world_schedule_at_specific_time() {
2180        let mut sim = SimWorld::new();
2181
2182        // Schedule event at specific time
2183        sim.schedule_event_at(Event::Timer { task_id: 1 }, Duration::from_millis(500));
2184
2185        assert_eq!(sim.current_time(), Duration::ZERO);
2186
2187        sim.step();
2188
2189        assert_eq!(sim.current_time(), Duration::from_millis(500));
2190    }
2191
2192    #[test]
2193    fn weak_sim_world_lifecycle() {
2194        let sim = SimWorld::new();
2195        let weak = sim.downgrade();
2196
2197        // Can upgrade and use weak reference
2198        assert_eq!(
2199            weak.current_time().expect("should get time"),
2200            Duration::ZERO
2201        );
2202
2203        // Schedule event through weak reference
2204        weak.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100))
2205            .expect("should schedule event");
2206
2207        // Verify event was scheduled
2208        assert!(sim.has_pending_events());
2209
2210        // Drop the original simulation
2211        drop(sim);
2212
2213        // Weak reference should now fail
2214        assert_eq!(
2215            weak.current_time(),
2216            Err(SimulationError::SimulationShutdown)
2217        );
2218        assert_eq!(
2219            weak.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200)),
2220            Err(SimulationError::SimulationShutdown)
2221        );
2222    }
2223
2224    #[test]
2225    fn deterministic_event_ordering() {
2226        let mut sim = SimWorld::new();
2227
2228        // Schedule events at the same time
2229        sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(100));
2230        sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
2231        sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(100));
2232
2233        // All events are at the same time, but should be processed in sequence order
2234        assert!(sim.step());
2235        assert_eq!(sim.current_time(), Duration::from_millis(100));
2236        assert!(sim.step());
2237        assert_eq!(sim.current_time(), Duration::from_millis(100));
2238        assert!(!sim.step());
2239        assert_eq!(sim.current_time(), Duration::from_millis(100));
2240    }
2241}