commonware_broadcast/linked/
mod.rs

1//! Ordered, reliable broadcast across reconfigurable participants.
2//!
3//! # Concepts
4//!
5//! The system has two types of network participants: `sequencers` and `validators`. Their sets may
6//! overlap and are defined by the current `epoch`, a monotonically increasing integer. This module
7//! can handle reconfiguration of these sets across different epochs.
8//!
9//! Sequencers broadcast data. The smallest unit of data is a `chunk`. Sequencers broadcast `node`s
10//! that contain a chunk and a threshold signature over the previous chunk, forming a linked chain
11//! of nodes from each sequencer.
12//!
13//! Validators verify and sign chunks using partial signatures. These can be combined to recover a
14//! threshold signature, ensuring a quorum verifies each chunk. The threshold signature allows
15//! external parties to confirm that the chunk was reliably broadcast.
16//!
17//! Network participants persist any new nodes to a journal. This enables recovery from crashes and
18//! ensures that sequencers do not broadcast conflicting chunks and that validators do not sign
19//! them. "Conflicting" chunks are chunks from the same sequencer at the same height with different
20//! payloads.
21//!
22//! # Design
23//!
24//! The core of the module is the `signer` actor. It is responsible for:
25//! - Broadcasting nodes (if a sequencer)
26//! - Signing chunks (if a validator)
27//! - Tracking the latest chunk in each sequencer’s chain
28//! - Recovering threshold signatures from partial signatures for each chunk
29//! - Notifying other actors of new chunks and threshold signatures
30//!
31//! # Acknowledgements
32//!
33//! [Autobahn](https://arxiv.org/abs/2401.10369) provided the insight that a succinct
34//! proof-of-availability could be produced by linking sequencer broadcasts.
35
36use commonware_utils::Array;
37
38mod namespace;
39mod parsed;
40mod serializer;
41
42#[cfg(test)]
43pub mod mocks;
44
45mod wire {
46    include!(concat!(env!("OUT_DIR"), "/wire.rs"));
47}
48
49pub mod prover;
50pub mod signer;
51
/// Index type supplied to the `Coordinator` trait; each value selects the set of sequencers and
/// signers that is currently in force.
///
/// An epoch is not one "view" in the consensus-protocol sense. It covers a continuous run of
/// views during which the sequencer and signer sets do not change.
pub type Epoch = u64;
58
59/// `Context` is used as the `Context` type for the `Application` and `Collector` traits.
60#[derive(Debug, Clone, Hash, PartialEq, Eq)]
61pub struct Context<P: Array> {
62    /// Sequencer's public key.
63    pub sequencer: P,
64
65    /// Sequencer-specific sequential height. Zero-indexed.
66    pub height: u64,
67}
68
#[cfg(test)]
mod tests {
    use super::{mocks, signer};
    use bytes::Bytes;
    use commonware_cryptography::{
        bls12381::{
            dkg::ops,
            primitives::{group::Share, poly},
        },
        ed25519::PublicKey,
        sha256::{Digest as Sha256Digest, Sha256},
        Ed25519, Hasher, Scheme,
    };
    use commonware_macros::test_traced;
    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
    use commonware_runtime::{
        deterministic::{self, Context, Executor},
        Metrics,
    };
    use commonware_runtime::{Clock, Runner, Spawner};
    use futures::channel::oneshot;
    use futures::future::join_all;
    use std::sync::{Arc, Mutex};
    use std::{
        collections::{BTreeMap, HashSet},
        time::Duration,
    };
    use tracing::debug;

    /// Network channels registered for each participant: the `(Sender, Receiver)` pair for
    /// channel `0` followed by the `(Sender, Receiver)` pair for channel `1`.
    type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;

    /// Mock application mailboxes, keyed by validator public key.
    type AppMailboxes = BTreeMap<PublicKey, mocks::application::Mailbox<Sha256Digest, PublicKey>>;

    /// Mock collector mailboxes, keyed by validator public key.
    type CollectorMailboxes =
        BTreeMap<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>;

