use zerodds_qos::{
DurabilityKind, HistoryKind, HistoryQosPolicy, ReaderQos, ReliabilityKind, WriterQos,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureMode {
Block,
DropOldest,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplayMode {
None,
LastN(u32),
All,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeadlineMode {
None,
Emit {
period_ms: u64,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WsBehavior {
pub backpressure: BackpressureMode,
pub replay: ReplayMode,
pub deadline: DeadlineMode,
pub send_queue_capacity: usize,
}
impl Default for WsBehavior {
fn default() -> Self {
Self {
backpressure: BackpressureMode::Block,
replay: ReplayMode::None,
deadline: DeadlineMode::None,
send_queue_capacity: 1024,
}
}
}
impl WsBehavior {
#[must_use]
pub fn default_for_topic() -> Self {
let writer = WriterQos::default();
let reader = ReaderQos::default();
dds_qos_to_ws_behavior(&writer, &reader)
}
}
#[must_use]
pub fn dds_qos_to_ws_behavior(writer: &WriterQos, reader: &ReaderQos) -> WsBehavior {
let backpressure = match (writer.reliability.kind, reader.reliability.kind) {
(ReliabilityKind::Reliable, _) | (_, ReliabilityKind::Reliable) => BackpressureMode::Block,
_ => BackpressureMode::DropOldest,
};
let replay = replay_for(writer.durability.kind, &writer.history);
let deadline = deadline_for(&writer.deadline);
let cap = capacity_for(&writer.history);
WsBehavior {
backpressure,
replay,
deadline,
send_queue_capacity: cap,
}
}
fn replay_for(d: DurabilityKind, history: &HistoryQosPolicy) -> ReplayMode {
match d {
DurabilityKind::Volatile => ReplayMode::None,
DurabilityKind::TransientLocal => match history.kind {
HistoryKind::KeepLast => ReplayMode::LastN(history.depth.max(1) as u32),
HistoryKind::KeepAll => ReplayMode::All,
},
DurabilityKind::Transient | DurabilityKind::Persistent => ReplayMode::All,
}
}
fn deadline_for(d: &zerodds_qos::DeadlineQosPolicy) -> DeadlineMode {
if d.period == zerodds_qos::Duration::INFINITE || d.period == zerodds_qos::Duration::ZERO {
return DeadlineMode::None;
}
let frac_ms = ((d.period.fraction as u64) * 1000) >> 32;
let total_ms = (d.period.seconds.max(0) as u64) * 1000 + frac_ms;
if total_ms == 0 {
DeadlineMode::None
} else {
DeadlineMode::Emit {
period_ms: total_ms,
}
}
}
fn capacity_for(history: &HistoryQosPolicy) -> usize {
match history.kind {
HistoryKind::KeepLast => (history.depth.max(1) as usize).max(16),
HistoryKind::KeepAll => 8192,
}
}
#[cfg(test)]
#[allow(clippy::panic, clippy::expect_used, clippy::unwrap_used)]
mod tests {
use super::*;
use zerodds_qos::{Duration, ReliabilityQosPolicy};
#[test]
fn reliable_writer_yields_block_backpressure() {
let mut w = WriterQos::default();
let r = ReaderQos::default();
w.reliability = ReliabilityQosPolicy {
kind: ReliabilityKind::Reliable,
..w.reliability
};
let b = dds_qos_to_ws_behavior(&w, &r);
assert_eq!(b.backpressure, BackpressureMode::Block);
}
#[test]
fn best_effort_yields_drop_oldest() {
let mut w = WriterQos::default();
w.reliability = ReliabilityQosPolicy {
kind: ReliabilityKind::BestEffort,
..w.reliability
};
let mut r = ReaderQos::default();
r.reliability = ReliabilityQosPolicy {
kind: ReliabilityKind::BestEffort,
..r.reliability
};
let b = dds_qos_to_ws_behavior(&w, &r);
assert_eq!(b.backpressure, BackpressureMode::DropOldest);
}
#[test]
fn transient_local_keep_last_yields_replay_lastn() {
let mut w = WriterQos::default();
w.durability.kind = DurabilityKind::TransientLocal;
w.history.kind = HistoryKind::KeepLast;
w.history.depth = 7;
let r = ReaderQos::default();
let b = dds_qos_to_ws_behavior(&w, &r);
assert_eq!(b.replay, ReplayMode::LastN(7));
}
#[test]
fn volatile_yields_no_replay() {
let mut w = WriterQos::default();
w.durability.kind = DurabilityKind::Volatile;
let r = ReaderQos::default();
let b = dds_qos_to_ws_behavior(&w, &r);
assert_eq!(b.replay, ReplayMode::None);
}
#[test]
fn transient_yields_all_replay() {
let mut w = WriterQos::default();
w.durability.kind = DurabilityKind::Transient;
let r = ReaderQos::default();
let b = dds_qos_to_ws_behavior(&w, &r);
assert_eq!(b.replay, ReplayMode::All);
}
#[test]
fn deadline_set_yields_emit_mode() {
let mut w = WriterQos::default();
w.deadline.period = Duration::from_millis(500);
let r = ReaderQos::default();
let b = dds_qos_to_ws_behavior(&w, &r);
match b.deadline {
DeadlineMode::Emit { period_ms } => assert!(period_ms >= 1),
DeadlineMode::None => panic!("expected emit"),
}
}
#[test]
fn deadline_infinite_yields_none() {
let w = WriterQos::default();
let r = ReaderQos::default();
let b = dds_qos_to_ws_behavior(&w, &r);
assert_eq!(b.deadline, DeadlineMode::None);
}
#[test]
fn default_for_topic_uses_writer_defaults() {
let b = WsBehavior::default_for_topic();
assert_eq!(b.backpressure, BackpressureMode::Block);
assert_eq!(b.replay, ReplayMode::None);
}
}