1use std::sync::Arc;
70
71use dashmap::DashMap;
72use parking_lot::RwLock;
73
74use crate::key_buffer::ArenaKeyHandle;
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub enum IndexPolicy {
83 WriteOptimized,
86
87 Balanced,
91
92 ScanOptimized,
96
97 AppendOnly,
101}
102
103impl Default for IndexPolicy {
104 fn default() -> Self {
105 IndexPolicy::Balanced
106 }
107}
108
109impl IndexPolicy {
110 pub fn from_str(s: &str) -> Option<Self> {
112 match s.to_lowercase().as_str() {
113 "write_optimized" | "write-optimized" | "write" => Some(IndexPolicy::WriteOptimized),
114 "balanced" | "default" => Some(IndexPolicy::Balanced),
115 "scan_optimized" | "scan-optimized" | "scan" => Some(IndexPolicy::ScanOptimized),
116 "append_only" | "append-only" | "append" => Some(IndexPolicy::AppendOnly),
117 _ => None,
118 }
119 }
120
121 pub fn write_cost(&self) -> &'static str {
123 match self {
124 IndexPolicy::WriteOptimized => "O(1)",
125 IndexPolicy::Balanced => "O(1) amortized",
126 IndexPolicy::ScanOptimized => "O(log N)",
127 IndexPolicy::AppendOnly => "O(1)",
128 }
129 }
130
131 pub fn scan_cost(&self) -> &'static str {
133 match self {
134 IndexPolicy::WriteOptimized => "O(N)",
135 IndexPolicy::Balanced => "O(output + log K)",
136 IndexPolicy::ScanOptimized => "O(log N + K)",
137 IndexPolicy::AppendOnly => "O(N)",
138 }
139 }
140
141 pub fn has_ordered_index(&self) -> bool {
143 matches!(self, IndexPolicy::ScanOptimized)
144 }
145
146 pub fn supports_efficient_scans(&self) -> bool {
148 matches!(self, IndexPolicy::ScanOptimized | IndexPolicy::Balanced)
149 }
150}
151
152#[derive(Debug, Clone)]
158pub struct TableIndexConfig {
159 pub table_name: String,
161 pub policy: IndexPolicy,
163 pub max_sorted_runs: usize,
165 pub target_run_size: usize,
167 pub enable_bloom_filter: bool,
169}
170
171impl TableIndexConfig {
172 pub fn new(table_name: impl Into<String>, policy: IndexPolicy) -> Self {
174 Self {
175 table_name: table_name.into(),
176 policy,
177 max_sorted_runs: 4,
178 target_run_size: 16 * 1024 * 1024, enable_bloom_filter: true,
180 }
181 }
182
183 pub fn with_max_sorted_runs(mut self, max: usize) -> Self {
185 self.max_sorted_runs = max;
186 self
187 }
188
189 pub fn with_target_run_size(mut self, size: usize) -> Self {
191 self.target_run_size = size;
192 self
193 }
194
195 pub fn with_bloom_filter(mut self, enable: bool) -> Self {
197 self.enable_bloom_filter = enable;
198 self
199 }
200}
201
202pub struct TableIndexRegistry {
211 configs: DashMap<String, TableIndexConfig>,
213 default_policy: RwLock<IndexPolicy>,
215}
216
217impl TableIndexRegistry {
218 pub fn new() -> Self {
220 Self {
221 configs: DashMap::new(),
222 default_policy: RwLock::new(IndexPolicy::Balanced),
223 }
224 }
225
226 pub fn with_default_policy(policy: IndexPolicy) -> Self {
228 Self {
229 configs: DashMap::new(),
230 default_policy: RwLock::new(policy),
231 }
232 }
233
234 pub fn set_default_policy(&self, policy: IndexPolicy) {
236 *self.default_policy.write() = policy;
237 }
238
239 pub fn default_policy(&self) -> IndexPolicy {
241 *self.default_policy.read()
242 }
243
244 pub fn configure_table(&self, config: TableIndexConfig) {
246 self.configs.insert(config.table_name.clone(), config);
247 }
248
249 pub fn get_policy(&self, table_name: &str) -> IndexPolicy {
251 self.configs
252 .get(table_name)
253 .map(|c| c.policy)
254 .unwrap_or_else(|| *self.default_policy.read())
255 }
256
257 pub fn get_config(&self, table_name: &str) -> TableIndexConfig {
259 self.configs
260 .get(table_name)
261 .map(|c| c.clone())
262 .unwrap_or_else(|| TableIndexConfig::new(table_name, *self.default_policy.read()))
263 }
264
265 pub fn has_explicit_config(&self, table_name: &str) -> bool {
267 self.configs.contains_key(table_name)
268 }
269
270 pub fn remove_config(&self, table_name: &str) -> Option<TableIndexConfig> {
272 self.configs.remove(table_name).map(|(_, c)| c)
273 }
274
275 pub fn configured_tables(&self) -> Vec<String> {
277 self.configs.iter().map(|e| e.key().clone()).collect()
278 }
279}
280
281impl Default for TableIndexRegistry {
282 fn default() -> Self {
283 Self::new()
284 }
285}
286
287#[derive(Debug)]
302pub struct SortedRun<K, V> {
303 entries: Vec<(K, V)>,
305 min_key: Option<K>,
307 max_key: Option<K>,
309 size_bytes: usize,
311 #[allow(dead_code)]
313 created_at: std::time::Instant,
314 level: usize,
316}
317
318impl<K: Ord + Clone, V: Clone> SortedRun<K, V> {
319 pub fn from_unsorted(mut entries: Vec<(K, V)>, level: usize) -> Self {
321 entries.sort_by(|a, b| a.0.cmp(&b.0));
322 let size_bytes = std::mem::size_of_val(&entries);
323 let min_key = entries.first().map(|(k, _)| k.clone());
324 let max_key = entries.last().map(|(k, _)| k.clone());
325 Self {
326 entries,
327 min_key,
328 max_key,
329 size_bytes,
330 created_at: std::time::Instant::now(),
331 level,
332 }
333 }
334
335 pub fn from_sorted(entries: Vec<(K, V)>, level: usize) -> Self {
337 let size_bytes = std::mem::size_of_val(&entries);
338 let min_key = entries.first().map(|(k, _)| k.clone());
339 let max_key = entries.last().map(|(k, _)| k.clone());
340 Self {
341 entries,
342 min_key,
343 max_key,
344 size_bytes,
345 created_at: std::time::Instant::now(),
346 level,
347 }
348 }
349
350 pub fn len(&self) -> usize {
352 self.entries.len()
353 }
354
355 pub fn is_empty(&self) -> bool {
357 self.entries.is_empty()
358 }
359
360 pub fn size_bytes(&self) -> usize {
362 self.size_bytes
363 }
364
365 pub fn level(&self) -> usize {
367 self.level
368 }
369
370 pub fn get(&self, key: &K) -> Option<&V> {
372 self.entries
373 .binary_search_by(|(k, _)| k.cmp(key))
374 .ok()
375 .map(|idx| &self.entries[idx].1)
376 }
377
378 pub fn range_from<'a>(&'a self, start: &K) -> impl Iterator<Item = &'a (K, V)> {
380 let idx = self
381 .entries
382 .binary_search_by(|(k, _)| k.cmp(start))
383 .unwrap_or_else(|i| i);
384 self.entries[idx..].iter()
385 }
386
387 pub fn range<'a>(&'a self, start: &K, end: &K) -> impl Iterator<Item = &'a (K, V)> {
389 let start_idx = self
390 .entries
391 .binary_search_by(|(k, _)| k.cmp(start))
392 .unwrap_or_else(|i| i);
393 let end_idx = self
394 .entries
395 .binary_search_by(|(k, _)| k.cmp(end))
396 .unwrap_or_else(|i| i);
397 self.entries[start_idx..end_idx].iter()
398 }
399
400 pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
402 self.entries.iter()
403 }
404
405 #[inline]
410 pub fn entries(&self) -> &[(K, V)] {
411 &self.entries
412 }
413
414 #[inline]
428 pub fn overlaps_prefix(&self, prefix: &K) -> bool {
429 match &self.max_key {
430 Some(max) if max < prefix => false, _ => true, }
433 }
434
435 #[inline]
440 pub fn overlaps_range(&self, start: &K, end: &K) -> bool {
441 match (&self.min_key, &self.max_key) {
443 (Some(min), _) if min >= end => false, (_, Some(max)) if max < start => false, _ => true, }
447 }
448
449 #[inline]
451 pub fn min_key(&self) -> Option<&K> {
452 self.min_key.as_ref()
453 }
454
455 #[inline]
457 pub fn max_key(&self) -> Option<&K> {
458 self.max_key.as_ref()
459 }
460}
461
462pub struct BalancedTableIndex<V: Clone + Send + Sync + Eq + 'static> {
473 memtable: DashMap<ArenaKeyHandle, V>,
475 sorted_runs: RwLock<Vec<Arc<SortedRun<ArenaKeyHandle, V>>>>,
477 config: TableIndexConfig,
479 memtable_size: std::sync::atomic::AtomicUsize,
481}
482
483impl<V: Clone + Send + Sync + Eq + 'static> BalancedTableIndex<V> {
484 pub fn new(config: TableIndexConfig) -> Self {
486 Self {
487 memtable: DashMap::new(),
488 sorted_runs: RwLock::new(Vec::new()),
489 config,
490 memtable_size: std::sync::atomic::AtomicUsize::new(0),
491 }
492 }
493
494 pub fn insert(&self, key: ArenaKeyHandle, value: V) {
496 let key_size = key.len();
497 let value_size = std::mem::size_of::<V>();
498
499 self.memtable.insert(key, value);
500 self.memtable_size
501 .fetch_add(key_size + value_size, std::sync::atomic::Ordering::Relaxed);
502 }
503
504 pub fn get(&self, key: &ArenaKeyHandle) -> Option<V> {
509 if let Some(v) = self.memtable.get(key) {
511 return Some(v.clone());
512 }
513
514 let runs = self.sorted_runs.read();
517 for run in runs.iter().rev() {
518 if run.overlaps_prefix(key) {
520 if let Some(v) = run.get(key) {
521 return Some(v.clone());
522 }
523 }
524 }
525
526 None
527 }
528
529 pub fn scan_prefix(&self, prefix: &ArenaKeyHandle) -> Vec<(ArenaKeyHandle, V)> {
541 use std::cmp::Reverse;
542 use std::collections::BinaryHeap;
543
544 #[derive(Eq, PartialEq)]
545 struct PrefixHeapEntry<V: Clone> {
546 key: ArenaKeyHandle,
547 value: V,
548 source_idx: usize, }
550
551 impl<V: Clone + Eq + PartialEq> Ord for PrefixHeapEntry<V> {
552 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
553 match self.key.cmp(&other.key) {
554 std::cmp::Ordering::Equal => self.source_idx.cmp(&other.source_idx),
555 other => other,
556 }
557 }
558 }
559
560 impl<V: Clone + Eq + PartialEq> PartialOrd for PrefixHeapEntry<V> {
561 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
562 Some(self.cmp(other))
563 }
564 }
565
566 let mut heap: BinaryHeap<Reverse<PrefixHeapEntry<V>>> = BinaryHeap::new();
567
568 for entry in self.memtable.iter() {
570 let key = entry.key();
571 if key.as_bytes().starts_with(prefix.as_bytes()) {
572 heap.push(Reverse(PrefixHeapEntry {
573 key: key.clone(),
574 value: entry.value().clone(),
575 source_idx: 0,
576 }));
577 }
578 }
579
580 let runs = self.sorted_runs.read();
582 for (run_idx, run) in runs.iter().enumerate() {
583 if !run.overlaps_prefix(prefix) {
585 continue; }
587
588 for (key, value) in run.range_from(prefix) {
590 if !key.as_bytes().starts_with(prefix.as_bytes()) {
591 break; }
593 heap.push(Reverse(PrefixHeapEntry {
594 key: key.clone(),
595 value: value.clone(),
596 source_idx: run_idx + 1, }));
598 }
599 }
600
601 let mut result = Vec::with_capacity(heap.len());
603 let mut last_key: Option<ArenaKeyHandle> = None;
604
605 while let Some(Reverse(entry)) = heap.pop() {
606 let is_new_key = last_key.as_ref().map(|k| k != &entry.key).unwrap_or(true);
607 if is_new_key {
608 last_key = Some(entry.key.clone());
609 result.push((entry.key, entry.value));
610 }
611 }
612
613 result
614 }
615
616 pub fn needs_compaction(&self) -> bool {
618 let memtable_size = self
619 .memtable_size
620 .load(std::sync::atomic::Ordering::Relaxed);
621 let runs = self.sorted_runs.read();
622
623 memtable_size >= self.config.target_run_size || runs.len() >= self.config.max_sorted_runs
624 }
625
626 pub fn compact_memtable(&self) {
628 let entries: Vec<_> = self
630 .memtable
631 .iter()
632 .map(|e| (e.key().clone(), e.value().clone()))
633 .collect();
634
635 if entries.is_empty() {
636 return;
637 }
638
639 self.memtable.clear();
641 self.memtable_size
642 .store(0, std::sync::atomic::Ordering::Relaxed);
643
644 let run = Arc::new(SortedRun::from_unsorted(entries, 0));
646
647 let mut runs = self.sorted_runs.write();
648 runs.push(run);
649 }
650
651 pub fn merge_runs(&self, levels_to_merge: usize) {
653 let mut runs = self.sorted_runs.write();
654
655 if runs.len() < levels_to_merge {
656 return;
657 }
658
659 let to_merge: Vec<_> = runs.drain(..levels_to_merge).collect();
661
662 let merged = self.k_way_merge(&to_merge);
664
665 let new_run = Arc::new(SortedRun::from_sorted(merged, to_merge.len()));
667 runs.insert(0, new_run);
668 }
669
670 fn k_way_merge(&self, runs: &[Arc<SortedRun<ArenaKeyHandle, V>>]) -> Vec<(ArenaKeyHandle, V)> {
678 use std::cmp::Reverse;
679 use std::collections::BinaryHeap;
680
681 #[derive(Eq, PartialEq)]
682 struct HeapEntry<V: Clone> {
683 key: ArenaKeyHandle,
684 value: V,
685 run_idx: usize,
686 entry_idx: usize,
687 }
688
689 impl<V: Clone + Eq + PartialEq> Ord for HeapEntry<V> {
690 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
691 match self.key.cmp(&other.key) {
694 std::cmp::Ordering::Equal => self.run_idx.cmp(&other.run_idx),
695 other => other,
696 }
697 }
698 }
699
700 impl<V: Clone + Eq + PartialEq> PartialOrd for HeapEntry<V> {
701 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
702 Some(self.cmp(other))
703 }
704 }
705
706 let mut heap: BinaryHeap<Reverse<HeapEntry<V>>> = BinaryHeap::new();
707
708 let mut run_positions: Vec<usize> = vec![0; runs.len()];
710
711 for (run_idx, run) in runs.iter().enumerate() {
713 let entries = run.entries();
714 if !entries.is_empty() {
715 let (key, value) = &entries[0]; heap.push(Reverse(HeapEntry {
717 key: key.clone(),
718 value: value.clone(),
719 run_idx,
720 entry_idx: 0,
721 }));
722 }
723 }
724
725 let estimated_size: usize = runs.iter().map(|r| r.len()).sum();
727 let mut result = Vec::with_capacity(estimated_size);
728 let mut last_key: Option<ArenaKeyHandle> = None;
729
730 while let Some(Reverse(entry)) = heap.pop() {
731 let is_new_key = last_key.as_ref().map(|k| k != &entry.key).unwrap_or(true);
733 if is_new_key {
734 last_key = Some(entry.key.clone());
735 result.push((entry.key.clone(), entry.value));
736 }
737
738 run_positions[entry.run_idx] += 1;
740 let next_idx = run_positions[entry.run_idx];
741
742 let run_entries = runs[entry.run_idx].entries();
744 if next_idx < run_entries.len() {
745 let (key, value) = &run_entries[next_idx]; heap.push(Reverse(HeapEntry {
747 key: key.clone(),
748 value: value.clone(),
749 run_idx: entry.run_idx,
750 entry_idx: next_idx,
751 }));
752 }
753 }
754
755 result
756 }
757
758 pub fn config(&self) -> &TableIndexConfig {
760 &self.config
761 }
762
763 pub fn memtable_size(&self) -> usize {
765 self.memtable_size
766 .load(std::sync::atomic::Ordering::Relaxed)
767 }
768
769 pub fn run_count(&self) -> usize {
771 self.sorted_runs.read().len()
772 }
773}
774
775#[cfg(test)]
780mod tests {
781 use super::*;
782
783 #[test]
784 fn test_index_policy_from_str() {
785 assert_eq!(
786 IndexPolicy::from_str("write_optimized"),
787 Some(IndexPolicy::WriteOptimized)
788 );
789 assert_eq!(
790 IndexPolicy::from_str("balanced"),
791 Some(IndexPolicy::Balanced)
792 );
793 assert_eq!(
794 IndexPolicy::from_str("scan-optimized"),
795 Some(IndexPolicy::ScanOptimized)
796 );
797 assert_eq!(
798 IndexPolicy::from_str("append_only"),
799 Some(IndexPolicy::AppendOnly)
800 );
801 assert_eq!(IndexPolicy::from_str("invalid"), None);
802 }
803
804 #[test]
805 fn test_registry_default_policy() {
806 let registry = TableIndexRegistry::new();
807
808 assert_eq!(registry.get_policy("unknown"), IndexPolicy::Balanced);
810
811 registry.configure_table(TableIndexConfig::new("users", IndexPolicy::WriteOptimized));
813 assert_eq!(registry.get_policy("users"), IndexPolicy::WriteOptimized);
814
815 assert_eq!(registry.get_policy("orders"), IndexPolicy::Balanced);
817 }
818
819 #[test]
820 fn test_registry_change_default() {
821 let registry = TableIndexRegistry::new();
822
823 registry.set_default_policy(IndexPolicy::ScanOptimized);
824 assert_eq!(registry.get_policy("any_table"), IndexPolicy::ScanOptimized);
825 }
826
827 #[test]
828 fn test_sorted_run() {
829 let entries = vec![
830 (ArenaKeyHandle::new(b"c"), 3),
831 (ArenaKeyHandle::new(b"a"), 1),
832 (ArenaKeyHandle::new(b"b"), 2),
833 ];
834
835 let run = SortedRun::from_unsorted(entries, 0);
836
837 assert_eq!(run.len(), 3);
838 assert_eq!(run.get(&ArenaKeyHandle::new(b"a")), Some(&1));
839 assert_eq!(run.get(&ArenaKeyHandle::new(b"b")), Some(&2));
840 assert_eq!(run.get(&ArenaKeyHandle::new(b"c")), Some(&3));
841 assert_eq!(run.get(&ArenaKeyHandle::new(b"d")), None);
842 }
843
844 #[test]
845 fn test_balanced_table_index() {
846 let config = TableIndexConfig::new("test", IndexPolicy::Balanced);
847 let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
848
849 index.insert(ArenaKeyHandle::new(b"key1"), 1);
850 index.insert(ArenaKeyHandle::new(b"key2"), 2);
851
852 assert_eq!(index.get(&ArenaKeyHandle::new(b"key1")), Some(1));
853 assert_eq!(index.get(&ArenaKeyHandle::new(b"key2")), Some(2));
854 assert_eq!(index.get(&ArenaKeyHandle::new(b"key3")), None);
855 }
856
857 #[test]
858 fn test_balanced_compaction() {
859 let config = TableIndexConfig::new("test", IndexPolicy::Balanced).with_target_run_size(100); let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
862
863 for i in 0..10 {
864 let key = format!("key{:03}", i);
865 index.insert(ArenaKeyHandle::new(key.as_bytes()), i as i32);
866 }
867
868 index.compact_memtable();
870
871 assert_eq!(index.run_count(), 1);
872 assert_eq!(index.memtable_size(), 0);
873
874 assert_eq!(index.get(&ArenaKeyHandle::new(b"key005")), Some(5));
876 }
877
878 #[test]
879 fn test_k_way_merge_scaling() {
880 use std::time::Instant;
883
884 let sizes = [100, 500, 1000];
885 let mut times_ns: Vec<u128> = Vec::new();
886
887 for size in sizes {
888 let runs: Vec<Arc<SortedRun<ArenaKeyHandle, i32>>> = (0..5)
890 .map(|run_id| {
891 let entries: Vec<(ArenaKeyHandle, i32)> = (0..size)
892 .map(|i| {
893 let key = format!("key_{:08}_{}", i * 5 + run_id, run_id);
894 (ArenaKeyHandle::new(key.as_bytes()), (i * 5 + run_id) as i32)
895 })
896 .collect();
897 Arc::new(SortedRun::from_sorted(entries, run_id))
898 })
899 .collect();
900
901 let config = TableIndexConfig::new("test", IndexPolicy::Balanced);
902 let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
903
904 let start = Instant::now();
905 let merged = index.k_way_merge(&runs);
906 let elapsed = start.elapsed();
907
908 times_ns.push(elapsed.as_nanos());
909
910 let total_entries = size * 5;
912 assert_eq!(
913 merged.len(),
914 total_entries,
915 "Merge should produce all unique entries"
916 );
917 }
918
919 if times_ns.len() >= 2 && times_ns[0] > 0 {
923 let ratio_1_to_2 = times_ns[1] as f64 / times_ns[0] as f64;
924 let ratio_2_to_3 = times_ns[2] as f64 / times_ns[1] as f64;
925
926 assert!(
929 ratio_1_to_2 < 15.0,
930 "Merge scaling should be sub-quadratic: ratio={:.1}x for 5x size",
931 ratio_1_to_2
932 );
933 assert!(
934 ratio_2_to_3 < 10.0,
935 "Merge scaling should be sub-quadratic: ratio={:.1}x for 2x size",
936 ratio_2_to_3
937 );
938 }
939 }
940
941 #[test]
942 fn test_sorted_run_metadata_pruning() {
943 let entries = vec![
945 (ArenaKeyHandle::new(b"apple"), 1),
946 (ArenaKeyHandle::new(b"banana"), 2),
947 (ArenaKeyHandle::new(b"cherry"), 3),
948 ];
949 let run = SortedRun::from_sorted(entries, 0);
950
951 assert_eq!(
953 run.min_key().map(|k| k.as_bytes()),
954 Some(b"apple".as_slice())
955 );
956 assert_eq!(
957 run.max_key().map(|k| k.as_bytes()),
958 Some(b"cherry".as_slice())
959 );
960
961 assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"banana"))); assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"apple"))); assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"cherry"))); assert!(!run.overlaps_prefix(&ArenaKeyHandle::new(b"date"))); assert!(!run.overlaps_prefix(&ArenaKeyHandle::new(b"zebra"))); assert!(run.overlaps_range(
972 &ArenaKeyHandle::new(b"banana"),
973 &ArenaKeyHandle::new(b"cherry")
974 ));
975 assert!(!run.overlaps_range(&ArenaKeyHandle::new(b"date"), &ArenaKeyHandle::new(b"fig"))); assert!(!run.overlaps_range(&ArenaKeyHandle::new(b"aaa"), &ArenaKeyHandle::new(b"aab"))); }
978
979 #[test]
980 fn test_scan_prefix() {
981 let config = TableIndexConfig::new("test", IndexPolicy::Balanced).with_target_run_size(50); let index: BalancedTableIndex<i32> = BalancedTableIndex::new(config);
983
984 let prefixes = ["user:1:", "user:2:", "order:1:", "order:2:"];
986 for (i, prefix) in prefixes.iter().enumerate() {
987 for j in 0..5 {
988 let key = format!("{}{}", prefix, j);
989 index.insert(ArenaKeyHandle::new(key.as_bytes()), (i * 10 + j) as i32);
990 }
991 }
992
993 index.compact_memtable();
995
996 index.insert(ArenaKeyHandle::new(b"user:1:99"), 199);
998 index.insert(ArenaKeyHandle::new(b"order:1:99"), 299);
999
1000 let results = index.scan_prefix(&ArenaKeyHandle::new(b"user:1:"));
1002 assert_eq!(results.len(), 6); for (key, _value) in &results {
1006 assert!(
1007 key.as_bytes().starts_with(b"user:1:"),
1008 "Key {:?} should start with user:1:",
1009 String::from_utf8_lossy(key.as_bytes())
1010 );
1011 }
1012
1013 for window in results.windows(2) {
1015 assert!(
1016 window[0].0 <= window[1].0,
1017 "Results should be sorted by key"
1018 );
1019 }
1020
1021 let results = index.scan_prefix(&ArenaKeyHandle::new(b"order:"));
1023 assert_eq!(results.len(), 11); }
1025
1026 #[test]
1027 fn test_empty_sorted_run_metadata() {
1028 let entries: Vec<(ArenaKeyHandle, i32)> = vec![];
1030 let run = SortedRun::from_sorted(entries, 0);
1031
1032 assert!(run.min_key().is_none());
1033 assert!(run.max_key().is_none());
1034 assert!(run.overlaps_prefix(&ArenaKeyHandle::new(b"anything"))); assert!(run.overlaps_range(&ArenaKeyHandle::new(b"a"), &ArenaKeyHandle::new(b"z"))); }
1037}