kip_db 0.1.2-alpha.26.fix1

轻量级、异步、基于 LSM Leveled Compaction 的 K-V 数据库 (a lightweight, asynchronous K-V database based on LSM leveled compaction)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
use crate::kernel::lsm::mem_table::{KeyValue, MemTable};
use crate::kernel::lsm::storage::{Config, StoreInner};
use crate::kernel::lsm::table::meta::TableMeta;
use crate::kernel::lsm::table::scope::Scope;
use crate::kernel::lsm::table::{collect_gen, Table};
use crate::kernel::lsm::version::edit::VersionEdit;
use crate::kernel::lsm::version::status::VersionStatus;
use crate::kernel::lsm::{data_sharding, MAX_LEVEL};
use crate::kernel::KernelResult;
use crate::KernelError;
use bytes::Bytes;
use futures::future;
use itertools::Itertools;
use std::collections::HashSet;
use std::mem;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::oneshot;
use tracing::info;

pub(crate) const LEVEL_0: usize = 0;

/// Set of data shards.
/// Each entry pairs a shard's Gen with that shard's data.
pub(crate) type MergeShardingVec = Vec<(i64, Vec<KeyValue>)>;
pub(crate) type DelNode = (Vec<i64>, TableMeta);
/// Wrapper of the Gens scheduled for deletion by a major compaction
/// (N being the level compacted this round): the first element is for
/// Level N, the second for Level N + 1.
pub(crate) type DelNodeTuple = (DelNode, DelNode);
pub type SeekScope = (Scope, usize);

/// Interaction message between the store and the `Compactor`.
#[derive(Debug)]
pub enum CompactTask {
    Seek(SeekScope),
    Flush(Option<oneshot::Sender<()>>),
}

/// Compactor
///
/// Responsible for minor and major compactions.
pub(crate) struct Compactor {
    store_inner: Arc<StoreInner>,
}

impl Compactor {
    /// Creates a `Compactor` bound to the shared store state.
    pub(crate) fn new(store_inner: Arc<StoreInner>) -> Self {
        Compactor { store_inner }
    }

    /// Check and, when needed, run compaction (by default an asynchronous, passive lazy compaction).
    ///
    /// The default behavior is a "try" check against the threshold: a passive, periodic
    /// detection mechanism decouples multi-transaction commits from the `Compactor`,
    /// reduces frequent compactions under highly concurrent transactions or writes
    /// (write first, then compact in one batch), and trims redundant data in the hot
    /// Level 0 SSTables.
    pub(crate) async fn check_then_compaction(
        &mut self,
        option_tx: Option<oneshot::Sender<()>>,
    ) -> KernelResult<()> {
        if let Some((gen, values)) = self.mem_table().swap()? {
            if !values.is_empty() {
                let start = Instant::now();
                // A major compaction triggered by this minor compaction currently runs
                // synchronously, so the live_tag stays alive for the span of this method body.
                self.minor_compaction(gen, values).await?;
                info!("[Compactor][Compaction Drop][Time: {:?}]", start.elapsed());
            }
        }

        // Acknowledge the compaction request.
        if let Some(tx) = option_tx {
            tx.send(()).map_err(|_| KernelError::ChannelClose)?
        }

        Ok(())
    }

    /// Persist the immutable table as an SSTable.
    ///
    /// Note: `values` must be sorted by key.
    pub(crate) async fn minor_compaction(
        &self,
        gen: i64,
        values: Vec<KeyValue>,
    ) -> KernelResult<()> {
        if !values.is_empty() {
            let (scope, meta) = self
                .ver_status()
                .loader()
                .create(
                    gen,
                    values,
                    LEVEL_0,
                    self.config().level_table_type[LEVEL_0],
                )
                .await?;

            // `Compactor::data_loading_with_level` checks whether the compaction threshold
            // has been reached, so the major compaction can be invoked unconditionally here.
            self.major_compaction(
                LEVEL_0,
                scope.clone(),
                vec![VersionEdit::NewFile((vec![scope], 0), 0, meta)],
                false,
            )
            .await?;
        }
        Ok(())
    }

