use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
    config::Config,
    key_range::KeyRange,
    level_manifest::{level::Level, LevelManifest},
    segment::Segment,
    HashSet, SegmentId,
};
#[derive(Clone)]
pub struct Strategy {
    pub l0_threshold: u8,
    pub target_size: u32,
    #[allow(clippy::doc_markdown)]
    pub level_ratio: u8,
}
impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: 64 * 1_024 * 1_024,
            level_ratio: 8, }
    }
}
fn aggregate_key_range(segments: &[Segment]) -> KeyRange {
    KeyRange::aggregate(segments.iter().map(|x| &x.metadata.key_range))
}
fn desired_level_size_in_bytes(level_idx: u8, ratio: u8, target_size: u32) -> usize {
    (ratio as usize).pow(u32::from(level_idx)) * (target_size as usize)
}
fn pick_minimal_overlap(
    curr_level: &Level,
    next_level: &Level,
    overshoot: u64,
) -> (HashSet<SegmentId>, bool) {
    let mut choices = vec![];
    for size in 1..=curr_level.len() {
        let windows = curr_level.windows(size);
        for window in windows {
            let size_sum = window.iter().map(|x| x.metadata.file_size).sum::<u64>();
            if size_sum >= overshoot {
                let mut segment_ids: HashSet<SegmentId> =
                    window.iter().map(|x| x.metadata.id).collect();
                let key_range = aggregate_key_range(window);
                let next_level_overlapping_segments: Vec<_> = next_level
                    .overlapping_segments(&key_range)
                    .cloned()
                    .collect();
                let key_range = aggregate_key_range(&next_level_overlapping_segments);
                let curr_level_overlapping_segment_ids: Vec<_> = curr_level
                    .overlapping_segments(&key_range)
                    .filter(|x| !segment_ids.contains(&x.metadata.id))
                    .collect();
                let size_next_level = next_level_overlapping_segments
                    .iter()
                    .map(|x| x.metadata.file_size)
                    .sum::<u64>();
                let size_curr_level = curr_level_overlapping_segment_ids
                    .iter()
                    .map(|x| x.metadata.file_size)
                    .sum::<u64>();
                let effort = size_sum + size_next_level + size_curr_level;
                segment_ids.extend(
                    next_level_overlapping_segments
                        .iter()
                        .map(|x| x.metadata.id),
                );
                segment_ids.extend(
                    curr_level_overlapping_segment_ids
                        .iter()
                        .map(|x| x.metadata.id),
                );
                choices.push((
                    effort,
                    segment_ids,
                    next_level_overlapping_segments.is_empty(),
                ));
            }
        }
    }
    let minimum_effort_choice = choices.into_iter().min_by(|a, b| a.0.cmp(&b.0));
    let (_, set, can_trivial_move) = minimum_effort_choice.expect("should exist");
    (set, can_trivial_move)
}
impl CompactionStrategy for Strategy {
    #[allow(clippy::too_many_lines)]
    fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
        let resolved_view = levels.resolved_view();
        let busy_levels = levels.busy_levels();
        for (curr_level_index, level) in resolved_view
            .iter()
            .enumerate()
            .skip(1)
            .take(resolved_view.len() - 2)
            .rev()
        {
            #[allow(clippy::cast_possible_truncation)]
            let curr_level_index = curr_level_index as u8;
            let next_level_index = curr_level_index + 1;
            if level.is_empty() {
                continue;
            }
            if busy_levels.contains(&curr_level_index) || busy_levels.contains(&next_level_index) {
                continue;
            }
            let desired_bytes =
                desired_level_size_in_bytes(curr_level_index, self.level_ratio, self.target_size);
            let overshoot = level.size().saturating_sub(desired_bytes as u64);
            if overshoot > 0 {
                let Some(next_level) = &resolved_view.get(next_level_index as usize) else {
                    break;
                };
                let (segment_ids, can_trivial_move) =
                    pick_minimal_overlap(level, next_level, overshoot);
                let choice = CompactionInput {
                    segment_ids,
                    dest_level: next_level_index,
                    target_size: u64::from(self.target_size),
                };
                let goes_into_cold_storage = next_level_index == 2;
                if goes_into_cold_storage {
                    return Choice::Merge(choice);
                }
                if can_trivial_move && level.is_disjoint {
                    return Choice::Move(choice);
                }
                return Choice::Merge(choice);
            }
        }
        {
            let Some(first_level) = resolved_view.first() else {
                return Choice::DoNothing;
            };
            if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) {
                let first_level_size = first_level.size();
                if levels.is_disjoint() {
                    if first_level_size < self.target_size.into() {
                        return if first_level.len() >= 32 {
                            Choice::Merge(CompactionInput {
                                dest_level: 0,
                                segment_ids: first_level.list_ids(),
                                target_size: ((self.target_size as f32) * 1.1) as u64,
                            })
                        } else {
                            Choice::DoNothing
                        };
                    }
                    return Choice::Merge(CompactionInput {
                        dest_level: 1,
                        segment_ids: first_level.list_ids(),
                        target_size: ((self.target_size as f32) * 1.1) as u64,
                    });
                }
                if first_level_size < self.target_size.into() {
                    return Choice::Merge(CompactionInput {
                        dest_level: 0,
                        segment_ids: first_level.list_ids(),
                        target_size: self.target_size.into(),
                    });
                }
                if !busy_levels.contains(&1) {
                    let mut level = first_level.clone();
                    level.sort_by_key_range();
                    let Some(next_level) = &resolved_view.get(1) else {
                        return Choice::DoNothing;
                    };
                    let mut segment_ids: HashSet<u64> =
                        level.iter().map(|x| x.metadata.id).collect();
                    let key_range = aggregate_key_range(&level);
                    let next_level_overlapping_segment_ids: Vec<_> = next_level
                        .overlapping_segments(&key_range)
                        .map(|x| x.metadata.id)
                        .collect();
                    segment_ids.extend(&next_level_overlapping_segment_ids);
                    let choice = CompactionInput {
                        segment_ids,
                        dest_level: 1,
                        target_size: u64::from(self.target_size),
                    };
                    if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
                        return Choice::Move(choice);
                    }
                    return Choice::Merge(choice);
                }
            }
        }
        Choice::DoNothing
    }
}
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        block_cache::BlockCache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        key_range::KeyRange,
        level_manifest::LevelManifest,
        segment::{
            block_index::two_level_index::TwoLevelBlockIndex,
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            value_block::BlockOffset,
            Segment, SegmentInner,
        },
        time::unix_timestamp,
        Config, HashSet,
    };
    use std::{path::Path, sync::Arc};
    use test_log::test;
    #[cfg(feature = "bloom")]
    use crate::bloom::BloomFilter;
    fn string_key_range(a: &str, b: &str) -> KeyRange {
        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
    }
    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));
        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())),
            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },
            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            block_cache,
            #[cfg(feature = "bloom")]
            bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)),
        }
        .into()
    }
    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let mut levels = LevelManifest::create_new(
            recipe.len().try_into().expect("oopsie"),
            path.join("levels"),
        )?;
        for (idx, level) in recipe.into_iter().enumerate() {
            for (id, min, max, size_mib) in level {
                levels.insert_into_level(
                    idx.try_into().expect("oopsie"),
                    fixture_segment(
                        id,
                        string_key_range(min, max),
                        size_mib * 1_024 * 1_024,
                        0.0,
                    ),
                );
            }
        }
        Ok(levels)
    }
    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        Ok(())
    }
    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );
        levels.hide_segments(std::iter::once(4));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        Ok(())
    }
    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 0,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: u64::from(compactor.target_size),
            })
        );
        levels.hide_segments(std::iter::once(4));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        Ok(())
    }
    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );
        levels.hide_segments(std::iter::once(5));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        Ok(())
    }
    #[test]
    fn leveled_deeper_level_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            level_ratio: 2,
            ..Default::default()
        };
        let config = Config::default();
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "x", "z", 64)],
            vec![(4, "f", "l", 64)],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: set![3],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    #[test]
    fn leveled_deeper_level_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            level_ratio: 2,
            ..Default::default()
        };
        let config = Config::default();
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "g", 64), (2, "h", "j", 64), (3, "k", "t", 64)],
            vec![(4, "k", "l", 64)],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: set![1],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    #[test]
    fn leveled_last_level_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            level_ratio: 2,
            ..Default::default()
        };
        let config = Config::default();
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![(1, "a", "g", 64), (2, "a", "g", 64), (3, "a", "g", 64), (4, "a", "g", 64), (5, "y", "z", 64)],
            vec![(6, "f", "l", 64)],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 3,
                segment_ids: set![5],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    #[test]
    fn levelled_last_level_with_overlap_invariant() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            level_ratio: 2,
            ..Default::default()
        };
        let config = Config::default();
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![(1, "a", "g", 64), (2, "h", "j", 64), (3, "k", "l", 64), (4, "m", "n", 64), (5, "y", "z", 64)],
            vec![(6, "f", "l", 64)],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Move(CompactionInput {
                dest_level: 3,
                segment_ids: set![4],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    #[test]
    fn levelled_last_level_without_overlap_invariant() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            level_ratio: 2,
            ..Default::default()
        };
        let config = Config::default();
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![(1, "a", "g", 64), (2, "h", "j", 64), (3, "k", "l", 64), (4, "m", "n", 64), (5, "y", "z", 64)],
            vec![(6, "w", "x", 64)],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Move(CompactionInput {
                dest_level: 3,
                segment_ids: set![1],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            level_ratio: 2,
            ..Default::default()
        };
        let config = Config::default();
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
}