use futures::future::Either;
use futures::pin_mut;
use lazy_static::lazy_static;
use prometheus::register_gauge_vec;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::watch;
use crate::{NoArgs, NoDependencies, Resource};
lazy_static! {
    /// Prometheus gauge family with one series per health signal, keyed by
    /// the `signal` label. This file writes `1.0` for a healthy signal and
    /// `0.0` for an unhealthy one.
    static ref HEALTH_SIGNALS: prometheus::GaugeVec = register_gauge_vec!(
        "comprehensive_health_signal",
        "Assembly health signal (boolean)",
        &["signal"]
    )
    // A registration error is not recoverable here: the first access panics.
    .unwrap();
}
/// Handle through which one registered signal reports its health.
///
/// Instances are produced by [`HealthReporter::register`]; each one refers
/// to exactly one entry in the reporter's signal list.
pub struct HealthSignaller {
    /// The reporter that owns the signal this handle refers to.
    reporter: Arc<HealthReporter>,
    /// Position of this handle's signal in `reporter.signals`.
    index: usize,
}
impl HealthSignaller {
    /// Records whether this signal is currently healthy.
    ///
    /// A call that repeats the already-recorded state does nothing. On an
    /// actual transition the reporter's unhealthy counter is adjusted, the
    /// aggregate health is re-published to watchers if it changed, and the
    /// signal's Prometheus gauge is set to `1.0` (healthy) or `0.0`
    /// (unhealthy).
    ///
    /// # Panics
    ///
    /// Panics if the signal's mutex has been poisoned.
    pub fn set_healthy(&self, healthy: bool) {
        let signal = &self.reporter.signals[self.index];
        // The guard stays alive until this function returns, so concurrent
        // calls for the same signal are serialized.
        let mut state = signal.healthy.lock().unwrap();
        if *state == healthy {
            return;
        }
        *state = healthy;

        let unhealthy_count = &self.reporter.unhealthy_count;
        let gauge_value = if healthy {
            unhealthy_count.fetch_sub(1, Ordering::Release);
            1.0
        } else {
            unhealthy_count.fetch_add(1, Ordering::Release);
            0.0
        };

        self.reporter.maybe_notify();
        HEALTH_SIGNALS
            .with_label_values(&[signal.name])
            .set(gauge_value);
    }
}
/// One registered health signal: its name and its last reported state.
struct Signal {
    /// Name used as the gauge label value and in log messages.
    name: &'static str,
    /// Last reported state; `true` means healthy.
    healthy: std::sync::Mutex<bool>,
}
/// Aggregates individual health signals into one overall healthy/unhealthy
/// state that can be polled or watched.
pub struct HealthReporter {
    /// Every signal registered so far; entries are only ever appended here.
    signals: boxcar::Vec<Signal>,
    /// Number of signals whose current state is unhealthy.
    unhealthy_count: AtomicU32,
    /// Channel carrying the aggregate state; `true` means healthy.
    watch: watch::Sender<bool>,
}
impl HealthReporter {
    /// Returns `true` when no registered signal is currently unhealthy.
    ///
    /// A reporter without any registered signal counts as healthy.
    pub fn is_healthy(&self) -> bool {
        self.unhealthy_count.load(Ordering::Acquire) == 0
    }

    /// Returns a receiver for the aggregate health state (`true` = healthy).
    pub fn subscribe(&self) -> watch::Receiver<bool> {
        self.watch.subscribe()
    }

    /// Publishes the current aggregate health on the watch channel.
    ///
    /// Receivers are only woken when the published value actually differs
    /// from the one already stored in the channel.
    fn maybe_notify(&self) {
        self.watch.send_if_modified(|value| {
            let healthy = self.is_healthy();
            let changed = *value != healthy;
            *value = healthy;
            changed
        });
    }

