use alloc::string::String;
use alloc::vec::Vec;
use crate::errors::{AmqpError, instance_unknown, register_missing_key, unknown_dds_operation};
use crate::properties::DdsOperation;
/// A single inbound operation, in the form handed to a [`DdsOperationDispatcher`].
#[derive(Debug, Clone)]
pub struct InboundOperation {
/// Which DDS operation is being requested.
pub operation: DdsOperation,
/// Topic name carried with the operation.
pub topic: String,
/// 16-byte instance handle, when one accompanies the operation.
pub instance_handle: Option<[u8; 16]>,
/// Raw payload bytes. `InstanceTrackingDispatcher` rejects a `Register` whose body is empty.
pub body: Vec<u8>,
}
/// Result of handing an [`InboundOperation`] to a [`DdsOperationDispatcher`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DispatchOutcome {
    /// The operation was accepted.
    Accepted,
    /// An unregister/dispose named an instance handle the dispatcher does not know.
    UnknownInstance,
    /// A register operation arrived with an empty body.
    RegisterMissingKey,
    /// The operation is not recognised; carries the string forwarded to
    /// `unknown_dds_operation` when the error is built.
    UnknownOperation(String),
}

impl DispatchOutcome {
    /// Converts this outcome into the AMQP error to report, if there is one.
    ///
    /// `key_hex` is only used for [`Self::UnknownInstance`], where it is passed on
    /// to `instance_unknown`; every other variant ignores it.
    ///
    /// Returns `None` for [`Self::Accepted`] and `Some(_)` for every other variant.
    #[must_use]
    pub fn to_amqp_error(&self, key_hex: &str) -> Option<AmqpError> {
        let error = match self {
            // Success carries no error, so leave before the common `Some` wrap.
            Self::Accepted => return None,
            Self::UnknownInstance => instance_unknown("unregister/dispose", key_hex),
            Self::RegisterMissingKey => register_missing_key(),
            Self::UnknownOperation(name) => unknown_dds_operation(name),
        };
        Some(error)
    }
}
/// Decides how an [`InboundOperation`] should be answered.
pub trait DdsOperationDispatcher {
/// Evaluates `op` and returns the resulting [`DispatchOutcome`].
///
/// The receiver is `&self`, so an implementation cannot change its own state
/// here unless it uses interior mutability.
fn dispatch(&self, op: &InboundOperation) -> DispatchOutcome;
}
/// Dispatcher that performs no validation: every operation is reported as accepted.
#[derive(Debug, Default)]
pub struct AcceptAllDispatcher;

impl DdsOperationDispatcher for AcceptAllDispatcher {
    /// Always returns [`DispatchOutcome::Accepted`]; the operation is never inspected.
    fn dispatch(&self, _: &InboundOperation) -> DispatchOutcome {
        DispatchOutcome::Accepted
    }
}
/// Dispatcher that checks operations against a set of known instance handles.
#[derive(Debug, Default)]
pub struct InstanceTrackingDispatcher {
    /// Handles added through `register_instance`; the set stores each handle at most once.
    known: alloc::collections::BTreeSet<[u8; 16]>,
}

impl InstanceTrackingDispatcher {
    /// Creates a dispatcher that does not know any instance yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            known: alloc::collections::BTreeSet::new(),
        }
    }

    /// Number of distinct instance handles currently known.
    #[must_use]
    pub fn known_count(&self) -> usize {
        self.known.len()
    }
}
impl DdsOperationDispatcher for InstanceTrackingDispatcher {
    /// Validates `op` against the set of known instance handles.
    ///
    /// * `Write` is always accepted.
    /// * `Register` is accepted unless its body is empty, which yields
    ///   [`DispatchOutcome::RegisterMissingKey`]. The handle is not recorded
    ///   here; that is done through `register_instance`.
    /// * `Unregister` / `Dispose` yield [`DispatchOutcome::UnknownInstance`] when
    ///   a handle is present and not in the known set. An operation without a
    ///   handle is accepted unchecked.
    fn dispatch(&self, op: &InboundOperation) -> DispatchOutcome {
        match op.operation {
            DdsOperation::Write => DispatchOutcome::Accepted,
            DdsOperation::Register if op.body.is_empty() => DispatchOutcome::RegisterMissingKey,
            DdsOperation::Register => DispatchOutcome::Accepted,
            DdsOperation::Unregister | DdsOperation::Dispose => match op.instance_handle {
                Some(handle) if !self.known.contains(&handle) => DispatchOutcome::UnknownInstance,
                // Either the handle is known or none was supplied.
                _ => DispatchOutcome::Accepted,
            },
        }
    }
}
impl InstanceTrackingDispatcher {
    /// Adds `handle` to the known set, so later unregister/dispose operations
    /// naming it pass the `dispatch` check.
    ///
    /// Adding a handle that is already present leaves the set unchanged.
    pub fn register_instance(&mut self, handle: [u8; 16]) {
        // `insert` reports whether the handle was new; that flag is discarded.
        let _ = self.known.insert(handle);
    }
}
/// Disposition passed to [`DispositionMapper::apply`] for a sample.
///
/// NOTE(review): the variant names match the AMQP 1.0 delivery outcomes
/// (accepted, rejected, released, modified) — confirm that this is the intended mapping.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispositionState {
/// The sample was accepted.
Accepted,
/// The sample was rejected.
Rejected,
/// The sample was released.
Released,
/// The sample was modified.
Modified,
}
/// Receives a [`DispositionState`] for a sample identified by a 16-byte handle.
pub trait DispositionMapper {
/// Applies `state` to the sample identified by `sample_handle`.
///
/// Returns nothing, so an implementation has no way to report failure through this call.
fn apply(&self, sample_handle: [u8; 16], state: DispositionState);
}
/// [`DispositionMapper`] that ignores every disposition it is given.
#[derive(Debug, Default)]
pub struct NoopDispositionMapper;

