commonware_broadcast/buffered/
ingress.rs1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Digest, Digestible};
4use futures::{
5 channel::{mpsc, oneshot},
6 SinkExt,
7};
8
9pub enum Message<D, M> {
11 Broadcast { message: M },
13
14 Get {
19 digest: D,
20 responder: oneshot::Sender<M>,
21 },
22}
23
24#[derive(Clone)]
26pub struct Mailbox<D: Digest, M: Digestible<D>> {
27 sender: mpsc::Sender<Message<D, M>>,
28}
29
30impl<D: Digest, M: Digestible<D>> Mailbox<D, M> {
31 pub(super) fn new(sender: mpsc::Sender<Message<D, M>>) -> Self {
32 Self { sender }
33 }
34}
35
36impl<D: Digest, M: Digestible<D>> Mailbox<D, M> {
37 pub async fn get(&mut self, digest: D) -> oneshot::Receiver<M> {
39 let (sender, receiver) = oneshot::channel();
40 self.sender
41 .send(Message::Get {
42 digest,
43 responder: sender,
44 })
45 .await
46 .expect("mailbox closed");
47 receiver
48 }
49}
50
51impl<D: Digest, M: Codec + Digestible<D>> Broadcaster for Mailbox<D, M> {
52 type Message = M;
53
54 async fn broadcast(&mut self, message: Self::Message) {
55 self.sender
56 .send(Message::Broadcast { message })
57 .await
58 .expect("mailbox closed");
59 }
60}