1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Copyright 2020 Graydon Hoare <graydon@pobox.com>
// Licensed under the MIT and Apache-2.0 licenses.

use crate::{GlobalTime, PeerID, ServerWatermarksLE, ServerWatermarksLEExt, Srw, Svw};
use async_std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
    collections::BTreeSet,
    sync::atomic::{AtomicU64, Ordering},
    time::Duration,
};

/// A source of wall-clock readings, abstracted so that different kinds of
/// clock (the real system clock, a deterministic test clock, ...) can be
/// plugged in interchangeably.
///
/// Implementors must be both `Sync` and `Send`, so a clock (or a
/// `Box<dyn Clock>`) may be shared with or moved to other threads.
pub trait Clock: Sync + Send {
    /// Returns this clock's notion of "now".
    fn current_time(&self) -> SystemTime;
}

/// A [Clock] backed by the operating system's wall clock: every reading is
/// simply [std::time::SystemTime::now].
pub struct RealClock;

impl Clock for RealClock {
    fn current_time(&self) -> SystemTime {
        SystemTime::now()
    }
}

/// A deterministic implementation of [Clock], intended for tests.
///
/// It holds a shared [AtomicU64] holding a millisecond count since the Unix
/// epoch, starting at 0. Each call to `Clock::current_time` returns the
/// current count and then increments it by one, so successive readings are
/// `UNIX_EPOCH + 0ms`, `UNIX_EPOCH + 1ms`, `UNIX_EPOCH + 2ms`, ...
///
/// Cloning a `TestClock` shares the same counter, so every clone draws from
/// one timeline and no two readings (across all clones) are equal.
#[derive(Clone)]
pub struct TestClock(Arc<AtomicU64>);

impl TestClock {
    /// Creates a clock whose first reading is exactly the Unix epoch.
    pub fn new() -> Self {
        TestClock(Arc::new(AtomicU64::new(0)))
    }
}

impl Default for TestClock {
    /// Equivalent to [TestClock::new].
    fn default() -> Self {
        Self::new()
    }
}

impl Clock for TestClock {
    fn current_time(&self) -> SystemTime {
        // `fetch_add` returns the value from *before* the increment, so the
        // first reading is the epoch itself.
        SystemTime::UNIX_EPOCH + Duration::from_millis(self.0.fetch_add(1, Ordering::SeqCst))
    }
}
/// As in the paper: TidMgr tracks the set of "Transaction IDs" ([GlobalTime]s)
/// coordinated by the current peer. This includes the local server visibility
/// watermark ([Svw]) and server replication watermark ([Srw]) values.
///
/// The methods on `TidMgr` keep `wq_set` a subset of `wa_set`: a new
/// timestamp is inserted into both, and it can only leave `wa_set` once it has
/// already left `wq_set`. (`wq_set` is `pub(crate)`, so other code in the crate
/// could in principle break this.)
pub struct TidMgr {
    /// The clock source we'll use to draw system time from.
    clock: Box<dyn Clock>,

    /// The most recently issued GlobalTime. It only advances, regardless of
    /// how the clock moves. Its `peer` field is also where this manager's
    /// own peer ID is read from.
    prev: GlobalTime,

    /// Set of all locally-issued timestamps of transactions that are writing
    /// but not yet written to a quorum, so still in S-phase. The svw is derived
    /// as the minimum value of this set (or the next-issuable timestamp if the
    /// set is empty). The paper calls this the TSset, which is not a very
    /// helpful name.
    pub(crate) wq_set: BTreeSet<GlobalTime>,

    /// Set of all locally-issued timestamps of transactions that are writing
    /// but not yet written to all peers; they may or may not still be in
    /// S-phase. The srw is derived as the minimum value of this set (or the
    /// next-issuable timestamp if the set is empty). The paper does not name
    /// this set; it is left as an exercise to the reader.
    wa_set: BTreeSet<GlobalTime>,
}

impl TidMgr {
    /// Creates a `TidMgr` for peer `self_id` that reads wall-clock time from
    /// `clock`.
    ///
    /// Both watermark sets start empty, and the "previously-issued" timestamp
    /// starts at `GlobalTime::time_zero_for(self_id)`.
    pub(crate) fn new(self_id: PeerID, clock: Box<dyn Clock>) -> Self {
        TidMgr {
            clock,
            prev: GlobalTime::time_zero_for(self_id),
            wq_set: Default::default(),
            wa_set: Default::default(),
        }
    }

    /// Returns the current reading of this manager's clock source, without
    /// issuing a timestamp.
    pub(crate) fn current_time(&self) -> SystemTime {
        self.clock.current_time()
    }

    /// Returns the peer ID recorded in the most recently issued timestamp.
    pub(crate) fn self_id(&self) -> PeerID {
        self.prev.peer
    }