    /// Major compaction: pushes data from one level down into the next.
    /// The current overall steps are:
    /// 1. Get the current `Version` and, via the given `Scope`, the SSTables of this
    ///    level intersecting that scope, named `tables_l`
    /// 2. Apply a step similar to step 1 to the next level based on `tables_l`,
    ///    obtaining the most appropriate data within the compaction range between the
    ///    two levels (the range of `tables_ll` should enclose `tables_l` on both sides)
    /// 3. Extract the data of `tables_l` and `tables_ll` in parallel; after sorting,
    ///    merging and deduplicating it, shard it into multiple `Vec<KeyValue>`
    /// 4. Build one SSTable per shard in parallel
    /// 5. Insert the generated SSTables at the position of the first SSTable of
    ///    `tables_ll`, and delete the SSTables of `tables_l` and `tables_ll`
    /// 6. Push the SSTable changes into `vec_ver_edit` for persistence
    /// Final: `log_and_apply` the edits collected in `vec_ver_edit` to produce a new
    /// `Version` as the latest state.
    ///
    /// Compaction tests show Level 1 always holds comparatively many SSTables; by the
    /// mechanism above: keys in Level 0 are essentially unordered, easily generating a
    /// large number of SSTables into Level 1, while keys in Level 1..MAX_LEVEL are laid
    /// out in order, so fewer SSTables move down to the next level — hence under heavy
    /// compaction the SSTable count of Level 1 tends to be the largest.
    pub(crate) async fn major_compaction(
        &self,
        mut level: usize,
        scope: Scope,
        mut vec_ver_edit: Vec<VersionEdit>,
        mut is_skip_sized: bool,
    ) -> KernelResult<()> {
        let config = self.config();
        let mut is_over = false;

        if level > MAX_LEVEL - 1 {
            return Err(KernelError::LevelOver);
        }

        while level < MAX_LEVEL && !is_over {
            let next_level = level + 1;

            // Tips: the is_skip_sized option only allows skipping the size check once
            if let Some((
                index,
                ((del_gens_l, del_meta_l), (del_gens_ll, del_meta_ll)),
                vec_sharding,
            )) = self
                .data_loading_with_level(level, &scope, mem::replace(&mut is_skip_sized, false))
                .await?
            {
                let start = Instant::now();
                // Create the SSTables in parallel
                let table_futures = vec_sharding.into_iter().map(|(gen, sharding)| {
                    self.ver_status().loader().create(
                        gen,
                        sharding,
                        next_level,
                        config.level_table_type[next_level],
                    )
                });
                let vec_table_and_scope: Vec<(Scope, TableMeta)> =
                    future::try_join_all(table_futures).await?;
                let (new_scopes, new_metas): (Vec<Scope>, Vec<TableMeta>) =
                    vec_table_and_scope.into_iter().unzip();
                let fusion_meta = TableMeta::fusion(&new_metas);

                // `extend` over an array avoids the temporary `Vec` that
                // `append(&mut vec![..])` would allocate and immediately drain
                vec_ver_edit.extend([
                    VersionEdit::NewFile((new_scopes, next_level), index, fusion_meta),
                    VersionEdit::DeleteFile((del_gens_l, level), del_meta_l),
                    VersionEdit::DeleteFile((del_gens_ll, next_level), del_meta_ll),
                ]);
                info!(
                    "[LsmStore][Major Compaction][recreate_sst][Level: {}][Time: {:?}]",
                    level,
                    start.elapsed()
                );
                level += 1;
            } else {
                is_over = true;
            }
            if !vec_ver_edit.is_empty() {
                self.ver_status()
                    .log_and_apply(
                        mem::take(&mut vec_ver_edit),
                        config.ver_log_snapshot_threshold,
                    )
                    .await?;
            }
        }
        Ok(())
    }

