cuendillar 0.1.0

Embedded persistent key–value storage engine (LSM-tree based)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
use std::{
    fs::{File, create_dir_all},
    io::{BufWriter, Write},
    sync::{Arc, OnceLock, RwLock},
};

use byteorder::{BigEndian, WriteBytesExt};
use tracing::{info, instrument};

use crate::{
    DatabaseIterator,
    database::{
        OwnedEntry,
        config::{
            bloom_config::BloomConfig, compaction_config::CompactionConfig,
            index_config::IndexConfig,
        },
        iterator::merged_iterator::MergedIterator,
        sstable::{
            compaction::Compaction,
            errors::SSTableError,
            metadata::{
                SSTMetadata, SSTableFooter, SSTableKeyRange,
                bloom_filter::bloom_factory::BloomFactory, index::index_factory::IndexFactory,
            },
            version::{
                Version,
                version_manager::VersionManager,
                version_update::{
                    VersionOperation::{AddWithMeta, Del},
                    VersionUpdate,
                },
            },
        },
    },
};

/// Leveled (LSM) compaction strategy.
///
/// Merges L0 tables into L1 and cascades levels that grew past their size
/// budget further down, recording every table change in a `VersionUpdate`
/// that is pushed to the shared [`VersionManager`].
pub struct LevelCompaction {
    /// Layout and sizing knobs (root directory, level count, growth factors).
    config: CompactionConfig,
    /// Settings used to build the bloom filter of every table written.
    bloom_config: BloomConfig,
    /// Settings used to build the sparse index of every table written.
    index_config: IndexConfig,
    /// Shared handle to the version manager whose latest version is compacted.
    version_manager: Arc<RwLock<VersionManager>>,
}

impl LevelCompaction {
    /// Upper bound on how many entries `encode_table` preallocates room for.
    ///
    /// `take` grows geometrically with the level, so reserving it verbatim could
    /// request a huge allocation even when only a handful of entries remain.
    const MAX_PREALLOCATED_ENTRIES: usize = 4096;

    /// Creates a leveled-compaction strategy bound to `version_manager`.
    ///
    /// The configuration structs are cloned so the strategy owns its settings.
    pub fn new(
        version_manager: Arc<RwLock<VersionManager>>,
        config: &CompactionConfig,
        bloom_config: &BloomConfig,
        index_config: &IndexConfig,
    ) -> Self {
        Self {
            version_manager,
            config: config.clone(),
            bloom_config: bloom_config.clone(),
            index_config: index_config.clone(),
        }
    }

