1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use std::{collections::BTreeMap, time::Duration};

use arrayref::array_ref;
use async_trait::async_trait;
use bytes::Bytes;
use futures_lite::FutureExt;
use tmelcrypt::{Ed25519PK, Ed25519SK};

use crate::core::Core;

/// Encapsulates a single instance of Streamlette, that eventually comes to consensus on a single decision.
pub struct Decider {
    config: Box<dyn DeciderConfig>,
    core: Core,
    tick: u64,

    decision: Option<Bytes>,
}

impl Decider {
    /// Creates a new Decider.
    pub fn new(config: impl DeciderConfig) -> Self {
        let seed = config.seed();
        let total_votes: u64 = config.vote_weights().values().sum();
        let weights = config.vote_weights();
        let core = Core::new(config.seed(), config.vote_weights(), move |tick| {
            // we first randomly and fairly pick a number between 0 and total_votes.
            let random_point = {
                let mut state = seed.wrapping_add(tick as u128);
                let mut point = u64::MAX;
                while point >= total_votes {
                    let v = tmelcrypt::hash_single(&state.to_be_bytes());
                    state = u128::from_be_bytes(*array_ref![v, 0, 16]);
                    point = (state >> (total_votes as u128).leading_zeros()) as u64;
                }
                point
            };
            // using that random number, we then pick a player according to its weight.
            // we add the weights together until we exceed the random number; the staker we're at when that happens is the selected one
            let mut sum = 0;
            for (&pk, &weight) in weights.iter() {
                sum += weight;
                if sum > random_point {
                    return pk;
                }
            }
            unreachable!()
        });
        Self {
            config: Box::new(config),
            core,
            tick: 0,
            decision: None,
        }
    }

    /// Prints the graphivz representation of everything we have now.
    pub fn debug_graphviz(&self) -> String {
        self.core.debug_graphviz()
    }

    /// Runs the first half of the tick of the Decider. If the decision has been made, return it.
    ///
    /// Does no I/O. Either use [Decider::tick_to_end], or call the [Decider::sync_state] method periodically.
    pub fn pre_tick(&mut self) -> Option<Bytes> {
        self.core.set_max_tick(self.tick + 1);
        if let Some(v) = self.core.get_finalized() {
            self.decision = Some(v.body.clone());
        }
        if self.decision.is_some() {
            return self.decision.clone();
        }
        self.core
            .insert_my_prop_or_solicit(self.tick, self.config.my_secret(), || {
                self.config.generate_proposal()
            });
        None
    }

    /// Runs the first half of the tick of the Decider. If the decision has been made, return it.
    ///
    /// Does no I/O. Either use [Decider::tick_to_end], or call the [Decider::sync_state] method periodically.
    pub fn post_tick(&mut self) -> Option<Bytes> {
        if let Some(v) = self.core.get_finalized() {
            self.decision = Some(v.body.clone());
        }
        if self.decision.is_some() {
            return self.decision.clone();
        }
        // do our logic
        self.core.insert_my_votes(self.config.my_secret());
        self.tick += 1;
        None
    }

    /// Synchronized state, given a timeout.
    pub async fn sync_state(&mut self, timeout: Option<Duration>) {
        if let Some(timeout) = timeout {
            self.config
                .sync_core(&mut self.core)
                .or(async {
                    async_io::Timer::after(timeout).await;
                })
                .await
        } else {
            self.config.sync_core(&mut self.core).await;
        }
    }

    /// Ticks this decider until the decision has been made. We use a gradually increasing synchronization interval that starts from 1 second and increases by 10% every tick.
    ///
    /// If liveness is required, it is generally *not* okay to drop the [Decider] after this function returns. Otherwise, some participants' `tick_to_end` may not return. Instead, the decider should be kept running (by calling `sync_state`) until you're sure everyone has gotten the message.
    pub async fn tick_to_end(&mut self) -> Bytes {
        let mut interval = 2.0f64;
        loop {
            self.pre_tick();
            self.sync_state(Duration::from_secs_f64(interval / 2.0).into())
                .await;
            let result = self.post_tick();
            self.sync_state(Duration::from_secs_f64(interval / 2.0).into())
                .await;
            interval *= 1.6;
            if let Some(result) = result.as_ref() {
                return result.clone();
            }
        }
    }
}

/// Decider is a particular configuration that the consensus protocol must implement.
///
/// Using a trait instead of a struct improves ergonomics of the "callbacks", as well as "polluting" the [Decider] with a generic bound that prevents confusion between [Decider] instances deciding different sorts of facts.
#[async_trait]
pub trait DeciderConfig: Sync + Send + 'static {
    /// Generates a new proposal.
    fn generate_proposal(&self) -> Bytes;

    /// Returns whether a proposed decision is valid.
    fn verify_proposal(&self, prop: &[u8]) -> bool;

    /// Synchronizes, in a best-effort fashion, this "Core" state with other players on the network. Should *never return* and be cancel-safe; the Decider itself will timeout this as needed.
    async fn sync_core(&self, core: &mut Core);

    /// Returns a mapping of each player's public key to how many votes the player has. Must return the same value every time!
    fn vote_weights(&self) -> BTreeMap<Ed25519PK, u64>;

    /// Returns a random seed. Must return the same value every time!
    fn seed(&self) -> u128;

    /// Returns our secret key.
    fn my_secret(&self) -> Ed25519SK;
}