1use crate::{ClientDataValue, CovNotification, CovPropertyValue, EventNotification};
7use rustbac_core::apdu::{ApduType, ConfirmedRequestHeader, SimpleAck, UnconfirmedRequestHeader};
8use rustbac_core::encoding::{reader::Reader, writer::Writer};
9use rustbac_core::npdu::Npdu;
10use rustbac_core::services::acknowledge_alarm::EventState;
11use rustbac_core::services::cov_notification::{
12 CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
13 SERVICE_UNCONFIRMED_COV_NOTIFICATION,
14};
15use rustbac_core::services::event_notification::{
16 EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
17 SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
18};
19use rustbac_datalink::{DataLink, DataLinkAddress};
20use std::sync::Arc;
21use tokio::sync::mpsc;
22
23#[derive(Debug, Clone)]
25pub enum Notification {
26 Cov(CovNotification),
27 Event(EventNotification),
28}
29
30pub struct NotificationListener {
32 rx: mpsc::UnboundedReceiver<Notification>,
33}
34
35impl NotificationListener {
36 pub async fn recv(&mut self) -> Option<Notification> {
38 self.rx.recv().await
39 }
40}
41
42pub fn create_notification_listener<D: DataLink + 'static>(
58 datalink: Arc<D>,
59) -> (NotificationListener, impl std::future::Future<Output = ()>) {
60 let (tx, rx) = mpsc::unbounded_channel();
61 let driver = async move {
62 let mut buf = [0u8; 1500];
63 loop {
64 let (n, source) = match datalink.recv(&mut buf).await {
65 Ok(v) => v,
66 Err(_) => continue,
67 };
68
69 if let Some((notification, ack)) = parse_notification(&buf[..n], source) {
70 if let Some(ack_bytes) = ack {
71 let _ = datalink.send(source, &ack_bytes).await;
72 }
73 if tx.send(notification).is_err() {
74 break; }
76 }
77 }
78 };
79
80 (NotificationListener { rx }, driver)
81}
82
83fn parse_notification(
84 frame: &[u8],
85 source: DataLinkAddress,
86) -> Option<(Notification, Option<Vec<u8>>)> {
87 let apdu = extract_apdu(frame)?;
88 let first = *apdu.first()?;
89 let apdu_type = ApduType::from_u8(first >> 4)?;
90
91 match apdu_type {
92 ApduType::UnconfirmedRequest => {
93 let mut r = Reader::new(apdu);
94 let header = UnconfirmedRequestHeader::decode(&mut r).ok()?;
95 match header.service_choice {
96 SERVICE_UNCONFIRMED_COV_NOTIFICATION => {
97 let cov = CovNotificationRequest::decode_after_header(&mut r).ok()?;
98 let notification = build_cov_notification(source, false, cov)?;
99 Some((Notification::Cov(notification), None))
100 }
101 SERVICE_UNCONFIRMED_EVENT_NOTIFICATION => {
102 let evt = EventNotificationRequest::decode_after_header(&mut r).ok()?;
103 let notification = build_event_notification(source, false, evt)?;
104 Some((Notification::Event(notification), None))
105 }
106 _ => None,
107 }
108 }
109 ApduType::ConfirmedRequest => {
110 let mut r = Reader::new(apdu);
111 let header = ConfirmedRequestHeader::decode(&mut r).ok()?;
112 match header.service_choice {
113 SERVICE_CONFIRMED_COV_NOTIFICATION => {
114 let cov = CovNotificationRequest::decode_after_header(&mut r).ok()?;
115 let notification = build_cov_notification(source, true, cov)?;
116 let ack =
117 build_simple_ack(header.invoke_id, SERVICE_CONFIRMED_COV_NOTIFICATION);
118 Some((Notification::Cov(notification), Some(ack)))
119 }
120 SERVICE_CONFIRMED_EVENT_NOTIFICATION => {
121 let evt = EventNotificationRequest::decode_after_header(&mut r).ok()?;
122 let notification = build_event_notification(source, true, evt)?;
123 let ack =
124 build_simple_ack(header.invoke_id, SERVICE_CONFIRMED_EVENT_NOTIFICATION);
125 Some((Notification::Event(notification), Some(ack)))
126 }
127 _ => None,
128 }
129 }
130 _ => None,
131 }
132}
133
134fn extract_apdu(frame: &[u8]) -> Option<&[u8]> {
135 let mut r = Reader::new(frame);
136 Npdu::decode(&mut r).ok()?;
137 let remaining = r.remaining();
138 if remaining == 0 {
139 return None;
140 }
141 Some(&frame[frame.len() - remaining..])
142}
143
144fn build_simple_ack(invoke_id: u8, service_choice: u8) -> Vec<u8> {
145 let mut buf = [0u8; 32];
146 let mut w = Writer::new(&mut buf);
147 Npdu::new(0).encode(&mut w).unwrap();
148 SimpleAck {
149 invoke_id,
150 service_choice,
151 }
152 .encode(&mut w)
153 .unwrap();
154 w.as_written().to_vec()
155}
156
157fn build_cov_notification(
158 source: DataLinkAddress,
159 confirmed: bool,
160 cov: CovNotificationRequest<'_>,
161) -> Option<CovNotification> {
162 let values = cov
163 .values
164 .into_iter()
165 .filter_map(|v| {
166 Some(CovPropertyValue {
167 property_id: v.property_id,
168 array_index: v.array_index,
169 value: into_client_value(v.value)?,
170 priority: v.priority,
171 })
172 })
173 .collect();
174
175 Some(CovNotification {
176 source,
177 confirmed,
178 subscriber_process_id: cov.subscriber_process_id,
179 initiating_device_id: cov.initiating_device_id,
180 monitored_object_id: cov.monitored_object_id,
181 time_remaining_seconds: cov.time_remaining_seconds,
182 values,
183 })
184}
185
186fn build_event_notification(
187 source: DataLinkAddress,
188 confirmed: bool,
189 evt: EventNotificationRequest<'_>,
190) -> Option<EventNotification> {
191 Some(EventNotification {
192 source,
193 confirmed,
194 process_id: evt.process_id,
195 initiating_device_id: evt.initiating_device_id,
196 event_object_id: evt.event_object_id,
197 timestamp: evt.timestamp,
198 notification_class: evt.notification_class,
199 priority: evt.priority,
200 event_type: evt.event_type,
201 message_text: evt.message_text.map(|s| s.to_string()),
202 notify_type: evt.notify_type,
203 ack_required: evt.ack_required,
204 from_state_raw: evt.from_state,
205 from_state: EventState::from_u32(evt.from_state),
206 to_state_raw: evt.to_state,
207 to_state: EventState::from_u32(evt.to_state),
208 })
209}
210
211fn into_client_value(value: rustbac_core::types::DataValue<'_>) -> Option<ClientDataValue> {
212 use rustbac_core::types::DataValue;
213 Some(match value {
214 DataValue::Null => ClientDataValue::Null,
215 DataValue::Boolean(v) => ClientDataValue::Boolean(v),
216 DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
217 DataValue::Signed(v) => ClientDataValue::Signed(v),
218 DataValue::Real(v) => ClientDataValue::Real(v),
219 DataValue::Double(v) => ClientDataValue::Double(v),
220 DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
221 DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
222 DataValue::BitString(v) => ClientDataValue::BitString {
223 unused_bits: v.unused_bits,
224 data: v.data.to_vec(),
225 },
226 DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
227 DataValue::Date(v) => ClientDataValue::Date(v),
228 DataValue::Time(v) => ClientDataValue::Time(v),
229 DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
230 DataValue::Constructed { tag_num, values } => {
231 let children: Vec<_> = values.into_iter().filter_map(into_client_value).collect();
232 ClientDataValue::Constructed {
233 tag_num,
234 values: children,
235 }
236 }
237 })
238}