1use super::{metrics, Config, Mailbox, Message};
2use commonware_actor::mailbox;
3use commonware_codec::Codec;
4use commonware_cryptography::{Digestible, PublicKey};
5use commonware_macros::select_loop;
6use commonware_p2p::{
7 utils::codec::{wrap, WrappedSender},
8 Provider, Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11 spawn_cell,
12 telemetry::metrics::{status::Status, GaugeExt},
13 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
14};
15use commonware_utils::{
16 channel::{fallible::OneshotExt, oneshot},
17 ordered::Set,
18};
19use std::collections::{BTreeMap, VecDeque};
20use tracing::{debug, error, trace, warn};
21
/// A pending subscriber for a message that has not been seen yet.
///
/// Waiters are grouped by digest in the engine's `waiters` map and answered
/// through `responder` when a message with that digest is inserted.
struct Waiter<M> {
    /// One-shot channel used to hand the message to the subscriber.
    responder: oneshot::Sender<M>,
}
27
/// Outcome of `Engine::insert_message` for a `(peer, digest)` pair.
enum InsertMessageResult {
    /// The digest was newly recorded at the front of the peer's deque.
    Inserted,
    /// The digest was already present in the peer's deque (it is moved to the front).
    Duplicate,
    /// The peer is not in the latest primary peer set, so nothing was cached.
    Ineligible,
}
34
/// Actor that broadcasts messages to peers and caches messages per peer.
///
/// Type parameters:
/// - `E`: runtime context (buffer pool, clock, spawner, metrics).
/// - `P`: public key type identifying peers.
/// - `M`: message type; must expose a digest and be encodable with a codec.
/// - `D`: provider of peer set updates keyed by `P`.
pub struct Engine<E, P, M, D>
where
    E: BufferPooler + Clock + Spawner + Metrics,
    P: PublicKey,
    M: Digestible + Codec,
    D: Provider<PublicKey = P>,
{
    /// Runtime context, held in a cell so it can be handed to the spawned task.
    context: ContextCell<E>,

    /// This engine's own key; locally broadcast messages are cached under it.
    public_key: P,

    /// Priority flag passed to the network sender on every broadcast.
    priority: bool,

    /// Maximum number of digests retained per peer deque.
    deque_size: usize,

    /// Codec configuration used when wrapping the network channel.
    codec_config: M::Cfg,

    /// Receiving end of the mailbox through which requests reach the engine.
    mailbox_receiver: mailbox::Receiver<Message<P, M>>,

    /// Subscribers waiting for a message, grouped by the digest they asked for.
    waiters: BTreeMap<M::Digest, Vec<Waiter<M>>>,

    /// Source of peer set updates; subscribed to when the engine runs.
    peer_provider: D,

    /// Cached messages keyed by digest. An entry exists while at least one
    /// peer deque references the digest (see `counts`).
    items: BTreeMap<M::Digest, M>,

    /// Per-peer deque of digests, most recently seen at the front.
    deques: BTreeMap<P, VecDeque<M::Digest>>,

    /// Number of peer deques that currently contain each digest.
    counts: BTreeMap<M::Digest, usize>,

    /// Peers whose messages are eligible for caching, replaced on every peer set update.
    latest_primary_peers: Set<P>,

    /// Metrics recorded by the engine.
    metrics: metrics::Metrics<P>,
}
109
110impl<E, P, M, D> Engine<E, P, M, D>
111where
112 E: BufferPooler + Clock + Spawner + Metrics,
113 P: PublicKey,
114 M: Digestible + Codec,
115 D: Provider<PublicKey = P>,
116{
117 pub fn new(context: E, cfg: Config<P, M::Cfg, D>) -> (Self, Mailbox<P, M>) {
120 let (mailbox_sender, mailbox_receiver) =
121 mailbox::new(context.child("mailbox"), cfg.mailbox_size);
122 let mailbox = Mailbox::<P, M>::new(mailbox_sender);
123
124 let metrics = metrics::Metrics::init(&context);
125
126 let result = Self {
127 context: ContextCell::new(context),
128 public_key: cfg.public_key,
129 priority: cfg.priority,
130 deque_size: cfg.deque_size,
131 codec_config: cfg.codec_config,
132 mailbox_receiver,
133 waiters: BTreeMap::new(),
134 deques: BTreeMap::new(),
135 items: BTreeMap::new(),
136 counts: BTreeMap::new(),
137 latest_primary_peers: Set::default(),
138 peer_provider: cfg.peer_provider,
139 metrics,
140 };
141
142 (result, mailbox)
143 }
144
    /// Consumes the engine and spawns its `run` loop on the stored context.
    ///
    /// `network` is the (sender, receiver) pair the engine uses to exchange
    /// messages with peers. Returns the handle of the spawned task.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network))
    }
