Skip to main content

rustbac_client/
listener.rs

1//! Long-running async notification listener.
2//!
3//! Provides a notification listener that receives COV and event notifications
4//! and dispatches them through an unbounded channel.
5
6use crate::{ClientDataValue, CovNotification, CovPropertyValue, EventNotification};
7use rustbac_core::apdu::{ApduType, ConfirmedRequestHeader, SimpleAck, UnconfirmedRequestHeader};
8use rustbac_core::encoding::{reader::Reader, writer::Writer};
9use rustbac_core::npdu::Npdu;
10use rustbac_core::services::acknowledge_alarm::EventState;
11use rustbac_core::services::cov_notification::{
12    CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
13    SERVICE_UNCONFIRMED_COV_NOTIFICATION,
14};
15use rustbac_core::services::event_notification::{
16    EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
17    SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
18};
19use rustbac_datalink::{DataLink, DataLinkAddress};
20use std::sync::Arc;
21use tokio::sync::mpsc;
22
23/// A notification received from a BACnet device.
24#[derive(Debug, Clone)]
25pub enum Notification {
26    Cov(CovNotification),
27    Event(EventNotification),
28}
29
30/// A receiver for BACnet notifications dispatched by a listener loop.
31pub struct NotificationListener {
32    rx: mpsc::UnboundedReceiver<Notification>,
33}
34
35impl NotificationListener {
36    /// Receive the next notification, waiting indefinitely.
37    pub async fn recv(&mut self) -> Option<Notification> {
38        self.rx.recv().await
39    }
40}
41
42/// Create a notification listener and the async driver loop.
43///
44/// Returns `(listener, driver)` where `driver` is a future that must be
45/// polled (e.g. via `tokio::spawn`) for notifications to be received.
46/// The driver runs until the [`NotificationListener`] is dropped.
47///
48/// # Example
49///
50/// ```ignore
51/// let (mut listener, driver) = create_notification_listener(datalink);
52/// tokio::spawn(driver);
53/// while let Some(notification) = listener.recv().await {
54///     // handle notification
55/// }
56/// ```
57pub fn create_notification_listener<D: DataLink + 'static>(
58    datalink: Arc<D>,
59) -> (NotificationListener, impl std::future::Future<Output = ()>) {
60    let (tx, rx) = mpsc::unbounded_channel();
61    let driver = async move {
62        let mut buf = [0u8; 1500];
63        loop {
64            let (n, source) = match datalink.recv(&mut buf).await {
65                Ok(v) => v,
66                Err(_) => continue,
67            };
68
69            if let Some((notification, ack)) = parse_notification(&buf[..n], source) {
70                if let Some(ack_bytes) = ack {
71                    let _ = datalink.send(source, &ack_bytes).await;
72                }
73                if tx.send(notification).is_err() {
74                    break; // receiver dropped
75                }
76            }
77        }
78    };
79
80    (NotificationListener { rx }, driver)
81}
82
83fn parse_notification(
84    frame: &[u8],
85    source: DataLinkAddress,
86) -> Option<(Notification, Option<Vec<u8>>)> {
87    let apdu = extract_apdu(frame)?;
88    let first = *apdu.first()?;
89    let apdu_type = ApduType::from_u8(first >> 4)?;
90
91    match apdu_type {
92        ApduType::UnconfirmedRequest => {
93            let mut r = Reader::new(apdu);
94            let header = UnconfirmedRequestHeader::decode(&mut r).ok()?;
95            match header.service_choice {
96                SERVICE_UNCONFIRMED_COV_NOTIFICATION => {
97                    let cov = CovNotificationRequest::decode_after_header(&mut r).ok()?;
98                    let notification = build_cov_notification(source, false, cov)?;
99                    Some((Notification::Cov(notification), None))
100                }
101                SERVICE_UNCONFIRMED_EVENT_NOTIFICATION => {
102                    let evt = EventNotificationRequest::decode_after_header(&mut r).ok()?;
103                    let notification = build_event_notification(source, false, evt)?;
104                    Some((Notification::Event(notification), None))
105                }
106                _ => None,
107            }
108        }
109        ApduType::ConfirmedRequest => {
110            let mut r = Reader::new(apdu);
111            let header = ConfirmedRequestHeader::decode(&mut r).ok()?;
112            match header.service_choice {
113                SERVICE_CONFIRMED_COV_NOTIFICATION => {
114                    let cov = CovNotificationRequest::decode_after_header(&mut r).ok()?;
115                    let notification = build_cov_notification(source, true, cov)?;
116                    let ack =
117                        build_simple_ack(header.invoke_id, SERVICE_CONFIRMED_COV_NOTIFICATION);
118                    Some((Notification::Cov(notification), Some(ack)))
119                }
120                SERVICE_CONFIRMED_EVENT_NOTIFICATION => {
121                    let evt = EventNotificationRequest::decode_after_header(&mut r).ok()?;
122                    let notification = build_event_notification(source, true, evt)?;
123                    let ack =
124                        build_simple_ack(header.invoke_id, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
125                    Some((Notification::Event(notification), Some(ack)))
126                }
127                _ => None,
128            }
129        }
130        _ => None,
131    }
132}
133
134fn extract_apdu(frame: &[u8]) -> Option<&[u8]> {
135    let mut r = Reader::new(frame);
136    Npdu::decode(&mut r).ok()?;
137    let remaining = r.remaining();
138    if remaining == 0 {
139        return None;
140    }
141    Some(&frame[frame.len() - remaining..])
142}
143
144fn build_simple_ack(invoke_id: u8, service_choice: u8) -> Vec<u8> {
145    let mut buf = [0u8; 32];
146    let mut w = Writer::new(&mut buf);
147    Npdu::new(0).encode(&mut w).unwrap();
148    SimpleAck {
149        invoke_id,
150        service_choice,
151    }
152    .encode(&mut w)
153    .unwrap();
154    w.as_written().to_vec()
155}
156
157fn build_cov_notification(
158    source: DataLinkAddress,
159    confirmed: bool,
160    cov: CovNotificationRequest<'_>,
161) -> Option<CovNotification> {
162    let values = cov
163        .values
164        .into_iter()
165        .filter_map(|v| {
166            Some(CovPropertyValue {
167                property_id: v.property_id,
168                array_index: v.array_index,
169                value: into_client_value(v.value)?,
170                priority: v.priority,
171            })
172        })
173        .collect();
174
175    Some(CovNotification {
176        source,
177        confirmed,
178        subscriber_process_id: cov.subscriber_process_id,
179        initiating_device_id: cov.initiating_device_id,
180        monitored_object_id: cov.monitored_object_id,
181        time_remaining_seconds: cov.time_remaining_seconds,
182        values,
183    })
184}
185
186fn build_event_notification(
187    source: DataLinkAddress,
188    confirmed: bool,
189    evt: EventNotificationRequest<'_>,
190) -> Option<EventNotification> {
191    Some(EventNotification {
192        source,
193        confirmed,
194        process_id: evt.process_id,
195        initiating_device_id: evt.initiating_device_id,
196        event_object_id: evt.event_object_id,
197        timestamp: evt.timestamp,
198        notification_class: evt.notification_class,
199        priority: evt.priority,
200        event_type: evt.event_type,
201        message_text: evt.message_text.map(|s| s.to_string()),
202        notify_type: evt.notify_type,
203        ack_required: evt.ack_required,
204        from_state_raw: evt.from_state,
205        from_state: EventState::from_u32(evt.from_state),
206        to_state_raw: evt.to_state,
207        to_state: EventState::from_u32(evt.to_state),
208    })
209}
210
211fn into_client_value(value: rustbac_core::types::DataValue<'_>) -> Option<ClientDataValue> {
212    use rustbac_core::types::DataValue;
213    Some(match value {
214        DataValue::Null => ClientDataValue::Null,
215        DataValue::Boolean(v) => ClientDataValue::Boolean(v),
216        DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
217        DataValue::Signed(v) => ClientDataValue::Signed(v),
218        DataValue::Real(v) => ClientDataValue::Real(v),
219        DataValue::Double(v) => ClientDataValue::Double(v),
220        DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
221        DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
222        DataValue::BitString(v) => ClientDataValue::BitString {
223            unused_bits: v.unused_bits,
224            data: v.data.to_vec(),
225        },
226        DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
227        DataValue::Date(v) => ClientDataValue::Date(v),
228        DataValue::Time(v) => ClientDataValue::Time(v),
229        DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
230        DataValue::Constructed { tag_num, values } => {
231            let children: Vec<_> = values.into_iter().filter_map(into_client_value).collect();
232            ClientDataValue::Constructed {
233                tag_num,
234                values: children,
235            }
236        }
237    })
238}