Skip to main content

bacnet_server/
cov.rs

1//! COV subscription engine — tracks SubscribeCOV subscriptions and their lifetimes.
2
3use std::collections::HashMap;
4use std::time::Instant;
5
6use bacnet_types::enums::PropertyIdentifier;
7use bacnet_types::primitives::ObjectIdentifier;
8use bacnet_types::MacAddr;
9
10/// An active COV subscription.
11#[derive(Debug, Clone)]
12pub struct CovSubscription {
13    /// MAC address of the subscriber.
14    pub subscriber_mac: MacAddr,
15    /// Process identifier chosen by the subscriber.
16    pub subscriber_process_identifier: u32,
17    /// The object being monitored.
18    pub monitored_object_identifier: ObjectIdentifier,
19    /// Whether to send ConfirmedCOVNotification (true) or Unconfirmed (false).
20    pub issue_confirmed_notifications: bool,
21    /// When this subscription expires (None = infinite lifetime).
22    pub expires_at: Option<Instant>,
23    /// Last present_value for which a COV notification was sent.
24    /// Used with COV_Increment to decide whether to fire again.
25    pub last_notified_value: Option<f32>,
26    /// Property-level filter (SubscribeCOVProperty only).
27    pub monitored_property: Option<PropertyIdentifier>,
28    /// Array index within monitored property (SubscribeCOVProperty only).
29    pub monitored_property_array_index: Option<u32>,
30    /// COV increment override (SubscribeCOVProperty only).
31    pub cov_increment: Option<f32>,
32}
33
34/// Key for uniquely identifying a subscription:
35/// (subscriber_mac, process_id, monitored_object, monitored_property).
36/// Including monitored_property ensures SubscribeCOV (whole-object) and
37/// SubscribeCOVProperty (per-property) coexist as independent subscriptions.
38type SubKey = (MacAddr, u32, ObjectIdentifier, Option<PropertyIdentifier>);
39
40/// Table of active COV subscriptions.
41#[derive(Debug, Default)]
42pub struct CovSubscriptionTable {
43    subs: HashMap<SubKey, CovSubscription>,
44}
45
46impl CovSubscriptionTable {
47    pub fn new() -> Self {
48        Self {
49            subs: HashMap::new(),
50        }
51    }
52
53    /// Add or update a subscription.
54    pub fn subscribe(&mut self, sub: CovSubscription) {
55        let key = (
56            sub.subscriber_mac.clone(),
57            sub.subscriber_process_identifier,
58            sub.monitored_object_identifier,
59            sub.monitored_property,
60        );
61        self.subs.insert(key, sub);
62    }
63
64    /// Remove a subscription by subscriber MAC, process identifier, and monitored object.
65    pub fn unsubscribe(
66        &mut self,
67        mac: &[u8],
68        process_id: u32,
69        monitored_object: ObjectIdentifier,
70    ) -> bool {
71        let key = (MacAddr::from_slice(mac), process_id, monitored_object, None);
72        self.subs.remove(&key).is_some()
73    }
74
75    /// Unsubscribe a per-property subscription.
76    pub fn unsubscribe_property(
77        &mut self,
78        mac: &[u8],
79        process_id: u32,
80        monitored_object: ObjectIdentifier,
81        monitored_property: PropertyIdentifier,
82    ) -> bool {
83        let key = (
84            MacAddr::from_slice(mac),
85            process_id,
86            monitored_object,
87            Some(monitored_property),
88        );
89        self.subs.remove(&key).is_some()
90    }
91
92    /// Remove all subscriptions for a given object (used on DeleteObject).
93    pub fn remove_for_object(&mut self, oid: ObjectIdentifier) {
94        self.subs.retain(|k, _| k.2 != oid);
95    }
96
97    /// Get all active (non-expired) subscriptions for a given object.
98    pub fn subscriptions_for(&mut self, oid: &ObjectIdentifier) -> Vec<&CovSubscription> {
99        let now = Instant::now();
100        self.subs
101            .retain(|_, sub| sub.expires_at.is_none_or(|exp| exp > now));
102        self.subs
103            .values()
104            .filter(|sub| sub.monitored_object_identifier == *oid)
105            .collect()
106    }
107
108    /// Update the last-notified value for a subscription.
109    pub fn set_last_notified_value(
110        &mut self,
111        mac: &[u8],
112        process_id: u32,
113        monitored_object: ObjectIdentifier,
114        monitored_property: Option<PropertyIdentifier>,
115        value: f32,
116    ) {
117        let key = (
118            MacAddr::from_slice(mac),
119            process_id,
120            monitored_object,
121            monitored_property,
122        );
123        if let Some(sub) = self.subs.get_mut(&key) {
124            sub.last_notified_value = Some(value);
125        }
126    }
127
128    /// Check if a COV notification should fire for a subscription given
129    /// the current present_value and the object's COV_Increment.
130    ///
131    /// Returns `true` if:
132    /// - No COV_Increment (binary/multi-state objects — always notify)
133    /// - No previous notified value (first notification)
134    /// - `|current - last_notified| >= cov_increment`
135    pub fn should_notify(
136        sub: &CovSubscription,
137        current_value: Option<f32>,
138        cov_increment: Option<f32>,
139    ) -> bool {
140        match (cov_increment, current_value) {
141            (Some(increment), Some(current)) => {
142                match sub.last_notified_value {
143                    None => true, // First notification — always fire
144                    Some(last) => (current - last).abs() >= increment,
145                }
146            }
147            _ => true, // No increment or no numeric value — always notify
148        }
149    }
150
151    /// Number of active subscriptions.
152    pub fn len(&self) -> usize {
153        self.subs.len()
154    }
155
156    /// Whether the table is empty.
157    pub fn is_empty(&self) -> bool {
158        self.subs.is_empty()
159    }
160
161    /// Remove all expired subscriptions. Returns the number removed.
162    pub fn purge_expired(&mut self) -> usize {
163        let before = self.subs.len();
164        let now = Instant::now();
165        self.subs
166            .retain(|_, sub| sub.expires_at.is_none_or(|exp| exp > now));
167        before - self.subs.len()
168    }
169}
170
171#[cfg(test)]
172mod tests {
173    use super::*;
174    use bacnet_types::enums::ObjectType;
175    use std::time::Duration;
176
177    fn ai1() -> ObjectIdentifier {
178        ObjectIdentifier::new(ObjectType::ANALOG_INPUT, 1).unwrap()
179    }
180
181    fn ai2() -> ObjectIdentifier {
182        ObjectIdentifier::new(ObjectType::ANALOG_INPUT, 2).unwrap()
183    }
184
185    fn make_sub(mac: &[u8], process_id: u32, oid: ObjectIdentifier) -> CovSubscription {
186        CovSubscription {
187            subscriber_mac: MacAddr::from_slice(mac),
188            subscriber_process_identifier: process_id,
189            monitored_object_identifier: oid,
190            issue_confirmed_notifications: false,
191            expires_at: None,
192            last_notified_value: None,
193            monitored_property: None,
194            monitored_property_array_index: None,
195            cov_increment: None,
196        }
197    }
198
199    #[test]
200    fn subscribe_and_lookup() {
201        let mut table = CovSubscriptionTable::new();
202        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
203        assert_eq!(table.len(), 1);
204        assert_eq!(table.subscriptions_for(&ai1()).len(), 1);
205        assert_eq!(table.subscriptions_for(&ai2()).len(), 0);
206    }
207
208    #[test]
209    fn unsubscribe() {
210        let mut table = CovSubscriptionTable::new();
211        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
212        assert!(table.unsubscribe(&[1, 2, 3], 1, ai1()));
213        assert!(!table.unsubscribe(&[1, 2, 3], 1, ai1())); // already removed
214        assert!(table.is_empty());
215    }
216
217    #[test]
218    fn expired_subscriptions_purged_on_lookup() {
219        let mut table = CovSubscriptionTable::new();
220        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
221        sub.expires_at = Some(Instant::now() - Duration::from_secs(1)); // already expired
222        table.subscribe(sub);
223        assert_eq!(table.subscriptions_for(&ai1()).len(), 0);
224        assert!(table.is_empty());
225    }
226
227    #[test]
228    fn multiple_subscribers_same_object() {
229        let mut table = CovSubscriptionTable::new();
230        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
231        table.subscribe(make_sub(&[4, 5, 6], 2, ai1()));
232        assert_eq!(table.subscriptions_for(&ai1()).len(), 2);
233    }
234
235    #[test]
236    fn should_notify_no_increment_always_fires() {
237        let sub = make_sub(&[1, 2, 3], 1, ai1());
238        // Binary/multi-state objects have no COV_Increment
239        assert!(CovSubscriptionTable::should_notify(&sub, Some(1.0), None));
240    }
241
242    #[test]
243    fn should_notify_first_notification_always_fires() {
244        let sub = make_sub(&[1, 2, 3], 1, ai1());
245        // First notification (last_notified_value = None)
246        assert!(CovSubscriptionTable::should_notify(
247            &sub,
248            Some(72.5),
249            Some(1.0)
250        ));
251    }
252
253    #[test]
254    fn should_notify_change_exceeds_increment() {
255        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
256        sub.last_notified_value = Some(70.0);
257        // Change of 2.5 >= increment of 1.0
258        assert!(CovSubscriptionTable::should_notify(
259            &sub,
260            Some(72.5),
261            Some(1.0)
262        ));
263    }
264
265    #[test]
266    fn should_notify_change_below_increment() {
267        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
268        sub.last_notified_value = Some(72.0);
269        // Change of 0.3 < increment of 1.0
270        assert!(!CovSubscriptionTable::should_notify(
271            &sub,
272            Some(72.3),
273            Some(1.0)
274        ));
275    }
276
277    #[test]
278    fn should_notify_exact_increment() {
279        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
280        sub.last_notified_value = Some(70.0);
281        // Change of exactly 1.0 == increment of 1.0 → fires
282        assert!(CovSubscriptionTable::should_notify(
283            &sub,
284            Some(71.0),
285            Some(1.0)
286        ));
287    }
288
289    #[test]
290    fn should_notify_zero_increment_always_fires() {
291        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
292        sub.last_notified_value = Some(72.0);
293        // COV_Increment = 0.0 means any change fires
294        assert!(CovSubscriptionTable::should_notify(
295            &sub,
296            Some(72.001),
297            Some(0.0)
298        ));
299    }
300
301    #[test]
302    fn set_last_notified_value_updates() {
303        let mut table = CovSubscriptionTable::new();
304        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
305        table.set_last_notified_value(&[1, 2, 3], 1, ai1(), None, 72.5);
306
307        let subs = table.subscriptions_for(&ai1());
308        assert_eq!(subs[0].last_notified_value, Some(72.5));
309    }
310
311    #[test]
312    fn upsert_replaces_existing() {
313        let mut table = CovSubscriptionTable::new();
314        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
315        sub.issue_confirmed_notifications = false;
316        table.subscribe(sub);
317        // Same (mac, process_id, object) key — replaces the existing entry
318        let mut sub2 = make_sub(&[1, 2, 3], 1, ai1());
319        sub2.issue_confirmed_notifications = true;
320        table.subscribe(sub2);
321        assert_eq!(table.len(), 1);
322        let subs = table.subscriptions_for(&ai1());
323        assert!(subs[0].issue_confirmed_notifications);
324    }
325
326    #[test]
327    fn same_subscriber_different_objects_both_exist() {
328        let mut table = CovSubscriptionTable::new();
329        // Same (mac, process_id) but different monitored objects
330        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
331        table.subscribe(make_sub(&[1, 2, 3], 1, ai2()));
332        assert_eq!(table.len(), 2);
333        assert_eq!(table.subscriptions_for(&ai1()).len(), 1);
334        assert_eq!(table.subscriptions_for(&ai2()).len(), 1);
335    }
336
337    #[test]
338    fn purge_expired_removes_stale_subscriptions() {
339        let mut table = CovSubscriptionTable::new();
340        let mut sub1 = make_sub(&[1, 2, 3], 1, ai1());
341        sub1.expires_at = Some(Instant::now() - Duration::from_secs(10));
342        table.subscribe(sub1);
343
344        let mut sub2 = make_sub(&[4, 5, 6], 2, ai1());
345        sub2.expires_at = None; // infinite lifetime
346        table.subscribe(sub2);
347
348        let purged = table.purge_expired();
349        assert_eq!(purged, 1);
350        assert_eq!(table.len(), 1);
351    }
352
353    #[test]
354    fn purge_expired_returns_zero_when_none_expired() {
355        let mut table = CovSubscriptionTable::new();
356        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
357        let purged = table.purge_expired();
358        assert_eq!(purged, 0);
359        assert_eq!(table.len(), 1);
360    }
361}