use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{levels::Levels, segment::Segment, value::UserKey, Config};
use std::sync::Arc;
pub struct Strategy {
    pub l0_threshold: u8,
    pub target_size: u32,
}
impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: 128 * 1_024 * 1_024,
        }
    }
}
fn get_key_range(segments: &[Arc<Segment>]) -> (UserKey, UserKey) {
    let (mut min, mut max) = segments[0].metadata.key_range.clone();
    for other in segments.iter().skip(1) {
        if other.metadata.key_range.0 < min {
            min = other.metadata.key_range.0.clone();
        }
        if other.metadata.key_range.1 > max {
            max = other.metadata.key_range.1.clone();
        }
    }
    (min, max)
}
fn desired_level_size_in_bytes(level_idx: u8, ratio: u8, target_size: u32) -> usize {
    (ratio as usize).pow(u32::from(level_idx)) * (target_size as usize)
}
impl CompactionStrategy for Strategy {
    fn choose(&self, levels: &Levels, config: &Config) -> Choice {
        let resolved_view = levels.resolved_view();
        let busy_levels = levels.busy_levels();
        for (curr_level_index, level) in resolved_view
            .iter()
            .enumerate()
            .map(|(idx, lvl)| (idx as u8, lvl))
            .skip(1)
            .take(resolved_view.len() - 2)
            .rev()
        {
            let next_level_index = curr_level_index + 1;
            if level.is_empty() {
                continue;
            }
            if busy_levels.contains(&curr_level_index) || busy_levels.contains(&next_level_index) {
                continue;
            }
            let curr_level_bytes = level
                .iter()
                .fold(0, |bytes, segment| bytes + segment.metadata.file_size);
            let desired_bytes =
                desired_level_size_in_bytes(curr_level_index, config.level_ratio, self.target_size);
            let mut overshoot = curr_level_bytes.saturating_sub(desired_bytes as u64) as usize;
            if overshoot > 0 {
                let mut segments_to_compact = vec![];
                for segment in level.iter().rev().cloned() {
                    if overshoot == 0 {
                        break;
                    }
                    overshoot = overshoot.saturating_sub(segment.metadata.file_size as usize);
                    segments_to_compact.push(segment);
                }
                let (min, max) = get_key_range(&segments_to_compact);
                let next_level = &resolved_view[next_level_index as usize];
                let overlapping_segment_ids = next_level.get_overlapping_segments(min, max);
                let mut segment_ids: Vec<_> = segments_to_compact
                    .iter()
                    .map(|x| &x.metadata.id)
                    .cloned()
                    .collect();
                segment_ids.extend(overlapping_segment_ids.into_iter().cloned());
                return Choice::DoCompact(CompactionInput {
                    segment_ids,
                    dest_level: next_level_index,
                    target_size: u64::from(self.target_size),
                });
            }
        }
        {
            let Some(first_level) = &resolved_view.get(0) else {
                return Choice::DoNothing;
            };
            if first_level.len() >= self.l0_threshold.into()
                && !busy_levels.contains(&0)
                && !busy_levels.contains(&1)
            {
                let mut first_level_segments: Vec<_> = first_level.iter().cloned().collect();
                first_level_segments
                    .sort_by(|a, b| a.metadata.key_range.0.cmp(&b.metadata.key_range.0));
                let (min, max) = get_key_range(&first_level_segments);
                let next_level = &resolved_view[1];
                let overlapping_segment_ids = next_level.get_overlapping_segments(min, max);
                let mut segment_ids: Vec<_> = first_level_segments
                    .iter()
                    .map(|x| &x.metadata.id)
                    .cloned()
                    .collect();
                segment_ids.extend(overlapping_segment_ids.into_iter().cloned());
                return Choice::DoCompact(CompactionInput {
                    segment_ids,
                    dest_level: 1,
                    target_size: u64::from(self.target_size),
                });
            }
        }
        Choice::DoNothing
    }
}
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        block_cache::BlockCache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        file::LEVELS_MANIFEST_FILE,
        levels::Levels,
        segment::{index::BlockIndex, meta::Metadata, Segment},
        time::unix_timestamp,
        value::UserKey,
        Config,
    };
    use std::sync::Arc;
    use test_log::test;
    #[allow(clippy::expect_used)]
    fn fixture_segment(id: String, key_range: (UserKey, UserKey), size: u64) -> Arc<Segment> {
        let block_cache = Arc::new(BlockCache::with_capacity_blocks(0));
        Arc::new(Segment {
            descriptor_table: Arc::new(
                FileDescriptorTable::new("Cargo.toml").expect("should open"),
            ),
            block_index: Arc::new(BlockIndex::new(id.clone(), block_cache.clone())),
            metadata: Metadata {
                path: ".".into(),
                version: crate::version::Version::V0,
                block_count: 0,
                block_size: 0,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::Lz4,
                item_count: 0,
                key_count: 0,
                key_range,
                tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            block_cache,
        })
    }
    #[test]
    fn empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();
        let levels = Levels::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        Ok(())
    }
    #[test]
    fn default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();
        let mut levels = Levels::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
        levels.add(fixture_segment(
            "1".into(),
            ("a".as_bytes().into(), "z".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        levels.add(fixture_segment(
            "2".into(),
            ("a".as_bytes().into(), "z".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        levels.add(fixture_segment(
            "3".into(),
            ("a".as_bytes().into(), "z".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        levels.add(fixture_segment(
            "4".into(),
            ("a".as_bytes().into(), "z".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoCompact(CompactionInput {
                dest_level: 1,
                segment_ids: vec!["4".into(), "3".into(), "2".into(), "1".into()],
                target_size: 128 * 1024 * 1024
            })
        );
        levels.hide_segments(&["4".into()]);
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        Ok(())
    }
    #[test]
    fn more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();
        let mut levels = Levels::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
        levels.add(fixture_segment(
            "1".into(),
            ("h".as_bytes().into(), "t".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        levels.add(fixture_segment(
            "2".into(),
            ("h".as_bytes().into(), "t".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        levels.add(fixture_segment(
            "3".into(),
            ("h".as_bytes().into(), "t".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        levels.add(fixture_segment(
            "4".into(),
            ("h".as_bytes().into(), "t".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        levels.insert_into_level(
            1,
            fixture_segment(
                "5".into(),
                ("a".as_bytes().into(), "g".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        levels.insert_into_level(
            1,
            fixture_segment(
                "6".into(),
                ("a".as_bytes().into(), "g".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        levels.insert_into_level(
            1,
            fixture_segment(
                "7".into(),
                ("y".as_bytes().into(), "z".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        levels.insert_into_level(
            1,
            fixture_segment(
                "8".into(),
                ("y".as_bytes().into(), "z".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoCompact(CompactionInput {
                dest_level: 1,
                segment_ids: vec!["4".into(), "3".into(), "2".into(), "1".into()],
                target_size: 128 * 1024 * 1024
            })
        );
        Ok(())
    }
    #[test]
    fn more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();
        let mut levels = Levels::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
        levels.add(fixture_segment(
            "1".into(),
            ("a".as_bytes().into(), "g".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        levels.add(fixture_segment(
            "2".into(),
            ("h".as_bytes().into(), "t".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        levels.add(fixture_segment(
            "3".into(),
            ("i".as_bytes().into(), "t".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        levels.add(fixture_segment(
            "4".into(),
            ("j".as_bytes().into(), "t".as_bytes().into()),
            128 * 1_024 * 1_024,
        ));
        levels.insert_into_level(
            1,
            fixture_segment(
                "5".into(),
                ("a".as_bytes().into(), "g".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        levels.insert_into_level(
            1,
            fixture_segment(
                "6".into(),
                ("a".as_bytes().into(), "g".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        levels.insert_into_level(
            1,
            fixture_segment(
                "7".into(),
                ("y".as_bytes().into(), "z".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        levels.insert_into_level(
            1,
            fixture_segment(
                "8".into(),
                ("y".as_bytes().into(), "z".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoCompact(CompactionInput {
                dest_level: 1,
                segment_ids: vec![
                    "1".into(),
                    "2".into(),
                    "3".into(),
                    "4".into(),
                    "6".into(),
                    "5".into()
                ],
                target_size: 128 * 1024 * 1024
            })
        );
        levels.hide_segments(&["5".into()]);
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );
        Ok(())
    }
    #[test]
    fn deeper_level_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            ..Default::default()
        };
        let config = Config::default().level_ratio(2);
        let mut levels = Levels::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
        levels.insert_into_level(
            2,
            fixture_segment(
                "4".into(),
                ("f".as_bytes().into(), "l".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
        levels.insert_into_level(
            1,
            fixture_segment(
                "1".into(),
                ("a".as_bytes().into(), "g".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
        levels.insert_into_level(
            1,
            fixture_segment(
                "2".into(),
                ("h".as_bytes().into(), "t".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        levels.insert_into_level(
            1,
            fixture_segment(
                "3".into(),
                ("h".as_bytes().into(), "t".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::DoCompact(CompactionInput {
                dest_level: 2,
                segment_ids: vec!["1".into(), "4".into()],
                target_size: 128 * 1024 * 1024
            })
        );
        Ok(())
    }
    #[test]
    fn last_level_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            ..Default::default()
        };
        let config = Config::default().level_ratio(2);
        let mut levels = Levels::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
        levels.insert_into_level(
            3,
            fixture_segment(
                "5".into(),
                ("f".as_bytes().into(), "l".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
        levels.insert_into_level(
            2,
            fixture_segment(
                "1".into(),
                ("a".as_bytes().into(), "g".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
        levels.insert_into_level(
            2,
            fixture_segment(
                "2".into(),
                ("a".as_bytes().into(), "g".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
        levels.insert_into_level(
            2,
            fixture_segment(
                "3".into(),
                ("a".as_bytes().into(), "g".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
        levels.insert_into_level(
            2,
            fixture_segment(
                "4".into(),
                ("a".as_bytes().into(), "g".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        levels.insert_into_level(
            2,
            fixture_segment(
                "6".into(),
                ("y".as_bytes().into(), "z".as_bytes().into()),
                128 * 1_024 * 1_024,
            ),
        );
        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::DoCompact(CompactionInput {
                dest_level: 3,
                segment_ids: vec!["1".into(), "5".into()],
                target_size: 128 * 1024 * 1024
            })
        );
        Ok(())
    }
}