// rustbac_client/cov_manager.rs

1use crate::{BacnetClient, CovNotification, CovPropertyValue};
2use rustbac_core::services::subscribe_cov::SubscribeCovRequest;
3use rustbac_core::services::subscribe_cov_property::SubscribeCovPropertyRequest;
4use rustbac_core::types::{ObjectId, PropertyId};
5use rustbac_datalink::{DataLink, DataLinkAddress};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::{mpsc, watch};
9use tokio::time::Instant;
10
/// Source of a [`CovUpdate`].
///
/// Lets a consumer tell values pushed by the device apart from values the
/// manager fetched itself while a subscription was in polling fallback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum UpdateSource {
    /// The update was built from a COV notification received from the device.
    Cov,
    /// The update was built from a property read issued by the manager.
    Poll,
}
18
/// A managed COV subscription spec.
///
/// Describes one object (optionally narrowed to one property) on one device
/// that the manager should keep subscribed and, failing that, poll.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CovSubscriptionSpec {
    /// Address the subscribe and read requests are sent to.  An incoming
    /// notification only matches this spec when its source equals this value.
    pub address: DataLinkAddress,
    /// Object to monitor; sent as `monitored_object_id` in subscribe requests.
    pub object_id: ObjectId,
    /// `Some(p)` subscribes to property `p` only (property-level subscribe
    /// request) and forwards only values for `p`.  `None` subscribes to the
    /// whole object; polling fallback then reads `PropertyId::PresentValue`.
    pub property_id: Option<PropertyId>,
    /// Lifetime sent in the subscribe request; also the basis for the renewal
    /// schedule (lifetime scaled by the builder's renewal fraction).
    pub lifetime_seconds: u32,
    /// COV increment forwarded in property-level subscribe requests.  It is
    /// not sent when `property_id` is `None`.
    pub cov_increment: Option<f32>,
    /// Sent as `issue_confirmed_notifications` in the subscribe request.
    pub confirmed: bool,
    /// Subscriber process identifier sent in the subscribe request and
    /// required to match on incoming notifications.
    pub subscriber_process_id: u32,
}
31
/// A single update emitted by [`CovManager`].
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CovUpdate {
    /// Address copied from the [`CovSubscriptionSpec`] that produced the update.
    pub address: DataLinkAddress,
    /// Object the values belong to, copied from the originating spec.
    pub object_id: ObjectId,
    /// Reported property values.  For notification-sourced updates these are
    /// the notification's values, filtered to the subscribed property when
    /// the spec names one; for polled updates this holds exactly one entry.
    pub values: Vec<CovPropertyValue>,
    /// Whether the values came from a notification or from a poll.
    pub source: UpdateSource,
}
41
/// Background COV manager handle.
///
/// Owns the receiving end of the update stream and the means to shut the
/// background worker down.  Dropping the handle signals shutdown without
/// waiting; [`CovManager::stop`] signals shutdown and joins the worker thread.
#[derive(Debug)]
pub struct CovManager {
    /// Join handle of the OS thread running the manager loop.  Wrapped in
    /// `Option` so `stop` can take it out and join it.
    thread: Option<std::thread::JoinHandle<()>>,
    /// Shutdown flag; sending `true` asks the worker loop to return.
    shutdown: watch::Sender<bool>,
    /// Receiving end of the unbounded channel the worker pushes updates into.
    rx: mpsc::UnboundedReceiver<CovUpdate>,
}
49
50impl CovManager {
51    /// Receive the next update from the manager.
52    pub async fn recv(&mut self) -> Option<CovUpdate> {
53        self.rx.recv().await
54    }
55
56    /// Stop the manager task.
57    pub fn stop(mut self) {
58        let _ = self.shutdown.send(true);
59        if let Some(thread) = self.thread.take() {
60            let _ = thread.join();
61        }
62    }
63}
64
65impl Drop for CovManager {
66    fn drop(&mut self) {
67        let _ = self.shutdown.send(true);
68    }
69}
70
/// Builder for [`CovManager`].
///
/// Collects the subscriptions to manage and the timing parameters of the
/// background loop, then spawns the worker in `build`.
pub struct CovManagerBuilder<D: DataLink> {
    /// Client used for subscribe requests, property reads and for receiving
    /// COV notifications.
    client: Arc<BacnetClient<D>>,
    /// Subscriptions to manage, in the order they were added.
    subscriptions: Vec<CovSubscriptionSpec>,
    /// Interval between polls while a subscription is in polling fallback
    /// (30 seconds unless overridden).
    poll_interval: Duration,
    /// How long a subscription in COV mode may go without a notification
    /// before it is treated as silent (5 minutes unless overridden).
    silence_threshold: Duration,
    /// Fraction of the subscription lifetime after which it is renewed
    /// (0.75 unless overridden).
    renewal_fraction: f64,
}
79
80impl<D: DataLink + 'static> CovManagerBuilder<D> {
81    pub fn new(client: Arc<BacnetClient<D>>) -> Self {
82        Self {
83            client,
84            subscriptions: Vec::new(),
85            poll_interval: Duration::from_secs(30),
86            silence_threshold: Duration::from_secs(5 * 60),
87            renewal_fraction: 0.75,
88        }
89    }
90
91    pub fn subscribe(mut self, spec: CovSubscriptionSpec) -> Self {
92        self.subscriptions.push(spec);
93        self
94    }
95
96    pub fn poll_interval(mut self, duration: Duration) -> Self {
97        self.poll_interval = duration;
98        self
99    }
100
101    pub fn silence_threshold(mut self, duration: Duration) -> Self {
102        self.silence_threshold = duration;
103        self
104    }
105
106    pub fn renewal_fraction(mut self, fraction: f64) -> Self {
107        self.renewal_fraction = fraction;
108        self
109    }
110
111    pub fn build(self) -> Result<CovManager, crate::ClientError> {
112        let runtime_handle = tokio::runtime::Handle::try_current()
113            .map_err(|_| crate::ClientError::NoTokioRuntime)?;
114
115        let (tx, rx) = mpsc::unbounded_channel();
116        let (shutdown_tx, shutdown_rx) = watch::channel(false);
117        let poll_interval = self.poll_interval.max(Duration::from_millis(1));
118        let silence_threshold = self.silence_threshold.max(Duration::from_millis(1));
119        let renewal_fraction = sanitize_fraction(self.renewal_fraction);
120        let client = self.client;
121        let subscriptions = self.subscriptions;
122
123        let thread = std::thread::spawn(move || {
124            runtime_handle.block_on(async move {
125                run_cov_manager(
126                    client,
127                    subscriptions,
128                    tx,
129                    shutdown_rx,
130                    poll_interval,
131                    silence_threshold,
132                    renewal_fraction,
133                )
134                .await;
135            });
136        });
137        Ok(CovManager {
138            thread: Some(thread),
139            shutdown: shutdown_tx,
140            rx,
141        })
142    }
143}
144
/// How updates for a subscription are currently being obtained.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SubscriptionMode {
    /// The subscription is considered active; updates are expected to arrive
    /// as COV notifications.
    Cov,
    /// Fallback mode: the manager reads the property itself on a timer and
    /// keeps retrying the subscribe request.
    Polling,
}
150
/// Per-subscription bookkeeping kept by the manager loop.
#[derive(Debug, Clone)]
struct SubscriptionState {
    /// The subscription this state belongs to.
    spec: CovSubscriptionSpec,
    /// Whether updates currently come from notifications or from polling.
    mode: SubscriptionMode,
    /// Set when an actual COV notification is received (not on subscribe).
    last_notification: Option<Instant>,
    /// Set when the subscription transitions into Cov mode; serves as a
    /// silence reference (see `is_silent`), in particular when no
    /// notification has been received yet.  Reset to `None` when the
    /// subscription transitions to Polling mode so that a later re-subscribe
    /// starts a fresh silence window.
    cov_mode_since: Option<Instant>,
    /// Earliest instant at which the manager loop re-sends the subscribe
    /// request as a renewal.
    next_renewal: Instant,
    /// Earliest instant at which the manager loop polls while in Polling mode.
    next_poll: Instant,
}
165
166impl SubscriptionState {
167    fn new(
168        spec: CovSubscriptionSpec,
169        poll_interval: Duration,
170        renewal_fraction: f64,
171        now: Instant,
172    ) -> Self {
173        let lifetime_seconds = spec.lifetime_seconds;
174        Self {
175            spec,
176            mode: SubscriptionMode::Cov,
177            last_notification: None,
178            cov_mode_since: None,
179            next_renewal: now + renewal_delay_seconds(lifetime_seconds, renewal_fraction),
180            next_poll: now + poll_interval,
181        }
182    }
183
184    fn on_subscribe_success(
185        &mut self,
186        now: Instant,
187        renewal_fraction: f64,
188        poll_interval: Duration,
189    ) {
190        // Record when we first enter (or re-enter) Cov mode so the silence
191        // window is anchored to the original subscription, not to each renewal.
192        // Also set it if it was cleared by a prior on_subscribe_failure.
193        if self.mode != SubscriptionMode::Cov || self.cov_mode_since.is_none() {
194            self.cov_mode_since = Some(now);
195        }
196        self.mode = SubscriptionMode::Cov;
197        // NOTE: last_notification is intentionally NOT updated here.  It is
198        // only set when an actual COV notification arrives from the device.
199        // Resetting it on every SimpleAck would prevent the silence detector
200        // from discovering that the device accepted the subscription but is
201        // not sending any notifications.
202        self.next_renewal =
203            now + renewal_delay_seconds(self.spec.lifetime_seconds, renewal_fraction);
204        self.next_poll = now + poll_interval;
205    }
206
207    fn on_subscribe_failure(&mut self, now: Instant, renewal_fraction: f64) {
208        self.mode = SubscriptionMode::Polling;
209        // Reset so a later re-subscribe starts a fresh silence window.
210        self.cov_mode_since = None;
211        self.next_renewal =
212            now + renewal_delay_seconds(self.spec.lifetime_seconds, renewal_fraction);
213        self.next_poll = now;
214    }
215
216    /// Returns `true` when the subscription is in Cov mode but no notification
217    /// has been received within `threshold` of the last notification (or of
218    /// when the subscription was established if none has ever been received).
219    fn is_silent(&self, now: Instant, threshold: Duration) -> bool {
220        if self.mode != SubscriptionMode::Cov {
221            return false;
222        }
223        // Use the most recent notification as reference, falling back to when
224        // we first entered Cov mode.  If neither is set the subscription was
225        // just created and the silence window hasn't started yet.
226        let reference = self.last_notification.or(self.cov_mode_since);
227        reference
228            .map(|t| now.saturating_duration_since(t) > threshold)
229            .unwrap_or(false)
230    }
231}
232
/// Background loop behind [`CovManager`].
///
/// Subscribes every spec once, then alternates between listening for COV
/// notifications and a maintenance pass that renews subscriptions, reacts to
/// silent subscriptions and polls those in Polling mode.  Every resulting
/// [`CovUpdate`] is pushed into `tx`.
///
/// The function returns when `subscriptions` is empty, when the shutdown flag
/// becomes `true`, or when the receiving side of `tx` has been dropped.
async fn run_cov_manager<D: DataLink>(
    client: Arc<BacnetClient<D>>,
    subscriptions: Vec<CovSubscriptionSpec>,
    tx: mpsc::UnboundedSender<CovUpdate>,
    mut shutdown_rx: watch::Receiver<bool>,
    poll_interval: Duration,
    silence_threshold: Duration,
    renewal_fraction: f64,
) {
    // Nothing to manage: returning drops `tx`, which closes the update channel.
    if subscriptions.is_empty() {
        return;
    }

    let now = Instant::now();
    let mut states: Vec<SubscriptionState> = subscriptions
        .into_iter()
        .map(|spec| SubscriptionState::new(spec, poll_interval, renewal_fraction, now))
        .collect();

    // Initial subscribe attempt for every spec, one after another.  A failure
    // puts the entry straight into Polling mode.
    for state in &mut states {
        let attempt = subscribe_spec(&client, &state.spec).await;
        let now = Instant::now();
        if attempt {
            state.on_subscribe_success(now, renewal_fraction, poll_interval);
        } else {
            state.on_subscribe_failure(now, renewal_fraction);
        }
    }

    // Timeout passed to each notification receive: the poll interval clamped
    // to the range 50 ms ..= 1 s.  It bounds how long a maintenance pass can
    // be delayed when no notification arrives.
    let listen_window = poll_interval
        .min(Duration::from_secs(1))
        .max(Duration::from_millis(50));

    loop {
        // Fast path: shutdown was already requested before this iteration.
        if *shutdown_rx.borrow() {
            return;
        }

        // Wait for either a change of the shutdown flag or the outcome of one
        // notification receive, whichever completes first.
        let recv_result = tokio::select! {
            _ = shutdown_rx.changed() => {
                if *shutdown_rx.borrow() {
                    return;
                }
                continue;
            }
            recv_result = client.recv_cov_notification(listen_window) => recv_result,
        };

        match recv_result {
            Ok(Some(notification)) => {
                let now = Instant::now();
                // One notification may match several specs; each matching
                // spec is refreshed and gets its own update.
                for state in &mut states {
                    if !notification_matches_spec(&notification, &state.spec) {
                        continue;
                    }

                    // Any matching notification proves the subscription is
                    // alive, even if it arrives while the entry is polling.
                    state.last_notification = Some(now);
                    state.mode = SubscriptionMode::Cov;

                    let values = filter_cov_values(&notification.values, state.spec.property_id);
                    // A property-level spec with no value for its property
                    // has nothing to report; the liveness refresh above
                    // still applies.
                    if state.spec.property_id.is_some() && values.is_empty() {
                        continue;
                    }

                    let update = CovUpdate {
                        address: state.spec.address,
                        object_id: state.spec.object_id,
                        values,
                        source: UpdateSource::Cov,
                    };
                    // The receiver is gone, so nobody is listening any more.
                    if tx.send(update).is_err() {
                        return;
                    }
                }
            }
            // No notification was returned for this receive call.
            Ok(None) => {}
            Err(err) => {
                // NOTE(review): if the client can return errors immediately
                // and repeatedly, this loop would spin without delay —
                // confirm against `recv_cov_notification`.
                log::debug!("cov manager recv error: {err}");
            }
        }

        // Maintenance pass.  `now` is captured once for the whole pass; time
        // spent awaiting subscribe/poll calls for earlier entries is not
        // reflected in the deadline checks for later ones.
        let now = Instant::now();
        for state in &mut states {
            // Renewal: re-send the subscribe request once the renewal
            // deadline has passed, in whichever mode the entry is.
            if now >= state.next_renewal {
                if subscribe_spec(&client, &state.spec).await {
                    state.on_subscribe_success(Instant::now(), renewal_fraction, poll_interval);
                } else {
                    state.on_subscribe_failure(Instant::now(), renewal_fraction);
                }
            }

            // Silence: mark the entry as Polling first so that a successful
            // re-subscribe counts as a fresh entry into Cov mode.
            if state.is_silent(now, silence_threshold) {
                state.mode = SubscriptionMode::Polling;
                if subscribe_spec(&client, &state.spec).await {
                    state.on_subscribe_success(Instant::now(), renewal_fraction, poll_interval);
                } else {
                    state.on_subscribe_failure(Instant::now(), renewal_fraction);
                }
            }

            // Polling fallback: retry the subscription first and only read
            // the property when the device still refuses it.
            if state.mode == SubscriptionMode::Polling && now >= state.next_poll {
                if subscribe_spec(&client, &state.spec).await {
                    state.on_subscribe_success(Instant::now(), renewal_fraction, poll_interval);
                    continue;
                }

                // A failed read produces no update; the next poll is still
                // rescheduled below.
                if let Some(update) = poll_spec(&client, &state.spec).await {
                    if tx.send(update).is_err() {
                        return;
                    }
                }
                state.next_poll = Instant::now() + poll_interval;
            }
        }
    }
}
349
350async fn subscribe_spec<D: DataLink>(client: &BacnetClient<D>, spec: &CovSubscriptionSpec) -> bool {
351    match spec.property_id {
352        Some(property_id) => client
353            .subscribe_cov_property(
354                spec.address,
355                SubscribeCovPropertyRequest {
356                    subscriber_process_id: spec.subscriber_process_id,
357                    monitored_object_id: spec.object_id,
358                    issue_confirmed_notifications: Some(spec.confirmed),
359                    lifetime_seconds: Some(spec.lifetime_seconds),
360                    monitored_property_id: property_id,
361                    monitored_property_array_index: None,
362                    cov_increment: spec.cov_increment,
363                    invoke_id: 0,
364                },
365            )
366            .await
367            .is_ok(),
368        None => client
369            .subscribe_cov(
370                spec.address,
371                SubscribeCovRequest {
372                    subscriber_process_id: spec.subscriber_process_id,
373                    monitored_object_id: spec.object_id,
374                    issue_confirmed_notifications: Some(spec.confirmed),
375                    lifetime_seconds: Some(spec.lifetime_seconds),
376                    invoke_id: 0,
377                },
378            )
379            .await
380            .is_ok(),
381    }
382}
383
384async fn poll_spec<D: DataLink>(
385    client: &BacnetClient<D>,
386    spec: &CovSubscriptionSpec,
387) -> Option<CovUpdate> {
388    let property_id = spec.property_id.unwrap_or(PropertyId::PresentValue);
389    let value = client
390        .read_property(spec.address, spec.object_id, property_id)
391        .await
392        .ok()?;
393
394    Some(CovUpdate {
395        address: spec.address,
396        object_id: spec.object_id,
397        values: vec![CovPropertyValue {
398            property_id,
399            array_index: None,
400            value,
401            priority: None,
402        }],
403        source: UpdateSource::Poll,
404    })
405}
406
407fn notification_matches_spec(notification: &CovNotification, spec: &CovSubscriptionSpec) -> bool {
408    notification.source == spec.address
409        && notification.monitored_object_id == spec.object_id
410        && notification.subscriber_process_id == spec.subscriber_process_id
411}
412
413fn filter_cov_values(
414    values: &[CovPropertyValue],
415    property_id: Option<PropertyId>,
416) -> Vec<CovPropertyValue> {
417    match property_id {
418        Some(property_id) => values
419            .iter()
420            .filter(|value| value.property_id == property_id)
421            .cloned()
422            .collect(),
423        None => values.to_vec(),
424    }
425}
426
/// Normalize a renewal fraction to a usable value.
///
/// Finite inputs are clamped to `0.01 ..= 1.0`; NaN and infinities are
/// replaced by `0.75`.
fn sanitize_fraction(fraction: f64) -> f64 {
    const FALLBACK: f64 = 0.75;
    const LOWEST: f64 = 0.01;
    const HIGHEST: f64 = 1.0;

    if fraction.is_finite() {
        fraction.clamp(LOWEST, HIGHEST)
    } else {
        FALLBACK
    }
}
433
434fn renewal_delay_seconds(lifetime_seconds: u32, renewal_fraction: f64) -> Duration {
435    let seconds = ((lifetime_seconds.max(1) as f64) * sanitize_fraction(renewal_fraction))
436        .round()
437        .max(1.0);
438    Duration::from_secs(seconds as u64)
439}
440
/// Unit tests for the subscription state machine and helper functions, plus
/// one end-to-end test that runs the manager against an in-process simulated
/// device over a channel-backed datalink.
#[cfg(test)]
mod tests {
    use super::{
        notification_matches_spec, renewal_delay_seconds, CovManagerBuilder, CovSubscriptionSpec,
        SubscriptionMode, SubscriptionState, UpdateSource,
    };
    use crate::{BacnetClient, ClientDataValue, CovNotification, SimulatedDevice};
    use rustbac_core::types::{ObjectId, ObjectType, PropertyId};
    use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
    use std::collections::HashMap;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::{mpsc, Mutex};
    use tokio::time::{timeout, Instant};

