battleware_node/application/
ingress.rs

1use battleware_types::{Block, Seed};
2use commonware_consensus::threshold_simplex::types::{Context, View};
3use commonware_consensus::{Automaton, Relay, Reporter};
4use commonware_cryptography::sha256::Digest;
5use commonware_runtime::{telemetry::metrics::histogram, Clock};
6use futures::{
7    channel::{mpsc, oneshot},
8    SinkExt,
9};
10
/// Messages sent to the application.
///
/// In this file every variant is constructed by a [`Mailbox`] method and pushed over the
/// mailbox's `mpsc` channel. Variants that expect an answer carry a `oneshot::Sender` named
/// `response` on which the receiving side delivers it.
pub enum Message<E: Clock> {
    /// Built by `Automaton::genesis`, which awaits the digest delivered on `response`.
    Genesis {
        response: oneshot::Sender<Digest>,
    },
    /// Built by `Automaton::propose`; `view` and `parent` are copied from the consensus
    /// `Context`, and the digest delivered on `response` is handed back to the caller.
    Propose {
        view: View,
        parent: (View, Digest),
        response: oneshot::Sender<Digest>,
    },
    /// Built by `Mailbox::ancestry`; all fields are forwarded unchanged from its arguments.
    Ancestry {
        view: View,
        blocks: Vec<Block>,
        // NOTE(review): presumably measures how long the surrounding operation takes; the
        // code that observes or drops this timer is not in this file — confirm.
        timer: histogram::Timer<E>,
        response: oneshot::Sender<Digest>,
    },
    /// Built by `Relay::broadcast`; `payload` is the digest passed to it. No response is sent.
    Broadcast {
        payload: Digest,
    },
    /// Built by `Automaton::verify`; `view` and `parent` are copied from the consensus
    /// `Context`, `payload` is the digest under verification, and the boolean verdict is
    /// delivered on `response`.
    Verify {
        view: View,
        parent: (View, Digest),
        payload: Digest,
        response: oneshot::Sender<bool>,
    },
    /// Built by `Reporter::report`, which waits on `response` until the block has been
    /// processed.
    Finalized {
        block: Block,
        response: oneshot::Sender<()>,
    },
    /// Built by `Mailbox::seeded`; all fields are forwarded unchanged from its arguments.
    Seeded {
        block: Block,
        seed: Seed,
        // NOTE(review): same caveat as `Ancestry::timer` — its consumer is outside this file.
        timer: histogram::Timer<E>,
        response: oneshot::Sender<()>,
    },
}
47
/// Mailbox for the application.
///
/// A cloneable handle around the sending half of an `mpsc` channel that carries
/// [`Message`] values. This file implements `Automaton`, `Relay` and `Reporter` on it, so
/// those trait calls are turned into messages on that channel.
#[derive(Clone)]
pub struct Mailbox<E: Clock> {
    /// Sending half of the channel; every method in this file pushes its message through it.
    sender: mpsc::Sender<Message<E>>,
}
53
54impl<E: Clock> Mailbox<E> {
55    pub(super) fn new(sender: mpsc::Sender<Message<E>>) -> Self {
56        Self { sender }
57    }
58
59    pub(super) async fn ancestry(
60        &mut self,
61        view: View,
62        blocks: Vec<Block>,
63        timer: histogram::Timer<E>,
64        response: oneshot::Sender<Digest>,
65    ) {
66        self.sender
67            .send(Message::Ancestry {
68                view,
69                blocks,
70                timer,
71                response,
72            })
73            .await
74            .expect("Failed to send ancestry");
75    }
76
77    pub(super) async fn seeded(
78        &mut self,
79        block: Block,
80        seed: Seed,
81        timer: histogram::Timer<E>,
82        response: oneshot::Sender<()>,
83    ) {
84        self.sender
85            .send(Message::Seeded {
86                block,
87                seed,
88                timer,
89                response,
90            })
91            .await
92            .expect("Failed to send seeded");
93    }
94}
95
96impl<E: Clock> Automaton for Mailbox<E> {
97    type Digest = Digest;
98    type Context = Context<Self::Digest>;
99
100    async fn genesis(&mut self) -> Self::Digest {
101        let (response, receiver) = oneshot::channel();
102        self.sender
103            .send(Message::Genesis { response })
104            .await
105            .expect("Failed to send genesis");
106        receiver.await.expect("Failed to receive genesis")
107    }
108
109    async fn propose(&mut self, context: Context<Self::Digest>) -> oneshot::Receiver<Self::Digest> {
110        // If we linked payloads to their parent, we would include
111        // the parent in the `Context` in the payload.
112        let (response, receiver) = oneshot::channel();
113        self.sender
114            .send(Message::Propose {
115                view: context.view,
116                parent: context.parent,
117                response,
118            })
119            .await
120            .expect("Failed to send propose");
121        receiver
122    }
123
124    async fn verify(
125        &mut self,
126        context: Context<Self::Digest>,
127        payload: Self::Digest,
128    ) -> oneshot::Receiver<bool> {
129        // If we linked payloads to their parent, we would verify
130        // the parent included in the payload matches the provided `Context`.
131        let (response, receiver) = oneshot::channel();
132        self.sender
133            .send(Message::Verify {
134                view: context.view,
135                parent: context.parent,
136                payload,
137                response,
138            })
139            .await
140            .expect("Failed to send verify");
141        receiver
142    }
143}
144
145impl<E: Clock> Relay for Mailbox<E> {
146    type Digest = Digest;
147
148    async fn broadcast(&mut self, digest: Self::Digest) {
149        self.sender
150            .send(Message::Broadcast { payload: digest })
151            .await
152            .expect("Failed to send broadcast");
153    }
154}
155
156impl<E: Clock> Reporter for Mailbox<E> {
157    type Activity = Block;
158
159    async fn report(&mut self, block: Self::Activity) {
160        let (response, receiver) = oneshot::channel();
161        self.sender
162            .send(Message::Finalized { block, response })
163            .await
164            .expect("Failed to send finalized");
165
166        // Wait for the item to be processed (used to increment "save point" in marshal)
167        let _ = receiver.await; // TODO: may be shutting down
168    }
169}