1use std::{cell::RefCell, rc::Rc, time::Duration};
2
3use fxhash::{FxHashMap, FxHashSet};
4use rand::Rng;
5use rand_chacha::ChaCha8Rng;
6use rand_core::SeedableRng;
7use serde::{Deserialize, Serialize};
8use simulon::{api, simulation::SimulationBuilder};
9
10#[derive(Serialize, Deserialize, Debug)]
11enum Message {
12 Advr(usize),
13 Want(usize),
14 Payload(usize, Vec<u8>),
15}
16
17#[derive(Default)]
18struct NodeState {
19 conns: FxHashMap<api::RemoteAddr, BroadcastConnection>,
20 messages: FxHashMap<usize, Vec<u8>>,
21}
22
23struct BroadcastConnection {
24 writer: api::OwnedWriter,
25 seen: FxHashSet<usize>,
26}
27
28impl NodeState {
29 fn handle_message_internal(&mut self, id: usize, payload: Vec<u8>) {
30 if self.messages.insert(id, payload.clone()).is_none() {
31 api::emit(String::from_utf8(payload).unwrap());
32 }
33
34 for (_addr, conn) in self.conns.iter_mut() {
35 if conn.seen.contains(&id) {
36 continue;
37 }
38
39 conn.writer.write(&Message::Advr(id));
40 }
41 }
42
43 pub fn handle_message_from_client(&mut self, id: usize, payload: Vec<u8>) {
44 self.handle_message_internal(id, payload);
45 }
46
47 pub fn handle_message(&mut self, sender: api::RemoteAddr, id: usize, payload: Vec<u8>) {
48 let conn = self.conns.get_mut(&sender).unwrap();
49 conn.seen.insert(id);
50
51 self.handle_message_internal(id, payload);
52 }
53
54 pub fn handle_advr(&mut self, sender: api::RemoteAddr, id: usize) {
55 let conn = self.conns.get_mut(&sender).unwrap();
56
57 conn.seen.insert(id);
58
59 if self.messages.contains_key(&id) {
61 return;
62 }
63
64 conn.writer.write(&Message::Want(id));
65 }
66
67 pub fn handle_want(&mut self, sender: api::RemoteAddr, id: usize) {
68 if let Some(payload) = self.messages.get(&id) {
69 let conn = self.conns.get_mut(&sender).unwrap();
70 conn.seen.insert(id);
71 conn.writer.write(&Message::Payload(id, payload.clone()));
72 }
73 }
74}
75
76impl From<api::OwnedWriter> for BroadcastConnection {
77 fn from(writer: api::OwnedWriter) -> Self {
78 Self {
79 writer,
80 seen: FxHashSet::default(),
81 }
82 }
83}
84
85type NodeStateRef = Rc<RefCell<NodeState>>;
86
87async fn run_node(n: usize) {
89 let state = Rc::new(RefCell::new(NodeState::default()));
90
91 api::spawn(listen_for_connections(n, state.clone()));
93
94 api::spawn(make_connections(n, state));
96}
97
98async fn listen_for_connections(n: usize, state: NodeStateRef) {
99 let mut listener = api::listen(80);
100 while let Some(conn) = listener.accept().await {
101 if n == *conn.remote() {
102 api::spawn(handle_client_connection(state.clone(), conn));
103 } else {
104 api::spawn(handle_connection(state.clone(), conn));
105 }
106 }
107}
108
109async fn make_connections(n: usize, state: NodeStateRef) {
110 let index = *api::RemoteAddr::whoami();
111 let connect_to = (index + 1) % n;
112 let addr = api::RemoteAddr::from_global_index(connect_to);
113 let conn = api::connect(addr, 80).await.expect("Could not connect.");
114 handle_connection(state, conn).await;
115}
116
117async fn handle_client_connection(state: NodeStateRef, conn: api::Connection) {
119 let (mut reader, _writer) = conn.split();
120 let msg = reader.recv::<Message>().await.unwrap();
121 if let Message::Payload(id, payload) = msg {
122 state.borrow_mut().handle_message_from_client(id, payload);
123 } else {
124 panic!("unexpected.");
125 }
126}
127
128async fn handle_connection(state: NodeStateRef, conn: api::Connection) {
130 let remote = conn.remote();
131 let (mut reader, writer) = conn.split();
132
133 let b_conn: BroadcastConnection = writer.into();
135 state.borrow_mut().conns.insert(remote, b_conn);
136
137 while let Some(msg) = reader.recv::<Message>().await {
138 match msg {
139 Message::Payload(id, payload) => {
140 state.borrow_mut().handle_message(remote, id, payload);
141 },
142 Message::Advr(id) => {
143 state.borrow_mut().handle_advr(remote, id);
144 },
145 Message::Want(id) => {
146 state.borrow_mut().handle_want(remote, id);
147 },
148 }
149 }
150}
151
152async fn run_client(n: usize) {
155 let mut rng = ChaCha8Rng::from_seed([0; 32]);
156
157 for i in 0.. {
158 let index = rng.gen_range(0..n);
159 let addr = api::RemoteAddr::from_global_index(index);
160
161 let mut conn = api::connect(addr, 80).await.expect("Connection failed.");
162
163 let msg = format!("message {i}");
164 conn.write(&Message::Payload(i, msg.into()));
165
166 api::sleep(Duration::from_secs(5)).await;
167 }
168}
169
170fn exec(n: usize) {
171 if n == *api::RemoteAddr::whoami() {
172 api::spawn(run_client(n));
173 } else {
174 api::spawn(run_node(n));
175 }
176}
177
178pub fn main() {
179 const N: usize = 1500;
180
181 let time = std::time::Instant::now();
182 SimulationBuilder::new(|| exec(N))
183 .with_nodes(N + 1)
184 .set_node_metrics_rate(Duration::ZERO)
189 .enable_progress_bar()
190 .run(Duration::from_secs(120));
191 println!("Took {} ms", time.elapsed().as_millis());
192}