    /// Registers channels `0` and `1` with the simulated network for every validator.
    ///
    /// Panics if the oracle rejects a registration.
    async fn register_validators(
        oracle: &mut Oracle<PublicKey>,
        validators: &[PublicKey],
    ) -> Registrations<PublicKey> {
        let mut registrations = BTreeMap::new();
        for validator in validators {
            let channel_0 = oracle.register(validator.clone(), 0).await.unwrap();
            let channel_1 = oracle.register(validator.clone(), 1).await.unwrap();
            registrations.insert(validator.clone(), (channel_0, channel_1));
        }
        registrations
    }

    /// Operation that `link_validators` applies to every selected pair of validators.
    enum Action {
        /// Add a link with the given parameters.
        Link(Link),
        /// Remove the existing link, then add one with the given parameters.
        Update(Link),
        /// Remove the existing link.
        Unlink,
    }

    /// The link used by default between validators: latency `10.0`, jitter `1.0`, and no loss.
    fn reliable_link() -> Link {
        Link {
            latency: 10.0,
            jitter: 1.0,
            success_rate: 1.0,
        }
    }

    /// Applies `action` to every ordered pair of distinct validators.
    ///
    /// When `restrict_to` is provided, a pair `(i1, i2)` is only touched if
    /// `restrict_to(validators.len(), i1, i2)` returns `true`.
    ///
    /// Panics if the oracle fails to add or remove a link.
    async fn link_validators(
        oracle: &mut Oracle<PublicKey>,
        validators: &[PublicKey],
        action: Action,
        restrict_to: Option<fn(usize, usize, usize) -> bool>,
    ) {
        for (i1, v1) in validators.iter().enumerate() {
            for (i2, v2) in validators.iter().enumerate() {
                // A validator is never linked to itself.
                if v2 == v1 {
                    continue;
                }
                if let Some(f) = restrict_to {
                    if !f(validators.len(), i1, i2) {
                        continue;
                    }
                }
                if matches!(action, Action::Update(_) | Action::Unlink) {
                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
                }
                if let Action::Link(ref link) | Action::Update(ref link) = action {
                    oracle
                        .add_link(v1.clone(), v2.clone(), link.clone())
                        .await
                        .unwrap();
                }
            }
        }
    }

    /// Starts a simulated network and prepares `num_validators` fully-linked validators.
    ///
    /// Validator keys are derived from the seeds `0..num_validators` and sorted by public key;
    /// the validator at sorted position `i` is paired with `shares_vec[i]`. All validators are
    /// registered on the network and connected pairwise with `reliable_link()`.
    ///
    /// Returns the network oracle, the `(public key, signing scheme, share)` triples, the sorted
    /// public keys, and the registered channels.
    ///
    /// Panics if `shares_vec` holds fewer than `num_validators` shares.
    async fn initialize_simulation(
        context: Context,
        num_validators: u32,
        shares_vec: &[Share],
    ) -> (
        Oracle<PublicKey>,
        Vec<(PublicKey, Ed25519, Share)>,
        Vec<PublicKey>,
        Registrations<PublicKey>,
    ) {
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            commonware_p2p::simulated::Config {
                max_size: 1024 * 1024,
            },
        );
        network.start();