impl DispositionMapper for NoopDispositionMapper {
    /// Does nothing; both arguments are ignored.
    fn apply(&self, _sample_handle: [u8; 16], _state: DispositionState) {}
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::errors::AmqpErrorCondition;

    /// Builds an operation of the given kind on topic "T" with an all-zero
    /// handle and a three-byte body.
    fn inbound(kind: DdsOperation) -> InboundOperation {
        InboundOperation {
            operation: kind,
            topic: "T".into(),
            instance_handle: Some([0u8; 16]),
            body: alloc::vec![1, 2, 3],
        }
    }

    #[test]
    fn accept_all_returns_accepted_for_every_operation() {
        let dispatcher = AcceptAllDispatcher;
        let every_kind = [
            DdsOperation::Write,
            DdsOperation::Register,
            DdsOperation::Unregister,
            DdsOperation::Dispose,
        ];
        for kind in every_kind {
            assert_eq!(
                dispatcher.dispatch(&inbound(kind)),
                DispatchOutcome::Accepted
            );
        }
    }

    #[test]
    fn instance_tracking_register_with_body_accepts() {
        let dispatcher = InstanceTrackingDispatcher::new();
        let outcome = dispatcher.dispatch(&inbound(DdsOperation::Register));
        assert_eq!(outcome, DispatchOutcome::Accepted);
    }

    #[test]
    fn instance_tracking_register_empty_body_yields_missing_key() {
        let dispatcher = InstanceTrackingDispatcher::new();
        // Same fixture as everywhere else, but with the payload removed.
        let request = InboundOperation {
            body: alloc::vec![],
            ..inbound(DdsOperation::Register)
        };
        assert_eq!(
            dispatcher.dispatch(&request),
            DispatchOutcome::RegisterMissingKey
        );
    }

    #[test]
    fn instance_tracking_unregister_unknown_yields_unknown_instance() {
        let dispatcher = InstanceTrackingDispatcher::new();
        let outcome = dispatcher.dispatch(&inbound(DdsOperation::Unregister));
        assert_eq!(outcome, DispatchOutcome::UnknownInstance);
    }

    #[test]
    fn instance_tracking_dispose_unknown_yields_unknown_instance() {
        let dispatcher = InstanceTrackingDispatcher::new();
        let outcome = dispatcher.dispatch(&inbound(DdsOperation::Dispose));
        assert_eq!(outcome, DispatchOutcome::UnknownInstance);
    }

    #[test]
    fn instance_tracking_unregister_known_accepts() {
        let handle = [1u8; 16];
        let mut dispatcher = InstanceTrackingDispatcher::new();
        dispatcher.register_instance(handle);

        let request = InboundOperation {
            instance_handle: Some(handle),
            ..inbound(DdsOperation::Unregister)
        };
        assert_eq!(dispatcher.dispatch(&request), DispatchOutcome::Accepted);
    }

    #[test]
    fn dispatch_outcome_to_amqp_error_maps_correctly() {
        assert!(DispatchOutcome::Accepted.to_amqp_error("k").is_none());

        // (outcome, key passed in, expected AMQP condition)
        let failing_cases = [
            (
                DispatchOutcome::UnknownInstance,
                "key-7",
                AmqpErrorCondition::PreconditionFailed,
            ),
            (
                DispatchOutcome::RegisterMissingKey,
                "k",
                AmqpErrorCondition::DecodeError,
            ),
            (
                DispatchOutcome::UnknownOperation("teleport".into()),
                "k",
                AmqpErrorCondition::NotImplemented,
            ),
        ];
        for (outcome, key, expected) in failing_cases {
            let error = outcome.to_amqp_error(key).unwrap();
            assert_eq!(error.condition, expected);
        }
    }

    #[test]
    fn noop_disposition_mapper_does_nothing() {
        let mapper = NoopDispositionMapper;
        // Passing means only that none of these calls panics.
        for state in [
            DispositionState::Accepted,
            DispositionState::Rejected,
            DispositionState::Released,
            DispositionState::Modified,
        ] {
            mapper.apply([0u8; 16], state);
        }
    }
}