Skip to main content

bacnet_server/
cov.rs

1//! COV subscription engine — tracks SubscribeCOV subscriptions and their lifetimes.
2
3use std::collections::HashMap;
4use std::time::Instant;
5
6use bacnet_types::enums::PropertyIdentifier;
7use bacnet_types::primitives::ObjectIdentifier;
8use bacnet_types::MacAddr;
9
10/// An active COV subscription.
11#[derive(Debug, Clone)]
12pub struct CovSubscription {
13    /// MAC address of the subscriber.
14    pub subscriber_mac: MacAddr,
15    /// Process identifier chosen by the subscriber.
16    pub subscriber_process_identifier: u32,
17    /// The object being monitored.
18    pub monitored_object_identifier: ObjectIdentifier,
19    /// Whether to send ConfirmedCOVNotification (true) or Unconfirmed (false).
20    pub issue_confirmed_notifications: bool,
21    /// When this subscription expires (None = infinite lifetime).
22    pub expires_at: Option<Instant>,
23    /// Last present_value for which a COV notification was sent.
24    /// Used with COV_Increment to decide whether to fire again.
25    pub last_notified_value: Option<f32>,
26    /// Property-level filter (SubscribeCOVProperty only).
27    pub monitored_property: Option<PropertyIdentifier>,
28    /// Array index within monitored property (SubscribeCOVProperty only).
29    pub monitored_property_array_index: Option<u32>,
30    /// COV increment override (SubscribeCOVProperty only).
31    pub cov_increment: Option<f32>,
32}
33
34/// Key for uniquely identifying a subscription: (subscriber_mac, process_id, monitored_object).
35type SubKey = (MacAddr, u32, ObjectIdentifier);
36
37/// Table of active COV subscriptions.
38#[derive(Debug, Default)]
39pub struct CovSubscriptionTable {
40    subs: HashMap<SubKey, CovSubscription>,
41}
42
43impl CovSubscriptionTable {
44    pub fn new() -> Self {
45        Self {
46            subs: HashMap::new(),
47        }
48    }
49
50    /// Add or update a subscription.
51    pub fn subscribe(&mut self, sub: CovSubscription) {
52        let key = (
53            sub.subscriber_mac.clone(),
54            sub.subscriber_process_identifier,
55            sub.monitored_object_identifier,
56        );
57        self.subs.insert(key, sub);
58    }
59
60    /// Remove a subscription by subscriber MAC, process identifier, and monitored object.
61    pub fn unsubscribe(
62        &mut self,
63        mac: &[u8],
64        process_id: u32,
65        monitored_object: ObjectIdentifier,
66    ) -> bool {
67        let key = (MacAddr::from_slice(mac), process_id, monitored_object);
68        self.subs.remove(&key).is_some()
69    }
70
71    /// Get all active (non-expired) subscriptions for a given object.
72    pub fn subscriptions_for(&mut self, oid: &ObjectIdentifier) -> Vec<&CovSubscription> {
73        let now = Instant::now();
74        // Purge expired before returning
75        self.subs
76            .retain(|_, sub| sub.expires_at.is_none_or(|exp| exp > now));
77        self.subs
78            .values()
79            .filter(|sub| sub.monitored_object_identifier == *oid)
80            .collect()
81    }
82
83    /// Update the last-notified value for a subscription.
84    pub fn set_last_notified_value(
85        &mut self,
86        mac: &[u8],
87        process_id: u32,
88        monitored_object: ObjectIdentifier,
89        value: f32,
90    ) {
91        let key = (MacAddr::from_slice(mac), process_id, monitored_object);
92        if let Some(sub) = self.subs.get_mut(&key) {
93            sub.last_notified_value = Some(value);
94        }
95    }
96
97    /// Check if a COV notification should fire for a subscription given
98    /// the current present_value and the object's COV_Increment.
99    ///
100    /// Returns `true` if:
101    /// - No COV_Increment (binary/multi-state objects — always notify)
102    /// - No previous notified value (first notification)
103    /// - `|current - last_notified| >= cov_increment`
104    pub fn should_notify(
105        sub: &CovSubscription,
106        current_value: Option<f32>,
107        cov_increment: Option<f32>,
108    ) -> bool {
109        match (cov_increment, current_value) {
110            (Some(increment), Some(current)) => {
111                match sub.last_notified_value {
112                    None => true, // First notification — always fire
113                    Some(last) => (current - last).abs() >= increment,
114                }
115            }
116            _ => true, // No increment or no numeric value — always notify
117        }
118    }
119
120    /// Number of active subscriptions.
121    pub fn len(&self) -> usize {
122        self.subs.len()
123    }
124
125    /// Whether the table is empty.
126    pub fn is_empty(&self) -> bool {
127        self.subs.is_empty()
128    }
129
130    /// Remove all expired subscriptions. Returns the number removed.
131    pub fn purge_expired(&mut self) -> usize {
132        let before = self.subs.len();
133        let now = Instant::now();
134        self.subs
135            .retain(|_, sub| sub.expires_at.is_none_or(|exp| exp > now));
136        before - self.subs.len()
137    }
138}
139
140#[cfg(test)]
141mod tests {
142    use super::*;
143    use bacnet_types::enums::ObjectType;
144    use std::time::Duration;
145
146    fn ai1() -> ObjectIdentifier {
147        ObjectIdentifier::new(ObjectType::ANALOG_INPUT, 1).unwrap()
148    }
149
150    fn ai2() -> ObjectIdentifier {
151        ObjectIdentifier::new(ObjectType::ANALOG_INPUT, 2).unwrap()
152    }
153
154    fn make_sub(mac: &[u8], process_id: u32, oid: ObjectIdentifier) -> CovSubscription {
155        CovSubscription {
156            subscriber_mac: MacAddr::from_slice(mac),
157            subscriber_process_identifier: process_id,
158            monitored_object_identifier: oid,
159            issue_confirmed_notifications: false,
160            expires_at: None,
161            last_notified_value: None,
162            monitored_property: None,
163            monitored_property_array_index: None,
164            cov_increment: None,
165        }
166    }
167
168    #[test]
169    fn subscribe_and_lookup() {
170        let mut table = CovSubscriptionTable::new();
171        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
172        assert_eq!(table.len(), 1);
173        assert_eq!(table.subscriptions_for(&ai1()).len(), 1);
174        assert_eq!(table.subscriptions_for(&ai2()).len(), 0);
175    }
176
177    #[test]
178    fn unsubscribe() {
179        let mut table = CovSubscriptionTable::new();
180        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
181        assert!(table.unsubscribe(&[1, 2, 3], 1, ai1()));
182        assert!(!table.unsubscribe(&[1, 2, 3], 1, ai1())); // already removed
183        assert!(table.is_empty());
184    }
185
186    #[test]
187    fn expired_subscriptions_purged_on_lookup() {
188        let mut table = CovSubscriptionTable::new();
189        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
190        sub.expires_at = Some(Instant::now() - Duration::from_secs(1)); // already expired
191        table.subscribe(sub);
192        assert_eq!(table.subscriptions_for(&ai1()).len(), 0);
193        assert!(table.is_empty());
194    }
195
196    #[test]
197    fn multiple_subscribers_same_object() {
198        let mut table = CovSubscriptionTable::new();
199        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
200        table.subscribe(make_sub(&[4, 5, 6], 2, ai1()));
201        assert_eq!(table.subscriptions_for(&ai1()).len(), 2);
202    }
203
204    #[test]
205    fn should_notify_no_increment_always_fires() {
206        let sub = make_sub(&[1, 2, 3], 1, ai1());
207        // Binary/multi-state objects have no COV_Increment
208        assert!(CovSubscriptionTable::should_notify(&sub, Some(1.0), None));
209    }
210
211    #[test]
212    fn should_notify_first_notification_always_fires() {
213        let sub = make_sub(&[1, 2, 3], 1, ai1());
214        // First notification (last_notified_value = None)
215        assert!(CovSubscriptionTable::should_notify(
216            &sub,
217            Some(72.5),
218            Some(1.0)
219        ));
220    }
221
222    #[test]
223    fn should_notify_change_exceeds_increment() {
224        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
225        sub.last_notified_value = Some(70.0);
226        // Change of 2.5 >= increment of 1.0
227        assert!(CovSubscriptionTable::should_notify(
228            &sub,
229            Some(72.5),
230            Some(1.0)
231        ));
232    }
233
234    #[test]
235    fn should_notify_change_below_increment() {
236        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
237        sub.last_notified_value = Some(72.0);
238        // Change of 0.3 < increment of 1.0
239        assert!(!CovSubscriptionTable::should_notify(
240            &sub,
241            Some(72.3),
242            Some(1.0)
243        ));
244    }
245
246    #[test]
247    fn should_notify_exact_increment() {
248        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
249        sub.last_notified_value = Some(70.0);
250        // Change of exactly 1.0 == increment of 1.0 → fires
251        assert!(CovSubscriptionTable::should_notify(
252            &sub,
253            Some(71.0),
254            Some(1.0)
255        ));
256    }
257
258    #[test]
259    fn should_notify_zero_increment_always_fires() {
260        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
261        sub.last_notified_value = Some(72.0);
262        // COV_Increment = 0.0 means any change fires
263        assert!(CovSubscriptionTable::should_notify(
264            &sub,
265            Some(72.001),
266            Some(0.0)
267        ));
268    }
269
270    #[test]
271    fn set_last_notified_value_updates() {
272        let mut table = CovSubscriptionTable::new();
273        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
274        table.set_last_notified_value(&[1, 2, 3], 1, ai1(), 72.5);
275
276        let subs = table.subscriptions_for(&ai1());
277        assert_eq!(subs[0].last_notified_value, Some(72.5));
278    }
279
280    #[test]
281    fn upsert_replaces_existing() {
282        let mut table = CovSubscriptionTable::new();
283        let mut sub = make_sub(&[1, 2, 3], 1, ai1());
284        sub.issue_confirmed_notifications = false;
285        table.subscribe(sub);
286        // Same (mac, process_id, object) key — replaces the existing entry
287        let mut sub2 = make_sub(&[1, 2, 3], 1, ai1());
288        sub2.issue_confirmed_notifications = true;
289        table.subscribe(sub2);
290        assert_eq!(table.len(), 1);
291        let subs = table.subscriptions_for(&ai1());
292        assert!(subs[0].issue_confirmed_notifications);
293    }
294
295    #[test]
296    fn same_subscriber_different_objects_both_exist() {
297        let mut table = CovSubscriptionTable::new();
298        // Same (mac, process_id) but different monitored objects
299        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
300        table.subscribe(make_sub(&[1, 2, 3], 1, ai2()));
301        assert_eq!(table.len(), 2);
302        assert_eq!(table.subscriptions_for(&ai1()).len(), 1);
303        assert_eq!(table.subscriptions_for(&ai2()).len(), 1);
304    }
305
306    #[test]
307    fn purge_expired_removes_stale_subscriptions() {
308        let mut table = CovSubscriptionTable::new();
309        let mut sub1 = make_sub(&[1, 2, 3], 1, ai1());
310        sub1.expires_at = Some(Instant::now() - Duration::from_secs(10));
311        table.subscribe(sub1);
312
313        let mut sub2 = make_sub(&[4, 5, 6], 2, ai1());
314        sub2.expires_at = None; // infinite lifetime
315        table.subscribe(sub2);
316
317        let purged = table.purge_expired();
318        assert_eq!(purged, 1);
319        assert_eq!(table.len(), 1);
320    }
321
322    #[test]
323    fn purge_expired_returns_zero_when_none_expired() {
324        let mut table = CovSubscriptionTable::new();
325        table.subscribe(make_sub(&[1, 2, 3], 1, ai1()));
326        let purged = table.purge_expired();
327        assert_eq!(purged, 0);
328        assert_eq!(table.len(), 1);
329    }
330}