// commonware_broadcast/linked/mod.rs

//! Ordered, reliable broadcast across reconfigurable participants.
//!
//! # Concepts
//!
//! The system has two types of network participants: `sequencers` and `validators`. Their sets may
//! overlap and are defined by the current `epoch`, a monotonically increasing integer. This module
//! handles reconfiguration of these sets across epochs.
//!
//! Sequencers broadcast data. The smallest unit of data is a `chunk`. Sequencers broadcast `node`s
//! that contain a chunk and a threshold signature over the previous chunk, forming a linked chain
//! of nodes from each sequencer.
//!
//! Validators verify and sign chunks using partial signatures. These can be combined to recover a
//! threshold signature, ensuring that a quorum of validators has verified each chunk. The threshold
//! signature allows external parties to confirm that the chunk was reliably broadcast.
//!
//! Network participants persist any new nodes to a journal. This enables recovery from crashes and
//! ensures that sequencers do not broadcast conflicting chunks and that validators do not sign
//! them. "Conflicting" chunks are chunks from the same sequencer at the same height with different
//! payloads.
//!
//! # Design
//!
//! The core of the module is the [`Engine`]. It is responsible for:
//! - Broadcasting nodes (if a sequencer)
//! - Signing chunks (if a validator)
//! - Tracking the latest chunk in each sequencer's chain
//! - Recovering threshold signatures from partial signatures for each chunk
//! - Notifying other actors of new chunks and threshold signatures
//!
//! # Acknowledgements
//!
//! [Autobahn](https://arxiv.org/abs/2401.10369) provided the insight that a succinct
//! proof-of-availability could be produced by linking sequencer broadcasts.

36use commonware_utils::Array;
37
38mod ack_manager;
39use ack_manager::AckManager;
40mod config;
41pub use config::Config;
42mod engine;
43pub use engine::Engine;
44mod ingress;
45use ingress::{Mailbox, Message};
46mod metrics;
47mod namespace;
48mod parsed;
49mod prover;
50pub use prover::Prover;
51mod serializer;
52mod tip_manager;
53use tip_manager::TipManager;
54mod wire {
55    include!(concat!(env!("OUT_DIR"), "/wire.rs"));
56}
57#[cfg(test)]
58pub mod mocks;
59
/// Used as the [`Index`](crate::Coordinator::Index) type.
/// Defines the current set of sequencers and signers.
///
/// This is not a single "View" in the sense of a consensus protocol, but rather a continuous
/// sequence of views in which the set of sequencers and signers is constant.
pub type Epoch = u64;

67/// Used as the [`Application::Context`](crate::Application::Context) type.
68#[derive(Debug, Clone, Hash, PartialEq, Eq)]
69pub struct Context<P: Array> {
70    /// Sequencer's public key.
71    pub sequencer: P,
72
73    /// Sequencer-specific sequential height. Zero-indexed.
74    pub height: u64,
75}
76
#[cfg(test)]
mod tests {
    use super::{mocks, Config, Engine};
    use bytes::Bytes;
    use commonware_cryptography::{
        bls12381::{
            dkg::ops,
            primitives::{group::Share, poly},
        },
        ed25519::PublicKey,
        sha256::{Digest as Sha256Digest, Sha256},
        Ed25519, Hasher, Scheme,
    };
    use commonware_macros::test_traced;
    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
    use commonware_runtime::{
        deterministic::{self, Context, Executor},
        Clock, Metrics, Runner, Spawner,
    };
    use futures::{channel::oneshot, future::join_all};
    use std::{
        collections::{BTreeMap, HashSet},
        sync::{Arc, Mutex},
        time::Duration,
    };
    use tracing::debug;

    /// Mailbox of the mock application, used to request broadcasts.
    type AppMailbox = mocks::application::Mailbox<Sha256Digest, PublicKey>;

    /// Mailbox of the mock collector, used to query each sequencer's tip.
    type CollectorMailbox = mocks::collector::Mailbox<Ed25519, Sha256Digest>;

    /// Per-validator network channels: the (sender, receiver) pairs registered on channels 0 and 1.
    type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;

