1use alloc::boxed::Box;
10use alloc::collections::BTreeMap;
11use alloc::string::String;
12use alloc::vec::Vec;
13
14use crate::object_cache::{ObjectId, ObjectRef};
15
/// The kind of change being reported to listeners.
///
/// [`HomeFactory::fanout`] uses this value to pick which [`HomeListener`]
/// callback to invoke; [`ObjectListener::on_state_changed`] receives it as-is.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ObjectChangeKind {
    /// Routed to [`HomeListener::on_object_created`].
    Created,
    /// Routed to [`HomeListener::on_object_modified`].
    Modified,
    /// Routed to [`HomeListener::on_object_deleted`].
    Deleted,
}
26
27pub trait HomeListener: Send + Sync {
30 fn on_object_created(&mut self, obj: &ObjectRef);
32 fn on_object_modified(&mut self, obj: &ObjectRef);
34 fn on_object_deleted(&mut self, id: &ObjectId);
36}
37
38pub trait ObjectListener: Send + Sync {
40 fn on_state_changed(&mut self, obj: &ObjectRef, kind: ObjectChangeKind);
42}
43
44#[derive(Default)]
46pub struct HomeFactory {
47 homes: BTreeMap<String, Vec<Box<dyn HomeListener>>>,
48}
49
50impl core::fmt::Debug for HomeFactory {
51 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
52 f.debug_struct("HomeFactory")
53 .field("home_count", &self.homes.len())
54 .finish_non_exhaustive()
55 }
56}
57
58impl HomeFactory {
59 #[must_use]
61 pub fn new() -> Self {
62 Self::default()
63 }
64
65 pub fn register_home_listener(&mut self, topic: String, listener: Box<dyn HomeListener>) {
67 self.homes.entry(topic).or_default().push(listener);
68 }
69
70 #[must_use]
72 pub fn listener_count(&self, topic: &str) -> usize {
73 self.homes.get(topic).map(Vec::len).unwrap_or(0)
74 }
75
76 #[must_use]
78 pub fn topics(&self) -> Vec<String> {
79 self.homes.keys().cloned().collect()
80 }
81
82 pub fn fanout(&mut self, obj: &ObjectRef, kind: ObjectChangeKind) {
84 if let Some(listeners) = self.homes.get_mut(&obj.id.topic) {
85 for l in listeners {
86 match kind {
87 ObjectChangeKind::Created => l.on_object_created(obj),
88 ObjectChangeKind::Modified => l.on_object_modified(obj),
89 ObjectChangeKind::Deleted => l.on_object_deleted(&obj.id),
90 }
91 }
92 }
93 }
94}
95
96#[derive(Default)]
99pub struct SubscriptionRegistry {
100 home: HomeFactory,
101 object_listeners: BTreeMap<ObjectId, Vec<Box<dyn ObjectListener>>>,
102}
103
104impl core::fmt::Debug for SubscriptionRegistry {
105 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
106 f.debug_struct("SubscriptionRegistry")
107 .field("home", &self.home)
108 .field("object_listener_count", &self.object_listeners.len())
109 .finish_non_exhaustive()
110 }
111}
112
113impl SubscriptionRegistry {
114 #[must_use]
116 pub fn new() -> Self {
117 Self::default()
118 }
119
120 pub fn home_mut(&mut self) -> &mut HomeFactory {
122 &mut self.home
123 }
124
125 pub fn register_object_listener(&mut self, id: ObjectId, listener: Box<dyn ObjectListener>) {
127 self.object_listeners.entry(id).or_default().push(listener);
128 }
129
130 pub fn notify(&mut self, obj: &ObjectRef, kind: ObjectChangeKind) {
132 self.home.fanout(obj, kind);
133 if let Some(listeners) = self.object_listeners.get_mut(&obj.id) {
134 for l in listeners {
135 l.on_state_changed(obj, kind);
136 }
137 }
138 }
139}
140
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::object_cache::ObjectState;
    use alloc::string::ToString;
    use alloc::sync::Arc;
    use core::sync::atomic::{AtomicUsize, Ordering};

    /// Call counter shared between a test and the listener it hands over.
    type Counter = Arc<AtomicUsize>;

    /// Returns a fresh counter starting at zero.
    fn counter() -> Counter {
        Arc::new(AtomicUsize::new(0))
    }

    /// Reads the current value of `c`.
    fn hits(c: &Counter) -> usize {
        c.load(Ordering::Relaxed)
    }

    /// Home listener that bumps one counter per callback kind.
    struct CountingHome {
        created: Counter,
        modified: Counter,
        deleted: Counter,
    }

    impl CountingHome {
        /// Boxed listener whose creations are counted in `created`; the other
        /// two callbacks count into private counters nobody inspects.
        fn tracking_created(created: &Counter) -> Box<dyn HomeListener> {
            Box::new(Self {
                created: Arc::clone(created),
                modified: counter(),
                deleted: counter(),
            })
        }
    }

    impl HomeListener for CountingHome {
        fn on_object_created(&mut self, _: &ObjectRef) {
            self.created.fetch_add(1, Ordering::Relaxed);
        }

        fn on_object_modified(&mut self, _: &ObjectRef) {
            self.modified.fetch_add(1, Ordering::Relaxed);
        }

        fn on_object_deleted(&mut self, _: &ObjectId) {
            self.deleted.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Object listener that counts every state change, regardless of kind.
    struct CountingObject {
        n: Counter,
    }

    impl ObjectListener for CountingObject {
        fn on_state_changed(&mut self, _: &ObjectRef, _: ObjectChangeKind) {
            self.n.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Builds a minimal object with the given topic and key.
    fn obj(topic: &str, key: &[u8]) -> ObjectRef {
        let id = ObjectId::new(topic.into(), key.to_vec());
        ObjectRef {
            id,
            state: alloc::vec![],
            lifecycle: ObjectState::New,
            version: 1,
        }
    }

    #[test]
    fn home_listener_registered_per_topic() {
        let created = counter();
        let mut factory = HomeFactory::new();
        factory.register_home_listener("T".into(), CountingHome::tracking_created(&created));

        assert_eq!(factory.listener_count("T"), 1);
        assert_eq!(factory.topics(), alloc::vec!["T".to_string()]);
    }

    #[test]
    fn fanout_invokes_listeners_for_matching_topic() {
        let (created, modified, deleted) = (counter(), counter(), counter());
        let mut factory = HomeFactory::new();
        factory.register_home_listener(
            "T".into(),
            Box::new(CountingHome {
                created: Arc::clone(&created),
                modified: Arc::clone(&modified),
                deleted: Arc::clone(&deleted),
            }),
        );

        // One fanout per kind, each with a freshly built object.
        factory.fanout(&obj("T", b"k"), ObjectChangeKind::Created);
        factory.fanout(&obj("T", b"k"), ObjectChangeKind::Modified);
        factory.fanout(&obj("T", b"k"), ObjectChangeKind::Deleted);

        assert_eq!(hits(&created), 1);
        assert_eq!(hits(&modified), 1);
        assert_eq!(hits(&deleted), 1);
    }

    #[test]
    fn fanout_ignores_other_topics() {
        let created = counter();
        let mut factory = HomeFactory::new();
        factory.register_home_listener("T".into(), CountingHome::tracking_created(&created));

        factory.fanout(&obj("OTHER", b"k"), ObjectChangeKind::Created);

        assert_eq!(hits(&created), 0);
    }

    #[test]
    fn registry_notifies_both_home_and_object_listeners() {
        let home_n = counter();
        let obj_n = counter();
        let mut registry = SubscriptionRegistry::new();
        registry
            .home_mut()
            .register_home_listener("T".into(), CountingHome::tracking_created(&home_n));
        registry.register_object_listener(
            ObjectId::new("T".into(), b"k".to_vec()),
            Box::new(CountingObject {
                n: Arc::clone(&obj_n),
            }),
        );

        registry.notify(&obj("T", b"k"), ObjectChangeKind::Created);

        assert_eq!(hits(&home_n), 1);
        assert_eq!(hits(&obj_n), 1);
    }

    #[test]
    fn object_listener_only_fires_for_its_own_id() {
        let n = counter();
        let mut registry = SubscriptionRegistry::new();
        registry.register_object_listener(
            ObjectId::new("T".into(), b"a".to_vec()),
            Box::new(CountingObject { n: Arc::clone(&n) }),
        );

        // Same topic, different key: the per-object listener must stay silent.
        registry.notify(&obj("T", b"b"), ObjectChangeKind::Modified);

        assert_eq!(hits(&n), 0);
    }
}