commonware_broadcast/buffered/
engine.rs1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use bytes::Bytes;
4use commonware_codec::{Codec, Config as CodecCfg};
5use commonware_cryptography::{Digest, Digestible};
6use commonware_macros::select;
7use commonware_p2p::{Receiver, Recipients, Sender};
8use commonware_runtime::{
9 telemetry::metrics::status::{CounterExt, Status},
10 Clock, Handle, Metrics, Spawner,
11};
12use commonware_utils::Array;
13use futures::{
14 channel::{mpsc, oneshot},
15 StreamExt,
16};
17use std::{
18 collections::{HashMap, VecDeque},
19 marker::PhantomData,
20};
21use tracing::{debug, error, trace, warn};
22
23pub struct Engine<
31 E: Clock + Spawner + Metrics,
32 P: Array,
33 D: Digest,
34 Cfg: CodecCfg,
35 M: Digestible<D> + Codec<Cfg>,
36 NetS: Sender<PublicKey = P>,
37 NetR: Receiver<PublicKey = P>,
38> {
39 context: E,
43 _phantom: PhantomData<(NetS, NetR)>,
44
45 public_key: P,
50
51 priority: bool,
53
54 deque_size: usize,
56
57 decode_config: Cfg,
59
60 mailbox_receiver: mpsc::Receiver<Message<D, M>>,
65
66 waiters: HashMap<D, Vec<oneshot::Sender<M>>>,
68
69 items: HashMap<D, M>,
74
75 deques: HashMap<P, VecDeque<D>>,
81
82 counts: HashMap<D, usize>,
86
87 metrics: metrics::Metrics,
92}
93
94impl<
95 E: Clock + Spawner + Metrics,
96 P: Array,
97 D: Digest,
98 Cfg: CodecCfg,
99 M: Digestible<D> + Codec<Cfg>,
100 NetS: Sender<PublicKey = P>,
101 NetR: Receiver<PublicKey = P>,
102 > Engine<E, P, D, Cfg, M, NetS, NetR>
103{
104 pub fn new(context: E, cfg: Config<Cfg, P>) -> (Self, Mailbox<D, M>) {
107 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
108 let mailbox = Mailbox::<D, M>::new(mailbox_sender);
109 let metrics = metrics::Metrics::init(context.clone());
110
111 let result = Self {
112 context,
113 _phantom: PhantomData,
114 public_key: cfg.public_key,
115 priority: cfg.priority,
116 deque_size: cfg.deque_size,
117 decode_config: cfg.decode_config,
118 mailbox_receiver,
119 waiters: HashMap::new(),
120 deques: HashMap::new(),
121 items: HashMap::new(),
122 counts: HashMap::new(),
123 metrics,
124 };
125
126 (result, mailbox)
127 }
128
129 pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
131 self.context.spawn_ref()(self.run(network))
132 }
133
134 async fn run(mut self, network: (NetS, NetR)) {
136 let (mut net_sender, mut net_receiver) = network;
137 let mut shutdown = self.context.stopped();
138
139 loop {
140 self.cleanup_waiters();
142 self.metrics.waiters.set(self.waiters.len() as i64);
143
144 select! {
145 _ = &mut shutdown => {
147 debug!("shutdown");
148 },
149
150 mail = self.mailbox_receiver.next() => {
152 let Some(msg) = mail else {
153 error!("mailbox receiver failed");
154 break;
155 };
156 match msg {
157 Message::Broadcast{ message } => {
158 trace!("mailbox: broadcast");
159 self.handle_broadcast(&mut net_sender, message).await;
160 }
161 Message::Get{ digest, responder } => {
162 trace!("mailbox: get");
163 self.handle_get(digest, responder).await;
164 }
165 }
166 },
167
168 msg = net_receiver.recv() => {
170 let (peer, msg) = match msg {
172 Ok(r) => r,
173 Err(err) => {
174 error!(?err, "receiver failed");
175 break;
176 }
177 };
178
179 let message = match M::decode_cfg(msg, &self.decode_config) {
181 Ok(message) => message,
182 Err(err) => {
183 warn!(?err, ?peer, "failed to decode message");
184 self.metrics.receive.inc(Status::Invalid);
185 continue;
186 }
187 };
188
189 trace!(?peer, "network");
190 self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
191 self.handle_network(peer, message).await;
192 },
193 }
194 }
195 }
196
197 async fn handle_broadcast(&mut self, net_sender: &mut NetS, msg: M) {
203 let _ = self.insert_message(self.public_key.clone(), msg.clone());
205
206 let recipients = Recipients::All;
208 let msg = Bytes::from(msg.encode());
209 if let Err(err) = net_sender.send(recipients, msg, self.priority).await {
210 warn!(?err, "failed to send message");
211 }
212 }
213
214 async fn handle_get(&mut self, digest: D, responder: oneshot::Sender<M>) {
219 if let Some(msg) = self.items.get(&digest) {
221 self.respond(responder, msg.clone());
222 return;
223 }
224
225 self.waiters.entry(digest).or_default().push(responder);
227 }
228
229 async fn handle_network(&mut self, peer: P, msg: M) {
231 if !self.insert_message(peer.clone(), msg) {
232 debug!(?peer, "message already stored");
233 self.metrics.receive.inc(Status::Dropped);
234 return;
235 }
236
237 self.metrics.receive.inc(Status::Success);
238 }
239
240 fn insert_message(&mut self, peer: P, msg: M) -> bool {
249 let digest = msg.digest();
250
251 if let Some(responders) = self.waiters.remove(&digest) {
253 for responder in responders {
254 self.respond(responder, msg.clone());
255 }
256 }
257
258 let deque = self
260 .deques
261 .entry(peer)
262 .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
263
264 if let Some(i) = deque.iter().position(|d| *d == digest) {
266 if i != 0 {
267 deque.remove(i).unwrap(); deque.push_front(digest);
269 }
270 return false;
271 };
272
273 deque.push_front(digest);
277 let count = self
278 .counts
279 .entry(digest)
280 .and_modify(|c| *c = c.checked_add(1).unwrap())
281 .or_insert(1);
282 if *count == 1 {
283 let existing = self.items.insert(digest, msg);
284 assert!(existing.is_none());
285 }
286
287 if deque.len() > self.deque_size {
289 let stale = deque.pop_back().unwrap();
293 let count = self
294 .counts
295 .entry(stale)
296 .and_modify(|c| *c = c.checked_sub(1).unwrap())
297 .or_insert_with(|| unreachable!());
298 if *count == 0 {
299 let existing = self.counts.remove(&stale);
300 assert!(existing == Some(0));
301 self.items.remove(&stale).unwrap(); }
303 }
304
305 true
306 }
307
308 fn cleanup_waiters(&mut self) {
314 self.waiters.retain(|_, waiters| {
315 let initial_len = waiters.len();
316 waiters.retain(|waiter| !waiter.is_canceled());
317 let dropped_count = initial_len - waiters.len();
318
319 for _ in 0..dropped_count {
321 self.metrics.get.inc(Status::Dropped);
322 }
323
324 !waiters.is_empty()
325 });
326 }
327
328 fn respond(&mut self, responder: oneshot::Sender<M>, msg: M) {
331 let result = responder.send(msg);
332 self.metrics.get.inc(match result {
333 Ok(_) => Status::Success,
334 Err(_) => Status::Dropped,
335 });
336 }
337}