1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use commonware_codec::Codec;
4use commonware_cryptography::{Digestible, PublicKey};
5use commonware_macros::select_loop;
6use commonware_p2p::{
7 utils::codec::{wrap, WrappedSender},
8 Provider, Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11 spawn_cell,
12 telemetry::metrics::status::{CounterExt, GaugeExt, Status},
13 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
14};
15use commonware_utils::{
16 channel::{fallible::OneshotExt, mpsc, oneshot},
17 ordered::Set,
18};
19use std::collections::{BTreeMap, VecDeque};
20use tracing::{debug, error, trace, warn};
21
22struct Waiter<M> {
24 responder: oneshot::Sender<M>,
26}
27
28pub struct Engine<E, P, M, D>
36where
37 E: BufferPooler + Clock + Spawner + Metrics,
38 P: PublicKey,
39 M: Digestible + Codec,
40 D: Provider<PublicKey = P>,
41{
42 context: ContextCell<E>,
46
47 public_key: P,
52
53 priority: bool,
55
56 deque_size: usize,
58
59 codec_config: M::Cfg,
61
62 mailbox_receiver: mpsc::Receiver<Message<P, M>>,
67
68 waiters: BTreeMap<M::Digest, Vec<Waiter<M>>>,
70
71 peer_provider: D,
73
74 items: BTreeMap<M::Digest, M>,
79
80 deques: BTreeMap<P, VecDeque<M::Digest>>,
86
87 counts: BTreeMap<M::Digest, usize>,
92
93 metrics: metrics::Metrics,
98}
99
100impl<E, P, M, D> Engine<E, P, M, D>
101where
102 E: BufferPooler + Clock + Spawner + Metrics,
103 P: PublicKey,
104 M: Digestible + Codec,
105 D: Provider<PublicKey = P>,
106{
107 pub fn new(context: E, cfg: Config<P, M::Cfg, D>) -> (Self, Mailbox<P, M>) {
110 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
111 let mailbox = Mailbox::<P, M>::new(mailbox_sender);
112
113 let metrics = metrics::Metrics::init(context.clone());
115
116 let result = Self {
117 context: ContextCell::new(context),
118 public_key: cfg.public_key,
119 priority: cfg.priority,
120 deque_size: cfg.deque_size,
121 codec_config: cfg.codec_config,
122 mailbox_receiver,
123 waiters: BTreeMap::new(),
124 deques: BTreeMap::new(),
125 items: BTreeMap::new(),
126 counts: BTreeMap::new(),
127 peer_provider: cfg.peer_provider,
128 metrics,
129 };
130
131 (result, mailbox)
132 }
133
134 pub fn start(
136 mut self,
137 network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
138 ) -> Handle<()> {
139 spawn_cell!(self.context, self.run(network).await)
140 }
141
142 async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
144 let (mut sender, mut receiver) = wrap(
145 self.codec_config.clone(),
146 self.context.network_buffer_pool().clone(),
147 network.0,
148 network.1,
149 );
150 let peer_set_subscription = &mut self.peer_provider.subscribe().await;
151
152 select_loop! {
153 self.context,
154 on_start => {
155 self.cleanup_waiters();
157 let _ = self.metrics.waiters.try_set(self.waiters.len());
158 },
159 on_stopped => {
160 debug!("shutdown");
161 },
162 Some(msg) = self.mailbox_receiver.recv() else {
164 error!("mailbox receiver failed");
165 break;
166 } => match msg {
167 Message::Broadcast {
168 recipients,
169 message,
170 responder,
171 } => {
172 trace!("mailbox: broadcast");
173 self.handle_broadcast(&mut sender, recipients, message, responder)
174 .await;
175 }
176 Message::Subscribe { digest, responder } => {
177 trace!("mailbox: subscribe");
178 self.handle_subscribe(digest, responder);
179 }
180 Message::Get { digest, responder } => {
181 trace!("mailbox: get");
182 self.handle_get(digest, responder);
183 }
184 },
185 msg = receiver.recv() => {
187 let (peer, msg) = match msg {
189 Ok(r) => r,
190 Err(err) => {
191 error!(?err, "receiver failed");
192 break;
193 }
194 };
195
196 let msg = match msg {
198 Ok(msg) => msg,
199 Err(err) => {
200 warn!(?err, ?peer, "failed to decode message");
201 self.metrics.receive.inc(Status::Invalid);
202 continue;
203 }
204 };
205
206 trace!(?peer, "network");
207 self.metrics
208 .peer
209 .get_or_create(&SequencerLabel::from(&peer))
210 .inc();
211 self.handle_network(peer, msg);
212 },
213 Some((_, _, tracked_peers)) = peer_set_subscription.recv() else {
214 debug!("peer set subscription closed");
215 break;
216 } => {
217 self.evict_untracked_peers(&tracked_peers);
218 },
219 }
220 }
221
222 async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
228 &mut self,
229 sender: &mut WrappedSender<Sr, M>,
230 recipients: Recipients<P>,
231 msg: M,
232 responder: oneshot::Sender<Vec<P>>,
233 ) {
234 let _ = self.insert_message(self.public_key.clone(), msg.clone());
236
237 let sent_to = sender
239 .send(recipients, msg, self.priority)
240 .await
241 .unwrap_or_else(|err| {
242 error!(?err, "failed to send message");
243 vec![]
244 });
245 responder.send_lossy(sent_to);
246 }
247
248 fn handle_subscribe(&mut self, digest: M::Digest, responder: oneshot::Sender<M>) {
253 if let Some(item) = self.items.get(&digest).cloned() {
255 self.respond_subscribe(responder, item);
256 return;
257 }
258
259 self.waiters
261 .entry(digest)
262 .or_default()
263 .push(Waiter { responder });
264 }
265
266 fn handle_get(&mut self, digest: M::Digest, responder: oneshot::Sender<Option<M>>) {
268 let item = self.items.get(&digest).cloned();
269 self.respond_get(responder, item);
270 }
271
272 fn handle_network(&mut self, peer: P, msg: M) {
274 if !self.insert_message(peer.clone(), msg) {
275 debug!(?peer, "message already stored");
276 self.metrics.receive.inc(Status::Dropped);
277 return;
278 }
279
280 self.metrics.receive.inc(Status::Success);
281 }
282
283 fn insert_message(&mut self, peer: P, msg: M) -> bool {
292 let digest = msg.digest();
293
294 if let Some(waiters) = self.waiters.remove(&digest) {
296 for waiter in waiters {
297 self.respond_subscribe(waiter.responder, msg.clone());
298 }
299 }
300
301 let deque = self
303 .deques
304 .entry(peer)
305 .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
306
307 if let Some(i) = deque.iter().position(|d| *d == digest) {
309 if i != 0 {
310 let v = deque.remove(i).unwrap(); deque.push_front(v);
312 }
313 return false;
314 };
315
316 deque.push_front(digest);
320 let count = self
321 .counts
322 .entry(digest)
323 .and_modify(|c| *c = c.checked_add(1).unwrap())
324 .or_insert(1);
325 if *count == 1 {
326 let existing = self.items.insert(digest, msg);
327 assert!(existing.is_none());
328 }
329
330 if deque.len() > self.deque_size {
332 let stale = deque.pop_back().unwrap();
336 decrement_digest_refcount(&mut self.counts, &mut self.items, &stale);
337 }
338
339 true
340 }
341
342 fn evict_untracked_peers(&mut self, tracked_peers: &Set<P>) {
343 let tracked = tracked_peers.as_ref();
344 for (peer, deque) in self
345 .deques
346 .extract_if(.., |peer, _| !tracked.contains(peer))
347 {
348 debug!(?peer, digests = deque.len(), "evicting disconnected peer");
349 for digest in deque {
350 decrement_digest_refcount(&mut self.counts, &mut self.items, &digest);
351 }
352 }
353 }
354
355 fn cleanup_waiters(&mut self) {
361 self.waiters.retain(|_, waiters| {
362 let initial_len = waiters.len();
363 waiters.retain(|waiter| !waiter.responder.is_closed());
364 let dropped_count = initial_len - waiters.len();
365
366 for _ in 0..dropped_count {
368 self.metrics.get.inc(Status::Dropped);
369 }
370
371 !waiters.is_empty()
372 });
373 }
374
375 fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
378 self.metrics.subscribe.inc(if responder.send_lossy(msg) {
379 Status::Success
380 } else {
381 Status::Dropped
382 });
383 }
384
385 fn respond_get(&mut self, responder: oneshot::Sender<Option<M>>, msg: Option<M>) {
388 let found = msg.is_some();
389 self.metrics.get.inc(if responder.send_lossy(msg) {
390 if found {
391 Status::Success
392 } else {
393 Status::Failure
394 }
395 } else {
396 Status::Dropped
397 });
398 }
399}
400
/// Releases one reference to `digest`.
///
/// Decrements the entry in `counts`; when it reaches zero, both the count and
/// the corresponding entry in `items` are removed.
///
/// # Panics
///
/// Panics if `digest` has no entry in `counts` or if its count is already zero.
fn decrement_digest_refcount<D: Ord, M>(
    counts: &mut BTreeMap<D, usize>,
    items: &mut BTreeMap<D, M>,
    digest: &D,
) {
    let remaining = counts.get_mut(digest).expect("count must exist");
    *remaining = remaining.checked_sub(1).expect("count must be > 0");

    // Still referenced elsewhere: nothing more to do.
    if *remaining != 0 {
        return;
    }

    // Last reference gone: forget both the counter and the stored item.
    let existing = counts.remove(digest);
    assert!(existing == Some(0));
    items.remove(digest);
}