152
    /// Main loop of the engine.
    ///
    /// Wraps the raw network halves with the message codec, subscribes to peer
    /// set updates, and then services three inputs until one of them closes or
    /// fails: peer set updates, mailbox requests, and inbound network messages.
    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
        // Attach the codec configuration and the context's network buffer pool to both halves.
        let (mut sender, mut receiver) = wrap(
            self.codec_config.clone(),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );
        let mut peer_set_subscription = self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            // NOTE(review): presumably executed at the start of each loop iteration — confirm against `select_loop!`.
            on_start => {
                // Discard waiters whose subscriber has gone away, then publish
                // how many digests still have at least one waiter.
                self.cleanup_waiters();
                let _ = self.metrics.waiters.try_set(self.waiters.len());
            },
            // NOTE(review): presumably executed when the context signals shutdown — confirm against `select_loop!`.
            on_stopped => {
                debug!("shutdown");
            },
            // Peer set changed: evict departed peers and adopt the new primary set.
            Some(update) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                break;
            } => {
                self.update_latest_primary_peers(update.latest.primary);
            },
            // Request from the mailbox: dispatch on the request kind.
            Some(msg) = self.mailbox_receiver.recv() else {
                error!("mailbox receiver failed");
                break;
            } => match msg {
                Message::Broadcast {
                    recipients,
                    message,
                } => {
                    trace!("mailbox: broadcast");
                    self.handle_broadcast(&mut sender, recipients, message);
                }
                Message::Subscribe { digest, responder } => {
                    trace!("mailbox: subscribe");
                    self.handle_subscribe(digest, responder);
                }
                Message::Get { digest, responder } => {
                    trace!("mailbox: get");
                    self.handle_get(digest, responder);
                }
            },
            // Message from the network.
            msg = receiver.recv() => {
                // A receive error ends the loop.
                let (peer, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        error!(?err, "receiver failed");
                        break;
                    }
                };

                // A decode failure is counted as invalid and otherwise ignored.
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        warn!(?err, ?peer, "failed to decode message");
                        self.metrics.receive.inc(Status::Invalid);
                        continue;
                    }
                };

                trace!(?peer, "network");
                // Count the decoded message against the sending peer before handling it.
                self.metrics.peer.get_or_create_by(&peer).inc();
                self.handle_network(peer, msg);
            },
        }
    }
