1use byteorder::{BigEndian, ByteOrder};
3use bytes::Buf;
4use bytes::BufMut;
5use bytes::Bytes;
6use itertools::Itertools;
7
8use crate::storage::SyncableFile;
9
10use super::bitarray::*;
11use super::logarray::*;
12
13use futures::io;
14use futures::stream::StreamExt;
15use tokio::io::AsyncRead;
16
/// Number of 64-bit blocks grouped into one superblock of the index.
const SBLOCK_SIZE: usize = 52;
22
/// Decides whether a ranged select should scan the bits one by one instead of
/// going through the block/superblock tables.
///
/// Returns `true` when `start` and `end` fall into the same aligned window of
/// `8 * 64` bits.
#[inline(always)]
fn use_linear_bitscan(start: u64, end: u64) -> bool {
    // Width, in bits, of the aligned window inside which a linear scan is used.
    const LINEAR_SCAN_WINDOW: u64 = 8 * 64;

    start / LINEAR_SCAN_WINDOW == end / LINEAR_SCAN_WINDOW
}
29
30#[derive(Clone)]
32pub struct BitIndex {
33 array: BitArray,
34 blocks: LogArray,
35 sblocks: LogArray,
36}
37
38impl BitIndex {
39 pub fn from_maps(bitarray_map: Bytes, blocks_map: Bytes, sblocks_map: Bytes) -> BitIndex {
40 let bitarray = BitArray::from_bits(bitarray_map).unwrap();
41 let blocks_logarray = LogArray::parse(blocks_map).unwrap();
42 let sblocks_logarray = LogArray::parse(sblocks_map).unwrap();
43
44 BitIndex::from_parts(bitarray, blocks_logarray, sblocks_logarray)
45 }
46
47 pub fn from_parts(array: BitArray, blocks: LogArray, sblocks: LogArray) -> BitIndex {
48 assert!(sblocks.len() == (blocks.len() + SBLOCK_SIZE - 1) / SBLOCK_SIZE);
49 assert!(blocks.len() == (array.len() + 63) / 64);
50
51 BitIndex {
52 array,
53 blocks,
54 sblocks,
55 }
56 }
57
58 fn block_bits(&self, block_index: usize) -> &[u8] {
59 let bit_index = block_index * 8;
60
61 &self.array.bits()[bit_index..bit_index + 8]
62 }
63
64 pub fn len(&self) -> usize {
66 self.array.len()
67 }
68
69 pub fn get(&self, index: u64) -> bool {
71 self.array.get(index as usize)
72 }
73
74 pub fn rank1(&self, index: u64) -> u64 {
76 let block_index = index / 64;
77 let sblock_index = block_index / SBLOCK_SIZE as u64;
78
79 let block_rank = self.blocks.entry(block_index as usize);
80 let sblock_rank = self.sblocks.entry(sblock_index as usize);
81 let bits = self.block_bits(block_index as usize);
82 assert!(bits.len() == 8);
83
84 let mut bits_num = BigEndian::read_u64(bits);
85 bits_num >>= 63 - index % 64; let bits_rank = bits_num.count_ones() as u64;
87
88 sblock_rank - block_rank + bits_rank
89 }
90
91 pub fn rank1_from_range(&self, start: u64, end: u64) -> u64 {
93 if start == end {
94 return 0;
95 }
96 let mut rank = self.rank1(end - 1);
97 if start != 0 {
98 rank -= self.rank1(start - 1);
99 }
100
101 rank
102 }
103
104 fn select1_sblock_from_range(&self, rank: u64, start: u64, end: Option<u64>) -> usize {
105 let mut start = start as usize / (64 * SBLOCK_SIZE);
106 let mut end = match end {
107 Some(end) => end as usize / (64 * SBLOCK_SIZE),
108 None => self.sblocks.len() - 1,
109 };
110 let mut mid;
111
112 loop {
113 mid = (start + end) / 2;
114 if start == end {
115 break;
116 }
117
118 let r = self.sblocks.entry(mid);
119 match r < rank {
120 true => start = mid + 1,
121 false => end = mid,
122 }
123 }
124
125 mid
126 }
127
128 fn select1_block(&self, sblock: usize, subrank: u64) -> usize {
129 let mut start = sblock * SBLOCK_SIZE;
130 let mut end = start + SBLOCK_SIZE - 1;
131 if end > self.blocks.len() - 1 {
132 end = self.blocks.len() - 1;
133 }
134 let mut mid;
135
136 loop {
145 mid = (start + end + 1) / 2;
146 if start == end {
147 break;
148 }
149
150 let r = self.blocks.entry(mid);
151 match r > subrank {
152 true => start = mid,
153 false => end = mid - 1,
154 }
155 }
156
157 mid
158 }
159
160 pub fn select1(&self, rank: u64) -> Option<u64> {
162 self.select1_from_range_opt(rank, 0, None)
163 }
164
165 pub fn select1_from_range(&self, subrank: u64, start: u64, end: u64) -> Option<u64> {
166 self.select1_from_range_opt(subrank, start, Some(end))
167 }
168
169 fn select1_from_range_opt(&self, subrank: u64, start: u64, end: Option<u64>) -> Option<u64> {
170 if use_linear_bitscan(start, end.unwrap_or(self.len() as u64)) {
171 return self.select_from_range_opt_linear(subrank, start, end, true);
172 }
173
174 let rank = match start {
175 0 => subrank,
176 n => self.rank1(n - 1) + subrank,
177 };
178 let sblock = self.select1_sblock_from_range(rank, start, end);
179 let sblock_rank = self.sblocks.entry(sblock);
180 if sblock_rank < rank {
181 return None;
182 }
183
184 let block = self.select1_block(sblock, sblock_rank - rank);
185 let block_subrank = self.blocks.entry(block);
186 let rank_in_block = rank - (sblock_rank - block_subrank);
187 assert!(rank_in_block <= 64);
188 let bits = self.block_bits(block);
189
190 let mut bits_num = BigEndian::read_u64(bits);
191 let mut tally = rank_in_block;
192 for i in 0..64 {
193 if bits_num & 0x8000000000000000 != 0 {
194 tally -= 1;
195
196 if tally == 0 {
197 let result = block as u64 * 64 + i;
198 if result < start
199 && (end.is_none() || start < end.unwrap())
200 && subrank == 0
201 && !self.get(start)
202 {
203 return Some(start);
204 } else if result < start || (end.is_some() && result >= end.unwrap()) {
205 return None;
206 }
207 return Some(result);
208 }
209 }
210
211 bits_num <<= 1;
212 }
213
214 None
215 }
216
217 fn select_from_range_opt_linear(
218 &self,
219 mut subrank: u64,
220 start: u64,
221 end: Option<u64>,
222 find: bool,
223 ) -> Option<u64> {
224 if end.is_some() && start >= end.unwrap() {
225 return None;
226 }
227
228 if subrank == 0 {
229 if self.get(start) == find {
230 return None;
231 } else {
232 return Some(start);
233 }
234 }
235
236 let end = match end {
237 Some(end) => end,
238 None => self.len() as u64,
239 };
240
241 for i in start..end {
242 if self.get(i) == find {
243 subrank -= 1;
244
245 if subrank == 0 {
246 return Some(i as u64);
247 }
248 }
249 }
250
251 None
252 }
253
254 pub fn rank0(&self, index: u64) -> u64 {
256 let r0 = self.rank1(index);
257 1 + index - r0
258 }
259
260 pub fn rank0_from_range(&self, start: u64, end: u64) -> u64 {
262 if start == end {
263 return 0;
264 }
265 let mut rank = self.rank0(end - 1);
266 if start != 0 {
267 rank -= self.rank0(start - 1);
268 }
269
270 rank
271 }
272
273 fn select0_sblock_from_range(&self, rank: u64, start: u64, end: Option<u64>) -> usize {
274 let mut start = start as usize / (64 * SBLOCK_SIZE);
275 let mut end = match end {
276 Some(end) => end as usize / (64 * SBLOCK_SIZE),
277 None => self.sblocks.len() - 1,
278 };
279 let mut mid;
280
281 loop {
282 mid = (start + end) / 2;
283 if start == end {
284 break;
285 }
286
287 let r = ((1 + mid) * SBLOCK_SIZE) as u64 * 64 - self.sblocks.entry(mid);
288 match r < rank {
289 true => start = mid + 1,
290 false => end = mid,
291 }
292 }
293
294 mid
295 }
296
297 fn select0_block(&self, sblock: usize, subrank: u64) -> usize {
298 let mut start = sblock * SBLOCK_SIZE;
299 let mut end = start + SBLOCK_SIZE - 1;
300 if end > self.blocks.len() - 1 {
301 end = self.blocks.len() - 1;
302 }
303
304 let mut mid;
305
306 loop {
315 mid = (start + end + 1) / 2;
316 if start == end {
317 break;
318 }
319
320 let r = (SBLOCK_SIZE - mid % SBLOCK_SIZE) as u64 * 64 - self.blocks.entry(mid);
321 match r > subrank {
322 true => start = mid,
323 false => end = mid - 1,
324 }
325 }
326
327 mid
328 }
329
330 pub fn select0(&self, rank: u64) -> Option<u64> {
332 self.select0_from_range_opt(rank, 0, None)
333 }
334
335 pub fn select0_from_range(&self, subrank: u64, start: u64, end: u64) -> Option<u64> {
336 self.select0_from_range_opt(subrank, start, Some(end))
337 }
338
339 pub fn select0_from_range_opt(
340 &self,
341 subrank: u64,
342 start: u64,
343 end: Option<u64>,
344 ) -> Option<u64> {
345 if use_linear_bitscan(start, end.unwrap_or(self.len() as u64)) {
346 return self.select_from_range_opt_linear(subrank, start, end, false);
347 }
348
349 let rank = match start {
350 0 => subrank,
351 n => self.rank0(n - 1) + subrank,
352 };
353 let sblock = self.select0_sblock_from_range(rank, start, end);
354 let sblock_rank = ((1 + sblock) * SBLOCK_SIZE * 64) as u64 - self.sblocks.entry(sblock);
355
356 if sblock_rank < rank {
357 return None;
358 }
359
360 let block = self.select0_block(sblock, sblock_rank - rank);
361 let block_subrank =
362 (SBLOCK_SIZE - block % SBLOCK_SIZE) as u64 * 64 - self.blocks.entry(block);
363 let rank_in_block = rank - (sblock_rank - block_subrank);
364 assert!(rank_in_block <= 64);
365 let bits = self.block_bits(block);
366
367 let mut bits_num = BigEndian::read_u64(bits);
368 let mut tally = rank_in_block;
369 for i in 0..64 {
370 if bits_num & 0x8000000000000000 == 0 {
371 tally -= 1;
372
373 if tally == 0 {
374 let result = block as u64 * 64 + i;
375 if result < start
376 && (end.is_none() || start < end.unwrap())
377 && subrank == 0
378 && self.get(start)
379 {
380 return Some(start);
381 } else if result < start || (end.is_some() && result >= end.unwrap()) {
382 return None;
383 }
384 return Some(result);
385 }
386 }
387
388 bits_num <<= 1;
389 }
390
391 None
392 }
393
394 pub fn iter(&self) -> impl Iterator<Item = bool> {
395 self.array.iter()
396 }
397}
398
399pub async fn build_bitindex<
400 R: 'static + AsyncRead + Unpin + Send,
401 W1: 'static + SyncableFile + Send,
402 W2: 'static + SyncableFile + Send,
403>(
404 bitarray: R,
405 blocks: W1,
406 sblocks: W2,
407) -> io::Result<()> {
408 let block_stream = bitarray_stream_blocks(bitarray);
409 let mut blocks_builder =
411 LogArrayFileBuilder::new(blocks, 64 - (SBLOCK_SIZE * 64).leading_zeros() as u8);
412 let mut sblocks_builder = LogArrayFileBuilder::new(sblocks, 64);
413
414 let mut sblock_rank = 0;
416 let mut stream = block_stream.chunks(SBLOCK_SIZE);
417 while let Some(chunk) = stream.next().await {
418 let mut block_ranks = Vec::with_capacity(chunk.len());
419 for num in chunk {
420 block_ranks.push(num?.count_ones() as u64);
421 }
422
423 let mut sblock_subrank = block_ranks.iter().sum();
424 sblock_rank += sblock_subrank;
425
426 for block_rank in block_ranks {
427 blocks_builder.push(sblock_subrank).await?;
428 sblock_subrank -= block_rank;
429 }
430
431 sblocks_builder.push(sblock_rank).await?;
432 }
433
434 blocks_builder.finalize().await?;
435 sblocks_builder.finalize().await?;
436
437 Ok(())
438}
439
440pub fn build_bitindex_from_block_iter<I: Iterator<Item = u64>, B1: BufMut, B2: BufMut>(
441 blocks_iter: I,
442 blocks: B1,
443 sblocks: B2,
444) {
445 let mut blocks_builder =
447 LogArrayBufBuilder::new(blocks, 64 - (SBLOCK_SIZE * 64).leading_zeros() as u8);
448 let mut sblocks_builder = LogArrayBufBuilder::new(sblocks, 64);
449
450 let mut sblock_rank = 0;
452 let chunks = blocks_iter.chunks(SBLOCK_SIZE);
453 let mut iter = chunks.into_iter();
454 while let Some(chunk) = iter.next() {
455 let chunk: Vec<_> = chunk.collect();
456 let mut block_ranks = Vec::with_capacity(chunk.len());
457 for num in chunk {
458 block_ranks.push(num.count_ones() as u64);
459 }
460
461 let mut sblock_subrank = block_ranks.iter().sum();
462 sblock_rank += sblock_subrank;
463
464 for block_rank in block_ranks {
465 blocks_builder.push(sblock_subrank);
466 sblock_subrank -= block_rank;
467 }
468
469 sblocks_builder.push(sblock_rank);
470 }
471
472 blocks_builder.finalize();
473 sblocks_builder.finalize();
474}
475
476pub fn build_bitindex_from_buf<B1: Buf, B2: BufMut, B3: BufMut>(
477 bitarray: B1,
478 blocks: B2,
479 sblocks: B3,
480) {
481 let mut iter = bitarray_iter_blocks(bitarray);
482 build_bitindex_from_block_iter(&mut iter, blocks, sblocks)
483}
484
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        storage::{memory::MemoryBackedStore, FileLoad, FileStore},
        util::stream_iter_ok,
    };

    /// Number of bits in the fixture shared by all tests.
    const TEST_BITS: u64 = 123456;

    /// Builds an index over `TEST_BITS` bits in which exactly the positions
    /// divisible by 3 are set, going through the in-memory store the same way
    /// for every test.
    async fn build_test_index() -> BitIndex {
        let bits = MemoryBackedStore::new();
        let mut ba_builder = BitArrayFileBuilder::new(bits.open_write().await.unwrap());
        let contents = (0..).map(|n| n % 3 == 0).take(TEST_BITS as usize);

        async {
            ba_builder.push_all(stream_iter_ok(contents)).await?;
            ba_builder.finalize().await?;

            Ok::<_, io::Error>(())
        }
        .await
        .unwrap();

        let index_blocks = MemoryBackedStore::new();
        let index_sblocks = MemoryBackedStore::new();
        build_bitindex(
            bits.open_read().await.unwrap(),
            index_blocks.open_write().await.unwrap(),
            index_sblocks.open_write().await.unwrap(),
        )
        .await
        .unwrap();

        BitIndex::from_maps(
            bits.map().await.unwrap(),
            index_blocks.map().await.unwrap(),
            index_sblocks.map().await.unwrap(),
        )
    }

    #[tokio::test]
    async fn rank1_works() {
        let index = build_test_index().await;

        for i in 0..TEST_BITS {
            assert_eq!(i / 3 + 1, index.rank1(i));
        }
    }

    #[tokio::test]
    async fn select1_works() {
        let index = build_test_index().await;

        for i in 1..(TEST_BITS / 3) {
            assert_eq!((i - 1) * 3, index.select1(i).unwrap());
        }

        assert!(index.select1(TEST_BITS * 2 / 3).is_none());
    }

    #[tokio::test]
    async fn rank1_ranged() {
        let index = build_test_index().await;

        assert_eq!(0, index.rank1_from_range(6, 6));
        assert_eq!(1, index.rank1_from_range(6, 7));
        assert_eq!(1, index.rank1_from_range(6, 8));
        assert_eq!(2, index.rank1_from_range(6, 12));
        assert_eq!(2, index.rank1_from_range(4, 12));
    }

    #[tokio::test]
    async fn select1_ranged() {
        let index = build_test_index().await;

        assert_eq!(None, index.select1_from_range(0, 6, 6));
        assert_eq!(None, index.select1_from_range(0, 6, 7));
        assert_eq!(Some(6), index.select1_from_range(1, 6, 7));
        assert_eq!(Some(7), index.select1_from_range(0, 7, 8));
        assert_eq!(Some(9), index.select1_from_range(2, 5, 11));
        assert_eq!(None, index.select1_from_range(123456, 5, 10));
    }

    #[tokio::test]
    async fn rank0_works() {
        let index = build_test_index().await;

        for i in 0..TEST_BITS {
            assert_eq!(1 + i - (i / 3 + 1), index.rank0(i));
        }
    }

    #[tokio::test]
    async fn select0_works() {
        let index = build_test_index().await;

        for i in 1..=(TEST_BITS * 2 / 3) {
            assert_eq!(i + (i - 1) / 2, index.select0(i).unwrap());
        }

        assert_eq!(None, index.select0(TEST_BITS * 2 / 3 + 1));
    }

    #[tokio::test]
    async fn rank0_ranged() {
        let index = build_test_index().await;

        assert_eq!(0, index.rank0_from_range(5, 5));
        assert_eq!(1, index.rank0_from_range(5, 6));
        assert_eq!(0, index.rank0_from_range(6, 6));
        assert_eq!(2, index.rank0_from_range(5, 8));
        assert_eq!(4, index.rank0_from_range(6, 12));
        assert_eq!(6, index.rank0_from_range(4, 12));
    }

    #[tokio::test]
    async fn select0_ranged() {
        let index = build_test_index().await;

        assert_eq!(None, index.select0_from_range(0, 6, 6));
        assert_eq!(Some(6), index.select0_from_range(0, 6, 7));
        assert_eq!(None, index.select0_from_range(1, 6, 7));
        assert_eq!(Some(7), index.select0_from_range(1, 6, 8));
        assert_eq!(None, index.select0_from_range(0, 7, 8));
        assert_eq!(Some(10), index.select0_from_range(4, 5, 11));
        assert_eq!(None, index.select0_from_range(123456, 5, 10));
    }
}