battleware_node/application/
ingress.rs1use battleware_types::{Block, Seed};
2use commonware_consensus::threshold_simplex::types::{Context, View};
3use commonware_consensus::{Automaton, Relay, Reporter};
4use commonware_cryptography::sha256::Digest;
5use commonware_runtime::{telemetry::metrics::histogram, Clock};
6use futures::{
7 channel::{mpsc, oneshot},
8 SinkExt,
9};
10
11pub enum Message<E: Clock> {
13 Genesis {
14 response: oneshot::Sender<Digest>,
15 },
16 Propose {
17 view: View,
18 parent: (View, Digest),
19 response: oneshot::Sender<Digest>,
20 },
21 Ancestry {
22 view: View,
23 blocks: Vec<Block>,
24 timer: histogram::Timer<E>,
25 response: oneshot::Sender<Digest>,
26 },
27 Broadcast {
28 payload: Digest,
29 },
30 Verify {
31 view: View,
32 parent: (View, Digest),
33 payload: Digest,
34 response: oneshot::Sender<bool>,
35 },
36 Finalized {
37 block: Block,
38 response: oneshot::Sender<()>,
39 },
40 Seeded {
41 block: Block,
42 seed: Seed,
43 timer: histogram::Timer<E>,
44 response: oneshot::Sender<()>,
45 },
46}
47
48#[derive(Clone)]
50pub struct Mailbox<E: Clock> {
51 sender: mpsc::Sender<Message<E>>,
52}
53
54impl<E: Clock> Mailbox<E> {
55 pub(super) fn new(sender: mpsc::Sender<Message<E>>) -> Self {
56 Self { sender }
57 }
58
59 pub(super) async fn ancestry(
60 &mut self,
61 view: View,
62 blocks: Vec<Block>,
63 timer: histogram::Timer<E>,
64 response: oneshot::Sender<Digest>,
65 ) {
66 self.sender
67 .send(Message::Ancestry {
68 view,
69 blocks,
70 timer,
71 response,
72 })
73 .await
74 .expect("Failed to send ancestry");
75 }
76
77 pub(super) async fn seeded(
78 &mut self,
79 block: Block,
80 seed: Seed,
81 timer: histogram::Timer<E>,
82 response: oneshot::Sender<()>,
83 ) {
84 self.sender
85 .send(Message::Seeded {
86 block,
87 seed,
88 timer,
89 response,
90 })
91 .await
92 .expect("Failed to send seeded");
93 }
94}
95
96impl<E: Clock> Automaton for Mailbox<E> {
97 type Digest = Digest;
98 type Context = Context<Self::Digest>;
99
100 async fn genesis(&mut self) -> Self::Digest {
101 let (response, receiver) = oneshot::channel();
102 self.sender
103 .send(Message::Genesis { response })
104 .await
105 .expect("Failed to send genesis");
106 receiver.await.expect("Failed to receive genesis")
107 }
108
109 async fn propose(&mut self, context: Context<Self::Digest>) -> oneshot::Receiver<Self::Digest> {
110 let (response, receiver) = oneshot::channel();
113 self.sender
114 .send(Message::Propose {
115 view: context.view,
116 parent: context.parent,
117 response,
118 })
119 .await
120 .expect("Failed to send propose");
121 receiver
122 }
123
124 async fn verify(
125 &mut self,
126 context: Context<Self::Digest>,
127 payload: Self::Digest,
128 ) -> oneshot::Receiver<bool> {
129 let (response, receiver) = oneshot::channel();
132 self.sender
133 .send(Message::Verify {
134 view: context.view,
135 parent: context.parent,
136 payload,
137 response,
138 })
139 .await
140 .expect("Failed to send verify");
141 receiver
142 }
143}
144
145impl<E: Clock> Relay for Mailbox<E> {
146 type Digest = Digest;
147
148 async fn broadcast(&mut self, digest: Self::Digest) {
149 self.sender
150 .send(Message::Broadcast { payload: digest })
151 .await
152 .expect("Failed to send broadcast");
153 }
154}
155
156impl<E: Clock> Reporter for Mailbox<E> {
157 type Activity = Block;
158
159 async fn report(&mut self, block: Self::Activity) {
160 let (response, receiver) = oneshot::channel();
161 self.sender
162 .send(Message::Finalized { block, response })
163 .await
164 .expect("Failed to send finalized");
165
166 let _ = receiver.await; }
169}