    /// Drains up to `take` entries from `it` into a new SSTable for `level`.
    ///
    /// The file is created at `<root_dir>/l<level>/<table_id>` and holds, in
    /// order: the encoded entries, the bloom filter (prefixed by its u16-length
    /// name), the index (prefixed by its u16-length name), the key-range block
    /// and the footer. It is flushed and `fsync`ed before returning, so the
    /// returned metadata never points at data that only lives in the OS cache.
    ///
    /// When `remove_tombstone` is set, tombstones are consumed from `it` but
    /// not written, and they do not count towards `take`.
    ///
    /// Returns `Ok(None)` without creating any file when no entry was
    /// collected (the iterator was exhausted, or it only held dropped
    /// tombstones).
    ///
    /// # Errors
    /// Propagates I/O and serialization failures, including the case where a
    /// file named `table_id` already exists (the file is opened `create_new`).
    #[instrument(
        name = "Leveled Compaction Encode Table",
        skip(self, it),
        fields(table_id)
    )]
    fn encode_table(
        &self,
        level: u16,
        table_id: uuid::Uuid,
        it: &mut MergedIterator,
        take: usize,
        remove_tombstone: bool,
    ) -> Result<Option<SSTMetadata>, SSTableError> {
        let mut entries = Vec::with_capacity(take.min(Self::MAX_PREALLOCATED_ENTRIES));
        while entries.len() < take {
            let Some(entry) = it.next_owned() else {
                break;
            };
            // Dropped tombstones are consumed but do not count towards `take`.
            if remove_tombstone && matches!(entry, OwnedEntry::Tombstone { .. }) {
                continue;
            }
            entries.push(entry);
        }
        // Nothing survived: do not create an empty table file.
        if entries.is_empty() {
            return Ok(None);
        }

        let level_path = self.config.root_dir.join(format!("l{}", level));
        create_dir_all(&level_path)?;
        let new_table_path = level_path.join(table_id.to_string());
        let file = File::options()
            .append(true)
            .create_new(true)
            .open(&new_table_path)?;

        let mut writer = BufWriter::new(file);

        let mut bloom = BloomFactory::build_bloom_filter(&self.bloom_config, entries.len() as u64);
        let mut index = IndexFactory::build_index(&self.index_config);

        // `entries` is non-empty here, so both indexings are in bounds.
        let first_key = entries[0].get_key().into();
        let last_key = entries[entries.len() - 1].get_key().into();

        // Data block. A sparse index entry is emitted for the first entry and
        // then whenever at least `index_block_min_size` bytes were written
        // since the previous index entry.
        let index_block_min_size = self.index_config.index_block_min_size as u64;
        let mut bytes_encoded = 0;
        let mut bytes_since_last_index = index_block_min_size;
        for entry in entries {
            if bytes_since_last_index >= index_block_min_size {
                index.add_entry(entry.get_key(), bytes_encoded);
                bytes_since_last_index = 0;
            }
            let entry_size = entry.encode(&mut writer)?;
            bytes_since_last_index += entry_size;
            bytes_encoded += entry_size;
            bloom.add(entry.get_key());
        }
        index.add_last_offset(bytes_encoded);

        // Bloom filter block: name length, name (used to pick the
        // deserializer), then the filter itself.
        let mut bloom_filter_size = 0;
        let bloom_name = bloom.get_name().as_bytes();
        writer.write_u16::<BigEndian>(bloom_name.len() as u16)?;
        bloom_filter_size += 2;
        writer.write_all(bloom_name)?;
        bloom_filter_size += bloom_name.len() as u64;
        bloom_filter_size += bloom.serialize(&mut writer)?;

        // Index block, framed the same way as the bloom filter block.
        let mut index_size = 0;
        let index_name = index.get_name().as_bytes();
        writer.write_u16::<BigEndian>(index_name.len() as u16)?;
        index_size += 2;
        writer.write_all(index_name)?;
        index_size += index_name.len() as u64;
        index_size += index.serialize(&mut writer)?;

        // Key-range block.
        let key_range = SSTableKeyRange {
            first_key,
            last_key,
        };
        let key_range_block_size = key_range.serialize(&mut writer)?;

        // Footer recording the size of every preceding block.
        let footer = SSTableFooter::new(
            bytes_encoded,
            bloom_filter_size,
            index_size,
            key_range_block_size,
        );
        footer.seriealize(&mut writer)?;
        writer.flush()?;
        // The caller publishes this table in a version update (and the old
        // tables are deleted), so it must be durable before we hand it out.
        writer.get_ref().sync_all()?;

        Ok(Some(SSTMetadata::new(
            table_id,
            bloom.into(),
            index.into(),
            key_range.first_key,
            key_range.last_key,
            OnceLock::new(),
            new_table_path,
            footer,
        )))
    }

    /// Writes every entry still pending in `it` into new tables for `level`.
    ///
    /// Each table holds at most
    /// `level_entries_growth_factor^level * base_entries_per_table` entries
    /// (saturating, and at least one). Tombstones are dropped only when `level`
    /// is the deepest level that `run_compaction` compacts
    /// (`max_level_count - 1`).
    ///
    /// # Errors
    /// Propagates any failure from `encode_table`.
    fn drain_into_tables(
        &self,
        level: u16,
        it: &mut MergedIterator,
    ) -> Result<Vec<SSTMetadata>, SSTableError> {
        // `.max(1)` guarantees `encode_table` makes progress on every call.
        let max_entries_per_table = self
            .config
            .level_entries_growth_factor
            .saturating_pow(level.into())
            .saturating_mul(self.config.base_entries_per_table)
            .max(1);
        // Levels are compacted as `1..max_level_count`, so the deepest level is
        // `max_level_count - 1`. Comparing `level >= max_level_count` was never
        // true and tombstones were never purged.
        let is_last_level = level as usize + 1 >= self.config.max_level_count;

        let mut tables = Vec::new();
        while it.peek().is_some() {
            match self.encode_table(
                level,
                uuid::Uuid::new_v4(),
                it,
                max_entries_per_table,
                is_last_level,
            )? {
                Some(meta) => tables.push(meta),
                // Only dropped tombstones were left; `it` is now exhausted.
                None => break,
            }
        }
        Ok(tables)
    }

    /// Merges the entries pending in `it` into level `level` of `version`.
    ///
    /// Every table of the level whose key range overlaps the pending key span
    /// is scheduled for deletion (`Del`), and its entries are merged into `it`,
    /// which may widen the span. The merged entries are then written as new
    /// tables (`AddWithMeta`). They are placed in front of the first kept table
    /// that starts after the span, or appended after the last table. `version`
    /// itself is not modified; all operations go into `compaction_update`.
    ///
    /// # Panics
    /// Panics if `it` holds no entry when called.
    ///
    /// # Errors
    /// Propagates failures from opening table iterators or writing new tables.
    #[instrument(
        name = "Leveled Compaction Merge",
        skip(self, version, compaction_update, it),
        fields(level)
    )]
    fn compact_ln(
        &self,
        level: u16,
        it: &mut MergedIterator,
        version: &Version,
        compaction_update: &mut VersionUpdate,
    ) -> Result<(), SSTableError> {
        let ln_tables = version
            .get_level_tables(level as usize)
            .map(|v| v.as_slice())
            .unwrap_or_default();
        info!("Total tables in level {} is {}", level, ln_tables.len());

        // Key span [first_key, last_key] covered by the entries pending in `it`.
        let mut first_key = it
            .first_entry()
            .expect("Merging with empty iterator Not supported")
            .get_key()
            .to_vec();
        let mut last_key = it
            .last_entry()
            .expect("Merging with empty iterator Not supported")
            .get_key()
            .to_vec();

        // `index` value for the next `AddWithMeta`. Kept tables and newly added
        // tables advance it; deleted tables do not. NOTE(review): presumably
        // this is the position in the level's table list after the update.
        let mut curr_index = 0;

        for table in ln_tables {
            // Closed intervals overlap iff each starts no later than the other ends.
            let overlaps =
                table.key_range.first_key <= last_key && first_key <= table.key_range.last_key;
            if overlaps {
                // Replace the table: delete it and merge its entries into `it`.
                compaction_update.add_operation(Del {
                    level: level as u32,
                    id: table.id,
                });
                it.add_iterator(table.iter(None, None)?);
                // Merging may widen the pending key span.
                first_key = it
                    .first_entry()
                    .expect("Merging with empty iterator Not supported")
                    .get_key()
                    .to_vec();
                last_key = it
                    .last_entry()
                    .expect("Merging with empty iterator Not supported")
                    .get_key()
                    .to_vec();
            } else if it.peek().is_some() && table.key_range.first_key > last_key {
                // Pending entries sort entirely before this table: write them
                // here, then account for this (kept) table.
                for meta in self.drain_into_tables(level, it)? {
                    compaction_update.add_operation(AddWithMeta {
                        level: level as u32,
                        meta,
                        index: curr_index,
                    });
                    curr_index += 1;
                }
                curr_index += 1;
            } else {
                // Table lies before the pending span, or the pending entries
                // were already written: it is kept as-is.
                curr_index += 1;
            }
        }

        // Entries sorting after every table of the level are appended at the end.
        for meta in self.drain_into_tables(level, it)? {
            compaction_update.add_operation(AddWithMeta {
                level: level as u32,
                meta,
                index: curr_index,
            });
            curr_index += 1;
        }
        Ok(())
    }
}

