1use crate::{ClientDataValue, CovNotification, CovPropertyValue, EventNotification};
7use rustbac_core::apdu::{
8 abort_reason, AbortPdu, ApduType, ConfirmedRequestHeader, SimpleAck, UnconfirmedRequestHeader,
9};
10use rustbac_core::encoding::{reader::Reader, writer::Writer};
11use rustbac_core::npdu::Npdu;
12use rustbac_core::services::acknowledge_alarm::EventState;
13use rustbac_core::services::cov_notification::{
14 CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
15 SERVICE_UNCONFIRMED_COV_NOTIFICATION,
16};
17use rustbac_core::services::event_notification::{
18 EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
19 SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
20};
21use rustbac_datalink::{DataLink, DataLinkAddress};
22use std::sync::Arc;
23use tokio::sync::mpsc;
24
/// Channel capacity used by [`create_notification_listener`] when the caller
/// does not pick one explicitly.
pub const DEFAULT_NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
30
31#[derive(Debug, Clone)]
33pub enum Notification {
34 Cov(CovNotification),
36 Event(EventNotification),
38}
39
40pub struct NotificationListener {
46 rx: mpsc::Receiver<Notification>,
47}
48
49impl NotificationListener {
50 pub async fn recv(&mut self) -> Option<Notification> {
52 self.rx.recv().await
53 }
54}
55
56pub fn create_notification_listener<D: DataLink + 'static>(
65 datalink: Arc<D>,
66) -> (NotificationListener, impl std::future::Future<Output = ()>) {
67 create_notification_listener_with_capacity(datalink, DEFAULT_NOTIFICATION_CHANNEL_CAPACITY)
68}
69
70pub fn create_notification_listener_with_capacity<D: DataLink + 'static>(
75 datalink: Arc<D>,
76 capacity: usize,
77) -> (NotificationListener, impl std::future::Future<Output = ()>) {
78 let (tx, rx) = mpsc::channel(capacity.max(1));
79 let driver = async move {
80 let mut buf = [0u8; 1500];
81 loop {
82 let (n, source) = match datalink.recv(&mut buf).await {
83 Ok(v) => v,
84 Err(_) => continue,
85 };
86
87 match parse_notification(&buf[..n], source) {
88 ParseResult::None => {}
89 ParseResult::Abort(ack_bytes) => {
90 #[cfg(feature = "tracing")]
91 tracing::warn!("segmented notification aborted — segmentation not supported");
92 let _ = datalink.send(source, &ack_bytes).await;
93 }
94 ParseResult::Notification(notification, ack) => {
95 if let Some(ack_bytes) = ack {
96 let _ = datalink.send(source, &ack_bytes).await;
97 }
98 if tx.try_send(notification).is_err() {
103 if tx.is_closed() {
104 break; }
106 #[cfg(feature = "tracing")]
107 tracing::warn!("notification channel full — dropping notification");
108 }
109 }
110 }
111 }
112 };
113
114 (NotificationListener { rx }, driver)
115}
116
117enum ParseResult {
118 None,
119 Abort(Vec<u8>),
121 Notification(Notification, Option<Vec<u8>>),
123}
124
125fn parse_notification(frame: &[u8], source: DataLinkAddress) -> ParseResult {
126 let apdu = match extract_apdu(frame) {
127 Some(a) => a,
128 None => return ParseResult::None,
129 };
130 let first = match apdu.first() {
131 Some(&b) => b,
132 None => return ParseResult::None,
133 };
134 let apdu_type = match ApduType::from_u8(first >> 4) {
135 Some(t) => t,
136 None => return ParseResult::None,
137 };
138
139 match apdu_type {
140 ApduType::UnconfirmedRequest => {
141 let mut r = Reader::new(apdu);
142 let header = match UnconfirmedRequestHeader::decode(&mut r) {
143 Ok(h) => h,
144 Err(_) => return ParseResult::None,
145 };
146 match header.service_choice {
147 SERVICE_UNCONFIRMED_COV_NOTIFICATION => {
148 let cov = match CovNotificationRequest::decode_after_header(&mut r) {
149 Ok(c) => c,
150 Err(_) => return ParseResult::None,
151 };
152 match build_cov_notification(source, false, cov) {
153 Some(n) => ParseResult::Notification(Notification::Cov(n), None),
154 None => ParseResult::None,
155 }
156 }
157 SERVICE_UNCONFIRMED_EVENT_NOTIFICATION => {
158 let evt = match EventNotificationRequest::decode_after_header(&mut r) {
159 Ok(e) => e,
160 Err(_) => return ParseResult::None,
161 };
162 match build_event_notification(source, false, evt) {
163 Some(n) => ParseResult::Notification(Notification::Event(n), None),
164 None => ParseResult::None,
165 }
166 }
167 _ => ParseResult::None,
168 }
169 }
170 ApduType::ConfirmedRequest => {
171 let mut r = Reader::new(apdu);
172 let header = match ConfirmedRequestHeader::decode(&mut r) {
173 Ok(h) => h,
174 Err(_) => return ParseResult::None,
175 };
176
177 if header.segmented {
180 return ParseResult::Abort(build_abort(header.invoke_id));
181 }
182
183 match header.service_choice {
184 SERVICE_CONFIRMED_COV_NOTIFICATION => {
185 let cov = match CovNotificationRequest::decode_after_header(&mut r) {
186 Ok(c) => c,
187 Err(_) => return ParseResult::None,
188 };
189 match build_cov_notification(source, true, cov) {
190 Some(n) => {
191 let ack = build_simple_ack(
192 header.invoke_id,
193 SERVICE_CONFIRMED_COV_NOTIFICATION,
194 );
195 ParseResult::Notification(Notification::Cov(n), Some(ack))
196 }
197 None => ParseResult::None,
198 }
199 }
200 SERVICE_CONFIRMED_EVENT_NOTIFICATION => {
201 let evt = match EventNotificationRequest::decode_after_header(&mut r) {
202 Ok(e) => e,
203 Err(_) => return ParseResult::None,
204 };
205 match build_event_notification(source, true, evt) {
206 Some(n) => {
207 let ack = build_simple_ack(
208 header.invoke_id,
209 SERVICE_CONFIRMED_EVENT_NOTIFICATION,
210 );
211 ParseResult::Notification(Notification::Event(n), Some(ack))
212 }
213 None => ParseResult::None,
214 }
215 }
216 _ => ParseResult::None,
217 }
218 }
219 _ => ParseResult::None,
220 }
221}
222
223fn extract_apdu(frame: &[u8]) -> Option<&[u8]> {
224 let mut r = Reader::new(frame);
225 Npdu::decode(&mut r).ok()?;
226 let remaining = r.remaining();
227 if remaining == 0 {
228 return None;
229 }
230 Some(&frame[frame.len() - remaining..])
231}
232
233fn build_simple_ack(invoke_id: u8, service_choice: u8) -> Vec<u8> {
234 let mut buf = [0u8; 32];
235 let mut w = Writer::new(&mut buf);
236 Npdu::new(0).encode(&mut w).unwrap();
237 SimpleAck {
238 invoke_id,
239 service_choice,
240 }
241 .encode(&mut w)
242 .unwrap();
243 w.as_written().to_vec()
244}
245
246fn build_abort(invoke_id: u8) -> Vec<u8> {
247 let mut buf = [0u8; 32];
248 let mut w = Writer::new(&mut buf);
249 Npdu::new(0).encode(&mut w).unwrap();
250 AbortPdu {
251 server: false,
252 invoke_id,
253 reason: abort_reason::SEGMENTATION_NOT_SUPPORTED,
254 }
255 .encode(&mut w)
256 .unwrap();
257 w.as_written().to_vec()
258}
259
260fn build_cov_notification(
261 source: DataLinkAddress,
262 confirmed: bool,
263 cov: CovNotificationRequest<'_>,
264) -> Option<CovNotification> {
265 let values = cov
266 .values
267 .into_iter()
268 .filter_map(|v| {
269 Some(CovPropertyValue {
270 property_id: v.property_id,
271 array_index: v.array_index,
272 value: into_client_value(v.value)?,
273 priority: v.priority,
274 })
275 })
276 .collect();
277
278 Some(CovNotification {
279 source,
280 confirmed,
281 subscriber_process_id: cov.subscriber_process_id,
282 initiating_device_id: cov.initiating_device_id,
283 monitored_object_id: cov.monitored_object_id,
284 time_remaining_seconds: cov.time_remaining_seconds,
285 values,
286 })
287}
288
289fn build_event_notification(
290 source: DataLinkAddress,
291 confirmed: bool,
292 evt: EventNotificationRequest<'_>,
293) -> Option<EventNotification> {
294 Some(EventNotification {
295 source,
296 confirmed,
297 process_id: evt.process_id,
298 initiating_device_id: evt.initiating_device_id,
299 event_object_id: evt.event_object_id,
300 timestamp: evt.timestamp,
301 notification_class: evt.notification_class,
302 priority: evt.priority,
303 event_type: evt.event_type,
304 message_text: evt.message_text.map(|s| s.to_string()),
305 notify_type: evt.notify_type,
306 ack_required: evt.ack_required,
307 from_state_raw: evt.from_state,
308 from_state: EventState::from_u32(evt.from_state),
309 to_state_raw: evt.to_state,
310 to_state: EventState::from_u32(evt.to_state),
311 })
312}
313
314fn into_client_value(value: rustbac_core::types::DataValue<'_>) -> Option<ClientDataValue> {
315 use rustbac_core::types::DataValue;
316 Some(match value {
317 DataValue::Null => ClientDataValue::Null,
318 DataValue::Boolean(v) => ClientDataValue::Boolean(v),
319 DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
320 DataValue::Signed(v) => ClientDataValue::Signed(v),
321 DataValue::Real(v) => ClientDataValue::Real(v),
322 DataValue::Double(v) => ClientDataValue::Double(v),
323 DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
324 DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
325 DataValue::BitString(v) => ClientDataValue::BitString {
326 unused_bits: v.unused_bits,
327 data: v.data.to_vec(),
328 },
329 DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
330 DataValue::Date(v) => ClientDataValue::Date(v),
331 DataValue::Time(v) => ClientDataValue::Time(v),
332 DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
333 DataValue::Constructed { tag_num, values } => {
334 let children: Vec<_> = values.into_iter().filter_map(into_client_value).collect();
335 ClientDataValue::Constructed {
336 tag_num,
337 values: children,
338 }
339 }
340 })
341}