use std::ops::{Range, RangeInclusive};
use deepsize::DeepSizeOf;
use super::{bitmap::Bitmap, encoded_array::EncodedU64Array};
/// A sequence of `u64` values stored in one of several encodings.
///
/// `from_stats_and_sequence` chooses the variant from statistics about the
/// values (whether they are sorted, how many gaps they contain) and from an
/// estimated size of each candidate encoding.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum U64Segment {
/// Every value of the half-open range, in ascending order.
Range(Range<u64>),
/// The values of `range` in ascending order, except those listed in `holes`.
RangeWithHoles {
range: Range<u64>,
/// Values inside `range` that are absent from the segment. Looked up
/// with `binary_search`, so it is expected to be sorted.
holes: EncodedU64Array,
},
/// The values of `range` in ascending order whose bit is set in `bitmap`.
/// Bit `i` of the bitmap corresponds to the value `range.start + i`.
RangeWithBitmap { range: Range<u64>, bitmap: Bitmap },
/// An explicit list of values, used for input that was detected as sorted
/// but is too sparse for the range-based encodings.
SortedArray(EncodedU64Array),
/// An explicit list of values in arbitrary order.
Array(EncodedU64Array),
}
impl DeepSizeOf for U64Segment {
    /// Delegates to the payload owned by the active variant.
    ///
    /// A plain `Range` owns no children and therefore contributes 0; the
    /// `range` field of the other range-based variants is likewise ignored.
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        match self {
            Self::Range(_) => 0,
            Self::RangeWithBitmap { bitmap, .. } => bitmap.deep_size_of_children(context),
            // All remaining variants carry exactly one encoded array.
            Self::RangeWithHoles { holes: values, .. }
            | Self::SortedArray(values)
            | Self::Array(values) => values.deep_size_of_children(context),
        }
    }
}
/// Summary of a value sequence, used to choose a `U64Segment` encoding.
#[derive(Debug)]
struct SegmentStats {
/// Lower bound of the values (`compute_stats` reports 0 for an empty sequence).
min: u64,
/// Upper bound of the values (`compute_stats` reports 0 for an empty sequence).
max: u64,
/// Number of values in the sequence.
count: u64,
/// Whether the sequence is treated as sorted. `n_holes` asserts this in
/// debug builds, and only sorted sequences get a range-based encoding.
sorted: bool,
}
impl SegmentStats {
    /// Number of values in `min..=max` that are missing from the sequence.
    ///
    /// Only meaningful for sorted sequences (checked in debug builds). An
    /// empty sequence has no holes.
    fn n_holes(&self) -> u64 {
        debug_assert!(self.sorted);
        match self.count {
            0 => 0,
            present => {
                // Slots spanned by the inclusive range `min..=max`.
                let span = self.max - self.min + 1;
                span - present
            }
        }
    }
}
impl U64Segment {
/// Yields the values of `range` that do not appear in `existing`.
///
/// `existing` is consumed in lockstep with `range`: a value is only skipped
/// when it equals the next not-yet-consumed element of `existing`, so the
/// result is only meaningful when `existing` is ascending and within `range`.
fn holes_in_slice<'a>(
    range: RangeInclusive<u64>,
    existing: impl IntoIterator<Item = u64> + 'a,
) -> impl Iterator<Item = u64> + 'a {
    let mut present = existing.into_iter().peekable();
    // `next_if_eq` consumes the head of `present` only when it matches the
    // candidate; a candidate that matches nothing is reported as a hole.
    range.filter(move |candidate| present.next_if_eq(candidate).is_none())
}
/// Scans `values` once and reports their minimum, maximum, count, and
/// whether they are sorted.
///
/// "Sorted" means *strictly* increasing. A repeated value marks the sequence
/// as unsorted, because the hole-based encodings derive the number of gaps
/// from `max - min + 1 - count`, which is only valid when every value is
/// distinct. Unsorted sequences fall back to the plain array encoding, which
/// stores duplicates faithfully.
///
/// An empty input yields `min == max == 0`, `count == 0` and `sorted == true`.
fn compute_stats(values: impl IntoIterator<Item = u64>) -> SegmentStats {
    let mut sorted = true;
    let mut lowest = u64::MAX;
    let mut highest = 0;
    let mut count = 0;

    for val in values {
        count += 1;
        // Compare against the running maximum *before* it absorbs `val`, so
        // that an equal value (a duplicate) is caught as well as a smaller one.
        // The first value has nothing to be compared with.
        if count > 1 && val <= highest {
            sorted = false;
        }
        lowest = lowest.min(val);
        highest = highest.max(val);
    }

    if count == 0 {
        // Replace the `u64::MAX` sentinel so an empty sequence reports 0..=0.
        lowest = 0;
        highest = 0;
    }

    SegmentStats {
        min: lowest,
        max: highest,
        count,
        sorted,
    }
}
/// Estimated sizes of the three encodings available for a sorted sequence,
/// in the order `[range with holes, range with bitmap, sorted array]`.
///
/// The bitmap estimate uses one bit per slot of `min..=max`, rounded up to
/// whole bytes.
// NOTE(review): the constants look like a 24-byte fixed overhead, 4 bytes per
// hole and 2 bytes per array element — confirm against the encodings.
fn sorted_sequence_sizes(stats: &SegmentStats) -> [usize; 3] {
    let hole_count = stats.n_holes();
    let slot_count = stats.max - stats.min + 1;

    [
        24 + 4 * hole_count as usize,
        24 + (slot_count as f64 / 8.0).ceil() as usize,
        24 + 2 * stats.count as usize,
    ]
}
/// Builds the segment for `sequence`, choosing the encoding from `stats`.
///
/// * Unsorted input is stored verbatim as `Array`.
/// * Sorted input without gaps becomes a `Range` (`0..0` when empty).
/// * Sorted input with gaps uses whichever of `RangeWithHoles`,
///   `RangeWithBitmap` or `SortedArray` has the smallest estimate from
///   `sorted_sequence_sizes`; ties are resolved in that order.
///
/// `stats` must describe `sequence`; this is not verified here.
fn from_stats_and_sequence(
    stats: SegmentStats,
    sequence: impl IntoIterator<Item = u64>,
) -> Self {
    if !stats.sorted {
        return Self::Array(EncodedU64Array::from_iter(sequence));
    }

    let n_holes = stats.n_holes();
    if stats.count == 0 {
        return Self::Range(0..0);
    }
    if n_holes == 0 {
        return Self::Range(stats.min..(stats.max + 1));
    }

    let [holes_cost, bitmap_cost, array_cost] = Self::sorted_sequence_sizes(&stats);

    if holes_cost <= bitmap_cost && holes_cost <= array_cost {
        let range = stats.min..(stats.max + 1);
        let mut missing: Vec<u64> =
            Self::holes_in_slice(stats.min..=stats.max, sequence).collect();
        missing.sort_unstable();
        Self::RangeWithHoles {
            range,
            holes: EncodedU64Array::from(missing),
        }
    } else if bitmap_cost <= array_cost {
        let range = stats.min..(stats.max + 1);
        // Start with every slot marked present, then clear the gaps.
        let mut bitmap = Bitmap::new_full((stats.max - stats.min) as usize + 1);
        Self::holes_in_slice(stats.min..=stats.max, sequence)
            .map(|hole| (hole - stats.min) as usize)
            .for_each(|slot| bitmap.clear(slot));
        Self::RangeWithBitmap { range, bitmap }
    } else {
        // Too sparse for a range-based encoding, but the order is still known.
        Self::SortedArray(EncodedU64Array::from_iter(sequence))
    }
}
/// Builds a segment from `slice`, choosing the encoding automatically.
///
/// The slice is traversed twice: once to gather statistics and once to
/// encode the values.
pub fn from_slice(slice: &[u64]) -> Self {
    let values = || slice.iter().copied();
    let stats = Self::compute_stats(values());
    Self::from_stats_and_sequence(stats, values())
}
}
impl U64Segment {
/// Iterates over the values of the segment in stored order, from either end.
///
/// The range-based variants walk every slot of the range and skip the absent
/// ones: `RangeWithHoles` performs one binary search in `holes` per slot,
/// `RangeWithBitmap` one bit lookup per slot.
pub fn iter(&self) -> Box<dyn DoubleEndedIterator<Item = u64> + '_> {
    match self {
        Self::Range(range) => Box::new(range.clone()),
        Self::RangeWithHoles { range, holes } => Box::new(
            range
                .clone()
                .filter(move |&candidate| holes.binary_search(candidate).is_err()),
        ),
        Self::RangeWithBitmap { range, bitmap } => {
            let first = range.start;
            Box::new(
                range
                    .clone()
                    .filter(move |&candidate| bitmap.get((candidate - first) as usize)),
            )
        }
        Self::SortedArray(array) | Self::Array(array) => Box::new(array.iter()),
    }
}
/// Number of values stored in the segment.
///
/// For the range-based variants this is the number of slots in the range
/// minus the number of absent slots.
pub fn len(&self) -> usize {
    match self {
        Self::Range(range) => (range.end - range.start) as usize,
        Self::RangeWithHoles { range, holes } => {
            // Ask the array for its length instead of walking every hole.
            (range.end - range.start) as usize - holes.len()
        }
        Self::RangeWithBitmap { range, bitmap } => {
            (range.end - range.start) as usize - bitmap.count_zeros()
        }
        Self::SortedArray(array) | Self::Array(array) => array.len(),
    }
}
/// Smallest and largest value of the segment as an inclusive range, or
/// `None` when the segment holds no values.
///
/// For `SortedArray` the bounds are the first and last element; for `Array`
/// they are found via the array's `min` / `max`.
pub fn range(&self) -> Option<RangeInclusive<u64>> {
    match self {
        Self::Range(range)
        | Self::RangeWithBitmap { range, .. }
        | Self::RangeWithHoles { range, .. } => {
            // Guard every range-based variant: `end - 1` would underflow or
            // describe a nonsensical range when the range is empty.
            if range.is_empty() {
                None
            } else {
                Some(range.start..=(range.end - 1))
            }
        }
        // `?` turns an empty array (e.g. the result of a zero-length slice)
        // into `None` instead of panicking.
        Self::SortedArray(array) => Some(array.first()?..=array.last()?),
        Self::Array(array) => Some(array.min()?..=array.max()?),
    }
}
/// Returns a new segment holding up to `len` values starting at position
/// `offset`, where positions index the values in the order `iter` yields
/// them (the same indexing `get` uses).
///
/// * `Range` shifts its bounds arithmetically and does not clamp them to the
///   original range.
/// * `RangeWithHoles` and `RangeWithBitmap` are rebuilt from the selected
///   values, so the result uses whichever encoding fits them best and is
///   empty (`Range(0..0)`) when `offset` is past the end.
/// * `SortedArray` and `Array` delegate to the underlying array's `slice`.
pub fn slice(&self, offset: usize, len: usize) -> Self {
    match self {
        Self::Range(range) => {
            let start = range.start + offset as u64;
            Self::Range(start..(start + len as u64))
        }
        Self::RangeWithHoles { .. } | Self::RangeWithBitmap { .. } => {
            // Positions and values diverge as soon as a gap precedes
            // `offset`, so the new bounds cannot be derived from
            // `range.start + offset`. Select the values by position and
            // re-encode them; the sequence is walked twice (stats, then
            // encoding), hence the closure.
            let selected = || self.iter().skip(offset).take(len);
            let stats = Self::compute_stats(selected());
            Self::from_stats_and_sequence(stats, selected())
        }
        Self::SortedArray(array) => Self::SortedArray(array.slice(offset, len)),
        Self::Array(array) => Self::Array(array.slice(offset, len)),
    }
}
/// Returns the position of `val` within the segment, i.e. the index at which
/// `iter` yields it, or `None` when the value is not present.
///
/// For the range-based variants the position is the slot offset minus the
/// number of absent slots before it. `SortedArray` uses a binary search,
/// `Array` a linear scan that reports the first match.
pub fn position(&self, val: u64) -> Option<usize> {
    match self {
        Self::Range(range) => range
            .contains(&val)
            .then(|| (val - range.start) as usize),
        Self::RangeWithHoles { range, holes } => {
            if !range.contains(&val) || holes.binary_search(val).is_ok() {
                return None;
            }
            let slot = (val - range.start) as usize;
            let holes_before = holes.iter().take_while(|&hole| hole < val).count();
            Some(slot - holes_before)
        }
        Self::RangeWithBitmap { range, bitmap } => {
            if !range.contains(&val) {
                return None;
            }
            let slot = (val - range.start) as usize;
            if !bitmap.get(slot) {
                return None;
            }
            let zeros_before = bitmap.slice(0, slot).count_zeros();
            Some(slot - zeros_before)
        }
        Self::SortedArray(array) => array.binary_search(val).ok(),
        Self::Array(array) => array.iter().position(|candidate| candidate == val),
    }
}
/// Returns the value at position `i`, or `None` when `i` is out of bounds.
///
/// The hole- and bitmap-based variants have no random access: after a cheap
/// rejection of indices that exceed the number of slots, they advance the
/// iterator `i` steps.
pub fn get(&self, i: usize) -> Option<u64> {
    match self {
        Self::Range(range) => range
            .start
            .checked_add(i as u64)
            .filter(|candidate| *candidate < range.end),
        Self::RangeWithHoles { range, .. } | Self::RangeWithBitmap { range, .. } => {
            let slots = (range.end - range.start) as usize;
            if i < slots {
                self.iter().nth(i)
            } else {
                None
            }
        }
        Self::SortedArray(array) | Self::Array(array) => array.get(i),
    }
}
/// Returns a new segment without the values listed in `vals`.
///
/// `vals` is matched against the segment in a single forward pass: a value
/// is dropped only when it equals the next pending entry of `vals`. The
/// entries must therefore appear in the same order as the segment yields
/// them; an entry that never matches blocks all entries after it. In debug
/// builds every entry is asserted to lie within `self.range()`.
pub fn delete(&self, vals: &[u64]) -> Self {
    debug_assert!(vals.iter().all(|&val| self.range().unwrap().contains(&val)));

    // The surviving values are walked twice (stats, then encoding), so the
    // filtered iterator is produced on demand.
    let survivors = || {
        let mut pending = vals.iter().copied().peekable();
        self.iter()
            .filter(move |kept| pending.next_if_eq(kept).is_none())
    };

    let stats = Self::compute_stats(survivors());
    Self::from_stats_and_sequence(stats, survivors())
}
/// Removes the values at the given `positions` in place and re-encodes the
/// remainder.
///
/// Nothing happens for an empty `positions`; masking as many positions as
/// there are values leaves an empty `Range(0..0)`.
///
/// Statistics for the re-encoding are derived without a full scan: the
/// sorted flag is inherited from the current variant, and `min` / `max` are
/// taken from the first and last unmasked positions.
// NOTE(review): the matching logic assumes `positions` is ascending and free
// of duplicates — confirm with callers.
pub fn mask(&mut self, positions: &[u32]) {
    if positions.is_empty() {
        return;
    }
    let total = self.len();
    if positions.len() == total {
        *self = Self::Range(0..0);
        return;
    }

    let count = (total - positions.len()) as u64;
    let sorted = match self {
        Self::Array(_) => false,
        Self::Range(_)
        | Self::RangeWithHoles { .. }
        | Self::RangeWithBitmap { .. }
        | Self::SortedArray(_) => true,
    };

    // Walk the positions from the front alongside the masked list; the first
    // disagreement is the first surviving position.
    let first_unmasked = (0..total)
        .zip(positions.iter().cycle())
        .find(|&(index, &masked)| masked != index as u32)
        .map(|(index, _)| index)
        .unwrap();
    let min = self.get(first_unmasked).unwrap();

    // Same idea from the back for the last surviving position.
    let last_unmasked = (0..total)
        .rev()
        .zip(positions.iter().rev().cycle())
        .find(|&(index, &masked)| masked != index as u32)
        .map(|(index, _)| index)
        .unwrap();
    let max = self.get(last_unmasked).unwrap();

    let stats = SegmentStats {
        min,
        max,
        count,
        sorted,
    };

    let mut pending = positions.iter().copied().peekable();
    let sequence = self
        .iter()
        .enumerate()
        .filter_map(move |(index, val)| match pending.next_if_eq(&(index as u32)) {
            Some(_) => None,
            None => Some(val),
        });
    *self = Self::from_stats_and_sequence(stats, sequence)
}
}
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_segments() {
        /// Checks that forward, backward and interleaved iteration all agree
        /// with positional access through `get`.
        fn check_segment_iter(segment: &U64Segment) {
            let forwards: Vec<u64> = segment.iter().collect();
            let mut backwards: Vec<u64> = segment.iter().rev().collect();
            backwards.reverse();
            assert_eq!(forwards, backwards);

            // Alternate between taking from the front and from the back.
            let n = segment.len();
            let mut expected = Vec::with_capacity(n);
            let mut actual = Vec::with_capacity(n);
            let mut cursor = segment.iter();
            for step in 0..n {
                let (yielded, index) = if step % 2 == 0 {
                    (cursor.next(), step / 2)
                } else {
                    (cursor.next_back(), n - 1 - step / 2)
                };
                actual.push(yielded.unwrap());
                expected.push(segment.get(index).unwrap());
            }
            assert_eq!(expected, actual);
        }

        /// Builds a segment from `values` and checks the chosen encoding,
        /// length, round trip, bounds, and `get` / `position` for every value.
        fn check_segment(values: &[u64], expected: &U64Segment) {
            let segment = U64Segment::from_slice(values);
            assert_eq!(segment, *expected);
            assert_eq!(values.len(), segment.len());

            let roundtripped: Vec<u64> = segment.iter().collect();
            assert_eq!(roundtripped, values);

            let expected_bounds = values
                .iter()
                .copied()
                .min()
                .zip(values.iter().copied().max());
            let actual_bounds = segment.range().map(|bounds| (*bounds.start(), *bounds.end()));
            assert_eq!(actual_bounds, expected_bounds);

            for (i, value) in values.iter().enumerate() {
                assert_eq!(segment.get(i), Some(*value), "i = {}", i);
                assert_eq!(segment.position(*value), Some(i), "i = {}", i);
            }

            check_segment_iter(&segment);
        }

        // Empty and contiguous inputs collapse to a plain range.
        check_segment(&[], &U64Segment::Range(0..0));
        check_segment(&[42], &U64Segment::Range(42..43));
        check_segment(
            &(100..200).collect::<Vec<_>>(),
            &U64Segment::Range(100..200),
        );

        // A single gap is cheapest as an explicit hole.
        let one_missing = (0..1000).filter(|&x| x != 100).collect::<Vec<_>>();
        check_segment(
            &one_missing,
            &U64Segment::RangeWithHoles {
                range: 0..1000,
                holes: vec![100].into(),
            },
        );

        // Many evenly spread gaps are cheapest as a bitmap.
        let evens = (0..1000).filter(|&x| x % 2 == 0).collect::<Vec<_>>();
        let even_bits = (0..999).map(|x| x % 2 == 0).collect::<Vec<_>>();
        check_segment(
            &evens,
            &U64Segment::RangeWithBitmap {
                range: 0..999,
                bitmap: Bitmap::from(even_bits.as_slice()),
            },
        );

        // Sparse but sorted values are kept as a sorted array.
        check_segment(
            &[1, 7000, 24000],
            &U64Segment::SortedArray(vec![1, 7000, 24000].into()),
        );

        // Unsorted values always use the plain array.
        check_segment(
            &[7000, 1, 24000],
            &U64Segment::Array(vec![7000, 1, 24000].into()),
        );
    }
}