1use crate::{BacnetClient, CovNotification, CovPropertyValue};
2use rustbac_core::services::subscribe_cov::SubscribeCovRequest;
3use rustbac_core::services::subscribe_cov_property::SubscribeCovPropertyRequest;
4use rustbac_core::types::{ObjectId, PropertyId};
5use rustbac_datalink::{DataLink, DataLinkAddress};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::{mpsc, watch};
9use tokio::time::Instant;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq)]
13#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
14pub enum UpdateSource {
15 Cov,
16 Poll,
17}
18
19#[derive(Debug, Clone)]
21#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
22pub struct CovSubscriptionSpec {
23 pub address: DataLinkAddress,
24 pub object_id: ObjectId,
25 pub property_id: Option<PropertyId>,
26 pub lifetime_seconds: u32,
27 pub cov_increment: Option<f32>,
28 pub confirmed: bool,
29 pub subscriber_process_id: u32,
30}
31
32#[derive(Debug, Clone)]
34#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
35pub struct CovUpdate {
36 pub address: DataLinkAddress,
37 pub object_id: ObjectId,
38 pub values: Vec<CovPropertyValue>,
39 pub source: UpdateSource,
40}
41
42#[derive(Debug)]
44pub struct CovManager {
45 thread: Option<std::thread::JoinHandle<()>>,
46 shutdown: watch::Sender<bool>,
47 rx: mpsc::UnboundedReceiver<CovUpdate>,
48}
49
50impl CovManager {
51 pub async fn recv(&mut self) -> Option<CovUpdate> {
53 self.rx.recv().await
54 }
55
56 pub fn stop(mut self) {
58 let _ = self.shutdown.send(true);
59 if let Some(thread) = self.thread.take() {
60 let _ = thread.join();
61 }
62 }
63}
64
65impl Drop for CovManager {
66 fn drop(&mut self) {
67 let _ = self.shutdown.send(true);
68 }
69}
70
71pub struct CovManagerBuilder<D: DataLink> {
73 client: Arc<BacnetClient<D>>,
74 subscriptions: Vec<CovSubscriptionSpec>,
75 poll_interval: Duration,
76 silence_threshold: Duration,
77 renewal_fraction: f64,
78}
79
80impl<D: DataLink + 'static> CovManagerBuilder<D> {
81 pub fn new(client: Arc<BacnetClient<D>>) -> Self {
82 Self {
83 client,
84 subscriptions: Vec::new(),
85 poll_interval: Duration::from_secs(30),
86 silence_threshold: Duration::from_secs(5 * 60),
87 renewal_fraction: 0.75,
88 }
89 }
90
91 pub fn subscribe(mut self, spec: CovSubscriptionSpec) -> Self {
92 self.subscriptions.push(spec);
93 self
94 }
95
96 pub fn poll_interval(mut self, duration: Duration) -> Self {
97 self.poll_interval = duration;
98 self
99 }
100
101 pub fn silence_threshold(mut self, duration: Duration) -> Self {
102 self.silence_threshold = duration;
103 self
104 }
105
106 pub fn renewal_fraction(mut self, fraction: f64) -> Self {
107 self.renewal_fraction = fraction;
108 self
109 }
110
111 pub fn build(self) -> Result<CovManager, crate::ClientError> {
112 let runtime_handle = tokio::runtime::Handle::try_current()
113 .map_err(|_| crate::ClientError::NoTokioRuntime)?;
114
115 let (tx, rx) = mpsc::unbounded_channel();
116 let (shutdown_tx, shutdown_rx) = watch::channel(false);
117 let poll_interval = self.poll_interval.max(Duration::from_millis(1));
118 let silence_threshold = self.silence_threshold.max(Duration::from_millis(1));
119 let renewal_fraction = sanitize_fraction(self.renewal_fraction);
120 let client = self.client;
121 let subscriptions = self.subscriptions;
122
123 let thread = std::thread::spawn(move || {
124 runtime_handle.block_on(async move {
125 run_cov_manager(
126 client,
127 subscriptions,
128 tx,
129 shutdown_rx,
130 poll_interval,
131 silence_threshold,
132 renewal_fraction,
133 )
134 .await;
135 });
136 });
137 Ok(CovManager {
138 thread: Some(thread),
139 shutdown: shutdown_tx,
140 rx,
141 })
142 }
143}
144
145#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146enum SubscriptionMode {
147 Cov,
148 Polling,
149}
150
151#[derive(Debug, Clone)]
152struct SubscriptionState {
153 spec: CovSubscriptionSpec,
154 mode: SubscriptionMode,
155 last_notification: Option<Instant>,
157 cov_mode_since: Option<Instant>,
162 next_renewal: Instant,
163 next_poll: Instant,
164}
165
166impl SubscriptionState {
167 fn new(
168 spec: CovSubscriptionSpec,
169 poll_interval: Duration,
170 renewal_fraction: f64,
171 now: Instant,
172 ) -> Self {
173 let lifetime_seconds = spec.lifetime_seconds;
174 Self {
175 spec,
176 mode: SubscriptionMode::Cov,
177 last_notification: None,
178 cov_mode_since: None,
179 next_renewal: now + renewal_delay_seconds(lifetime_seconds, renewal_fraction),
180 next_poll: now + poll_interval,
181 }
182 }
183
184 fn on_subscribe_success(
185 &mut self,
186 now: Instant,
187 renewal_fraction: f64,
188 poll_interval: Duration,
189 ) {
190 if self.mode != SubscriptionMode::Cov || self.cov_mode_since.is_none() {
194 self.cov_mode_since = Some(now);
195 }
196 self.mode = SubscriptionMode::Cov;
197 self.next_renewal =
203 now + renewal_delay_seconds(self.spec.lifetime_seconds, renewal_fraction);
204 self.next_poll = now + poll_interval;
205 }
206
207 fn on_subscribe_failure(&mut self, now: Instant, renewal_fraction: f64) {
208 self.mode = SubscriptionMode::Polling;
209 self.cov_mode_since = None;
211 self.next_renewal =
212 now + renewal_delay_seconds(self.spec.lifetime_seconds, renewal_fraction);
213 self.next_poll = now;
214 }
215
216 fn is_silent(&self, now: Instant, threshold: Duration) -> bool {
220 if self.mode != SubscriptionMode::Cov {
221 return false;
222 }
223 let reference = self.last_notification.or(self.cov_mode_since);
227 reference
228 .map(|t| now.saturating_duration_since(t) > threshold)
229 .unwrap_or(false)
230 }
231}
232
233async fn run_cov_manager<D: DataLink>(
234 client: Arc<BacnetClient<D>>,
235 subscriptions: Vec<CovSubscriptionSpec>,
236 tx: mpsc::UnboundedSender<CovUpdate>,
237 mut shutdown_rx: watch::Receiver<bool>,
238 poll_interval: Duration,
239 silence_threshold: Duration,
240 renewal_fraction: f64,
241) {
242 if subscriptions.is_empty() {
243 return;
244 }
245
246 let now = Instant::now();
247 let mut states: Vec<SubscriptionState> = subscriptions
248 .into_iter()
249 .map(|spec| SubscriptionState::new(spec, poll_interval, renewal_fraction, now))
250 .collect();
251
252 for state in &mut states {
253 let attempt = subscribe_spec(&client, &state.spec).await;
254 let now = Instant::now();
255 if attempt {
256 state.on_subscribe_success(now, renewal_fraction, poll_interval);
257 } else {
258 state.on_subscribe_failure(now, renewal_fraction);
259 }
260 }
261
262 let listen_window = poll_interval
263 .min(Duration::from_secs(1))
264 .max(Duration::from_millis(50));
265
266 loop {
267 if *shutdown_rx.borrow() {
268 return;
269 }
270
271 let recv_result = tokio::select! {
272 _ = shutdown_rx.changed() => {
273 if *shutdown_rx.borrow() {
274 return;
275 }
276 continue;
277 }
278 recv_result = client.recv_cov_notification(listen_window) => recv_result,
279 };
280
281 match recv_result {
282 Ok(Some(notification)) => {
283 let now = Instant::now();
284 for state in &mut states {
285 if !notification_matches_spec(¬ification, &state.spec) {
286 continue;
287 }
288
289 state.last_notification = Some(now);
290 state.mode = SubscriptionMode::Cov;
291
292 let values = filter_cov_values(¬ification.values, state.spec.property_id);
293 if state.spec.property_id.is_some() && values.is_empty() {
294 continue;
295 }
296
297 let update = CovUpdate {
298 address: state.spec.address,
299 object_id: state.spec.object_id,
300 values,
301 source: UpdateSource::Cov,
302 };
303 if tx.send(update).is_err() {
304 return;
305 }
306 }
307 }
308 Ok(None) => {}
309 Err(err) => {
310 log::debug!("cov manager recv error: {err}");
311 }
312 }
313
314 let now = Instant::now();
315 for state in &mut states {
316 if now >= state.next_renewal {
317 if subscribe_spec(&client, &state.spec).await {
318 state.on_subscribe_success(Instant::now(), renewal_fraction, poll_interval);
319 } else {
320 state.on_subscribe_failure(Instant::now(), renewal_fraction);
321 }
322 }
323
324 if state.is_silent(now, silence_threshold) {
325 state.mode = SubscriptionMode::Polling;
326 if subscribe_spec(&client, &state.spec).await {
327 state.on_subscribe_success(Instant::now(), renewal_fraction, poll_interval);
328 } else {
329 state.on_subscribe_failure(Instant::now(), renewal_fraction);
330 }
331 }
332
333 if state.mode == SubscriptionMode::Polling && now >= state.next_poll {
334 if subscribe_spec(&client, &state.spec).await {
335 state.on_subscribe_success(Instant::now(), renewal_fraction, poll_interval);
336 continue;
337 }
338
339 if let Some(update) = poll_spec(&client, &state.spec).await {
340 if tx.send(update).is_err() {
341 return;
342 }
343 }
344 state.next_poll = Instant::now() + poll_interval;
345 }
346 }
347 }
348}
349
350async fn subscribe_spec<D: DataLink>(client: &BacnetClient<D>, spec: &CovSubscriptionSpec) -> bool {
351 match spec.property_id {
352 Some(property_id) => client
353 .subscribe_cov_property(
354 spec.address,
355 SubscribeCovPropertyRequest {
356 subscriber_process_id: spec.subscriber_process_id,
357 monitored_object_id: spec.object_id,
358 issue_confirmed_notifications: Some(spec.confirmed),
359 lifetime_seconds: Some(spec.lifetime_seconds),
360 monitored_property_id: property_id,
361 monitored_property_array_index: None,
362 cov_increment: spec.cov_increment,
363 invoke_id: 0,
364 },
365 )
366 .await
367 .is_ok(),
368 None => client
369 .subscribe_cov(
370 spec.address,
371 SubscribeCovRequest {
372 subscriber_process_id: spec.subscriber_process_id,
373 monitored_object_id: spec.object_id,
374 issue_confirmed_notifications: Some(spec.confirmed),
375 lifetime_seconds: Some(spec.lifetime_seconds),
376 invoke_id: 0,
377 },
378 )
379 .await
380 .is_ok(),
381 }
382}
383
384async fn poll_spec<D: DataLink>(
385 client: &BacnetClient<D>,
386 spec: &CovSubscriptionSpec,
387) -> Option<CovUpdate> {
388 let property_id = spec.property_id.unwrap_or(PropertyId::PresentValue);
389 let value = client
390 .read_property(spec.address, spec.object_id, property_id)
391 .await
392 .ok()?;
393
394 Some(CovUpdate {
395 address: spec.address,
396 object_id: spec.object_id,
397 values: vec![CovPropertyValue {
398 property_id,
399 array_index: None,
400 value,
401 priority: None,
402 }],
403 source: UpdateSource::Poll,
404 })
405}
406
407fn notification_matches_spec(notification: &CovNotification, spec: &CovSubscriptionSpec) -> bool {
408 notification.source == spec.address
409 && notification.monitored_object_id == spec.object_id
410 && notification.subscriber_process_id == spec.subscriber_process_id
411}
412
413fn filter_cov_values(
414 values: &[CovPropertyValue],
415 property_id: Option<PropertyId>,
416) -> Vec<CovPropertyValue> {
417 match property_id {
418 Some(property_id) => values
419 .iter()
420 .filter(|value| value.property_id == property_id)
421 .cloned()
422 .collect(),
423 None => values.to_vec(),
424 }
425}
426
427fn sanitize_fraction(fraction: f64) -> f64 {
428 if !fraction.is_finite() {
429 return 0.75;
430 }
431 fraction.clamp(0.01, 1.0)
432}
433
434fn renewal_delay_seconds(lifetime_seconds: u32, renewal_fraction: f64) -> Duration {
435 let seconds = ((lifetime_seconds.max(1) as f64) * sanitize_fraction(renewal_fraction))
436 .round()
437 .max(1.0);
438 Duration::from_secs(seconds as u64)
439}
440
441#[cfg(test)]
442mod tests {
443 use super::{
444 notification_matches_spec, renewal_delay_seconds, CovManagerBuilder, CovSubscriptionSpec,
445 SubscriptionMode, SubscriptionState, UpdateSource,
446 };
447 use crate::{BacnetClient, ClientDataValue, CovNotification, SimulatedDevice};
448 use rustbac_core::types::{ObjectId, ObjectType, PropertyId};
449 use rustbac_datalink::{DataLink, DataLinkAddress, DataLinkError};
450 use std::collections::HashMap;
451 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
452 use std::sync::Arc;
453 use std::time::Duration;
454 use tokio::sync::{mpsc, Mutex};
455 use tokio::time::{timeout, Instant};
456
457 #[derive(Clone)]
458 struct ChannelDataLink {
459 local_addr: DataLinkAddress,
460 tx: mpsc::UnboundedSender<(Vec<u8>, DataLinkAddress)>,
461 rx: Arc<Mutex<mpsc::UnboundedReceiver<(Vec<u8>, DataLinkAddress)>>>,
462 }
463
464 impl DataLink for ChannelDataLink {
465 async fn send(
466 &self,
467 _address: DataLinkAddress,
468 payload: &[u8],
469 ) -> Result<(), DataLinkError> {
470 self.tx
471 .send((payload.to_vec(), self.local_addr))
472 .map_err(|_| DataLinkError::InvalidFrame)
473 }
474
475 async fn recv(&self, buf: &mut [u8]) -> Result<(usize, DataLinkAddress), DataLinkError> {
476 let mut rx = self.rx.lock().await;
477 let Some((payload, source)) = rx.recv().await else {
478 return Err(DataLinkError::InvalidFrame);
479 };
480 if payload.len() > buf.len() {
481 return Err(DataLinkError::FrameTooLarge);
482 }
483 buf[..payload.len()].copy_from_slice(&payload);
484 Ok((payload.len(), source))
485 }
486 }
487
488 fn datalink_pair() -> (ChannelDataLink, ChannelDataLink, DataLinkAddress) {
489 let client_addr =
490 DataLinkAddress::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 47820));
491 let simulator_addr =
492 DataLinkAddress::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 47821));
493 let (client_tx, simulator_rx) = mpsc::unbounded_channel();
494 let (simulator_tx, client_rx) = mpsc::unbounded_channel();
495
496 (
497 ChannelDataLink {
498 local_addr: client_addr,
499 tx: client_tx,
500 rx: Arc::new(Mutex::new(client_rx)),
501 },
502 ChannelDataLink {
503 local_addr: simulator_addr,
504 tx: simulator_tx,
505 rx: Arc::new(Mutex::new(simulator_rx)),
506 },
507 simulator_addr,
508 )
509 }
510
511 #[test]
512 fn state_transitions_cov_to_polling_to_cov() {
513 let now = Instant::now();
514 let spec = CovSubscriptionSpec {
515 address: DataLinkAddress::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 47830)),
516 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
517 property_id: Some(PropertyId::PresentValue),
518 lifetime_seconds: 60,
519 cov_increment: None,
520 confirmed: false,
521 subscriber_process_id: 1,
522 };
523 let mut state = SubscriptionState::new(spec, Duration::from_secs(1), 0.75, now);
524 state.mode = SubscriptionMode::Cov;
525 state.last_notification = Some(now - Duration::from_secs(10));
526
527 assert!(state.is_silent(now, Duration::from_secs(5)));
528
529 state.on_subscribe_failure(now, 0.75);
530 assert_eq!(state.mode, SubscriptionMode::Polling);
531
532 state.on_subscribe_success(now, 0.75, Duration::from_secs(1));
533 assert_eq!(state.mode, SubscriptionMode::Cov);
534 assert_eq!(state.last_notification, Some(now - Duration::from_secs(10)));
536 assert_eq!(state.cov_mode_since, Some(now));
538 }
539
540 #[test]
543 fn silence_detected_when_no_notifications_ever_received() {
544 let spec = CovSubscriptionSpec {
545 address: DataLinkAddress::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 47830)),
546 object_id: ObjectId::new(ObjectType::AnalogInput, 1),
547 property_id: None,
548 lifetime_seconds: 60,
549 cov_increment: None,
550 confirmed: false,
551 subscriber_process_id: 1,
552 };
553 let t0 = Instant::now();
554 let mut state = SubscriptionState::new(spec, Duration::from_secs(30), 0.75, t0);
555
556 state.on_subscribe_success(t0, 0.75, Duration::from_secs(30));
558 assert_eq!(state.mode, SubscriptionMode::Cov);
559 assert_eq!(state.last_notification, None); assert_eq!(state.cov_mode_since, Some(t0));
561
562 let threshold = Duration::from_secs(5 * 60);
564 let t_later = t0 + threshold + Duration::from_secs(1);
565 assert!(state.is_silent(t_later, threshold));
566
567 state.on_subscribe_success(t_later, 0.75, Duration::from_secs(30));
569 assert!(
570 state.is_silent(t_later, threshold),
571 "silence window must not be reset by renewal"
572 );
573 }
574
575 #[test]
576 fn renewal_fraction_scales_lifetime() {
577 assert_eq!(renewal_delay_seconds(120, 0.75), Duration::from_secs(90));
578 assert_eq!(renewal_delay_seconds(120, 1.5), Duration::from_secs(120));
579 assert_eq!(renewal_delay_seconds(120, 0.0), Duration::from_secs(1));
580 }
581
582 #[test]
583 fn matching_includes_subscriber_process_id() {
584 let address = DataLinkAddress::Ip(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 47830));
585 let object_id = ObjectId::new(ObjectType::AnalogInput, 1);
586 let spec = CovSubscriptionSpec {
587 address,
588 object_id,
589 property_id: None,
590 lifetime_seconds: 60,
591 cov_increment: None,
592 confirmed: false,
593 subscriber_process_id: 42,
594 };
595 let mut notification = CovNotification {
596 source: address,
597 confirmed: false,
598 subscriber_process_id: 7,
599 initiating_device_id: ObjectId::new(ObjectType::Device, 100),
600 monitored_object_id: object_id,
601 time_remaining_seconds: 30,
602 values: vec![],
603 };
604 assert!(!notification_matches_spec(¬ification, &spec));
605
606 notification.subscriber_process_id = 42;
607 assert!(notification_matches_spec(¬ification, &spec));
608 }
609
610 #[tokio::test]
611 async fn polling_fallback_emits_updates_with_simulator() {
612 let (client_dl, simulator_dl, simulator_addr) = datalink_pair();
613
614 let simulator = SimulatedDevice::new(2000, simulator_dl);
615 let object_id = ObjectId::new(ObjectType::AnalogInput, 1);
616 let mut props = HashMap::new();
617 props.insert(PropertyId::PresentValue, ClientDataValue::Real(42.0));
618 props.insert(
619 PropertyId::ObjectName,
620 ClientDataValue::CharacterString("AI-1".to_string()),
621 );
622 simulator.add_object(object_id, props).await;
623
624 let simulator_task = tokio::spawn(async move {
625 let _ = simulator.run().await;
626 });
627
628 let client = Arc::new(
629 BacnetClient::with_datalink(client_dl).with_response_timeout(Duration::from_millis(50)),
630 );
631
632 let spec = CovSubscriptionSpec {
633 address: simulator_addr,
634 object_id,
635 property_id: Some(PropertyId::PresentValue),
636 lifetime_seconds: 30,
637 cov_increment: None,
638 confirmed: false,
639 subscriber_process_id: 99,
640 };
641
642 let mut manager = CovManagerBuilder::new(client)
643 .subscribe(spec)
644 .poll_interval(Duration::from_millis(75))
645 .silence_threshold(Duration::from_millis(200))
646 .build()
647 .expect("build() failed: no Tokio runtime");
648
649 let update = timeout(Duration::from_secs(2), manager.recv())
650 .await
651 .expect("manager recv timed out")
652 .expect("manager channel closed unexpectedly");
653
654 assert_eq!(update.source, UpdateSource::Poll);
655 assert_eq!(update.object_id, object_id);
656 assert_eq!(update.values.len(), 1);
657 assert_eq!(update.values[0].property_id, PropertyId::PresentValue);
658 assert_eq!(update.values[0].value, ClientDataValue::Real(42.0));
659
660 manager.stop();
661 simulator_task.abort();
662 }
663}