battleware_node/aggregator/
ingress.rs1use battleware_execution::state_transition::StateTransitionResult;
2use battleware_types::execution::{Output, Value};
3use bytes::Bytes;
4use commonware_consensus::{
5 aggregation::types::{Activity, Certificate, Index},
6 threshold_simplex::types::View,
7 Automaton, Reporter,
8};
9use commonware_cryptography::{bls12381::primitives::variant::MinSig, sha256::Digest};
10use commonware_resolver::{p2p::Producer, Consumer};
11use commonware_storage::{
12 mmr::verification::Proof,
13 store::operation::{Keyless, Variable},
14};
15use commonware_utils::sequence::U64;
16use futures::{
17 channel::{mpsc, oneshot},
18 SinkExt,
19};
20
21pub enum Message {
22 Executed {
23 view: View,
24 height: u64,
25 commitment: Digest,
26 result: StateTransitionResult,
27 state_proof: Proof<Digest>,
28 state_proof_ops: Vec<Variable<Digest, Value>>,
29 events_proof: Proof<Digest>,
30 events_proof_ops: Vec<Keyless<Output>>,
31 response: oneshot::Sender<()>,
32 },
33 Genesis {
34 response: oneshot::Sender<Digest>,
35 },
36 Propose {
37 index: Index,
38 response: oneshot::Sender<Digest>,
39 },
40 Verify {
41 index: Index,
42 payload: Digest,
43 response: oneshot::Sender<bool>,
44 },
45 Tip {
46 index: Index,
47 },
48 Certified {
49 certificate: Certificate<MinSig, Digest>,
50 },
51 Deliver {
52 index: Index,
53 certificate: Bytes,
54 response: oneshot::Sender<bool>,
55 },
56 Produce {
57 index: Index,
58 response: oneshot::Sender<Bytes>,
59 },
60 Uploaded {
61 index: Index,
62 },
63}
64
65#[derive(Clone)]
66pub struct Mailbox {
67 sender: mpsc::Sender<Message>,
68}
69
70impl Mailbox {
71 pub(super) fn new(sender: mpsc::Sender<Message>) -> Self {
72 Self { sender }
73 }
74
75 pub(super) async fn uploaded(&mut self, index: Index) {
76 self.sender
77 .send(Message::Uploaded { index })
78 .await
79 .expect("failed to send uploaded");
80 }
81
82 #[allow(clippy::too_many_arguments)]
83 pub async fn executed(
84 &mut self,
85 view: View,
86 height: u64,
87 commitment: Digest,
88 result: StateTransitionResult,
89 state_proof: Proof<Digest>,
90 state_proof_ops: Vec<Variable<Digest, Value>>,
91 events_proof: Proof<Digest>,
92 events_proof_ops: Vec<Keyless<Output>>,
93 response: oneshot::Sender<()>,
94 ) {
95 self.sender
96 .send(Message::Executed {
97 view,
98 height,
99 commitment,
100 result,
101 state_proof,
102 state_proof_ops,
103 events_proof,
104 events_proof_ops,
105 response,
106 })
107 .await
108 .expect("failed to send executed");
109 }
110}
111
112impl Automaton for Mailbox {
113 type Digest = Digest;
114 type Context = Index;
115
116 async fn genesis(&mut self) -> Self::Digest {
117 let (response, receiver) = oneshot::channel();
118 self.sender
119 .send(Message::Genesis { response })
120 .await
121 .expect("Failed to send aggregation genesis");
122 receiver
123 .await
124 .expect("Failed to receive aggregation genesis")
125 }
126
127 async fn propose(&mut self, context: Self::Context) -> oneshot::Receiver<Self::Digest> {
128 let (response, receiver) = oneshot::channel();
129 self.sender
130 .send(Message::Propose {
131 index: context,
132 response,
133 })
134 .await
135 .expect("Failed to send aggregation propose");
136 receiver
137 }
138
139 async fn verify(
140 &mut self,
141 context: Self::Context,
142 payload: Self::Digest,
143 ) -> oneshot::Receiver<bool> {
144 let (response, receiver) = oneshot::channel();
145 self.sender
146 .send(Message::Verify {
147 index: context,
148 payload,
149 response,
150 })
151 .await
152 .expect("Failed to send aggregation verify");
153 receiver
154 }
155}
156
157impl Reporter for Mailbox {
158 type Activity = Activity<MinSig, Digest>;
159
160 async fn report(&mut self, activity: Self::Activity) {
161 match activity {
162 Activity::Certified(certificate) => {
163 self.sender
164 .send(Message::Certified { certificate })
165 .await
166 .expect("Failed to send aggregation certified");
167 }
168 Activity::Tip(index) => {
169 self.sender
170 .send(Message::Tip { index })
171 .await
172 .expect("Failed to send aggregation tip");
173 }
174 _ => {}
175 }
176 }
177}
178
179impl Consumer for Mailbox {
180 type Key = U64;
181 type Value = Bytes;
182 type Failure = ();
183
184 async fn deliver(&mut self, key: Self::Key, value: Self::Value) -> bool {
185 let (sender, receiver) = oneshot::channel();
186 self.sender
187 .send(Message::Deliver {
188 index: key.into(),
189 certificate: value,
190 response: sender,
191 })
192 .await
193 .expect("failed to send deliver");
194 receiver.await.unwrap_or(true) }
196
197 async fn failed(&mut self, _: Self::Key, _: Self::Failure) {
198 }
200}
201
202impl Producer for Mailbox {
203 type Key = U64;
204
205 async fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes> {
206 let (sender, receiver) = oneshot::channel();
207 self.sender
208 .send(Message::Produce {
209 index: key.into(),
210 response: sender,
211 })
212 .await
213 .expect("failed to send produce");
214 receiver
215 }
216}