use std::sync::Arc;
use ps_datachunk::{DataChunk, DataChunkError};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use crate::{
long::{long_hkey_expanded::constants::LHKEY_SEGMENT_MAX_LENGTH, LongHkeyExpanded},
Hkey, HkeyError, Range, Store,
};
use super::update::helpers::{calculate_depth, calculate_segment_length};
impl LongHkeyExpanded {
    /// Builds a normalized `LongHkeyExpanded` for the bytes of `self` that lie in `range`.
    ///
    /// The method proceeds as follows:
    ///
    /// 1. An empty range (`range.end <= range.start`) yields `Self::default()`.
    /// 2. If one of `self.parts` has exactly this `range` and holds a
    ///    `Hkey::LongHkeyExpanded`, a clone of it is returned; if it holds a
    ///    `Hkey::LongHkey`, that key is expanded through `store` and returned.
    ///    Parts holding any other variant are skipped.
    /// 3. Otherwise the depth is recomputed with `calculate_depth(depth, length)` and:
    ///    - at depth 0 with `length <= LHKEY_SEGMENT_MAX_LENGTH`, the slice is resolved
    ///      and stored as one chunk, giving a depth-0 result with a single part;
    ///    - at depth 0 with a longer slice, the slice is cut into pieces of at most
    ///      `LHKEY_SEGMENT_MAX_LENGTH`, each stored in parallel, giving a depth-1 result;
    ///    - at any other depth, the range is cut into pieces of
    ///      `calculate_segment_length(depth)`, each piece is normalized recursively at
    ///      `depth - 1`, stored, and referenced as a `Hkey::LongHkey`.
    ///
    /// Part ranges in the result are relative to the start of `range`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `resolve_slice`, `store.put`, `expand`,
    /// the recursive call, or storing a nested long hkey.
    pub fn normalize_segment<'a, C, E, S>(
        &self,
        store: &'a S,
        depth: u32,
        range: Range,
    ) -> Result<Self, E>
    where
        C: DataChunk,
        E: From<HkeyError> + From<DataChunkError> + Send,
        S: Store<Chunk<'a> = C, Error = E> + Sync + 'a,
    {
        // Nothing to normalize for an empty range. Inverted ranges are treated as
        // empty as well, so the length subtraction below can never underflow.
        if range.end <= range.start {
            return Ok(Self::default());
        }

        // Reuse an existing part that already covers exactly this range.
        if let Some(result) = self.parts.iter().find_map(|segment| {
            if segment.0 != range {
                return None;
            }

            match &segment.1 {
                Hkey::LongHkeyExpanded(lhkey) => Some(Ok(lhkey.clone())),
                Hkey::LongHkey(lhkey) => Some(lhkey.expand(store)),
                _ => None,
            }
        }) {
            return result;
        }

        let length = range.end - range.start;
        let depth = calculate_depth(depth, length);

        // Small enough for a single stored chunk.
        if depth == 0 && length <= LHKEY_SEGMENT_MAX_LENGTH {
            let data = self.resolve_slice(store, range)?;
            let segment_hkey = store.put(&data)?;
            let segment_parts = Arc::from([(0..length, segment_hkey)]);

            return Ok(Self::new(0, data.len(), segment_parts));
        }

        // Depth 0 but too long for one chunk: store fixed-size chunks in parallel
        // and reference them from a depth-1 key.
        if depth == 0 {
            let count = length.div_ceil(LHKEY_SEGMENT_MAX_LENGTH);
            let parts: Result<Vec<_>, E> = (0..count)
                .into_par_iter()
                .map(|index| {
                    let offset = index * LHKEY_SEGMENT_MAX_LENGTH;
                    let next_offset = (index + 1) * LHKEY_SEGMENT_MAX_LENGTH;

                    // The final chunk may be shorter than the maximum length.
                    let begin = range.start + offset;
                    let end = range.end.min(range.start + next_offset);

                    let data = self.resolve_slice(store, begin..end)?;
                    let hkey = store.put(&data)?;

                    // NOTE(review): the recorded range is not clamped to `length`
                    // for the last chunk — confirm this is intended.
                    Ok::<_, E>((offset..next_offset, hkey))
                })
                .collect();
            let parts = Arc::from(parts?.into_boxed_slice());

            return Ok(Self::new(1, length, parts));
        }

        // Deeper levels: normalize each sub-segment one level down, store it, and
        // reference the stored long hkey from this level.
        let segment_length = calculate_segment_length(depth);
        let parts: Result<Vec<_>, E> = (0..length.div_ceil(segment_length))
            .into_par_iter()
            .map(|index| {
                let offset = index * segment_length;
                let next_offset = (index + 1) * segment_length;

                let begin = range.start + offset;
                let end = range.end.min(range.start + next_offset);

                let lhkey = self.normalize_segment(store, depth - 1, begin..end)?;
                let hkey = Hkey::LongHkey(lhkey.store(store)?);

                // NOTE(review): as above, the last range is not clamped to `length`.
                Ok::<_, E>((offset..next_offset, hkey))
            })
            .collect();
        let parts = Arc::from(parts?.into_boxed_slice());

        Ok(Self::new(depth, length, parts))
    }
}