use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use speedy::{Readable, Writable};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
dds::result::QosError,
messages::submessages::elements::parameter::Parameter,
serialization::{
pl_cdr_adapters::{PlCdrDeserializeError, PlCdrSerializeError},
speedy_pl_cdr_helpers::*,
},
structure::{duration::Duration, endpoint::ReliabilityKind, parameter_id::ParameterId},
};
/// Implemented by entities that can report the QoS policies attached to them.
pub trait HasQoSPolicy {
/// Returns the entity's QoS policy set by value.
fn qos(&self) -> QosPolicies;
}
/// Implemented by entities whose QoS policies can be changed.
pub trait MutQosPolicy {
/// Applies `new_qos` to the entity; a rejected change is reported as a
/// [`QosError`].
fn set_qos(&mut self, new_qos: &QosPolicies) -> Result<(), QosError>;
}
/// Names a single QoS policy.
///
/// `QosPolicies::compliance_failure_wrt` returns a value of this type to
/// report which policy made an offered/requested pair incompatible.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub enum QosPolicyId {
    // The derived `Ord` follows declaration order, so keep the order stable.
    Durability,
    Presentation,
    Deadline,
    LatencyBudget,
    Ownership,
    Liveliness,
    TimeBasedFilter,
    Reliability,
    DestinationOrder,
    History,
    ResourceLimits,
    Lifespan,
    Property,
}
/// Step-by-step constructor for [`QosPolicies`].
///
/// Every field starts as `None` (via `Default`) and is filled in by the
/// setter method of the same name; `build` copies the fields unchanged into a
/// `QosPolicies` value.
#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct QosPolicyBuilder {
durability: Option<policy::Durability>,
presentation: Option<policy::Presentation>,
deadline: Option<policy::Deadline>,
latency_budget: Option<policy::LatencyBudget>,
ownership: Option<policy::Ownership>,
liveliness: Option<policy::Liveliness>,
time_based_filter: Option<policy::TimeBasedFilter>,
reliability: Option<policy::Reliability>,
destination_order: Option<policy::DestinationOrder>,
history: Option<policy::History>,
resource_limits: Option<policy::ResourceLimits>,
lifespan: Option<policy::Lifespan>,
// Only present when the crate is built with the `security` feature.
#[cfg(feature = "security")]
property: Option<policy::Property>,
}
impl QosPolicyBuilder {
    /// Creates a builder in which no policy has been set.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the durability policy.
    #[must_use]
    pub const fn durability(mut self, durability: policy::Durability) -> Self {
        self.durability = Some(durability);
        self
    }

    /// Sets the presentation policy.
    #[must_use]
    pub const fn presentation(mut self, presentation: policy::Presentation) -> Self {
        self.presentation = Some(presentation);
        self
    }

    /// Sets the deadline policy.
    #[must_use]
    pub const fn deadline(mut self, deadline: policy::Deadline) -> Self {
        self.deadline = Some(deadline);
        self
    }

    /// Sets the latency-budget policy.
    #[must_use]
    pub const fn latency_budget(mut self, latency_budget: policy::LatencyBudget) -> Self {
        self.latency_budget = Some(latency_budget);
        self
    }

    /// Sets the ownership policy.
    #[must_use]
    pub const fn ownership(mut self, ownership: policy::Ownership) -> Self {
        self.ownership = Some(ownership);
        self
    }

    /// Sets the liveliness policy.
    #[must_use]
    pub const fn liveliness(mut self, liveliness: policy::Liveliness) -> Self {
        self.liveliness = Some(liveliness);
        self
    }

    /// Sets the time-based-filter policy.
    #[must_use]
    pub const fn time_based_filter(mut self, time_based_filter: policy::TimeBasedFilter) -> Self {
        self.time_based_filter = Some(time_based_filter);
        self
    }

    /// Sets the reliability policy to an arbitrary value.
    #[must_use]
    pub const fn reliability(mut self, reliability: policy::Reliability) -> Self {
        self.reliability = Some(reliability);
        self
    }

    /// Shorthand for `reliability(policy::Reliability::BestEffort)`.
    #[must_use]
    pub const fn best_effort(self) -> Self {
        self.reliability(policy::Reliability::BestEffort)
    }

    /// Shorthand for `reliability(policy::Reliability::Reliable { max_blocking_time })`.
    #[must_use]
    pub const fn reliable(self, max_blocking_time: Duration) -> Self {
        self.reliability(policy::Reliability::Reliable { max_blocking_time })
    }

