1use crate::{ClientDataValue, CovNotification, CovPropertyValue, EventNotification};
7use rustbac_core::apdu::{
8 abort_reason, AbortPdu, ApduType, ConfirmedRequestHeader, SimpleAck, UnconfirmedRequestHeader,
9};
10use rustbac_core::encoding::{reader::Reader, writer::Writer};
11use rustbac_core::npdu::Npdu;
12use rustbac_core::services::acknowledge_alarm::EventState;
13use rustbac_core::services::cov_notification::{
14 CovNotificationRequest, SERVICE_CONFIRMED_COV_NOTIFICATION,
15 SERVICE_UNCONFIRMED_COV_NOTIFICATION,
16};
17use rustbac_core::services::event_notification::{
18 EventNotificationRequest, SERVICE_CONFIRMED_EVENT_NOTIFICATION,
19 SERVICE_UNCONFIRMED_EVENT_NOTIFICATION,
20};
21use rustbac_datalink::{DataLink, DataLinkAddress};
22use std::sync::Arc;
23use tokio::sync::mpsc;
24
25pub const DEFAULT_NOTIFICATION_CHANNEL_CAPACITY: usize = 256;
30
31#[derive(Debug, Clone)]
33pub enum Notification {
34 Cov(CovNotification),
36 Event(EventNotification),
38}
39
40pub struct NotificationListener {
46 rx: mpsc::Receiver<Notification>,
47}
48
49impl NotificationListener {
50 pub async fn recv(&mut self) -> Option<Notification> {
52 self.rx.recv().await
53 }
54}
55
56pub fn create_notification_listener<D: DataLink + 'static>(
65 datalink: Arc<D>,
66) -> (NotificationListener, impl std::future::Future<Output = ()>) {
67 create_notification_listener_with_capacity(datalink, DEFAULT_NOTIFICATION_CHANNEL_CAPACITY)
68}
69
70pub fn create_notification_listener_with_capacity<D: DataLink + 'static>(
75 datalink: Arc<D>,
76 capacity: usize,
77) -> (NotificationListener, impl std::future::Future<Output = ()>) {
78 let (tx, rx) = mpsc::channel(capacity.max(1));
79 let driver = async move {
80 let mut buf = [0u8; 1500];
81 loop {
82 let (n, source) = match datalink.recv(&mut buf).await {
83 Ok(v) => v,
84 Err(_) => continue,
85 };
86
87 match parse_notification(&buf[..n], source) {
88 ParseResult::None => {}
89 ParseResult::Abort(ack_bytes) => {
90 #[cfg(feature = "tracing")]
91 tracing::warn!("segmented notification aborted — segmentation not supported");
92 let _ = datalink.send(source, &ack_bytes).await;
93 }
94 ParseResult::Notification(notification, ack) => {
95 if let Some(ack_bytes) = ack {
96 let _ = datalink.send(source, &ack_bytes).await;
97 }
98 match tx.try_send(notification) {
103 Ok(()) => {}
104 Err(_) if tx.is_closed() => break, Err(_) => {
106 #[cfg(feature = "tracing")]
107 tracing::warn!("notification channel full — dropping notification");
108 }
109 }
110 }
111 }
112 }
113 };
114
115 (NotificationListener { rx }, driver)
116}
117
118enum ParseResult {
119 None,
120 Abort(Vec<u8>),
122 Notification(Notification, Option<Vec<u8>>),
124}
125
126fn parse_notification(frame: &[u8], source: DataLinkAddress) -> ParseResult {
127 let apdu = match extract_apdu(frame) {
128 Some(a) => a,
129 None => return ParseResult::None,
130 };
131 let first = match apdu.first() {
132 Some(&b) => b,
133 None => return ParseResult::None,
134 };
135 let apdu_type = match ApduType::from_u8(first >> 4) {
136 Some(t) => t,
137 None => return ParseResult::None,
138 };
139
140 match apdu_type {
141 ApduType::UnconfirmedRequest => {
142 let mut r = Reader::new(apdu);
143 let header = match UnconfirmedRequestHeader::decode(&mut r) {
144 Ok(h) => h,
145 Err(_) => return ParseResult::None,
146 };
147 match header.service_choice {
148 SERVICE_UNCONFIRMED_COV_NOTIFICATION => {
149 let cov = match CovNotificationRequest::decode_after_header(&mut r) {
150 Ok(c) => c,
151 Err(_) => return ParseResult::None,
152 };
153 match build_cov_notification(source, false, cov) {
154 Some(n) => ParseResult::Notification(Notification::Cov(n), None),
155 None => ParseResult::None,
156 }
157 }
158 SERVICE_UNCONFIRMED_EVENT_NOTIFICATION => {
159 let evt = match EventNotificationRequest::decode_after_header(&mut r) {
160 Ok(e) => e,
161 Err(_) => return ParseResult::None,
162 };
163 match build_event_notification(source, false, evt) {
164 Some(n) => ParseResult::Notification(Notification::Event(n), None),
165 None => ParseResult::None,
166 }
167 }
168 _ => ParseResult::None,
169 }
170 }
171 ApduType::ConfirmedRequest => {
172 let mut r = Reader::new(apdu);
173 let header = match ConfirmedRequestHeader::decode(&mut r) {
174 Ok(h) => h,
175 Err(_) => return ParseResult::None,
176 };
177
178 if header.segmented {
181 return ParseResult::Abort(build_abort(header.invoke_id));
182 }
183
184 match header.service_choice {
185 SERVICE_CONFIRMED_COV_NOTIFICATION => {
186 let cov = match CovNotificationRequest::decode_after_header(&mut r) {
187 Ok(c) => c,
188 Err(_) => return ParseResult::None,
189 };
190 match build_cov_notification(source, true, cov) {
191 Some(n) => {
192 let ack = build_simple_ack(
193 header.invoke_id,
194 SERVICE_CONFIRMED_COV_NOTIFICATION,
195 );
196 ParseResult::Notification(Notification::Cov(n), Some(ack))
197 }
198 None => ParseResult::None,
199 }
200 }
201 SERVICE_CONFIRMED_EVENT_NOTIFICATION => {
202 let evt = match EventNotificationRequest::decode_after_header(&mut r) {
203 Ok(e) => e,
204 Err(_) => return ParseResult::None,
205 };
206 match build_event_notification(source, true, evt) {
207 Some(n) => {
208 let ack = build_simple_ack(
209 header.invoke_id,
210 SERVICE_CONFIRMED_EVENT_NOTIFICATION,
211 );
212 ParseResult::Notification(Notification::Event(n), Some(ack))
213 }
214 None => ParseResult::None,
215 }
216 }
217 _ => ParseResult::None,
218 }
219 }
220 _ => ParseResult::None,
221 }
222}
223
224fn extract_apdu(frame: &[u8]) -> Option<&[u8]> {
225 let mut r = Reader::new(frame);
226 Npdu::decode(&mut r).ok()?;
227 let remaining = r.remaining();
228 if remaining == 0 {
229 return None;
230 }
231 Some(&frame[frame.len() - remaining..])
232}
233
234fn build_simple_ack(invoke_id: u8, service_choice: u8) -> Vec<u8> {
235 let mut buf = [0u8; 32];
236 let mut w = Writer::new(&mut buf);
237 Npdu::new(0).encode(&mut w).unwrap();
238 SimpleAck {
239 invoke_id,
240 service_choice,
241 }
242 .encode(&mut w)
243 .unwrap();
244 w.as_written().to_vec()
245}
246
247fn build_abort(invoke_id: u8) -> Vec<u8> {
248 let mut buf = [0u8; 32];
249 let mut w = Writer::new(&mut buf);
250 Npdu::new(0).encode(&mut w).unwrap();
251 AbortPdu {
252 server: false,
253 invoke_id,
254 reason: abort_reason::SEGMENTATION_NOT_SUPPORTED,
255 }
256 .encode(&mut w)
257 .unwrap();
258 w.as_written().to_vec()
259}
260
261fn build_cov_notification(
262 source: DataLinkAddress,
263 confirmed: bool,
264 cov: CovNotificationRequest<'_>,
265) -> Option<CovNotification> {
266 let values = cov
267 .values
268 .into_iter()
269 .filter_map(|v| {
270 Some(CovPropertyValue {
271 property_id: v.property_id,
272 array_index: v.array_index,
273 value: into_client_value(v.value)?,
274 priority: v.priority,
275 })
276 })
277 .collect();
278
279 Some(CovNotification {
280 source,
281 confirmed,
282 subscriber_process_id: cov.subscriber_process_id,
283 initiating_device_id: cov.initiating_device_id,
284 monitored_object_id: cov.monitored_object_id,
285 time_remaining_seconds: cov.time_remaining_seconds,
286 values,
287 })
288}
289
290fn build_event_notification(
291 source: DataLinkAddress,
292 confirmed: bool,
293 evt: EventNotificationRequest<'_>,
294) -> Option<EventNotification> {
295 Some(EventNotification {
296 source,
297 confirmed,
298 process_id: evt.process_id,
299 initiating_device_id: evt.initiating_device_id,
300 event_object_id: evt.event_object_id,
301 timestamp: evt.timestamp,
302 notification_class: evt.notification_class,
303 priority: evt.priority,
304 event_type: evt.event_type,
305 message_text: evt.message_text.map(|s| s.to_string()),
306 notify_type: evt.notify_type,
307 ack_required: evt.ack_required,
308 from_state_raw: evt.from_state,
309 from_state: EventState::from_u32(evt.from_state),
310 to_state_raw: evt.to_state,
311 to_state: EventState::from_u32(evt.to_state),
312 })
313}
314
315fn into_client_value(value: rustbac_core::types::DataValue<'_>) -> Option<ClientDataValue> {
316 use rustbac_core::types::DataValue;
317 Some(match value {
318 DataValue::Null => ClientDataValue::Null,
319 DataValue::Boolean(v) => ClientDataValue::Boolean(v),
320 DataValue::Unsigned(v) => ClientDataValue::Unsigned(v),
321 DataValue::Signed(v) => ClientDataValue::Signed(v),
322 DataValue::Real(v) => ClientDataValue::Real(v),
323 DataValue::Double(v) => ClientDataValue::Double(v),
324 DataValue::OctetString(v) => ClientDataValue::OctetString(v.to_vec()),
325 DataValue::CharacterString(v) => ClientDataValue::CharacterString(v.to_string()),
326 DataValue::BitString(v) => ClientDataValue::BitString {
327 unused_bits: v.unused_bits,
328 data: v.data.to_vec(),
329 },
330 DataValue::Enumerated(v) => ClientDataValue::Enumerated(v),
331 DataValue::Date(v) => ClientDataValue::Date(v),
332 DataValue::Time(v) => ClientDataValue::Time(v),
333 DataValue::ObjectId(v) => ClientDataValue::ObjectId(v),
334 DataValue::Constructed { tag_num, values } => {
335 let children: Vec<_> = values.into_iter().filter_map(into_client_value).collect();
336 ClientDataValue::Constructed {
337 tag_num,
338 values: children,
339 }
340 }
341 })
342}