1#![deny(unsafe_code)]
27#![warn(missing_docs)]
28#![warn(unreachable_pub)]
29
30pub use ntp_proto::{error, extension, protocol, unix_time};
32
33#[cfg(any(feature = "nts", feature = "nts-smol"))]
35pub(crate) use ntp_proto::nts_common;
36
37#[cfg(any(feature = "tokio", feature = "smol-runtime"))]
42pub mod filter;
43
44#[cfg(any(feature = "tokio", feature = "smol-runtime"))]
46pub mod selection;
47
48#[cfg(any(feature = "tokio", feature = "smol-runtime"))]
50pub mod client_common;
51
52#[cfg(feature = "tokio")]
54pub mod client;
55
56#[cfg(feature = "nts")]
61pub mod nts;
62
63#[cfg(feature = "clock")]
68pub mod clock;
69
70#[cfg(feature = "discipline")]
75pub mod discipline;
76
77#[cfg(feature = "discipline")]
82pub mod clock_adjust;
83
84#[cfg(feature = "symmetric")]
88pub mod symmetric;
89
90#[cfg(feature = "broadcast")]
95pub mod broadcast_client;
96
97pub mod sntp;
105
106#[cfg(feature = "tokio")]
110pub mod async_ntp;
111
112#[cfg(feature = "smol-runtime")]
116pub mod smol_ntp;
117
118#[cfg(feature = "smol-runtime")]
120pub mod smol_client;
121
122#[cfg(feature = "nts-smol")]
127pub mod smol_nts;
128
129use log::debug;
134use protocol::{ConstPackedSizeBytes, ReadBytes, WriteBytes};
135use std::io;
136use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
137use std::ops::Deref;
138use std::time::Duration;
139
140pub(crate) fn bind_addr_for(target: &SocketAddr) -> &'static str {
144 match target {
145 SocketAddr::V4(_) => "0.0.0.0:0",
146 SocketAddr::V6(_) => "[::]:0",
147 }
148}
149
150#[derive(Clone, Copy, Debug)]
179pub struct KissOfDeathError {
180 pub code: protocol::KissOfDeath,
182}
183
184impl std::fmt::Display for KissOfDeathError {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 match self.code {
187 protocol::KissOfDeath::Deny => {
188 write!(
189 f,
190 "server sent Kiss-o'-Death DENY: access denied, stop querying this server"
191 )
192 }
193 protocol::KissOfDeath::Rstr => {
194 write!(
195 f,
196 "server sent Kiss-o'-Death RSTR: access restricted, stop querying this server"
197 )
198 }
199 protocol::KissOfDeath::Rate => {
200 write!(f, "server sent Kiss-o'-Death RATE: reduce polling interval")
201 }
202 }
203 }
204}
205
206impl std::error::Error for KissOfDeathError {}
207
208#[derive(Clone, Copy, Debug)]
214pub struct NtpResult {
215 pub packet: protocol::Packet,
217 pub destination_timestamp: protocol::TimestampFormat,
221 pub offset_seconds: f64,
232 pub delay_seconds: f64,
236}
237
238impl Deref for NtpResult {
239 type Target = protocol::Packet;
240 fn deref(&self) -> &Self::Target {
241 &self.packet
242 }
243}
244
245fn instant_to_f64(instant: &unix_time::Instant) -> f64 {
247 instant.secs() as f64 + (instant.subsec_nanos() as f64 / 1e9)
248}
249
250pub(crate) fn compute_offset_delay(
253 t1: &unix_time::Instant,
254 t2: &unix_time::Instant,
255 t3: &unix_time::Instant,
256 t4: &unix_time::Instant,
257) -> (f64, f64) {
258 let t1 = instant_to_f64(t1);
259 let t2 = instant_to_f64(t2);
260 let t3 = instant_to_f64(t3);
261 let t4 = instant_to_f64(t4);
262 let offset = ((t2 - t1) + (t3 - t4)) / 2.0;
263 let delay = (t4 - t1) - (t3 - t2);
264 (offset, delay)
265}
266
267pub(crate) fn build_request_packet() -> io::Result<(
271 [u8; protocol::Packet::PACKED_SIZE_BYTES],
272 protocol::TimestampFormat,
273)> {
274 let packet = protocol::Packet {
275 leap_indicator: protocol::LeapIndicator::default(),
276 version: protocol::Version::V4,
277 mode: protocol::Mode::Client,
278 stratum: protocol::Stratum::UNSPECIFIED,
279 poll: 0,
280 precision: 0,
281 root_delay: protocol::ShortFormat::default(),
282 root_dispersion: protocol::ShortFormat::default(),
283 reference_id: protocol::ReferenceIdentifier::PrimarySource(protocol::PrimarySource::Null),
284 reference_timestamp: protocol::TimestampFormat::default(),
285 origin_timestamp: protocol::TimestampFormat::default(),
286 receive_timestamp: protocol::TimestampFormat::default(),
287 transmit_timestamp: unix_time::Instant::now().into(),
288 };
289 let t1 = packet.transmit_timestamp;
290 let mut send_buf = [0u8; protocol::Packet::PACKED_SIZE_BYTES];
291 (&mut send_buf[..]).write_bytes(packet)?;
292 Ok((send_buf, t1))
293}
294
295pub(crate) fn parse_and_validate_response(
306 recv_buf: &[u8],
307 recv_len: usize,
308 src_addr: SocketAddr,
309 resolved_addrs: &[SocketAddr],
310) -> io::Result<(protocol::Packet, protocol::TimestampFormat)> {
311 let t4_instant = unix_time::Instant::now();
313 let t4: protocol::TimestampFormat = t4_instant.into();
314
315 if !resolved_addrs.iter().any(|a| a.ip() == src_addr.ip()) {
317 return Err(io::Error::new(
318 io::ErrorKind::InvalidData,
319 "response from unexpected source address",
320 ));
321 }
322
323 if recv_len < protocol::Packet::PACKED_SIZE_BYTES {
325 return Err(io::Error::new(
326 io::ErrorKind::InvalidData,
327 "NTP response too short",
328 ));
329 }
330
331 let response: protocol::Packet =
333 (&recv_buf[..protocol::Packet::PACKED_SIZE_BYTES]).read_bytes()?;
334
335 #[cfg(not(feature = "symmetric"))]
337 let valid_mode = response.mode == protocol::Mode::Server;
338 #[cfg(feature = "symmetric")]
339 let valid_mode = response.mode == protocol::Mode::Server
340 || response.mode == protocol::Mode::SymmetricPassive;
341
342 if !valid_mode {
343 return Err(io::Error::new(
344 io::ErrorKind::InvalidData,
345 "unexpected response mode (expected Server)",
346 ));
347 }
348
349 if let protocol::ReferenceIdentifier::KissOfDeath(kod) = response.reference_id {
351 return Err(io::Error::new(
352 io::ErrorKind::ConnectionRefused,
353 KissOfDeathError { code: kod },
354 ));
355 }
356
357 if response.transmit_timestamp.seconds == 0 && response.transmit_timestamp.fraction == 0 {
359 return Err(io::Error::new(
360 io::ErrorKind::InvalidData,
361 "server transmit timestamp is zero",
362 ));
363 }
364
365 if response.leap_indicator == protocol::LeapIndicator::Unknown
367 && response.stratum != protocol::Stratum::UNSPECIFIED
368 {
369 return Err(io::Error::new(
370 io::ErrorKind::InvalidData,
371 "server reports unsynchronized clock",
372 ));
373 }
374
375 Ok((response, t4))
376}
377
378pub(crate) fn validate_response(
384 recv_buf: &[u8],
385 recv_len: usize,
386 src_addr: SocketAddr,
387 resolved_addrs: &[SocketAddr],
388 t1: &protocol::TimestampFormat,
389) -> io::Result<NtpResult> {
390 let (response, t4) = parse_and_validate_response(recv_buf, recv_len, src_addr, resolved_addrs)?;
391
392 if response.origin_timestamp != *t1 {
394 return Err(io::Error::new(
395 io::ErrorKind::InvalidData,
396 "origin timestamp mismatch: response does not match our request",
397 ));
398 }
399
400 let t4_instant = unix_time::Instant::from(t4);
402 let t1_instant = unix_time::timestamp_to_instant(*t1, &t4_instant);
403 let t2_instant = unix_time::timestamp_to_instant(response.receive_timestamp, &t4_instant);
404 let t3_instant = unix_time::timestamp_to_instant(response.transmit_timestamp, &t4_instant);
405
406 let (offset_seconds, delay_seconds) =
407 compute_offset_delay(&t1_instant, &t2_instant, &t3_instant, &t4_instant);
408
409 Ok(NtpResult {
410 packet: response,
411 destination_timestamp: t4,
412 offset_seconds,
413 delay_seconds,
414 })
415}
416
417pub fn request<A: ToSocketAddrs>(addr: A) -> io::Result<NtpResult> {
459 request_with_timeout(addr, Duration::from_secs(5))
460}
461
462pub fn request_with_timeout<A: ToSocketAddrs>(addr: A, timeout: Duration) -> io::Result<NtpResult> {
505 let resolved_addrs: Vec<SocketAddr> = addr.to_socket_addrs()?.collect();
507 if resolved_addrs.is_empty() {
508 return Err(io::Error::new(
509 io::ErrorKind::InvalidInput,
510 "address resolved to no socket addresses",
511 ));
512 }
513 let target_addr = resolved_addrs[0];
514
515 let (send_buf, t1) = build_request_packet()?;
517
518 let sock = UdpSocket::bind(bind_addr_for(&target_addr))?;
520 sock.set_read_timeout(Some(timeout))?;
521 sock.set_write_timeout(Some(timeout))?;
522
523 let sz = sock.send_to(&send_buf, target_addr)?;
525 debug!("{:?}", sock.local_addr());
526 debug!("sent: {}", sz);
527
528 let mut recv_buf = [0u8; 1024];
530 let (recv_len, src_addr) = sock.recv_from(&mut recv_buf[..])?;
531 debug!("recv: {} bytes from {:?}", recv_len, src_addr);
532
533 validate_response(&recv_buf, recv_len, src_addr, &resolved_addrs, &t1)
535}
536
537#[cfg(test)]
538#[test]
539fn test_request_nist() {
540 match request_with_timeout("time.nist.gov:123", Duration::from_secs(10)) {
541 Ok(_) => {}
542 Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut => {
543 eprintln!("skipping test_request_nist: NTP port unreachable ({e})");
544 }
545 Err(e) => panic!("unexpected error from time.nist.gov: {e}"),
546 }
547}
548
549#[cfg(test)]
550#[test]
551fn test_request_nist_alt() {
552 match request_with_timeout("time-a-g.nist.gov:123", Duration::from_secs(10)) {
553 Ok(_) => {}
554 Err(e) if e.kind() == io::ErrorKind::WouldBlock || e.kind() == io::ErrorKind::TimedOut => {
555 eprintln!("skipping test_request_nist_alt: NTP port unreachable ({e})");
556 }
557 Err(e) => panic!("unexpected error from time-a-g.nist.gov: {e}"),
558 }
559}
560
561#[cfg(test)]
562mod tests {
563 use super::*;
564
565 #[test]
568 fn test_offset_delay_symmetric() {
569 let t1 = unix_time::Instant::new(0, 0);
573 let t2 = unix_time::Instant::new(0, 500_000_000);
574 let t3 = unix_time::Instant::new(0, 500_000_000);
575 let t4 = unix_time::Instant::new(1, 0);
576 let (offset, delay) = compute_offset_delay(&t1, &t2, &t3, &t4);
577 assert!(offset.abs() < 1e-9, "expected ~0 offset, got {offset}");
578 assert!(
579 (delay - 1.0).abs() < 1e-9,
580 "expected 1.0 delay, got {delay}"
581 );
582 }
583
584 #[test]
585 fn test_offset_delay_local_behind() {
586 let t1 = unix_time::Instant::new(0, 0);
590 let t2 = unix_time::Instant::new(1, 500_000_000);
591 let t3 = unix_time::Instant::new(1, 500_000_000);
592 let t4 = unix_time::Instant::new(1, 0);
593 let (offset, delay) = compute_offset_delay(&t1, &t2, &t3, &t4);
594 assert!(
595 (offset - 1.0).abs() < 1e-9,
596 "expected 1.0 offset, got {offset}"
597 );
598 assert!(
599 (delay - 1.0).abs() < 1e-9,
600 "expected 1.0 delay, got {delay}"
601 );
602 }
603
604 #[test]
605 fn test_offset_delay_local_ahead() {
606 let t1 = unix_time::Instant::new(10, 0);
610 let t2 = unix_time::Instant::new(9, 250_000_000);
611 let t3 = unix_time::Instant::new(9, 750_000_000);
612 let t4 = unix_time::Instant::new(11, 0);
613 let (offset, delay) = compute_offset_delay(&t1, &t2, &t3, &t4);
614 assert!(
615 (offset - (-1.0)).abs() < 1e-9,
616 "expected -1.0 offset, got {offset}"
617 );
618 assert!(
619 (delay - 0.5).abs() < 1e-9,
620 "expected 0.5 delay, got {delay}"
621 );
622 }
623
624 #[test]
625 fn test_offset_delay_zero_processing_time() {
626 let t1 = unix_time::Instant::new(0, 0);
630 let t2 = unix_time::Instant::new(0, 50_000_000);
631 let t3 = unix_time::Instant::new(0, 50_000_000);
632 let t4 = unix_time::Instant::new(0, 100_000_000);
633 let (offset, delay) = compute_offset_delay(&t1, &t2, &t3, &t4);
634 assert!(offset.abs() < 1e-9, "expected ~0 offset, got {offset}");
635 assert!(
636 (delay - 0.1).abs() < 1e-9,
637 "expected 0.1 delay, got {delay}"
638 );
639 }
640
641 #[test]
644 fn test_build_request_packet_structure() {
645 let (buf, t1) = build_request_packet().unwrap();
646
647 let pkt: protocol::Packet = (&buf[..protocol::Packet::PACKED_SIZE_BYTES])
649 .read_bytes()
650 .unwrap();
651 assert_eq!(pkt.version, protocol::Version::V4);
652 assert_eq!(pkt.mode, protocol::Mode::Client);
653 assert_eq!(pkt.stratum, protocol::Stratum::UNSPECIFIED);
654 assert_eq!(pkt.transmit_timestamp, t1);
655 assert!(t1.seconds != 0 || t1.fraction != 0);
657 }
658
659 #[test]
660 fn test_build_request_packet_size() {
661 let (buf, _) = build_request_packet().unwrap();
662 assert_eq!(buf.len(), protocol::Packet::PACKED_SIZE_BYTES);
663 assert_eq!(buf.len(), 48);
664 }
665
666 fn make_server_response(
670 mode: protocol::Mode,
671 li: protocol::LeapIndicator,
672 stratum: protocol::Stratum,
673 ref_id: protocol::ReferenceIdentifier,
674 transmit_secs: u32,
675 ) -> [u8; 48] {
676 let pkt = protocol::Packet {
677 leap_indicator: li,
678 version: protocol::Version::V4,
679 mode,
680 stratum,
681 poll: 6,
682 precision: -20,
683 root_delay: protocol::ShortFormat::default(),
684 root_dispersion: protocol::ShortFormat::default(),
685 reference_id: ref_id,
686 reference_timestamp: protocol::TimestampFormat::default(),
687 origin_timestamp: protocol::TimestampFormat {
688 seconds: 100,
689 fraction: 0,
690 },
691 receive_timestamp: protocol::TimestampFormat {
692 seconds: 3_913_056_000,
693 fraction: 0,
694 },
695 transmit_timestamp: protocol::TimestampFormat {
696 seconds: transmit_secs,
697 fraction: 1,
698 },
699 };
700 let mut buf = [0u8; 48];
701 (&mut buf[..]).write_bytes(pkt).unwrap();
702 buf
703 }
704
705 fn valid_server_buf() -> [u8; 48] {
706 make_server_response(
707 protocol::Mode::Server,
708 protocol::LeapIndicator::NoWarning,
709 protocol::Stratum(2),
710 protocol::ReferenceIdentifier::SecondaryOrClient([127, 0, 0, 1]),
711 3_913_056_001,
712 )
713 }
714
715 fn src_addr() -> SocketAddr {
716 "127.0.0.1:123".parse().unwrap()
717 }
718
719 #[test]
720 fn test_validate_accepts_valid_response() {
721 let buf = valid_server_buf();
722 let addrs = vec![src_addr()];
723 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
724 assert!(result.is_ok());
725 let (pkt, _t4) = result.unwrap();
726 assert_eq!(pkt.mode, protocol::Mode::Server);
727 }
728
729 #[test]
730 fn test_validate_rejects_wrong_source_ip() {
731 let buf = valid_server_buf();
732 let addrs = vec!["10.0.0.1:123".parse().unwrap()];
733 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
734 assert!(result.is_err());
735 assert!(
736 result
737 .unwrap_err()
738 .to_string()
739 .contains("unexpected source")
740 );
741 }
742
743 #[test]
744 fn test_validate_rejects_short_packet() {
745 let buf = valid_server_buf();
746 let addrs = vec![src_addr()];
747 let result = parse_and_validate_response(&buf, 47, src_addr(), &addrs);
748 assert!(result.is_err());
749 assert!(result.unwrap_err().to_string().contains("too short"));
750 }
751
752 #[test]
753 fn test_validate_rejects_client_mode() {
754 let buf = make_server_response(
755 protocol::Mode::Client,
756 protocol::LeapIndicator::NoWarning,
757 protocol::Stratum(2),
758 protocol::ReferenceIdentifier::SecondaryOrClient([127, 0, 0, 1]),
759 3_913_056_001,
760 );
761 let addrs = vec![src_addr()];
762 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
763 assert!(result.is_err());
764 assert!(
765 result
766 .unwrap_err()
767 .to_string()
768 .contains("unexpected response mode")
769 );
770 }
771
772 #[test]
773 fn test_validate_rejects_kiss_of_death() {
774 let buf = make_server_response(
775 protocol::Mode::Server,
776 protocol::LeapIndicator::NoWarning,
777 protocol::Stratum::UNSPECIFIED,
778 protocol::ReferenceIdentifier::KissOfDeath(protocol::KissOfDeath::Deny),
779 3_913_056_001,
780 );
781 let addrs = vec![src_addr()];
782 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
783 let err = result.unwrap_err();
784 assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused);
785 let kod = err
786 .get_ref()
787 .unwrap()
788 .downcast_ref::<KissOfDeathError>()
789 .unwrap();
790 assert!(matches!(kod.code, protocol::KissOfDeath::Deny));
791 }
792
793 #[test]
794 fn test_validate_rejects_zero_transmit() {
795 let pkt = protocol::Packet {
797 leap_indicator: protocol::LeapIndicator::NoWarning,
798 version: protocol::Version::V4,
799 mode: protocol::Mode::Server,
800 stratum: protocol::Stratum(2),
801 poll: 6,
802 precision: -20,
803 root_delay: protocol::ShortFormat::default(),
804 root_dispersion: protocol::ShortFormat::default(),
805 reference_id: protocol::ReferenceIdentifier::SecondaryOrClient([127, 0, 0, 1]),
806 reference_timestamp: protocol::TimestampFormat::default(),
807 origin_timestamp: protocol::TimestampFormat::default(),
808 receive_timestamp: protocol::TimestampFormat::default(),
809 transmit_timestamp: protocol::TimestampFormat {
810 seconds: 0,
811 fraction: 0,
812 },
813 };
814 let mut raw = [0u8; 48];
815 (&mut raw[..]).write_bytes(pkt).unwrap();
816 let addrs = vec![src_addr()];
817 let result = parse_and_validate_response(&raw, 48, src_addr(), &addrs);
818 assert!(result.is_err());
819 assert!(
820 result
821 .unwrap_err()
822 .to_string()
823 .contains("transmit timestamp is zero")
824 );
825 }
826
827 #[test]
828 fn test_validate_rejects_unsynchronized() {
829 let buf = make_server_response(
830 protocol::Mode::Server,
831 protocol::LeapIndicator::Unknown,
832 protocol::Stratum(2), protocol::ReferenceIdentifier::SecondaryOrClient([127, 0, 0, 1]),
834 3_913_056_001,
835 );
836 let addrs = vec![src_addr()];
837 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
838 assert!(result.is_err());
839 assert!(result.unwrap_err().to_string().contains("unsynchronized"));
840 }
841
842 #[test]
843 fn test_validate_allows_li_unknown_stratum_zero() {
844 let buf = make_server_response(
847 protocol::Mode::Server,
848 protocol::LeapIndicator::Unknown,
849 protocol::Stratum::UNSPECIFIED,
850 protocol::ReferenceIdentifier::PrimarySource(protocol::PrimarySource::Gps),
851 3_913_056_001,
852 );
853 let addrs = vec![src_addr()];
854 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
855 assert!(result.is_ok());
856 }
857
858 #[test]
859 fn test_validate_accepts_different_port() {
860 let buf = valid_server_buf();
862 let addrs = vec!["127.0.0.1:456".parse().unwrap()];
863 let result = parse_and_validate_response(&buf, 48, src_addr(), &addrs);
864 assert!(result.is_ok());
865 }
866
867 #[test]
870 fn test_kod_display_deny() {
871 let kod = KissOfDeathError {
872 code: protocol::KissOfDeath::Deny,
873 };
874 assert!(kod.to_string().contains("DENY"));
875 }
876
877 #[test]
878 fn test_kod_display_rstr() {
879 let kod = KissOfDeathError {
880 code: protocol::KissOfDeath::Rstr,
881 };
882 assert!(kod.to_string().contains("RSTR"));
883 }
884
885 #[test]
886 fn test_kod_display_rate() {
887 let kod = KissOfDeathError {
888 code: protocol::KissOfDeath::Rate,
889 };
890 assert!(kod.to_string().contains("RATE"));
891 }
892}