    /// Sets the destination-order policy.
    #[must_use]
    pub const fn destination_order(mut self, destination_order: policy::DestinationOrder) -> Self {
        self.destination_order = Some(destination_order);
        self
    }

    /// Sets the history policy.
    #[must_use]
    pub const fn history(mut self, history: policy::History) -> Self {
        self.history = Some(history);
        self
    }

    /// Sets the resource-limits policy.
    #[must_use]
    pub const fn resource_limits(mut self, resource_limits: policy::ResourceLimits) -> Self {
        self.resource_limits = Some(resource_limits);
        self
    }

    /// Sets the lifespan policy.
    #[must_use]
    pub const fn lifespan(mut self, lifespan: policy::Lifespan) -> Self {
        self.lifespan = Some(lifespan);
        self
    }

    /// Sets the property policy (available with the `security` feature only).
    #[cfg(feature = "security")]
    #[must_use]
    pub fn property(mut self, property: policy::Property) -> Self {
        self.property = Some(property);
        self
    }

    /// Consumes the builder and returns the collected policies.
    ///
    /// Policies that were never set stay `None` in the result.
    pub fn build(self) -> QosPolicies {
        // Destructure exhaustively so that a newly added builder field cannot
        // be forgotten here.
        let Self {
            durability,
            presentation,
            deadline,
            latency_budget,
            ownership,
            liveliness,
            time_based_filter,
            reliability,
            destination_order,
            history,
            resource_limits,
            lifespan,
            #[cfg(feature = "security")]
            property,
        } = self;

        QosPolicies {
            durability,
            presentation,
            deadline,
            latency_budget,
            ownership,
            liveliness,
            time_based_filter,
            reliability,
            destination_order,
            history,
            resource_limits,
            lifespan,
            #[cfg(feature = "security")]
            property,
        }
    }
}
/// A set of QoS policies in which every policy is optional.
///
/// `None` means the policy has not been specified. The fields are only
/// visible inside the crate; other code reads them through the accessor
/// methods and creates values through [`QosPolicyBuilder`].
#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct QosPolicies {
pub(crate) durability: Option<policy::Durability>,
pub(crate) presentation: Option<policy::Presentation>,
pub(crate) deadline: Option<policy::Deadline>,
pub(crate) latency_budget: Option<policy::LatencyBudget>,
pub(crate) ownership: Option<policy::Ownership>,
pub(crate) liveliness: Option<policy::Liveliness>,
pub(crate) time_based_filter: Option<policy::TimeBasedFilter>,
pub(crate) reliability: Option<policy::Reliability>,
pub(crate) destination_order: Option<policy::DestinationOrder>,
pub(crate) history: Option<policy::History>,
pub(crate) resource_limits: Option<policy::ResourceLimits>,
pub(crate) lifespan: Option<policy::Lifespan>,
// Only present when the crate is built with the `security` feature.
#[cfg(feature = "security")]
pub(crate) property: Option<policy::Property>,
}
impl QosPolicies {
/// Returns a policy set in which every policy is `None`.
pub fn qos_none() -> Self {
Self::default()
}
/// Returns an empty [`QosPolicyBuilder`] for assembling a policy set.
pub fn builder() -> QosPolicyBuilder {
QosPolicyBuilder::new()
}
/// Returns the durability policy, or `None` if it is not set.
pub const fn durability(&self) -> Option<policy::Durability> {
self.durability
}
/// Returns `true` only when durability is explicitly set to `Volatile`.
///
/// An unset durability yields `false`.
pub fn is_volatile(&self) -> bool {
    self.durability == Some(policy::Durability::Volatile)
}
/// Returns the presentation policy, or `None` if it is not set.
pub const fn presentation(&self) -> Option<policy::Presentation> {
self.presentation
}
/// Returns the deadline policy, or `None` if it is not set.
pub const fn deadline(&self) -> Option<policy::Deadline> {
self.deadline
}
/// Returns the latency-budget policy, or `None` if it is not set.
pub const fn latency_budget(&self) -> Option<policy::LatencyBudget> {
self.latency_budget
}
/// Returns the ownership policy, or `None` if it is not set.
pub const fn ownership(&self) -> Option<policy::Ownership> {
self.ownership
}
/// Returns the liveliness policy, or `None` if it is not set.
pub const fn liveliness(&self) -> Option<policy::Liveliness> {
self.liveliness
}
/// Returns the time-based-filter policy, or `None` if it is not set.
pub const fn time_based_filter(&self) -> Option<policy::TimeBasedFilter> {
self.time_based_filter
}
/// Returns the reliability policy, or `None` if it is not set.
pub const fn reliability(&self) -> Option<policy::Reliability> {
self.reliability
}
pub fn is_reliable(&self) -> bool {
matches!(self.reliability, Some(policy::Reliability::Reliable { .. }))
}
/// Returns the `max_blocking_time` of a `Reliable` reliability policy.
///
/// Yields `None` when reliability is unset or `BestEffort`.
pub const fn reliable_max_blocking_time(&self) -> Option<Duration> {
    match self.reliability {
        Some(policy::Reliability::Reliable { max_blocking_time }) => Some(max_blocking_time),
        _ => None,
    }
}
/// Returns the destination-order policy, or `None` if it is not set.
pub const fn destination_order(&self) -> Option<policy::DestinationOrder> {
self.destination_order
}
/// Returns the history policy, or `None` if it is not set.
pub const fn history(&self) -> Option<policy::History> {
self.history
}
/// Returns the resource-limits policy, or `None` if it is not set.
pub const fn resource_limits(&self) -> Option<policy::ResourceLimits> {
self.resource_limits
}
/// Returns the lifespan policy, or `None` if it is not set.
pub const fn lifespan(&self) -> Option<policy::Lifespan> {
self.lifespan
}
/// Returns a clone of the property policy, or `None` if it is not set.
///
/// Unlike the other accessors this one is not `const` and allocates, because
/// `policy::Property` holds `Vec`s and is not `Copy`.
#[cfg(feature = "security")]
pub fn property(&self) -> Option<policy::Property> {
self.property.clone()
}
/// Returns a new policy set made by layering `other` on top of `self`.
///
/// For each policy the value from `other` is used when it is set there;
/// otherwise the value from `self` is kept. Neither input is modified.
#[must_use]
pub fn modify_by(&self, other: &Self) -> Self {
    Self {
        durability: other.durability.or(self.durability),
        presentation: other.presentation.or(self.presentation),
        deadline: other.deadline.or(self.deadline),
        latency_budget: other.latency_budget.or(self.latency_budget),
        ownership: other.ownership.or(self.ownership),
        liveliness: other.liveliness.or(self.liveliness),
        time_based_filter: other.time_based_filter.or(self.time_based_filter),
        reliability: other.reliability.or(self.reliability),
        destination_order: other.destination_order.or(self.destination_order),
        history: other.history.or(self.history),
        resource_limits: other.resource_limits.or(self.resource_limits),
        lifespan: other.lifespan.or(self.lifespan),
        // `Property` owns `Vec`s, so clone the fallback lazily: it is only
        // needed when `other` does not carry a property policy.
        #[cfg(feature = "security")]
        property: other
            .property
            .clone()
            .or_else(|| self.property.clone()),
    }
}
/// Checks whether `self` (the offered QoS) satisfies `other` (the requested
/// QoS).
///
/// Returns `None` when the two are compatible, otherwise the id of the first
/// policy found to be incompatible. Inputs and outcome are logged at `trace`
/// level.
pub fn compliance_failure_wrt(&self, other: &Self) -> Option<QosPolicyId> {
    trace!(
        "QoS compatibility check - offered: {:?} - requested {:?}",
        self,
        other
    );
    let verdict = self.compliance_failure_wrt_impl(other);
    trace!("Result: {:?}", verdict);
    verdict
}
fn compliance_failure_wrt_impl(&self, other: &Self) -> Option<QosPolicyId> {
if let (Some(off), Some(req)) = (self.durability, other.durability) {
if off < req {
return Some(QosPolicyId::Durability);
}
}
if let (Some(off), Some(req)) = (self.presentation, other.presentation) {
if (req.coherent_access && !off.coherent_access)
|| (req.ordered_access && !off.ordered_access)
|| (req.access_scope > off.access_scope)
{
return Some(QosPolicyId::Presentation);
}
}
if let (Some(off), Some(req)) = (self.deadline, other.deadline) {
if off.0 > req.0 {
return Some(QosPolicyId::Deadline);
}
}
if let (Some(off), Some(req)) = (self.latency_budget, other.latency_budget) {
if off.duration > req.duration {
return Some(QosPolicyId::LatencyBudget);
}
}
if let (Some(off), Some(req)) = (self.ownership, other.ownership) {
if off != req {
return Some(QosPolicyId::Ownership);
}
}
if let (Some(off), Some(req)) = (self.liveliness, other.liveliness) {
if off < req {
return Some(QosPolicyId::Liveliness);
}
}
if let (Some(off), Some(req)) = (self.reliability, other.reliability) {
if off < req {
return Some(QosPolicyId::Reliability);
}
}
if let (Some(off), Some(req)) = (self.destination_order, other.destination_order) {
if off < req {
return Some(QosPolicyId::DestinationOrder);
}
}
None
}
/// Serializes every policy that is set into a list of [`Parameter`]s.
///
/// `ctx` selects the endianness used for each parameter payload. Unset
/// policies produce no parameter. Ownership is written as a
/// `PID_OWNERSHIP` parameter, followed by `PID_OWNERSHIP_STRENGTH` when the
/// ownership is exclusive. The property policy is not written by this
/// function.
///
/// # Errors
///
/// Returns the first error raised while serializing a payload.
pub fn to_parameter_list(
    &self,
    ctx: speedy::Endianness,
) -> Result<Vec<Parameter>, PlCdrSerializeError> {
    use policy::*;

    let mut params = Vec::with_capacity(8);

    // Exhaustive destructuring: adding a field to `QosPolicies` stops this
    // from compiling until the new field is dealt with here.
    let QosPolicies {
        durability,
        presentation,
        deadline,
        latency_budget,
        ownership,
        liveliness,
        time_based_filter,
        reliability,
        destination_order,
        history,
        resource_limits,
        lifespan,
        #[cfg(feature = "security")]
            property: _,
    } = self;

    // Serializes `$value` (which must be a `&$ty`) and appends it to
    // `params` under `ParameterId::$pid`.
    macro_rules! push_param {
        ($pid:ident, $value:expr, $ty:ty) => {
            params.push(Parameter::new(ParameterId::$pid, {
                let typed: &$ty = $value;
                typed.write_to_vec_with_ctx(ctx)?
            }))
        };
    }
    // Same as `push_param!`, but does nothing when `$slot` is `None`.
    macro_rules! push_if_set {
        ($pid:ident, $slot:expr, $ty:ty) => {
            if let Some(inner) = $slot {
                push_param!($pid, inner, $ty)
            }
        };
    }

    push_if_set!(PID_DURABILITY, durability, Durability);
    push_if_set!(PID_PRESENTATION, presentation, Presentation);
    push_if_set!(PID_DEADLINE, deadline, Deadline);
    push_if_set!(PID_LATENCY_BUDGET, latency_budget, LatencyBudget);

    // Ownership is split into a kind parameter plus, for exclusive
    // ownership only, a strength parameter.
    if let Some(own) = ownership {
        let kind = match own {
            Ownership::Shared => OwnershipKind::Shared,
            Ownership::Exclusive { .. } => OwnershipKind::Exclusive,
        };
        push_param!(PID_OWNERSHIP, &kind, OwnershipKind);
        if let Ownership::Exclusive { strength } = own {
            push_param!(PID_OWNERSHIP_STRENGTH, strength, i32);
        }
    }

    push_if_set!(PID_LIVELINESS, liveliness, policy::Liveliness);
    push_if_set!(
        PID_TIME_BASED_FILTER,
        time_based_filter,
        policy::TimeBasedFilter
    );

    // Reliability always carries a blocking time on the wire; best-effort
    // is written with a zero duration.
    if let Some(rel) = reliability {
        let (reliability_kind, max_blocking_time) = match *rel {
            Reliability::BestEffort => (ReliabilityKind::BestEffort, Duration::ZERO),
            Reliability::Reliable { max_blocking_time } => {
                (ReliabilityKind::Reliable, max_blocking_time)
            }
        };
        let wire = ReliabilitySerialization {
            reliability_kind,
            max_blocking_time,
        };
        push_param!(PID_RELIABILITY, &wire, ReliabilitySerialization);
    }

    push_if_set!(
        PID_DESTINATION_ORDER,
        destination_order,
        policy::DestinationOrder
    );

    // History always carries a depth on the wire; keep-all is written with
    // depth 0.
    if let Some(hist) = history {
        let wire = match *hist {
            History::KeepLast { depth } => HistorySerialization {
                kind: HistoryKind::KeepLast,
                depth,
            },
            History::KeepAll => HistorySerialization {
                kind: HistoryKind::KeepAll,
                depth: 0,
            },
        };
        push_param!(PID_HISTORY, &wire, HistorySerialization);
    }

    push_if_set!(PID_RESOURCE_LIMITS, resource_limits, policy::ResourceLimits);
    push_if_set!(PID_LIFESPAN, lifespan, policy::Lifespan);

    Ok(params)
}
pub fn from_parameter_list(
ctx: speedy::Endianness,
pl_map: &BTreeMap<ParameterId, Vec<&Parameter>>,
) -> Result<QosPolicies, PlCdrDeserializeError> {
macro_rules! get_option {
($pid:ident) => {
get_option_from_pl_map(pl_map, ctx, ParameterId::$pid, "<not_used>")?
};
}
let durability: Option<policy::Durability> = get_option!(PID_DURABILITY);
let presentation: Option<policy::Presentation> = get_option!(PID_PRESENTATION);
let deadline: Option<policy::Deadline> = get_option!(PID_DEADLINE);
let latency_budget: Option<policy::LatencyBudget> = get_option!(PID_LATENCY_BUDGET);
let ownership_kind: Option<OwnershipKind> = get_option!(PID_OWNERSHIP);
let ownership_strength: Option<i32> = get_option!(PID_OWNERSHIP_STRENGTH);
let ownership = match (ownership_kind, ownership_strength) {
(Some(OwnershipKind::Shared), None) => Some(policy::Ownership::Shared),
(Some(OwnershipKind::Shared), Some(_strength)) => {
warn!("QosPolicies deserializer: Received OwnershipKind::Shared and a strength value.");
None
}
(Some(OwnershipKind::Exclusive), Some(strength)) => {
Some(policy::Ownership::Exclusive { strength })
}
(Some(OwnershipKind::Exclusive), None) => {
warn!("QosPolicies deserializer: Received OwnershipKind::Exclusive but no strength value.");
None
}
(None, Some(_strength)) => {
warn!(
"QosPolicies deserializer: Received ownership strength value, but no kind parameter."
);
None
}
(None, None) => None,
};
let reliability_ser: Option<ReliabilitySerialization> = get_option!(PID_RELIABILITY);
let reliability = reliability_ser.map(|rs| match rs.reliability_kind {
ReliabilityKind::BestEffort => policy::Reliability::BestEffort,
ReliabilityKind::Reliable => policy::Reliability::Reliable {
max_blocking_time: rs.max_blocking_time,
},
});
let destination_order: Option<policy::DestinationOrder> = get_option!(PID_DESTINATION_ORDER);
let history_ser: Option<HistorySerialization> = get_option!(PID_HISTORY);
let history = history_ser.map(|h| match h.kind {
HistoryKind::KeepAll => policy::History::KeepAll,
HistoryKind::KeepLast => policy::History::KeepLast { depth: h.depth },
});
let liveliness: Option<policy::Liveliness> = get_option!(PID_LIVELINESS);
let time_based_filter: Option<policy::TimeBasedFilter> = get_option!(PID_TIME_BASED_FILTER);
let resource_limits: Option<policy::ResourceLimits> = get_option!(PID_RESOURCE_LIMITS);
let lifespan: Option<policy::Lifespan> = get_option!(PID_LIFESPAN);
#[cfg(feature = "security")]
let property: Option<policy::Property> = None; Ok(QosPolicies {
durability,
presentation,
deadline,
latency_budget,
ownership,
liveliness,
time_based_filter,
reliability,
destination_order,
history,
resource_limits,
lifespan,
#[cfg(feature = "security")]
property,
})
}
}
/// Serialized discriminator for the history policy; private helper used by
/// `QosPolicies::to_parameter_list` / `from_parameter_list`.
// NOTE(review): the speedy-derived encoding presumably depends on variant
// order - confirm before reordering.
#[derive(Writable, Readable, Clone)]
enum HistoryKind {
KeepLast,
KeepAll,
}
/// Serialized form of `policy::History`: a kind plus a depth.
///
/// `to_parameter_list` writes `depth: 0` for `KeepAll`, and
/// `from_parameter_list` ignores `depth` when `kind` is `KeepAll`.
#[derive(Writable, Readable, Clone)]
struct HistorySerialization {
pub kind: HistoryKind,
pub depth: i32,
}
/// Serialized discriminator for the ownership policy.
///
/// The strength of exclusive ownership is carried in a separate parameter
/// (`PID_OWNERSHIP_STRENGTH`), so it is not part of this type.
// NOTE(review): the speedy-derived encoding presumably depends on variant
// order - confirm before reordering.
#[derive(Writable, Readable)]
enum OwnershipKind {
Shared,
Exclusive,
}
/// Serialized form of `policy::Reliability`: a kind plus a blocking time.
///
/// `to_parameter_list` writes `Duration::ZERO` for `BestEffort`, and
/// `from_parameter_list` ignores `max_blocking_time` for `BestEffort`.
#[derive(Writable, Readable, Clone)]
struct ReliabilitySerialization {
pub reliability_kind: ReliabilityKind,
pub max_blocking_time: Duration,
}
/// Sentinel value `-1`.
// NOTE(review): presumably marks an "unlimited" length in the `i32` limit
// fields (e.g. `policy::ResourceLimits`) - nothing in this file uses it, so
// confirm against callers.
pub const LENGTH_UNLIMITED: i32 = -1;
pub mod policy {
use std::cmp::Ordering;
use speedy::{Readable, Writable};
use serde::{Deserialize, Serialize};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
#[cfg(feature = "security")]
use speedy::{Context, IsEof, Reader, Writer};
use crate::structure::duration::Duration;
#[cfg(feature = "security")]
use crate::serialization::speedy_pl_cdr_helpers::*;
/// Lifespan QoS policy value: a single duration.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Readable, Writable, Serialize, Deserialize)]
pub struct Lifespan {
pub duration: Duration,
}
/// Durability QoS policy value.
///
/// The derived `Ord` follows declaration order, so
/// `Volatile < TransientLocal < Transient < Persistent`. The compatibility
/// check in `QosPolicies` treats an offered value that ranks below the
/// requested one as incompatible.
// NOTE(review): variant order presumably also fixes the speedy-derived wire
// encoding - confirm before reordering.
#[derive(
Copy,
Clone,
Debug,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
Readable,
Writable,
Serialize,
Deserialize,
)]
pub enum Durability {
Volatile,
TransientLocal,
Transient,
Persistent,
}
/// Presentation QoS policy value: an access scope plus two flags.
///
/// In the compatibility check of `QosPolicies`, a requested flag must also be
/// offered and the requested scope must not exceed the offered scope.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Readable, Writable, Serialize, Deserialize)]
pub struct Presentation {
pub access_scope: PresentationAccessScope,
pub coherent_access: bool,
pub ordered_access: bool,
}
/// Access scope of the presentation policy.
///
/// The derived `Ord` follows declaration order: `Instance < Topic < Group`.
// NOTE(review): variant order presumably also fixes the speedy-derived wire
// encoding - confirm before reordering.
#[derive(
Copy,
Clone,
Debug,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
Readable,
Writable,
Serialize,
Deserialize,
)]
pub enum PresentationAccessScope {
Instance,
Topic,
Group,
}
/// Deadline QoS policy value: a newtype around a period.
///
/// The compatibility check in `QosPolicies` reports a failure when the
/// offered period is longer than the requested one.
#[derive(
Copy,
Clone,
Debug,
PartialEq,
Eq,
Ord,
PartialOrd,
Hash,
Readable,
Writable,
Serialize,
Deserialize,
)]
pub struct Deadline(pub Duration);
/// Latency-budget QoS policy value: a single duration.
///
/// The compatibility check in `QosPolicies` reports a failure when the
/// offered duration is longer than the requested one.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Readable, Writable, Serialize, Deserialize)]
pub struct LatencyBudget {
pub duration: Duration,
}
/// Ownership QoS policy value.
///
/// This type has no speedy `Readable`/`Writable` derive:
/// `QosPolicies::to_parameter_list` writes the kind and, for `Exclusive`, the
/// strength as two separate parameters.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Ownership {
Shared,
Exclusive { strength: i32 }, }
/// Liveliness QoS policy value: a kind, each carrying a lease duration.
///
/// Ordering is implemented by hand (see the `Ord` impl) rather than derived.
// NOTE(review): variant order presumably also fixes the speedy-derived wire
// encoding - confirm before reordering.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Readable, Writable, Serialize, Deserialize)]
pub enum Liveliness {
Automatic { lease_duration: Duration },
ManualByParticipant { lease_duration: Duration },
ManualByTopic { lease_duration: Duration },
}
impl Liveliness {
    /// Numeric rank of the liveliness kind, ignoring the lease duration:
    /// `Automatic` = 0, `ManualByParticipant` = 1, `ManualByTopic` = 2.
    fn kind_num(&self) -> i32 {
        match *self {
            Self::Automatic { .. } => 0,
            Self::ManualByParticipant { .. } => 1,
            Self::ManualByTopic { .. } => 2,
        }
    }

