sn_consensus/mvba/vcbc/
mod.rs

1pub(crate) mod error;
2pub(crate) mod message;
3
4use std::collections::hash_map::Entry::Vacant;
5use std::collections::HashMap;
6use std::fmt::Debug;
7
8use blsttc::{PublicKey, PublicKeySet, SecretKeyShare, Signature, SignatureShare};
9use serde::Serialize;
10
11use self::error::{Error, Result};
12use self::message::{Action, Message};
13use super::hash::Hash32;
14use super::tag::Tag;
15use super::{bundle, MessageValidity, NodeId};
16use crate::mvba::broadcaster::Broadcaster;
17
18// make_c_request_message creates the payload message to request a proposal
19// from the the proposer
20pub fn make_c_request_message<P>(tag: Tag) -> bundle::Message<P> {
21    bundle::Message::Vcbc(Message {
22        tag,
23        action: Action::Request,
24    })
25}
26
27// checks if the delivered proposal comes with a valid signature
28pub fn verify_delivered_proposal<P: Serialize>(
29    tag: &Tag,
30    proposal: &P,
31    sig: &Signature,
32    pk: &PublicKey,
33) -> Result<bool> {
34    let d = Hash32::calculate(proposal)?;
35    let sign_bytes = c_ready_bytes_to_sign(tag, &d)?;
36    Ok(pk.verify(sig, sign_bytes))
37}
38
39// c_ready_bytes_to_sign generates bytes that should be signed by each party
40// as a witness of receiving the message.
41// c_ready_bytes_to_sign is same as serialized of $(ID.j.s, c-ready, H(m))$ in spec
42pub fn c_ready_bytes_to_sign(
43    tag: &Tag,
44    digest: &Hash32,
45) -> std::result::Result<Vec<u8>, bincode::Error> {
46    bincode::serialize(&(tag, "c-ready", digest))
47}
48
49// Protocol VCBC for verifiable and authenticated consistent broadcast.
50pub(crate) struct Vcbc<P> {
51    tag: Tag, // Tag is a combination of Domain and proposer ID. It is unique in each VCBC instances.
52    i: NodeId, // represents our unique identifier
53    m_bar: Option<P>, // represents the proposal data. If a proposal is not delivered yet, it is None.
54    u_bar: Option<Signature>, // represents the signature of the delivered proposal. If the proposal is not delivered yet, it is None
55    wd: HashMap<NodeId, SignatureShare>, // represents witness data. The witness data includes the signature shares of all the nodes that have participated in the protocol instance.
56    rd: usize, // represents the number of signature shares received yet. Once this number reaches a threshold, the node can merge signatures.
57    d: Option<Hash32>, // Memorizing the message digest
58    pub_key_set: PublicKeySet,
59    sec_key_share: SecretKeyShare,
60    final_messages: HashMap<NodeId, Message<P>>,
61    message_validity: MessageValidity<P>,
62}
63
64/// Tries to insert a key-value pair into the map.
65///
66/// If the map already had this key present, nothing is updated, and
67/// `DuplicatedMessage` error is returned.
68/// TODO: replace it with unstable try_insert function
69fn try_insert<P>(map: &mut HashMap<NodeId, Message<P>>, k: NodeId, v: Message<P>) -> Result<()> {
70    if let std::collections::hash_map::Entry::Vacant(e) = map.entry(k) {
71        e.insert(v);
72        Ok(())
73    } else {
74        Err(Error::DuplicatedMessage(k, v.action_str().to_string()))
75    }
76}
77
78impl<P: Debug + Clone + Serialize + Eq> Vcbc<P> {
79    pub fn new(
80        tag: Tag,
81        self_id: NodeId,
82        pub_key_set: PublicKeySet,
83        sec_key_share: SecretKeyShare,
84        message_validity: MessageValidity<P>,
85    ) -> Self {
86        Self {
87            tag,
88            i: self_id,
89            m_bar: None,
90            u_bar: None,
91            wd: HashMap::new(),
92            rd: 0,
93            d: None,
94            final_messages: HashMap::new(),
95            pub_key_set,
96            sec_key_share,
97            message_validity,
98        }
99    }
100
101    /// c_broadcast sends the messages `m` to all other parties.
102    /// It also adds the message to message_log and process it.
103    pub fn c_broadcast(&mut self, m: P, broadcaster: &mut Broadcaster<P>) -> Result<()> {
104        debug_assert_eq!(self.i, self.tag.proposer);
105
106        // Upon receiving message (ID.j.s, in, c-broadcast, m):
107        // send (ID.j.s, c-send, m) to all parties
108        let send_msg = Message {
109            tag: self.tag.clone(),
110            action: Action::Send(m),
111        };
112        self.broadcast(send_msg, broadcaster)
113    }
114
115    /// receive_message process the received message 'msg` from `initiator`
116    pub fn receive_message(
117        &mut self,
118        initiator: NodeId,
119        msg: Message<P>,
120        broadcaster: &mut Broadcaster<P>,
121    ) -> Result<()> {
122        log::trace!(
123            "party {} received {} message: {:?} from {}",
124            self.i,
125            msg.action_str(),
126            msg,
127            initiator
128        );
129
130        if msg.tag != self.tag {
131            return Err(Error::InvalidMessage(format!(
132                "invalid tag. expected {}, got {}",
133                self.tag, msg.tag
134            )));
135        }
136
137        match msg.action.clone() {
138            Action::Send(m) => {
139                // Upon receiving message (ID.j.s, c-send, m) from Pl:
140                // if j = l and m̄ = ⊥ then
141                if initiator == self.tag.proposer && self.m_bar.is_none() {
142                    if !(self.message_validity)(initiator, &m) {
143                        return Err(Error::InvalidMessage("invalid proposal".to_string()));
144                    }
145                    // m̄ ← m
146                    self.m_bar = Some(m.clone());
147
148                    let d = Hash32::calculate(&m)?;
149                    self.d = Some(d);
150
151                    // compute an S1-signature share ν on (ID.j.s, c-ready, H(m))
152                    let sign_bytes = c_ready_bytes_to_sign(&self.tag, &d)?;
153                    let s1 = self.sec_key_share.sign(sign_bytes);
154
155                    let ready_msg = Message {
156                        tag: self.tag.clone(),
157                        action: Action::Ready(d, s1),
158                    };
159
160                    // send (ID.j.s, c-ready, H(m), ν) to Pj
161                    self.send_to(ready_msg, self.tag.proposer, broadcaster)?;
162                }
163            }
164            Action::Ready(msg_d, sig_share) => {
165                let d = match self.d {
166                    Some(d) => d,
167                    None => return Err(Error::Generic("protocol violated. no digest".to_string())),
168                };
169                let sign_bytes = c_ready_bytes_to_sign(&self.tag, &d)?;
170
171                if d != msg_d {
172                    log::warn!("party {} received c-ready with unknown digest. expected {d:?}, got {msg_d:?}", self.i);
173                    return Err(Error::Generic("Invalid digest".to_string()));
174                }
175
176                // Upon receiving message (ID.j.s, c-ready, d, νl) from Pl for the first time:
177                if let Vacant(e) = self.wd.entry(initiator) {
178                    let valid_sig = self
179                        .pub_key_set
180                        .public_key_share(initiator)
181                        .verify(&sig_share, sign_bytes);
182
183                    if !valid_sig {
184                        log::warn!(
185                            "party {} received c-ready with invalid signature share",
186                            self.i
187                        );
188                    }
189
190                    // if i = j and νl is a valid S1-signature share then
191                    if self.i == msg.tag.proposer && valid_sig {
192                        // Wd ← Wd ∪ {νl}
193                        e.insert(sig_share);
194
195                        //  rd ← rd + 1
196                        self.rd += 1;
197
198                        // self.threshold() MUST be same as n+t+1/2
199                        // spec: if rd = n+t+1/2 then
200                        if self.rd >= self.threshold() {
201                            // combine the shares in Wd to an S1 -threshold signature µ
202                            let sig = self.pub_key_set.combine_signatures(self.wd.iter())?;
203
204                            let final_msg = Message {
205                                tag: self.tag.clone(),
206                                action: Action::Final(d, sig),
207                            };
208
209                            // send (ID.j.s, c-final, d, µ) to all parties
210                            self.broadcast(final_msg, broadcaster)?;
211                        }
212                    }
213                }
214            }
215            Action::Final(msg_d, sig) => {
216                // Upon receiving message (ID.j.s, c-final, d, µ):
217                let d = match self.d {
218                    Some(d) => d,
219                    None => {
220                        log::warn!(
221                            "party {} received c-final before receiving c-send, logging message",
222                            self.i
223                        );
224                        try_insert(&mut self.final_messages, initiator, msg)?;
225                        // requesting for the proposal
226                        let request_msg = Message {
227                            tag: self.tag.clone(),
228                            action: Action::Request,
229                        };
230                        self.send_to(request_msg, initiator, broadcaster)?;
231
232                        return Ok(());
233                    }
234                };
235
236                let sign_bytes = c_ready_bytes_to_sign(&self.tag, &d)?;
237                let valid_sig = self.pub_key_set.public_key().verify(&sig, sign_bytes);
238
239                if !valid_sig {
240                    log::warn!(
241                        "party {} received c-ready with invalid signature share",
242                        self.i
243                    );
244                }
245
246                // if H(m̄) = d and µ̄ = ⊥ and µ is a valid S1-signature then
247                if d == msg_d && self.u_bar.is_none() && valid_sig {
248                    // µ̄ ← µ
249                    self.u_bar = Some(sig);
250                }
251            }
252            Action::Request => {
253                // Upon receiving message (ID.j.s, c-request) from Pl :
254                if let Some(u) = &self.u_bar {
255                    // if µ̄  != ⊥ then
256
257                    // proposal is known because we have the valid signature
258                    debug_assert!(self.m_bar.is_some());
259                    if let Some(m) = &self.m_bar {
260                        let answer_msg = Message {
261                            tag: self.tag.clone(),
262                            action: Action::Answer(m.clone(), u.clone()),
263                        };
264
265                        // send (ID.j.s, c-answer, m̄, µ̄) to Pl
266                        self.send_to(answer_msg, initiator, broadcaster)?;
267                    }
268                }
269            }
270            Action::Answer(m, u) => {
271                // Upon receiving message (ID.j.s, c-answer, m, µ) from Pl :
272                if self.u_bar.is_none() {
273                    // if µ̄ = ⊥ and ...
274                    let d = Hash32::calculate(&m)?;
275                    let sign_bytes = c_ready_bytes_to_sign(&self.tag, &d)?;
276                    if self.pub_key_set.public_key().verify(&u, sign_bytes) {
277                        // ... µ is a valid S1 -signature on (ID.j.s, c-ready, H(m)) then
278                        // µ̄ ← µ
279                        // m̄ ← m
280                        self.u_bar = Some(u);
281                        self.m_bar = Some(m);
282                    }
283                }
284            }
285        }
286
287        for (initiator, final_msg) in std::mem::take(&mut self.final_messages) {
288            self.receive_message(initiator, final_msg, broadcaster)?;
289        }
290
291        Ok(())
292    }
293
294    pub fn read_delivered(&self) -> Option<(P, Signature)> {
295        if let (Some(proposal), Some(sig)) = (self.m_bar.clone(), self.u_bar.clone()) {
296            Some((proposal, sig))
297        } else {
298            None
299        }
300    }
301
302    // send_to sends the message `msg` to the corresponding peer `to`.
303    // If the `to` is us, it adds the  message to our messages log.
304    fn send_to(
305        &mut self,
306        msg: self::Message<P>,
307        to: NodeId,
308        broadcaster: &mut Broadcaster<P>,
309    ) -> Result<()> {
310        log::debug!("party {} sends {msg:?} to {}", self.i, to);
311
312        if to == self.i {
313            self.receive_message(self.i, msg, broadcaster)?;
314        } else {
315            broadcaster.send_to(Some(self.tag.proposer), bundle::Message::Vcbc(msg), to);
316        }
317        Ok(())
318    }
319
320    // broadcast sends the message `msg` to all other peers in the network.
321    // It adds the message to our messages log.
322    fn broadcast(&mut self, msg: self::Message<P>, broadcaster: &mut Broadcaster<P>) -> Result<()> {
323        log::debug!("party {} broadcasts {msg:?}", self.i);
324
325        broadcaster.broadcast(Some(self.i), bundle::Message::Vcbc(msg.clone()));
326        self.receive_message(self.i, msg, broadcaster)?;
327        Ok(())
328    }
329
330    // threshold is same as $t$ in spec
331    fn threshold(&self) -> usize {
332        self.pub_key_set.threshold() + 1
333    }
334}
335
336#[cfg(test)]
337#[path = "./test.rs"]
338mod test;
339
340#[cfg(test)]
341#[path = "./proptest.rs"]
342mod proptest;