battleware_node/
supervisor.rs

1use battleware_types::{leader_index, Evaluation, Identity, Signature};
2use commonware_codec::Encode;
3use commonware_consensus::{
4    aggregation::types::Epoch, threshold_simplex::types::View, Monitor, Supervisor as Su,
5    ThresholdSupervisor as TSu,
6};
7use commonware_cryptography::{
8    bls12381::{
9        dkg::ops::evaluate_all,
10        primitives::{
11            group,
12            poly::{self, Poly},
13            variant::MinSig,
14        },
15    },
16    ed25519,
17};
18use commonware_resolver::p2p;
19use commonware_runtime::RwLock;
20use futures::{channel::mpsc, SinkExt};
21use std::{collections::HashMap, sync::Arc};
22
23/// Manages epoch state and subscribers.
24struct EpochManager {
25    epoch: Epoch,
26    subscribers: Vec<mpsc::Sender<Epoch>>,
27}
28
29impl EpochManager {
30    fn new() -> Self {
31        Self {
32            epoch: 0,
33            subscribers: Vec::new(),
34        }
35    }
36
37    async fn update(&mut self, epoch: Epoch) {
38        // Update epoch
39        self.epoch = epoch;
40
41        // Notify all subscribers
42        let mut i = 0;
43        while i < self.subscribers.len() {
44            if self.subscribers[i].send(epoch).await.is_err() {
45                // Remove disconnected subscriber
46                self.subscribers.swap_remove(i);
47            } else {
48                i += 1;
49            }
50        }
51    }
52
53    async fn subscribe(&mut self) -> (Epoch, mpsc::Receiver<Epoch>) {
54        let (tx, rx) = mpsc::channel(1);
55        self.subscribers.push(tx);
56        (self.epoch, rx)
57    }
58
59    fn current(&self) -> Epoch {
60        self.epoch
61    }
62}
63
64/// Core supervisor data shared between View and Epoch supervisors.
65pub struct Supervisor {
66    identity: Identity,
67    polynomial: Vec<Evaluation>,
68    participants: Vec<ed25519::PublicKey>,
69    participants_map: HashMap<ed25519::PublicKey, u32>,
70    share: group::Share,
71    epoch_manager: RwLock<EpochManager>,
72}
73
74impl Supervisor {
75    /// Create a new supervisor.
76    pub fn new(
77        polynomial: Poly<Evaluation>,
78        mut participants: Vec<ed25519::PublicKey>,
79        share: group::Share,
80    ) -> Arc<Self> {
81        // Setup participants
82        participants.sort();
83        let mut participants_map = HashMap::new();
84        for (index, validator) in participants.iter().enumerate() {
85            participants_map.insert(validator.clone(), index as u32);
86        }
87        let identity = *poly::public::<MinSig>(&polynomial);
88        let polynomial = evaluate_all::<MinSig>(&polynomial, participants.len() as u32);
89
90        // Return supervisor
91        Arc::new(Self {
92            identity,
93            polynomial,
94            participants,
95            participants_map,
96            share,
97            epoch_manager: RwLock::new(EpochManager::new()),
98        })
99    }
100}
101
102/// View-based [Supervisor] for [commonware_consensus::threshold_simplex].
103#[derive(Clone)]
104pub struct ViewSupervisor {
105    inner: Arc<Supervisor>,
106}
107
108impl ViewSupervisor {
109    pub fn new(supervisor: Arc<Supervisor>) -> Self {
110        Self { inner: supervisor }
111    }
112}
113
114impl p2p::Coordinator for ViewSupervisor {
115    type PublicKey = ed25519::PublicKey;
116
117    fn peers(&self) -> &Vec<Self::PublicKey> {
118        &self.inner.participants
119    }
120
121    fn peer_set_id(&self) -> u64 {
122        // Block on getting the current epoch
123        futures::executor::block_on(async { self.inner.epoch_manager.read().await.current() })
124    }
125}
126
127impl Su for ViewSupervisor {
128    type Index = View;
129    type PublicKey = ed25519::PublicKey;
130
131    fn leader(&self, _: Self::Index) -> Option<Self::PublicKey> {
132        unimplemented!("only defined in supertrait")
133    }
134
135    fn participants(&self, _: Self::Index) -> Option<&Vec<Self::PublicKey>> {
136        Some(&self.inner.participants)
137    }
138
139    fn is_participant(&self, _: Self::Index, candidate: &Self::PublicKey) -> Option<u32> {
140        self.inner.participants_map.get(candidate).cloned()
141    }
142}
143
144impl TSu for ViewSupervisor {
145    type Seed = Signature;
146    type Identity = Identity;
147    type Polynomial = Vec<Evaluation>;
148    type Share = group::Share;
149
150    fn leader(&self, _: Self::Index, seed: Self::Seed) -> Option<Self::PublicKey> {
151        let seed_bytes = seed.encode();
152        let index = leader_index(seed_bytes.as_ref(), self.inner.participants.len());
153        Some(self.inner.participants[index].clone())
154    }
155
156    fn identity(&self) -> &Self::Identity {
157        &self.inner.identity
158    }
159
160    fn polynomial(&self, _: Self::Index) -> Option<&Self::Polynomial> {
161        Some(&self.inner.polynomial)
162    }
163
164    fn share(&self, _: Self::Index) -> Option<&Self::Share> {
165        Some(&self.inner.share)
166    }
167}
168
169/// Epoch-based [Supervisor] for [commonware_consensus::aggregation].
170#[derive(Clone)]
171pub struct EpochSupervisor {
172    inner: Arc<Supervisor>,
173}
174
175impl EpochSupervisor {
176    pub fn new(supervisor: Arc<Supervisor>) -> Self {
177        Self { inner: supervisor }
178    }
179
180    pub async fn update(&self, epoch: Epoch) {
181        self.inner.epoch_manager.write().await.update(epoch).await;
182    }
183}
184
185impl Su for EpochSupervisor {
186    type Index = Epoch;
187    type PublicKey = ed25519::PublicKey;
188
189    fn leader(&self, _: Self::Index) -> Option<Self::PublicKey> {
190        unimplemented!("only defined in supertrait")
191    }
192
193    fn participants(&self, _: Self::Index) -> Option<&Vec<Self::PublicKey>> {
194        Some(&self.inner.participants)
195    }
196
197    fn is_participant(&self, _: Self::Index, candidate: &Self::PublicKey) -> Option<u32> {
198        self.inner.participants_map.get(candidate).cloned()
199    }
200}
201
202impl TSu for EpochSupervisor {
203    type Identity = Identity;
204    type Polynomial = Vec<Evaluation>;
205    type Seed = Signature;
206    type Share = group::Share;
207
208    fn leader(&self, _: Self::Index, seed: Self::Seed) -> Option<Self::PublicKey> {
209        let seed_bytes = seed.encode();
210        let index = leader_index(seed_bytes.as_ref(), self.inner.participants.len());
211        Some(self.inner.participants[index].clone())
212    }
213
214    fn identity(&self) -> &Self::Identity {
215        &self.inner.identity
216    }
217
218    fn polynomial(&self, _: Self::Index) -> Option<&Self::Polynomial> {
219        Some(&self.inner.polynomial)
220    }
221
222    fn share(&self, _: Self::Index) -> Option<&Self::Share> {
223        Some(&self.inner.share)
224    }
225}
226
227impl Monitor for EpochSupervisor {
228    type Index = Epoch;
229
230    async fn subscribe(&mut self) -> (Self::Index, mpsc::Receiver<Self::Index>) {
231        self.inner.epoch_manager.write().await.subscribe().await
232    }
233}