    /// Registers a new health signal called `name` and returns the handle
    /// used to report its state.
    ///
    /// A freshly registered signal starts out unhealthy, so registering makes
    /// the reporter unhealthy until the signal reports otherwise. The
    /// signal's gauge is always initialised to `0.0`.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`.
    pub fn register(
        self: &Arc<Self>,
        name: &'static str,
    ) -> Result<HealthSignaller, crate::ComprehensiveError> {
        let index = self.signals.push(Signal {
            name,
            healthy: std::sync::Mutex::new(false),
        });
        // Only the 0 -> 1 transition can flip the aggregate state, so that is
        // the only case in which watchers need to be told.
        if self.unhealthy_count.fetch_add(1, Ordering::AcqRel) == 0 {
            self.maybe_notify();
        }
        // The gauge must be initialised for every signal, not just for the
        // one that flips the aggregate state; otherwise a signal registered
        // while another is already unhealthy has no exported series until its
        // first state change.
        HEALTH_SIGNALS.with_label_values(&[name]).set(0.0);
        Ok(HealthSignaller {
            reporter: Arc::clone(self),
            index,
        })
    }

    /// Returns the sorted, comma-separated names of all currently unhealthy
    /// signals together with how many there are.
    fn unhealthy_list(&self) -> (String, usize) {
        let mut names = self
            .signals
            .iter()
            .filter_map(|(_, signal)| match *signal.healthy.lock().unwrap() {
                true => None,
                false => Some(signal.name),
            })
            .collect::<Vec<_>>();
        names.sort();
        (names.join(", "), names.len())
    }

    /// Waits for the reporter to become healthy for the first time.
    ///
    /// While waiting, a warning listing the unhealthy signals is logged each
    /// time the announce interval passes. Returns after logging an info
    /// message once the watched value is `true`.
    async fn startup_health_notices(&self, rx: &mut watch::Receiver<bool>) {
        const STARTUP_ANNOUNCE_INTERVAL: Duration = Duration::from_millis(30_000);
        let start = Instant::now();
        loop {
            let deadline = tokio::time::Instant::now() + STARTUP_ANNOUNCE_INTERVAL;
            // Inner loop: re-check the state on every change notification
            // until the deadline for the next warning is reached.
            loop {
                if *rx.borrow_and_update() {
                    log::info!(
                        "After {}s, monitoring {} signals, all healthy",
                        start.elapsed().as_secs_f64(),
                        self.signals.count()
                    );
                    return;
                }
                let sleeper = tokio::time::sleep_until(deadline);
                let changed = rx.changed();
                pin_mut!(sleeper);
                pin_mut!(changed);
                // `Left` means the timer won the race: time for a warning.
                if let Either::Left(_) = futures::future::select(sleeper, changed).await {
                    break;
                }
            }
            let (list, count) = self.unhealthy_list();
            log::warn!(
                "After {}s, {} signals are still unhealthy: {}",
                start.elapsed().as_secs_f64(),
                count,
                list
            );
        }
    }

    /// Returns once the watched aggregate state is unhealthy.
    async fn wait_until_unhealthy(&self, rx: &mut watch::Receiver<bool>) {
        while *rx.borrow_and_update() {
            // The result is ignored: the sender is a field of `self`, which
            // is borrowed for the duration of this call.
            let _ = rx.changed().await;
        }
    }

    /// Logs the set of unhealthy signals whenever it changes and returns,
    /// after an info message, once the watched aggregate state is healthy.
    async fn complain_until_healthy(&self, rx: &mut watch::Receiver<bool>) {
        let mut last_count = 0;
        let mut last_list = String::new();
        while !*rx.borrow_and_update() {
            let (list, count) = self.unhealthy_list();
            // Avoid repeating an identical warning on spurious wake-ups.
            if count != last_count || list != last_list {
                log::warn!("{} unhealthy signal: {}", count, list);
                last_count = count;
                last_list = list;
            }
            let _ = rx.changed().await;
        }
        log::info!("All {} signals are healthy again", self.signals.count());
    }
}
impl Resource for HealthReporter {
    type Args = NoArgs;
    type Dependencies = NoDependencies;
    const NAME: &str = "Health reporter";

    /// Creates a reporter with no signals, which therefore starts healthy.
    fn new(_: NoDependencies, _: NoArgs) -> Result<Self, Box<dyn std::error::Error>> {
        let reporter = Self {
            unhealthy_count: AtomicU32::new(0),
            signals: boxcar::Vec::new(),
            // No signals yet, so the initial aggregate state is healthy.
            watch: watch::Sender::new(true),
        };
        Ok(reporter)
    }

