Skip to main content

bones_sim/
network.rs

1use std::collections::BTreeSet;
2
3use serde::{Deserialize, Serialize};
4
5use crate::agent::AgentId;
6use crate::rng::DeterministicRng;
7
8/// Fault injection configuration for simulated network delivery.
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
10pub struct FaultConfig {
11    /// Maximum delivery delay in rounds.
12    pub max_delay_rounds: u8,
13    /// Percentage of sends dropped.
14    pub drop_rate_percent: u8,
15    /// Percentage of sends duplicated.
16    pub duplicate_rate_percent: u8,
17    /// Percentage chance of reordering ready messages at each tick.
18    pub reorder_rate_percent: u8,
19    /// Percentage chance per round to toggle a random network partition.
20    pub partition_rate_percent: u8,
21    /// Percentage chance per round to freeze a random clock.
22    pub freeze_rate_percent: u8,
23    /// Number of rounds to keep a frozen clock frozen.
24    pub freeze_duration_rounds: u8,
25}
26
27impl Default for FaultConfig {
28    fn default() -> Self {
29        Self {
30            max_delay_rounds: 2,
31            drop_rate_percent: 5,
32            duplicate_rate_percent: 3,
33            reorder_rate_percent: 5,
34            partition_rate_percent: 2,
35            freeze_rate_percent: 2,
36            freeze_duration_rounds: 2,
37        }
38    }
39}
40
41/// Message carried by the simulated network.
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
43pub struct NetworkMessage {
44    /// Sender.
45    pub from: AgentId,
46    /// Receiver.
47    pub to: AgentId,
48    /// Event identifier being replicated.
49    pub event_id: u64,
50    /// Sender-local sequence.
51    pub seq: u64,
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
55struct PendingMessage {
56    deliver_at_round: u64,
57    message: NetworkMessage,
58}
59
60/// Result of a send attempt.
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub struct SendOutcome {
63    /// Message dropped before enqueue.
64    pub dropped: bool,
65    /// Message was duplicated.
66    pub duplicated: bool,
67    /// Delay assigned for primary enqueue.
68    pub delay_rounds: u8,
69}
70
71/// Result of delivering all ready messages for a round.
72#[derive(Debug, Clone, PartialEq, Eq)]
73pub struct DeliverOutcome {
74    /// Messages delivered this tick.
75    pub delivered: Vec<NetworkMessage>,
76    /// Whether delivery order was shuffled.
77    pub reordered: bool,
78}
79
80/// Deterministic fault-injecting network model.
81#[derive(Debug, Clone, PartialEq, Eq)]
82pub struct SimulatedNetwork {
83    pending: Vec<PendingMessage>,
84    partitioned_agents: BTreeSet<AgentId>,
85    fault: FaultConfig,
86}
87
88impl SimulatedNetwork {
89    /// Create a new network with fault injection.
90    #[must_use]
91    pub const fn new(fault: FaultConfig) -> Self {
92        Self {
93            pending: Vec::new(),
94            partitioned_agents: BTreeSet::new(),
95            fault,
96        }
97    }
98
99    /// Return configured fault options.
100    #[must_use]
101    pub const fn fault_config(&self) -> FaultConfig {
102        self.fault
103    }
104
105    /// Isolate or reconnect an agent from the network.
106    pub fn set_partitioned(&mut self, agent: AgentId, isolated: bool) {
107        if isolated {
108            self.partitioned_agents.insert(agent);
109        } else {
110            self.partitioned_agents.remove(&agent);
111        }
112    }
113
114    /// Test whether an agent is currently partitioned.
115    #[must_use]
116    pub fn is_partitioned(&self, agent: AgentId) -> bool {
117        self.partitioned_agents.contains(&agent)
118    }
119
120    /// Number of queued in-flight messages.
121    #[must_use]
122    pub const fn pending_len(&self) -> usize {
123        self.pending.len()
124    }
125
126    /// Send a message with fault injection.
127    #[must_use]
128    pub fn send(
129        &mut self,
130        message: NetworkMessage,
131        round: u64,
132        rng: &mut DeterministicRng,
133    ) -> SendOutcome {
134        if self.is_partitioned(message.from) || self.is_partitioned(message.to) {
135            return SendOutcome {
136                dropped: true,
137                duplicated: false,
138                delay_rounds: 0,
139            };
140        }
141
142        if rng.hit_rate_percent(self.fault.drop_rate_percent) {
143            return SendOutcome {
144                dropped: true,
145                duplicated: false,
146                delay_rounds: 0,
147            };
148        }
149
150        let delay_bound = u64::from(self.fault.max_delay_rounds).saturating_add(1);
151        let primary_delay_u64 = rng.next_bounded(delay_bound);
152        let primary_delay = u8::try_from(primary_delay_u64).unwrap_or(self.fault.max_delay_rounds);
153
154        self.pending.push(PendingMessage {
155            deliver_at_round: round.saturating_add(u64::from(primary_delay)),
156            message,
157        });
158
159        let duplicated = rng.hit_rate_percent(self.fault.duplicate_rate_percent);
160        if duplicated {
161            self.pending.push(PendingMessage {
162                deliver_at_round: round.saturating_add(u64::from(primary_delay)),
163                message,
164            });
165        }
166
167        SendOutcome {
168            dropped: false,
169            duplicated,
170            delay_rounds: primary_delay,
171        }
172    }
173
174    /// Deliver all messages whose delivery round has arrived.
175    #[must_use]
176    pub fn deliver_ready(&mut self, round: u64, rng: &mut DeterministicRng) -> DeliverOutcome {
177        let mut ready = Vec::new();
178        let mut future = Vec::new();
179
180        for pending in self.pending.drain(..) {
181            if pending.deliver_at_round <= round {
182                ready.push(pending.message);
183            } else {
184                future.push(pending);
185            }
186        }
187
188        self.pending = future;
189
190        let should_reorder =
191            ready.len() > 1 && rng.hit_rate_percent(self.fault.reorder_rate_percent);
192        if should_reorder {
193            ready.reverse();
194        }
195
196        DeliverOutcome {
197            delivered: ready,
198            reordered: should_reorder,
199        }
200    }
201}