use core::{cell::RefCell, ops::ControlFlow};
pub use actions::{
ForwardedTLV, ForwardedTLVProvider, NoForwardedTLVs, PortAction, PortActionIterator,
TimestampContext,
};
pub use measurement::Measurement;
use rand::Rng;
use state::PortState;
use self::sequence_id::SequenceIdGenerator;
pub use crate::datastructures::messages::{
is_compatible as is_message_buffer_compatible, MAX_DATA_LEN,
};
#[cfg(doc)]
use crate::PtpInstance;
use crate::{
bmc::{
acceptable_master::AcceptableMasterList,
bmca::{BestAnnounceMessage, Bmca},
},
clock::Clock,
config::PortConfig,
datastructures::{
common::PortIdentity,
messages::{Message, MessageBody},
},
filters::{Filter, FilterEstimate},
observability::{self, port::PortDS},
ptp_instance::{PtpInstanceState, PtpInstanceStateMutex},
time::{Duration, Time},
};
// Builds a `PortActionIterator` out of zero, one or two action expressions.
//
// Each arm fills a fresh `ArrayVec` (its capacity is inferred from the
// `PortActionIterator::from` conversion) and converts it into the iterator.
macro_rules! actions {
    // No actions at all.
    [] => {{
        crate::port::PortActionIterator::from(::arrayvec::ArrayVec::new())
    }};
    // Exactly one action.
    [$only:expr] => {{
        let mut queue = ::arrayvec::ArrayVec::new();
        queue.push($only);
        crate::port::PortActionIterator::from(queue)
    }};
    // Exactly two actions, yielded in the order they are written.
    [$first:expr, $second:expr] => {{
        let mut queue = ::arrayvec::ArrayVec::new();
        queue.push($first);
        queue.push($second);
        crate::port::PortActionIterator::from(queue)
    }};
}
mod actions;
mod bmca;
mod master;
mod measurement;
mod sequence_id;
mod slave;
pub(crate) mod state;
/// State of a single port, parameterised over its lifecycle marker `L`.
///
/// In this file `L` is either [`Running`] or [`InBmca`]; `start_bmca` and
/// `end_bmca` convert between the two by moving every field into a new `Port`
/// value. The remaining parameters are the acceptable master list `A`, the
/// random number generator `R`, the clock `C`, the filter `F` and the type `S`
/// that guards the instance state (a `RefCell<PtpInstanceState>` by default).
#[derive(Debug)]
pub struct Port<'a, L, A, R, C, F: Filter, S = RefCell<PtpInstanceState>> {
    /// Port configuration. The acceptable master list is replaced by `()`
    /// because the constructor hands the list itself to `bmca`.
    config: PortConfig<()>,
    /// Configuration used whenever a fresh filter instance has to be built.
    filter_config: F::Config,
    /// Clock handed to the filter when it updates or is demobilized.
    clock: C,
    /// Identity of this port.
    pub(crate) port_identity: PortIdentity,
    /// Current protocol state of the port.
    port_state: PortState,
    /// Shared reference to the state of the instance this port belongs to.
    instance_state: &'a S,
    /// Best master clock algorithm state for this port.
    bmca: Bmca<A>,
    /// Byte buffer of `MAX_DATA_LEN` bytes; it is zeroed on construction and
    /// on every lifecycle transition.
    // NOTE(review): presumably scratch space for serializing outgoing
    // messages; its users are outside this part of the file.
    packet_buffer: [u8; MAX_DATA_LEN],
    /// Lifecycle marker and any data specific to that lifecycle.
    lifecycle: L,
    /// Random number generator, used here to pick announce receipt durations.
    rng: R,
    /// Initialised to `None` by the constructor.
    // NOTE(review): the meaning of this duration is not visible in this part
    // of the file; confirm against the code that sets it.
    multiport_disable: Option<Duration>,
    /// Sequence id generators, one per message kind as suggested by the
    /// field names (their users live outside this part of the file).
    announce_seq_ids: SequenceIdGenerator,
    sync_seq_ids: SequenceIdGenerator,
    delay_seq_ids: SequenceIdGenerator,
    pdelay_seq_ids: SequenceIdGenerator,
    /// Filter currently in use; replaced by a fresh one on certain state
    /// changes (see `set_forced_port_state`).
    filter: F,
    /// Last mean delay reported by a filter update, if any. Reported as the
    /// mean link delay in the port dataset when the peer delay mechanism is
    /// configured.
    mean_delay: Option<Duration>,
    /// Progress of the peer delay measurement.
    peer_delay_state: PeerDelayState,
}
/// Bookkeeping for a peer delay measurement.
///
/// A new port starts in `Empty`. The code that moves between the variants is
/// outside this part of the file, so the per-field notes below are based on
/// the field names only.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PeerDelayState {
    /// No measurement data is held.
    Empty,
    /// A measurement identified by `id` for which data is still being
    /// collected; every piece of data is optional until it has been recorded.
    Measuring {
        // NOTE(review): presumably the sequence id of the outstanding peer
        // delay request — confirm against the sender.
        id: u16,
        responder_identity: Option<PortIdentity>,
        request_send_time: Option<Time>,
        request_recv_time: Option<Time>,
        response_send_time: Option<Time>,
        response_recv_time: Option<Time>,
    },
    /// Only the id and the responder identity of a measurement are retained.
    // NOTE(review): presumably the state after a measurement completed —
    // confirm against the code performing the transition.
    PostMeasurement {
        id: u16,
        responder_identity: PortIdentity,
    },
}
/// Lifecycle marker for a [`Port`] that handles timers, timestamps and
/// received messages. Obtained from `Port::end_bmca`.
#[derive(Debug)]
pub struct Running;
/// Lifecycle marker for a [`Port`] that is between `start_bmca` and
/// `end_bmca`, carrying the data that only exists during that phase.
#[derive(Debug)]
pub struct InBmca {
    /// Actions handed back to the caller by `end_bmca`.
    pending_action: PortActionIterator<'static>,
    /// Starts out as `None`.
    // NOTE(review): presumably the best announce message seen by this port,
    // filled in by BMCA code outside this part of the file.
    local_best: Option<BestAnnounceMessage>,
}
impl<'a, A: AcceptableMasterList, C: Clock, F: Filter, R: Rng, S: PtpInstanceStateMutex>
Port<'a, Running, A, R, C, F, S>
{
pub fn handle_send_timestamp(
&mut self,
context: TimestampContext,
timestamp: Time,
) -> PortActionIterator<'_> {
match context.inner {
actions::TimestampContextInner::Sync { id } => {
self.handle_sync_timestamp(id, timestamp)
}
actions::TimestampContextInner::DelayReq { id } => {
self.handle_delay_timestamp(id, timestamp)
}
actions::TimestampContextInner::PDelayReq { id } => {
self.handle_pdelay_timestamp(id, timestamp)
}
actions::TimestampContextInner::PDelayResp {
id,
requestor_identity,
} => self.handle_pdelay_response_timestamp(id, requestor_identity, timestamp),
}
}
pub fn handle_announce_timer(
&mut self,
tlv_provider: &mut impl ForwardedTLVProvider,
) -> PortActionIterator<'_> {
self.send_announce(tlv_provider)
}
pub fn handle_sync_timer(&mut self) -> PortActionIterator<'_> {
self.send_sync()
}
pub fn handle_delay_request_timer(&mut self) -> PortActionIterator<'_> {
self.send_delay_request()
}
pub fn handle_announce_receipt_timer(&mut self) -> PortActionIterator<'_> {
if self
.instance_state
.with_ref(|state| state.default_ds.slave_only)
{
if !matches!(self.port_state, PortState::Listening) {
self.set_forced_port_state(PortState::Listening);
}
let duration = self.config.announce_duration(&mut self.rng);
actions![PortAction::ResetAnnounceReceiptTimer { duration }]
} else {
match self.port_state {
PortState::Master => (),
_ => self.set_forced_port_state(PortState::Master),
}
actions![
PortAction::ResetAnnounceTimer {
duration: core::time::Duration::from_secs(0)
},
PortAction::ResetSyncTimer {
duration: core::time::Duration::from_secs(0)
}
]
}
}
pub fn handle_filter_update_timer(&mut self) -> PortActionIterator {
let update = self.filter.update(&mut self.clock);
if update.mean_delay.is_some() {
self.mean_delay = update.mean_delay;
}
PortActionIterator::from_filter(update)
}
pub fn start_bmca(self) -> Port<'a, InBmca, A, R, C, F, S> {
Port {
port_state: self.port_state,
instance_state: self.instance_state,
config: self.config,
filter_config: self.filter_config,
clock: self.clock,
port_identity: self.port_identity,
bmca: self.bmca,
rng: self.rng,
multiport_disable: self.multiport_disable,
packet_buffer: [0; MAX_DATA_LEN],
lifecycle: InBmca {
pending_action: actions![],
local_best: None,
},
announce_seq_ids: self.announce_seq_ids,
sync_seq_ids: self.sync_seq_ids,
delay_seq_ids: self.delay_seq_ids,
pdelay_seq_ids: self.pdelay_seq_ids,
filter: self.filter,
mean_delay: self.mean_delay,
peer_delay_state: self.peer_delay_state,
}
}
fn parse_and_filter<'b>(
&mut self,
data: &'b [u8],
) -> ControlFlow<PortActionIterator<'b>, Message<'b>> {
if !is_message_buffer_compatible(data) {
return ControlFlow::Break(actions![]);
}
let message = match Message::deserialize(data) {
Ok(message) => message,
Err(error) => {
log::warn!("Could not parse packet: {:?}", error);
return ControlFlow::Break(actions![]);
}
};
let domain_matches = self.instance_state.with_ref(|state| {
message.header().sdo_id == state.default_ds.sdo_id
&& message.header().domain_number == state.default_ds.domain_number
});
if !domain_matches {
return ControlFlow::Break(actions![]);
}
ControlFlow::Continue(message)
}
pub fn handle_event_receive<'b>(
&'b mut self,
data: &'b [u8],
timestamp: Time,
) -> PortActionIterator<'b> {
let message = match self.parse_and_filter(data) {
ControlFlow::Continue(value) => value,
ControlFlow::Break(value) => return value,
};
match message.body {
MessageBody::Sync(sync) => self.handle_sync(message.header, sync, timestamp),
MessageBody::DelayReq(delay_request) => {
self.handle_delay_req(message.header, delay_request, timestamp)
}
MessageBody::PDelayReq(_) => self.handle_pdelay_req(message.header, timestamp),
MessageBody::PDelayResp(peer_delay_response) => {
self.handle_peer_delay_response(message.header, peer_delay_response, timestamp)
}
_ => self.handle_general_internal(message),
}
}
pub fn handle_general_receive<'b>(&'b mut self, data: &'b [u8]) -> PortActionIterator<'b> {
let message = match self.parse_and_filter(data) {
ControlFlow::Continue(value) => value,
ControlFlow::Break(value) => return value,
};
self.handle_general_internal(message)
}
fn handle_general_internal<'b>(&'b mut self, message: Message<'b>) -> PortActionIterator<'b> {
match message.body {
MessageBody::Announce(announce) => self.handle_announce(&message, announce),
MessageBody::FollowUp(follow_up) => self.handle_follow_up(message.header, follow_up),
MessageBody::DelayResp(delay_response) => {
self.handle_delay_resp(message.header, delay_response)
}
MessageBody::PDelayRespFollowUp(peer_delay_follow_up) => {
self.handle_peer_delay_response_follow_up(message.header, peer_delay_follow_up)
}
MessageBody::Sync(_)
| MessageBody::DelayReq(_)
| MessageBody::PDelayReq(_)
| MessageBody::PDelayResp(_) => {
log::warn!("Received event message over general interface");
actions![]
}
MessageBody::Management(_) | MessageBody::Signaling(_) => actions![],
}
}
}
impl<'a, A, C, F: Filter, R, S> Port<'a, InBmca, A, R, C, F, S> {
pub fn end_bmca(
self,
) -> (
Port<'a, Running, A, R, C, F, S>,
PortActionIterator<'static>,
) {
(
Port {
port_state: self.port_state,
instance_state: self.instance_state,
config: self.config,
filter_config: self.filter_config,
clock: self.clock,
port_identity: self.port_identity,
bmca: self.bmca,
rng: self.rng,
multiport_disable: self.multiport_disable,
packet_buffer: [0; MAX_DATA_LEN],
lifecycle: Running,
announce_seq_ids: self.announce_seq_ids,
sync_seq_ids: self.sync_seq_ids,
delay_seq_ids: self.delay_seq_ids,
pdelay_seq_ids: self.pdelay_seq_ids,
filter: self.filter,
mean_delay: self.mean_delay,
peer_delay_state: self.peer_delay_state,
},
self.lifecycle.pending_action,
)
}
}
impl<L, A, R, C: Clock, F: Filter, S> Port<'_, L, A, R, C, F, S> {
    /// Unconditionally switch the port to `state`, logging the transition.
    ///
    /// When the port leaves the slave or faulty state, or enters the faulty
    /// state, the current filter is swapped for a freshly constructed one and
    /// the old filter is demobilized against the clock.
    fn set_forced_port_state(&mut self, state: PortState) {
        log::info!(
            "new state for port {}: {} -> {}",
            self.port_identity.port_number,
            self.port_state,
            state
        );

        let previous = core::mem::replace(&mut self.port_state, state);

        let left_slave_or_faulty = matches!(previous, PortState::Slave(_) | PortState::Faulty);
        let entered_faulty = matches!(self.port_state, PortState::Faulty);
        if !(left_slave_or_faulty || entered_faulty) {
            return;
        }

        // Install the replacement first, then retire the filter that was in use.
        let fresh = F::new(self.filter_config.clone());
        core::mem::replace(&mut self.filter, fresh).demobilize(&mut self.clock);
    }
}
impl<L, A, R, C, F: Filter, S> Port<'_, L, A, R, C, F, S> {
    /// Whether the port is currently in the slave state.
    pub fn is_steering(&self) -> bool {
        matches!(self.state(), PortState::Slave(_))
    }

    /// Whether the port is currently in the master state.
    pub fn is_master(&self) -> bool {
        matches!(self.state(), PortState::Master)
    }

    /// Current state of the port.
    pub(crate) fn state(&self) -> &PortState {
        &self.port_state
    }

    /// Port number taken from the port identity.
    pub(crate) fn number(&self) -> u16 {
        self.port_identity.port_number
    }

    /// Build the observable port dataset from the current state and the
    /// configuration of this port.
    pub fn port_ds(&self) -> PortDS {
        // Translate the internal state into its observable counterpart; the
        // data carried by the slave state is not exposed.
        let port_state = match self.state() {
            PortState::Slave(_) => observability::port::PortState::Slave,
            PortState::Passive => observability::port::PortState::Passive,
            PortState::Master => observability::port::PortState::Master,
            PortState::Listening => observability::port::PortState::Listening,
            PortState::Faulty => observability::port::PortState::Faulty,
        };

        let delay_mechanism = match self.config.delay_mechanism {
            crate::config::DelayMechanism::P2P { interval } => {
                observability::port::DelayMechanism::P2P {
                    log_min_p_delay_req_interval: interval.as_log_2(),
                    // Falls back to the default value while no delay is known.
                    mean_link_delay: self
                        .mean_delay
                        .map(|delay| delay.into())
                        .unwrap_or_default(),
                }
            }
            crate::config::DelayMechanism::E2E { interval } => {
                observability::port::DelayMechanism::E2E {
                    log_min_delay_req_interval: interval.as_log_2(),
                }
            }
        };

        let settings = &self.config;
        PortDS {
            port_identity: self.port_identity,
            port_state,
            log_announce_interval: settings.announce_interval.as_log_2(),
            announce_receipt_timeout: settings.announce_receipt_timeout,
            log_sync_interval: settings.sync_interval.as_log_2(),
            delay_mechanism,
            version_number: 2,
            minor_version_number: settings.minor_ptp_version as u8,
            delay_asymmetry: settings.delay_asymmetry.into(),
            master_only: settings.master_only,
        }
    }

    /// Current estimates of the filter, available only while the port is in
    /// the slave state.
    pub fn port_current_ds_contribution(&self) -> Option<FilterEstimate> {
        match self.port_state {
            PortState::Slave(_) => Some(self.filter.current_estimates()),
            _ => None,
        }
    }
}
impl<'a, A, C, F: Filter, R: Rng, S: PtpInstanceStateMutex> Port<'a, InBmca, A, R, C, F, S> {
    /// Create a new port in the [`InBmca`] lifecycle.
    ///
    /// The port starts in the listening state with a pending action that
    /// resets the announce receipt timer to a duration drawn with `rng`. The
    /// acceptable master list is moved out of `config` into the BMCA state;
    /// the rest of the configuration is stored on the port.
    pub(crate) fn new(
        instance_state: &'a S,
        config: PortConfig<A>,
        filter_config: F::Config,
        clock: C,
        port_identity: PortIdentity,
        mut rng: R,
    ) -> Self {
        // Draw the first timeout while the configuration is still intact.
        let initial_receipt_timeout = config.announce_duration(&mut rng);

        let PortConfig {
            acceptable_master_list,
            delay_mechanism,
            announce_interval,
            announce_receipt_timeout,
            sync_interval,
            master_only,
            delay_asymmetry,
            minor_ptp_version,
        } = config;

        let bmca = Bmca::new(
            acceptable_master_list,
            announce_interval.as_duration().into(),
            port_identity,
        );
        let filter = F::new(filter_config.clone());

        let lifecycle = InBmca {
            pending_action: actions![PortAction::ResetAnnounceReceiptTimer {
                duration: initial_receipt_timeout
            }],
            local_best: None,
        };

        Port {
            config: PortConfig {
                acceptable_master_list: (),
                delay_mechanism,
                announce_interval,
                announce_receipt_timeout,
                sync_interval,
                master_only,
                delay_asymmetry,
                minor_ptp_version,
            },
            filter_config,
            clock,
            port_identity,
            port_state: PortState::Listening,
            instance_state,
            bmca,
            rng,
            multiport_disable: None,
            packet_buffer: [0; MAX_DATA_LEN],
            lifecycle,
            announce_seq_ids: SequenceIdGenerator::new(),
            sync_seq_ids: SequenceIdGenerator::new(),
            delay_seq_ids: SequenceIdGenerator::new(),
            pdelay_seq_ids: SequenceIdGenerator::new(),
            filter,
            mean_delay: None,
            peer_delay_state: PeerDelayState::Empty,
        }
    }
}
#[cfg(test)]
mod tests {
    use core::cell::RefCell;

