Skip to main content

rustbac_client/
listener.rs

1//! Long-running async notification listener.
2//!
3//! Provides a notification listener that receives COV and event notifications
4//! and dispatches them through a bounded channel.
5
6use crate::{ClientDataValue, CovNotification, CovPropertyValue, EventNotification};
7use rustbac_core::apdu::{
8    abort_reason, AbortPdu, ApduType, ConfirmedRequestHeader, SimpleAck, UnconfirmedRequestHeader,
9};
10use rustbac_core::encoding::{reader::Reader, writer::Writer};
11use rustbac_core::npdu::Npdu;
12use rustbac_core::services::acknowledge_alarm::EventState;
13use rustbac_core::services::cov_notification::{
14    CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
15    SERVICE_UNCONFIRMED_COV_NOTIFICATION,
16};
17use rustbac_core::services::event_notification::{
18    EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
19    SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
20};
21use rustbac_datalink::{DataLink, DataLinkAddress};
22use std::sync::Arc;
23use tokio::sync::mpsc;
24
/// Number of notifications the channel built by [`create_notification_listener`]
/// can hold before it is considered full.
///
/// Once this many notifications are waiting to be consumed, the driver loop discards
/// further arrivals instead of letting the queue grow without limit.
pub const DEFAULT_NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
30
31/// A notification received from a BACnet device — either a COV or an event notification.
32#[derive(Debug, Clone)]
33pub enum Notification {
34    /// A change-of-value notification (confirmed or unconfirmed SubscribeCOV / SubscribeCOVProperty).
35    Cov(CovNotification),
36    /// An event notification (confirmed or unconfirmed EventNotification service).
37    Event(EventNotification),
38}
39
40/// Consumer half of a BACnet notification channel.
41///
42/// Produced by [`create_notification_listener`] or
43/// [`create_notification_listener_with_capacity`]. The channel is closed and `recv`
44/// returns `None` once the driver future finishes (i.e. when this receiver is dropped).
45pub struct NotificationListener {
46    rx: mpsc::Receiver<Notification>,
47}
48
49impl NotificationListener {
50    /// Wait for and return the next notification. Returns `None` when the driver has stopped.
51    pub async fn recv(&mut self) -> Option<Notification> {
52        self.rx.recv().await
53    }
54}
55
56/// Create a notification listener backed by a channel with [`DEFAULT_NOTIFICATION_CHANNEL_CAPACITY`].
57///
58/// Returns `(listener, driver)` where `driver` is a future that must be
59/// polled (e.g. via `tokio::spawn`) for notifications to be received.
60/// The driver runs until the [`NotificationListener`] is dropped. When the channel is
61/// full, incoming notifications are silently discarded to bound memory usage.
62/// Confirmed notifications are automatically acknowledged; segmented confirmed
63/// notifications are rejected with an Abort PDU (segmentation not supported).
64pub fn create_notification_listener<D: DataLink + 'static>(
65    datalink: Arc<D>,
66) -> (NotificationListener, impl std::future::Future<Output = ()>) {
67    create_notification_listener_with_capacity(datalink, DEFAULT_NOTIFICATION_CHANNEL_CAPACITY)
68}
69
70/// Like [`create_notification_listener`] but with an explicit channel `capacity`.
71///
72/// `capacity` is clamped to a minimum of 1. Prefer
73/// [`create_notification_listener`] unless you need a non-default buffer size.
74pub fn create_notification_listener_with_capacity<D: DataLink + 'static>(
75    datalink: Arc<D>,
76    capacity: usize,
77) -> (NotificationListener, impl std::future::Future<Output = ()>) {
78    let (tx, rx) = mpsc::channel(capacity.max(1));
79    let driver = async move {
80        let mut buf = [0u8; 1500];
81        loop {
82            let (n, source) = match datalink.recv(&mut buf).await {
83                Ok(v) => v,
84                Err(_) => continue,
85            };
86
87            match parse_notification(&buf[..n], source) {
88                ParseResult::None => {}
89                ParseResult::Abort(ack_bytes) => {
90                    #[cfg(feature = "tracing")]
91                    tracing::warn!("segmented notification aborted — segmentation not supported");
92                    let _ = datalink.send(source, &ack_bytes).await;
93                }
94                ParseResult::Notification(notification, ack) => {
95                    if let Some(ack_bytes) = ack {
96                        let _ = datalink.send(source, &ack_bytes).await;
97                    }
98                    // Drop notifications when the consumer is slow rather than
99                    // growing the queue without bound. Break only when the
100                    // receiver has been dropped; a full channel just discards
101                    // this notification.
102                    if tx.try_send(notification).is_err() {
103                        if tx.is_closed() {
104                            break; // receiver dropped
105                        }
106                        #[cfg(feature = "tracing")]
107                        tracing::warn!("notification channel full — dropping notification");
108                    }
109                }
110            }
111        }
112    };
113
114    (NotificationListener { rx }, driver)
115}
116
117enum ParseResult {
118    None,
119    /// Segmented request we cannot handle — send an Abort, emit no notification.
120    Abort(Vec<u8>),
121    /// Parsed notification and optional ack to send back.
122    Notification(Notification, Option<Vec<u8>>),
123}
124
125fn parse_notification(frame: &[u8], source: DataLinkAddress) -> ParseResult {
126    let apdu = match extract_apdu(frame) {
127        Some(a) => a,
128        None => return ParseResult::None,
129    };
130    let first = match apdu.first() {
131        Some(&b) => b,
132        None => return ParseResult::None,
133    };
134    let apdu_type = match ApduType::from_u8(first >> 4) {
135        Some(t) => t,
136        None => return ParseResult::None,
137    };
138
139    match apdu_type {
140        ApduType::UnconfirmedRequest => {
141            let mut r = Reader::new(apdu);
142            let header = match UnconfirmedRequestHeader::decode(&mut r) {
143                Ok(h) => h,
144                Err(_) => return ParseResult::None,
145            };
146            match header.service_choice {
147                SERVICE_UNCONFIRMED_COV_NOTIFICATION => {
148                    let cov = match CovNotificationRequest::decode_after_header(&mut r) {
149                        Ok(c) => c,
150                        Err(_) => return ParseResult::None,
151                    };
152                    match build_cov_notification(source, false, cov) {
153                        Some(n) => ParseResult::Notification(Notification::Cov(n), None),
154                        None => ParseResult::None,
155                    }
156                }
157                SERVICE_UNCONFIRMED_EVENT_NOTIFICATION => {
158                    let evt = match EventNotificationRequest::decode_after_header(&mut r) {
159                        Ok(e) => e,
160                        Err(_) => return ParseResult::None,
161                    };
162                    match build_event_notification(source, false, evt) {
163                        Some(n) => ParseResult::Notification(Notification::Event(n), None),
164                        None => ParseResult::None,
165                    }
166                }
167                _ => ParseResult::None,
168            }
169        }
170        ApduType::ConfirmedRequest => {
171            let mut r = Reader::new(apdu);
172            let header = match ConfirmedRequestHeader::decode(&mut r) {
173                Ok(h) => h,
174                Err(_) => return ParseResult::None,
175            };
176
177            // Segmented confirmed notifications are not supported. Send an
178            // Abort so the remote device knows and doesn't keep retrying.
179            if header.segmented {
180                return ParseResult::Abort(build_abort(header.invoke_id));
181            }
182
183            match header.service_choice {
184                SERVICE_CONFIRMED_COV_NOTIFICATION => {
185                    let cov = match CovNotificationRequest::decode_after_header(&mut r) {
186                        Ok(c) => c,
187                        Err(_) => return ParseResult::None,
188                    };
189                    match build_cov_notification(source, true, cov) {
190                        Some(n) => {
191                            let ack = build_simple_ack(
192                                header.invoke_id,
193                                SERVICE_CONFIRMED_COV_NOTIFICATION,
194                            );
195                            ParseResult::Notification(Notification::Cov(n), Some(ack))
196                        }
197                        None => ParseResult::None,
198                    }
199                }
200                SERVICE_CONFIRMED_EVENT_NOTIFICATION => {
201                    let evt = match EventNotificationRequest::decode_after_header(&mut r) {
202                        Ok(e) => e,
203                        Err(_) => return ParseResult::None,
204                    };
205                    match build_event_notification(source, true, evt) {
206                        Some(n) => {
207                            let ack = build_simple_ack(
208                                header.invoke_id,
209                                SERVICE_CONFIRMED_EVENT_NOTIFICATION,
210                            );
211                            ParseResult::Notification(Notification::Event(n), Some(ack))
212                        }
213                        None => ParseResult::None,
214                    }
215                }
216                _ => ParseResult::None,
217            }
218        }
219        _ => ParseResult::None,
220    }
221}
222
223fn extract_apdu(frame: &[u8]) -> Option<&[u8]> {
224    let mut r = Reader::new(frame);
225    Npdu::decode(&mut r).ok()?;
226    let remaining = r.remaining();
227    if remaining == 0 {
228        return None;
229    }
230    Some(&frame[frame.len() - remaining..])
231}
232
233fn build_simple_ack(invoke_id: u8, service_choice: u8) -> Vec<u8> {
234    let mut buf = [0u8; 32];
235    let mut w = Writer::new(&mut buf);
236    Npdu::new(0).encode(&mut w).unwrap();
237    SimpleAck {
238        invoke_id,
239        service_choice,
240    }
241    .encode(&mut w)
242    .unwrap();
243    w.as_written().to_vec()
244}
245
246fn build_abort(invoke_id: u8) -> Vec<u8> {
247    let mut buf = [0u8; 32];
248    let mut w = Writer::new(&mut buf);
249    Npdu::new(0).encode(&mut w).unwrap();
250    AbortPdu {
251        server: false,
252        invoke_id,
253        reason: abort_reason::SEGMENTATION_NOT_SUPPORTED,
254    }
255    .encode(&mut w)
256    .unwrap();
257    w.as_written().to_vec()
258}
259
260fn build_cov_notification(
261    source: DataLinkAddress,
262    confirmed: bool,
263    cov: CovNotificationRequest<'_>,
264) -> Option<CovNotification> {
265    let values = cov
266        .values
267        .into_iter()
268        .filter_map(|v| {
269            Some(CovPropertyValue {
270                property_id: v.property_id,
271                array_index: v.array_index,
272                value: into_client_value(v.value)?,
273                priority: v.priority,
274            })
275        })
276        .collect();
277
278    Some(CovNotification {
279        source,
280        confirmed,
281        subscriber_process_id: cov.subscriber_process_id,
282        initiating_device_id: cov.initiating_device_id,
283        monitored_object_id: cov.monitored_object_id,
284        time_remaining_seconds: cov.time_remaining_seconds,
285        values,
286    })
287}
288
289fn build_event_notification(
290    source: DataLinkAddress,
291    confirmed: bool,
292    evt: EventNotificationRequest<'_>,
293) -> Option<EventNotification> {
294    Some(EventNotification {
295        source,
296        confirmed,
297        process_id: evt.process_id,
298        initiating_device_id: evt.initiating_device_id,
299        event_object_id: evt.event_object_id,
300        timestamp: evt.timestamp,
301        notification_class: evt.notification_class,
302        priority: evt.priority,
303        event_type: evt.event_type,
304        message_text: evt.message_text.map(|s| s.to_string()),
305        notify_type: evt.notify_type,
306        ack_required: evt.ack_required,
307        from_state_raw: evt.from_state,
308        from_state: EventState::from_u32(evt.from_state),
309        to_state_raw: evt.to_state,
310        to_state: EventState::from_u32(evt.to_state),
311    })
312}
313
314fn into_client_value(value: rustbac_core::types::DataValue<'_>) -> Option<ClientDataValue> {
315    use rustbac_core::types::DataValue;
316    Some(match value {
317        DataValue::Null => ClientDataValue::Null,
318        DataValue::Boolean(v) => ClientDataValue::Boolean(v),
319        DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
320        DataValue::Signed(v) => ClientDataValue::Signed(v),
321        DataValue::Real(v) => ClientDataValue::Real(v),
322        DataValue::Double(v) => ClientDataValue::Double(v),
323        DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
324        DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
325        DataValue::BitString(v) => ClientDataValue::BitString {
326            unused_bits: v.unused_bits,
327            data: v.data.to_vec(),
328        },
329        DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
330        DataValue::Date(v) => ClientDataValue::Date(v),
331        DataValue::Time(v) => ClientDataValue::Time(v),
332        DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
333        DataValue::Constructed { tag_num, values } => {
334            let children: Vec<_> = values.into_iter().filter_map(into_client_value).collect();
335            ClientDataValue::Constructed {
336                tag_num,
337                values: children,
338            }
339        }
340    })
341}