use std::time::Duration;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct PtpTimestamp {
pub seconds: u64,
pub nanoseconds: u32,
}
impl PtpTimestamp {
#[must_use]
pub const fn new(seconds: u64, nanoseconds: u32) -> Self {
Self {
seconds,
nanoseconds,
}
}
#[must_use]
pub fn to_nanos(self) -> u64 {
self.seconds
.saturating_mul(1_000_000_000)
.saturating_add(u64::from(self.nanoseconds))
}
#[must_use]
pub fn diff_nanos(self, other: Self) -> i64 {
let a = self.to_nanos();
let b = other.to_nanos();
(a as i64).wrapping_sub(b as i64)
}
#[must_use]
pub fn add_nanos(self, nanos: i64) -> Self {
let total = (self.to_nanos() as i64).saturating_add(nanos);
if total < 0 {
return Self::default();
}
let total_u = total as u64;
Self {
seconds: total_u / 1_000_000_000,
nanoseconds: (total_u % 1_000_000_000) as u32,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct PortIdentity {
pub clock_identity: [u8; 8],
pub port_number: u16,
}
impl PortIdentity {
#[must_use]
pub fn from_u64(id: u64, port: u16) -> Self {
Self {
clock_identity: id.to_be_bytes(),
port_number: port,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum MessageType {
Sync = 0x0,
DelayReq = 0x1,
FollowUp = 0x8,
DelayResp = 0x9,
Announce = 0xB,
Signaling = 0xC,
Management = 0xD,
}
impl MessageType {
#[must_use]
pub fn from_nibble(v: u8) -> Option<Self> {
match v & 0x0F {
0x0 => Some(Self::Sync),
0x1 => Some(Self::DelayReq),
0x8 => Some(Self::FollowUp),
0x9 => Some(Self::DelayResp),
0xB => Some(Self::Announce),
0xC => Some(Self::Signaling),
0xD => Some(Self::Management),
_ => None,
}
}
#[must_use]
pub fn is_event(self) -> bool {
matches!(self, Self::Sync | Self::DelayReq)
}
}
impl std::fmt::Display for MessageType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
Self::Sync => "Sync",
Self::DelayReq => "Delay_Req",
Self::FollowUp => "Follow_Up",
Self::DelayResp => "Delay_Resp",
Self::Announce => "Announce",
Self::Signaling => "Signaling",
Self::Management => "Management",
};
write!(f, "{s}")
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MessageHeader {
pub transport_and_type: u8,
pub ptp_version: u8,
pub message_length: u16,
pub domain_number: u8,
pub minor_sdo_id: u8,
pub flags: u16,
pub correction_ns: i64,
pub type_specific: u32,
pub source_port_identity: PortIdentity,
pub sequence_id: u16,
pub control: u8,
pub log_message_interval: i8,
}
impl MessageHeader {
pub const FLAG_TWO_STEP: u16 = 1 << 9;
pub const FLAG_UTC_REASONABLE: u16 = 1 << 2;
pub const FLAG_PTP_TIMESCALE: u16 = 1 << 3;
#[must_use]
pub fn new(
msg_type: MessageType,
source: PortIdentity,
domain: u8,
sequence_id: u16,
message_length: u16,
) -> Self {
let control = match msg_type {
MessageType::Sync => 0,
MessageType::DelayReq => 1,
MessageType::FollowUp => 2,
MessageType::DelayResp => 3,
MessageType::Announce => 5,
_ => 4,
};
Self {
transport_and_type: msg_type as u8,
ptp_version: 2,
message_length,
domain_number: domain,
minor_sdo_id: 0,
flags: 0,
correction_ns: 0,
type_specific: 0,
source_port_identity: source,
sequence_id,
control,
log_message_interval: 0,
}
}
#[must_use]
pub fn message_type(&self) -> Option<MessageType> {
MessageType::from_nibble(self.transport_and_type)
}
#[must_use]
pub fn is_two_step(&self) -> bool {
self.flags & Self::FLAG_TWO_STEP != 0
}
}
#[derive(Debug, Clone)]
pub struct AnnounceMessage {
pub header: MessageHeader,
pub origin_timestamp: PtpTimestamp,
pub current_utc_offset: i16,
pub reserved: u8,
pub grandmaster_priority1: u8,
pub grandmaster_clock_class: u8,
pub grandmaster_clock_accuracy: u8,
pub grandmaster_clock_variance: u16,
pub grandmaster_priority2: u8,
pub grandmaster_identity: [u8; 8],
pub steps_removed: u16,
pub time_source: u8,
}
impl AnnounceMessage {
#[must_use]
pub fn new(
source: PortIdentity,
domain: u8,
sequence_id: u16,
origin_timestamp: PtpTimestamp,
grandmaster_identity: [u8; 8],
priority1: u8,
priority2: u8,
clock_class: u8,
steps_removed: u16,
announce_interval_log: i8,
) -> Self {
let mut hdr = MessageHeader::new(
MessageType::Announce,
source,
domain,
sequence_id,
64,
);
hdr.log_message_interval = announce_interval_log;
Self {
header: hdr,
origin_timestamp,
current_utc_offset: 37, reserved: 0,
grandmaster_priority1: priority1,
grandmaster_clock_class: clock_class,
grandmaster_clock_accuracy: 0x21, grandmaster_clock_variance: 0x4E5D,
grandmaster_priority2: priority2,
grandmaster_identity,
steps_removed,
time_source: 0x20, }
}
pub fn validate(&self) -> Result<(), String> {
if self.header.ptp_version != 2 {
return Err(format!(
"unsupported PTP version {}",
self.header.ptp_version
));
}
if self.steps_removed > 255 {
return Err(format!(
"steps_removed {} exceeds maximum 255",
self.steps_removed
));
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct SyncMessage {
pub header: MessageHeader,
pub origin_timestamp: PtpTimestamp,
}
impl SyncMessage {
#[must_use]
pub fn one_step(
source: PortIdentity,
domain: u8,
sequence_id: u16,
origin_timestamp: PtpTimestamp,
sync_interval_log: i8,
) -> Self {
let mut hdr = MessageHeader::new(
MessageType::Sync,
source,
domain,
sequence_id,
44,
);
hdr.log_message_interval = sync_interval_log;
Self {
header: hdr,
origin_timestamp,
}
}
#[must_use]
pub fn two_step(
source: PortIdentity,
domain: u8,
sequence_id: u16,
sync_interval_log: i8,
) -> Self {
let mut hdr = MessageHeader::new(MessageType::Sync, source, domain, sequence_id, 44);
hdr.log_message_interval = sync_interval_log;
hdr.flags |= MessageHeader::FLAG_TWO_STEP;
Self {
header: hdr,
origin_timestamp: PtpTimestamp::default(),
}
}
}
#[derive(Debug, Clone)]
pub struct FollowUpMessage {
pub header: MessageHeader,
pub precise_origin_timestamp: PtpTimestamp,
}
impl FollowUpMessage {
#[must_use]
pub fn new(
source: PortIdentity,
domain: u8,
sync_sequence_id: u16,
precise_origin_timestamp: PtpTimestamp,
) -> Self {
let hdr = MessageHeader::new(
MessageType::FollowUp,
source,
domain,
sync_sequence_id,
44,
);
Self {
header: hdr,
precise_origin_timestamp,
}
}
}
#[derive(Debug, Clone)]
pub struct DelayReqMessage {
pub header: MessageHeader,
pub origin_timestamp: PtpTimestamp,
}
impl DelayReqMessage {
#[must_use]
pub fn new(
source: PortIdentity,
domain: u8,
sequence_id: u16,
origin_timestamp: PtpTimestamp,
) -> Self {
let hdr = MessageHeader::new(MessageType::DelayReq, source, domain, sequence_id, 44);
Self {
header: hdr,
origin_timestamp,
}
}
}
#[derive(Debug, Clone)]
pub struct DelayRespMessage {
pub header: MessageHeader,
pub receive_timestamp: PtpTimestamp,
pub requesting_port_identity: PortIdentity,
}
impl DelayRespMessage {
#[must_use]
pub fn new(
source: PortIdentity,
domain: u8,
sequence_id: u16,
receive_timestamp: PtpTimestamp,
requesting_port_identity: PortIdentity,
) -> Self {
let hdr = MessageHeader::new(
MessageType::DelayResp,
source,
domain,
sequence_id,
54,
);
Self {
header: hdr,
receive_timestamp,
requesting_port_identity,
}
}
}
#[derive(Debug, Clone)]
pub struct OffsetEstimator {
alpha: f64,
estimate_ns: f64,
sample_count: u64,
initialized: bool,
}
impl OffsetEstimator {
pub fn new(alpha: f64) -> Result<Self, String> {
if alpha <= 0.0 || alpha > 1.0 {
return Err(format!("alpha {alpha} must be in (0, 1]"));
}
Ok(Self {
alpha,
estimate_ns: 0.0,
sample_count: 0,
initialized: false,
})
}
pub fn update(&mut self, offset_ns: i64) {
let x = offset_ns as f64;
if !self.initialized {
self.estimate_ns = x;
self.initialized = true;
} else {
self.estimate_ns = self.alpha * x + (1.0 - self.alpha) * self.estimate_ns;
}
self.sample_count += 1;
}
#[must_use]
pub fn estimate_ns(&self) -> i64 {
self.estimate_ns.round() as i64
}
#[must_use]
pub fn sample_count(&self) -> u64 {
self.sample_count
}
pub fn reset(&mut self) {
self.estimate_ns = 0.0;
self.sample_count = 0;
self.initialized = false;
}
}
#[derive(Debug, Clone)]
pub struct ClockServo {
pub kp: f64,
pub ki: f64,
integral_ns: f64,
pub integral_clamp_ns: f64,
pub last_correction_ppb: f64,
lock_count: u32,
pub lock_threshold_ns: i64,
pub lock_count_threshold: u32,
locked: bool,
}
impl ClockServo {
#[must_use]
pub fn new(kp: f64, ki: f64, integral_clamp_ns: f64) -> Self {
Self {
kp,
ki,
integral_ns: 0.0,
integral_clamp_ns: integral_clamp_ns.abs(),
last_correction_ppb: 0.0,
lock_count: 0,
lock_threshold_ns: 100,
lock_count_threshold: 4,
locked: false,
}
}
pub fn update(&mut self, offset_ns: i64) -> f64 {
let e = offset_ns as f64;
self.integral_ns =
(self.integral_ns + e).clamp(-self.integral_clamp_ns, self.integral_clamp_ns);
let correction = self.kp * e + self.ki * self.integral_ns;
self.last_correction_ppb = correction;
if offset_ns.abs() <= self.lock_threshold_ns {
self.lock_count = self.lock_count.saturating_add(1);
if self.lock_count >= self.lock_count_threshold {
self.locked = true;
}
} else {
self.lock_count = 0;
self.locked = false;
}
correction
}
#[must_use]
pub fn is_locked(&self) -> bool {
self.locked
}
pub fn reset(&mut self) {
self.integral_ns = 0.0;
self.last_correction_ppb = 0.0;
self.lock_count = 0;
self.locked = false;
}
}
#[derive(Debug, Clone, Copy)]
pub struct PathDelay {
pub t1: PtpTimestamp,
pub t2: PtpTimestamp,
pub t3: PtpTimestamp,
pub t4: PtpTimestamp,
}
impl PathDelay {
#[must_use]
pub fn mean_delay_nanos(&self) -> Option<u64> {
let fwd = self.t2.diff_nanos(self.t1); let rev = self.t4.diff_nanos(self.t3); let sum = fwd.checked_add(rev)?;
if sum < 0 {
return None;
}
Some((sum as u64) / 2)
}
#[must_use]
pub fn offset_nanos(&self) -> Option<i64> {
let delay = self.mean_delay_nanos()? as i64;
let fwd = self.t2.diff_nanos(self.t1);
Some(fwd - delay)
}
#[must_use]
pub fn mean_delay_duration(&self) -> Option<Duration> {
let ns = self.mean_delay_nanos()?;
Some(Duration::from_nanos(ns))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn ts(s: u64, ns: u32) -> PtpTimestamp {
PtpTimestamp::new(s, ns)
}
fn port(id: u64, port: u16) -> PortIdentity {
PortIdentity::from_u64(id, port)
}
#[test]
fn test_ptp_timestamp_to_nanos() {
let t = ts(1, 500_000_000);
assert_eq!(t.to_nanos(), 1_500_000_000);
}
#[test]
fn test_ptp_timestamp_diff_positive() {
let a = ts(2, 0);
let b = ts(1, 0);
assert_eq!(a.diff_nanos(b), 1_000_000_000);
}
#[test]
fn test_ptp_timestamp_add_nanos() {
let t = ts(1, 0);
let t2 = t.add_nanos(500);
assert_eq!(t2.seconds, 1);
assert_eq!(t2.nanoseconds, 500);
}
#[test]
fn test_ptp_timestamp_add_nanos_wrap() {
let t = ts(0, 999_999_900);
let t2 = t.add_nanos(200);
assert_eq!(t2.seconds, 1);
assert_eq!(t2.nanoseconds, 100);
}
#[test]
fn test_message_type_from_nibble_roundtrip() {
let types = [
MessageType::Sync,
MessageType::DelayReq,
MessageType::FollowUp,
MessageType::DelayResp,
MessageType::Announce,
];
for mt in types {
let nibble = mt as u8;
let decoded = MessageType::from_nibble(nibble).expect("should decode");
assert_eq!(mt, decoded);
}
}
#[test]
fn test_sync_is_event_message() {
assert!(MessageType::Sync.is_event());
assert!(MessageType::DelayReq.is_event());
assert!(!MessageType::FollowUp.is_event());
assert!(!MessageType::Announce.is_event());
}
#[test]
fn test_message_header_construction() {
let src = port(0xAABBCC, 1);
let hdr = MessageHeader::new(MessageType::Sync, src, 0, 42, 44);
assert_eq!(hdr.ptp_version, 2);
assert_eq!(hdr.sequence_id, 42);
assert_eq!(hdr.message_length, 44);
assert_eq!(hdr.message_type(), Some(MessageType::Sync));
}
#[test]
fn test_two_step_flag() {
let src = port(0x01, 1);
let sync = SyncMessage::two_step(src, 0, 1, 0);
assert!(sync.header.is_two_step());
}
#[test]
fn test_one_step_not_two_step() {
let src = port(0x01, 1);
let sync = SyncMessage::one_step(src, 0, 1, ts(100, 0), 0);
assert!(!sync.header.is_two_step());
}
#[test]
fn test_announce_message_validates_ok() {
let src = port(0x1234_5678_9ABC_DEF0, 1);
let msg = AnnounceMessage::new(src, 0, 1, ts(1_000_000, 0), [0u8; 8], 128, 128, 135, 0, 0);
assert!(msg.validate().is_ok());
}
#[test]
fn test_announce_message_wrong_version_fails() {
let src = port(0x01, 1);
let mut msg = AnnounceMessage::new(src, 0, 1, ts(0, 0), [0u8; 8], 128, 128, 248, 0, 0);
msg.header.ptp_version = 1;
assert!(msg.validate().is_err());
}
#[test]
fn test_follow_up_sequence_matches_sync() {
let src = port(0x01, 1);
let seq = 77u16;
let fu = FollowUpMessage::new(src, 0, seq, ts(1, 500_000));
assert_eq!(fu.header.sequence_id, seq);
assert_eq!(fu.precise_origin_timestamp, ts(1, 500_000));
}
#[test]
fn test_delay_req_resp_pair() {
let master = port(0xAA, 1);
let slave = port(0xBB, 1);
let t3 = ts(100, 0);
let req = DelayReqMessage::new(slave, 0, 5, t3);
assert_eq!(req.header.sequence_id, 5);
let t4 = ts(100, 500);
let resp = DelayRespMessage::new(master, 0, 5, t4, slave);
assert_eq!(resp.receive_timestamp, t4);
assert_eq!(resp.requesting_port_identity, slave);
assert_eq!(resp.header.sequence_id, 5);
}
#[test]
fn test_path_delay_symmetric() {
let pd = PathDelay {
t1: ts(0, 0),
t2: ts(0, 500),
t3: ts(0, 600),
t4: ts(0, 1_100),
};
assert_eq!(pd.mean_delay_nanos(), Some(500));
assert_eq!(pd.offset_nanos(), Some(0));
}
#[test]
fn test_path_delay_asymmetric() {
let pd = PathDelay {
t1: ts(0, 0),
t2: ts(0, 600),
t3: ts(0, 700),
t4: ts(0, 1_100),
};
assert_eq!(pd.mean_delay_nanos(), Some(500));
assert_eq!(pd.offset_nanos(), Some(100));
}
#[test]
fn test_path_delay_duration_conversion() {
let pd = PathDelay {
t1: ts(0, 0),
t2: ts(0, 2_000),
t3: ts(0, 3_000),
t4: ts(0, 5_000),
};
let dur = pd.mean_delay_duration().expect("should compute duration");
assert_eq!(dur.as_nanos(), 2000);
}
#[test]
fn test_offset_estimator_first_sample_exact() {
let mut est = OffsetEstimator::new(0.5).expect("valid alpha");
est.update(1000);
assert_eq!(est.estimate_ns(), 1000);
}
#[test]
fn test_offset_estimator_smoothing() {
let mut est = OffsetEstimator::new(0.5).expect("valid alpha");
est.update(0);
est.update(1000);
assert_eq!(est.estimate_ns(), 500);
}
#[test]
fn test_offset_estimator_invalid_alpha() {
assert!(OffsetEstimator::new(0.0).is_err());
assert!(OffsetEstimator::new(1.5).is_err());
}
#[test]
fn test_offset_estimator_reset() {
let mut est = OffsetEstimator::new(0.3).expect("valid");
est.update(5000);
est.update(4000);
est.reset();
assert_eq!(est.sample_count(), 0);
est.update(100);
assert_eq!(est.estimate_ns(), 100);
}
#[test]
fn test_servo_zero_offset_zero_correction() {
let mut servo = ClockServo::new(0.7, 0.3, 200_000.0);
let correction = servo.update(0);
assert!((correction).abs() < 1e-6);
}
#[test]
fn test_servo_positive_offset_positive_correction() {
let mut servo = ClockServo::new(0.7, 0.3, 200_000.0);
let correction = servo.update(100);
assert!(
correction > 0.0,
"positive offset should yield positive correction"
);
}
#[test]
fn test_servo_lock_after_sustained_small_offset() {
let mut servo = ClockServo::new(0.7, 0.3, 200_000.0);
servo.lock_threshold_ns = 200; servo.lock_count_threshold = 3;
assert!(!servo.is_locked());
for _ in 0..4 {
servo.update(50); }
assert!(servo.is_locked());
}
#[test]
fn test_servo_loses_lock_on_large_offset() {
let mut servo = ClockServo::new(0.7, 0.3, 200_000.0);
servo.lock_threshold_ns = 200;
servo.lock_count_threshold = 3;
for _ in 0..4 {
servo.update(50);
}
assert!(servo.is_locked());
servo.update(50_000); assert!(!servo.is_locked());
}
#[test]
fn test_servo_reset_clears_lock() {
let mut servo = ClockServo::new(0.7, 0.3, 200_000.0);
servo.lock_threshold_ns = 200;
servo.lock_count_threshold = 3;
for _ in 0..4 {
servo.update(50);
}
servo.reset();
assert!(!servo.is_locked());
}
#[test]
fn test_servo_integral_clamp() {
let mut servo = ClockServo::new(0.0, 1.0, 100.0); for _ in 0..1000 {
servo.update(10_000);
}
assert!(
servo.last_correction_ppb.abs() <= 101.0,
"integral clamp should limit correction: {}",
servo.last_correction_ppb
);
}
}