        let mut schemes = (0..num_validators)
            .map(|i| Ed25519::from_seed(u64::from(i)))
            .collect::<Vec<_>>();
        schemes.sort_by_key(|s| s.public_key());
        let validators: Vec<(PublicKey, Ed25519, Share)> = schemes
            .iter()
            .enumerate()
            .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i]))
            .collect();
        let pks = validators
            .iter()
            .map(|(pk, _, _)| pk.clone())
            .collect::<Vec<_>>();

        let registrations = register_validators(&mut oracle, &pks).await;
        link_validators(&mut oracle, &pks, Action::Link(reliable_link()), None).await;
        (oracle, validators, pks, registrations)
    }

    /// Spawns a mock application, a mock collector, and a `signer` actor for every validator.
    ///
    /// The application and collector mailboxes are recorded in `mailboxes` and `collectors`
    /// under the validator's public key. Each validator's channels are removed from
    /// `registrations` and handed to its signer.
    ///
    /// Panics if `registrations` has no entry for one of the validators.
    // Test-only helper: bundling the arguments into a struct would only obscure the call sites.
    #[allow(clippy::too_many_arguments)]
    fn spawn_validator_engines(
        context: Context,
        identity: poly::Public,
        pks: &[PublicKey],
        validators: &[(PublicKey, Ed25519, Share)],
        registrations: &mut Registrations<PublicKey>,
        mailboxes: &mut AppMailboxes,
        collectors: &mut CollectorMailboxes,
        refresh_epoch_timeout: Duration,
        rebroadcast_timeout: Duration,
    ) {
        let namespace = b"my testing namespace";
        for (validator, scheme, share) in validators {
            let context = context.with_label(&validator.to_string());
            let mut coordinator = mocks::coordinator::Coordinator::<PublicKey>::new(
                identity.clone(),
                pks.to_vec(),
                *share,
            );
            coordinator.set_view(111);

            let (app, app_mailbox) =
                mocks::application::Application::<Sha256Digest, PublicKey>::new();
            mailboxes.insert(validator.clone(), app_mailbox.clone());

            let (collector, collector_mailbox) =
                mocks::collector::Collector::<Ed25519, Sha256Digest>::new(
                    namespace,
                    poly::public(&identity),
                );
            context.with_label("collector").spawn(|_| collector.run());
            collectors.insert(validator.clone(), collector_mailbox.clone());

            let (signer, signer_mailbox) = signer::Actor::new(
                context.with_label("signer"),
                signer::Config {
                    crypto: scheme.clone(),
                    application: app_mailbox,
                    collector: collector_mailbox,
                    coordinator,
                    mailbox_size: 1024,
                    pending_verify_size: 1024,
                    namespace: namespace.to_vec(),
                    epoch_bounds: (1, 1),
                    height_bound: 2,
                    refresh_epoch_timeout,
                    rebroadcast_timeout,
                    journal_heights_per_section: 10,
                    journal_replay_concurrency: 1,
                    journal_name_prefix: format!("broadcast-linked-seq/{}/", validator),
                },
            );

            context.with_label("app").spawn(|_| app.run(signer_mailbox));
            let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
            signer.start((a1, a2), (b1, b2));
        }
    }

    /// Spawns a task that asks every application mailbox to broadcast a new digest every 250ms.
    ///
    /// The digest for iteration `iter` (starting at `1`) is the SHA-256 of
    /// `"hello world, iter {iter}"`. When `invalid_when(iter)` returns `true`, the payload is fed
    /// to the hasher a second time so that a different digest is broadcast.
    fn spawn_proposer(
        context: Context,
        mailboxes: Arc<Mutex<AppMailboxes>>,
        invalid_when: fn(u64) -> bool,
    ) {
        context
            .with_label("invalid signature proposer")
            .spawn(move |context| async move {
                let mut iter = 0;
                loop {
                    iter += 1;

                    // Snapshot the mailboxes so the lock is released before any `.await`.
                    let mailbox_vec: Vec<mocks::application::Mailbox<Sha256Digest, PublicKey>> = {
                        let guard = mailboxes.lock().unwrap();
                        guard.values().cloned().collect()
                    };
                    for mut mailbox in mailbox_vec {
                        let payload = Bytes::from(format!("hello world, iter {}", iter));
                        let mut hasher = Sha256::default();
                        hasher.update(&payload);

                        // Inject an invalid digest by updating with the payload again.
                        if invalid_when(iter) {
                            hasher.update(&payload);
                        }

                        let digest = hasher.finalize();
                        mailbox.broadcast(digest).await;
                    }
                    context.sleep(Duration::from_millis(250)).await;
                }
            });
    }

    /// Waits until every collector reports a tip of at least `threshold` for its own validator.
    ///
    /// One watcher task per collector polls `get_tip` every 100ms (a missing tip counts as `0`)
    /// and signals completion over a oneshot channel.
    ///
    /// Panics if any watcher goes away without signalling, i.e. before reaching the threshold.
    async fn await_collectors(
        context: Context,
        collectors: &CollectorMailboxes,
        threshold: u64,
    ) {
        let mut receivers = Vec::with_capacity(collectors.len());
        for (sequencer, mailbox) in collectors {
            // Create a oneshot channel to signal when the collector has reached the threshold.
            let (tx, rx) = oneshot::channel();
            receivers.push(rx);

            // Spawn a watcher for the collector.
            context.with_label("collector_watcher").spawn({
                let sequencer = sequencer.clone();
                let mut mailbox = mailbox.clone();
                move |context| async move {
                    loop {
                        let tip = mailbox.get_tip(sequencer.clone()).await.unwrap_or(0);
                        debug!(tip, ?sequencer, "collector");
                        if tip >= threshold {
                            let _ = tx.send(sequencer.clone());
                            break;
                        }
                        context.sleep(Duration::from_millis(100)).await;
                    }
                }
            });
        }

        // Wait for all oneshot receivers to complete. A receiver resolves to an error when its
        // sender is dropped without sending, which must not be mistaken for success.
        let results = join_all(receivers).await;
        assert_eq!(results.len(), collectors.len());
        assert!(
            results.iter().all(|result| result.is_ok()),
            "a collector watcher exited before reaching the threshold"
        );
    }

    /// All validators are online and connected by reliable links.
    #[test_traced]
    fn test_all_online() {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
        let (identity, mut shares_vec) =
            ops::generate_shares(&mut context, None, num_validators, quorum);
        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));

        runner.start(async move {
            let (_oracle, validators, pks, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                num_validators,
                &shares_vec,
            )
            .await;
            let mailboxes = Arc::new(Mutex::new(AppMailboxes::new()));
            let mut collectors = CollectorMailboxes::new();
            spawn_validator_engines(
                context.with_label("validator"),
                identity.clone(),
                &pks,
                &validators,
                &mut registrations,
                &mut mailboxes.lock().unwrap(),
                &mut collectors,
                Duration::from_millis(100),
                Duration::from_secs(5),
            );
            spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);
            await_collectors(context.with_label("collector"), &collectors, 100).await;
        });
    }

    /// Runs the network in one-second bursts, abandoning each run and continuing from
    /// `context.recover()`, until every validator's collector has observed a tip of at least
    /// `100` for that validator.
    #[test_traced]
    fn test_unclean_shutdown() {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let (mut runner, mut context, _) = Executor::timed(Duration::from_secs(45));
        let (identity, mut shares_vec) =
            ops::generate_shares(&mut context, None, num_validators, quorum);
        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
        let completed = Arc::new(Mutex::new(HashSet::new()));

        while completed.lock().unwrap().len() != num_validators as usize {
            runner.start({
                let context = context.clone();
                let completed = completed.clone();
                let shares_vec = shares_vec.clone();
                let identity = identity.clone();
                async move {
                    // The oracle is kept bound so it lives until the end of this run.
                    let (_oracle, validators, pks, mut registrations) =
                        initialize_simulation(context.clone(), num_validators, &shares_vec).await;

                    let mailboxes = Arc::new(Mutex::new(AppMailboxes::new()));
                    let mut collectors = CollectorMailboxes::new();
                    spawn_validator_engines(
                        context.with_label("validator"),
                        identity.clone(),
                        &pks,
                        &validators,
                        &mut registrations,
                        &mut mailboxes.lock().unwrap(),
                        &mut collectors,
                        Duration::from_millis(100),
                        Duration::from_secs(5),
                    );
                    spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);

                    // Record each validator whose collector reaches the target tip.
                    for (validator, mailbox) in collectors.iter() {
                        let validator = validator.clone();
                        let mut mailbox = mailbox.clone();
                        let completed = completed.clone();
                        context
                            .with_label("collector_unclean")
                            .spawn(|context| async move {
                                loop {
                                    let tip = mailbox.get_tip(validator.clone()).await.unwrap_or(0);
                                    if tip >= 100 {
                                        completed.lock().unwrap().insert(validator.clone());
                                        break;
                                    }
                                    context.sleep(Duration::from_millis(100)).await;
                                }
                            });
                    }

                    // End this run after one second, regardless of progress.
                    context.sleep(Duration::from_millis(1000)).await;
                }
            });
            let recovered = context.recover();
            runner = recovered.0;
            context = recovered.1;
        }
    }

    /// All links are removed for five seconds and then restored; broadcasts must still reach
    /// the target tip afterwards.
    #[test_traced]
    fn test_network_partition() {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let (runner, mut context, _) = Executor::timed(Duration::from_secs(60));
        let (identity, mut shares_vec) =
            ops::generate_shares(&mut context, None, num_validators, quorum);
        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));

        runner.start(async move {
            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                num_validators,
                &shares_vec,
            )
            .await;
            let mailboxes = Arc::new(Mutex::new(AppMailboxes::new()));
            let mut collectors = CollectorMailboxes::new();
            spawn_validator_engines(
                context.with_label("validator"),
                identity.clone(),
                &pks,
                &validators,
                &mut registrations,
                &mut mailboxes.lock().unwrap(),
                &mut collectors,
                Duration::from_millis(100),
                Duration::from_secs(1),
            );
            spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);

            // Simulate partition by removing all links.
            link_validators(&mut oracle, &pks, Action::Unlink, None).await;
            context.sleep(Duration::from_secs(5)).await;

            // Heal the partition by re-adding links.
            link_validators(&mut oracle, &pks, Action::Link(reliable_link()), None).await;
            await_collectors(context.with_label("collector"), &collectors, 100).await;
        });
    }

    /// Runs the broadcast over slow links that drop half of the messages, using the given
    /// `seed`, and returns the auditor state once every collector has reached a tip of `40`.
    fn slow_and_lossy_links(seed: u64) -> String {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let cfg = deterministic::Config {
            seed,
            timeout: Some(Duration::from_secs(40)),
            ..deterministic::Config::default()
        };
        let (runner, mut context, auditor) = Executor::init(cfg);
        let (identity, mut shares_vec) =
            ops::generate_shares(&mut context, None, num_validators, quorum);
        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));

        runner.start(async move {
            let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                num_validators,
                &shares_vec,
            )
            .await;

            // Replace the reliable links with delayed, lossy ones.
            let delayed_link = Link {
                latency: 50.0,
                jitter: 40.0,
                success_rate: 0.5,
            };
            link_validators(&mut oracle, &pks, Action::Update(delayed_link), None).await;

            let mailboxes = Arc::new(Mutex::new(AppMailboxes::new()));
            let mut collectors = CollectorMailboxes::new();
            spawn_validator_engines(
                context.with_label("validator"),
                identity.clone(),
                &pks,
                &validators,
                &mut registrations,
                &mut mailboxes.lock().unwrap(),
                &mut collectors,
                Duration::from_millis(100),
                Duration::from_millis(150),
            );

            spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);
            await_collectors(context.with_label("collector"), &collectors, 40).await;
        });
        auditor.state()
    }

    #[test_traced]
    fn test_slow_and_lossy_links() {
        slow_and_lossy_links(0);
    }

    /// Two runs with the same seed must end in the same auditor state.
    #[test_traced]
    fn test_determinism() {
        // We use slow and lossy links as the deterministic test
        // because it is the most complex test.
        for seed in 1..6 {
            let state_1 = slow_and_lossy_links(seed);
            let state_2 = slow_and_lossy_links(seed);
            assert_eq!(state_1, state_2);
        }
    }

    /// Every tenth proposal broadcasts a digest that does not match its payload; the remaining
    /// proposals must still drive every collector to the target tip.
    #[test_traced]
    fn test_invalid_signature_injection() {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
        let (identity, mut shares_vec) =
            ops::generate_shares(&mut context, None, num_validators, quorum);
        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));

        runner.start(async move {
            let (_oracle, validators, pks, mut registrations) = initialize_simulation(
                context.with_label("simulation"),
                num_validators,
                &shares_vec,
            )
            .await;
            let mailboxes = Arc::new(Mutex::new(AppMailboxes::new()));
            let mut collectors = CollectorMailboxes::new();
            spawn_validator_engines(
                context.with_label("validator"),
                identity.clone(),
                &pks,
                &validators,
                &mut registrations,
                &mut mailboxes.lock().unwrap(),
                &mut collectors,
                Duration::from_millis(100),
                Duration::from_secs(5),
            );

            spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |i| {
                i % 10 == 0
            });
            await_collectors(context.with_label("collector"), &collectors, 100).await;
        });
    }
}