battleware_node/aggregator/
ingress.rs

1use battleware_execution::state_transition::StateTransitionResult;
2use battleware_types::execution::{Output, Value};
3use bytes::Bytes;
4use commonware_consensus::{
5    aggregation::types::{Activity, Certificate, Index},
6    threshold_simplex::types::View,
7    Automaton, Reporter,
8};
9use commonware_cryptography::{bls12381::primitives::variant::MinSig, sha256::Digest};
10use commonware_resolver::{p2p::Producer, Consumer};
11use commonware_storage::{
12    mmr::verification::Proof,
13    store::operation::{Keyless, Variable},
14};
15use commonware_utils::sequence::U64;
16use futures::{
17    channel::{mpsc, oneshot},
18    SinkExt,
19};
20
21pub enum Message {
22    Executed {
23        view: View,
24        height: u64,
25        commitment: Digest,
26        result: StateTransitionResult,
27        state_proof: Proof<Digest>,
28        state_proof_ops: Vec<Variable<Digest, Value>>,
29        events_proof: Proof<Digest>,
30        events_proof_ops: Vec<Keyless<Output>>,
31        response: oneshot::Sender<()>,
32    },
33    Genesis {
34        response: oneshot::Sender<Digest>,
35    },
36    Propose {
37        index: Index,
38        response: oneshot::Sender<Digest>,
39    },
40    Verify {
41        index: Index,
42        payload: Digest,
43        response: oneshot::Sender<bool>,
44    },
45    Tip {
46        index: Index,
47    },
48    Certified {
49        certificate: Certificate<MinSig, Digest>,
50    },
51    Deliver {
52        index: Index,
53        certificate: Bytes,
54        response: oneshot::Sender<bool>,
55    },
56    Produce {
57        index: Index,
58        response: oneshot::Sender<Bytes>,
59    },
60    Uploaded {
61        index: Index,
62    },
63}
64
65#[derive(Clone)]
66pub struct Mailbox {
67    sender: mpsc::Sender<Message>,
68}
69
70impl Mailbox {
71    pub(super) fn new(sender: mpsc::Sender<Message>) -> Self {
72        Self { sender }
73    }
74
75    pub(super) async fn uploaded(&mut self, index: Index) {
76        self.sender
77            .send(Message::Uploaded { index })
78            .await
79            .expect("failed to send uploaded");
80    }
81
82    #[allow(clippy::too_many_arguments)]
83    pub async fn executed(
84        &mut self,
85        view: View,
86        height: u64,
87        commitment: Digest,
88        result: StateTransitionResult,
89        state_proof: Proof<Digest>,
90        state_proof_ops: Vec<Variable<Digest, Value>>,
91        events_proof: Proof<Digest>,
92        events_proof_ops: Vec<Keyless<Output>>,
93        response: oneshot::Sender<()>,
94    ) {
95        self.sender
96            .send(Message::Executed {
97                view,
98                height,
99                commitment,
100                result,
101                state_proof,
102                state_proof_ops,
103                events_proof,
104                events_proof_ops,
105                response,
106            })
107            .await
108            .expect("failed to send executed");
109    }
110}
111
112impl Automaton for Mailbox {
113    type Digest = Digest;
114    type Context = Index;
115
116    async fn genesis(&mut self) -> Self::Digest {
117        let (response, receiver) = oneshot::channel();
118        self.sender
119            .send(Message::Genesis { response })
120            .await
121            .expect("Failed to send aggregation genesis");
122        receiver
123            .await
124            .expect("Failed to receive aggregation genesis")
125    }
126
127    async fn propose(&mut self, context: Self::Context) -> oneshot::Receiver<Self::Digest> {
128        let (response, receiver) = oneshot::channel();
129        self.sender
130            .send(Message::Propose {
131                index: context,
132                response,
133            })
134            .await
135            .expect("Failed to send aggregation propose");
136        receiver
137    }
138
139    async fn verify(
140        &mut self,
141        context: Self::Context,
142        payload: Self::Digest,
143    ) -> oneshot::Receiver<bool> {
144        let (response, receiver) = oneshot::channel();
145        self.sender
146            .send(Message::Verify {
147                index: context,
148                payload,
149                response,
150            })
151            .await
152            .expect("Failed to send aggregation verify");
153        receiver
154    }
155}
156
157impl Reporter for Mailbox {
158    type Activity = Activity<MinSig, Digest>;
159
160    async fn report(&mut self, activity: Self::Activity) {
161        match activity {
162            Activity::Certified(certificate) => {
163                self.sender
164                    .send(Message::Certified { certificate })
165                    .await
166                    .expect("Failed to send aggregation certified");
167            }
168            Activity::Tip(index) => {
169                self.sender
170                    .send(Message::Tip { index })
171                    .await
172                    .expect("Failed to send aggregation tip");
173            }
174            _ => {}
175        }
176    }
177}
178
179impl Consumer for Mailbox {
180    type Key = U64;
181    type Value = Bytes;
182    type Failure = ();
183
184    async fn deliver(&mut self, key: Self::Key, value: Self::Value) -> bool {
185        let (sender, receiver) = oneshot::channel();
186        self.sender
187            .send(Message::Deliver {
188                index: key.into(),
189                certificate: value,
190                response: sender,
191            })
192            .await
193            .expect("failed to send deliver");
194        receiver.await.unwrap_or(true) // default to true to avoid blocking
195    }
196
197    async fn failed(&mut self, _: Self::Key, _: Self::Failure) {
198        // We don't need to do anything on failure, the resolver will retry.
199    }
200}
201
202impl Producer for Mailbox {
203    type Key = U64;
204
205    async fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes> {
206        let (sender, receiver) = oneshot::channel();
207        self.sender
208            .send(Message::Produce {
209                index: key.into(),
210                response: sender,
211            })
212            .await
213            .expect("failed to send produce");
214        receiver
215    }
216}