1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use std::thread::{spawn, JoinHandle};
use std::collections::HashSet;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::time::Instant;
use hash::{hash, Hash};
use entry::Entry;
use logger::{ExitReason, Logger};
use signature::Signature;
use event::Event;
/// Handle to a background logger thread together with the channel ends used
/// to talk to it.
///
/// `Historian::new` wires two bounded channels to a spawned thread: events go
/// in through `sender`, and entries come back out through `receiver`.
pub struct Historian {
/// Sending half of the event channel; the matching receiver is handed to the
/// logger thread.
pub sender: SyncSender<Event>,
/// Receiving half of the entry channel; the matching sender is handed to the
/// logger thread.
pub receiver: Receiver<Entry>,
/// Join handle of the logger thread. Joining yields the `ExitReason` the
/// thread returned when its event-processing loop stopped.
pub thread_hdl: JoinHandle<ExitReason>,
/// Set of signatures, created empty by `Historian::new`.
/// NOTE(review): presumably meant to be used with `reserve_signature` to
/// reject duplicates; confirm against callers.
pub signatures: HashSet<Signature>,
}
impl Historian {
    /// Builds a `Historian` seeded with `start_hash`.
    ///
    /// Two bounded channels are created (events in, entries out), the logger
    /// thread is spawned with the far ends of both, and the signature set
    /// starts out empty. `ms_per_tick` is forwarded unchanged to the logger
    /// thread.
    pub fn new(start_hash: &Hash, ms_per_tick: Option<u64>) -> Self {
        // Bound shared by both channels.
        const CHANNEL_CAPACITY: usize = 1000;

        let (sender, event_receiver) = sync_channel(CHANNEL_CAPACITY);
        let (entry_sender, receiver) = sync_channel(CHANNEL_CAPACITY);

        Historian {
            sender,
            receiver,
            thread_hdl: Self::create_logger(
                *start_hash,
                ms_per_tick,
                event_receiver,
                entry_sender,
            ),
            signatures: HashSet::new(),
        }
    }

    /// Spawns the logger thread and returns its join handle.
    ///
    /// The thread constructs a `Logger` from `receiver`, `sender` and
    /// `start_hash`, then calls `process_events` in a loop, always passing the
    /// instant captured when the thread started. The first error returned by
    /// `process_events` ends the loop and becomes the thread's result. When
    /// `ms_per_tick` is `Some`, every successful pass is followed by
    /// re-hashing `last_id` and incrementing `num_hashes`.
    fn create_logger(
        start_hash: Hash,
        ms_per_tick: Option<u64>,
        receiver: Receiver<Event>,
        sender: SyncSender<Entry>,
    ) -> JoinHandle<ExitReason> {
        spawn(move || {
            let mut logger = Logger::new(receiver, sender, start_hash);
            let epoch = Instant::now();
            // Loop-invariant: whether to keep hashing between event passes.
            let hashing_enabled = ms_per_tick.is_some();

            loop {
                match logger.process_events(epoch, ms_per_tick) {
                    Err(reason) => break reason,
                    Ok(_) => {}
                }

                if hashing_enabled {
                    logger.last_id = hash(&logger.last_id);
                    logger.num_hashes += 1;
                }
            }
        })
    }
}
/// Records `sig` in `sigs` unless it is already present.
///
/// Returns `true` when `sig` was newly added, and `false` when it was already
/// in the set (in which case the set is left unchanged).
pub fn reserve_signature(sigs: &mut HashSet<Signature>, sig: &Signature) -> bool {
    // `HashSet::insert` already reports whether the value was absent, so a
    // separate `contains` probe would only hash the signature a second time.
    sigs.insert(*sig)
}
// Unit tests for `Historian` and `reserve_signature`.
#[cfg(test)]
mod tests {
use super::*;
use log::*;
use event::*;
use std::thread::sleep;
use std::time::Duration;
// Sends three `Tick` events, one millisecond apart, to a historian created
// without a tick interval, and checks that three entries come back with
// `num_hashes == 0`, that the thread exits with `RecvDisconnected` once the
// event sender is dropped, and that the entries verify against the start hash.
#[test]
fn test_historian() {
let zero = Hash::default();
let hist = Historian::new(&zero, None);
hist.sender.send(Event::Tick).unwrap();
// 1_000_000 ns = 1 ms pause between events.
sleep(Duration::new(0, 1_000_000));
hist.sender.send(Event::Tick).unwrap();
sleep(Duration::new(0, 1_000_000));
hist.sender.send(Event::Tick).unwrap();
let entry0 = hist.receiver.recv().unwrap();
let entry1 = hist.receiver.recv().unwrap();
let entry2 = hist.receiver.recv().unwrap();
assert_eq!(entry0.num_hashes, 0);
assert_eq!(entry1.num_hashes, 0);
assert_eq!(entry2.num_hashes, 0);
// Dropping the only event sender disconnects the logger's input channel.
drop(hist.sender);
assert_eq!(
hist.thread_hdl.join().unwrap(),
ExitReason::RecvDisconnected
);
assert!(verify_slice(&[entry0, entry1, entry2], &zero));
}
// Drops the entry receiver before sending an event and checks that the
// logger thread exits with `SendDisconnected`.
#[test]
fn test_historian_closed_sender() {
let zero = Hash::default();
let hist = Historian::new(&zero, None);
// With the entry receiver gone, the logger has nowhere to deliver entries.
drop(hist.receiver);
hist.sender.send(Event::Tick).unwrap();
assert_eq!(
hist.thread_hdl.join().unwrap(),
ExitReason::SendDisconnected
);
}
// Reserving the same signature twice succeeds the first time only.
#[test]
fn test_duplicate_event_signature() {
let mut sigs = HashSet::new();
let sig = Signature::default();
assert!(reserve_signature(&mut sigs, &sig));
assert!(!reserve_signature(&mut sigs, &sig));
}
// Creates a historian with a 20 ms tick interval, waits 30 ms, sends one
// `Tick`, then drops the sender and drains the entry channel. Expects exactly
// one entry whose id differs from the start hash.
// NOTE(review): presumably the id differs because the logger thread keeps
// hashing while a tick interval is set; confirm against `Logger`.
#[test]
fn test_ticking_historian() {
let zero = Hash::default();
let hist = Historian::new(&zero, Some(20));
sleep(Duration::from_millis(30));
hist.sender.send(Event::Tick).unwrap();
drop(hist.sender);
// `iter()` yields entries until the sending side of the channel is closed.
let entries: Vec<Entry> = hist.receiver.iter().collect();
assert_eq!(entries.len(), 1);
assert_ne!(entries[0].id, zero);
}
}