1use std::{cell::RefCell, rc::Rc, time::Duration};
2
3use fxhash::{FxHashMap, FxHashSet};
4use rand::Rng;
5use rand_chacha::ChaCha8Rng;
6use rand_core::SeedableRng;
7use serde::{Deserialize, Serialize};
8use simulon::{api, simulation::SimulationBuilder};
9
10#[derive(Serialize, Deserialize, Debug)]
11enum Message {
12    Advr(usize),
13    Want(usize),
14    Payload(usize, Vec<u8>),
15}
16
17#[derive(Default)]
18struct NodeState {
19    conns: FxHashMap<api::RemoteAddr, BroadcastConnection>,
20    messages: FxHashMap<usize, Vec<u8>>,
21}
22
23struct BroadcastConnection {
24    writer: api::OwnedWriter,
25    seen: FxHashSet<usize>,
26}
27
28impl NodeState {
29    fn handle_message_internal(&mut self, id: usize, payload: Vec<u8>) {
30        if self.messages.insert(id, payload.clone()).is_none() {
31            api::emit(String::from_utf8(payload).unwrap());
32        }
33
34        for (_addr, conn) in self.conns.iter_mut() {
35            if conn.seen.contains(&id) {
36                continue;
37            }
38
39            conn.writer.write(&Message::Advr(id));
40        }
41    }
42
43    pub fn handle_message_from_client(&mut self, id: usize, payload: Vec<u8>) {
44        self.handle_message_internal(id, payload);
45    }
46
47    pub fn handle_message(&mut self, sender: api::RemoteAddr, id: usize, payload: Vec<u8>) {
48        let conn = self.conns.get_mut(&sender).unwrap();
49        conn.seen.insert(id);
50
51        self.handle_message_internal(id, payload);
52    }
53
54    pub fn handle_advr(&mut self, sender: api::RemoteAddr, id: usize) {
55        let conn = self.conns.get_mut(&sender).unwrap();
56
57        conn.seen.insert(id);
58
59        if self.messages.contains_key(&id) {
61            return;
62        }
63
64        conn.writer.write(&Message::Want(id));
65    }
66
67    pub fn handle_want(&mut self, sender: api::RemoteAddr, id: usize) {
68        if let Some(payload) = self.messages.get(&id) {
69            let conn = self.conns.get_mut(&sender).unwrap();
70            conn.seen.insert(id);
71            conn.writer.write(&Message::Payload(id, payload.clone()));
72        }
73    }
74}
75
76impl From<api::OwnedWriter> for BroadcastConnection {
77    fn from(writer: api::OwnedWriter) -> Self {
78        Self {
79            writer,
80            seen: FxHashSet::default(),
81        }
82    }
83}
84
85type NodeStateRef = Rc<RefCell<NodeState>>;
86
87async fn run_node(n: usize) {
89    let state = Rc::new(RefCell::new(NodeState::default()));
90
91    api::spawn(listen_for_connections(n, state.clone()));
93
94    api::spawn(make_connections(n, state));
96}
97
98async fn listen_for_connections(n: usize, state: NodeStateRef) {
99    let mut listener = api::listen(80);
100    while let Some(conn) = listener.accept().await {
101        if n == *conn.remote() {
102            api::spawn(handle_client_connection(state.clone(), conn));
103        } else {
104            api::spawn(handle_connection(state.clone(), conn));
105        }
106    }
107}
108
109async fn make_connections(n: usize, state: NodeStateRef) {
110    let index = *api::RemoteAddr::whoami();
111    let connect_to = (index + 1) % n;
112    let addr = api::RemoteAddr::from_global_index(connect_to);
113    let conn = api::connect(addr, 80).await.expect("Could not connect.");
114    handle_connection(state, conn).await;
115}
116
117async fn handle_client_connection(state: NodeStateRef, conn: api::Connection) {
119    let (mut reader, _writer) = conn.split();
120    let msg = reader.recv::<Message>().await.unwrap();
121    if let Message::Payload(id, payload) = msg {
122        state.borrow_mut().handle_message_from_client(id, payload);
123    } else {
124        panic!("unexpected.");
125    }
126}
127
128async fn handle_connection(state: NodeStateRef, conn: api::Connection) {
130    let remote = conn.remote();
131    let (mut reader, writer) = conn.split();
132
133    let b_conn: BroadcastConnection = writer.into();
135    state.borrow_mut().conns.insert(remote, b_conn);
136
137    while let Some(msg) = reader.recv::<Message>().await {
138        match msg {
139            Message::Payload(id, payload) => {
140                state.borrow_mut().handle_message(remote, id, payload);
141            },
142            Message::Advr(id) => {
143                state.borrow_mut().handle_advr(remote, id);
144            },
145            Message::Want(id) => {
146                state.borrow_mut().handle_want(remote, id);
147            },
148        }
149    }
150}
151
152async fn run_client(n: usize) {
155    let mut rng = ChaCha8Rng::from_seed([0; 32]);
156
157    for i in 0.. {
158        let index = rng.gen_range(0..n);
159        let addr = api::RemoteAddr::from_global_index(index);
160
161        let mut conn = api::connect(addr, 80).await.expect("Connection failed.");
162
163        let msg = format!("message {i}");
164        conn.write(&Message::Payload(i, msg.into()));
165
166        api::sleep(Duration::from_secs(5)).await;
167    }
168}
169
170fn exec(n: usize) {
171    if n == *api::RemoteAddr::whoami() {
172        api::spawn(run_client(n));
173    } else {
174        api::spawn(run_node(n));
175    }
176}
177
178pub fn main() {
179    const N: usize = 1500;
180
181    let time = std::time::Instant::now();
182    SimulationBuilder::new(|| exec(N))
183        .with_nodes(N + 1)
184        .set_node_metrics_rate(Duration::ZERO)
189        .enable_progress_bar()
190        .run(Duration::from_secs(120));
191    println!("Took {} ms", time.elapsed().as_millis());
192}