ntpd 1.7.2

Full-featured implementation of NTP with NTS support
Documentation
use std::path::PathBuf;

use ntp_proto::{
    Measurement, NtpClock, NtpDuration, NtpInstant, NtpLeapIndicator, OneWaySource,
    OneWaySourceSnapshot, OneWaySourceUpdate, ReferenceId, SourceController, SystemSourceUpdate,
};
use pps_time::PpsDevice;
use tokio::sync::mpsc;
use tracing::{Instrument, Span, debug, error, instrument, warn};

use crate::daemon::{exitcode, ntp_source::MsgForSystem};

use super::{ntp_source::SourceChannels, spawn::SourceId};

struct PpsDeviceFetchTask {
    pps: PpsDevice,
    fetch_sender: mpsc::Sender<pps_time::pps::pps_fdata>,
}

impl PpsDeviceFetchTask {
    fn run(&self) {
        loop {
            match self.pps.fetch_blocking() {
                Err(e) => error!("PPS error: {}", e),
                Ok(data) => self.fetch_sender.blocking_send(data).unwrap(),
            }
        }
    }
}

pub(crate) struct PpsSourceTask<
    C: 'static + NtpClock + Send,
    Controller: SourceController<MeasurementDelay = ()>,
> {
    index: SourceId,
    clock: C,
    channels: SourceChannels<Controller::ControllerMessage, Controller::SourceMessage>,
    path: PathBuf,
    source: OneWaySource<Controller>,
    fetch_receiver: mpsc::Receiver<pps_time::pps::pps_fdata>,
}

impl<C, Controller: SourceController<MeasurementDelay = ()>> PpsSourceTask<C, Controller>
where
    C: 'static + NtpClock + Send + Sync,
{
    async fn run(&mut self) {
        loop {
            enum SelectResult<Controller: SourceController> {
                PpsRecv(Option<pps_time::pps::pps_fdata>),
                SystemUpdate(
                    Result<
                        SystemSourceUpdate<Controller::ControllerMessage>,
                        tokio::sync::broadcast::error::RecvError,
                    >,
                ),
            }

            let selected: SelectResult<Controller> = tokio::select! {
                result = self.fetch_receiver.recv() => {
                    SelectResult::PpsRecv(result)
                },
                result = self.channels.system_update_receiver.recv() => {
                    SelectResult::SystemUpdate(result)
                }
            };

            match selected {
                SelectResult::PpsRecv(result) => match result {
                    Some(data) => {
                        debug!("received {:?}", data);

                        let time = match self.clock.now() {
                            Ok(time) => time,
                            Err(e) => {
                                error!(error = ?e, "There was an error retrieving the current time");
                                std::process::exit(exitcode::NOPERM);
                            }
                        };

                        let offset = f64::from(-data.info.assert_tu.nsec) / 1_000_000_000.;
                        debug!("offset: {}", offset);

                        let measurement = Measurement {
                            delay: (),
                            offset: NtpDuration::from_seconds(offset),
                            localtime: time,
                            monotime: NtpInstant::now(),

                            stratum: 0,
                            root_delay: NtpDuration::ZERO,
                            root_dispersion: NtpDuration::ZERO,
                            leap: NtpLeapIndicator::NoWarning,
                            precision: 0,
                        };

                        let controller_message = self.source.handle_measurement(measurement);

                        let update = OneWaySourceUpdate {
                            snapshot: OneWaySourceSnapshot {
                                source_id: ReferenceId::PPS,
                                stratum: 0,
                            },
                            message: controller_message,
                        };

                        self.channels
                            .msg_for_system_sender
                            .send(MsgForSystem::OneWaySourceUpdate(self.index, update))
                            .await
                            .ok();

                        self.channels
                            .source_snapshots
                            .write()
                            .expect("Unexpected poisoned mutex")
                            .insert(
                                self.index,
                                self.source.observe(
                                    "PPS device".to_string(),
                                    self.path.display().to_string(),
                                    self.index,
                                ),
                            );
                    }
                    None => {
                        warn!("Did not receive any new PPS data");
                        continue;
                    }
                },
                SelectResult::SystemUpdate(result) => match result {
                    Ok(update) => {
                        self.source.handle_message(update.message);
                    }
                    Err(e) => {
                        error!("Error receiving system update: {:?}", e);
                    }
                },
            }
        }
    }

    #[instrument(level = tracing::Level::ERROR, name = "Pps Source", skip(clock, channels, source))]
    pub fn spawn(
        index: SourceId,
        device_path: PathBuf,
        clock: C,
        channels: SourceChannels<Controller::ControllerMessage, Controller::SourceMessage>,
        source: OneWaySource<Controller>,
    ) -> tokio::task::JoinHandle<()> {
        let pps = PpsDevice::new(device_path.clone()).expect("Could not open PPS device");
        let cap = pps.get_cap().expect("Could not get PPS capabilities");
        if cap & pps_time::pps::PPS_CANWAIT == 0 {
            panic!("PPS device does not support blocking calls")
        }

        let (fetch_sender, fetch_receiver) = mpsc::channel(1);

        tokio::task::spawn_blocking(|| {
            let process = PpsDeviceFetchTask { pps, fetch_sender };

            process.run();
        });

        tokio::spawn(
            (async move {
                let mut process = PpsSourceTask {
                    index,
                    clock,
                    channels,
                    path: device_path,
                    source,
                    fetch_receiver,
                };

                process.run().await;
            })
            .instrument(Span::current()),
        )
    }
}