1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
use std::{cmp, collections::HashMap, sync::Arc};
use crossbeam_skiplist::SkipMap;
use super::{
compact::{Config, MergePointer, WriteTracker},
MergedSSTable, TableInsertor,
};
use crate::{
bucket::{Bucket, ImbalancedBuckets, InsertableToBucket, SSTablesToRemove},
err::Error,
filter::BloomFilter,
memtable::Entry,
types::{BucketMapHandle, CreatedAt, Key, KeyRangeHandle, ValOffset},
};
use crate::{err::Error::*, memtable::SkipMapValue};
/// Sized Tier Compaction Runner (STCS)
///
/// Responsible for merging sstables of almost similar sizes to form a bigger one;
/// obsolete entries are removed from the sstables and expired tombstones are removed
///
#[derive(Debug, Clone)]
pub struct SizedTierRunner<'a> {
/// A thread-safe BucketMap with each bucket mapped to its id
pub(crate) bucket_map: BucketMapHandle,
/// A thread-safe hashmap of sstables each mapped to its key range
pub(crate) key_range: KeyRangeHandle,
/// Compaction configuration
pub(crate) config: &'a Config,
/// Keeps track of tombstones encountered during compaction
/// to predict validity of subsequent entries; cleared once a
/// compaction run completes
pub(crate) tombstones: HashMap<Key, CreatedAt>,
}
impl<'a> SizedTierRunner<'a> {
/// creates new instance of `SizedTierRunner`
pub fn new(
bucket_map: BucketMapHandle,
key_range: KeyRangeHandle,
config: &'a Config,
) -> SizedTierRunner<'a> {
Self {
tombstones: HashMap::new(),
bucket_map,
key_range,
config,
}
}
/// Returns buckets whose size exceeds max threshold
pub async fn fetch_imbalanced_buckets(bucket_map: BucketMapHandle) -> ImbalancedBuckets {
bucket_map.read().await.extract_imbalanced_buckets().await
}
/// Main compaction runner
pub async fn run_compaction(&mut self) -> Result<(), Error> {
if self.bucket_map.read().await.is_balanced().await {
return Ok(());
}
// The compaction loop will keep running until there
// are no more buckets with more than minimum treshold size
// TODO: Handle this with multiple threads
loop {
let buckets: BucketMapHandle = Arc::clone(&self.bucket_map);
let key_range = Arc::clone(&self.key_range);
// Step 1: Extract imbalanced buckets
let (imbalanced_buckets, ssts_to_remove) =
SizedTierRunner::fetch_imbalanced_buckets(buckets.clone()).await?;
if imbalanced_buckets.is_empty() {
self.tombstones.clear();
return Ok(());
}
// Step 2: Merge SSTs in each imbalanced buckct
match self.merge_ssts_in_buckets(&imbalanced_buckets.to_owned()).await {
Ok(merged_sstables) => {
let mut tracker = WriteTracker::new(merged_sstables.len());
// Step 3: Insert Merged SSTs to appropriate buckets
for merged_sst in merged_sstables.into_iter() {
let mut bucket = buckets.write().await;
let table = merged_sst.clone().sstable;
let insert_res = bucket.insert_to_appropriate_bucket(Arc::new(table)).await;
drop(bucket);
match insert_res {
Ok(sst) => {
if sst.summary.is_none() {
return Err(TableSummaryIsNone);
}
if sst.filter.is_none() {
return Err(FilterNotProvidedForFlush);
}
// IMPORTANT: Don't keep sst entries in memory
sst.entries.clear();
let summary = sst.summary.clone().unwrap();
// Step 5 Store sst key range
key_range
.set(sst.dir.to_owned(), summary.smallest_key, summary.biggest_key, sst)
.await;
tracker.actual += 1;
}
Err(err) => {
return Err(CompactionFailed(Box::new(err)));
}
}
}
if tracker.expected == tracker.actual {
// Step 6: Delete the sstables that we already merged from their previous buckets
let clean_up_successful = self
.clean_up_after_compaction(buckets, &ssts_to_remove.clone(), key_range)
.await;
match clean_up_successful {
Ok(None) => {
return Err(Error::CompactionPartiallyFailed(Box::new(
CompactionCleanupPartial,
)));
}
Err(err) => {
return Err(Error::CompactionCleanup(Box::new(err)));
}
_ => {}
}
} else {
log::error!("{}", Error::CannotRemoveObsoleteSST)
}
}
Err(err) => return Err(CompactionFailed(Box::new(err))),
}
}
}
/// Removes sstables that are already merged to form larger table(s)
///
/// NOTE: This should only be called if merged sstables have been written to disk
/// otherwise data loss can happen
///
/// # Errors
///
/// Returns error if deletion fails
pub async fn clean_up_after_compaction(
&self,
buckets: BucketMapHandle,
ssts_to_delete: &SSTablesToRemove,
key_range: KeyRangeHandle,
) -> Result<Option<()>, Error> {
// if all obsolete sstables were not deleted then don't remove the associated key range
if buckets.write().await.delete_ssts(ssts_to_delete).await? {
// Step 7: Remove obsolete keys from keys range
for (_, sstables) in ssts_to_delete {
for s in sstables {
key_range.remove(s.dir.to_owned()).await;
}
}
return Ok(Some(()));
}
Ok(None)
}
/// Merges the sstables in each `Bucket` to form a larger one
///
/// Returns `Result` with merged sstable or error
///
/// # Errors
///
/// Returns error incase an error occured during merge
pub async fn merge_ssts_in_buckets(&mut self, buckets: &[Bucket]) -> Result<Vec<MergedSSTable>, Error> {
let mut merged_ssts = Vec::new();
for bucket in buckets.iter() {
let mut hotness: u64 = Default::default();
let tables = &bucket.sstables.read().await;
let mut merged_sst: Box<dyn InsertableToBucket> = Box::new(tables.first().unwrap().to_owned());
for sst in tables[1..].iter() {
let mut insertable_sst = sst.to_owned();
hotness += insertable_sst.hotness;
insertable_sst
.load_entries_from_file()
.await
.map_err(|err| CompactionFailed(Box::new(err)))?;
// TODO: merge_sstables() can be CPU intensive so we should use spawn blocking here
// tokio::task::spawn_blocking(||{
// merge sstable here
// });
merged_sst = self.merge_sstables(merged_sst, Box::new(insertable_sst));
}
let entries = &merged_sst.get_entries();
let mut filter = BloomFilter::new(self.config.filter_false_positive, entries.len());
filter.build_filter_from_entries(entries);
merged_ssts.push(MergedSSTable::new(merged_sst, filter, hotness));
}
if merged_ssts.is_empty() {
return Err(CompactionFailed(Box::new(MergeSSTContainsZeroEntries)));
}
Ok(merged_ssts)
}
/// Merge two `Table` together one returns a larger one
///
/// Errors
///
/// Returns error if an error occured during merge
fn merge_sstables(
&mut self,
sst1: Box<dyn InsertableToBucket>,
sst2: Box<dyn InsertableToBucket>,
) -> Box<dyn InsertableToBucket> {
let mut new_sst = TableInsertor::default();
let new_sst_map = Arc::new(SkipMap::new());
let mut merged_entries = Vec::new();
let entries1 = sst1
.get_entries()
.iter()
.map(|e| {
Entry::new(
e.key().to_vec(),
e.value().val_offset,
e.value().created_at,
e.value().is_tombstone,
)
})
.collect::<Vec<Entry<Key, ValOffset>>>();
let entries2 = sst2
.get_entries()
.iter()
.map(|e| {
Entry::new(
e.key().to_vec(),
e.value().val_offset,
e.value().created_at,
e.value().is_tombstone,
)
})
.collect::<Vec<Entry<Key, ValOffset>>>();
let mut ptr = MergePointer::new();
while ptr.ptr1 < entries1.len() && ptr.ptr2 < entries2.len() {
match entries1[ptr.ptr1].key.cmp(&entries2[ptr.ptr2].key) {
cmp::Ordering::Less => {
self.tombstone_check(&entries1[ptr.ptr1], &mut merged_entries);
ptr.increment_ptr1();
}
cmp::Ordering::Equal => {
if entries1[ptr.ptr1].created_at > entries2[ptr.ptr2].created_at {
self.tombstone_check(&entries1[ptr.ptr1], &mut merged_entries);
} else {
self.tombstone_check(&entries2[ptr.ptr2], &mut merged_entries);
}
ptr.increment_ptr1();
ptr.increment_ptr2();
}
cmp::Ordering::Greater => {
self.tombstone_check(&entries2[ptr.ptr2], &mut merged_entries);
ptr.increment_ptr2();
}
}
}
while ptr.ptr1 < entries1.len() {
self.tombstone_check(&entries1[ptr.ptr1], &mut merged_entries);
ptr.increment_ptr1();
}
while ptr.ptr2 < entries2.len() {
self.tombstone_check(&entries2[ptr.ptr2], &mut merged_entries);
ptr.increment_ptr2();
}
merged_entries.iter().for_each(|e| {
new_sst_map.insert(
e.key.to_owned(),
SkipMapValue::new(e.val_offset, e.created_at, e.is_tombstone),
);
});
new_sst.set_entries(new_sst_map);
Box::new(new_sst)
}
/// Checks if an entry has been deleted or not
///
/// Deleted entries are discoverd using the tombstones hashmap
/// and prevented from being inserted
///
/// Returns true if entry should be inserted or false otherwise
pub(crate) fn tombstone_check(
&mut self,
entry: &Entry<Key, usize>,
merged_entries: &mut Vec<Entry<Key, usize>>,
) {
let mut should_insert = false;
if self.tombstones.contains_key(&entry.key) {
let tomb_insert_time = *self.tombstones.get(&entry.key).unwrap();
if entry.created_at > tomb_insert_time {
if entry.is_tombstone {
self.tombstones.insert(entry.key.to_owned(), entry.created_at);
should_insert = !entry.to_owned().has_expired(self.config.tombstone_ttl);
} else if self.config.use_ttl {
should_insert = !entry.has_expired(self.config.entry_ttl);
} else {
should_insert = true
}
}
} else if entry.is_tombstone {
self.tombstones.insert(entry.key.to_owned(), entry.created_at);
should_insert = !entry.has_expired(self.config.tombstone_ttl);
} else if self.config.use_ttl {
should_insert = !entry.has_expired(self.config.entry_ttl);
} else {
should_insert = true
}
if should_insert {
merged_entries.push(entry.clone())
}
}
}