    use super::*;
    use crate::{
        config::{
            AcceptAnyMaster, DelayMechanism, InstanceConfig, PtpMinorVersion, TimePropertiesDS,
        },
        datastructures::datasets::{InternalDefaultDS, InternalParentDS, PathTraceDS},
        filters::BasicFilter,
        time::{Duration, Interval, Time},
        Clock,
    };

    /// Clock stub for tests: every adjustment succeeds and reports the
    /// default time, while reading the current time panics.
    pub(super) struct TestClock;

    impl Clock for TestClock {
        type Error = ();

        fn set_frequency(&mut self, _freq: f64) -> Result<Time, Self::Error> {
            Ok(Time::default())
        }

        fn now(&self) -> Time {
            panic!("Shouldn't be called");
        }

        fn set_properties(
            &mut self,
            _time_properties_ds: &TimePropertiesDS,
        ) -> Result<(), Self::Error> {
            Ok(())
        }

        fn step_clock(&mut self, _offset: Duration) -> Result<Time, Self::Error> {
            Ok(Time::default())
        }
    }

    /// Shared construction path of all `setup_test_port*` helpers.
    ///
    /// Builds a port with the fixed test configuration, a [`TestClock`] and a
    /// deterministic step RNG, then immediately ends the BMCA phase (dropping
    /// the pending actions) so that a running port is returned.
    fn build_test_port<F: Filter>(
        state: &RefCell<PtpInstanceState>,
        filter_config: F::Config,
        port_identity: PortIdentity,
    ) -> Port<'_, Running, AcceptAnyMaster, rand::rngs::mock::StepRng, TestClock, F> {
        let port = Port::<_, _, _, _, F>::new(
            state,
            PortConfig {
                acceptable_master_list: AcceptAnyMaster,
                delay_mechanism: DelayMechanism::E2E {
                    interval: Interval::from_log_2(1),
                },
                announce_interval: Interval::from_log_2(1),
                announce_receipt_timeout: 3,
                sync_interval: Interval::from_log_2(0),
                master_only: false,
                delay_asymmetry: Duration::ZERO,
                minor_ptp_version: PtpMinorVersion::One,
            },
            filter_config,
            TestClock,
            port_identity,
            rand::rngs::mock::StepRng::new(2, 1),
        );
        let (port, _) = port.end_bmca();
        port
    }

    /// Running test port with the default port identity and a [`BasicFilter`]
    /// configured with `0.25`.
    pub(super) fn setup_test_port(
        state: &RefCell<PtpInstanceState>,
    ) -> Port<'_, Running, AcceptAnyMaster, rand::rngs::mock::StepRng, TestClock, BasicFilter> {
        build_test_port::<BasicFilter>(state, 0.25, Default::default())
    }

    /// Same as [`setup_test_port`], but with a caller-chosen port identity.
    pub(super) fn setup_test_port_custom_identity(
        state: &RefCell<PtpInstanceState>,
        port_identity: PortIdentity,
    ) -> Port<'_, Running, AcceptAnyMaster, rand::rngs::mock::StepRng, TestClock, BasicFilter> {
        build_test_port::<BasicFilter>(state, 0.25, port_identity)
    }

    /// Running test port with the default port identity and a caller-chosen
    /// filter type and filter configuration.
    pub(super) fn setup_test_port_custom_filter<F: Filter>(
        state: &RefCell<PtpInstanceState>,
        filter_config: F::Config,
    ) -> Port<'_, Running, AcceptAnyMaster, rand::rngs::mock::StepRng, TestClock, F> {
        build_test_port::<F>(state, filter_config, Default::default())
    }

    /// Instance state for tests: domain 0, both priorities 255, not
    /// slave-only, path tracing disabled and default values everywhere else.
    pub(super) fn setup_test_state() -> RefCell<PtpInstanceState> {
        let default_ds = InternalDefaultDS::new(InstanceConfig {
            clock_identity: Default::default(),
            priority_1: 255,
            priority_2: 255,
            domain_number: 0,
            slave_only: false,
            sdo_id: Default::default(),
            path_trace: false,
        });
        let parent_ds = InternalParentDS::new(default_ds);
        RefCell::new(PtpInstanceState {
            default_ds,
            current_ds: Default::default(),
            parent_ds,
            time_properties_ds: Default::default(),
            path_trace_ds: PathTraceDS::new(false),
        })
    }
}