    /// Load the data to merge for the given level.
    async fn data_loading_with_level(
        &self,
        level: usize,
        target: &Scope,
        is_skip_sized: bool,
    ) -> KernelResult<Option<(usize, DelNodeTuple, MergeShardingVec)>> {
        let version = self.ver_status().current().await;
        let config = self.config();
        let next_level = level + 1;

        // If this level's SSTable count has not yet exceeded the threshold, return empty early
        if level > MAX_LEVEL - 2
            || !(is_skip_sized || version.is_threshold_exceeded_major(config, level))
        {
            return Ok(None);
        }

        // Here vec_table_l means this level's Vec<SSTable>, while vec_table_ll is the
        // next level's — the naming mimics Roman numerals.
        let start = Instant::now();

        // Fetch the SSTables of this level whose key ranges overlap the target
        let (tables_l, scopes_l, _) = version.tables_by_scopes(level, target);
        if scopes_l.is_empty() {
            return Ok(None);
        }

        // Probing downwards with tables_l, the collected set should contain tables_ll's elements.
        // `unwrap_or_else` keeps the clone of `target` lazy, so it is only paid when fusion fails.
        let fusion_scope_l = Scope::fusion(&scopes_l).unwrap_or_else(|| target.clone());
        // Use tables_l's scope to fetch the superset on the next level
        let (tables_ll, _, index) = version.tables_by_scopes(next_level, &fusion_scope_l);

        // Collect the SSTables that need to be removed
        let del_gen_l = collect_gen(&tables_l)?;
        let del_gen_ll = collect_gen(&tables_ll)?;

        // Merge the data and shard it
        let vec_merge_sharding =
            Self::data_merge_and_sharding(tables_l, tables_ll, config.sst_file_size).await?;
        info!(
            "[LsmStore][Major Compaction][data_loading_with_level][Time: {:?}]",
            start.elapsed()
        );

        Ok(Some((index, (del_gen_l, del_gen_ll), vec_merge_sharding)))
    }

    /// Merge, re-sort and shard the SSTables' data, producing shards ordered by
    /// `KeyValue` key from smallest to largest.
    /// 1. Fetch the full data of the Level l (current level) SSTables_l to merge, in parallel
    /// 2. Build a unique key set from SSTables_l for iteration filtering
    /// 3. Iterate the Level ll SSTables_ll in parallel, filtering out data through the key set
    /// 4. Combine the data of SSTables_l and SSTables_ll, then dedupe and sort
    #[allow(clippy::mutable_key_type)]
    async fn data_merge_and_sharding(
        tables_l: Vec<&dyn Table>,
        tables_ll: Vec<&dyn Table>,
        file_size: usize,
    ) -> KernelResult<MergeShardingVec> {
        // SSTable Gens are generated in time order, so they serve as the SSTables' sort key
        let map_futures_l = tables_l
            .iter()
            .sorted_unstable_by_key(|table| table.gen())
            .map(|table| async { Self::table_load_data(table, |_| true) });

        let sharding_l = future::try_join_all(map_futures_l).await?;

        // Build the unique key set of Level l, used to filter Level ll's data while iterating
        let filter_set_l: HashSet<&Bytes> = sharding_l
            .iter()
            .flatten()
            .map(|key_value| &key_value.0)
            .collect();

        // Use the key set to filter out the data Level ll needs to contribute
        // Parallel: even when l is 0, ll (Level 1) still guarantees the SSTables' data is ordered
        //           and non-conflicting, so iterating in parallel cannot cause data conflicts
        // Filter: filtering on l's keys avoids redundant data iteration that would consume much memory
        let sharding_ll = future::try_join_all(tables_ll.iter().map(|table| async {
            Self::table_load_data(table, |key| !filter_set_l.contains(key))
        }))
        .await?;

        // Chain sharding_l after sharding_ll so that, reversed, the data is ordered new -> old
        let vec_cmd_data = sharding_ll
            .into_iter()
            .chain(sharding_l)
            .flatten()
            .rev()
            .unique_by(|(key, _)| key.clone())
            .sorted_unstable_by_key(|(key, _)| key.clone())
            .collect();
        Ok(data_sharding(vec_cmd_data, file_size))
    }

