// commonware_consensus/simplex/engine.rs
use super::{
    actors::{resolver, voter},
    config::Config,
    Context, View,
};
use crate::{Automaton, Committer, Relay, Supervisor};
use commonware_cryptography::{Hasher, Scheme};
use commonware_macros::select;
use commonware_p2p::{Receiver, Sender};
use commonware_runtime::{Blob, Clock, Spawner, Storage};
use commonware_storage::journal::Journal;
use governor::clock::Clock as GClock;
use rand::{CryptoRng, Rng};
use tracing::debug;

/// Instance of `simplex` consensus engine.
///
/// Holds the two actors that make up the engine (the voter and the resolver)
/// together with the mailbox of each, plus the runtime used to spawn them.
pub struct Engine<B, E, C, H, A, R, F, S>
where
    B: Blob,
    E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B>,
    C: Scheme,
    H: Hasher,
    A: Automaton<Context = Context>,
    R: Relay,
    F: Committer,
    S: Supervisor<Seed = (), Index = View>,
{
    /// Runtime handle; used to spawn the actor tasks.
    runtime: E,

    /// Voter actor, consumed when the engine is run.
    voter: voter::Actor<B, E, C, H, A, R, F, S>,
    /// Mailbox of the voter; handed to the resolver when the engine is run.
    voter_mailbox: voter::Mailbox,
    /// Resolver actor, consumed when the engine is run.
    resolver: resolver::Actor<E, C, H, S>,
    /// Mailbox of the resolver; handed to the voter when the engine is run.
    resolver_mailbox: resolver::Mailbox,
}

impl<
        B: Blob,
        E: Clock + GClock + Rng + CryptoRng + Spawner + Storage<B>,
        C: Scheme,
        H: Hasher,
        A: Automaton<Context = Context>,
        R: Relay,
        F: Committer,
        S: Supervisor<Seed = (), Index = View>,
    > Engine<B, E, C, H, A, R, F, S>
{
    /// Create a new `simplex` consensus engine.
    pub fn new(runtime: E, journal: Journal<B, E>, cfg: Config<C, H, A, R, F, S>) -> Self {
        // Ensure configuration is valid
        cfg.assert();

        // Create voter
        let (voter, voter_mailbox) = voter::Actor::new(
            runtime.clone(),
            journal,
            voter::Config {
                crypto: cfg.crypto.clone(),
                hasher: cfg.hasher,
                automaton: cfg.automaton,
                relay: cfg.relay,
                committer: cfg.committer,
                supervisor: cfg.supervisor.clone(),
                registry: cfg.registry.clone(),
                mailbox_size: cfg.mailbox_size,
                namespace: cfg.namespace.clone(),
                leader_timeout: cfg.leader_timeout,
                notarization_timeout: cfg.notarization_timeout,
                nullify_retry: cfg.nullify_retry,
                activity_timeout: cfg.activity_timeout,
                replay_concurrency: cfg.replay_concurrency,
            },
        );

        // Create resolver
        let (resolver, resolver_mailbox) = resolver::Actor::new(
            runtime.clone(),
            resolver::Config {
                crypto: cfg.crypto,
                supervisor: cfg.supervisor,
                registry: cfg.registry,
                mailbox_size: cfg.mailbox_size,
                namespace: cfg.namespace,
                activity_timeout: cfg.activity_timeout,
                fetch_timeout: cfg.fetch_timeout,
                fetch_concurrent: cfg.fetch_concurrent,
                max_fetch_count: cfg.max_fetch_count,
                max_fetch_size: cfg.max_fetch_size,
                fetch_rate_per_peer: cfg.fetch_rate_per_peer,
            },
        );

        // Return the engine
        Self {
            runtime,

            voter,
            voter_mailbox,
            resolver,
            resolver_mailbox,
        }
    }

    /// Start the `simplex` consensus engine.
    ///
    /// This will also rebuild the state of the engine from provided `Journal`.
    pub async fn run(
        self,
        voter_network: (impl Sender, impl Receiver),
        resolver_network: (impl Sender, impl Receiver),
    ) {
        // Start the voter
        let (voter_sender, voter_receiver) = voter_network;
        let mut voter = self.runtime.spawn("voter", async move {
            self.voter
                .run(self.resolver_mailbox, voter_sender, voter_receiver)
                .await;
        });

        // Start the resolver
        let (resolver_sender, resolver_receiver) = resolver_network;
        let mut resolver = self.runtime.spawn("resolver", async move {
            self.resolver
                .run(self.voter_mailbox, resolver_sender, resolver_receiver)
                .await;
        });

        // Wait for the resolver or voter to finish
        select! {
            _ = &mut voter => {
                debug!("voter finished");
                resolver.abort();
            },
            _ = &mut resolver => {
                debug!("resolver finished");
                voter.abort();
            },
        }
    }
}