use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{config::Config, key_range::KeyRange, levels::LevelManifest, segment::Segment};
use std::{collections::HashSet, ops::Deref, sync::Arc};
/// Levelled compaction strategy.
///
/// Levels after the first are kept below a size budget of
/// `level_ratio^level_index * target_size` bytes; the first level is merged
/// into the second once it has accumulated `l0_threshold` segments.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Strategy {
    /// Number of segments in the first level at which `choose` merges the
    /// whole level into the second one.
    ///
    /// The `Default` implementation uses 4.
    pub l0_threshold: u8,
    /// Base size in bytes.
    ///
    /// It is forwarded as the `target_size` of every chosen compaction and is
    /// the base of each level's size budget.
    ///
    /// The `Default` implementation uses 64 MiB.
    pub target_size: u32,
}
impl Default for Strategy {
    /// Returns the stock configuration: an L0 threshold of 4 segments and a
    /// target size of 64 MiB.
    fn default() -> Self {
        const MEBIBYTE: u32 = 1_024 * 1_024;

        let l0_threshold = 4;
        let target_size = 64 * MEBIBYTE;

        Self {
            l0_threshold,
            target_size,
        }
    }
}
/// Returns the smallest key range that covers every segment in `segments`:
/// the lowest minimum key paired with the highest maximum key.
///
/// # Panics
///
/// Panics if `segments` is empty.
fn aggregate_key_range(segments: &[Arc<Segment>]) -> KeyRange {
    let (first, rest) = segments
        .split_first()
        .expect("segment should always exist");

    // Seed with the first segment's bounds, then widen them segment by segment.
    let seed = first.metadata.key_range.deref().clone();

    let (lowest, highest) = rest.iter().fold(seed, |(lowest, highest), segment| {
        let (seg_min, seg_max) = segment.metadata.key_range.deref();

        (
            if *seg_min < lowest {
                seg_min.clone()
            } else {
                lowest
            },
            if *seg_max > highest {
                seg_max.clone()
            } else {
                highest
            },
        )
    });

    KeyRange::new((lowest, highest))
}
/// Returns the size budget of level `level_idx` in bytes:
/// `ratio^level_idx * target_size`.
///
/// The arithmetic saturates at `usize::MAX` instead of overflowing. A wrapped
/// product would look like a tiny budget and make an ordinary level appear
/// massively oversized.
fn desired_level_size_in_bytes(level_idx: u8, ratio: u8, target_size: u32) -> usize {
    let multiplier = usize::from(ratio).saturating_pow(u32::from(level_idx));

    // Same widening cast as before: `u32` -> `usize`.
    multiplier.saturating_mul(target_size as usize)
}
impl CompactionStrategy for Strategy {
    /// Picks the next compaction to run, or [`Choice::DoNothing`].
    ///
    /// Two passes are made over the resolved view of `levels`:
    ///
    /// 1. Every level except the first and the last is visited, deepest
    ///    first. The first level that is non-empty, not busy (and whose
    ///    successor is not busy) and larger than its budget of
    ///    `level_ratio^index * target_size` bytes yields a compaction into the
    ///    following level. It is a [`Choice::Move`] when no segment of the
    ///    destination overlaps and the source level is disjoint, otherwise a
    ///    [`Choice::Merge`]. The returned segment IDs are sorted ascending.
    /// 2. If no level overshoots, the first level is merged into the second
    ///    once it holds at least `l0_threshold` segments and neither of the
    ///    two levels is busy.
    ///
    /// # Panics
    ///
    /// Panics if the index of a visited level does not fit into a `u8`.
    fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice {
        let resolved_view = levels.resolved_view();
        let busy_levels = levels.busy_levels();

        // The first level is handled by segment count further down and the
        // last level has nothing to compact into, so both are left out here.
        // `saturating_sub` keeps a manifest with fewer than two levels from
        // underflowing the count.
        let intermediate_level_count = resolved_view.len().saturating_sub(2);

        for (curr_level_index, level) in resolved_view
            .iter()
            .enumerate()
            .map(|(idx, lvl)| {
                (
                    u8::try_from(idx).expect("levels should not exceed 255"),
                    lvl,
                )
            })
            .skip(1)
            .take(intermediate_level_count)
            .rev()
        {
            let next_level_index = curr_level_index + 1;

            if level.is_empty() {
                continue;
            }

            // Leave levels alone while the manifest reports the source or the
            // destination as busy.
            if busy_levels.contains(&curr_level_index) || busy_levels.contains(&next_level_index) {
                continue;
            }

            let curr_level_bytes = level.size();
            let desired_bytes =
                desired_level_size_in_bytes(curr_level_index, config.level_ratio, self.target_size);
            let mut overshoot = curr_level_bytes.saturating_sub(desired_bytes as u64);

            if overshoot == 0 {
                continue;
            }

            // Work on a sorted copy so segments are picked in key range order.
            let mut level = level.clone();
            level.sort_by_key_range();

            // Take segments until the overshoot is covered, but never more
            // than `level_ratio` of them.
            let mut segments_to_compact = vec![];

            for segment in level.iter().take(config.level_ratio.into()).cloned() {
                if overshoot == 0 {
                    break;
                }

                overshoot = overshoot.saturating_sub(segment.metadata.file_size);
                segments_to_compact.push(segment);
            }

            // Only possible when `level_ratio` is 0; there is nothing to
            // compact then, and aggregating an empty selection would panic.
            if segments_to_compact.is_empty() {
                continue;
            }

            let Some(next_level) = resolved_view.get(usize::from(next_level_index)) else {
                break;
            };

            let mut segment_ids: HashSet<u64> =
                segments_to_compact.iter().map(|x| x.metadata.id).collect();

            // One key range serves both overlap queries below.
            // NOTE(review): the range is not widened by the same-level
            // segments pulled in next, so the destination overlap is computed
            // from the initially selected segments only - confirm that this
            // is intended.
            let key_range = aggregate_key_range(&segments_to_compact);

            // Segments of the same level that overlap the selection.
            let curr_level_overlapping_segment_ids: Vec<_> = level
                .overlapping_segments(&key_range)
                .map(|x| x.metadata.id)
                .collect();

            segment_ids.extend(&curr_level_overlapping_segment_ids);

            // Segments of the destination level that overlap the selection.
            let next_level_overlapping_segment_ids: Vec<_> = next_level
                .overlapping_segments(&key_range)
                .map(|x| x.metadata.id)
                .collect();

            segment_ids.extend(&next_level_overlapping_segment_ids);

            let choice = CompactionInput {
                segment_ids: {
                    let mut v = segment_ids.into_iter().collect::<Vec<_>>();
                    v.sort_unstable();
                    v
                },
                dest_level: next_level_index,
                target_size: u64::from(self.target_size),
            };

            // Without any overlap in the destination, and with a disjoint
            // source level, a move is chosen instead of a merge.
            if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
                return Choice::Move(choice);
            }

            return Choice::Merge(choice);
        }