impl Compaction for LevelCompaction {
    #[instrument(name = "Leveled Compaction Need Compaction", skip(self))]
    fn need_compaction(&self) -> bool {
        let version_manager = self
            .version_manager
            .read(
                // "Checking if compaction is needed"
            )
            .unwrap();
        let version = version_manager.get_latest_version();
        match version.get_level_tables(0) {
            Some(tables) if tables.len() < self.config.min_l0_file_count as usize => false,
            None => false,
            _ => true,
        }
    }
    #[instrument(name = "Leveled Compaction Run Compaction", skip(self))]
    fn run_compaction(&self) -> Result<(), SSTableError> {
        info!("Leveled compaction started");
        // we will check for the l0 have table  greater than min files trigger
        let version_manager = self
            .version_manager
            .read(
                // "reading latest version at commpaction start"
            )
            .unwrap();
        let version = version_manager.get_latest_version();
        drop(version_manager);
        let mut version = (*version).clone();
        let mut compaction_update = VersionUpdate::new(0);
        // now we have version we will do compaction in the l0
        // now we get multiple iterator and we will merge them into single table
        let mut iterator = MergedIterator::new();

        for table in version
            .get_level_tables(0)
            .unwrap()
            .iter()
            .take(self.config.max_l0_file_count_per_cycle)
        {
            compaction_update.add_operation(Del {
                level: 0,
                id: table.id,
            });
            let table_iterator = table.iter(None, None)?;
            iterator.add_iterator(table_iterator);
        }
        info!(
            "{} l0 tables tables are poped for compaction",
            compaction_update.operations.len()
        );
        // no we have a single iterator over all the table in l0
        // now we will set merge this table to the l1 and so on
        for i in 1..self.config.max_level_count {
            info!("Compacting level {}", i);
            let mut local_compaction_update = VersionUpdate::new(0);
            self.compact_ln(
                i as u16,
                &mut iterator,
                &version,
                &mut local_compaction_update,
            )?;
            // at this point we will have the version update which will contain all the neccessary updates
            compaction_update
                .operations
                .append(&mut local_compaction_update.operations.clone());
            info!(
                "After compaction version update count become : {}",
                compaction_update.operations.len()
            );
            version.apply_update(local_compaction_update)?;

            // next level we will choose last table only (for now as it is easy to pop)
            let mut li_total_size = 0;
            let mut new_li_meta = version
                .get_level_tables(i)
                .map(|x| x.iter().collect())
                .unwrap_or_else(|| Vec::new());
            for i in &new_li_meta {
                li_total_size += i.get_size();
            }
            // we will check if the level size is exceeding the threshold (to trigger compaction again)
            if li_total_size
                < (self.config.level_size_growth_factor as u64).pow(i as u32)
                    * self.config.level_base_size as u64
                || i == self.config.max_level_count
            {
                // in this branch we will handle when there is no compaction needed
                break;
            }
            // we will pop the last 2 enteries if there are and use them as a new version list
            // and skip the last level as  we can't compact it to next level
            if i < self.config.max_level_count - 1 {
                let poped_table = new_li_meta.pop().unwrap();
                compaction_update.add_operation(Del {
                    level: i as u32,
                    id: poped_table.id,
                });
                iterator = MergedIterator::new();
                iterator.add_iterator(poped_table.iter(None, None)?);
                // we try to move half data we added to the next level
                let mut tables_to_be_poped = new_li_meta.len() / 2;
                while !new_li_meta.is_empty() && tables_to_be_poped > 0 {
                    let poped_table = new_li_meta.pop().unwrap();
                    compaction_update.add_operation(Del {
                        level: i as u32,
                        id: poped_table.id,
                    });
                    iterator.add_iterator(poped_table.iter(None, None)?);
                    tables_to_be_poped -= 1;
                }
            } else {
                // nothing to worry since current iteration is last iteration
                iterator = MergedIterator::new();
            }
        }
        // following the above code the version will contain all the neccessary updates
        Version::normalize_version_update_operation(&mut compaction_update, &self.config.root_dir)?;
        self.version_manager
            .write(
                // "Writing compaction update to the "
            )
            .unwrap()
            .push_version_update(compaction_update)?;
        info!("Compaction completed successfully");
        Ok(())
    }
}