Skip to main content

rustbac_client/
listener.rs

1//! Long-running async notification listener.
2//!
3//! Provides a notification listener that receives COV and event notifications
4//! and dispatches them through a bounded channel.
5
6use crate::{ClientDataValue, CovNotification, CovPropertyValue, EventNotification};
7use rustbac_core::apdu::{
8    abort_reason, AbortPdu, ApduType, ConfirmedRequestHeader, SimpleAck, UnconfirmedRequestHeader,
9};
10use rustbac_core::encoding::{reader::Reader, writer::Writer};
11use rustbac_core::npdu::Npdu;
12use rustbac_core::services::acknowledge_alarm::EventState;
13use rustbac_core::services::cov_notification::{
14    CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
15    SERVICE_UNCONFIRMED_COV_NOTIFICATION,
16};
17use rustbac_core::services::event_notification::{
18    EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
19    SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
20};
21use rustbac_datalink::{DataLink, DataLinkAddress};
22use std::sync::Arc;
23use tokio::sync::mpsc;
24
/// Default capacity of the bounded channel used by [`create_notification_listener`].
///
/// When this many notifications are queued without being consumed, the driver loop
/// drops new arrivals rather than growing the queue without bound. Use
/// [`create_notification_listener_with_capacity`] to choose a different size.
pub const DEFAULT_NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
30
/// A notification received from a BACnet device — either a COV or an event notification.
///
/// Values of this type are what [`NotificationListener::recv`] yields.
#[derive(Debug, Clone)]
pub enum Notification {
    /// A change-of-value notification, decoded from a confirmed or unconfirmed
    /// COV notification request.
    Cov(CovNotification),
    /// An event notification, decoded from a confirmed or unconfirmed
    /// event notification request.
    Event(EventNotification),
}
39
/// Consumer half of a BACnet notification channel.
///
/// Produced by [`create_notification_listener`] or
/// [`create_notification_listener_with_capacity`]. The sending half lives inside the
/// driver future returned alongside this listener, so notifications only arrive while
/// that driver is being polled. Once the driver future has finished or been dropped,
/// the channel closes and `recv` returns `None`.
///
/// Dropping this listener is the stop signal for the driver: it exits once it observes
/// that the receiver is gone.
pub struct NotificationListener {
    /// Receiving end of the bounded notification channel fed by the driver.
    rx: mpsc::Receiver<Notification>,
}
48
49impl NotificationListener {
50    /// Wait for and return the next notification. Returns `None` when the driver has stopped.
51    pub async fn recv(&mut self) -> Option<Notification> {
52        self.rx.recv().await
53    }
54}
55
56/// Create a notification listener backed by a channel with [`DEFAULT_NOTIFICATION_CHANNEL_CAPACITY`].
57///
58/// Returns `(listener, driver)` where `driver` is a future that must be
59/// polled (e.g. via `tokio::spawn`) for notifications to be received.
60/// The driver runs until the [`NotificationListener`] is dropped. When the channel is
61/// full, incoming notifications are silently discarded to bound memory usage.
62/// Confirmed notifications are automatically acknowledged; segmented confirmed
63/// notifications are rejected with an Abort PDU (segmentation not supported).
64pub fn create_notification_listener<D: DataLink + 'static>(
65    datalink: Arc<D>,
66) -> (NotificationListener, impl std::future::Future<Output = ()>) {
67    create_notification_listener_with_capacity(datalink, DEFAULT_NOTIFICATION_CHANNEL_CAPACITY)
68}
69
70/// Like [`create_notification_listener`] but with an explicit channel `capacity`.
71///
72/// `capacity` is clamped to a minimum of 1. Prefer
73/// [`create_notification_listener`] unless you need a non-default buffer size.
74pub fn create_notification_listener_with_capacity<D: DataLink + 'static>(
75    datalink: Arc<D>,
76    capacity: usize,
77) -> (NotificationListener, impl std::future::Future<Output = ()>) {
78    let (tx, rx) = mpsc::channel(capacity.max(1));
79    let driver = async move {
80        let mut buf = [0u8; 1500];
81        loop {
82            let (n, source) = match datalink.recv(&mut buf).await {
83                Ok(v) => v,
84                Err(_) => continue,
85            };
86
87            match parse_notification(&buf[..n], source) {
88                ParseResult::None => {}
89                ParseResult::Abort(ack_bytes) => {
90                    #[cfg(feature = "tracing")]
91                    tracing::warn!("segmented notification aborted — segmentation not supported");
92                    let _ = datalink.send(source, &ack_bytes).await;
93                }
94                ParseResult::Notification(notification, ack) => {
95                    if let Some(ack_bytes) = ack {
96                        let _ = datalink.send(source, &ack_bytes).await;
97                    }
98                    // Drop notifications when the consumer is slow rather than
99                    // growing the queue without bound. Break only when the
100                    // receiver has been dropped; a full channel just discards
101                    // this notification.
102                    match tx.try_send(notification) {
103                        Ok(()) => {}
104                        Err(_) if tx.is_closed() => break, // receiver dropped
105                        Err(_) => {
106                            #[cfg(feature = "tracing")]
107                            tracing::warn!("notification channel full — dropping notification");
108                        }
109                    }
110                }
111            }
112        }
113    };
114
115    (NotificationListener { rx }, driver)
116}
117
/// Outcome of inspecting one received frame in [`parse_notification`].
enum ParseResult {
    /// The frame is not a notification this listener handles (or failed to decode);
    /// nothing is sent back and nothing is delivered.
    None,
    /// Segmented request we cannot handle — send an Abort, emit no notification.
    /// Carries the encoded Abort frame to transmit.
    Abort(Vec<u8>),
    /// Parsed notification and optional ack to send back.
    /// The ack is `Some` for confirmed requests and `None` for unconfirmed ones.
    Notification(Notification, Option<Vec<u8>>),
}
125
126fn parse_notification(frame: &[u8], source: DataLinkAddress) -> ParseResult {
127    let apdu = match extract_apdu(frame) {
128        Some(a) => a,
129        None => return ParseResult::None,
130    };
131    let first = match apdu.first() {
132        Some(&b) => b,
133        None => return ParseResult::None,
134    };
135    let apdu_type = match ApduType::from_u8(first >> 4) {
136        Some(t) => t,
137        None => return ParseResult::None,
138    };
139
140    match apdu_type {
141        ApduType::UnconfirmedRequest => {
142            let mut r = Reader::new(apdu);
143            let header = match UnconfirmedRequestHeader::decode(&mut r) {
144                Ok(h) => h,
145                Err(_) => return ParseResult::None,
146            };
147            match header.service_choice {
148                SERVICE_UNCONFIRMED_COV_NOTIFICATION => {
149                    let cov = match CovNotificationRequest::decode_after_header(&mut r) {
150                        Ok(c) => c,
151                        Err(_) => return ParseResult::None,
152                    };
153                    match build_cov_notification(source, false, cov) {
154                        Some(n) => ParseResult::Notification(Notification::Cov(n), None),
155                        None => ParseResult::None,
156                    }
157                }
158                SERVICE_UNCONFIRMED_EVENT_NOTIFICATION => {
159                    let evt = match EventNotificationRequest::decode_after_header(&mut r) {
160                        Ok(e) => e,
161                        Err(_) => return ParseResult::None,
162                    };
163                    match build_event_notification(source, false, evt) {
164                        Some(n) => ParseResult::Notification(Notification::Event(n), None),
165                        None => ParseResult::None,
166                    }
167                }
168                _ => ParseResult::None,
169            }
170        }
171        ApduType::ConfirmedRequest => {
172            let mut r = Reader::new(apdu);
173            let header = match ConfirmedRequestHeader::decode(&mut r) {
174                Ok(h) => h,
175                Err(_) => return ParseResult::None,
176            };
177
178            // Segmented confirmed notifications are not supported. Send an
179            // Abort so the remote device knows and doesn't keep retrying.
180            if header.segmented {
181                return ParseResult::Abort(build_abort(header.invoke_id));
182            }
183
184            match header.service_choice {
185                SERVICE_CONFIRMED_COV_NOTIFICATION => {
186                    let cov = match CovNotificationRequest::decode_after_header(&mut r) {
187                        Ok(c) => c,
188                        Err(_) => return ParseResult::None,
189                    };
190                    match build_cov_notification(source, true, cov) {
191                        Some(n) => {
192                            let ack = build_simple_ack(
193                                header.invoke_id,
194                                SERVICE_CONFIRMED_COV_NOTIFICATION,
195                            );
196                            ParseResult::Notification(Notification::Cov(n), Some(ack))
197                        }
198                        None => ParseResult::None,
199                    }
200                }
201                SERVICE_CONFIRMED_EVENT_NOTIFICATION => {
202                    let evt = match EventNotificationRequest::decode_after_header(&mut r) {
203                        Ok(e) => e,
204                        Err(_) => return ParseResult::None,
205                    };
206                    match build_event_notification(source, true, evt) {
207                        Some(n) => {
208                            let ack = build_simple_ack(
209                                header.invoke_id,
210                                SERVICE_CONFIRMED_EVENT_NOTIFICATION,
211                            );
212                            ParseResult::Notification(Notification::Event(n), Some(ack))
213                        }
214                        None => ParseResult::None,
215                    }
216                }
217                _ => ParseResult::None,
218            }
219        }
220        _ => ParseResult::None,
221    }
222}
223
224fn extract_apdu(frame: &[u8]) -> Option<&[u8]> {
225    let mut r = Reader::new(frame);
226    Npdu::decode(&mut r).ok()?;
227    let remaining = r.remaining();
228    if remaining == 0 {
229        return None;
230    }
231    Some(&frame[frame.len() - remaining..])
232}
233
234fn build_simple_ack(invoke_id: u8, service_choice: u8) -> Vec<u8> {
235    let mut buf = [0u8; 32];
236    let mut w = Writer::new(&mut buf);
237    Npdu::new(0).encode(&mut w).unwrap();
238    SimpleAck {
239        invoke_id,
240        service_choice,
241    }
242    .encode(&mut w)
243    .unwrap();
244    w.as_written().to_vec()
245}
246
247fn build_abort(invoke_id: u8) -> Vec<u8> {
248    let mut buf = [0u8; 32];
249    let mut w = Writer::new(&mut buf);
250    Npdu::new(0).encode(&mut w).unwrap();
251    AbortPdu {
252        server: false,
253        invoke_id,
254        reason: abort_reason::SEGMENTATION_NOT_SUPPORTED,
255    }
256    .encode(&mut w)
257    .unwrap();
258    w.as_written().to_vec()
259}
260
261fn build_cov_notification(
262    source: DataLinkAddress,
263    confirmed: bool,
264    cov: CovNotificationRequest<'_>,
265) -> Option<CovNotification> {
266    let values = cov
267        .values
268        .into_iter()
269        .filter_map(|v| {
270            Some(CovPropertyValue {
271                property_id: v.property_id,
272                array_index: v.array_index,
273                value: into_client_value(v.value)?,
274                priority: v.priority,
275            })
276        })
277        .collect();
278
279    Some(CovNotification {
280        source,
281        confirmed,
282        subscriber_process_id: cov.subscriber_process_id,
283        initiating_device_id: cov.initiating_device_id,
284        monitored_object_id: cov.monitored_object_id,
285        time_remaining_seconds: cov.time_remaining_seconds,
286        values,
287    })
288}
289
290fn build_event_notification(
291    source: DataLinkAddress,
292    confirmed: bool,
293    evt: EventNotificationRequest<'_>,
294) -> Option<EventNotification> {
295    Some(EventNotification {
296        source,
297        confirmed,
298        process_id: evt.process_id,
299        initiating_device_id: evt.initiating_device_id,
300        event_object_id: evt.event_object_id,
301        timestamp: evt.timestamp,
302        notification_class: evt.notification_class,
303        priority: evt.priority,
304        event_type: evt.event_type,
305        message_text: evt.message_text.map(|s| s.to_string()),
306        notify_type: evt.notify_type,
307        ack_required: evt.ack_required,
308        from_state_raw: evt.from_state,
309        from_state: EventState::from_u32(evt.from_state),
310        to_state_raw: evt.to_state,
311        to_state: EventState::from_u32(evt.to_state),
312    })
313}
314
315fn into_client_value(value: rustbac_core::types::DataValue<'_>) -> Option<ClientDataValue> {
316    use rustbac_core::types::DataValue;
317    Some(match value {
318        DataValue::Null => ClientDataValue::Null,
319        DataValue::Boolean(v) => ClientDataValue::Boolean(v),
320        DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
321        DataValue::Signed(v) => ClientDataValue::Signed(v),
322        DataValue::Real(v) => ClientDataValue::Real(v),
323        DataValue::Double(v) => ClientDataValue::Double(v),
324        DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
325        DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
326        DataValue::BitString(v) => ClientDataValue::BitString {
327            unused_bits: v.unused_bits,
328            data: v.data.to_vec(),
329        },
330        DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
331        DataValue::Date(v) => ClientDataValue::Date(v),
332        DataValue::Time(v) => ClientDataValue::Time(v),
333        DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
334        DataValue::Constructed { tag_num, values } => {
335            let children: Vec<_> = values.into_iter().filter_map(into_client_value).collect();
336            ClientDataValue::Constructed {
337                tag_num,
338                values: children,
339            }
340        }
341    })
342}