    /// Returns the lease duration, whichever the kind.
    pub fn duration(&self) -> Duration {
        match *self {
            Self::Automatic { lease_duration }
            | Self::ManualByParticipant { lease_duration }
            | Self::ManualByTopic { lease_duration } => lease_duration,
        }
    }
}
impl Ord for Liveliness {
    /// Orders liveliness values for the offered-vs-requested compatibility
    /// check.
    ///
    /// The kind is compared first using `kind_num`
    /// (`Automatic < ManualByParticipant < ManualByTopic`). For equal kinds
    /// the lease durations are compared in reverse, so a shorter lease
    /// ranks higher than a longer one.
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare `self` against `other`; comparing `other` with itself
        // would always be `Equal` and the kind would never be considered.
        self.kind_num()
            .cmp(&other.kind_num())
            .then_with(|| self.duration().cmp(&other.duration()).reverse())
    }
}
impl PartialOrd for Liveliness {
    /// Always `Some`: delegates to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Time-based-filter QoS policy value: a single minimum separation duration.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Readable, Writable, Serialize, Deserialize)]
pub struct TimeBasedFilter {
pub minimum_separation: Duration,
}
/// Reliability QoS policy value.
///
/// This type has no speedy `Readable`/`Writable` derive; `QosPolicies`
/// converts it to and from `ReliabilitySerialization` for the wire.
///
/// Note that the derived equality compares `max_blocking_time`, while the
/// hand-written `Ord` impl ignores it.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Reliability {
BestEffort,
Reliable { max_blocking_time: Duration },
}
impl Ord for Reliability {
    /// Orders by kind only: `BestEffort` ranks below `Reliable`.
    ///
    /// `max_blocking_time` does not take part, so two `Reliable` values
    /// always compare as `Equal`.
    fn cmp(&self, other: &Self) -> Ordering {
        // `false < true`, which gives exactly BestEffort < Reliable.
        let is_reliable = |r: &Self| matches!(r, Self::Reliable { .. });
        is_reliable(self).cmp(&is_reliable(other))
    }
}
impl PartialOrd for Reliability {
    /// Always `Some`: delegates to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Destination-order QoS policy value.
///
/// The derived `Ord` follows declaration order:
/// `ByReceptionTimestamp < BySourceTimeStamp`. The compatibility check in
/// `QosPolicies` treats an offered value that ranks below the requested one
/// as incompatible.
// NOTE(review): variant order presumably also fixes the speedy-derived wire
// encoding - confirm before reordering.
#[derive(
Copy,
Clone,
Debug,
PartialEq,
Eq,
Ord,
PartialOrd,
Hash,
Readable,
Writable,
Serialize,
Deserialize,
)]
pub enum DestinationOrder {
ByReceptionTimestamp,
BySourceTimeStamp,
}
/// History QoS policy value: keep the last `depth` samples, or keep all.
///
/// This type has no speedy `Readable`/`Writable` derive; `QosPolicies`
/// converts it to and from `HistorySerialization` for the wire.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum History {
KeepLast { depth: i32 },
KeepAll,
}
/// Resource-limits QoS policy value: three `i32` limits.
// NOTE(review): presumably `LENGTH_UNLIMITED` (-1) from the parent module
// marks "no limit" in these fields - confirm against callers.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Writable, Readable, Serialize, Deserialize)]
pub struct ResourceLimits {
pub max_samples: i32,
pub max_instances: i32,
pub max_samples_per_instance: i32,
}
#[cfg(feature = "security")]
use crate::security;
/// Property QoS policy value (only with the `security` feature): a list of
/// properties and a list of binary properties.
///
/// When written with the hand-implemented `Writable`, only entries whose
/// `propagate` flag is set are emitted.
#[cfg(feature = "security")]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Property {
pub value: Vec<security::types::Property>,
pub binary_value: Vec<security::types::BinaryProperty>,
}
// Reads a `Property` policy: a u32 count followed by that many properties,
// then a u32 count followed by that many binary properties. The second
// section may be missing (see the end-of-input handling below).
#[cfg(feature = "security")]
impl<'a, C: Context> Readable<'a, C> for Property {
fn read_from<R: Reader<'a, C>>(reader: &mut R) -> Result<Self, C::Error> {
// Number of property entries that follow.
let count = reader.read_u32()?;
let mut value = Vec::new();
// Serialized length of the previously read element; 0 before the first.
let mut prev_len = 0;
for _ in 0..count {
// NOTE(review): `read_pad` presumably skips padding so that the next
// element starts on a 4-byte boundary - confirm in speedy_pl_cdr_helpers.
read_pad(reader, prev_len, 4)?;
let s: security::types::Property = reader.read_value()?;
prev_len = s.serialized_len();
value.push(s);
}
// NOTE(review): if `read_pad` can itself fail at end of input, that error
// is propagated here, before the EOF tolerance below applies - confirm.
read_pad(reader, prev_len, 4)?;
let mut binary_value = Vec::new();
match reader.read_u32() {
Ok(count) => {
prev_len = 0;
for _ in 0..count {
read_pad(reader, prev_len, 4)?;
let s: security::types::BinaryProperty = reader.read_value()?;
prev_len = s.serialized_len();
binary_value.push(s);
}
}
Err(e) => {
// Running out of input while reading the binary-property count is
// tolerated: the policy is returned with an empty `binary_value`.
// Any other error is propagated.
if e.is_eof() {
debug!("Non-security PropertyQosPolicy");
} else {
return Err(e);
}
}
}
Ok(Property {
value,
binary_value,
})
}
}
// Writes a `Property` policy: a u32 count and the properties, then a u32
// count and the binary properties. Only entries whose `propagate` flag is
// set are written, and the counts refer to the filtered lists.
#[cfg(feature = "security")]
impl<C: Context> Writable<C> for Property {
fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
let propagate_value: Vec<&security::Property> =
self.value.iter().filter(|p| p.propagate).collect();
// NOTE(review): `as u32` truncates silently if the list ever holds more
// than `u32::MAX` entries.
writer.write_u32(propagate_value.len() as u32)?;
// Serialized length of the previously written element; 0 before the first.
let mut prev_len = 0;
for prop in propagate_value {
// NOTE(review): `write_pad` presumably emits padding up to a 4-byte
// boundary - confirm in speedy_pl_cdr_helpers.
write_pad(writer, prev_len, 4)?;
writer.write_value(prop)?;
prev_len = prop.serialized_len();
}
let propagate_bin_value: Vec<&security::BinaryProperty> =
self.binary_value.iter().filter(|p| p.propagate).collect();
// Pad after the last property before the binary-property count.
write_pad(writer, prev_len, 4)?;
writer.write_u32(propagate_bin_value.len() as u32)?;
let mut prev_len = 0;
for prop in propagate_bin_value {
write_pad(writer, prev_len, 4)?;
writer.write_value(prop)?;
prev_len = prop.serialized_len();
}
Ok(())
}
}
/// A list of security tags (only with the `security` feature).
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[cfg(feature = "security")]
pub struct DataTag {
pub tags: Vec<security::types::Tag>,
}
// Reads a `DataTag`: a u32 count followed by that many tags.
#[cfg(feature = "security")]
impl<'a, C: Context> Readable<'a, C> for DataTag {
fn read_from<R: Reader<'a, C>>(reader: &mut R) -> Result<Self, C::Error> {
let count = reader.read_u32()?;
let mut tags = Vec::new();
// Serialized length of the previously read tag; 0 before the first.
let mut prev_len = 0;
for _ in 0..count {
// NOTE(review): `read_pad` presumably skips padding so that the next tag
// starts on a 4-byte boundary - confirm in speedy_pl_cdr_helpers.
read_pad(reader, prev_len, 4)?;
let s: security::types::Tag = reader.read_value()?;
prev_len = s.serialized_len();
tags.push(s);
}
Ok(DataTag { tags })
}
}
// Writes a `DataTag`: a u32 count followed by every tag.
#[cfg(feature = "security")]
impl<C: Context> Writable<C> for DataTag {
fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
// NOTE(review): `as u32` truncates silently if there are ever more than
// `u32::MAX` tags.
writer.write_u32(self.tags.len() as u32)?;
// Serialized length of the previously written tag; 0 before the first.
let mut prev_len = 0;
for tag in &self.tags {
// NOTE(review): `write_pad` presumably emits padding up to a 4-byte
// boundary - confirm in speedy_pl_cdr_helpers.
write_pad(writer, prev_len, 4)?;
writer.write_value(tag)?;
prev_len = tag.serialized_len();
}
Ok(())
}
}
}