1use std::collections::HashMap;
4use std::time::Instant;
5
6use bacnet_types::enums::PropertyIdentifier;
7use bacnet_types::primitives::ObjectIdentifier;
8use bacnet_types::MacAddr;
9
10#[derive(Debug, Clone)]
12pub struct CovSubscription {
13 pub subscriber_mac: MacAddr,
15 pub subscriber_process_identifier: u32,
17 pub monitored_object_identifier: ObjectIdentifier,
19 pub issue_confirmed_notifications: bool,
21 pub expires_at: Option<Instant>,
23 pub last_notified_value: Option<f32>,
26 pub monitored_property: Option<PropertyIdentifier>,
28 pub monitored_property_array_index: Option<u32>,
30 pub cov_increment: Option<f32>,
32}
33
34type SubKey = (MacAddr, u32, ObjectIdentifier);
36
37#[derive(Debug, Default)]
39pub struct CovSubscriptionTable {
40 subs: HashMap<SubKey, CovSubscription>,
41}
42
43impl CovSubscriptionTable {
44 pub fn new() -> Self {
45 Self {
46 subs: HashMap::new(),
47 }
48 }
49
50 pub fn subscribe(&mut self, sub: CovSubscription) {
52 let key = (
53 sub.subscriber_mac.clone(),
54 sub.subscriber_process_identifier,
55 sub.monitored_object_identifier,
56 );
57 self.subs.insert(key, sub);
58 }
59
60 pub fn unsubscribe(
62 &mut self,
63 mac: &[u8],
64 process_id: u32,
65 monitored_object: ObjectIdentifier,
66 ) -> bool {
67 let key = (MacAddr::from_slice(mac), process_id, monitored_object);
68 self.subs.remove(&key).is_some()
69 }
70
71 pub fn subscriptions_for(&mut self, oid: &ObjectIdentifier) -> Vec<&CovSubscription> {
73 let now = Instant::now();
74 self.subs
76 .retain(|_, sub| sub.expires_at.is_none_or(|exp| exp > now));
77 self.subs
78 .values()
79 .filter(|sub| sub.monitored_object_identifier == *oid)
80 .collect()
81 }
82
83 pub fn set_last_notified_value(
85 &mut self,
86 mac: &[u8],
87 process_id: u32,
88 monitored_object: ObjectIdentifier,
89 value: f32,
90 ) {
91 let key = (MacAddr::from_slice(mac), process_id, monitored_object);
92 if let Some(sub) = self.subs.get_mut(&key) {
93 sub.last_notified_value = Some(value);
94 }
95 }
96
97 pub fn should_notify(
105 sub: &CovSubscription,
106 current_value: Option<f32>,
107 cov_increment: Option<f32>,
108 ) -> bool {
109 match (cov_increment, current_value) {
110 (Some(increment), Some(current)) => {
111 match sub.last_notified_value {
112 None => true, Some(last) => (current - last).abs() >= increment,
114 }
115 }
116 _ => true, }
118 }
119
120 pub fn len(&self) -> usize {
122 self.subs.len()
123 }
124
125 pub fn is_empty(&self) -> bool {
127 self.subs.is_empty()
128 }
129
130 pub fn purge_expired(&mut self) -> usize {
132 let before = self.subs.len();
133 let now = Instant::now();
134 self.subs
135 .retain(|_, sub| sub.expires_at.is_none_or(|exp| exp > now));
136 before - self.subs.len()
137 }
138}
139
140#[cfg(test)]
141mod tests {
142 use super::*;
143 use bacnet_types::enums::ObjectType;
144 use std::time::Duration;
145
146 fn ai1() -> ObjectIdentifier {
147 ObjectIdentifier::new(ObjectType::ANALOG_INPUT, 1).unwrap()
148 }
149
150 fn ai2() -> ObjectIdentifier {
151 ObjectIdentifier::new(ObjectType::ANALOG_INPUT, 2).unwrap()
152 }
153
154 fn make_sub(mac: &[u8], process_id: u32, oid: ObjectIdentifier) -> CovSubscription {
155 CovSubscription {
156 subscriber_mac: MacAddr::from_slice(mac),
157 subscriber_process_identifier: process_id,
158 monitored_object_identifier: oid,
159 issue_confirmed_notifications: false,
160 expires_at: None,
161 last_notified_value: None,
162 monitored_property: None,
163 monitored_property_array_index: None,
164 cov_increment: None,
165 }
166 }
167
168 #[test]
169 fn subscribe_and_lookup() {
170 let mut table = CovSubscriptionTable::new();
171 table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
172 assert_eq!(table.len(), 1);
173 assert_eq!(table.subscriptions_for(&ai1()).len(), 1);
174 assert_eq!(table.subscriptions_for(&ai2()).len(), 0);
175 }
176
177 #[test]
178 fn unsubscribe() {
179 let mut table = CovSubscriptionTable::new();
180 table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
181 assert!(table.unsubscribe(&[1, 2, 3], 1, ai1()));
182 assert!(!table.unsubscribe(&[1, 2, 3], 1, ai1())); assert!(table.is_empty());
184 }
185
186 #[test]
187 fn expired_subscriptions_purged_on_lookup() {
188 let mut table = CovSubscriptionTable::new();
189 let mut sub = make_sub(&[1, 2, 3], 1, ai1());
190 sub.expires_at = Some(Instant::now() - Duration::from_secs(1)); table.subscribe(sub);
192 assert_eq!(table.subscriptions_for(&ai1()).len(), 0);
193 assert!(table.is_empty());
194 }
195
196 #[test]
197 fn multiple_subscribers_same_object() {
198 let mut table = CovSubscriptionTable::new();
199 table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
200 table.subscribe(make_sub(&[4, 5, 6], 2, ai1()));
201 assert_eq!(table.subscriptions_for(&ai1()).len(), 2);
202 }
203
204 #[test]
205 fn should_notify_no_increment_always_fires() {
206 let sub = make_sub(&[1, 2, 3], 1, ai1());
207 assert!(CovSubscriptionTable::should_notify(&sub, Some(1.0), None));
209 }
210
211 #[test]
212 fn should_notify_first_notification_always_fires() {
213 let sub = make_sub(&[1, 2, 3], 1, ai1());
214 assert!(CovSubscriptionTable::should_notify(
216 &sub,
217 Some(72.5),
218 Some(1.0)
219 ));
220 }
221
222 #[test]
223 fn should_notify_change_exceeds_increment() {
224 let mut sub = make_sub(&[1, 2, 3], 1, ai1());
225 sub.last_notified_value = Some(70.0);
226 assert!(CovSubscriptionTable::should_notify(
228 &sub,
229 Some(72.5),
230 Some(1.0)
231 ));
232 }
233
234 #[test]
235 fn should_notify_change_below_increment() {
236 let mut sub = make_sub(&[1, 2, 3], 1, ai1());
237 sub.last_notified_value = Some(72.0);
238 assert!(!CovSubscriptionTable::should_notify(
240 &sub,
241 Some(72.3),
242 Some(1.0)
243 ));
244 }
245
246 #[test]
247 fn should_notify_exact_increment() {
248 let mut sub = make_sub(&[1, 2, 3], 1, ai1());
249 sub.last_notified_value = Some(70.0);
250 assert!(CovSubscriptionTable::should_notify(
252 &sub,
253 Some(71.0),
254 Some(1.0)
255 ));
256 }
257
258 #[test]
259 fn should_notify_zero_increment_always_fires() {
260 let mut sub = make_sub(&[1, 2, 3], 1, ai1());
261 sub.last_notified_value = Some(72.0);
262 assert!(CovSubscriptionTable::should_notify(
264 &sub,
265 Some(72.001),
266 Some(0.0)
267 ));
268 }
269
270 #[test]
271 fn set_last_notified_value_updates() {
272 let mut table = CovSubscriptionTable::new();
273 table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
274 table.set_last_notified_value(&[1, 2, 3], 1, ai1(), 72.5);
275
276 let subs = table.subscriptions_for(&ai1());
277 assert_eq!(subs[0].last_notified_value, Some(72.5));
278 }
279
280 #[test]
281 fn upsert_replaces_existing() {
282 let mut table = CovSubscriptionTable::new();
283 let mut sub = make_sub(&[1, 2, 3], 1, ai1());
284 sub.issue_confirmed_notifications = false;
285 table.subscribe(sub);
286 let mut sub2 = make_sub(&[1, 2, 3], 1, ai1());
288 sub2.issue_confirmed_notifications = true;
289 table.subscribe(sub2);
290 assert_eq!(table.len(), 1);
291 let subs = table.subscriptions_for(&ai1());
292 assert!(subs[0].issue_confirmed_notifications);
293 }
294
295 #[test]
296 fn same_subscriber_different_objects_both_exist() {
297 let mut table = CovSubscriptionTable::new();
298 table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
300 table.subscribe(make_sub(&[1, 2, 3], 1, ai2()));
301 assert_eq!(table.len(), 2);
302 assert_eq!(table.subscriptions_for(&ai1()).len(), 1);
303 assert_eq!(table.subscriptions_for(&ai2()).len(), 1);
304 }
305
306 #[test]
307 fn purge_expired_removes_stale_subscriptions() {
308 let mut table = CovSubscriptionTable::new();
309 let mut sub1 = make_sub(&[1, 2, 3], 1, ai1());
310 sub1.expires_at = Some(Instant::now() - Duration::from_secs(10));
311 table.subscribe(sub1);
312
313 let mut sub2 = make_sub(&[4, 5, 6], 2, ai1());
314 sub2.expires_at = None; table.subscribe(sub2);
316
317 let purged = table.purge_expired();
318 assert_eq!(purged, 1);
319 assert_eq!(table.len(), 1);
320 }
321
322 #[test]
323 fn purge_expired_returns_zero_when_none_expired() {
324 let mut table = CovSubscriptionTable::new();
325 table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
326 let purged = table.purge_expired();
327 assert_eq!(purged, 0);
328 assert_eq!(table.len(), 1);
329 }
330}