    /// Logs health transitions for as long as the reporter runs.
    ///
    /// Returns immediately when no signal has been registered; otherwise it
    /// reports startup progress and then alternates forever between waiting
    /// for an unhealthy state and complaining until health is restored.
    async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
        let nothing_to_monitor = self.signals.is_empty();
        if nothing_to_monitor {
            log::info!("No health signals registered. Server is trivially always healthy.");
            return Ok(());
        }

        let mut health_rx = self.watch.subscribe();
        self.startup_health_notices(&mut health_rx).await;
        loop {
            self.wait_until_unhealthy(&mut health_rx).await;
            self.complain_until_healthy(&mut health_rx).await;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::poll;
    use std::pin::pin;

    // Smallest possible assembly: its only resource is the health reporter.
    #[derive(crate::ResourceDependencies)]
    struct TestAssembly(Arc<HealthReporter>);

    // With nothing registered the reporter is healthy and no change is pending.
    #[tokio::test]
    async fn no_signals_is_healthy() {
        let assembly = crate::Assembly::<TestAssembly>::new_from_argv(vec!["cmd"]).unwrap();
        let reporter = &assembly.top.0;
        assert!(reporter.is_healthy());
        let mut health_rx = reporter.subscribe();
        assert!(*health_rx.borrow_and_update());
        assert!(poll!(pin!(health_rx.changed())).is_pending());
    }

    // A signal that never reported anything keeps the reporter unhealthy.
    #[tokio::test]
    async fn one_signal_is_unhealthy() {
        let assembly = crate::Assembly::<TestAssembly>::new_from_argv(vec!["cmd"]).unwrap();
        let reporter = &assembly.top.0;
        let _ = reporter.register("nobody");
        assert!(!reporter.is_healthy());
        let mut health_rx = reporter.subscribe();
        assert!(!*health_rx.borrow_and_update());
        assert!(poll!(pin!(health_rx.changed())).is_pending());
    }

    // A single signal that reported healthy makes the reporter healthy.
    #[tokio::test]
    async fn one_signal_is_healthy() {
        let assembly = crate::Assembly::<TestAssembly>::new_from_argv(vec!["cmd"]).unwrap();
        let reporter = &assembly.top.0;
        reporter.register("nobody").unwrap().set_healthy(true);
        assert!(reporter.is_healthy());
        let mut health_rx = reporter.subscribe();
        assert!(*health_rx.borrow_and_update());
        assert!(poll!(pin!(health_rx.changed())).is_pending());
    }

    // One unhealthy signal is enough to make the aggregate unhealthy.
    #[tokio::test]
    async fn one_of_two_is_unhealthy() {
        let assembly = crate::Assembly::<TestAssembly>::new_from_argv(vec!["cmd"]).unwrap();
        let reporter = &assembly.top.0;
        let _ = reporter.register("sad");
        reporter.register("happy").unwrap().set_healthy(true);
        assert!(!reporter.is_healthy());
        let mut health_rx = reporter.subscribe();
        assert!(!*health_rx.borrow_and_update());
        assert!(poll!(pin!(health_rx.changed())).is_pending());
    }

    // Each transition of the only signal is observed exactly once by a watcher.
    #[tokio::test]
    async fn changes() {
        let assembly = crate::Assembly::<TestAssembly>::new_from_argv(vec!["cmd"]).unwrap();
        let reporter = &assembly.top.0;
        let signal = reporter.register("variable").unwrap();
        assert!(!reporter.is_healthy());
        let mut health_rx = reporter.subscribe();
        assert!(poll!(pin!(health_rx.changed())).is_pending());

        // unhealthy -> healthy
        signal.set_healthy(true);
        assert!(reporter.is_healthy());
        assert!(poll!(pin!(health_rx.changed())).is_ready());
        assert!(*health_rx.borrow_and_update());
        assert!(poll!(pin!(health_rx.changed())).is_pending());

        // healthy -> unhealthy
        signal.set_healthy(false);
        assert!(!reporter.is_healthy());
        assert!(poll!(pin!(health_rx.changed())).is_ready());
        assert!(!*health_rx.borrow_and_update());
        assert!(poll!(pin!(health_rx.changed())).is_pending());
    }
}