    /// Properties of a healthy link: `latency` 10.0, `jitter` 1.0, and no message loss.
    fn healthy_link() -> Link {
        Link {
            latency: 10.0,
            jitter: 1.0,
            success_rate: 1.0,
        }
    }

    /// Generates `num_validators` BLS shares with threshold `quorum`.
    ///
    /// Returns the group identity and the shares sorted by share index.
    fn generate_sorted_shares(
        context: &mut Context,
        num_validators: u32,
        quorum: u32,
    ) -> (poly::Public, Vec<Share>) {
        let (identity, mut shares) = ops::generate_shares(context, None, num_validators, quorum);
        shares.sort_by(|a, b| a.index.cmp(&b.index));
        (identity, shares)
    }

    /// Registers every validator with the simulated network on channels 0 and 1.
    async fn register_validators(
        oracle: &mut Oracle<PublicKey>,
        validators: &[PublicKey],
    ) -> Registrations<PublicKey> {
        let mut registrations = BTreeMap::new();
        for validator in validators {
            let first = oracle.register(validator.clone(), 0).await.unwrap();
            let second = oracle.register(validator.clone(), 1).await.unwrap();
            registrations.insert(validator.clone(), (first, second));
        }
        registrations
    }

    /// How [`link_validators`] modifies the link between each ordered pair of validators.
    enum Action {
        /// Add a link with the given properties.
        Link(Link),
        /// Remove the existing link, then add one with the given properties.
        Update(Link),
        /// Remove the existing link.
        Unlink,
    }

    /// Applies `action` to every ordered pair of distinct validators.
    ///
    /// If `restrict_to` is provided, it is called as `f(validators.len(), i1, i2)` and the pair
    /// is skipped when it returns `false`.
    async fn link_validators(
        oracle: &mut Oracle<PublicKey>,
        validators: &[PublicKey],
        action: Action,
        restrict_to: Option<fn(usize, usize, usize) -> bool>,
    ) {
        for (i1, v1) in validators.iter().enumerate() {
            for (i2, v2) in validators.iter().enumerate() {
                if v2 == v1 {
                    continue;
                }
                if let Some(f) = restrict_to {
                    if !f(validators.len(), i1, i2) {
                        continue;
                    }
                }
                if matches!(action, Action::Update(_) | Action::Unlink) {
                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
                }
                if let Action::Link(ref link) | Action::Update(ref link) = action {
                    oracle
                        .add_link(v1.clone(), v2.clone(), link.clone())
                        .await
                        .unwrap();
                }
            }
        }
    }

    /// Starts a simulated network and connects `num_validators` validators over healthy links.
    ///
    /// Validator identities are derived from seeds `0..num_validators`, sorted by public key,
    /// and paired with `shares` by position (panics if `shares` has fewer entries).
    ///
    /// Returns the network oracle, the `(public key, scheme, share)` triples, the sorted public
    /// keys, and each validator's channel registrations.
    async fn initialize_simulation(
        context: Context,
        num_validators: u32,
        shares: &[Share],
    ) -> (
        Oracle<PublicKey>,
        Vec<(PublicKey, Ed25519, Share)>,
        Vec<PublicKey>,
        Registrations<PublicKey>,
    ) {
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            commonware_p2p::simulated::Config {
                max_size: 1024 * 1024,
            },
        );
        network.start();