        let Some(first_level) = resolved_view.first() else {
            return Choice::DoNothing;
        };

        // The emptiness check only matters for an `l0_threshold` of 0, where
        // an empty first level would otherwise reach `aggregate_key_range`
        // and panic.
        if first_level.len() >= self.l0_threshold.into()
            && !first_level.is_empty()
            && !busy_levels.contains(&0)
            && !busy_levels.contains(&1)
        {
            let mut level = first_level.clone();
            level.sort_by_key_range();

            let Some(next_level) = resolved_view.get(1) else {
                return Choice::DoNothing;
            };

            // The whole first level takes part, followed by every segment of
            // the second level that its combined key range overlaps.
            let mut segment_ids: Vec<u64> =
                level.iter().map(|x| x.metadata.id).collect::<Vec<_>>();

            let key_range = aggregate_key_range(&level);

            let next_level_overlapping_segment_ids: Vec<_> = next_level
                .overlapping_segments(&key_range)
                .map(|x| x.metadata.id)
                .collect();

            segment_ids.extend(next_level_overlapping_segment_ids);

            return Choice::Merge(CompactionInput {
                segment_ids,
                dest_level: 1,
                target_size: u64::from(self.target_size),
            });
        }

        Choice::DoNothing
    }
}
// Unit tests for the levelled compaction strategy. Each test builds a small
// level manifest out of fixture segments and checks which compaction
// `Strategy::choose` picks for it.
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        block_cache::BlockCache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        key_range::KeyRange,
        levels::LevelManifest,
        segment::{
            block_index::BlockIndex,
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            Segment,
        },
        time::unix_timestamp,
        Config,
    };
    use std::{path::Path, sync::Arc};
    use test_log::test;
    #[cfg(feature = "bloom")]
    use crate::bloom::BloomFilter;
    // Builds a key range from two string bounds (minimum, maximum).
    fn string_key_range(a: &str, b: &str) -> KeyRange {
        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
    }
    // Builds a `Segment` fixture with the given id, key range and file size.
    // All file offsets and most counters are zeroed; `tombstone_count` is
    // `tombstone_ratio` times the fixed item count of 1,000,000.
    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Arc<Segment> {
        let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));
        Arc::new(Segment {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index: Arc::new(BlockIndex::new((0, id).into(), block_cache.clone())),
            offsets: FileOffsets {
                bloom_ptr: 0,
                index_block_ptr: 0,
                metadata_ptr: 0,
                range_tombstone_ptr: 0,
                tli_ptr: 0,
            },
            metadata: Metadata {
                block_count: 0,
                block_size: 0,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::Lz4,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            block_cache,
            #[cfg(feature = "bloom")]
            bloom_filter: BloomFilter::with_fp_rate(1, 0.1),
        })
    }
    // Creates a level manifest under `path/levels` with one level per entry of
    // `recipe`. Every `(id, min, max)` tuple becomes a 64 MiB fixture segment
    // without tombstones in the level at the same position.
    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str)>>,
    ) -> crate::Result<LevelManifest> {
        let mut levels = LevelManifest::create_new(
            recipe.len().try_into().expect("oopsie"),
            path.join("levels"),
        )?;
        for (idx, level) in recipe.into_iter().enumerate() {
            for (id, min, max) in level {
                levels.insert_into_level(
                    idx.try_into().expect("oopsie"),
                    fixture_segment(id, string_key_range(min, max), 64 * 1_024 * 1_024, 0.0),
                );
            }
        }
        Ok(levels)
    }
    // Four empty levels: there is nothing to compact.
    #[test]
    fn levelled_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        Ok(())
    }
    // L0 holds exactly the default threshold of 4 segments and L1 is empty, so
    // all of L0 is merged into L1.
    #[test]
    fn levelled_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z"), (2, "a", "z"), (3, "a", "z"), (4, "a", "z")],
            vec![],
            vec![],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: vec![1, 2, 3, 4],
                target_size: 64 * 1_024 * 1_024
            })
        );
        // Once segment 4 is hidden, no compaction may be chosen any more.
        // NOTE(review): presumably hiding marks the segment as part of a
        // running compaction - confirm against `LevelManifest::hide_segments`.
        levels.hide_segments(&[4]);
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        Ok(())
    }
    // L0 (h-t) reaches the threshold and no L1 segment (a-g) overlaps its key
    // range, so only the L0 segments are merged.
    // NOTE(review): this expects L1 (4 x 64 MiB) to stay within its budget
    // under `Config::default()` - depends on the default level ratio.
    #[test]
    fn levelled_more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![(1, "h", "t"), (2, "h", "t"), (3, "h", "t"), (4, "h", "t")],
            vec![(5, "a", "g"), (6, "a", "g"), (7, "a", "g"), (8, "a", "g")],
            vec![],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: vec![1, 2, 3, 4],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    // L0 spans a-t in total, which overlaps L1 segments 5 and 6 (a-g) but not
    // 7 and 8 (y-z); the overlapping L1 segments join the merge.
    #[test]
    fn levelled_more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "g"), (2, "h", "t"), (3, "i", "t"), (4, "j", "t")],
            vec![(5, "a", "g"), (6, "a", "g"), (7, "y", "z"), (8, "y", "z")],
            vec![],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: vec![1, 2, 3, 4, 5, 6],
                target_size: 64 * 1_024 * 1_024
            })
        );
        // With L1 segment 5 hidden, no compaction may be chosen.
        levels.hide_segments(&[5]);
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        Ok(())
    }
    // Ratio 2: L1 holds 3 x 64 MiB = 192 MiB against a budget of
    // 2^1 x 64 MiB = 128 MiB, so one segment has to go. Segment 1 (a-g) is
    // picked and overlaps L2 segment 4 (f-l), hence a merge of both.
    #[test]
    fn levelled_deeper_level_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default().level_ratio(2);
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "g"), (2, "h", "t"), (3, "x", "z")],
            vec![(4, "f", "l")],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: vec![1, 4],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    // Same overshoot as above, but segment 1 (a-g) overlaps nothing in L2
    // (k-l), so it is expected to be moved down rather than merged.
    #[test]
    fn levelled_deeper_level_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default().level_ratio(2);
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "g"), (2, "h", "j"), (3, "k", "t")],
            vec![(4, "k", "l")],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Move(CompactionInput {
                dest_level: 2,
                segment_ids: vec![1],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    // Ratio 2: L2 holds 5 x 64 MiB = 320 MiB against a budget of
    // 2^2 x 64 MiB = 256 MiB. Segments 1-4 all cover a-g, so picking one pulls
    // in the other three, plus L3 segment 6 (f-l) which overlaps as well.
    #[test]
    fn levelled_last_level_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default().level_ratio(2);
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![(1, "a", "g"), (2, "a", "g"), (3, "a", "g"), (4, "a", "g"), (5, "y", "z")],
            vec![(6, "f", "l")],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 3,
                segment_ids: vec![1, 2, 3, 4, 6],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    // L2 overshoots as above, but its segments do not overlap each other, so
    // only segment 1 (a-g) is picked; it overlaps L3 segment 6 (f-l).
    #[test]
    fn levelled_last_level_with_overlap_invariant() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default().level_ratio(2);
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![(1, "a", "g"), (2, "h", "j"), (3, "k", "l"), (4, "m", "n"), (5, "y", "z")],
            vec![(6, "f", "l")],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 3,
                segment_ids: vec![1, 6],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    // Same L2 as above, but L3 segment 6 (w-x) does not overlap segment 1
    // (a-g), so segment 1 is expected to be moved down on its own.
    #[test]
    fn levelled_last_level_without_overlap_invariant() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default().level_ratio(2);
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![(1, "a", "g"), (2, "h", "j"), (3, "k", "l"), (4, "m", "n"), (5, "y", "z")],
            vec![(6, "w", "x")],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Move(CompactionInput {
                dest_level: 3,
                segment_ids: vec![1],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
    // L1 contains overlapping segments (as the test name suggests, a layout
    // left behind by tiered compaction). It overshoots by one segment, and the
    // picked segment (a-z) pulls in segments 2 and 3 from L1 and segment 4
    // (a-g) from L2.
    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default().level_ratio(2);
        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "z"), (2, "a", "z"), (3, "g", "z")],
            vec![(4, "a", "g")],
            vec![],
        ])?;
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: vec![1, 2, 3, 4],
                target_size: 64 * 1_024 * 1_024
            })
        );
        Ok(())
    }
}