rustdds/dds/
qos.rs

1use std::collections::BTreeMap;
2
3use serde::{Deserialize, Serialize};
4use speedy::{Readable, Writable};
5#[allow(unused_imports)]
6use log::{debug, error, info, trace, warn};
7
8use crate::{
9  dds::result::QosError,
10  messages::submessages::elements::parameter::Parameter,
11  serialization::{
12    pl_cdr_adapters::{PlCdrDeserializeError, PlCdrSerializeError},
13    speedy_pl_cdr_helpers::*,
14  },
15  structure::{duration::Duration, endpoint::ReliabilityKind, parameter_id::ParameterId},
16};
17
// Intended to be implemented by every DomainParticipant, Publisher,
// Subscriber, DataWriter, DataReader and Topic.
/// Trait implemented by the DDS entities that must be able to report their
/// [`QosPolicies`].
pub trait HasQoSPolicy {
  /// Returns the QoS policies of this entity as an owned value.
  fn qos(&self) -> QosPolicies;
}
25
/// Trait implemented by the DDS entities whose [`QosPolicies`] can be changed
/// through a mutable reference.
pub trait MutQosPolicy {
  /// Asks this entity to adopt `new_qos`.
  ///
  /// # Errors
  ///
  /// Returns a [`QosError`] when the change is not accepted; the exact
  /// conditions are decided by each implementor.
  fn set_qos(&mut self, new_qos: &QosPolicies) -> Result<(), QosError>;
}
31
/// Identifies a single QoS policy, e.g. to name the policy that made an
/// offered/requested pair incompatible (see
/// `QosPolicies::compliance_failure_wrt`).
///
/// DDS spec 2.3.3 defines this as "long" with named constants from 0 to 22.
/// The numbers in the trailing comments below are taken from the IDL PSM.
/// They are NOT the Rust discriminants of this enum (no explicit
/// discriminants are assigned), and they should be unnecessary at the Rust
/// application interface.
///
/// Policies that are not implemented are kept as commented-out variants so
/// that the correspondence with the spec numbering stays visible.
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub enum QosPolicyId {
  // Invalid  // We should represent this using Option<QosPolicyId> where needed
  // UserData,  // 1
  Durability,   // 2
  Presentation, // 3
  Deadline,
  LatencyBudget, // 5
  Ownership,
  // OwnershipStrength, // 7
  Liveliness,
  TimeBasedFilter, // 9
  // Partition,

  // Note: If "Partition" is ever implemented, observe also DDS Security spec v1.1
  // Section "7.3.5 Immutability of Publisher Partition Qos in combination with non-volatile
  // Durability kind" when implementing.
  Reliability, // 11
  DestinationOrder,
  History, // 13
  ResourceLimits,
  // EntityFactory, // 15
  // WriterDataLifeCycle,
  // ReaderDataLifeCycle, // 17
  // TopicData, // 18
  // GroupData,
  // TransportPriority, // 20
  Lifespan,
  // DurabilityService, // 22
  Property, // No Id in the security spec (but this is from an older DDS/RTPS spec.)
}
66
/// Utility for building [QosPolicies]
///
/// Holds one optional slot per supported policy. A slot that is never set
/// stays `None` and is carried over as `None` into the built [`QosPolicies`].
/// The `property` slot only exists when the `security` feature is enabled.
#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct QosPolicyBuilder {
  // One slot per policy; the field set mirrors `QosPolicies` exactly.
  durability: Option<policy::Durability>,
  presentation: Option<policy::Presentation>,
  deadline: Option<policy::Deadline>,
  latency_budget: Option<policy::LatencyBudget>,
  ownership: Option<policy::Ownership>,
  liveliness: Option<policy::Liveliness>,
  time_based_filter: Option<policy::TimeBasedFilter>,
  reliability: Option<policy::Reliability>,
  destination_order: Option<policy::DestinationOrder>,
  history: Option<policy::History>,
  resource_limits: Option<policy::ResourceLimits>,
  lifespan: Option<policy::Lifespan>,
  #[cfg(feature = "security")]
  property: Option<policy::Property>,
}
85
86impl QosPolicyBuilder {
87  pub fn new() -> Self {
88    Self::default()
89  }
90
91  #[must_use]
92  pub const fn durability(mut self, durability: policy::Durability) -> Self {
93    self.durability = Some(durability);
94    self
95  }
96
97  #[must_use]
98  pub const fn presentation(mut self, presentation: policy::Presentation) -> Self {
99    self.presentation = Some(presentation);
100    self
101  }
102
103  #[must_use]
104  pub const fn deadline(mut self, deadline: policy::Deadline) -> Self {
105    self.deadline = Some(deadline);
106    self
107  }
108
109  #[must_use]
110  pub const fn latency_budget(mut self, latency_budget: policy::LatencyBudget) -> Self {
111    self.latency_budget = Some(latency_budget);
112    self
113  }
114
115  #[must_use]
116  pub const fn ownership(mut self, ownership: policy::Ownership) -> Self {
117    self.ownership = Some(ownership);
118    self
119  }
120
121  #[must_use]
122  pub const fn liveliness(mut self, liveliness: policy::Liveliness) -> Self {
123    self.liveliness = Some(liveliness);
124    self
125  }
126
127  #[must_use]
128  pub const fn time_based_filter(mut self, time_based_filter: policy::TimeBasedFilter) -> Self {
129    self.time_based_filter = Some(time_based_filter);
130    self
131  }
132
133  #[must_use]
134  pub const fn reliability(mut self, reliability: policy::Reliability) -> Self {
135    self.reliability = Some(reliability);
136    self
137  }
138
139  #[must_use]
140  pub const fn best_effort(mut self) -> Self {
141    self.reliability = Some(policy::Reliability::BestEffort);
142    self
143  }
144
145  #[must_use]
146  pub const fn reliable(mut self, max_blocking_time: Duration) -> Self {
147    self.reliability = Some(policy::Reliability::Reliable { max_blocking_time });
148    self
149  }
150
151  #[must_use]
152  pub const fn destination_order(mut self, destination_order: policy::DestinationOrder) -> Self {
153    self.destination_order = Some(destination_order);
154    self
155  }
156
157  #[must_use]
158  pub const fn history(mut self, history: policy::History) -> Self {
159    self.history = Some(history);
160    self
161  }
162
163  #[must_use]
164  pub const fn resource_limits(mut self, resource_limits: policy::ResourceLimits) -> Self {
165    self.resource_limits = Some(resource_limits);
166    self
167  }
168
169  #[must_use]
170  pub const fn lifespan(mut self, lifespan: policy::Lifespan) -> Self {
171    self.lifespan = Some(lifespan);
172    self
173  }
174
175  #[cfg(feature = "security")]
176  #[must_use]
177  pub fn property(mut self, property: policy::Property) -> Self {
178    self.property = Some(property);
179    self
180  }
181
182  pub fn build(self) -> QosPolicies {
183    QosPolicies {
184      durability: self.durability,
185      presentation: self.presentation,
186      deadline: self.deadline,
187      latency_budget: self.latency_budget,
188      ownership: self.ownership,
189      liveliness: self.liveliness,
190      time_based_filter: self.time_based_filter,
191      reliability: self.reliability,
192      destination_order: self.destination_order,
193      history: self.history,
194      resource_limits: self.resource_limits,
195      lifespan: self.lifespan,
196      #[cfg(feature = "security")]
197      property: self.property,
198    }
199  }
200}
201
/// Describes a set of RTPS/DDS QoS policies
///
/// QosPolicies are constructed using a [`QosPolicyBuilder`]
///
/// Each policy is optional: `None` means that the policy is not specified in
/// this set. The `property` field only exists when the `security` feature is
/// enabled.
#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct QosPolicies {
  // Fields are pub(crate) because we want to be able to define some builtin
  // QoS policies as constants inside the crate.
  pub(crate) durability: Option<policy::Durability>,
  pub(crate) presentation: Option<policy::Presentation>,
  pub(crate) deadline: Option<policy::Deadline>,
  pub(crate) latency_budget: Option<policy::LatencyBudget>,
  pub(crate) ownership: Option<policy::Ownership>,
  pub(crate) liveliness: Option<policy::Liveliness>,
  pub(crate) time_based_filter: Option<policy::TimeBasedFilter>,
  pub(crate) reliability: Option<policy::Reliability>,
  pub(crate) destination_order: Option<policy::DestinationOrder>,
  pub(crate) history: Option<policy::History>,
  pub(crate) resource_limits: Option<policy::ResourceLimits>,
  pub(crate) lifespan: Option<policy::Lifespan>,
  #[cfg(feature = "security")]
  pub(crate) property: Option<policy::Property>,
}
223
224impl QosPolicies {
225  // TODO: rename this to "none", as this is already member of QosPolicies, so
226  // context in implied
227  pub fn qos_none() -> Self {
228    Self::default()
229  }
230
231  pub fn builder() -> QosPolicyBuilder {
232    QosPolicyBuilder::new()
233  }
234
235  pub const fn durability(&self) -> Option<policy::Durability> {
236    self.durability
237  }
238
239  pub fn is_volatile(&self) -> bool {
240    matches!(self.durability, Some(policy::Durability::Volatile))
241  }
242
243  pub const fn presentation(&self) -> Option<policy::Presentation> {
244    self.presentation
245  }
246
247  pub const fn deadline(&self) -> Option<policy::Deadline> {
248    self.deadline
249  }
250
251  pub const fn latency_budget(&self) -> Option<policy::LatencyBudget> {
252    self.latency_budget
253  }
254
255  pub const fn ownership(&self) -> Option<policy::Ownership> {
256    self.ownership
257  }
258
259  pub const fn liveliness(&self) -> Option<policy::Liveliness> {
260    self.liveliness
261  }
262
263  pub const fn time_based_filter(&self) -> Option<policy::TimeBasedFilter> {
264    self.time_based_filter
265  }
266
267  pub const fn reliability(&self) -> Option<policy::Reliability> {
268    self.reliability
269  }
270
271  pub fn is_reliable(&self) -> bool {
272    matches!(self.reliability, Some(policy::Reliability::Reliable { .. }))
273  }
274
275  pub const fn reliable_max_blocking_time(&self) -> Option<Duration> {
276    if let Some(policy::Reliability::Reliable { max_blocking_time }) = self.reliability {
277      Some(max_blocking_time)
278    } else {
279      None
280    }
281  }
282
283  pub const fn destination_order(&self) -> Option<policy::DestinationOrder> {
284    self.destination_order
285  }
286
287  pub const fn history(&self) -> Option<policy::History> {
288    self.history
289  }
290
291  pub const fn resource_limits(&self) -> Option<policy::ResourceLimits> {
292    self.resource_limits
293  }
294
295  pub const fn lifespan(&self) -> Option<policy::Lifespan> {
296    self.lifespan
297  }
298
299  #[cfg(feature = "security")]
300  pub fn property(&self) -> Option<policy::Property> {
301    self.property.clone()
302  }
303
304  /// Merge two QosPolicies
305  ///
306  /// Constructs a QosPolicy, where each policy is taken from `self`,
307  /// and overwritten with those policies from `other` that are defined.
308  #[must_use]
309  pub fn modify_by(&self, other: &Self) -> Self {
310    Self {
311      durability: other.durability.or(self.durability),
312      presentation: other.presentation.or(self.presentation),
313      deadline: other.deadline.or(self.deadline),
314      latency_budget: other.latency_budget.or(self.latency_budget),
315      ownership: other.ownership.or(self.ownership),
316      liveliness: other.liveliness.or(self.liveliness),
317      time_based_filter: other.time_based_filter.or(self.time_based_filter),
318      reliability: other.reliability.or(self.reliability),
319      destination_order: other.destination_order.or(self.destination_order),
320      history: other.history.or(self.history),
321      resource_limits: other.resource_limits.or(self.resource_limits),
322      lifespan: other.lifespan.or(self.lifespan),
323      #[cfg(feature = "security")]
324      property: other.property.clone().or(self.property.clone()),
325    }
326  }
327
328  /// Check if policy complies to another policy.
329  ///
330  /// `self` is the "offered" (publisher) QoS
331  /// `other` is the "requested" (subscriber) QoS
332  ///
333  /// * None => Policies are compatible
334  /// * Some(policyId) => Failure, where policyId is (any) one of the policies
335  ///   causing incompliance
336  ///
337  /// Compliance (compatibility) is defined in the table in DDS spec v1.4
338  /// Section "2.2.3 Supported QoS"
339  ///
340  /// This is not symmetric.
341  pub fn compliance_failure_wrt(&self, other: &Self) -> Option<QosPolicyId> {
342    trace!(
343      "QoS compatibility check - offered: {:?} - requested {:?}",
344      self,
345      other
346    );
347    let result = self.compliance_failure_wrt_impl(other);
348    trace!("Result: {:?}", result);
349    result
350  }
351
352  fn compliance_failure_wrt_impl(&self, other: &Self) -> Option<QosPolicyId> {
353    // TODO: Check for cases where policy is requested, but not offered (None)
354
355    // check Durability: Offered must be better than or equal to Requested.
356    if let (Some(off), Some(req)) = (self.durability, other.durability) {
357      if off < req {
358        return Some(QosPolicyId::Durability);
359      }
360    }
361
362    // check Presentation:
363    // * If coherent_access is requested, it must be offered also. AND
364    // * Same for ordered_access. AND
365    // * Offered access scope is broader than requested.
366    if let (Some(off), Some(req)) = (self.presentation, other.presentation) {
367      if (req.coherent_access && !off.coherent_access)
368        || (req.ordered_access && !off.ordered_access)
369        || (req.access_scope > off.access_scope)
370      {
371        return Some(QosPolicyId::Presentation);
372      }
373    }
374
375    // check Deadline: offered period <= requested period
376    if let (Some(off), Some(req)) = (self.deadline, other.deadline) {
377      if off.0 > req.0 {
378        return Some(QosPolicyId::Deadline);
379      }
380    }
381
382    // check Latency Budget:
383    // offered duration <= requested duration
384    if let (Some(off), Some(req)) = (self.latency_budget, other.latency_budget) {
385      if off.duration > req.duration {
386        return Some(QosPolicyId::LatencyBudget);
387      }
388    }
389
390    // check Ownership:
391    // offered kind == requested kind
392    if let (Some(off), Some(req)) = (self.ownership, other.ownership) {
393      if off != req {
394        return Some(QosPolicyId::Ownership);
395      }
396    }
397
398    // check Liveliness
399    // offered kind >= requested kind
400    // Definition: AUTOMATIC < MANUAL_BY_PARTICIPANT < MANUAL_BY_TOPIC
401    // AND offered lease_duration <= requested lease_duration
402    //
403    // See Ord implementation on Liveliness.
404    if let (Some(off), Some(req)) = (self.liveliness, other.liveliness) {
405      if off < req {
406        return Some(QosPolicyId::Liveliness);
407      }
408    }
409
410    // check Reliability
411    // offered kind >= requested kind
412    // kind ranking: BEST_EFFORT < RELIABLE
413    if let (Some(off), Some(req)) = (self.reliability, other.reliability) {
414      if off < req {
415        return Some(QosPolicyId::Reliability);
416      }
417    }
418
419    // check Destination Order
420    // offered kind >= requested kind
421    // kind ranking: BY_RECEPTION_TIMESTAMP < BY_SOURCE_TIMESTAMP
422    if let (Some(off), Some(req)) = (self.destination_order, other.destination_order) {
423      if off < req {
424        return Some(QosPolicyId::DestinationOrder);
425      }
426    }
427
428    // default value. no incompatibility detected.
429    None
430  }
431
432  // serialization
433  pub fn to_parameter_list(
434    &self,
435    ctx: speedy::Endianness,
436  ) -> Result<Vec<Parameter>, PlCdrSerializeError> {
437    let mut pl = Vec::with_capacity(8);
438
439    let QosPolicies {
440      // bind self to (a) destructure, and (b) ensure all fields are handled
441      durability,
442      presentation,
443      deadline,
444      latency_budget,
445      ownership,
446      liveliness,
447      time_based_filter,
448      reliability,
449      destination_order,
450      history,
451      resource_limits,
452      lifespan,
453      #[cfg(feature = "security")]
454        property: _, // TODO: properties to parameter list?
455    } = self;
456
457    macro_rules! emit {
458      ($pid:ident, $member:expr, $type:ty) => {
459        pl.push(Parameter::new(ParameterId::$pid, {
460          let m: &$type = $member;
461          m.write_to_vec_with_ctx(ctx)?
462        }))
463      };
464    }
465    macro_rules! emit_option {
466      ($pid:ident, $member:expr, $type:ty) => {
467        if let Some(m) = $member {
468          emit!($pid, m, $type)
469        }
470      };
471    }
472
473    use policy::*;
474
475    emit_option!(PID_DURABILITY, durability, Durability);
476    emit_option!(PID_PRESENTATION, presentation, Presentation);
477    emit_option!(PID_DEADLINE, deadline, Deadline);
478    emit_option!(PID_LATENCY_BUDGET, latency_budget, LatencyBudget);
479
480    // Ownership serializes to (maybe) two separate Parameters
481    match ownership {
482      Some(Ownership::Exclusive { strength }) => {
483        emit!(PID_OWNERSHIP, &OwnershipKind::Exclusive, OwnershipKind);
484        emit!(PID_OWNERSHIP_STRENGTH, strength, i32);
485      }
486      Some(Ownership::Shared) => {
487        emit!(PID_OWNERSHIP, &OwnershipKind::Shared, OwnershipKind);
488      }
489      None => (),
490    }
491    // This should serialize as is
492    emit_option!(PID_LIVELINESS, liveliness, policy::Liveliness);
493    emit_option!(
494      PID_TIME_BASED_FILTER,
495      time_based_filter,
496      policy::TimeBasedFilter
497    );
498
499    if let Some(rel) = reliability.as_ref() {
500      let reliability_ser = match rel {
501        Reliability::BestEffort => ReliabilitySerialization {
502          reliability_kind: ReliabilityKind::BestEffort,
503          max_blocking_time: Duration::ZERO, // dummy value for serialization
504        },
505        Reliability::Reliable { max_blocking_time } => ReliabilitySerialization {
506          reliability_kind: ReliabilityKind::Reliable,
507          max_blocking_time: *max_blocking_time,
508        },
509      };
510      emit!(PID_RELIABILITY, &reliability_ser, ReliabilitySerialization);
511    }
512
513    emit_option!(
514      PID_DESTINATION_ORDER,
515      destination_order,
516      policy::DestinationOrder
517    );
518
519    if let Some(history) = history.as_ref() {
520      let history_ser = match history {
521        History::KeepLast { depth } => HistorySerialization {
522          kind: HistoryKind::KeepLast,
523          depth: *depth,
524        },
525        History::KeepAll => HistorySerialization {
526          kind: HistoryKind::KeepAll,
527          depth: 0,
528        },
529      };
530      emit!(PID_HISTORY, &history_ser, HistorySerialization);
531    }
532    emit_option!(PID_RESOURCE_LIMITS, resource_limits, policy::ResourceLimits);
533    emit_option!(PID_LIFESPAN, lifespan, policy::Lifespan);
534
535    Ok(pl)
536  }
537
538  pub fn from_parameter_list(
539    ctx: speedy::Endianness,
540    pl_map: &BTreeMap<ParameterId, Vec<&Parameter>>,
541  ) -> Result<QosPolicies, PlCdrDeserializeError> {
542    macro_rules! get_option {
543      ($pid:ident) => {
544        get_option_from_pl_map(pl_map, ctx, ParameterId::$pid, "<not_used>")?
545      };
546    }
547
548    let durability: Option<policy::Durability> = get_option!(PID_DURABILITY);
549    let presentation: Option<policy::Presentation> = get_option!(PID_PRESENTATION);
550    let deadline: Option<policy::Deadline> = get_option!(PID_DEADLINE);
551    let latency_budget: Option<policy::LatencyBudget> = get_option!(PID_LATENCY_BUDGET);
552
553    // Ownership is in 2 parts
554    let ownership_kind: Option<OwnershipKind> = get_option!(PID_OWNERSHIP);
555    let ownership_strength: Option<i32> = get_option!(PID_OWNERSHIP_STRENGTH);
556    let ownership = match (ownership_kind, ownership_strength) {
557      (Some(OwnershipKind::Shared), None) => Some(policy::Ownership::Shared),
558      (Some(OwnershipKind::Shared), Some(_strength)) => {
559        warn!("QosPolicies deserializer: Received OwnershipKind::Shared and a strength value.");
560        None
561      }
562      (Some(OwnershipKind::Exclusive), Some(strength)) => {
563        Some(policy::Ownership::Exclusive { strength })
564      }
565      (Some(OwnershipKind::Exclusive), None) => {
566        warn!("QosPolicies deserializer: Received OwnershipKind::Exclusive but no strength value.");
567        None
568      }
569      (None, Some(_strength)) => {
570        warn!(
571          "QosPolicies deserializer: Received ownership strength value, but no kind parameter."
572        );
573        None
574      }
575      (None, None) => None,
576    };
577
578    let reliability_ser: Option<ReliabilitySerialization> = get_option!(PID_RELIABILITY);
579    let reliability = reliability_ser.map(|rs| match rs.reliability_kind {
580      ReliabilityKind::BestEffort => policy::Reliability::BestEffort,
581      ReliabilityKind::Reliable => policy::Reliability::Reliable {
582        max_blocking_time: rs.max_blocking_time,
583      },
584    });
585    let destination_order: Option<policy::DestinationOrder> = get_option!(PID_DESTINATION_ORDER);
586
587    let history_ser: Option<HistorySerialization> = get_option!(PID_HISTORY);
588    let history = history_ser.map(|h| match h.kind {
589      HistoryKind::KeepAll => policy::History::KeepAll,
590      HistoryKind::KeepLast => policy::History::KeepLast { depth: h.depth },
591    });
592
593    let liveliness: Option<policy::Liveliness> = get_option!(PID_LIVELINESS);
594    let time_based_filter: Option<policy::TimeBasedFilter> = get_option!(PID_TIME_BASED_FILTER);
595
596    let resource_limits: Option<policy::ResourceLimits> = get_option!(PID_RESOURCE_LIMITS);
597    let lifespan: Option<policy::Lifespan> = get_option!(PID_LIFESPAN);
598
599    #[cfg(feature = "security")]
600    let property: Option<policy::Property> = None; // TODO: Should also properties be read?
601
602    // We construct using the struct syntax directly rather than the builder,
603    // so we cannot forget any field.
604    Ok(QosPolicies {
605      durability,
606      presentation,
607      deadline,
608      latency_budget,
609      ownership,
610      liveliness,
611      time_based_filter,
612      reliability,
613      destination_order,
614      history,
615      resource_limits,
616      lifespan,
617      #[cfg(feature = "security")]
618      property,
619    })
620  }
621}
622
// Kind tag of the HISTORY policy as used in `HistorySerialization`.
// `QosPolicies::to_parameter_list` / `from_parameter_list` translate between
// this and `policy::History`.
// NOTE(review): the serialized tag presumably follows the variant order, so
// the order should not be changed — confirm against the speedy derive docs.
#[derive(Writable, Readable, Clone)]
//#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
enum HistoryKind {
  KeepLast,
  KeepAll,
}
629
// Serialized form of `policy::History`: always a kind plus a depth.
// For `KeepAll` the serializer writes depth 0 and the deserializer ignores
// the received depth.
#[derive(Writable, Readable, Clone)]
struct HistorySerialization {
  pub kind: HistoryKind,
  pub depth: i32,
}
635
// Kind tag of the OWNERSHIP policy, serialized under PID_OWNERSHIP.
// The strength of `policy::Ownership::Exclusive` travels separately under
// PID_OWNERSHIP_STRENGTH.
// NOTE(review): the serialized tag presumably follows the variant order, so
// the order should not be changed — confirm against the speedy derive docs.
#[derive(Writable, Readable)]
//#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
enum OwnershipKind {
  Shared,
  Exclusive,
}
642
// Serialized form of `policy::Reliability`: always a kind plus a
// max_blocking_time. For `BestEffort` the serializer writes `Duration::ZERO`
// as a dummy value and the deserializer ignores the received value.
#[derive(Writable, Readable, Clone)]
struct ReliabilitySerialization {
  pub reliability_kind: ReliabilityKind,
  pub max_blocking_time: Duration,
}
648
// DDS spec v1.4 p.139
// Special value meaning "no limit"; intended for the fields of
// `policy::ResourceLimits`.
// TODO: Replace this with an Option construct, so that None means no limit
// and Some(limit) gives the limit when one is defined.
pub const LENGTH_UNLIMITED: i32 = -1;
654
655// put these into a submodule to avoid repeating the word "policy" or
656// "qospolicy"
657/// Contains all available QoSPolicies
658pub mod policy {
659  use std::cmp::Ordering;
660
661  use speedy::{Readable, Writable};
662  use serde::{Deserialize, Serialize};
663  #[allow(unused_imports)]
664  use log::{debug, error, info, trace, warn};
665  #[cfg(feature = "security")]
666  use speedy::{Context, IsEof, Reader, Writer};
667
668  use crate::structure::duration::Duration;
669  #[cfg(feature = "security")]
670  use crate::serialization::speedy_pl_cdr_helpers::*;
671
672  /*
673  pub struct UserData {
674    pub value: Vec<u8>,
675  }
676
677  pub struct TopicData {
678    pub value: Vec<u8>,
679  }
680
681  pub struct GroupData {
682    pub value: Vec<u8>,
683  }
684
685  pub struct TransportPriority {
686    pub value: i32,
687  }
688  */
689
690  /// DDS 2.2.3.16 LIFESPAN
691  #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Readable, Writable, Serialize, Deserialize)]
692  pub struct Lifespan {
693    pub duration: Duration,
694  }
695
696  /// DDS 2.2.3.4 DURABILITY
697  ///
698  /// DDS Spec 1.4:
699  ///
700  /// "This QoS policy controls whether the Service will actually make data
701  /// available to late-joining readers. Note that although related, this does
702  /// not strictly control what data the Service will maintain internally. That
703  /// is, the Service may choose to maintain some data for its own purposes
704  /// (e.g., flow control) and yet not make it available to late-joining readers
705  /// if the DURABILITY QoS policy is set to VOLATILE."
706  #[derive(
707    Copy,
708    Clone,
709    Debug,
710    PartialEq,
711    Eq,
712    PartialOrd,
713    Ord,
714    Hash,
715    Readable,
716    Writable,
717    Serialize,
718    Deserialize,
719  )]
720  pub enum Durability {
721    Volatile,
722    TransientLocal,
723    Transient,
724    Persistent,
725  }
726
727  /// DDS 2.2.3.6 PRESENTATION
728  #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Readable, Writable, Serialize, Deserialize)]
729  pub struct Presentation {
730    pub access_scope: PresentationAccessScope,
731    pub coherent_access: bool,
732    pub ordered_access: bool,
733  }
734
735  /// Access scope that is part of DDS 2.2.3.6 PRESENTATION
736  #[derive(
737    Copy,
738    Clone,
739    Debug,
740    PartialEq,
741    Eq,
742    PartialOrd,
743    Ord,
744    Hash,
745    Readable,
746    Writable,
747    Serialize,
748    Deserialize,
749  )]
750  pub enum PresentationAccessScope {
751    Instance,
752    Topic,
753    Group,
754  }
755
756  /// DDS 2.2.3.7 DEADLINE
757  #[derive(
758    Copy,
759    Clone,
760    Debug,
761    PartialEq,
762    Eq,
763    Ord,
764    PartialOrd,
765    Hash,
766    Readable,
767    Writable,
768    Serialize,
769    Deserialize,
770  )]
771  pub struct Deadline(pub Duration);
772
773  /// DDS 2.2.3.8 LATENCY_BUDGET
774  #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Readable, Writable, Serialize, Deserialize)]
775  pub struct LatencyBudget {
776    pub duration: Duration,
777  }
778
779  /// DDS 2.2.3.9 OWNERSHIP
780  #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
781  pub enum Ownership {
782    Shared,
783    Exclusive { strength: i32 }, // This also implements OwnershipStrength
784  }
785
786  /// DDS 2.2.3.11 LIVELINESS
787  #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Readable, Writable, Serialize, Deserialize)]
788  pub enum Liveliness {
789    Automatic { lease_duration: Duration },
790    ManualByParticipant { lease_duration: Duration },
791    ManualByTopic { lease_duration: Duration },
792  }
793
794  impl Liveliness {
795    fn kind_num(&self) -> i32 {
796      match self {
797        Self::Automatic { .. } => 0,
798        Self::ManualByParticipant { .. } => 1,
799        Self::ManualByTopic { .. } => 2,
800      }
801    }
802
803    pub fn duration(&self) -> Duration {
804      match self {
805        Self::Automatic { lease_duration }
806        | Self::ManualByParticipant { lease_duration }
807        | Self::ManualByTopic { lease_duration } => *lease_duration,
808      }
809    }
810  }
811
812  impl Ord for Liveliness {
813    fn cmp(&self, other: &Self) -> Ordering {
814      // Manual liveliness is greater than automatic, but
815      // duration compares in reverse
816      other
817        .kind_num()
818        .cmp(&other.kind_num())
819        .then_with(|| self.duration().cmp(&other.duration()).reverse())
820    }
821  }
822
823  impl PartialOrd for Liveliness {
824    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
825      Some(self.cmp(other))
826    }
827  }
828
829  /// DDS 2.2.3.12 TIME_BASED_FILTER
830  #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Readable, Writable, Serialize, Deserialize)]
831  pub struct TimeBasedFilter {
832    pub minimum_separation: Duration,
833  }
834
835  /*
836  pub struct Partition {
837    pub name: Vec<Vec<u8>>,
838  }
839  */
840
841  /// DDS 2.2.3.14 RELIABILITY
842  #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
843  pub enum Reliability {
844    BestEffort,
845    Reliable { max_blocking_time: Duration },
846  }
847
848  impl Ord for Reliability {
849    // max_blocking_time is not compared.
850    // TODO: This is kind of bad, because now Eq and Ord are inconsistent:
851    // If we have A and B both Reliability::Reliable, but with different
852    // max_blocking_time, then Ord will say they are Ordering::Equal,
853    // but Eq will say they are not equal.
854    fn cmp(&self, other: &Self) -> Ordering {
855      match (self, other) {
856        (Self::BestEffort, Self::BestEffort) | (Self::Reliable { .. }, Self::Reliable { .. }) => {
857          Ordering::Equal
858        }
859        (Self::BestEffort, Self::Reliable { .. }) => Ordering::Less,
860        (Self::Reliable { .. }, Self::BestEffort) => Ordering::Greater,
861      }
862    }
863  }
864
865  impl PartialOrd for Reliability {
866    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
867      Some(self.cmp(other))
868    }
869  }
870
871  /// DDS 2.2.3.17 DESTINATION_ORDER
872  #[derive(
873    Copy,
874    Clone,
875    Debug,
876    PartialEq,
877    Eq,
878    Ord,
879    PartialOrd,
880    Hash,
881    Readable,
882    Writable,
883    Serialize,
884    Deserialize,
885  )]
886  pub enum DestinationOrder {
887    ByReceptionTimestamp,
888    BySourceTimeStamp,
889  }
890
891  /// DDS 2.2.3.18 HISTORY
892  #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
893  pub enum History {
894    // Variants must be in this order ot derive Ord correctly.
895    KeepLast { depth: i32 },
896    KeepAll,
897  }
898
899  /// DDS Spec v1.4 Section 2.2.3.19 RESOURCE_LIMITS
900  ///
901  /// DDS Spec v1.4 p.147 "struct ResourceLimitsQosPolicy" defines the
902  /// fields as "long". The "long" type of OMG IDL is defined to have
903  /// 32-bit (signed, 2's complement) range in the OMG IDL spec v4.2, Table
904  /// 7-13: Integer Types.
905  ///
906  /// But it does not make sense to have negative limits, so these should be
907  /// unsigned.
908  ///
909  /// Negative values are needed, because DDS spec defines the special value
910  /// const long LENGTH_UNLIMITED = -1;
911  #[derive(Copy, Clone, Debug, PartialEq, Eq, Writable, Readable, Serialize, Deserialize)]
912  pub struct ResourceLimits {
913    pub max_samples: i32,
914    pub max_instances: i32,
915    pub max_samples_per_instance: i32,
916  }
917
918  #[cfg(feature = "security")]
919  use crate::security;
920  // DDS Security spec v1.1
921  // Section 7.2.5 PropertyQosPolicy, DomainParticipantQos, DataWriterQos, and
922  // DataReaderQos
923  #[cfg(feature = "security")]
924  #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
925  pub struct Property {
926    pub value: Vec<security::types::Property>,
927    pub binary_value: Vec<security::types::BinaryProperty>,
928  }
929
930  #[cfg(feature = "security")]
931  impl<'a, C: Context> Readable<'a, C> for Property {
932    fn read_from<R: Reader<'a, C>>(reader: &mut R) -> Result<Self, C::Error> {
933      let count = reader.read_u32()?;
934      let mut value = Vec::new();
935
936      let mut prev_len = 0;
937      for _ in 0..count {
938        read_pad(reader, prev_len, 4)?;
939        let s: security::types::Property = reader.read_value()?;
940        prev_len = s.serialized_len();
941        value.push(s);
942      }
943
944      // Depending on the RTPS version used by writer, PropertyQoSPolicy may end here,
945      // i.e. there is no "binary_value". Pad should still always exist.
946      read_pad(reader, prev_len, 4)?;
947      let mut binary_value = Vec::new();
948
949      match reader.read_u32() {
950        Ok(count) => {
951          prev_len = 0;
952          for _ in 0..count {
953            read_pad(reader, prev_len, 4)?;
954            let s: security::types::BinaryProperty = reader.read_value()?;
955            prev_len = s.serialized_len();
956            binary_value.push(s);
957          }
958        }
959        Err(e) => {
960          if e.is_eof() {
961            // This is ok. Only String properties, no binary.
962            debug!("Non-security PropertyQosPolicy");
963          } else {
964            return Err(e);
965          }
966        }
967      }
968
969      Ok(Property {
970        value,
971        binary_value,
972      })
973    }
974  }
975
976  // Writing several strings is a bit complicated, because
977  // we have to keep track of alignment.
978  // Again, alignment comes BEFORE string length, or vector item count, not after
979  // string.
980  #[cfg(feature = "security")]
981  impl<C: Context> Writable<C> for Property {
982    fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
983      // First self.value
984      // Only those properties with propagate=true are written
985      let propagate_value: Vec<&security::Property> =
986        self.value.iter().filter(|p| p.propagate).collect();
987
988      // propagate_value vector length
989      writer.write_u32(propagate_value.len() as u32)?;
990
991      let mut prev_len = 0;
992      for prop in propagate_value {
993        write_pad(writer, prev_len, 4)?;
994        writer.write_value(prop)?;
995        prev_len = prop.serialized_len();
996      }
997
998      // Then self.bin_value
999      // Only those binary properties with propagate=true are written
1000      let propagate_bin_value: Vec<&security::BinaryProperty> =
1001        self.binary_value.iter().filter(|p| p.propagate).collect();
1002
1003      // propagate_bin_value vector length
1004      write_pad(writer, prev_len, 4)?;
1005      writer.write_u32(propagate_bin_value.len() as u32)?;
1006      // and the elements
1007      let mut prev_len = 0;
1008      for prop in propagate_bin_value {
1009        write_pad(writer, prev_len, 4)?;
1010        writer.write_value(prop)?;
1011        prev_len = prop.serialized_len();
1012      }
1013
1014      Ok(())
1015    }
1016  }
1017
1018  // DDS Security spec v1.1
1019  // Section 7.2.5 PropertyQosPolicy, DomainParticipantQos, DataWriterQos, and
1020  // DataReaderQos
1021  //
1022  // so this is DataTagQosPolicy, which is an alias for "DataTags"
1023  // We call it qos::policy::DataTag
1024  #[derive(Clone, Debug, PartialEq, Eq, Default)]
1025  #[cfg(feature = "security")]
1026  pub struct DataTag {
1027    pub tags: Vec<security::types::Tag>,
1028  }
1029
1030  #[cfg(feature = "security")]
1031  impl<'a, C: Context> Readable<'a, C> for DataTag {
1032    fn read_from<R: Reader<'a, C>>(reader: &mut R) -> Result<Self, C::Error> {
1033      let count = reader.read_u32()?;
1034      let mut tags = Vec::new();
1035
1036      let mut prev_len = 0;
1037      for _ in 0..count {
1038        read_pad(reader, prev_len, 4)?;
1039        let s: security::types::Tag = reader.read_value()?;
1040        prev_len = s.serialized_len();
1041        tags.push(s);
1042      }
1043      Ok(DataTag { tags })
1044    }
1045  }
1046
1047  // Writing several strings is a bit complicated, because
1048  // we have to keep track of alignment.
1049  // Again, alignment comes BEFORE string length, or vector item count, not after
1050  // string.
1051  #[cfg(feature = "security")]
1052  impl<C: Context> Writable<C> for DataTag {
1053    fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
1054      writer.write_u32(self.tags.len() as u32)?;
1055
1056      let mut prev_len = 0;
1057      for tag in &self.tags {
1058        write_pad(writer, prev_len, 4)?;
1059        writer.write_value(tag)?;
1060        prev_len = tag.serialized_len();
1061      }
1062      Ok(())
1063    }
1064  }
1065} // mod policy