    /// Drain a table's key-values into a `Vec`, keeping only entries accepted by `fn_is_filter`.
    fn table_load_data<F>(table: &&dyn Table, fn_is_filter: F) -> KernelResult<Vec<KeyValue>>
    where
        F: Fn(&Bytes) -> bool,
    {
        let mut iter = table.iter()?;
        let mut vec_cmd = Vec::with_capacity(table.len());
        while let Some(item) = iter.try_next()? {
            if fn_is_filter(&item.0) {
                vec_cmd.push(item)
            }
        }
        Ok(vec_cmd)
    }

    pub(crate) fn config(&self) -> &Config {
        &self.store_inner.config
    }

    pub(crate) fn mem_table(&self) -> &MemTable {
        &self.store_inner.mem_table
    }

    pub(crate) fn ver_status(&self) -> &VersionStatus {
        &self.store_inner.ver_status
    }
}

#[cfg(test)]
mod tests {
    use crate::kernel::io::{FileExtension, IoFactory, IoType};
    use crate::kernel::lsm::compactor::{Compactor, LEVEL_0};
    use crate::kernel::lsm::storage::{Config, KipStorage, StoreInner};
    use crate::kernel::lsm::table::meta::TableMeta;
    use crate::kernel::lsm::table::scope::Scope;
    use crate::kernel::lsm::table::ss_table::SSTable;
    use crate::kernel::lsm::table::TableType;
    use crate::kernel::lsm::trigger::TriggerType;
    use crate::kernel::lsm::version::edit::VersionEdit;
    use crate::kernel::lsm::version::DEFAULT_SS_TABLE_PATH;
    use crate::kernel::utils::lru_cache::ShardingLruCache;
    use crate::kernel::{KernelResult, Storage};
    use bytes::Bytes;
    use itertools::Itertools;
    use std::collections::hash_map::RandomState;
    use std::sync::atomic::Ordering::Relaxed;
    use std::sync::Arc;
    use std::time::Instant;
    use tempfile::TempDir;

    // End-to-end: write 30k keys in 1k batches with flushes so major compaction
    // fires, then verify level layout, ordering invariants and full readback.
    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
    async fn test_lsm_major_compactor() -> KernelResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");

        let times = 30_000;

        let value = b"Stray birds of summer come to my window to sing and fly away.
            And yellow leaves of autumn, which have no songs, flutter and fall
            there with a sign.";

        // Tips: with a magnification of 1 and a fixed threshold of 4, once Level 1
        // exceeds the threshold, merging down to Level 2 easily re-triggers the
        // threshold, cascading migration up to the highest Level — which is expected here.
        // In normal scenarios the threshold grows by a multiple between levels, so apart
        // from extreme cases this level-by-level migration does not happen.
        let config = Config::new(temp_dir.path().to_str().unwrap())
            .major_threshold_with_sst_size(4)
            .level_sst_magnification(1)
            .enable_level_0_memorization()
            .minor_trigger_with_threshold(TriggerType::Count, 1000);
        let kv_store = KipStorage::open_with_config(config).await?;
        let mut vec_kv = Vec::new();

        for i in 0..times {
            let vec_u8 = bincode::serialize(&i)?;
            vec_kv.push((
                Bytes::from(vec_u8.clone()),
                Bytes::from(vec_u8.into_iter().chain(value.to_vec()).collect_vec()),
            ));
        }

        let start = Instant::now();

        assert_eq!(times % 1000, 0);

        for i in 0..times / 1000 {
            for j in 0..1000 {
                kv_store
                    .set(
                        vec_kv[i * 1000 + j].0.clone(),
                        vec_kv[i * 1000 + j].1.clone(),
                    )
                    .await?;
            }
            kv_store.flush().await?;
        }
        println!("[set_for][Time: {:?}]", start.elapsed());

