commonware_broadcast/linked/
mod.rs

1//! Ordered, reliable broadcast across reconfigurable participants.
2//!
3//! # Concepts
4//!
5//! The system has two types of network participants: `sequencers` and `validators`. Their sets may
6//! overlap and are defined by the current `epoch`, a monotonically increasing integer. This module
7//! can handle reconfiguration of these sets across different epochs.
8//!
9//! Sequencers broadcast data. The smallest unit of data is a `chunk`. Sequencers broadcast `node`s
10//! that contain a chunk and a threshold signature over the previous chunk, forming a linked chain
11//! of nodes from each sequencer.
12//!
13//! Validators verify and sign chunks using partial signatures. These can be combined to recover a
14//! threshold signature, ensuring a quorum verifies each chunk. The threshold signature allows
15//! external parties to confirm that the chunk was reliably broadcast.
16//!
17//! Network participants persist any new nodes to a journal. This enables recovery from crashes and
18//! ensures that sequencers do not broadcast conflicting chunks and that validators do not sign
19//! them. "Conflicting" chunks are chunks from the same sequencer at the same height with different
20//! payloads.
21//!
22//! # Design
23//!
24//! The core of the module is the `signer` actor. It is responsible for:
25//! - Broadcasting nodes (if a sequencer)
26//! - Signing chunks (if a validator)
27//! - Tracking the latest chunk in each sequencer’s chain
28//! - Recovering threshold signatures from partial signatures for each chunk
29//! - Notifying other actors of new chunks and threshold signatures
30//!
31//! # Acknowledgements
32//!
33//! [Autobahn](https://arxiv.org/abs/2401.10369) provided the insight that a succinct
34//! proof-of-availability could be produced by linking sequencer broadcasts.
35
36use commonware_cryptography::Array;
37
38mod namespace;
39mod parsed;
40mod serializer;
41
42#[cfg(test)]
43pub mod mocks;
44
45mod wire {
46    include!(concat!(env!("OUT_DIR"), "/wire.rs"));
47}
48
49pub mod prover;
50pub mod signer;
51
52/// `Epoch` is used as the `Index` type for the `Coordinator` trait.
53/// Defines the current set of sequencers and signers.
54///
55/// This is not a single "View" in the sense of a consensus protocol, but rather a continuous
56/// sequence of views in-which the set of sequencers and signers is constant.
57pub type Epoch = u64;
58
59/// `Context` is used as the `Context` type for the `Application` and `Collector` traits.
60#[derive(Debug, Clone, Hash, PartialEq, Eq)]
61pub struct Context<P: Array> {
62    /// Sequencer's public key.
63    pub sequencer: P,
64
65    /// Sequencer-specific sequential height. Zero-indexed.
66    pub height: u64,
67}
68
69#[cfg(test)]
70mod tests {
71    use super::{mocks, signer};
72    use bytes::Bytes;
73    use commonware_cryptography::{
74        bls12381::{
75            dkg::ops,
76            primitives::{group::Share, poly},
77        },
78        ed25519::PublicKey,
79        sha256::{Digest as Sha256Digest, Sha256},
80        Ed25519, Hasher, Scheme,
81    };
82    use commonware_macros::test_traced;
83    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
84    use commonware_runtime::deterministic::{self, Context, Executor};
85    use commonware_runtime::{Clock, Runner, Spawner};
86    use futures::channel::oneshot;
87    use futures::future::join_all;
88    use prometheus_client::registry::Registry;
89    use std::sync::{Arc, Mutex};
90    use std::{
91        collections::{BTreeMap, HashSet},
92        time::Duration,
93    };
94    use tracing::debug;
95
96    type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;
97
98    async fn register_validators(
99        oracle: &mut Oracle<PublicKey>,
100        validators: &[PublicKey],
101    ) -> Registrations<PublicKey> {
102        let mut registrations = BTreeMap::new();
103        for validator in validators.iter() {
104            let (a1, a2) = oracle.register(validator.clone(), 0).await.unwrap();
105            let (b1, b2) = oracle.register(validator.clone(), 1).await.unwrap();
106            registrations.insert(validator.clone(), ((a1, a2), (b1, b2)));
107        }
108        registrations
109    }
110
111    #[allow(dead_code)]
112    enum Action {
113        Link(Link),
114        Update(Link),
115        Unlink,
116    }
117
118    async fn link_validators(
119        oracle: &mut Oracle<PublicKey>,
120        validators: &[PublicKey],
121        action: Action,
122        restrict_to: Option<fn(usize, usize, usize) -> bool>,
123    ) {
124        for (i1, v1) in validators.iter().enumerate() {
125            for (i2, v2) in validators.iter().enumerate() {
126                if v2 == v1 {
127                    continue;
128                }
129                if let Some(f) = restrict_to {
130                    if !f(validators.len(), i1, i2) {
131                        continue;
132                    }
133                }
134                if matches!(action, Action::Update(_) | Action::Unlink) {
135                    oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
136                }
137                if let Action::Link(ref link) | Action::Update(ref link) = action {
138                    oracle
139                        .add_link(v1.clone(), v2.clone(), link.clone())
140                        .await
141                        .unwrap();
142                }
143            }
144        }
145    }
146
147    async fn initialize_simulation(
148        runtime: &Context,
149        num_validators: u32,
150        shares_vec: &mut [Share],
151    ) -> (
152        Oracle<PublicKey>,
153        Vec<(PublicKey, Ed25519, Share)>,
154        Vec<PublicKey>,
155        Registrations<PublicKey>,
156    ) {
157        let (network, mut oracle) = Network::new(
158            runtime.clone(),
159            commonware_p2p::simulated::Config {
160                registry: Arc::new(Mutex::new(Registry::default())),
161                max_size: 1024 * 1024,
162            },
163        );
164        runtime.clone().spawn("network", network.run());
165
166        let mut schemes = (0..num_validators)
167            .map(|i| Ed25519::from_seed(i as u64))
168            .collect::<Vec<_>>();
169        schemes.sort_by_key(|s| s.public_key());
170        let validators: Vec<(PublicKey, Ed25519, Share)> = schemes
171            .iter()
172            .enumerate()
173            .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i]))
174            .collect();
175        let pks = validators
176            .iter()
177            .map(|(pk, _, _)| pk.clone())
178            .collect::<Vec<_>>();
179
180        let registrations = register_validators(&mut oracle, &pks).await;
181        let link = Link {
182            latency: 10.0,
183            jitter: 1.0,
184            success_rate: 1.0,
185        };
186        link_validators(&mut oracle, &pks, Action::Link(link), None).await;
187        (oracle, validators, pks, registrations)
188    }
189
190    #[allow(clippy::too_many_arguments)]
191    fn spawn_validator_engines(
192        runtime: &Context,
193        identity: poly::Public,
194        pks: &[PublicKey],
195        validators: &[(PublicKey, Ed25519, Share)],
196        registrations: &mut Registrations<PublicKey>,
197        mailboxes: &mut BTreeMap<PublicKey, mocks::application::Mailbox<Sha256Digest, PublicKey>>,
198        collectors: &mut BTreeMap<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>,
199        refresh_epoch_timeout: Duration,
200        rebroadcast_timeout: Duration,
201    ) {
202        let namespace = b"my testing namespace";
203        for (validator, scheme, share) in validators.iter() {
204            let mut coordinator = mocks::coordinator::Coordinator::<PublicKey>::new(
205                identity.clone(),
206                pks.to_vec(),
207                *share,
208            );
209            coordinator.set_view(111);
210
211            let (mut app, app_mailbox) =
212                mocks::application::Application::<Sha256Digest, PublicKey>::new();
213            mailboxes.insert(validator.clone(), app_mailbox.clone());
214
215            let (collector, collector_mailbox) =
216                mocks::collector::Collector::<Ed25519, Sha256Digest>::new(
217                    namespace,
218                    poly::public(&identity),
219                );
220            runtime.clone().spawn("collector", collector.run());
221            collectors.insert(validator.clone(), collector_mailbox);
222
223            let (signer, signer_mailbox) = signer::Actor::new(
224                runtime.clone(),
225                signer::Config {
226                    crypto: scheme.clone(),
227                    application: app_mailbox.clone(),
228                    collector: collectors.get(validator).unwrap().clone(),
229                    coordinator,
230                    mailbox_size: 1024,
231                    pending_verify_size: 1024,
232                    namespace: namespace.to_vec(),
233                    epoch_bounds: (1, 1),
234                    height_bound: 2,
235                    refresh_epoch_timeout,
236                    rebroadcast_timeout,
237                    journal_heights_per_section: 10,
238                    journal_replay_concurrency: 1,
239                    journal_name_prefix: format!("broadcast-linked-seq/{}/", validator),
240                },
241            );
242
243            runtime
244                .clone()
245                .spawn("app", async move { app.run(signer_mailbox).await });
246            let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
247            runtime.clone().spawn(
248                "signer",
249                async move { signer.run((a1, a2), (b1, b2)).await },
250            );
251        }
252    }
253
254    fn spawn_proposer(
255        runtime: &Context,
256        mailboxes: Arc<
257            Mutex<BTreeMap<PublicKey, mocks::application::Mailbox<Sha256Digest, PublicKey>>>,
258        >,
259        invalid_when: fn(u64) -> bool,
260    ) {
261        runtime.clone().spawn("invalid signature proposer", {
262            let runtime = runtime.clone();
263            async move {
264                let mut iter = 0;
265                loop {
266                    iter += 1;
267                    let mailbox_vec: Vec<mocks::application::Mailbox<Sha256Digest, PublicKey>> = {
268                        let guard = mailboxes.lock().unwrap();
269                        guard.values().cloned().collect()
270                    };
271                    for mut mailbox in mailbox_vec {
272                        let payload = Bytes::from(format!("hello world, iter {}", iter));
273                        let mut hasher = Sha256::default();
274                        hasher.update(&payload);
275
276                        // Inject an invalid digest by updating with the payload again.
277                        if invalid_when(iter) {
278                            hasher.update(&payload);
279                        }
280
281                        let digest = hasher.finalize();
282                        mailbox.broadcast(digest).await;
283                    }
284                    runtime.sleep(Duration::from_millis(250)).await;
285                }
286            }
287        });
288    }
289
290    async fn await_collectors(
291        runtime: &Context,
292        collectors: &BTreeMap<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>,
293        threshold: u64,
294    ) {
295        let mut receivers = Vec::new();
296        for (sequencer, mailbox) in collectors.iter() {
297            // Create a oneshot channel to signal when the collector has reached the threshold.
298            let (tx, rx) = oneshot::channel();
299            receivers.push(rx);
300
301            // Spawn a watcher for the collector.
302            runtime.spawn("collector_watcher", {
303                let sequencer = sequencer.clone();
304                let mut mailbox = mailbox.clone();
305                let runtime = runtime.clone();
306                async move {
307                    loop {
308                        let tip = mailbox.get_tip(sequencer.clone()).await.unwrap_or(0);
309                        debug!(tip, ?sequencer, "collector");
310                        if tip >= threshold {
311                            let _ = tx.send(sequencer.clone());
312                            break;
313                        }
314                        runtime.sleep(Duration::from_millis(100)).await;
315                    }
316                }
317            });
318        }
319
320        // Wait for all oneshot receivers to complete.
321        let results = join_all(receivers).await;
322        assert_eq!(results.len(), collectors.len());
323    }
324
325    #[test_traced]
326    fn test_all_online() {
327        let num_validators: u32 = 4;
328        let quorum: u32 = 3;
329        let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
330        let (identity, mut shares_vec) =
331            ops::generate_shares(&mut context, None, num_validators, quorum);
332        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
333
334        runner.start({
335            let context = context.clone();
336            async move {
337                let (_oracle, validators, pks, mut registrations) =
338                    initialize_simulation(&context, num_validators, &mut shares_vec).await;
339                let mailboxes = Arc::new(Mutex::new(BTreeMap::<
340                    PublicKey,
341                    mocks::application::Mailbox<Sha256Digest, PublicKey>,
342                >::new()));
343                let mut collectors =
344                    BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
345                spawn_validator_engines(
346                    &context,
347                    identity.clone(),
348                    &pks,
349                    &validators,
350                    &mut registrations,
351                    &mut mailboxes.lock().unwrap(),
352                    &mut collectors,
353                    Duration::from_millis(100),
354                    Duration::from_secs(5),
355                );
356                spawn_proposer(&context, mailboxes.clone(), |_| false);
357                await_collectors(&context, &collectors, 100).await;
358            }
359        });
360    }
361
362    #[test_traced]
363    fn test_unclean_shutdown() {
364        let num_validators: u32 = 4;
365        let quorum: u32 = 3;
366        let (mut runner, mut context, _) = Executor::timed(Duration::from_secs(45));
367        let (identity, mut shares_vec) =
368            ops::generate_shares(&mut context, None, num_validators, quorum);
369        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
370        let completed = Arc::new(Mutex::new(HashSet::new()));
371        let shutdowns = Arc::new(Mutex::new(0u64));
372
373        while completed.lock().unwrap().len() != num_validators as usize {
374            runner.start({
375                let context = context.clone();
376                let completed = completed.clone();
377                let shares_vec = shares_vec.clone();
378                let shutdowns = shutdowns.clone();
379                let identity = identity.clone();
380                async move {
381                    let (network, mut oracle) = Network::new(
382                        context.clone(),
383                        commonware_p2p::simulated::Config {
384                            registry: Arc::new(Mutex::new(Registry::default())),
385                            max_size: 1024 * 1024,
386                        },
387                    );
388                    context.clone().spawn("network", network.run());
389
390                    let mut schemes = (0..num_validators)
391                        .map(|i| Ed25519::from_seed(i as u64))
392                        .collect::<Vec<_>>();
393                    schemes.sort_by_key(|s| s.public_key());
394                    let validators: Vec<(PublicKey, Ed25519, Share)> = schemes
395                        .iter()
396                        .enumerate()
397                        .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i]))
398                        .collect();
399                    let pks = validators
400                        .iter()
401                        .map(|(pk, _, _)| pk.clone())
402                        .collect::<Vec<_>>();
403
404                    let mut registrations = register_validators(&mut oracle, &pks).await;
405                    let link = commonware_p2p::simulated::Link {
406                        latency: 10.0,
407                        jitter: 1.0,
408                        success_rate: 1.0,
409                    };
410                    link_validators(&mut oracle, &pks, Action::Link(link), None).await;
411
412                    let mailboxes = Arc::new(Mutex::new(BTreeMap::<
413                        PublicKey,
414                        mocks::application::Mailbox<Sha256Digest, PublicKey>,
415                    >::new()));
416                    let mut collectors = BTreeMap::<
417                        PublicKey,
418                        mocks::collector::Mailbox<Ed25519, Sha256Digest>,
419                    >::new();
420                    spawn_validator_engines(
421                        &context,
422                        identity.clone(),
423                        &pks,
424                        &validators,
425                        &mut registrations,
426                        &mut mailboxes.lock().unwrap(),
427                        &mut collectors,
428                        Duration::from_millis(100),
429                        Duration::from_secs(5),
430                    );
431                    spawn_proposer(&context, mailboxes.clone(), |_| false);
432
433                    let collector_pairs: Vec<(
434                        PublicKey,
435                        mocks::collector::Mailbox<Ed25519, Sha256Digest>,
436                    )> = collectors
437                        .iter()
438                        .map(|(v, m)| (v.clone(), m.clone()))
439                        .collect();
440                    for (validator, mut mailbox) in collector_pairs {
441                        let context_cloned = context.clone();
442                        let completed_clone = completed.clone();
443                        context.clone().spawn("collector_unclean", async move {
444                            loop {
445                                let tip = mailbox.get_tip(validator.clone()).await.unwrap_or(0);
446                                if tip >= 100 {
447                                    completed_clone.lock().unwrap().insert(validator.clone());
448                                    break;
449                                }
450                                context_cloned.sleep(Duration::from_millis(100)).await;
451                            }
452                        });
453                    }
454                    context.sleep(Duration::from_millis(1000)).await;
455                    *shutdowns.lock().unwrap() += 1;
456                }
457            });
458            let recovered = context.recover();
459            runner = recovered.0;
460            context = recovered.1;
461        }
462    }
463
464    #[test_traced]
465    fn test_network_partition() {
466        let num_validators: u32 = 4;
467        let quorum: u32 = 3;
468        let (runner, mut context, _) = Executor::timed(Duration::from_secs(60));
469        let (identity, mut shares_vec) =
470            ops::generate_shares(&mut context, None, num_validators, quorum);
471        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
472
473        runner.start({
474            let context = context.clone();
475            async move {
476                let (mut oracle, validators, pks, mut registrations) =
477                    initialize_simulation(&context, num_validators, &mut shares_vec).await;
478                let mailboxes = Arc::new(Mutex::new(BTreeMap::<
479                    PublicKey,
480                    mocks::application::Mailbox<Sha256Digest, PublicKey>,
481                >::new()));
482                let mut collectors =
483                    BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
484                spawn_validator_engines(
485                    &context,
486                    identity.clone(),
487                    &pks,
488                    &validators,
489                    &mut registrations,
490                    &mut mailboxes.lock().unwrap(),
491                    &mut collectors,
492                    Duration::from_millis(100),
493                    Duration::from_secs(1),
494                );
495                spawn_proposer(&context, mailboxes.clone(), |_| false);
496                // Simulate partition by removing all links.
497                link_validators(&mut oracle, &pks, Action::Unlink, None).await;
498                context.sleep(Duration::from_secs(5)).await;
499                // Heal the partition by re-adding links.
500                let link = Link {
501                    latency: 10.0,
502                    jitter: 1.0,
503                    success_rate: 1.0,
504                };
505                link_validators(&mut oracle, &pks, Action::Link(link), None).await;
506                await_collectors(&context, &collectors, 100).await;
507            }
508        });
509    }
510
511    fn slow_and_lossy_links(seed: u64) -> String {
512        let num_validators: u32 = 4;
513        let quorum: u32 = 3;
514        let cfg = deterministic::Config {
515            seed,
516            timeout: Some(Duration::from_secs(40)),
517            ..deterministic::Config::default()
518        };
519        let (runner, mut context, auditor) = Executor::init(cfg);
520        let (identity, mut shares_vec) =
521            ops::generate_shares(&mut context, None, num_validators, quorum);
522        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
523
524        runner.start({
525            let context = context.clone();
526            async move {
527                let (oracle, validators, pks, mut registrations) =
528                    initialize_simulation(&context, num_validators, &mut shares_vec).await;
529                let delayed_link = Link {
530                    latency: 50.0,
531                    jitter: 40.0,
532                    success_rate: 0.5,
533                };
534                let mut oracle_clone = oracle.clone();
535                link_validators(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
536
537                let mailboxes = Arc::new(Mutex::new(BTreeMap::<
538                    PublicKey,
539                    mocks::application::Mailbox<Sha256Digest, PublicKey>,
540                >::new()));
541                let mut collectors =
542                    BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
543                spawn_validator_engines(
544                    &context,
545                    identity.clone(),
546                    &pks,
547                    &validators,
548                    &mut registrations,
549                    &mut mailboxes.lock().unwrap(),
550                    &mut collectors,
551                    Duration::from_millis(100),
552                    Duration::from_millis(150),
553                );
554
555                spawn_proposer(&context, mailboxes.clone(), |_| false);
556                await_collectors(&context, &collectors, 40).await;
557            }
558        });
559        auditor.state()
560    }
561
562    #[test_traced]
563    fn test_slow_and_lossy_links() {
564        slow_and_lossy_links(0);
565    }
566
567    #[test_traced]
568    fn test_determinism() {
569        // We use slow and lossy links as the deterministic test
570        // because it is the most complex test.
571        for seed in 1..6 {
572            let state_1 = slow_and_lossy_links(seed);
573            let state_2 = slow_and_lossy_links(seed);
574            assert_eq!(state_1, state_2);
575        }
576    }
577
578    #[test_traced]
579    fn test_invalid_signature_injection() {
580        let num_validators: u32 = 4;
581        let quorum: u32 = 3;
582        let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
583        let (identity, mut shares_vec) =
584            ops::generate_shares(&mut context, None, num_validators, quorum);
585        shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
586
587        runner.start({
588            let context = context.clone();
589            async move {
590                let (_oracle, validators, pks, mut registrations) =
591                    initialize_simulation(&context, num_validators, &mut shares_vec).await;
592                let mailboxes = Arc::new(Mutex::new(BTreeMap::<
593                    PublicKey,
594                    mocks::application::Mailbox<Sha256Digest, PublicKey>,
595                >::new()));
596                let mut collectors =
597                    BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
598                spawn_validator_engines(
599                    &context,
600                    identity.clone(),
601                    &pks,
602                    &validators,
603                    &mut registrations,
604                    &mut mailboxes.lock().unwrap(),
605                    &mut collectors,
606                    Duration::from_millis(100),
607                    Duration::from_secs(5),
608                );
609
610                spawn_proposer(&context, mailboxes.clone(), |i| i % 10 == 0);
611                await_collectors(&context, &collectors, 100).await;
612            }
613        });
614    }
615}