Skip to main content

zerodds_dlrl/
subscription.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Subscription hierarchy — DDS 1.4 §B.3 + §B.6.
5//!
6//! Spec §B.3: `HomeFactory` holds `HomeListener` instances per topic;
7//! `ObjectListener` reacts to object lifecycle events within a home.
8
9use alloc::boxed::Box;
10use alloc::collections::BTreeMap;
11use alloc::string::String;
12use alloc::vec::Vec;
13
14use crate::object_cache::{ObjectId, ObjectRef};
15
/// Kind of change reported for an object. Spec §B.6.6.
///
/// The type is `Copy`, so a single value can be passed by value to any
/// number of listener callbacks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectChangeKind {
    /// The object has been newly created.
    Created,
    /// The object has been modified.
    Modified,
    /// The object has been deleted.
    Deleted,
}
26
27/// HomeListener — invoked when an object appears in / disappears from a
28/// home (topic). Spec §B.3.4.
29pub trait HomeListener: Send + Sync {
30    /// An object was created in the home.
31    fn on_object_created(&mut self, obj: &ObjectRef);
32    /// An object was modified in the home.
33    fn on_object_modified(&mut self, obj: &ObjectRef);
34    /// An object was deleted.
35    fn on_object_deleted(&mut self, id: &ObjectId);
36}
37
38/// ObjectListener — per-object listener. Spec §B.6.7.
39pub trait ObjectListener: Send + Sync {
40    /// The object's own state has changed.
41    fn on_state_changed(&mut self, obj: &ObjectRef, kind: ObjectChangeKind);
42}
43
44/// HomeFactory — Spec §B.3.1.
45#[derive(Default)]
46pub struct HomeFactory {
47    homes: BTreeMap<String, Vec<Box<dyn HomeListener>>>,
48}
49
50impl core::fmt::Debug for HomeFactory {
51    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
52        f.debug_struct("HomeFactory")
53            .field("home_count", &self.homes.len())
54            .finish_non_exhaustive()
55    }
56}
57
58impl HomeFactory {
59    /// Constructor.
60    #[must_use]
61    pub fn new() -> Self {
62        Self::default()
63    }
64
65    /// Registers a HomeListener for a topic.
66    pub fn register_home_listener(&mut self, topic: String, listener: Box<dyn HomeListener>) {
67        self.homes.entry(topic).or_default().push(listener);
68    }
69
70    /// Number of listeners for a topic.
71    #[must_use]
72    pub fn listener_count(&self, topic: &str) -> usize {
73        self.homes.get(topic).map(Vec::len).unwrap_or(0)
74    }
75
76    /// List of all registered topics.
77    #[must_use]
78    pub fn topics(&self) -> Vec<String> {
79        self.homes.keys().cloned().collect()
80    }
81
82    /// Fan-out of an object event to all listeners of the topic.
83    pub fn fanout(&mut self, obj: &ObjectRef, kind: ObjectChangeKind) {
84        if let Some(listeners) = self.homes.get_mut(&obj.id.topic) {
85            for l in listeners {
86                match kind {
87                    ObjectChangeKind::Created => l.on_object_created(obj),
88                    ObjectChangeKind::Modified => l.on_object_modified(obj),
89                    ObjectChangeKind::Deleted => l.on_object_deleted(&obj.id),
90                }
91            }
92        }
93    }
94}
95
96/// SubscriptionRegistry — combines HomeFactory with per-object
97/// listeners.
98#[derive(Default)]
99pub struct SubscriptionRegistry {
100    home: HomeFactory,
101    object_listeners: BTreeMap<ObjectId, Vec<Box<dyn ObjectListener>>>,
102}
103
104impl core::fmt::Debug for SubscriptionRegistry {
105    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
106        f.debug_struct("SubscriptionRegistry")
107            .field("home", &self.home)
108            .field("object_listener_count", &self.object_listeners.len())
109            .finish_non_exhaustive()
110    }
111}
112
113impl SubscriptionRegistry {
114    /// Constructor.
115    #[must_use]
116    pub fn new() -> Self {
117        Self::default()
118    }
119
120    /// Mutable reference to the HomeFactory.
121    pub fn home_mut(&mut self) -> &mut HomeFactory {
122        &mut self.home
123    }
124
125    /// Registers a per-object listener.
126    pub fn register_object_listener(&mut self, id: ObjectId, listener: Box<dyn ObjectListener>) {
127        self.object_listeners.entry(id).or_default().push(listener);
128    }
129
130    /// Notify all relevant listeners (home and object level).
131    pub fn notify(&mut self, obj: &ObjectRef, kind: ObjectChangeKind) {
132        self.home.fanout(obj, kind);
133        if let Some(listeners) = self.object_listeners.get_mut(&obj.id) {
134            for l in listeners {
135                l.on_state_changed(obj, kind);
136            }
137        }
138    }
139}
140
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::object_cache::ObjectState;
    use alloc::string::ToString;
    use alloc::sync::Arc;
    use core::sync::atomic::{AtomicUsize, Ordering};

    /// Call counter shared between a test and the listener it observes.
    type Counter = Arc<AtomicUsize>;

    /// Returns a fresh counter starting at zero.
    fn counter() -> Counter {
        Arc::new(AtomicUsize::new(0))
    }

    /// Reads the current value of `c`.
    fn count(c: &Counter) -> usize {
        c.load(Ordering::Relaxed)
    }

    /// Home listener that counts each kind of callback separately.
    struct CountingHome {
        created: Counter,
        modified: Counter,
        deleted: Counter,
    }

    impl CountingHome {
        /// Boxed listener reporting into the three given counters.
        fn boxed(created: &Counter, modified: &Counter, deleted: &Counter) -> Box<dyn HomeListener> {
            Box::new(Self {
                created: Arc::clone(created),
                modified: Arc::clone(modified),
                deleted: Arc::clone(deleted),
            })
        }

        /// Boxed listener whose creations are reported into `created`;
        /// modifications and deletions go to private throw-away counters.
        fn created_only(created: &Counter) -> Box<dyn HomeListener> {
            Self::boxed(created, &counter(), &counter())
        }
    }

    impl HomeListener for CountingHome {
        fn on_object_created(&mut self, _: &ObjectRef) {
            self.created.fetch_add(1, Ordering::Relaxed);
        }
        fn on_object_modified(&mut self, _: &ObjectRef) {
            self.modified.fetch_add(1, Ordering::Relaxed);
        }
        fn on_object_deleted(&mut self, _: &ObjectId) {
            self.deleted.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Object listener that counts every state-change callback.
    struct CountingObject {
        n: Counter,
    }

    impl ObjectListener for CountingObject {
        fn on_state_changed(&mut self, _: &ObjectRef, _: ObjectChangeKind) {
            self.n.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Builds a minimal object reference for `topic` / `key`.
    fn obj(topic: &str, key: &[u8]) -> ObjectRef {
        let id = ObjectId::new(topic.into(), key.to_vec());
        ObjectRef {
            id,
            state: alloc::vec![],
            lifecycle: ObjectState::New,
            version: 1,
        }
    }

    #[test]
    fn home_listener_registered_per_topic() {
        let mut factory = HomeFactory::new();
        let created = counter();
        factory.register_home_listener("T".into(), CountingHome::created_only(&created));

        assert_eq!(factory.listener_count("T"), 1);
        assert_eq!(factory.topics(), alloc::vec!["T".to_string()]);
    }

    #[test]
    fn fanout_invokes_listeners_for_matching_topic() {
        let mut factory = HomeFactory::new();
        let (created, modified, deleted) = (counter(), counter(), counter());
        factory.register_home_listener(
            "T".into(),
            CountingHome::boxed(&created, &modified, &deleted),
        );

        // One event of each kind for the same object.
        factory.fanout(&obj("T", b"k"), ObjectChangeKind::Created);
        factory.fanout(&obj("T", b"k"), ObjectChangeKind::Modified);
        factory.fanout(&obj("T", b"k"), ObjectChangeKind::Deleted);

        assert_eq!(count(&created), 1);
        assert_eq!(count(&modified), 1);
        assert_eq!(count(&deleted), 1);
    }

    #[test]
    fn fanout_ignores_other_topics() {
        let mut factory = HomeFactory::new();
        let created = counter();
        factory.register_home_listener("T".into(), CountingHome::created_only(&created));

        factory.fanout(&obj("OTHER", b"k"), ObjectChangeKind::Created);

        assert_eq!(count(&created), 0);
    }

    #[test]
    fn registry_notifies_both_home_and_object_listeners() {
        let mut registry = SubscriptionRegistry::new();
        let home_n = counter();
        let obj_n = counter();
        registry
            .home_mut()
            .register_home_listener("T".into(), CountingHome::created_only(&home_n));
        registry.register_object_listener(
            ObjectId::new("T".into(), b"k".to_vec()),
            Box::new(CountingObject {
                n: Arc::clone(&obj_n),
            }),
        );

        registry.notify(&obj("T", b"k"), ObjectChangeKind::Created);

        assert_eq!(count(&home_n), 1);
        assert_eq!(count(&obj_n), 1);
    }

    #[test]
    fn object_listener_only_fires_for_its_own_id() {
        let mut registry = SubscriptionRegistry::new();
        let n = counter();
        registry.register_object_listener(
            ObjectId::new("T".into(), b"a".to_vec()),
            Box::new(CountingObject { n: Arc::clone(&n) }),
        );

        // Same topic, different key: the listener for key "a" stays silent.
        registry.notify(&obj("T", b"b"), ObjectChangeKind::Modified);

        assert_eq!(count(&n), 0);
    }
}
271}