ps-hkey 0.1.0-34

This crate defines the hashkey format and provides methods for resolving hashkeys.
Documentation
use std::sync::Arc;

use ps_datachunk::{DataChunk, DataChunkError};
use rayon::iter::{IntoParallelIterator, ParallelIterator};

use crate::{
    long::{long_hkey_expanded::constants::LHKEY_SEGMENT_MAX_LENGTH, LongHkeyExpanded},
    Hkey, HkeyError, Range, Store,
};

use super::update::helpers::{calculate_depth, calculate_segment_length};

impl LongHkeyExpanded {
    /// Normalizes the portion of this `LongHkeyExpanded` covered by `range`, producing a new
    /// `LongHkeyExpanded` for that portion.
    ///
    /// The strategy is chosen as follows:
    /// 1. An empty (or inverted) `range` yields `Self::default()`.
    /// 2. If one of `self.parts` covers exactly `range` and is already a long hkey, it is reused
    ///    (cloned if expanded, expanded through `store` otherwise).
    /// 3. Otherwise the effective depth is obtained from `calculate_depth(depth, length)`:
    ///    - depth `0`, length at most `LHKEY_SEGMENT_MAX_LENGTH`: the data is resolved and stored
    ///      as a single chunk;
    ///    - depth `0`, longer: the data is split into chunks of `LHKEY_SEGMENT_MAX_LENGTH` bytes
    ///      which are resolved and stored in parallel;
    ///    - depth `>= 1`: the range is split into sub-segments of
    ///      `calculate_segment_length(depth)` bytes, each normalized recursively (in parallel)
    ///      with `depth - 1` and stored as a `LongHkey`.
    ///
    /// Part ranges of a newly built result are relative to `range.start`.
    ///
    /// # Arguments
    /// - `store`: The data store used for resolving existing chunks and storing new ones.
    /// - `depth`: The requested depth; it is passed through `calculate_depth` together with the
    ///   segment length before being used.
    /// - `range`: The range to normalize (start..end, inclusive start, exclusive end).
    ///
    /// # Returns
    /// The normalized segment as a `LongHkeyExpanded`.
    ///
    /// # Errors
    /// Propagates any error produced while resolving a slice, storing a chunk or a nested long
    /// hkey, expanding an existing `LongHkey`, or normalizing a sub-segment recursively.
    pub fn normalize_segment<'a, C, E, S>(
        &self,
        store: &'a S,
        depth: u32,
        range: Range,
    ) -> Result<Self, E>
    where
        C: DataChunk,
        E: From<HkeyError> + From<DataChunkError> + Send,
        S: Store<Chunk<'a> = C, Error = E> + Sync + 'a,
    {
        // Treat inverted ranges like empty ones; otherwise `range.end - range.start` below
        // would underflow (panic in debug builds, wrap to a huge length in release builds).
        if range.start >= range.end {
            return Ok(Self::default());
        }

        // Check for existing segment: a part covering exactly this range that is already a
        // long hkey can be reused instead of being rebuilt.
        if let Some(result) = self.parts.iter().find_map(|segment| {
            if segment.0 == range {
                match &segment.1 {
                    Hkey::LongHkeyExpanded(lhkey) => Some(Ok(lhkey.clone())),
                    Hkey::LongHkey(lhkey) => Some(lhkey.expand(store)),
                    // Any other hkey kind is rebuilt by the code below.
                    _ => None,
                }
            } else {
                None
            }
        }) {
            return result;
        }

        let length = range.end - range.start;
        let depth = calculate_depth(depth, length);

        // Small enough to be stored as a single chunk.
        if depth == 0 && length <= LHKEY_SEGMENT_MAX_LENGTH {
            let data = self.resolve_slice(store, range)?;
            let segment_hkey = store.put(&data)?;
            // NOTE(review): the part range uses `length` while the total below uses
            // `data.len()`; presumably these are always equal — confirm.
            let segment_parts = Arc::from([(0..length, segment_hkey)]);
            let lhkey = Self::new(0, data.len(), segment_parts);

            return Ok(lhkey);
        }

        // Flat list of chunks, each resolved and stored in parallel.
        if depth == 0 {
            let count = length.div_ceil(LHKEY_SEGMENT_MAX_LENGTH);
            let iterator = (0..count).into_par_iter();

            let parts: Result<Vec<_>, E> = iterator
                .map(|index| {
                    let begin = range.start + index * LHKEY_SEGMENT_MAX_LENGTH;
                    // The last chunk may be shorter, so its end is clamped to `range.end`.
                    let end = range
                        .end
                        .min(range.start + (index + 1) * LHKEY_SEGMENT_MAX_LENGTH);
                    let data = self.resolve_slice(store, begin..end)?;
                    let hkey = store.put(&data)?;

                    // NOTE(review): the recorded range end is not clamped to `length`, so the
                    // last part's range may extend past the data — confirm this is intended.
                    Ok::<_, E>((
                        index * LHKEY_SEGMENT_MAX_LENGTH..(index + 1) * LHKEY_SEGMENT_MAX_LENGTH,
                        hkey,
                    ))
                })
                .collect();

            let parts = Arc::from(parts?.into_boxed_slice());

            let lhkey = Self::new(1, length, parts);

            return Ok(lhkey);
        }

        // if depth >= 1, resolve recursively

        let segment_length = calculate_segment_length(depth);

        let iterator = (0..length.div_ceil(segment_length)).into_par_iter();

        let parts: Result<Vec<_>, E> = iterator
            .map(|index| {
                let begin = range.start + index * segment_length;
                // The last sub-segment may be shorter, so its end is clamped to `range.end`.
                let end = range.end.min(range.start + (index + 1) * segment_length);
                let lhkey = self.normalize_segment(store, depth - 1, begin..end)?;
                let hkey = Hkey::LongHkey(lhkey.store(store)?);

                // NOTE(review): as above, the recorded range end is not clamped to `length`.
                Ok::<_, E>((index * segment_length..(index + 1) * segment_length, hkey))
            })
            .collect();

        let parts = Arc::from(parts?.into_boxed_slice());

        let lhkey = Self::new(depth, length, parts);

        Ok(lhkey)
    }
}