use crate::server::ServerStats;
use crate::{sockets::create_unix_socket, system::ServerData};
use ntp_proto::{ObservablePeerTimedata, PollInterval, Reach, ReferenceId, SystemSnapshot};
use std::net::SocketAddr;
use std::os::unix::fs::PermissionsExt;
use tokio::task::JoinHandle;
use tracing::warn;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct ObservableState {
    pub system: SystemSnapshot,
    pub peers: Vec<ObservablePeerState>,
    pub servers: Vec<ObservableServerState>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ObservableServerState {
    pub address: SocketAddr,
    pub stats: ServerStats,
}
impl From<&ServerData> for ObservableServerState {
    fn from(data: &ServerData) -> Self {
        ObservableServerState {
            address: data.config.addr,
            stats: data.stats.clone(),
        }
    }
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ObservablePeerState {
    Nothing,
    Observable {
        #[serde(flatten)]
        timedata: ObservablePeerTimedata,
        reachability: Reach,
        poll_interval: PollInterval,
        peer_id: ReferenceId,
        address: String,
    },
}
pub async fn spawn(
    config: &crate::config::ObserveConfig,
    peers_reader: tokio::sync::watch::Receiver<Vec<ObservablePeerState>>,
    server_reader: tokio::sync::watch::Receiver<Vec<ServerData>>,
    system_reader: tokio::sync::watch::Receiver<SystemSnapshot>,
) -> JoinHandle<std::io::Result<()>> {
    let config = config.clone();
    tokio::spawn(async move {
        let result = observer(config, peers_reader, server_reader, system_reader).await;
        if let Err(ref e) = result {
            warn!("Abnormal termination of the state observer: {}", e);
            warn!("The state observer will not be available");
        }
        result
    })
}
async fn observer(
    config: crate::config::ObserveConfig,
    peers_reader: tokio::sync::watch::Receiver<Vec<ObservablePeerState>>,
    server_reader: tokio::sync::watch::Receiver<Vec<ServerData>>,
    system_reader: tokio::sync::watch::Receiver<SystemSnapshot>,
) -> std::io::Result<()> {
    let path = match config.path {
        Some(path) => path,
        None => return Ok(()),
    };
    let peers_listener = create_unix_socket(&path)?;
                let permissions: std::fs::Permissions = PermissionsExt::from_mode(config.mode);
    std::fs::set_permissions(&path, permissions)?;
    loop {
        let (mut stream, _addr) = peers_listener.accept().await?;
        let observe = ObservableState {
            peers: peers_reader.borrow().to_owned(),
            system: *system_reader.borrow(),
            servers: server_reader.borrow().iter().map(|s| s.into()).collect(),
        };
        crate::sockets::write_json(&mut stream, &observe).await?;
    }
}
#[cfg(test)]
mod tests {
    use std::{borrow::BorrowMut, time::Duration};
    use ntp_proto::{
        NtpClock, NtpDuration, NtpLeapIndicator, NtpTimestamp, PollInterval, PollIntervalLimits,
        Reach, ReferenceId, TimeSnapshot,
    };
    use tokio::{io::AsyncReadExt, net::UnixStream};
    use super::*;
    #[derive(Debug, Clone, Default)]
    struct TestClock {}
    impl NtpClock for TestClock {
        type Error = std::io::Error;
        fn now(&self) -> std::result::Result<NtpTimestamp, Self::Error> {
            Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
        }
        fn set_frequency(&self, _freq: f64) -> Result<NtpTimestamp, Self::Error> {
            Ok(NtpTimestamp::default())
        }
        fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
            Ok(NtpTimestamp::default())
        }
        fn enable_ntp_algorithm(&self) -> Result<(), Self::Error> {
            Ok(())
        }
        fn disable_ntp_algorithm(&self) -> Result<(), Self::Error> {
            Ok(())
        }
        fn ntp_algorithm_update(
            &self,
            _offset: NtpDuration,
            _poll_interval: PollInterval,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
        fn error_estimate_update(
            &self,
            _est_error: NtpDuration,
            _max_error: NtpDuration,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
        fn status_update(&self, _leap_status: NtpLeapIndicator) -> Result<(), Self::Error> {
            Ok(())
        }
    }
    #[tokio::test]
    async fn test_observation() {
                let path = std::env::temp_dir().join("ntp-test-stream-2");
        let config = crate::config::ObserveConfig {
            path: Some(path.clone()),
            mode: 0o700,
        };
        let (_, peers_reader) = tokio::sync::watch::channel(vec![
            ObservablePeerState::Nothing,
            ObservablePeerState::Nothing,
            ObservablePeerState::Observable {
                timedata: Default::default(),
                reachability: Reach::default(),
                poll_interval: PollIntervalLimits::default().min,
                peer_id: ReferenceId::from_ip("127.0.0.1".parse().unwrap()),
                address: "127.0.0.3:123".into(),
            },
        ]);
        let (_, servers_reader) = tokio::sync::watch::channel(vec![]);
        let (_, system_reader) = tokio::sync::watch::channel(SystemSnapshot {
            stratum: 1,
            reference_id: ReferenceId::NONE,
            accumulated_steps_threshold: None,
            time_snapshot: TimeSnapshot {
                poll_interval: PollIntervalLimits::default().min,
                precision: NtpDuration::from_seconds(1e-3),
                root_delay: NtpDuration::ZERO,
                root_dispersion: NtpDuration::ZERO,
                leap_indicator: NtpLeapIndicator::Leap59,
                accumulated_steps: NtpDuration::ZERO,
            },
        });
        let handle = tokio::spawn(async move {
            observer(config, peers_reader, servers_reader, system_reader)
                .await
                .unwrap();
        });
        tokio::time::sleep(Duration::from_millis(10)).await;
        let mut reader = UnixStream::connect(path).await.unwrap();
        let mut buf = vec![];
        while reader.read_buf(&mut buf).await.unwrap() != 0 {}
        let result: ObservableState = serde_json::from_slice(&buf).unwrap();
                let mut count = 0;
        for peer in &result.peers {
            if matches!(peer, ObservablePeerState::Observable { .. }) {
                count += 1;
            }
        }
        assert_eq!(count, 1);
        handle.abort();
    }
    #[tokio::test]
    async fn test_block_during_read() {
                let path = std::env::temp_dir().join("ntp-test-stream-3");
        let config = crate::config::ObserveConfig {
            path: Some(path.clone()),
            mode: 0o700,
        };
        let (mut peers_writer, peers_reader) = tokio::sync::watch::channel(vec![
            ObservablePeerState::Nothing,
            ObservablePeerState::Nothing,
            ObservablePeerState::Observable {
                timedata: Default::default(),
                reachability: Reach::default(),
                poll_interval: PollIntervalLimits::default().min,
                peer_id: ReferenceId::from_ip("127.0.0.1".parse().unwrap()),
                address: "127.0.0.3:123".into(),
            },
        ]);
        let (mut server_writer, servers_reader) = tokio::sync::watch::channel(vec![]);
        let (mut system_writer, system_reader) = tokio::sync::watch::channel(SystemSnapshot {
            stratum: 1,
            reference_id: ReferenceId::NONE,
            accumulated_steps_threshold: None,
            time_snapshot: TimeSnapshot {
                poll_interval: PollIntervalLimits::default().min,
                precision: NtpDuration::from_seconds(1e-3),
                root_delay: NtpDuration::ZERO,
                root_dispersion: NtpDuration::ZERO,
                leap_indicator: NtpLeapIndicator::Leap59,
                accumulated_steps: NtpDuration::ZERO,
            },
        });
        let handle = tokio::spawn(async move {
            observer(config, peers_reader, servers_reader, system_reader)
                .await
                .unwrap();
        });
        tokio::time::sleep(Duration::from_millis(10)).await;
        let mut reader = UnixStream::connect(path).await.unwrap();
                                let mut buf = [0_u8; 12];
        let mut bufref: &mut [u8] = &mut buf;
        reader.read_buf(&mut bufref).await.unwrap();
                let _ = system_writer.borrow_mut();
        let _ = peers_writer.borrow_mut();
        let _ = server_writer.borrow_mut();
        handle.abort();
    }
}