    /// Issues a monotonically-increasing timestamp for the current server.
    ///
    /// This only issues (and remembers) the timestamp. It does not add it to
    /// `wq_set` or `wa_set`; `create_watermark_and_start_s_phase` does that.
    ///
    /// Usually this moves forward with the system clock, but if the system
    /// clock stalls, goes backwards, or otherwise misbehaves, we just call
    /// `GlobalTime.next_event()` on the previously-issued timestamp, which in
    /// the worst case may increment the millisecond count if the
    /// per-millisecond event count overflows. This is the best we can do.
    ///
    /// # Panics
    ///
    /// Panics if the clock's millisecond count since the Unix epoch does not
    /// fit in a `u64`. This cannot happen for any realistic clock reading.
    pub(crate) fn create_timestamp(&mut self) -> GlobalTime {
        let now = self.clock.current_time();
        let next_millis = match now.duration_since(UNIX_EPOCH) {
            // A clock reading before the epoch is treated like a stalled clock.
            Err(_) => None,
            Ok(dur) => {
                let secs: u64 = dur.as_secs();
                let millis: u32 = dur.subsec_millis();
                // Seconds since the unix epoch here should be _way_ less
                // than 64bit; it won't exceed 33 bits in my lifetime
                // or that of anyone currently living.
                let mut ms = secs
                    .checked_mul(1000)
                    .expect("create_timestamp sec-to-ms overflow");
                ms = ms
                    .checked_add(millis as u64)
                    .expect("create_timestamp ms-addition overflow");
                Some(ms)
            }
        };
        self.prev = match next_millis {
            // As with "time zero" we issue timestamps counting from event 1 because it's slightly
            // nicer to read "the previous event" in logs when it's 0 not u64::max.
            Some(millis) if millis > self.prev.milli_secs => {
                self.prev.with_milli_sec(millis).with_event(1)
            }
            _ => self.prev.next_event(),
        };
        self.prev
    }

    /// Issues a fresh timestamp for a transaction entering S-phase.
    ///
    /// The timestamp is recorded as outstanding in both `wq_set` (not yet
    /// stored on a quorum) and `wa_set` (not yet stored on all peers).
    pub(crate) fn create_watermark_and_start_s_phase(&mut self) -> GlobalTime {
        let ts = self.create_timestamp();
        self.wq_set.insert(ts);
        self.wa_set.insert(ts);
        ts
    }

    /// Removes `ts` from `self.wq_set`, indicating that the transaction in
    /// question has been stored on a quorum, and is therefore finished S-phase.
    ///
    /// # Panics
    ///
    /// Panics if `ts` was not in `self.wq_set`.
    pub(crate) fn stored(&mut self, ts: GlobalTime) {
        // Do the removal outside `assert!` so the side effect survives even if
        // the check is ever relaxed to a `debug_assert!`.
        let was_in_s_phase = self.wq_set.remove(&ts);
        assert!(
            was_in_s_phase,
            "TidMgr::stored: timestamp was not in S-phase (wq_set)"
        );
    }

    /// Removes `ts` from `self.wa_set`, indicating that the transaction in
    /// question has been stored on _all_ peers, and can therefore be read
    /// from any peer directly, without performing a quorum-read.
    ///
    /// # Panics
    ///
    /// Panics if `ts` is still in `self.wq_set` (it must be reported as
    /// `stored` first), or if `ts` was not in `self.wa_set`.
    pub(crate) fn fully_replicated(&mut self, ts: GlobalTime) {
        assert!(
            !self.wq_set.contains(&ts),
            "TidMgr::fully_replicated: timestamp is still in S-phase (wq_set)"
        );
        let was_outstanding = self.wa_set.remove(&ts);
        assert!(
            was_outstanding,
            "TidMgr::fully_replicated: timestamp was not in wa_set"
        );
    }

    /// Returns the local server visibility watermark to gossip to other
    /// peers, describing the minimum transaction coordinated by this peer
    /// and still in S-phase. If no txn is in S-phase, we return a new
    /// timestamp to indicate a lower bound on any txn we _could_ issue
    /// in the future.
    pub(crate) fn svw(&mut self) -> Svw {
        // `GlobalTime` is `Copy`, so copy the minimum out rather than cloning
        // it; this also ends the borrow of `wq_set` before `create_timestamp`.
        match self.wq_set.iter().next().copied() {
            Some(ts) => Svw(ts),
            None => Svw(self.create_timestamp()),
        }
    }

    /// Returns the local server replication watermark: the minimum
    /// transaction coordinated by this peer that is not yet stored on all
    /// peers. If there is none, a new timestamp is issued and returned as a
    /// lower bound on any txn we _could_ issue in the future.
    pub(crate) fn srw(&mut self) -> Srw {
        match self.wa_set.iter().next().copied() {
            Some(ts) => Srw(ts),
            None => Srw(self.create_timestamp()),
        }
    }

    /// Returns this peer's current (svw, srw) pair as a `ServerWatermarksLE`.
    pub(crate) fn server_watermarks(&mut self) -> ServerWatermarksLE {
        // The svw is computed before the srw. When both sets are empty, each
        // call issues its own fresh timestamp, so the srw returned is then
        // later than the svw.
        let svw = self.svw();
        let srw = self.srw();
        ServerWatermarksLE::new(svw, srw)
    }
}