1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use crate::{BTreeMap, HashMap, HashSet};
use std::ops::Bound::Included;
/// u64
pub type Time = u64;
/// u64
pub type PageId = u64;
/// Purpose of the cache is to retain page values at different times.
/// A database is assumed to be a large collection of pages.
/// There can be multiple readers, and one writer.
/// Writers save current page values in the cache before modifying them in the main database.
/// Readers see a consistent version of the database by checking the cache.
pub struct Stash<T> {
time: Time,
pages: HashMap<PageId, StashPage<T>>, // Page for specific PageId.
readers: BTreeMap<Time, usize>, // Count of readers at specified Time.
updates: BTreeMap<Time, HashSet<PageId>>, // Set of PageIds updated at specified Time.
}
impl<T> Default for Stash<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Stash<T> {
/// Create a new empty Stash.
pub fn new() -> Self {
Self {
time: 0,
pages: HashMap::new(),
readers: BTreeMap::new(),
updates: BTreeMap::new(),
}
}
/// Set the value of the specified page for the current time.
pub fn set(&mut self, pageid: PageId, value: T) {
let time = self.time;
let u = self.updates.entry(time).or_insert_with(HashSet::new);
if u.insert(pageid) {
let p = self.pages.entry(pageid).or_insert_with(StashPage::new);
p.set(time, value);
}
}
/// Get the value of the specified page for the specified (registered) time.
pub fn get(&self, pageid: PageId, time: Time) -> Option<&T> {
if let Some(p) = self.pages.get(&pageid) {
p.get(time)
} else {
Option::None
}
}
/// Register that an update operation has completed. The cache time is incremented.
/// Stashd pages may be freed.
pub fn tick(&mut self) {
// println!("Stash tick time={}", self.time);
self.time += 1;
self.trim();
}
/// Register that there is a client reading the database. The result is the cache time.
pub fn begin_read(&mut self) -> Time {
let time = self.time;
// println!("Stash begin read time={}", time);
let n = self.readers.entry(time).or_insert(0);
*n += 1;
time
}
/// Register that the read at the specified time has ended. Stashed pages may be freed.
pub fn end_read(&mut self, time: Time) {
// println!("Stash end read time={}", time);
let n = self.readers.get_mut(&time).unwrap();
*n -= 1;
if *n == 0 {
self.readers.remove(&time);
self.trim();
}
}
fn trim(&mut self) {
// rt is time of first remaining reader.
let rt = *self.readers.keys().next().unwrap_or(&self.time);
// wt is time of first remaining update.
while let Some(&wt) = self.updates.keys().next() {
if wt >= rt {
break;
}
for pid in self.updates.remove(&wt).unwrap() {
let p = self.pages.get_mut(&pid).unwrap();
// println!("Stash trim page {}", pid);
p.trim(rt);
}
}
}
} // end impl Stash
struct StashPage<T> {
values: BTreeMap<Time, T>, // values at different times
}
impl<T> StashPage<T> {
fn new() -> Self {
StashPage {
values: BTreeMap::new(),
}
}
fn set(&mut self, time: Time, value: T) {
self.values.insert(time, value);
}
fn get(&self, time: Time) -> Option<&T> {
match self
.values
.range((Included(&time), Included(&Time::MAX)))
.next()
{
Some((_k, v)) => Some(v),
None => None,
}
}
fn trim(&mut self, to: Time) {
while let Some(&f) = self.values.keys().next() {
if f >= to {
break;
}
self.values.remove(&f);
}
}
} // end impl StashPage
fn _cache_test() {
let mut c = Stash::new();
for rt in 0..10 {
assert_eq!(rt, c.begin_read());
c.set(rt % 3, rt * 10);
c.tick();
}
for rt in 0..10 {
if let Some(v) = c.get(rt % 3, rt) {
println!("At time {} value is {}", rt, v);
}
c.end_read(rt);
}
c.set(0, 0);
c.tick();
}