battleware_node/seeder/
ingress.rs1use battleware_types::{Activity, Seed};
2use bytes::Bytes;
3use commonware_consensus::{
4 threshold_simplex::types::{Seedable, View},
5 Reporter,
6};
7use commonware_resolver::{p2p::Producer, Consumer};
8use commonware_utils::sequence::U64;
9use futures::{
10 channel::{mpsc, oneshot},
11 SinkExt,
12};
13
14pub enum Message {
15 Put(Seed),
16 Get {
17 view: View,
18 response: oneshot::Sender<Seed>,
19 },
20 Deliver {
21 view: View,
22 signature: Bytes,
23 response: oneshot::Sender<bool>,
24 },
25 Produce {
26 view: View,
27 response: oneshot::Sender<Bytes>,
28 },
29 Uploaded {
30 view: View,
31 },
32}
33
34#[derive(Clone)]
35pub struct Mailbox {
36 sender: mpsc::Sender<Message>,
37}
38
39impl Mailbox {
40 pub(super) fn new(sender: mpsc::Sender<Message>) -> Self {
41 Self { sender }
42 }
43
44 pub async fn put(&mut self, seed: Seed) {
45 self.sender
46 .send(Message::Put(seed))
47 .await
48 .expect("failed to send put");
49 }
50
51 pub async fn get(&mut self, view: View) -> Seed {
52 let (sender, receiver) = oneshot::channel();
53 self.sender
54 .send(Message::Get {
55 view,
56 response: sender,
57 })
58 .await
59 .expect("failed to send get");
60 receiver.await.expect("failed to receive get")
61 }
62
63 pub async fn uploaded(&mut self, view: View) {
64 self.sender
65 .send(Message::Uploaded { view })
66 .await
67 .expect("failed to send uploaded");
68 }
69}
70
71impl Consumer for Mailbox {
72 type Key = U64;
73 type Value = Bytes;
74 type Failure = ();
75
76 async fn deliver(&mut self, key: Self::Key, value: Self::Value) -> bool {
77 let (sender, receiver) = oneshot::channel();
78 self.sender
79 .send(Message::Deliver {
80 view: key.into(),
81 signature: value,
82 response: sender,
83 })
84 .await
85 .expect("failed to send deliver");
86 receiver.await.unwrap_or(true) }
88
89 async fn failed(&mut self, _: Self::Key, _: Self::Failure) {
90 }
92}
93
94impl Producer for Mailbox {
95 type Key = U64;
96
97 async fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes> {
98 let (sender, receiver) = oneshot::channel();
99 self.sender
100 .send(Message::Produce {
101 view: key.into(),
102 response: sender,
103 })
104 .await
105 .expect("failed to send produce");
106 receiver
107 }
108}
109
110impl Reporter for Mailbox {
111 type Activity = Activity;
112
113 async fn report(&mut self, activity: Self::Activity) {
114 match activity {
115 Activity::Notarization(notarization) => {
116 self.put(notarization.seed()).await;
117 }
118 Activity::Nullification(nullification) => {
119 self.put(nullification.seed()).await;
120 }
121 Activity::Finalization(finalization) => {
122 self.put(finalization.seed()).await;
123 }
124 _ => {}
125 }
126 }
127}