    /// In-memory [`DataLink`] endpoint: frames sent on one endpoint are
    /// delivered to whichever endpoint holds the matching receiver.
    #[derive(Clone)]
    struct ChannelDataLink {
        /// Address reported as the source of every frame this endpoint sends.
        local_addr: DataLinkAddress,
        /// Outgoing frames, tagged with the sender's address.
        tx: mpsc::UnboundedSender<(Vec<u8>, DataLinkAddress)>,
        /// Incoming frames; behind a mutex so the cloneable endpoint can be
        /// received from through `&self`.
        rx: Arc<Mutex<mpsc::UnboundedReceiver<(Vec<u8>, DataLinkAddress)>>>,
    }

    impl DataLink for ChannelDataLink {
        /// Queue `payload` for the peer.  The destination address is ignored
        /// because each endpoint has exactly one peer; a closed channel is
        /// reported as `InvalidFrame`.
        async fn send(
            &self,
            _address: DataLinkAddress,
            payload: &[u8],
        ) -> Result<(), DataLinkError> {
            self.tx
                .send((payload.to_vec(), self.local_addr))
                .map_err(|_| DataLinkError::InvalidFrame)
        }

        /// Wait for the next frame and copy it into `buf`.  Returns
        /// `InvalidFrame` when the channel is closed and `FrameTooLarge`
        /// when the frame does not fit into `buf`.
        async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
            let mut rx = self.rx.lock().await;
            let Some((payload, source)) = rx.recv().await else {
                return Err(DataLinkError::InvalidFrame);
            };
            if payload.len() > buf.len() {
                return Err(DataLinkError::FrameTooLarge);
            }
            buf[..payload.len()].copy_from_slice(&payload);
            Ok((payload.len(), source))
        }
    }

    /// Build two cross-connected endpoints (client side, simulator side) and
    /// return them together with the simulator's address.
    fn datalink_pair() -> (ChannelDataLink, ChannelDataLink, DataLinkAddress) {
        let client_addr =
            DataLinkAddress::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 47820));
        let simulator_addr =
            DataLinkAddress::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 47821));
        // Each channel carries traffic in one direction only.
        let (client_tx, simulator_rx) = mpsc::unbounded_channel();
        let (simulator_tx, client_rx) = mpsc::unbounded_channel();

        (
            ChannelDataLink {
                local_addr: client_addr,
                tx: client_tx,
                rx: Arc::new(Mutex::new(client_rx)),
            },
            ChannelDataLink {
                local_addr: simulator_addr,
                tx: simulator_tx,
                rx: Arc::new(Mutex::new(simulator_rx)),
            },
            simulator_addr,
        )
    }

    /// Walks one state through Cov → Polling → Cov and checks which
    /// timestamps each transition touches.
    #[test]
    fn state_transitions_cov_to_polling_to_cov() {
        let now = Instant::now();
        let spec = CovSubscriptionSpec {
            address: DataLinkAddress::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 47830)),
            object_id: ObjectId::new(ObjectType::AnalogInput, 1),
            property_id: Some(PropertyId::PresentValue),
            lifetime_seconds: 60,
            cov_increment: None,
            confirmed: false,
            subscriber_process_id: 1,
        };
        let mut state = SubscriptionState::new(spec, Duration::from_secs(1), 0.75, now);
        state.mode = SubscriptionMode::Cov;
        // Last notification is older than the 5 s threshold used below.
        state.last_notification = Some(now - Duration::from_secs(10));

        assert!(state.is_silent(now, Duration::from_secs(5)));

        state.on_subscribe_failure(now, 0.75);
        assert_eq!(state.mode, SubscriptionMode::Polling);

        state.on_subscribe_success(now, 0.75, Duration::from_secs(1));
        assert_eq!(state.mode, SubscriptionMode::Cov);
        // last_notification is NOT set by on_subscribe_success — only by real notifications.
        assert_eq!(state.last_notification, Some(now - Duration::from_secs(10)));
        // cov_mode_since is set on the Polling→Cov transition.
        assert_eq!(state.cov_mode_since, Some(now));
    }

    /// A device that accepts subscriptions but never sends notifications should
    /// be detected as silent and not have its silence window reset by renewals.
    #[test]
    fn silence_detected_when_no_notifications_ever_received() {
        let spec = CovSubscriptionSpec {
            address: DataLinkAddress::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 47830)),
            object_id: ObjectId::new(ObjectType::AnalogInput, 1),
            property_id: None,
            lifetime_seconds: 60,
            cov_increment: None,
            confirmed: false,
            subscriber_process_id: 1,
        };
        let t0 = Instant::now();
        let mut state = SubscriptionState::new(spec, Duration::from_secs(30), 0.75, t0);

        // Simulate initial subscribe succeeding (SimpleAck received).
        state.on_subscribe_success(t0, 0.75, Duration::from_secs(30));
        assert_eq!(state.mode, SubscriptionMode::Cov);
        assert_eq!(state.last_notification, None); // no real notification yet
        assert_eq!(state.cov_mode_since, Some(t0));

        // No notification received — but cov_mode_since is set so silence is detectable.
        let threshold = Duration::from_secs(5 * 60);
        let t_later = t0 + threshold + Duration::from_secs(1);
        assert!(state.is_silent(t_later, threshold));

        // A renewal (another SimpleAck) must NOT reset the silence window.
        state.on_subscribe_success(t_later, 0.75, Duration::from_secs(30));
        assert!(
            state.is_silent(t_later, threshold),
            "silence window must not be reset by renewal"
        );
    }

    /// Checks scaling by the fraction and clamping of out-of-range fractions
    /// (1.5 behaves like 1.0; 0.0 yields the one-second minimum here).
    #[test]
    fn renewal_fraction_scales_lifetime() {
        assert_eq!(renewal_delay_seconds(120, 0.75), Duration::from_secs(90));
        assert_eq!(renewal_delay_seconds(120, 1.5), Duration::from_secs(120));
        assert_eq!(renewal_delay_seconds(120, 0.0), Duration::from_secs(1));
    }

    /// A notification with the right source and object but a different
    /// subscriber process id must not match.
    #[test]
    fn matching_includes_subscriber_process_id() {
        let address = DataLinkAddress::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 47830));
        let object_id = ObjectId::new(ObjectType::AnalogInput, 1);
        let spec = CovSubscriptionSpec {
            address,
            object_id,
            property_id: None,
            lifetime_seconds: 60,
            cov_increment: None,
            confirmed: false,
            subscriber_process_id: 42,
        };
        let mut notification = CovNotification {
            source: address,
            confirmed: false,
            subscriber_process_id: 7,
            initiating_device_id: ObjectId::new(ObjectType::Device, 100),
            monitored_object_id: object_id,
            time_remaining_seconds: 30,
            values: vec![],
        };
        assert!(!notification_matches_spec(&notification, &spec));

        notification.subscriber_process_id = 42;
        assert!(notification_matches_spec(&notification, &spec));
    }

    /// End-to-end: the first update delivered by the manager must be a polled
    /// read of the simulated object's present value.
    // NOTE(review): this presumably relies on the simulated device not
    // accepting the property-level subscribe request, so that the manager
    // falls back to polling — confirm against `SimulatedDevice`.
    #[tokio::test]
    async fn polling_fallback_emits_updates_with_simulator() {
        let (client_dl, simulator_dl, simulator_addr) = datalink_pair();

        let simulator = SimulatedDevice::new(2000, simulator_dl);
        let object_id = ObjectId::new(ObjectType::AnalogInput, 1);
        let mut props = HashMap::new();
        props.insert(PropertyId::PresentValue, ClientDataValue::Real(42.0));
        props.insert(
            PropertyId::ObjectName,
            ClientDataValue::CharacterString("AI-1".to_string()),
        );
        simulator.add_object(object_id, props).await;

        let simulator_task = tokio::spawn(async move {
            let _ = simulator.run().await;
        });

        // Short response timeout so failed requests give up quickly.
        let client = Arc::new(
            BacnetClient::with_datalink(client_dl).with_response_timeout(Duration::from_millis(50)),
        );

        let spec = CovSubscriptionSpec {
            address: simulator_addr,
            object_id,
            property_id: Some(PropertyId::PresentValue),
            lifetime_seconds: 30,
            cov_increment: None,
            confirmed: false,
            subscriber_process_id: 99,
        };

        let mut manager = CovManagerBuilder::new(client)
            .subscribe(spec)
            .poll_interval(Duration::from_millis(75))
            .silence_threshold(Duration::from_millis(200))
            .build()
            .expect("build() failed: no Tokio runtime");

        // Outer `expect`: the 2 s timeout elapsed; inner `expect`: the manager
        // closed its channel without sending anything.
        let update = timeout(Duration::from_secs(2), manager.recv())
            .await
            .expect("manager recv timed out")
            .expect("manager channel closed unexpectedly");

        assert_eq!(update.source, UpdateSource::Poll);
        assert_eq!(update.object_id, object_id);
        assert_eq!(update.values.len(), 1);
        assert_eq!(update.values[0].property_id, PropertyId::PresentValue);
        assert_eq!(update.values[0].value, ClientDataValue::Real(42.0));

        // NOTE(review): `stop` joins the worker thread from inside the test's
        // runtime thread — confirm this cannot hang if the worker is waiting
        // on a timer at that moment.
        manager.stop();
        simulator_task.abort();
    }
}