battleware_node/seeder/
ingress.rs

1use battleware_types::{Activity, Seed};
2use bytes::Bytes;
3use commonware_consensus::{
4    threshold_simplex::types::{Seedable, View},
5    Reporter,
6};
7use commonware_resolver::{p2p::Producer, Consumer};
8use commonware_utils::sequence::U64;
9use futures::{
10    channel::{mpsc, oneshot},
11    SinkExt,
12};
13
14pub enum Message {
15    Put(Seed),
16    Get {
17        view: View,
18        response: oneshot::Sender<Seed>,
19    },
20    Deliver {
21        view: View,
22        signature: Bytes,
23        response: oneshot::Sender<bool>,
24    },
25    Produce {
26        view: View,
27        response: oneshot::Sender<Bytes>,
28    },
29    Uploaded {
30        view: View,
31    },
32}
33
34#[derive(Clone)]
35pub struct Mailbox {
36    sender: mpsc::Sender<Message>,
37}
38
39impl Mailbox {
40    pub(super) fn new(sender: mpsc::Sender<Message>) -> Self {
41        Self { sender }
42    }
43
44    pub async fn put(&mut self, seed: Seed) {
45        self.sender
46            .send(Message::Put(seed))
47            .await
48            .expect("failed to send put");
49    }
50
51    pub async fn get(&mut self, view: View) -> Seed {
52        let (sender, receiver) = oneshot::channel();
53        self.sender
54            .send(Message::Get {
55                view,
56                response: sender,
57            })
58            .await
59            .expect("failed to send get");
60        receiver.await.expect("failed to receive get")
61    }
62
63    pub async fn uploaded(&mut self, view: View) {
64        self.sender
65            .send(Message::Uploaded { view })
66            .await
67            .expect("failed to send uploaded");
68    }
69}
70
71impl Consumer for Mailbox {
72    type Key = U64;
73    type Value = Bytes;
74    type Failure = ();
75
76    async fn deliver(&mut self, key: Self::Key, value: Self::Value) -> bool {
77        let (sender, receiver) = oneshot::channel();
78        self.sender
79            .send(Message::Deliver {
80                view: key.into(),
81                signature: value,
82                response: sender,
83            })
84            .await
85            .expect("failed to send deliver");
86        receiver.await.unwrap_or(true) // default to true to avoid blocking
87    }
88
89    async fn failed(&mut self, _: Self::Key, _: Self::Failure) {
90        // We don't need to do anything on failure, the resolver will retry.
91    }
92}
93
94impl Producer for Mailbox {
95    type Key = U64;
96
97    async fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes> {
98        let (sender, receiver) = oneshot::channel();
99        self.sender
100            .send(Message::Produce {
101                view: key.into(),
102                response: sender,
103            })
104            .await
105            .expect("failed to send produce");
106        receiver
107    }
108}
109
110impl Reporter for Mailbox {
111    type Activity = Activity;
112
113    async fn report(&mut self, activity: Self::Activity) {
114        match activity {
115            Activity::Notarization(notarization) => {
116                self.put(notarization.seed()).await;
117            }
118            Activity::Nullification(nullification) => {
119                self.put(nullification.seed()).await;
120            }
121            Activity::Finalization(finalization) => {
122                self.put(finalization.seed()).await;
123            }
124            _ => {}
125        }
126    }
127}