1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use std::rc::{Rc, Weak};
use std::cell::RefCell;
use lattice::Lattice;
use trace::{Trace, Batch, BatchReader};
use trace::wrappers::rc::TraceBox;
use super::TraceAgentQueueWriter;
use super::TraceReplayInstruction;
/// Write endpoint for a shared trace.
///
/// A `TraceWriter` accepts batches in sequence, forwards each one to every
/// listener queue that is still alive, and inserts it into the underlying
/// trace if that trace has not been dropped.
pub struct TraceWriter<Tr>
where
    Tr: Trace,
    Tr::Time: Lattice+Ord+Clone+std::fmt::Debug+'static,
    Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
{
    /// Upper bound of the most recently inserted batch; the next batch
    /// inserted must have exactly this as its lower bound.
    upper: Vec<Tr::Time>,
    /// Weak handle to the trace that receives batches; batches are only
    /// inserted while the handle can still be upgraded.
    trace: Weak<RefCell<TraceBox<Tr>>>,
    /// Shared list of listener queues that are sent each batch followed by
    /// its upper frontier; entries that can no longer be upgraded are pruned.
    queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
}
impl<Tr> TraceWriter<Tr>
where
    Tr: Trace,
    Tr::Time: Lattice+Ord+Clone+std::fmt::Debug+'static,
    Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
{
    /// Creates a writer from its three components.
    ///
    /// * `upper` - the bound the first inserted batch must start from.
    /// * `trace` - weak handle to the trace that will receive batches.
    /// * `queues` - shared list of listener queues to notify on each insert.
    pub fn new(
        upper: Vec<Tr::Time>,
        trace: Weak<RefCell<TraceBox<Tr>>>,
        queues: Rc<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>
    ) -> Self
    {
        Self { upper, trace, queues }
    }

    /// Appends `batch` to the trace and announces it to all live listeners.
    ///
    /// Each listener queue that can still be upgraded receives a `Batch`
    /// instruction (carrying a clone of `batch` and of `hint`) followed by a
    /// `Frontier` instruction with the batch's upper bound, and is then
    /// activated. Listener queues that can no longer be upgraded are removed.
    /// Finally the batch itself is inserted into the trace, if the trace is
    /// still alive. `hint` is only forwarded to listeners; it is not passed
    /// to the trace.
    ///
    /// # Panics
    ///
    /// Panics if `batch.lower()` differs from the writer's current upper
    /// bound (batches must form a contiguous sequence), or if the batch's
    /// lower and upper bounds are equal.
    pub fn insert(&mut self, batch: Tr::Batch, hint: Option<Tr::Time>) {
        // Batches must arrive in sequence: each one starts where the last ended.
        // The mismatching bounds are reported in the panic message itself.
        assert!(
            &self.upper[..] == batch.lower(),
            "batch lower bound {:?} does not match writer upper bound {:?}",
            batch.lower(),
            self.upper
        );
        assert!(batch.lower() != batch.upper());

        // Advance our upper bound to that of the new batch.
        self.upper.clear();
        self.upper.extend_from_slice(batch.upper());

        // Deliver to every listener that still exists, and drop the dead ones,
        // in a single pass over the list.
        self.queues.borrow_mut().retain(|queue| {
            match queue.upgrade() {
                Some(pair) => {
                    {
                        // One borrow covers both instructions; it is released
                        // before the listener is activated.
                        let mut instructions = pair.1.borrow_mut();
                        instructions.push_back(TraceReplayInstruction::Batch(batch.clone(), hint.clone()));
                        instructions.push_back(TraceReplayInstruction::Frontier(batch.upper().to_vec()));
                    }
                    pair.0.activate();
                    true
                }
                None => false,
            }
        });

        // Hand the batch to the trace itself, if anyone still holds it.
        if let Some(trace) = self.trace.upgrade() {
            trace.borrow_mut().trace.insert(batch);
        }
    }

    /// Advances the writer's upper bound to `upper` without adding data.
    ///
    /// If `upper` already equals the current upper bound this does nothing.
    /// Otherwise a batch is built from a fresh, unpopulated builder spanning
    /// the current upper bound to `upper` and passed to [`insert`] with no
    /// hint.
    ///
    /// [`insert`]: #method.insert
    pub fn seal(&mut self, upper: &[Tr::Time]) {
        if &self.upper[..] == upper {
            return;
        }

        use trace::Builder;
        let builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>>::Builder::new();
        // NOTE(review): the third argument is the current upper bound again;
        // presumably this is the batch's `since` frontier — confirm against
        // `Builder::done`.
        let batch = builder.done(&self.upper[..], upper, &self.upper[..]);
        self.insert(batch, None);
    }
}
/// Dropping the writer seals it with an empty upper bound.
impl<Tr> Drop for TraceWriter<Tr>
where
    Tr: Trace,
    Tr::Time: Lattice+Ord+Clone+std::fmt::Debug+'static,
    Tr::Batch: Batch<Tr::Key,Tr::Val,Tr::Time,Tr::R>,
{
    /// Calls `seal` with an empty slice, so that a final batch reaching the
    /// empty upper bound is inserted unless the writer is already there.
    fn drop(&mut self) {
        let closed: &[Tr::Time] = &[];
        self.seal(closed)
    }
}