        let version = kv_store.current_version().await;
        let level_slice = &version.level_slice;
        println!("MajorCompaction Test: {:#?}", level_slice);
        assert!(!level_slice[0].is_empty());
        assert!(
            !level_slice[1].is_empty() || !level_slice[2].is_empty() || !level_slice[3].is_empty()
        );

        // Below Level 0, scopes on each level must be non-overlapping and strictly ordered
        for (level, slice) in level_slice.iter().enumerate() {
            if !slice.is_empty() && level != LEVEL_0 {
                let mut tmp_scope: Option<&Scope> = None;

                for scope in slice {
                    if let Some(last_scope) = tmp_scope {
                        assert!(last_scope.end < scope.start);
                    }
                    tmp_scope = Some(scope);
                }
            }
        }

        assert_eq!(kv_store.len().await?, times);

        let start = Instant::now();
        for kv in vec_kv.iter().take(times) {
            assert_eq!(kv_store.get(&kv.0).await?, Some(kv.1.clone()));
        }
        println!("[get_for][Time: {:?}]", start.elapsed());
        kv_store.flush().await?;

        Ok(())
    }

    // Unit check of `Compactor::data_merge_and_sharding`: the level-l tables (newer)
    // must win over overlapping keys from level ll, and the result must be key-ordered.
    #[tokio::test]
    async fn test_data_merge() -> KernelResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");

        let config = Config::new(temp_dir.into_path());
        let sst_factory = IoFactory::new(
            config.dir_path.join(DEFAULT_SS_TABLE_PATH),
            FileExtension::SSTable,
        )?;
        let cache = Arc::new(ShardingLruCache::new(
            config.block_cache_size,
            16,
            RandomState::default(),
        )?);
        let ss_table_1 = SSTable::new(
            &sst_factory,
            &config,
            Arc::clone(&cache),
            1,
            vec![
                (Bytes::from_static(b"1"), Some(Bytes::from_static(b"1"))),
                (Bytes::from_static(b"2"), Some(Bytes::from_static(b"2"))),
                (Bytes::from_static(b"3"), Some(Bytes::from_static(b"31"))),
            ],
            0,
            IoType::Direct,
        )
        .await?;
        let ss_table_2 = SSTable::new(
            &sst_factory,
            &config,
            Arc::clone(&cache),
            2,
            vec![
                (Bytes::from_static(b"3"), Some(Bytes::from_static(b"3"))),
                (Bytes::from_static(b"4"), Some(Bytes::from_static(b"4"))),
            ],
            0,
            IoType::Direct,
        )
        .await?;
        let ss_table_3 = SSTable::new(
            &sst_factory,
            &config,
            Arc::clone(&cache),
            3,
            vec![
                (Bytes::from_static(b"1"), Some(Bytes::from_static(b"11"))),
                (Bytes::from_static(b"2"), Some(Bytes::from_static(b"21"))),
            ],
            1,
            IoType::Direct,
        )
        .await?;
        let ss_table_4 = SSTable::new(
            &sst_factory,
            &config,
            Arc::clone(&cache),
            4,
            vec![
                (Bytes::from_static(b"3"), Some(Bytes::from_static(b"32"))),
                (Bytes::from_static(b"4"), Some(Bytes::from_static(b"41"))),
                (Bytes::from_static(b"5"), Some(Bytes::from_static(b"5"))),
            ],
            1,
            IoType::Direct,
        )
        .await?;

        let (_, vec_data) = &Compactor::data_merge_and_sharding(
            vec![&ss_table_1, &ss_table_2],
            vec![&ss_table_3, &ss_table_4],
            config.sst_file_size,
        )
        .await?[0];

        assert_eq!(
            vec_data,
            &vec![
                (Bytes::from_static(b"1"), Some(Bytes::from_static(b"1"))),
                (Bytes::from_static(b"2"), Some(Bytes::from_static(b"2"))),
                (Bytes::from_static(b"3"), Some(Bytes::from_static(b"3"))),
                (Bytes::from_static(b"4"), Some(Bytes::from_static(b"4"))),
                (Bytes::from_static(b"5"), Some(Bytes::from_static(b"5")))
            ]
        );
        Ok(())
    }

    /// Seek compaction triggered by repeated queries on key 4.
    ///
    /// Before:
    /// Level 1: [1,2],[3,5,6]
    /// Level 2: [1,2],[3,4],[5,6]
    ///
    /// After:
    /// Level 1: [1,2]
    /// Level 2: [1,2],[3,4,5,6]
    #[test]
    fn test_seek_compaction() -> KernelResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let config = Config::new(temp_dir.into_path());

        tokio_test::block_on(async move {
            let inner = StoreInner::new(config).await?;
            let compactor = Compactor::new(Arc::new(inner));
            let version_status = compactor.ver_status();
            let table_loader = version_status.loader();

            let (scope_1, meta_1) = table_loader
                .create(
                    1,
                    vec![
                        (Bytes::from_static(b"1"), None),
                        (Bytes::from_static(b"2"), None),
                    ],
                    1,
                    TableType::BTree,
                )
                .await?;
            let (scope_2, meta_2) = table_loader
                .create(
                    2,
                    vec![
                        (Bytes::from_static(b"3"), None),
                        (Bytes::from_static(b"5"), None),
                        (Bytes::from_static(b"6"), None),
                    ],
                    1,
                    TableType::BTree,
                )
                .await?;
            let (scope_3, meta_3) = table_loader
                .create(
                    3,
                    vec![
                        (Bytes::from_static(b"1"), None),
                        (Bytes::from_static(b"2"), None),
                    ],
                    2,
                    TableType::BTree,
                )
                .await?;
            let (scope_4, meta_4) = table_loader
                .create(
                    4,
                    vec![
                        (Bytes::from_static(b"3"), None),
                        (Bytes::from_static(b"4"), None),
                    ],
                    2,
                    TableType::BTree,
                )
                .await?;
            let (scope_5, meta_5) = table_loader
                .create(
                    5,
                    vec![
                        (Bytes::from_static(b"5"), None),
                        (Bytes::from_static(b"6"), None),
                    ],
                    2,
                    TableType::BTree,
                )
                .await?;
            version_status
                .log_and_apply(
                    vec![
                        VersionEdit::NewFile(
                            (vec![scope_1, scope_2], 1),
                            0,
                            TableMeta::fusion(&[meta_1, meta_2]),
                        ),
                        VersionEdit::NewFile(
                            (vec![scope_3, scope_4, scope_5], 2),
                            0,
                            TableMeta::fusion(&[meta_3, meta_4, meta_5]),
                        ),
                    ],
                    114514,
                )
                .await?;

            let version_1 = version_status.current().await;

            // Query key 4 repeatedly until allowed_seeks is exhausted and a seek
            // scope is reported, then run the major compaction on it.
            let mut failure_count = 0;
            loop {
                failure_count += 1;
                if let (_, Some((scope, level))) = version_1.query(b"4")? {
                    compactor
                        .major_compaction(level, scope, vec![], true)
                        .await?;
                    break;
                }
                if failure_count >= 200 {
                    panic!("time out!");
                }
            }

            let version_2 = version_status.current().await;
            let level_slice = &version_2.level_slice;

            println!("SeekCompaction Test: {version_2}");
            assert!(!level_slice[1].is_empty());
            assert!(!level_slice[2].is_empty());

            let final_scope_1 = &level_slice[2][0];
            let final_scope_2 = &level_slice[2][1];
            assert_eq!(final_scope_1.start, Bytes::from_static(b"1"));
            assert_eq!(final_scope_1.end, Bytes::from_static(b"2"));
            assert_eq!(final_scope_2.start, Bytes::from_static(b"3"));
            assert_eq!(final_scope_2.end, Bytes::from_static(b"6"));
            assert_eq!(
                final_scope_1.allowed_seeks.clone().unwrap().load(Relaxed),
                0
            );
            assert_eq!(
                final_scope_2.allowed_seeks.clone().unwrap().load(Relaxed),
                0
            );

            Ok(())
        })
    }
}