use std::collections::{HashMap, HashSet, VecDeque};
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use fail::fail_point;
use log::{info, warn};
use parking_lot::{Mutex, RwLock};

use crate::config::Config;
use crate::engine::read_entry_bytes_from_file;
use crate::event_listener::EventListener;
use crate::log_batch::{AtomicGroupBuilder, LogBatch};
use crate::memtable::{MemTableHandle, MemTables};
use crate::metrics::*;
use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue, PipeLog};
use crate::{GlobalStats, Result};
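
// Fraction of the append queue below which a heavy region is asked to compact
// (see `append_queue_watermarks`).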
const FORCE_COMPACT_RATIO: f64 = 0.2;
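// Fraction of the append queue below which entries are eligible for rewrite.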
const REWRITE_RATIO: f64 = 0.7;
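// A region with more stale entries than this is considered too heavy to
// rewrite and is asked to compact instead.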
const MAX_REWRITE_ENTRIES_PER_REGION: usize = 32;
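// Number of purge rounds a heavy region is asked to compact before its entries
// are force-rewritten anyway.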
const MAX_COUNT_BEFORE_FORCE_REWRITE: u32 = 9;
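
// Target size of a single rewrite batch. Adjustable in tests through the
// `max_rewrite_batch_bytes` fail point.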
fn max_batch_bytes() -> usize {
    fail_point!("max_rewrite_batch_bytes", |s| s
        .unwrap()
        .parse::<usize>()
        .unwrap());
    128 * 1024
}
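
/// Triggers rewrites of still-needed entries and purges stale log files, based
/// on memtable state and the size of each log queue.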
pub struct PurgeManager<P>
where
    P: PipeLog,
{
    cfg: Arc<Config>,
    memtables: MemTables,
    pipe_log: Arc<P>,
    global_stats: Arc<GlobalStats>,
    listeners: Vec<Arc<dyn EventListener>>,
    force_rewrite_candidates: Arc<Mutex<HashMap<u64, u32>>>,
}
impl<P> PurgeManager<P>
where
    P: PipeLog,
{
    pub fn new(
        cfg: Arc<Config>,
        memtables: MemTables,
        pipe_log: Arc<P>,
        global_stats: Arc<GlobalStats>,
        listeners: Vec<Arc<dyn EventListener>>,
    ) -> PurgeManager<P> {
        PurgeManager {
            cfg,
            memtables,
            pipe_log,
            global_stats,
            listeners,
            force_rewrite_candidates: Arc::new(Mutex::new(HashMap::default())),
        }
    }
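
    /// Rewrites and/or purges log files that are no longer needed. Returns the
    /// IDs of regions whose Raft logs should be compacted by the caller to
    /// unblock future purges.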
    pub fn purge_expired_files(&self) -> Result<Vec<u64>> {
        let _t = StopWatch::new(&*ENGINE_PURGE_DURATION_HISTOGRAM);
        let guard = self.force_rewrite_candidates.try_lock();
        if guard.is_none() {
            warn!("Unable to purge expired files: locked");
            return Ok(vec![]);
        }
        let mut rewrite_candidate_regions = guard.unwrap();
        let mut should_compact = HashSet::new();
        if self.needs_rewrite_log_files(LogQueue::Rewrite) {
            should_compact.extend(self.rewrite_rewrite_queue()?);
            self.rescan_memtables_and_purge_stale_files(
                LogQueue::Rewrite,
                self.pipe_log.file_span(LogQueue::Rewrite).1,
            )?;
        }
        if self.needs_rewrite_log_files(LogQueue::Append) {
            if let (Some(rewrite_watermark), Some(compact_watermark)) =
                self.append_queue_watermarks()
            {
                let (first_append, latest_append) = self.pipe_log.file_span(LogQueue::Append);
                let append_queue_barrier =
                    self.listeners.iter().fold(latest_append, |barrier, l| {
                        l.first_file_not_ready_for_purge(LogQueue::Append)
                            .map_or(barrier, |f| std::cmp::min(f, barrier))
                    });
                self.rewrite_append_queue_tombstones()?;
                should_compact.extend(self.rewrite_or_compact_append_queue(
                    rewrite_watermark,
                    compact_watermark,
                    &mut rewrite_candidate_regions,
                )?);
                if append_queue_barrier == first_append && first_append < latest_append {
                    warn!("Unable to purge expired files: blocked by barrier");
                }
                self.rescan_memtables_and_purge_stale_files(
                    LogQueue::Append,
                    append_queue_barrier,
                )?;
            }
        }
        Ok(should_compact.into_iter().collect())
    }
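
    /// Forcibly rewrites append data up to `watermark` (the active file when
    /// `None`) into the rewrite queue, then purges stale append files.
    /// `exit_after_step` aborts after rewriting tombstones (step 1) or
    /// memtables (step 2), which is mainly useful for exercising intermediate
    /// states in tests.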
    pub fn must_rewrite_append_queue(
        &self,
        watermark: Option<FileSeq>,
        exit_after_step: Option<u64>,
    ) {
        let _lk = self.force_rewrite_candidates.try_lock().unwrap();
        let (_, last) = self.pipe_log.file_span(LogQueue::Append);
        let watermark = watermark.map_or(last, |w| std::cmp::min(w, last));
        if watermark == last {
            self.pipe_log.rotate(LogQueue::Append).unwrap();
        }
        self.rewrite_append_queue_tombstones().unwrap();
        if exit_after_step == Some(1) {
            return;
        }
        self.rewrite_memtables(self.memtables.collect(|_| true), 0, Some(watermark))
            .unwrap();
        if exit_after_step == Some(2) {
            return;
        }
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Append,
            self.pipe_log.file_span(LogQueue::Append).1,
        )
        .unwrap();
    }
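
    /// Forcibly rewrites the entire rewrite queue and purges its stale files.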
    pub fn must_rewrite_rewrite_queue(&self) {
        let _lk = self.force_rewrite_candidates.try_lock().unwrap();
        self.rewrite_rewrite_queue().unwrap();
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Rewrite,
            self.pipe_log.file_span(LogQueue::Rewrite).1,
        )
        .unwrap();
    }
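
    /// Rotates both queues and purges every file that is no longer referenced
    /// by any memtable.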
    pub fn must_purge_all_stale(&self) {
        let _lk = self.force_rewrite_candidates.try_lock().unwrap();
        self.pipe_log.rotate(LogQueue::Rewrite).unwrap();
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Rewrite,
            self.pipe_log.file_span(LogQueue::Rewrite).1,
        )
        .unwrap();
        self.pipe_log.rotate(LogQueue::Append).unwrap();
        self.rescan_memtables_and_purge_stale_files(
            LogQueue::Append,
            self.pipe_log.file_span(LogQueue::Append).1,
        )
        .unwrap();
    }
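
    /// Returns whether `queue` has grown large enough to be rewritten: the
    /// append queue is checked against `purge_threshold`, the rewrite queue
    /// against `purge_rewrite_threshold` and its garbage ratio.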
    pub(crate) fn needs_rewrite_log_files(&self, queue: LogQueue) -> bool {
        let (first_file, active_file) = self.pipe_log.file_span(queue);
        if active_file == first_file {
            return false;
        }
        let total_size = self.pipe_log.total_size(queue);
        match queue {
            LogQueue::Append => total_size > self.cfg.purge_threshold.0 as usize,
            LogQueue::Rewrite => {
                let compacted_rewrites_ratio = self.global_stats.deleted_rewrite_entries() as f64
                    / self.global_stats.rewrite_entries() as f64;
                total_size > self.cfg.purge_rewrite_threshold.unwrap().0 as usize
                    && compacted_rewrites_ratio > self.cfg.purge_rewrite_garbage_ratio
            }
        }
    }
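
    /// Returns `(rewrite_watermark, compact_watermark)` for the append queue,
    /// or `(None, None)` when it only holds the active file. Both watermarks
    /// are capped at `active_file - 1`.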
    fn append_queue_watermarks(&self) -> (Option<FileSeq>, Option<FileSeq>) {
        let queue = LogQueue::Append;
        let (first_file, active_file) = self.pipe_log.file_span(queue);
        if active_file == first_file {
            return (None, None);
        }
        let rewrite_watermark = self.pipe_log.file_at(queue, REWRITE_RATIO);
        let compact_watermark = self.pipe_log.file_at(queue, FORCE_COMPACT_RATIO);
        debug_assert!(active_file - 1 > 0);
        (
            Some(std::cmp::min(rewrite_watermark, active_file - 1)),
            Some(std::cmp::min(compact_watermark, active_file - 1)),
        )
    }
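
    /// Rewrites memtables whose oldest append data lies below
    /// `rewrite_watermark`. Heavy regions are asked to compact instead, up to
    /// `MAX_COUNT_BEFORE_FORCE_REWRITE` times, before being force-rewritten.
    /// Returns the regions that should be compacted.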
    fn rewrite_or_compact_append_queue(
        &self,
        rewrite_watermark: FileSeq,
        compact_watermark: FileSeq,
        rewrite_candidates: &mut HashMap<u64, u32>,
    ) -> Result<Vec<u64>> {
        let _t = StopWatch::new(&*ENGINE_REWRITE_APPEND_DURATION_HISTOGRAM);
        debug_assert!(compact_watermark <= rewrite_watermark);
        let mut should_compact = Vec::with_capacity(16);
        let mut new_candidates = HashMap::with_capacity(rewrite_candidates.len());
        let memtables = self.memtables.collect(|t| {
            let min_append_seq = t.min_file_seq(LogQueue::Append).unwrap_or(u64::MAX);
            let old = min_append_seq < compact_watermark || t.rewrite_count() > 0;
            let has_something_to_rewrite = min_append_seq <= rewrite_watermark;
            let append_heavy = t.has_at_least_some_entries_before(
                FileId::new(LogQueue::Append, rewrite_watermark),
                MAX_REWRITE_ENTRIES_PER_REGION + t.rewrite_count(),
            );
            let full_heavy = t.has_at_least_some_entries_before(
                FileId::new(LogQueue::Append, rewrite_watermark),
                MAX_REWRITE_ENTRIES_PER_REGION,
            );
            let compact_counter = rewrite_candidates.get(&t.region_id()).unwrap_or(&0);
            if old && full_heavy {
                if *compact_counter < MAX_COUNT_BEFORE_FORCE_REWRITE {
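                    // Still within the compaction budget: ask the user to
                    // compact this region and skip rewriting it for now.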
                    should_compact.push(t.region_id());
                    new_candidates.insert(t.region_id(), *compact_counter + 1);
                    return false;
                } else {
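                    // The region has been asked to compact too many times;
                    // rewrite its stale entries ourselves.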
                    should_compact.push(t.region_id());
                    return has_something_to_rewrite;
                }
            }
            !append_heavy && has_something_to_rewrite
        });
        self.rewrite_memtables(
            memtables,
            MAX_REWRITE_ENTRIES_PER_REGION,
            Some(rewrite_watermark),
        )?;
        *rewrite_candidates = new_candidates;
        Ok(should_compact)
    }
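
    /// Rewrites all live data of the rewrite queue into fresh files so the old
    /// ones can be purged. Returns regions that carry too many rewritten
    /// entries and should be compacted.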
    fn rewrite_rewrite_queue(&self) -> Result<Vec<u64>> {
        let _t = StopWatch::new(&*ENGINE_REWRITE_REWRITE_DURATION_HISTOGRAM);
        self.pipe_log.rotate(LogQueue::Rewrite)?;
        let mut force_compact_regions = vec![];
        let memtables = self.memtables.collect(|t| {
            if t.rewrite_count() > MAX_REWRITE_ENTRIES_PER_REGION {
                force_compact_regions.push(t.region_id());
            }
            t.min_file_seq(LogQueue::Rewrite).is_some()
        });
        self.rewrite_memtables(memtables, 0, None)?;
        self.global_stats.reset_rewrite_counters();
        Ok(force_compact_regions)
    }
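
    /// Persists tombstones of cleaned (removed) regions into the rewrite queue
    /// so that the append files holding their data can be purged.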
    fn rewrite_append_queue_tombstones(&self) -> Result<()> {
        let mut log_batch = self.memtables.take_cleaned_region_logs();
        self.rewrite_impl(
            &mut log_batch,
            None, // rewrite_watermark
            true, // sync
        )?;
        Ok(())
    }
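
    /// Purges files of `queue` up to `seq`, backing off to the smallest file
    /// sequence still referenced by a memtable, and notifies listeners of the
    /// purge.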
    fn rescan_memtables_and_purge_stale_files(&self, queue: LogQueue, seq: FileSeq) -> Result<()> {
        let min_seq = self.memtables.fold(seq, |min, t| {
            t.min_file_seq(queue).map_or(min, |m| std::cmp::min(min, m))
        });
        let purged = self.pipe_log.purge_to(FileId {
            queue,
            seq: min_seq,
        })?;
        if purged > 0 {
            info!("purged {purged} expired log files for queue {queue:?}");
            for listener in &self.listeners {
                listener.post_purge(FileId {
                    queue,
                    seq: min_seq - 1,
                });
            }
        }
        Ok(())
    }
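
    /// Rewrites entries and key-values of the given memtables into the rewrite
    /// queue, splitting the work into batches of roughly `max_batch_bytes()`.
    /// With `rewrite = Some(seq)`, only data from append files before `seq` is
    /// rewritten; with `None`, previously rewritten data is rewritten again,
    /// wrapped in atomic groups when one region spans multiple batches.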
    fn rewrite_memtables(
        &self,
        memtables: Vec<MemTableHandle>,
        expect_rewrites_per_memtable: usize,
        rewrite: Option<FileSeq>,
    ) -> Result<()> {
        let needs_atomicity = (|| {
            fail_point!("force_use_atomic_group", |_| true);
            rewrite.is_none()
        })();
        let mut log_batch = LogBatch::default();
        for memtable in memtables {
            let mut entry_indexes = Vec::with_capacity(expect_rewrites_per_memtable);
            let mut kvs = Vec::new();
            let region_id = {
                let m = memtable.read();
                if let Some(rewrite) = rewrite {
                    m.fetch_entry_indexes_before(rewrite, &mut entry_indexes)?;
                    m.fetch_kvs_before(rewrite, &mut kvs);
                } else {
                    m.fetch_rewritten_entry_indexes(&mut entry_indexes)?;
                    m.fetch_rewritten_kvs(&mut kvs);
                }
                m.region_id()
            };
            let mut previous_size = log_batch.approximate_size();
            let mut atomic_group = None;
            let mut atomic_group_start = None;
            let mut current_entry_indexes = Vec::new();
            let mut current_entries = Vec::new();
            let mut current_size = 0;
            let mut entry_indexes = entry_indexes.into_iter().peekable();
            while let Some(ei) = entry_indexes.next() {
                let entry = read_entry_bytes_from_file(self.pipe_log.as_ref(), &ei)?;
                current_size += entry.len();
                current_entries.push(entry);
                current_entry_indexes.push(ei);
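                // Flush the batch before it exceeds the size limit; the final
                // chunk of this region is appended after the loop.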
                if entry_indexes.peek().is_some()
                    && current_size + previous_size > max_batch_bytes()
                {
                    if needs_atomicity {
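                        // Flush data of other regions first so that it is not
                        // captured inside this region's atomic group.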
                        if previous_size > 0 {
                            self.rewrite_impl(&mut log_batch, rewrite, false)?;
                            previous_size = 0;
                            if current_size <= max_batch_bytes() {
                                continue;
                            }
                        }
                        match atomic_group.as_mut() {
                            None => {
                                let mut g = AtomicGroupBuilder::default();
                                g.begin(&mut log_batch);
                                atomic_group = Some(g);
                            }
                            Some(g) => {
                                g.add(&mut log_batch);
                            }
                        }
                    }
                    log_batch.add_raw_entries(
                        region_id,
                        mem::take(&mut current_entry_indexes),
                        mem::take(&mut current_entries),
                    )?;
                    current_size = 0;
                    previous_size = 0;
                    let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
                    if needs_atomicity && atomic_group_start.is_none() {
                        atomic_group_start = Some(handle.id.seq);
                    }
                }
            }
            log_batch.add_raw_entries(region_id, current_entry_indexes, current_entries)?;
            for (k, v) in kvs {
                log_batch.put(region_id, k, v)?;
            }
            if let Some(g) = atomic_group.as_mut() {
                g.end(&mut log_batch);
                let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
                self.memtables.apply_rewrite_atomic_group(
                    region_id,
                    atomic_group_start.unwrap(),
                    handle.id.seq,
                );
            } else if log_batch.approximate_size() > max_batch_bytes() {
                self.rewrite_impl(&mut log_batch, rewrite, false)?;
            }
        }
        self.rewrite_impl(&mut log_batch, rewrite, true)?;
        Ok(())
    }
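
    /// Appends `log_batch` to the rewrite queue and applies it to memtables.
    /// Returns the handle of the written block, or `None` if the batch is
    /// empty, in which case the queue is only synced.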
    fn rewrite_impl(
        &self,
        log_batch: &mut LogBatch,
        rewrite_watermark: Option<FileSeq>,
        sync: bool,
    ) -> Result<Option<FileBlockHandle>> {
        if log_batch.is_empty() {
            debug_assert!(sync);
            self.pipe_log.sync(LogQueue::Rewrite)?;
            return Ok(None);
        }
        log_batch.finish_populate(
            self.cfg.batch_compression_threshold.0 as usize,
            self.cfg.compression_level,
        )?;
        let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
        if sync {
            self.pipe_log.sync(LogQueue::Rewrite)?
        }
        log_batch.finish_write(file_handle);
        self.memtables.apply_rewrite_writes(
            log_batch.drain(),
            rewrite_watermark,
            file_handle.id.seq,
        );
        for listener in &self.listeners {
            listener.post_apply_memtables(file_handle.id);
        }
        if rewrite_watermark.is_none() {
            BACKGROUND_REWRITE_BYTES
                .rewrite
                .observe(file_handle.len as f64);
        } else {
            BACKGROUND_REWRITE_BYTES
                .append
                .observe(file_handle.len as f64);
        }
        Ok(Some(file_handle))
    }
}
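
/// An `EventListener` that tracks, for each active append file, writes that
/// have not yet been applied to memtables, so those files are not purged
/// prematurely.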
#[derive(Default)]
pub struct PurgeHook {
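    /// Per-file counter of writes that have been appended but not yet applied
    /// to memtables. Files with a non-zero counter must not be purged.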
    active_log_files: RwLock<VecDeque<(FileSeq, AtomicUsize)>>,
}
impl EventListener for PurgeHook {
    fn post_new_log_file(&self, file_id: FileId) {
        if file_id.queue == LogQueue::Append {
            let mut active_log_files = self.active_log_files.write();
            if let Some(seq) = active_log_files.back().map(|x| x.0) {
                assert_eq!(
                    seq + 1,
                    file_id.seq,
                    "active log files should be contiguous"
                );
            }
            let counter = AtomicUsize::new(0);
            active_log_files.push_back((file_id.seq, counter));
        }
    }
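
    // A write has been appended to this file; bump its pending counter.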
    fn on_append_log_file(&self, handle: FileBlockHandle) {
        if handle.id.queue == LogQueue::Append {
            let active_log_files = self.active_log_files.read();
            assert!(!active_log_files.is_empty());
            let front = active_log_files[0].0;
            let counter = &active_log_files[(handle.id.seq - front) as usize].1;
            counter.fetch_add(1, Ordering::Release);
        }
    }
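
    // The write has been applied to memtables; release its reference.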
    fn post_apply_memtables(&self, file_id: FileId) {
        if file_id.queue == LogQueue::Append {
            let active_log_files = self.active_log_files.read();
            assert!(!active_log_files.is_empty());
            let front = active_log_files[0].0;
            let counter = &active_log_files[(file_id.seq - front) as usize].1;
            counter.fetch_sub(1, Ordering::Release);
        }
    }
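
    // Returns the earliest append file that still has unapplied writes; purge
    // must not cross it.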
    fn first_file_not_ready_for_purge(&self, queue: LogQueue) -> Option<FileSeq> {
        if queue == LogQueue::Append {
            let active_log_files = self.active_log_files.read();
            for (id, counter) in active_log_files.iter() {
                if counter.load(Ordering::Acquire) > 0 {
                    return Some(*id);
                }
            }
        }
        None
    }
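
    // Drops tracking state for all files up to and including the purged one.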
    fn post_purge(&self, file_id: FileId) {
        if file_id.queue == LogQueue::Append {
            let mut active_log_files = self.active_log_files.write();
            assert!(!active_log_files.is_empty());
            let front = active_log_files[0].0;
            if front <= file_id.seq {
                let mut purged = active_log_files.drain(0..=(file_id.seq - front) as usize);
                assert_eq!(purged.next_back().unwrap().0, file_id.seq);
            }
        }
    }
}