use std::cmp;
use std::collections::{BinaryHeap, HashMap};
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
use crate::std::queue::mpsc_list_v1::Entry;
use crate::std::queue::mpsc_list_v1::Queue as TimeoutQueue;
use crate::std::queue::seg_queue::SegQueue as mpsc;
use crossbeam::atomic::AtomicCell;
use once_cell::sync::Lazy;
use parking_lot::Mutex;
const NANOS_PER_MILLI: u64 = 1_000_000;
const NANOS_PER_SEC: u64 = 1_000_000_000;
const HASH_CAP: usize = 1024;
#[inline]
fn dur_to_ns(dur: Duration) -> u64 {
    dur.as_secs()
        .saturating_mul(NANOS_PER_SEC)
        .saturating_add(u64::from(dur.subsec_nanos()))
}
#[inline]
pub fn ns_to_dur(ns: u64) -> Duration {
    Duration::new(ns / NANOS_PER_SEC, (ns % NANOS_PER_SEC) as u32)
}
#[allow(dead_code)]
#[inline]
pub fn ns_to_ms(ns: u64) -> u64 {
    (ns + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI
}
pub static START_TIME: Lazy<Instant> = Lazy::new(|| Instant::now());
#[inline]
pub fn now() -> u64 {
    START_TIME.elapsed().as_nanos() as u64
}
pub struct TimeoutData<T> {
    time: u64,
    pub data: T, }
pub type TimeoutHandle<T> = Entry<TimeoutData<T>>;
struct TimeoutQueueWrapper<T> {
    inner: TimeoutQueue<TimeoutData<T>>,
    in_use: AtomicUsize,
}
impl<T> TimeoutQueueWrapper<T> {
    fn new() -> Self {
        TimeoutQueueWrapper {
            inner: TimeoutQueue::new(),
            in_use: AtomicUsize::new(0),
        }
    }
}
type IntervalList<T> = Arc<TimeoutQueueWrapper<T>>;
struct IntervalEntry<T> {
    time: u64,
    list: IntervalList<T>,
    interval: u64,
}
impl<T> IntervalEntry<T> {
    pub fn pop_timeout<F>(&self, now: u64, f: &F) -> Option<u64>
    where
        F: Fn(T),
    {
        let p = |v: &TimeoutData<T>| v.time <= now;
        while let Some(timeout) = self.list.inner.pop_if(&p) {
            f(timeout.data);
        }
        self.list.inner.peek().map(|t| t.time)
    }
}
impl<T> PartialEq for IntervalEntry<T> {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}
impl<T> Eq for IntervalEntry<T> {}
impl<T> PartialOrd for IntervalEntry<T> {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl<T> cmp::Ord for IntervalEntry<T> {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        other.time.cmp(&self.time)
    }
}
pub struct TimeOutList<T> {
    interval_map: RwLock<HashMap<u64, IntervalList<T>>>,
    timer_bh: Mutex<BinaryHeap<IntervalEntry<T>>>,
}
impl<T> TimeOutList<T> {
    pub fn new() -> Self {
        TimeOutList {
            interval_map: RwLock::new(HashMap::with_capacity(HASH_CAP)),
            timer_bh: Mutex::new(BinaryHeap::new()),
        }
    }
    fn install_timer_bh(&self, entry: IntervalEntry<T>) {
        if entry.list.in_use.fetch_add(1, Ordering::AcqRel) == 0 {
            self.timer_bh.lock().push(entry);
        }
    }
    pub fn add_timer(&self, dur: Duration, data: T) -> (TimeoutHandle<T>, bool) {
        let interval = dur_to_ns(dur);
        let time = now() + interval; let timeout = TimeoutData { time, data };
        let interval_list = {
            let interval_map_r = self.interval_map.read().unwrap();
            (*interval_map_r).get(&interval).cloned()
            };
        if let Some(interval_list) = interval_list {
            let (handle, is_head) = interval_list.inner.push(timeout);
            if is_head {
                self.install_timer_bh(IntervalEntry {
                    time,
                    interval,
                    list: interval_list,
                });
            }
            return (handle, is_head);
        }
        let mut interval_map_w = self.interval_map.write().unwrap();
        if let Some(interval_list) = (*interval_map_w).get(&interval) {
            let (handle, is_head) = interval_list.inner.push(timeout);
            if is_head {
                self.install_timer_bh(IntervalEntry {
                    time,
                    interval,
                    list: interval_list.clone(),
                });
            }
            return (handle, is_head);
        }
        let interval_list = Arc::new(TimeoutQueueWrapper::<T>::new());
        let ret = interval_list.inner.push(timeout).0;
        (*interval_map_w).insert(interval, interval_list.clone());
        mem::drop(interval_map_w);
        self.install_timer_bh(IntervalEntry {
            time,
            interval,
            list: interval_list,
        });
        (ret, true)
    }
    pub fn schedule_timer<F: Fn(T)>(&self, now: u64, f: &F) -> Option<u64> {
        loop {
            let mut entry = {
                let mut timer_bh = self.timer_bh.lock();
                match timer_bh.peek() {
                    Some(entry) => {
                        if entry.time > now {
                            return Some(entry.time - now);
                        } else {
                            }
                    }
                    None => return None,
                }
                let entry = timer_bh.pop().unwrap();
                entry.list.in_use.store(0, Ordering::Release);
                entry
            };
            match entry.pop_timeout(now, f) {
                Some(time) => {
                    if entry.list.in_use.fetch_add(1, Ordering::AcqRel) == 0 {
                        entry.time = time;
                        self.timer_bh.lock().push(entry);
                    }
                }
                None => {
                    let mut interval_map_w = self.interval_map.write().unwrap();
                    if entry.list.inner.is_empty() {
                        if (*interval_map_w).len() > HASH_CAP {
                            (*interval_map_w).remove(&entry.interval);
                        }
                    } else if entry.list.in_use.fetch_add(1, Ordering::AcqRel) == 0 {
                        mem::drop(interval_map_w);
                        entry.time = entry.list.inner.peek().unwrap().time;
                        self.timer_bh.lock().push(entry);
                    }
                }
            }
        }
    }
}
pub struct TimerThread<T> {
    timer_list: TimeOutList<T>,
    remove_list: mpsc<TimeoutHandle<T>>,
    wakeup: AtomicCell<Option<thread::Thread>>,
}
impl<T> TimerThread<T> {
    pub fn new() -> Self {
        TimerThread {
            timer_list: TimeOutList::new(),
            remove_list: mpsc::new(),
            wakeup: AtomicCell::new(None),
        }
    }
    pub fn add_timer(&self, dur: Duration, data: T) -> TimeoutHandle<T> {
        let (h, is_recal) = self.timer_list.add_timer(dur, data);
        if is_recal {
            if let Some(t) = self.wakeup.take() {
                t.unpark();
            }
        }
        h
    }
    pub fn del_timer(&self, handle: TimeoutHandle<T>) {
        self.remove_list.push(handle);
        if let Some(t) = self.wakeup.take() {
            t.unpark();
        }
    }
    pub fn run<F: Fn(T)>(&self, f: &F) {
        let current_thread = thread::current();
        loop {
            while let Some(h) = self.remove_list.pop() {
                h.remove();
            }
            self.wakeup.swap(Some(current_thread.clone()));
            if !self.remove_list.is_empty() {
                if let Some(t) = self.wakeup.take() {
                    t.unpark();
                }
            }
            match self.timer_list.schedule_timer(now(), f) {
                Some(time) => thread::park_timeout(ns_to_dur(time)),
                None => thread::park(),
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    #[test]
    fn test_timeout_list() {
        let timer = Arc::new(TimerThread::<usize>::new());
        let t = timer.clone();
        let f = |data: usize| {
            println!("timeout data:{:?}", data);
        };
        thread::spawn(move || t.run(&f));
        let t1 = timer.clone();
        thread::spawn(move || {
            t1.add_timer(Duration::from_millis(1000), 50);
            t1.add_timer(Duration::from_millis(1000), 60);
            t1.add_timer(Duration::from_millis(1400), 70);
        });
        thread::sleep(Duration::from_millis(10));
        timer.add_timer(Duration::from_millis(1000), 10);
        timer.add_timer(Duration::from_millis(500), 40);
        timer.add_timer(Duration::from_millis(1200), 20);
        thread::sleep(Duration::from_millis(100));
        timer.add_timer(Duration::from_millis(1000), 30);
        thread::sleep(Duration::from_millis(1500));
    }
}