1#![deny(unsafe_code)]
27#![warn(missing_docs)]
28#![warn(unreachable_pub)]
29
30pub use ntp_proto::{error, extension, protocol, unix_time};
32
33#[cfg(any(feature = "nts", feature = "nts-smol"))]
35pub(crate) use ntp_proto::nts_common;
36
37#[cfg(any(feature = "tokio", feature = "smol-runtime"))]
42pub mod filter;
43
44#[cfg(any(feature = "tokio", feature = "smol-runtime"))]
46pub mod selection;
47
48#[cfg(any(feature = "tokio", feature = "smol-runtime"))]
50pub mod client_common;
51
52#[cfg(feature = "tokio")]
54pub mod client;
55
56#[cfg(feature = "nts")]
61pub mod nts;
62
63#[cfg(feature = "clock")]
68pub mod clock;
69
70#[cfg(feature = "discipline")]
75pub mod discipline;
76
77#[cfg(feature = "discipline")]
82pub mod clock_adjust;
83
84#[cfg(feature = "symmetric")]
88pub mod symmetric;
89
90#[cfg(feature = "broadcast")]
95pub mod broadcast_client;
96
97#[cfg(any(feature = "refclock", feature = "gps", feature = "pps"))]
102pub mod refclock;
103
104pub mod sntp;
112
113#[cfg(feature = "tokio")]
117pub mod async_ntp;
118
119#[cfg(feature = "smol-runtime")]
123pub mod smol_ntp;
124
125#[cfg(feature = "smol-runtime")]
127pub mod smol_client;
128
129#[cfg(feature = "nts-smol")]
134pub mod smol_nts;
135
136use log::debug;
141use protocol::{ConstPackedSizeBytes, ReadBytes, WriteBytes};
142use std::io;
143use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
144use std::ops::Deref;
145use std::time::Duration;
146
147pub(crate) fn bind_addr_for(target: &SocketAddr) -> &'static str {
151 match target {
152 SocketAddr::V4(_) => "0.0.0.0:0",
153 SocketAddr::V6(_) => "[::]:0",
154 }
155}
156
157#[derive(Clone, Copy, Debug)]
186pub struct KissOfDeathError {
187 pub code: protocol::KissOfDeath,
189}
190
191impl std::fmt::Display for KissOfDeathError {
192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 match self.code {
194 protocol::KissOfDeath::Deny => {
195 write!(
196 f,
197 "server sent Kiss-o'-Death DENY: access denied, stop querying this server"
198 )
199 }
200 protocol::KissOfDeath::Rstr => {
201 write!(
202 f,
203 "server sent Kiss-o'-Death RSTR: access restricted, stop querying this server"
204 )
205 }
206 protocol::KissOfDeath::Rate => {
207 write!(f, "server sent Kiss-o'-Death RATE: reduce polling interval")
208 }
209 }
210 }
211}
212
213impl std::error::Error for KissOfDeathError {}
214
215#[derive(Clone, Copy, Debug)]
221pub struct NtpResult {
222 pub packet: protocol::Packet,
224 pub destination_timestamp: protocol::TimestampFormat,
228 pub offset_seconds: f64,
239 pub delay_seconds: f64,
243}
244
245impl Deref for NtpResult {
246 type Target = protocol::Packet;
247 fn deref(&self) -> &Self::Target {
248 &self.packet
249 }
250}
251
252fn instant_to_f64(instant: &unix_time::Instant) -> f64 {
254 instant.secs() as f64 + (instant.subsec_nanos() as f64 / 1e9)
255}
256
257pub(crate) fn compute_offset_delay(
260 t1: &unix_time::Instant,
261 t2: &unix_time::Instant,
262 t3: &unix_time::Instant,
263 t4: &unix_time::Instant,
264) -> (f64, f64) {
265 let t1 = instant_to_f64(t1);
266 let t2 = instant_to_f64(t2);
267 let t3 = instant_to_f64(t3);
268 let t4 = instant_to_f64(t4);
269 let offset = ((t2 - t1) + (t3 - t4)) / 2.0;
270 let delay = (t4 - t1) - (t3 - t2);
271 (offset, delay)
272}
273
274pub(crate) fn build_request_packet() -> io::Result<(
278 [u8; protocol::Packet::PACKED_SIZE_BYTES],
279 protocol::TimestampFormat,
280)> {
281 let packet = protocol::Packet {
282 leap_indicator: protocol::LeapIndicator::default(),
283 version: protocol::Version::V4,
284 mode: protocol::Mode::Client,
285 stratum: protocol::Stratum::UNSPECIFIED,
286 poll: 0,
287 precision: 0,
288 root_delay: protocol::ShortFormat::default(),
289 root_dispersion: protocol::ShortFormat::default(),
290 reference_id: protocol::ReferenceIdentifier::PrimarySource(protocol::PrimarySource::Null),
291 reference_timestamp: protocol::TimestampFormat::default(),
292 origin_timestamp: protocol::TimestampFormat::default(),
293 receive_timestamp: protocol::TimestampFormat::default(),
294 transmit_timestamp: unix_time::Instant::now().into(),
295 };
296 let t1 = packet.transmit_timestamp;
297 let mut send_buf = [0u8; protocol::Packet::PACKED_SIZE_BYTES];
298 (&mut send_buf[..]).write_bytes(packet)?;
299 Ok((send_buf, t1))
300}
301
302pub(crate) fn parse_and_validate_response(
313 recv_buf: &[u8],
314 recv_len: usize,
315 src_addr: SocketAddr,
316 resolved_addrs: &[SocketAddr],
317) -> io::Result<(protocol::Packet, protocol::TimestampFormat)> {
318 let t4_instant = unix_time::Instant::now();
320 let t4: protocol::TimestampFormat = t4_instant.into();
321
322 if !resolved_addrs.iter().any(|a| a.ip() == src_addr.ip()) {
324 return Err(io::Error::new(
325 io::ErrorKind::InvalidData,
326 "response from unexpected source address",
327 ));
328 }
329
330 if recv_len < protocol::Packet::PACKED_SIZE_BYTES {
332 return Err(io::Error::new(
333 io::ErrorKind::InvalidData,
334 "NTP response too short",
335 ));
336 }
337
338 let response: protocol::Packet =
340 (&recv_buf[..protocol::Packet::PACKED_SIZE_BYTES]).read_bytes()?;
341
342 #[cfg(not(feature = "symmetric"))]
344 let valid_mode = response.mode == protocol::Mode::Server;
345 #[cfg(feature = "symmetric")]
346 let valid_mode = response.mode == protocol::Mode::Server
347 || response.mode == protocol::Mode::SymmetricPassive;
348
349 if !valid_mode {
350 return Err(io::Error::new(
351 io::ErrorKind::InvalidData,
352 "unexpected response mode (expected Server)",
353 ));
354 }
355
356 if let protocol::ReferenceIdentifier::KissOfDeath(kod) = response.reference_id {
358 return Err(io::Error::new(
359 io::ErrorKind::ConnectionRefused,
360 KissOfDeathError { code: kod },
361 ));
362 }
363
364 if response.transmit_timestamp.seconds == 0 && response.transmit_timestamp.fraction == 0 {
366 return Err(io::Error::new(
367 io::ErrorKind::InvalidData,
368 "server transmit timestamp is zero",
369 ));
370 }
371
372 if response.leap_indicator == protocol::LeapIndicator::Unknown
374 && response.stratum != protocol::Stratum::UNSPECIFIED
375 {
376 return Err(io::Error::new(
377 io::ErrorKind::InvalidData,
378 "server reports unsynchronized clock",
379 ));
380 }
381
382 Ok((response, t4))
383}
384
385pub(crate) fn validate_response(
391 recv_buf: &[u8],
392 recv_len: usize,
393 src_addr: SocketAddr,
394 resolved_addrs: &[SocketAddr],
395 t1: &protocol::TimestampFormat,
396) -> io::Result<NtpResult> {
397 let (response, t4) = parse_and_validate_response(recv_buf, recv_len, src_addr, resolved_addrs)?;
398
399 if response.origin_timestamp != *t1 {
401 return Err(io::Error::new(
402 io::ErrorKind::InvalidData,
403 "origin timestamp mismatch: response does not match our request",
404 ));
405 }
406
407 let t4_instant = unix_time::Instant::from(t4);
409 let t1_instant = unix_time::timestamp_to_instant(*t1, &t4_instant);
410 let t2_instant = unix_time::timestamp_to_instant(response.receive_timestamp, &t4_instant);
411 let t3_instant = unix_time::timestamp_to_instant(response.transmit_timestamp, &t4_instant);
412
413 let (offset_seconds, delay_seconds) =
414 compute_offset_delay(&t1_instant, &t2_instant, &t3_instant, &t4_instant);
415
416 Ok(NtpResult {
417 packet: response,
418 destination_timestamp: t4,
419 offset_seconds,
420 delay_seconds,
421 })
422}
423
424pub fn request<A: ToSocketAddrs>(addr: A) -> io::Result<NtpResult> {
466 request_with_timeout(addr, Duration::from_secs(5))
467}
468
469pub fn request_with_timeout<A: ToSocketAddrs>(addr: A, timeout: Duration) -> io::Result<NtpResult> {
512 let resolved_addrs: Vec<SocketAddr> = addr.to_socket_addrs()?.collect();
514 if resolved_addrs.is_empty() {
515 return Err(io::Error::new(
516 io::ErrorKind::InvalidInput,
517 "address resolved to no socket addresses",
518 ));
519 }
520 let target_addr = resolved_addrs[0];
521
522 let (send_buf, t1) = build_request_packet()?;
524
525 let sock = UdpSocket::bind(bind_addr_for(&target_addr))?;
527 sock.set_read_timeout(Some(timeout))?;
528 sock.set_write_timeout(Some(timeout))?;
529
530 let sz = sock.send_to(&send_buf, target_addr)?;
532 debug!("{:?}", sock.local_addr());
533 debug!("sent: {}", sz);
534
535 let mut recv_buf = [0u8; 1024];
537 let (recv_len, src_addr) = sock.recv_from(&mut recv_buf[..])?;
538 debug!("recv: {} bytes from {:?}", recv_len, src_addr);
539
540 validate_response(&recv_buf, recv_len, src_addr, &resolved_addrs, &t1)
542}
543
544#[cfg(test)]
545#[test]
546fn test_request_nist() {
547 match request_with_timeout("time.nist.gov:123", Duration::from_secs(10)) {
548 Ok(_) => {}
549 Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut => {
550 eprintln!("skipping test_request_nist: NTP port unreachable ({e})");
551 }
552 Err(e) => panic!("unexpected error from time.nist.gov: {e}"),
553 }
554}
555
556#[cfg(test)]
557#[test]
558fn test_request_nist_alt() {
559 match request_with_timeout("time-a-g.nist.gov:123", Duration::from_secs(10)) {
560 Ok(_) => {}
561 Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut => {
562 eprintln!("skipping test_request_nist_alt: NTP port unreachable ({e})");
563 }
564 Err(e) => panic!("unexpected error from time-a-g.nist.gov: {e}"),
565 }
566}
567
568#[cfg(test)]
569mod tests {
570 use super::*;
571
572 #[test]
575 fn test_offset_delay_symmetric() {
576 let t1 = unix_time::Instant::new(0, 0);
580 let t2 = unix_time::Instant::new(0, 500_000_000);
581 let t3 = unix_time::Instant::new(0, 500_000_000);
582 let t4 = unix_time::Instant::new(1, 0);
583 let (offset, delay) = compute_offset_delay(&t1, &t2, &t3, &t4);
584 assert!(offset.abs() < 1e-9, "expected ~0 offset, got {offset}");
585 assert!(
586 (delay - 1.0).abs() < 1e-9,
587 "expected 1.0 delay, got {delay}"
588 );
589 }
590
591 #[test]
592 fn test_offset_delay_local_behind() {
593 let t1 = unix_time::Instant::new(0, 0);
597 let t2 = unix_time::Instant::new(1, 500_000_000);
598 let t3 = unix_time::Instant::new(1, 500_000_000);
599 let t4 = unix_time::Instant::new(1, 0);
600 let (offset, delay) = compute_offset_delay(&t1, &t2, &t3, &t4);
601 assert!(
602 (offset - 1.0).abs() < 1e-9,
603 "expected 1.0 offset, got {offset}"
604 );
605 assert!(
606 (delay - 1.0).abs() < 1e-9,
607 "expected 1.0 delay, got {delay}"
608 );
609 }
610
611 #[test]
612 fn test_offset_delay_local_ahead() {
613 let t1 = unix_time::Instant::new(10, 0);
617 let t2 = unix_time::Instant::new(9, 250_000_000);
618 let t3 = unix_time::Instant::new(9, 750_000_000);
619 let t4 = unix_time::Instant::new(11, 0);
620 let (offset, delay) = compute_offset_delay(&t1, &t2, &t3, &t4);
621 assert!(
622 (offset - (-1.0)).abs() < 1e-9,
623 "expected -1.0 offset, got {offset}"
624 );
625 assert!(
626 (delay - 0.5).abs() < 1e-9,
627 "expected 0.5 delay, got {delay}"
628 );
629 }
630
631 #[test]
632 fn test_offset_delay_zero_processing_time() {
633 let t1 = unix_time::Instant::new(0, 0);
637 let t2 = unix_time::Instant::new(0, 50_000_000);
638 let t3 = unix_time::Instant::new(0, 50_000_000);
639 let t4 = unix_time::Instant::new(0, 100_000_000);
640 let (offset, delay) = compute_offset_delay(&t1, &t2, &t3, &t4);
641 assert!(offset.abs() < 1e-9, "expected ~0 offset, got {offset}");
642 assert!(
643 (delay - 0.1).abs() < 1e-9,
644 "expected 0.1 delay, got {delay}"
645 );
646 }
647
648 #[test]
651 fn test_build_request_packet_structure() {
652 let (buf, t1) = build_request_packet().unwrap();
653
654 let pkt: protocol::Packet = (&buf[..protocol::Packet::PACKED_SIZE_BYTES])
656 .read_bytes()
657 .unwrap();
658 assert_eq!(pkt.version, protocol::Version::V4);
659 assert_eq!(pkt.mode, protocol::Mode::Client);
660 assert_eq!(pkt.stratum, protocol::Stratum::UNSPECIFIED);
661 assert_eq!(pkt.transmit_timestamp, t1);
662 assert!(t1.seconds != 0 || t1.fraction != 0);
664 }
665
666 #[test]
667 fn test_build_request_packet_size() {
668 let (buf, _) = build_request_packet().unwrap();
669 assert_eq!(buf.len(), protocol::Packet::PACKED_SIZE_BYTES);
670 assert_eq!(buf.len(), 48);
671 }
672
673 fn make_server_response(
677 mode: protocol::Mode,
678 li: protocol::LeapIndicator,
679 stratum: protocol::Stratum,
680 ref_id: protocol::ReferenceIdentifier,
681 transmit_secs: u32,
682 ) -> [u8; 48] {
683 let pkt = protocol::Packet {
684 leap_indicator: li,
685 version: protocol::Version::V4,
686 mode,
687 stratum,
688 poll: 6,
689 precision: -20,
690 root_delay: protocol::ShortFormat::default(),
691 root_dispersion: protocol::ShortFormat::default(),
692 reference_id: ref_id,
693 reference_timestamp: protocol::TimestampFormat::default(),
694 origin_timestamp: protocol::TimestampFormat {
695 seconds: 100,
696 fraction: 0,
697 },
698 receive_timestamp: protocol::TimestampFormat {
699 seconds: 3_913_056_000,
700 fraction: 0,
701 },
702 transmit_timestamp: protocol::TimestampFormat {
703 seconds: transmit_secs,
704 fraction: 1,
705 },
706 };
707 let mut buf = [0u8; 48];
708 (&mut buf[..]).write_bytes(pkt).unwrap();
709 buf
710 }
711
712 fn valid_server_buf() -> [u8; 48] {
713 make_server_response(
714 protocol::Mode::Server,
715 protocol::LeapIndicator::NoWarning,
716 protocol::Stratum(2),
717 protocol::ReferenceIdentifier::SecondaryOrClient([127, 0, 0, 1]),
718 3_913_056_001,
719 )
720 }
721
722 fn src_addr() -> SocketAddr {
723 "127.0.0.1:123".parse().unwrap()
724 }
725
726 #[test]
727 fn test_validate_accepts_valid_response() {
728 let buf = valid_server_buf();
729 let addrs = vec![src_addr()];
730 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
731 assert!(result.is_ok());
732 let (pkt, _t4) = result.unwrap();
733 assert_eq!(pkt.mode, protocol::Mode::Server);
734 }
735
736 #[test]
737 fn test_validate_rejects_wrong_source_ip() {
738 let buf = valid_server_buf();
739 let addrs = vec!["10.0.0.1:123".parse().unwrap()];
740 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
741 assert!(result.is_err());
742 assert!(
743 result
744 .unwrap_err()
745 .to_string()
746 .contains("unexpected source")
747 );
748 }
749
750 #[test]
751 fn test_validate_rejects_short_packet() {
752 let buf = valid_server_buf();
753 let addrs = vec![src_addr()];
754 let result = parse_and_validate_response(&buf, 47, src_addr(), &addrs);
755 assert!(result.is_err());
756 assert!(result.unwrap_err().to_string().contains("too short"));
757 }
758
759 #[test]
760 fn test_validate_rejects_client_mode() {
761 let buf = make_server_response(
762 protocol::Mode::Client,
763 protocol::LeapIndicator::NoWarning,
764 protocol::Stratum(2),
765 protocol::ReferenceIdentifier::SecondaryOrClient([127, 0, 0, 1]),
766 3_913_056_001,
767 );
768 let addrs = vec![src_addr()];
769 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
770 assert!(result.is_err());
771 assert!(
772 result
773 .unwrap_err()
774 .to_string()
775 .contains("unexpected response mode")
776 );
777 }
778
779 #[test]
780 fn test_validate_rejects_kiss_of_death() {
781 let buf = make_server_response(
782 protocol::Mode::Server,
783 protocol::LeapIndicator::NoWarning,
784 protocol::Stratum::UNSPECIFIED,
785 protocol::ReferenceIdentifier::KissOfDeath(protocol::KissOfDeath::Deny),
786 3_913_056_001,
787 );
788 let addrs = vec![src_addr()];
789 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
790 let err = result.unwrap_err();
791 assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused);
792 let kod = err
793 .get_ref()
794 .unwrap()
795 .downcast_ref::<KissOfDeathError>()
796 .unwrap();
797 assert!(matches!(kod.code, protocol::KissOfDeath::Deny));
798 }
799
800 #[test]
801 fn test_validate_rejects_zero_transmit() {
802 let pkt = protocol::Packet {
804 leap_indicator: protocol::LeapIndicator::NoWarning,
805 version: protocol::Version::V4,
806 mode: protocol::Mode::Server,
807 stratum: protocol::Stratum(2),
808 poll: 6,
809 precision: -20,
810 root_delay: protocol::ShortFormat::default(),
811 root_dispersion: protocol::ShortFormat::default(),
812 reference_id: protocol::ReferenceIdentifier::SecondaryOrClient([127, 0, 0, 1]),
813 reference_timestamp: protocol::TimestampFormat::default(),
814 origin_timestamp: protocol::TimestampFormat::default(),
815 receive_timestamp: protocol::TimestampFormat::default(),
816 transmit_timestamp: protocol::TimestampFormat {
817 seconds: 0,
818 fraction: 0,
819 },
820 };
821 let mut raw = [0u8; 48];
822 (&mut raw[..]).write_bytes(pkt).unwrap();
823 let addrs = vec![src_addr()];
824 let result = parse_and_validate_response(&raw, 48, src_addr(), &addrs);
825 assert!(result.is_err());
826 assert!(
827 result
828 .unwrap_err()
829 .to_string()
830 .contains("transmit timestamp is zero")
831 );
832 }
833
834 #[test]
835 fn test_validate_rejects_unsynchronized() {
836 let buf = make_server_response(
837 protocol::Mode::Server,
838 protocol::LeapIndicator::Unknown,
839 protocol::Stratum(2), protocol::ReferenceIdentifier::SecondaryOrClient([127, 0, 0, 1]),
841 3_913_056_001,
842 );
843 let addrs = vec![src_addr()];
844 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
845 assert!(result.is_err());
846 assert!(result.unwrap_err().to_string().contains("unsynchronized"));
847 }
848
849 #[test]
850 fn test_validate_allows_li_unknown_stratum_zero() {
851 let buf = make_server_response(
854 protocol::Mode::Server,
855 protocol::LeapIndicator::Unknown,
856 protocol::Stratum::UNSPECIFIED,
857 protocol::ReferenceIdentifier::PrimarySource(protocol::PrimarySource::Gps),
858 3_913_056_001,
859 );
860 let addrs = vec![src_addr()];
861 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
862 assert!(result.is_ok());
863 }
864
865 #[test]
866 fn test_validate_accepts_different_port() {
867 let buf = valid_server_buf();
869 let addrs = vec!["127.0.0.1:456".parse().unwrap()];
870 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
871 assert!(result.is_ok());
872 }
873
874 #[test]
877 fn test_kod_display_deny() {
878 let kod = KissOfDeathError {
879 code: protocol::KissOfDeath::Deny,
880 };
881 assert!(kod.to_string().contains("DENY"));
882 }
883
884 #[test]
885 fn test_kod_display_rstr() {
886 let kod = KissOfDeathError {
887 code: protocol::KissOfDeath::Rstr,
888 };
889 assert!(kod.to_string().contains("RSTR"));
890 }
891
892 #[test]
893 fn test_kod_display_rate() {
894 let kod = KissOfDeathError {
895 code: protocol::KissOfDeath::Rate,
896 };
897 assert!(kod.to_string().contains("RATE"));
898 }
899}