use crate::server::ServerStats;
use crate::{sockets::create_unix_socket, system::ServerData};
use ntp_proto::{ObservablePeerTimedata, PollInterval, Reach, ReferenceId, SystemSnapshot};
use prometheus_client::encoding::EncodeLabelValue;
use std::fmt::Write;
use std::net::SocketAddr;
use std::os::unix::fs::PermissionsExt;
use tokio::task::JoinHandle;
use tracing::warn;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct ObservableState {
pub system: SystemSnapshot,
pub peers: Vec<ObservablePeerState>,
pub servers: Vec<ObservableServerState>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ObservableServerState {
pub address: WrappedSocketAddr,
pub stats: ServerStats,
}
impl From<&ServerData> for ObservableServerState {
fn from(data: &ServerData) -> Self {
ObservableServerState {
address: data.config.addr.into(),
stats: data.stats.clone(),
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WrappedSocketAddr(SocketAddr);
impl From<SocketAddr> for WrappedSocketAddr {
fn from(s: SocketAddr) -> Self {
WrappedSocketAddr(s)
}
}
impl EncodeLabelValue for WrappedSocketAddr {
fn encode(
&self,
encoder: &mut prometheus_client::encoding::LabelValueEncoder,
) -> Result<(), std::fmt::Error> {
encoder.write_str(&self.0.to_string())
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum ObservablePeerState {
Nothing,
Observable {
#[serde(flatten)]
timedata: ObservablePeerTimedata,
reachability: Reach,
poll_interval: PollInterval,
peer_id: ReferenceId,
address: String,
},
}
pub async fn spawn(
config: &crate::config::ObserveConfig,
peers_reader: tokio::sync::watch::Receiver<Vec<ObservablePeerState>>,
server_reader: tokio::sync::watch::Receiver<Vec<ServerData>>,
system_reader: tokio::sync::watch::Receiver<SystemSnapshot>,
) -> JoinHandle<std::io::Result<()>> {
let config = config.clone();
tokio::spawn(async move {
let result = observer(config, peers_reader, server_reader, system_reader).await;
if let Err(ref e) = result {
warn!("Abnormal termination of the state observer: {}", e);
warn!("The state observer will not be available");
}
result
})
}
async fn observer(
config: crate::config::ObserveConfig,
peers_reader: tokio::sync::watch::Receiver<Vec<ObservablePeerState>>,
server_reader: tokio::sync::watch::Receiver<Vec<ServerData>>,
system_reader: tokio::sync::watch::Receiver<SystemSnapshot>,
) -> std::io::Result<()> {
let path = match config.path {
Some(path) => path,
None => return Ok(()),
};
let peers_listener = create_unix_socket(&path)?;
let permissions: std::fs::Permissions = PermissionsExt::from_mode(config.mode);
std::fs::set_permissions(&path, permissions)?;
loop {
let (mut stream, _addr) = peers_listener.accept().await?;
let observe = ObservableState {
peers: peers_reader.borrow().to_owned(),
system: *system_reader.borrow(),
servers: server_reader.borrow().iter().map(|s| s.into()).collect(),
};
crate::sockets::write_json(&mut stream, &observe).await?;
}
}
#[cfg(test)]
mod tests {
use std::{borrow::BorrowMut, time::Duration};
use ntp_proto::{
NtpClock, NtpDuration, NtpLeapIndicator, NtpTimestamp, PollInterval, PollIntervalLimits,
Reach, ReferenceId, TimeSnapshot,
};
use tokio::{io::AsyncReadExt, net::UnixStream};
use super::*;
#[derive(Debug, Clone, Default)]
struct TestClock {}
impl NtpClock for TestClock {
type Error = std::io::Error;
fn now(&self) -> std::result::Result<NtpTimestamp, Self::Error> {
Err(std::io::Error::from(std::io::ErrorKind::Unsupported))
}
fn set_frequency(&self, _freq: f64) -> Result<NtpTimestamp, Self::Error> {
Ok(NtpTimestamp::default())
}
fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
Ok(NtpTimestamp::default())
}
fn enable_ntp_algorithm(&self) -> Result<(), Self::Error> {
Ok(())
}
fn disable_ntp_algorithm(&self) -> Result<(), Self::Error> {
Ok(())
}
fn ntp_algorithm_update(
&self,
_offset: NtpDuration,
_poll_interval: PollInterval,
) -> Result<(), Self::Error> {
Ok(())
}
fn error_estimate_update(
&self,
_est_error: NtpDuration,
_max_error: NtpDuration,
) -> Result<(), Self::Error> {
Ok(())
}
fn status_update(&self, _leap_status: NtpLeapIndicator) -> Result<(), Self::Error> {
Ok(())
}
}
#[tokio::test]
async fn test_observation() {
let path = std::env::temp_dir().join("ntp-test-stream-2");
let config = crate::config::ObserveConfig {
path: Some(path.clone()),
mode: 0o700,
};
let (_, peers_reader) = tokio::sync::watch::channel(vec![
ObservablePeerState::Nothing,
ObservablePeerState::Nothing,
ObservablePeerState::Observable {
timedata: Default::default(),
reachability: Reach::default(),
poll_interval: PollIntervalLimits::default().min,
peer_id: ReferenceId::from_ip("127.0.0.1".parse().unwrap()),
address: "127.0.0.3:123".into(),
},
]);
let (_, servers_reader) = tokio::sync::watch::channel(vec![]);
let (_, system_reader) = tokio::sync::watch::channel(SystemSnapshot {
stratum: 1,
reference_id: ReferenceId::NONE,
accumulated_steps_threshold: None,
time_snapshot: TimeSnapshot {
poll_interval: PollIntervalLimits::default().min,
precision: NtpDuration::from_seconds(1e-3),
root_delay: NtpDuration::ZERO,
root_dispersion: NtpDuration::ZERO,
leap_indicator: NtpLeapIndicator::Leap59,
accumulated_steps: NtpDuration::ZERO,
},
});
let handle = tokio::spawn(async move {
observer(config, peers_reader, servers_reader, system_reader)
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(10)).await;
let mut reader = UnixStream::connect(path).await.unwrap();
let mut buf = vec![];
while reader.read_buf(&mut buf).await.unwrap() != 0 {}
let result: ObservableState = serde_json::from_slice(&buf).unwrap();
let mut count = 0;
for peer in &result.peers {
if matches!(peer, ObservablePeerState::Observable { .. }) {
count += 1;
}
}
assert_eq!(count, 1);
handle.abort();
}
#[tokio::test]
async fn test_block_during_read() {
let path = std::env::temp_dir().join("ntp-test-stream-3");
let config = crate::config::ObserveConfig {
path: Some(path.clone()),
mode: 0o700,
};
let (mut peers_writer, peers_reader) = tokio::sync::watch::channel(vec![
ObservablePeerState::Nothing,
ObservablePeerState::Nothing,
ObservablePeerState::Observable {
timedata: Default::default(),
reachability: Reach::default(),
poll_interval: PollIntervalLimits::default().min,
peer_id: ReferenceId::from_ip("127.0.0.1".parse().unwrap()),
address: "127.0.0.3:123".into(),
},
]);
let (mut server_writer, servers_reader) = tokio::sync::watch::channel(vec![]);
let (mut system_writer, system_reader) = tokio::sync::watch::channel(SystemSnapshot {
stratum: 1,
reference_id: ReferenceId::NONE,
accumulated_steps_threshold: None,
time_snapshot: TimeSnapshot {
poll_interval: PollIntervalLimits::default().min,
precision: NtpDuration::from_seconds(1e-3),
root_delay: NtpDuration::ZERO,
root_dispersion: NtpDuration::ZERO,
leap_indicator: NtpLeapIndicator::Leap59,
accumulated_steps: NtpDuration::ZERO,
},
});
let handle = tokio::spawn(async move {
observer(config, peers_reader, servers_reader, system_reader)
.await
.unwrap();
});
tokio::time::sleep(Duration::from_millis(10)).await;
let mut reader = UnixStream::connect(path).await.unwrap();
let mut buf = [0_u8; 12];
let mut bufref: &mut [u8] = &mut buf;
reader.read_buf(&mut bufref).await.unwrap();
let _ = system_writer.borrow_mut();
let _ = peers_writer.borrow_mut();
let _ = server_writer.borrow_mut();
handle.abort();
}
}