use alloc::collections::VecDeque;
use alloc::sync::Arc;
use alloc::vec::Vec;
use std::sync::Mutex;
use crate::comm::{ConnectError, Disconnected, StructuredPushConsumer};
use crate::event::StructuredEvent;
use crate::filter::Filter;
use crate::qos::QoSProperties;
struct PushLink {
consumer: Arc<dyn StructuredPushConsumer>,
filter: Arc<Filter>,
}
struct ChannelState {
push_links: Vec<PushLink>,
pending: VecDeque<StructuredEvent>,
qos: QoSProperties,
}
struct ChannelInner {
state: Mutex<ChannelState>,
}
impl ChannelInner {
fn deliver(&self, event: StructuredEvent) {
if let Ok(mut s) = self.state.lock() {
for link in &s.push_links {
if link.filter.match_structured(&event) {
let _ = link.consumer.push_structured_event(event.clone());
}
}
let max = s
.qos
.get(crate::qos::MAX_QUEUE_LENGTH)
.and_then(any_as_u32)
.unwrap_or(0);
s.pending.push_back(event);
if max > 0 {
while s.pending.len() > max as usize {
s.pending.pop_front();
}
}
}
}
}
fn any_as_u32(a: &zerodds_cdr::CorbaAny) -> Option<u32> {
match &a.0 {
zerodds_cdr::AnyValue::ULong(v) => Some(*v),
zerodds_cdr::AnyValue::Long(v) if *v >= 0 => Some(*v as u32),
_ => None,
}
}
#[derive(Default)]
pub struct EventChannelFactory {
channels: Mutex<Vec<(i32, Arc<EventChannel>)>>,
next_id: core::sync::atomic::AtomicI32,
}
impl core::fmt::Debug for EventChannelFactory {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("EventChannelFactory")
.finish_non_exhaustive()
}
}
impl EventChannelFactory {
#[must_use]
pub fn new() -> Self {
Self {
channels: Mutex::new(Vec::new()),
next_id: core::sync::atomic::AtomicI32::new(1),
}
}
pub fn create_channel(&self, initial_qos: QoSProperties) -> (Arc<EventChannel>, i32) {
let ch = Arc::new(EventChannel::with_qos(initial_qos));
let id = self
.next_id
.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
if let Ok(mut g) = self.channels.lock() {
g.push((id, Arc::clone(&ch)));
}
(ch, id)
}
#[must_use]
pub fn all_channels(&self) -> Vec<i32> {
self.channels
.lock()
.map(|g| g.iter().map(|(id, _)| *id).collect())
.unwrap_or_default()
}
#[must_use]
pub fn get_event_channel(&self, id: i32) -> Option<Arc<EventChannel>> {
self.channels
.lock()
.ok()?
.iter()
.find(|(cid, _)| *cid == id)
.map(|(_, ch)| Arc::clone(ch))
}
}
pub struct EventChannel {
inner: Arc<ChannelInner>,
consumer_admin: Arc<ConsumerAdmin>,
supplier_admin: Arc<SupplierAdmin>,
}
impl core::fmt::Debug for EventChannel {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("EventChannel").finish_non_exhaustive()
}
}
impl Default for EventChannel {
fn default() -> Self {
Self::with_qos(QoSProperties::new())
}
}
impl EventChannel {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_qos(qos: QoSProperties) -> Self {
let inner = Arc::new(ChannelInner {
state: Mutex::new(ChannelState {
push_links: Vec::new(),
pending: VecDeque::new(),
qos,
}),
});
Self {
consumer_admin: Arc::new(ConsumerAdmin {
inner: Arc::clone(&inner),
}),
supplier_admin: Arc::new(SupplierAdmin {
inner: Arc::clone(&inner),
}),
inner,
}
}
#[must_use]
pub fn for_consumers(&self) -> Arc<ConsumerAdmin> {
Arc::clone(&self.consumer_admin)
}
#[must_use]
pub fn for_suppliers(&self) -> Arc<SupplierAdmin> {
Arc::clone(&self.supplier_admin)
}
pub fn set_qos(&self, props: &crate::event::PropertySeq) {
if let Ok(mut s) = self.inner.state.lock() {
s.qos.apply(props);
}
}
#[must_use]
pub fn get_qos(&self) -> QoSProperties {
self.inner
.state
.lock()
.map(|s| s.qos.clone())
.unwrap_or_default()
}
pub fn destroy(&self) {
if let Ok(mut s) = self.inner.state.lock() {
s.push_links.clear();
s.pending.clear();
}
}
}
pub struct ConsumerAdmin {
inner: Arc<ChannelInner>,
}
impl core::fmt::Debug for ConsumerAdmin {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("ConsumerAdmin").finish_non_exhaustive()
}
}
impl ConsumerAdmin {
#[must_use]
pub fn obtain_structured_push_supplier(&self) -> Arc<StructuredProxyPushSupplier> {
Arc::new(StructuredProxyPushSupplier {
inner: Arc::clone(&self.inner),
filter: Arc::new(Filter::new("EXTENDED_TCL")),
})
}
#[must_use]
pub fn obtain_structured_pull_supplier(&self) -> Arc<StructuredProxyPullSupplier> {
Arc::new(StructuredProxyPullSupplier {
inner: Arc::clone(&self.inner),
filter: Arc::new(Filter::new("EXTENDED_TCL")),
})
}
}
pub struct SupplierAdmin {
inner: Arc<ChannelInner>,
}
impl core::fmt::Debug for SupplierAdmin {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("SupplierAdmin").finish_non_exhaustive()
}
}
impl SupplierAdmin {
#[must_use]
pub fn obtain_structured_push_consumer(&self) -> Arc<StructuredProxyPushConsumer> {
Arc::new(StructuredProxyPushConsumer {
inner: Arc::clone(&self.inner),
})
}
}
pub struct StructuredProxyPushConsumer {
inner: Arc<ChannelInner>,
}
impl StructuredProxyPushConsumer {
pub fn connect_structured_push_supplier(&self) -> Result<(), ConnectError> {
Ok(())
}
pub fn push_structured_event(&self, event: StructuredEvent) {
self.inner.deliver(event);
}
}
pub struct StructuredProxyPushSupplier {
inner: Arc<ChannelInner>,
filter: Arc<Filter>,
}
impl StructuredProxyPushSupplier {
#[must_use]
pub fn filter(&self) -> Arc<Filter> {
Arc::clone(&self.filter)
}
pub fn connect_structured_push_consumer(
&self,
consumer: Arc<dyn StructuredPushConsumer>,
) -> Result<(), Disconnected> {
let mut s = self.inner.state.lock().map_err(|_| Disconnected)?;
s.push_links.push(PushLink {
consumer,
filter: Arc::clone(&self.filter),
});
Ok(())
}
}
pub struct StructuredProxyPullSupplier {
inner: Arc<ChannelInner>,
filter: Arc<Filter>,
}
impl StructuredProxyPullSupplier {
#[must_use]
pub fn filter(&self) -> Arc<Filter> {
Arc::clone(&self.filter)
}
pub fn connect_structured_pull_consumer(&self) -> Result<(), ConnectError> {
Ok(())
}
pub fn try_pull_structured_event(&self) -> Result<(StructuredEvent, bool), Disconnected> {
let mut s = self.inner.state.lock().map_err(|_| Disconnected)?;
while let Some(ev) = s.pending.pop_front() {
if self.filter.match_structured(&ev) {
return Ok((ev, true));
}
}
Ok((StructuredEvent::default(), false))
}
pub fn pull_structured_event(&self) -> Result<StructuredEvent, Disconnected> {
match self.try_pull_structured_event()? {
(ev, true) => Ok(ev),
(_, false) => Err(Disconnected),
}
}
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
use super::*;
use crate::event::EventType;
use crate::filter::ConstraintExp;
use alloc::string::ToString;
use core::sync::atomic::{AtomicUsize, Ordering};
use zerodds_cdr::{AnyValue, CorbaAny};
struct Recorder {
n: AtomicUsize,
last: Mutex<Option<StructuredEvent>>,
}
impl StructuredPushConsumer for Recorder {
fn push_structured_event(&self, event: StructuredEvent) -> Result<(), Disconnected> {
self.n.fetch_add(1, Ordering::Relaxed);
if let Ok(mut g) = self.last.lock() {
*g = Some(event);
}
Ok(())
}
fn disconnect(&self) {}
}
fn ev(t: &str, body: &str) -> StructuredEvent {
StructuredEvent::new("D", t, "n", CorbaAny(AnyValue::Str(body.into())))
}
#[test]
fn push_fanout_to_connected_consumer() {
let ch = EventChannel::new();
let rec = Arc::new(Recorder {
n: AtomicUsize::new(0),
last: Mutex::new(None),
});
let sup = ch.for_consumers().obtain_structured_push_supplier();
sup.connect_structured_push_consumer(rec.clone()).unwrap();
let pc = ch.for_suppliers().obtain_structured_push_consumer();
pc.connect_structured_push_supplier().unwrap();
pc.push_structured_event(ev("T", "hello"));
assert_eq!(rec.n.load(Ordering::Relaxed), 1);
assert_eq!(
rec.last.lock().unwrap().as_ref().unwrap().remainder_of_body,
CorbaAny(AnyValue::Str("hello".into()))
);
}
#[test]
fn pull_supplier_filters_by_event_type() {
let ch = EventChannel::new();
let pull = ch.for_consumers().obtain_structured_pull_supplier();
pull.filter().add_constraints(alloc::vec![ConstraintExp {
event_types: alloc::vec![EventType::new("D", "Wanted")],
constraint_expr: alloc::string::String::new(),
}]);
let pc = ch.for_suppliers().obtain_structured_push_consumer();
pc.push_structured_event(ev("Unwanted", "x"));
pc.push_structured_event(ev("Wanted", "y"));
let (got, has) = pull.try_pull_structured_event().unwrap();
assert!(has);
assert_eq!(got.event_type().type_name, "Wanted");
let (_, has2) = pull.try_pull_structured_event().unwrap();
assert!(!has2, "queue empty (Unwanted filtered/consumed)");
}
#[test]
fn factory_create_and_lookup() {
let fac = EventChannelFactory::new();
let (_ch, id) = fac.create_channel(QoSProperties::new());
assert!(fac.all_channels().contains(&id));
assert!(fac.get_event_channel(id).is_some());
assert!(fac.get_event_channel(9999).is_none());
}
#[test]
fn max_queue_length_caps_pending() {
let mut qos = QoSProperties::new();
qos.set(crate::qos::MAX_QUEUE_LENGTH, CorbaAny(AnyValue::ULong(2)));
let ch = EventChannel::with_qos(qos);
let pull = ch.for_consumers().obtain_structured_pull_supplier();
let pc = ch.for_suppliers().obtain_structured_push_consumer();
for i in 0..5 {
pc.push_structured_event(ev("T", &alloc::format!("e{i}")));
}
let mut got = Vec::new();
while let Ok((e, true)) = pull.try_pull_structured_event() {
if let AnyValue::Str(s) = &e.remainder_of_body.0 {
got.push(s.clone());
}
}
assert_eq!(got, alloc::vec!["e3".to_string(), "e4".to_string()]);
}
}