229
230 fn handle_broadcast<Sr: Sender<PublicKey = P>>(
236 &mut self,
237 sender: &mut WrappedSender<Sr, M>,
238 recipients: Recipients<P>,
239 msg: M,
240 ) {
241 let digest = msg.digest();
243 let _ = self.insert_message(self.public_key.clone(), digest, msg.clone());
244
245 sender.send(recipients, msg, self.priority);
247 }
248
249 fn handle_subscribe(&mut self, digest: M::Digest, responder: oneshot::Sender<M>) {
254 if let Some(item) = self.items.get(&digest).cloned() {
256 self.respond_subscribe(responder, item);
257 return;
258 }
259
260 self.waiters
262 .entry(digest)
263 .or_default()
264 .push(Waiter { responder });
265 }
266
267 fn handle_get(&mut self, digest: M::Digest, responder: oneshot::Sender<Option<M>>) {
269 let item = self.items.get(&digest).cloned();
270 self.respond_get(responder, item);
271 }
272
273 fn handle_network(&mut self, peer: P, msg: M) {
275 let digest = msg.digest();
276 match self.insert_message(peer.clone(), digest, msg) {
277 InsertMessageResult::Inserted => {
278 self.metrics.receive.inc(Status::Success);
279 }
280 InsertMessageResult::Duplicate => {
281 debug!(?peer, "message already stored");
282 self.metrics.receive.inc(Status::Dropped);
283 }
284 InsertMessageResult::Ineligible => {
285 debug!(?peer, "message from peer outside latest.primary not cached");
286 self.metrics.receive.inc(Status::Dropped);
287 }
288 }
289 }
290
291 fn insert_message(&mut self, peer: P, digest: M::Digest, msg: M) -> InsertMessageResult {
300 if let Some(waiters) = self.waiters.remove(&digest) {
302 for waiter in waiters {
303 self.respond_subscribe(waiter.responder, msg.clone());
304 }
305 }
306
307 if self.latest_primary_peers.position(&peer).is_none() {
309 return InsertMessageResult::Ineligible;
310 }
311
312 let deque = self
314 .deques
315 .entry(peer)
316 .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
317
318 if let Some(i) = deque.iter().position(|d| *d == digest) {
320 if i != 0 {
321 let v = deque.remove(i).unwrap(); deque.push_front(v);
323 }
324 return InsertMessageResult::Duplicate;
325 };
326
327 deque.push_front(digest);
331 let count = self
332 .counts
333 .entry(digest)
334 .and_modify(|c| *c = c.checked_add(1).unwrap())
335 .or_insert(1);
336 if *count == 1 {
337 let existing = self.items.insert(digest, msg);
338 assert!(existing.is_none());
339 }
340
341 if deque.len() > self.deque_size {
343 let stale = deque.pop_back().unwrap();
347 decrement_digest_refcount(&mut self.counts, &mut self.items, &stale);
348 }
349
350 InsertMessageResult::Inserted
351 }
352
353 fn update_latest_primary_peers(&mut self, peers: Set<P>) {
354 for (peer, deque) in self
355 .deques
356 .extract_if(.., |peer, _| peers.position(peer).is_none())
357 {
358 debug!(?peer, digests = deque.len(), "evicting disconnected peer");
359 for digest in deque {
360 decrement_digest_refcount(&mut self.counts, &mut self.items, &digest);
361 }
362 }
363 self.latest_primary_peers = peers;
364 }
365
366 fn cleanup_waiters(&mut self) {
372 self.waiters.retain(|_, waiters| {
373 let initial_len = waiters.len();
374 waiters.retain(|waiter| !waiter.responder.is_closed());
375 let dropped_count = initial_len - waiters.len();
376
377 for _ in 0..dropped_count {
379 self.metrics.get.inc(Status::Dropped);
380 }
381
382 !waiters.is_empty()
383 });
384 }
385
386 fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
389 self.metrics.subscribe.inc(if responder.send_lossy(msg) {
390 Status::Success
391 } else {
392 Status::Dropped
393 });
394 }
395
396 fn respond_get(&mut self, responder: oneshot::Sender<Option<M>>, msg: Option<M>) {
399 let found = msg.is_some();
400 self.metrics.get.inc(if responder.send_lossy(msg) {
401 if found {
402 Status::Success
403 } else {
404 Status::Failure
405 }
406 } else {
407 Status::Dropped
408 });
409 }
410}
411
/// Releases one reference to `digest`.
///
/// Decrements the entry in `counts`; when it reaches zero, the entry is
/// removed from `counts` and the associated value is removed from `items`.
///
/// # Panics
///
/// Panics if `digest` has no entry in `counts` or if its count is already zero.
fn decrement_digest_refcount<D: Ord, M>(
    counts: &mut BTreeMap<D, usize>,
    items: &mut BTreeMap<D, M>,
    digest: &D,
) {
    let remaining = counts.get_mut(digest).expect("count must exist");
    *remaining = remaining.checked_sub(1).expect("count must be > 0");

    // Still referenced elsewhere: nothing more to do.
    if *remaining > 0 {
        return;
    }

    // Last reference released: drop both the counter and the stored value.
    let existing = counts.remove(digest);
    assert!(existing == Some(0));
    items.remove(digest);
}