        let mut schemes = (0..num_validators)
            .map(|i| Ed25519::from_seed(u64::from(i)))
            .collect::<Vec<_>>();
        schemes.sort_by_key(|s| s.public_key());
        let validators: Vec<(PublicKey, Ed25519, Share)> = schemes
            .iter()
            .enumerate()
            .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares[i]))
            .collect();
        let pks = validators
            .iter()
            .map(|(pk, _, _)| pk.clone())
            .collect::<Vec<_>>();

        let registrations = register_validators(&mut oracle, &pks).await;
        link_validators(&mut oracle, &pks, Action::Link(healthy_link()), None).await;
        (oracle, validators, pks, registrations)
    }

    /// Spawns one engine, plus its mock application and collector, per validator.
    ///
    /// Every engine uses the same `identity`, validator set (`pks`), and namespace. Each is
    /// started on the channels taken (removed) from `registrations`.
    ///
    /// Returns the application mailboxes and the collector mailboxes, keyed by validator.
    fn spawn_validator_engines(
        context: Context,
        identity: poly::Public,
        pks: &[PublicKey],
        validators: &[(PublicKey, Ed25519, Share)],
        registrations: &mut Registrations<PublicKey>,
        refresh_epoch_timeout: Duration,
        rebroadcast_timeout: Duration,
    ) -> (
        BTreeMap<PublicKey, AppMailbox>,
        BTreeMap<PublicKey, CollectorMailbox>,
    ) {
        let namespace = b"my testing namespace";
        let mut mailboxes = BTreeMap::new();
        let mut collectors = BTreeMap::new();
        for (validator, scheme, share) in validators {
            let context = context.with_label(&validator.to_string());

            // Pin the coordinator to a fixed view for the whole test.
            let mut coordinator = mocks::coordinator::Coordinator::<PublicKey>::new(
                identity.clone(),
                pks.to_vec(),
                *share,
            );
            coordinator.set_view(111);

            let (app, app_mailbox) =
                mocks::application::Application::<Sha256Digest, PublicKey>::new();
            mailboxes.insert(validator.clone(), app_mailbox.clone());

            let (collector, collector_mailbox) =
                mocks::collector::Collector::<Ed25519, Sha256Digest>::new(
                    namespace,
                    poly::public(&identity),
                );
            context.with_label("collector").spawn(|_| collector.run());
            collectors.insert(validator.clone(), collector_mailbox.clone());

            let (engine, mailbox) = Engine::new(
                context.with_label("engine"),
                Config {
                    crypto: scheme.clone(),
                    application: app_mailbox,
                    collector: collector_mailbox,
                    coordinator,
                    mailbox_size: 1024,
                    verify_concurrent: 1024,
                    namespace: namespace.to_vec(),
                    epoch_bounds: (1, 1),
                    height_bound: 2,
                    refresh_epoch_timeout,
                    rebroadcast_timeout,
                    journal_heights_per_section: 10,
                    journal_replay_concurrency: 1,
                    journal_name_prefix: format!("broadcast-linked-seq/{}/", validator),
                },
            );

            context.with_label("app").spawn(|_| app.run(mailbox));
            let (primary, secondary) = registrations
                .remove(validator)
                .expect("every validator is registered before engines are spawned");
            engine.start(primary, secondary);
        }
        (mailboxes, collectors)
    }

    /// Spawns a task that, every 250ms, asks every application to broadcast a fresh digest.
    ///
    /// On rounds where `invalid_when(iter)` is true (`iter` starts at 1), the payload is hashed
    /// twice to inject an invalid digest.
    fn spawn_proposer(
        context: Context,
        mailboxes: BTreeMap<PublicKey, AppMailbox>,
        invalid_when: fn(u64) -> bool,
    ) {
        context
            .with_label("invalid signature proposer")
            .spawn(move |context| async move {
                let mut iter = 0;
                loop {
                    iter += 1;
                    // Fresh clones each round; `broadcast` needs a mutable mailbox.
                    let round: Vec<AppMailbox> = mailboxes.values().cloned().collect();
                    for mut mailbox in round {
                        let payload = Bytes::from(format!("hello world, iter {}", iter));
                        let mut hasher = Sha256::default();
                        hasher.update(&payload);

                        // Inject an invalid digest by updating with the payload again.
                        if invalid_when(iter) {
                            hasher.update(&payload);
                        }

                        let digest = hasher.finalize();
                        mailbox.broadcast(digest).await;
                    }
                    context.sleep(Duration::from_millis(250)).await;
                }
            });
    }

    /// Waits until, for every `(sequencer, mailbox)` in `collectors`, the collector reports a tip
    /// of at least `threshold` for that sequencer (polled every 100ms).
    ///
    /// # Panics
    ///
    /// Panics if any watcher exits without reporting that its sequencer reached the threshold.
    async fn await_collectors(
        context: Context,
        collectors: &BTreeMap<PublicKey, CollectorMailbox>,
        threshold: u64,
    ) {
        let mut receivers = Vec::with_capacity(collectors.len());
        for (sequencer, mailbox) in collectors {
            // Signalled once this collector has observed the threshold.
            let (tx, rx) = oneshot::channel();
            receivers.push(rx);

            context.with_label("collector_watcher").spawn({
                let sequencer = sequencer.clone();
                let mut mailbox = mailbox.clone();
                move |context| async move {
                    loop {
                        let tip = mailbox.get_tip(sequencer.clone()).await.unwrap_or(0);
                        debug!(tip, ?sequencer, "collector");
                        if tip >= threshold {
                            let _ = tx.send(sequencer.clone());
                            break;
                        }
                        context.sleep(Duration::from_millis(100)).await;
                    }
                }
            });
        }

        // Receivers were pushed in `collectors` key order, so zip pairs each with its sequencer.
        let results = join_all(receivers).await;
        for (sequencer, result) in collectors.keys().zip(results) {
            let reached = result.unwrap_or_else(|_| {
                panic!(
                    "watcher for {:?} exited before reaching tip {}",
                    sequencer, threshold
                )
            });
            assert_eq!(&reached, sequencer);
        }
    }

    /// All validators online over healthy links: every sequencer reaches tip 100.
    #[test_traced]
    fn test_all_online() {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
        let (identity, shares) = generate_sorted_shares(&mut context, num_validators, quorum);

        runner.start(async move {
            let (_oracle, validators, pks, mut registrations) =
                initialize_simulation(context.with_label("simulation"), num_validators, &shares)
                    .await;
            let (mailboxes, collectors) = spawn_validator_engines(
                context.with_label("validator"),
                identity,
                &pks,
                &validators,
                &mut registrations,
                Duration::from_millis(100),
                Duration::from_secs(5),
            );
            spawn_proposer(context.with_label("proposer"), mailboxes, |_| false);
            await_collectors(context.with_label("collector"), &collectors, 100).await;
        });
    }

    /// Restarts the whole simulation every second (recovering persisted state) until every
    /// sequencer has been observed at tip 100 in some run.
    #[test_traced]
    fn test_unclean_shutdown() {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let (mut runner, mut context, _) = Executor::timed(Duration::from_secs(45));
        let (identity, shares) = generate_sorted_shares(&mut context, num_validators, quorum);
        // Sequencers observed at the target tip in any run so far (survives restarts).
        let completed = Arc::new(Mutex::new(HashSet::new()));

        while completed.lock().unwrap().len() != num_validators as usize {
            runner.start({
                let context = context.clone();
                let completed = completed.clone();
                let shares = shares.clone();
                let identity = identity.clone();
                async move {
                    // Unlabeled context: the network keeps the label "network" on every restart.
                    let (_oracle, validators, pks, mut registrations) =
                        initialize_simulation(context.clone(), num_validators, &shares).await;
                    let (mailboxes, collectors) = spawn_validator_engines(
                        context.with_label("validator"),
                        identity,
                        &pks,
                        &validators,
                        &mut registrations,
                        Duration::from_millis(100),
                        Duration::from_secs(5),
                    );
                    spawn_proposer(context.with_label("proposer"), mailboxes, |_| false);

                    for (validator, mailbox) in &collectors {
                        let validator = validator.clone();
                        let mut mailbox = mailbox.clone();
                        let completed = completed.clone();
                        context
                            .with_label("collector_unclean")
                            .spawn(move |context| async move {
                                loop {
                                    let tip =
                                        mailbox.get_tip(validator.clone()).await.unwrap_or(0);
                                    if tip >= 100 {
                                        completed.lock().unwrap().insert(validator);
                                        break;
                                    }
                                    context.sleep(Duration::from_millis(100)).await;
                                }
                            });
                    }

                    // Returning from the root future ends this run.
                    context.sleep(Duration::from_millis(1000)).await;
                }
            });
            let recovered = context.recover();
            runner = recovered.0;
            context = recovered.1;
        }
    }

    /// Fully partitions the network for 5 seconds, heals it, then requires every sequencer to
    /// reach tip 100.
    #[test_traced]
    fn test_network_partition() {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let (runner, mut context, _) = Executor::timed(Duration::from_secs(60));
        let (identity, shares) = generate_sorted_shares(&mut context, num_validators, quorum);

        runner.start(async move {
            let (mut oracle, validators, pks, mut registrations) =
                initialize_simulation(context.with_label("simulation"), num_validators, &shares)
                    .await;
            let (mailboxes, collectors) = spawn_validator_engines(
                context.with_label("validator"),
                identity,
                &pks,
                &validators,
                &mut registrations,
                Duration::from_millis(100),
                Duration::from_secs(1),
            );
            spawn_proposer(context.with_label("proposer"), mailboxes, |_| false);

            // Simulate partition by removing all links.
            link_validators(&mut oracle, &pks, Action::Unlink, None).await;
            context.sleep(Duration::from_secs(5)).await;

            // Heal the partition by re-adding links.
            link_validators(&mut oracle, &pks, Action::Link(healthy_link()), None).await;
            await_collectors(context.with_label("collector"), &collectors, 100).await;
        });
    }

    /// Runs the simulation over slow links that drop half of all messages, until every sequencer
    /// reaches tip 40.
    ///
    /// Returns the runtime auditor's state so runs can be compared for determinism.
    fn slow_and_lossy_links(seed: u64) -> String {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let cfg = deterministic::Config {
            seed,
            timeout: Some(Duration::from_secs(40)),
            ..deterministic::Config::default()
        };
        let (runner, mut context, auditor) = Executor::init(cfg);
        let (identity, shares) = generate_sorted_shares(&mut context, num_validators, quorum);

        runner.start(async move {
            let (mut oracle, validators, pks, mut registrations) =
                initialize_simulation(context.with_label("simulation"), num_validators, &shares)
                    .await;

            // Replace every healthy link with a slow, jittery one that loses half the messages.
            let lossy_link = Link {
                latency: 50.0,
                jitter: 40.0,
                success_rate: 0.5,
            };
            link_validators(&mut oracle, &pks, Action::Update(lossy_link), None).await;

            let (mailboxes, collectors) = spawn_validator_engines(
                context.with_label("validator"),
                identity,
                &pks,
                &validators,
                &mut registrations,
                Duration::from_millis(100),
                Duration::from_millis(150),
            );

            spawn_proposer(context.with_label("proposer"), mailboxes, |_| false);
            await_collectors(context.with_label("collector"), &collectors, 40).await;
        });
        auditor.state()
    }

    #[test_traced]
    fn test_slow_and_lossy_links() {
        slow_and_lossy_links(0);
    }

    #[test_traced]
    fn test_determinism() {
        // We use slow and lossy links as the deterministic test
        // because it is the most complex test.
        for seed in 1..6 {
            let state_1 = slow_and_lossy_links(seed);
            let state_2 = slow_and_lossy_links(seed);
            assert_eq!(state_1, state_2);
        }
    }

    /// Every tenth proposal carries an invalid digest; every sequencer must still reach tip 100.
    #[test_traced]
    fn test_invalid_signature_injection() {
        let num_validators: u32 = 4;
        let quorum: u32 = 3;
        let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
        let (identity, shares) = generate_sorted_shares(&mut context, num_validators, quorum);

        runner.start(async move {
            let (_oracle, validators, pks, mut registrations) =
                initialize_simulation(context.with_label("simulation"), num_validators, &shares)
                    .await;
            let (mailboxes, collectors) = spawn_validator_engines(
                context.with_label("validator"),
                identity,
                &pks,
                &validators,
                &mut registrations,
                Duration::from_millis(100),
                Duration::from_secs(5),
            );

            spawn_proposer(context.with_label("proposer"), mailboxes, |i| i % 10 == 0);
            await_collectors(context.with_label("collector"), &collectors, 100).await;
        });
    }
}