1use bacnet_encoding::npdu::{decode_npdu, encode_npdu, Npdu, NpduAddress};
9use bacnet_transport::port::TransportPort;
10use bacnet_types::enums::NetworkPriority;
11use bacnet_types::error::Error;
12use bacnet_types::MacAddr;
13use bytes::{Bytes, BytesMut};
14use tokio::sync::{mpsc, oneshot};
15use tokio::task::JoinHandle;
16use tracing::{debug, warn};
17
18pub struct ReceivedApdu {
20 pub apdu: Bytes,
22 pub source_mac: MacAddr,
24 pub source_network: Option<NpduAddress>,
26 pub reply_tx: Option<oneshot::Sender<Bytes>>,
29}
30
31impl Clone for ReceivedApdu {
32 fn clone(&self) -> Self {
33 Self {
34 apdu: self.apdu.clone(),
35 source_mac: self.source_mac.clone(),
36 source_network: self.source_network.clone(),
37 reply_tx: None,
38 }
39 }
40}
41
42impl std::fmt::Debug for ReceivedApdu {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("ReceivedApdu")
45 .field("apdu", &self.apdu)
46 .field("source_mac", &self.source_mac)
47 .field("source_network", &self.source_network)
48 .field("reply_tx", &self.reply_tx.as_ref().map(|_| "Some(...)"))
49 .finish()
50 }
51}
52
53pub struct NetworkLayer<T: TransportPort> {
60 transport: T,
61 dispatch_task: Option<JoinHandle<()>>,
62}
63
64impl<T: TransportPort + 'static> NetworkLayer<T> {
65 pub fn new(transport: T) -> Self {
67 Self {
68 transport,
69 dispatch_task: None,
70 }
71 }
72
73 pub async fn start(&mut self) -> Result<mpsc::Receiver<ReceivedApdu>, Error> {
78 let mut npdu_rx = self.transport.start().await?;
79
80 let (apdu_tx, apdu_rx) = mpsc::channel(256);
81
82 let dispatch_task = tokio::spawn(async move {
83 while let Some(received) = npdu_rx.recv().await {
84 match decode_npdu(received.npdu.clone()) {
85 Ok(npdu) => {
86 if npdu.is_network_message {
87 debug!(
88 message_type = npdu.message_type,
89 "Ignoring network layer message (non-router mode)"
90 );
91 continue;
92 }
93
94 if let Some(ref dest) = npdu.destination {
96 if dest.network != 0xFFFF {
97 debug!(
98 dnet = dest.network,
99 "Discarding routed message (non-router)"
100 );
101 continue;
102 }
103 }
104
105 let source_network = npdu.source.clone();
106
107 let apdu = ReceivedApdu {
108 apdu: npdu.payload,
109 source_mac: received.source_mac,
110 source_network,
111 reply_tx: received.reply_tx,
112 };
113
114 if apdu_tx.send(apdu).await.is_err() {
115 break;
116 }
117 }
118 Err(e) => {
119 warn!(error = %e, "Failed to decode NPDU");
120 }
121 }
122 }
123 });
124
125 self.dispatch_task = Some(dispatch_task);
126
127 Ok(apdu_rx)
128 }
129
130 pub async fn send_apdu(
132 &self,
133 apdu: &[u8],
134 destination_mac: &[u8],
135 expecting_reply: bool,
136 priority: NetworkPriority,
137 ) -> Result<(), Error> {
138 let npdu = Npdu {
139 is_network_message: false,
140 expecting_reply,
141 priority,
142 destination: None,
143 source: None,
144 payload: Bytes::copy_from_slice(apdu),
145 ..Npdu::default()
146 };
147
148 let mut buf = BytesMut::with_capacity(2 + apdu.len());
149 encode_npdu(&mut buf, &npdu)?;
150
151 self.transport.send_unicast(&buf, destination_mac).await
152 }
153
154 pub async fn broadcast_apdu(
156 &self,
157 apdu: &[u8],
158 expecting_reply: bool,
159 priority: NetworkPriority,
160 ) -> Result<(), Error> {
161 let npdu = Npdu {
162 is_network_message: false,
163 expecting_reply,
164 priority,
165 destination: None,
166 source: None,
167 payload: Bytes::copy_from_slice(apdu),
168 ..Npdu::default()
169 };
170
171 let mut buf = BytesMut::with_capacity(2 + apdu.len());
172 encode_npdu(&mut buf, &npdu)?;
173
174 self.transport.send_broadcast(&buf).await
175 }
176
177 pub async fn broadcast_global_apdu(
182 &self,
183 apdu: &[u8],
184 expecting_reply: bool,
185 priority: NetworkPriority,
186 ) -> Result<(), Error> {
187 let npdu = Npdu {
188 is_network_message: false,
189 expecting_reply,
190 priority,
191 destination: Some(NpduAddress {
192 network: 0xFFFF,
193 mac_address: MacAddr::new(),
194 }),
195 source: None,
196 hop_count: 255,
197 payload: Bytes::copy_from_slice(apdu),
198 ..Npdu::default()
199 };
200
201 let mut buf = BytesMut::with_capacity(8 + apdu.len());
202 encode_npdu(&mut buf, &npdu)?;
203 self.transport.send_broadcast(&buf).await
204 }
205
206 pub async fn broadcast_to_network(
211 &self,
212 apdu: &[u8],
213 dest_network: u16,
214 expecting_reply: bool,
215 priority: NetworkPriority,
216 ) -> Result<(), Error> {
217 if dest_network == 0xFFFF {
218 return Err(Error::Encoding(
219 "dest_network 0xFFFF is reserved for global broadcasts; use broadcast_global_apdu instead".into(),
220 ));
221 }
222 let npdu = Npdu {
223 is_network_message: false,
224 expecting_reply,
225 priority,
226 destination: Some(NpduAddress {
227 network: dest_network,
228 mac_address: MacAddr::new(),
229 }),
230 source: None,
231 hop_count: 255,
232 payload: Bytes::copy_from_slice(apdu),
233 ..Npdu::default()
234 };
235
236 let mut buf = BytesMut::with_capacity(8 + apdu.len());
237 encode_npdu(&mut buf, &npdu)?;
238 self.transport.send_broadcast(&buf).await
239 }
240
241 pub async fn send_apdu_routed(
247 &self,
248 apdu: &[u8],
249 dest_network: u16,
250 dest_mac: &[u8],
251 router_mac: &[u8],
252 expecting_reply: bool,
253 priority: NetworkPriority,
254 ) -> Result<(), Error> {
255 let npdu = Npdu {
256 is_network_message: false,
257 expecting_reply,
258 priority,
259 destination: Some(NpduAddress {
260 network: dest_network,
261 mac_address: MacAddr::from_slice(dest_mac),
262 }),
263 source: None,
264 hop_count: 255,
265 payload: Bytes::copy_from_slice(apdu),
266 ..Npdu::default()
267 };
268
269 let mut buf = BytesMut::with_capacity(8 + dest_mac.len() + apdu.len());
270 encode_npdu(&mut buf, &npdu)?;
271
272 self.transport.send_unicast(&buf, router_mac).await
273 }
274
275 pub fn transport(&self) -> &T {
280 &self.transport
281 }
282
283 pub fn local_mac(&self) -> &[u8] {
285 self.transport.local_mac()
286 }
287
288 pub async fn stop(&mut self) -> Result<(), Error> {
290 if let Some(task) = self.dispatch_task.take() {
291 task.abort();
292 let _ = task.await;
293 }
294 self.transport.stop().await
295 }
296}
297
298#[cfg(test)]
299mod tests {
300 use super::*;
301 use bacnet_transport::bip::BipTransport;
302 use std::net::Ipv4Addr;
303 use tokio::time::{timeout, Duration};
304
305 #[tokio::test]
306 async fn send_receive_apdu_unicast() {
307 let transport_a = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
308 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
309
310 let mut net_a = NetworkLayer::new(transport_a);
311 let mut net_b = NetworkLayer::new(transport_b);
312
313 let _rx_a = net_a.start().await.unwrap();
314 let mut rx_b = net_b.start().await.unwrap();
315
316 let test_apdu = vec![0x10, 0x08];
317
318 net_a
319 .send_apdu(
320 &test_apdu,
321 net_b.local_mac(),
322 false,
323 NetworkPriority::NORMAL,
324 )
325 .await
326 .unwrap();
327
328 let received = timeout(Duration::from_secs(2), rx_b.recv())
329 .await
330 .expect("Timed out waiting for APDU")
331 .expect("Channel closed");
332
333 assert_eq!(received.apdu, test_apdu);
334 assert_eq!(received.source_mac.as_slice(), net_a.local_mac());
335 assert!(received.source_network.is_none());
336
337 net_a.stop().await.unwrap();
338 net_b.stop().await.unwrap();
339 }
340
341 #[tokio::test]
342 async fn end_to_end_who_is() {
343 use bacnet_encoding::apdu::{decode_apdu, encode_apdu, Apdu, UnconfirmedRequest};
344 use bacnet_types::enums::UnconfirmedServiceChoice;
345
346 let transport_a = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
347 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
348
349 let mut net_a = NetworkLayer::new(transport_a);
350 let mut net_b = NetworkLayer::new(transport_b);
351
352 let _rx_a = net_a.start().await.unwrap();
353 let mut rx_b = net_b.start().await.unwrap();
354
355 let who_is_apdu = Apdu::UnconfirmedRequest(UnconfirmedRequest {
356 service_choice: UnconfirmedServiceChoice::WHO_IS,
357 service_request: Bytes::new(),
358 });
359 let mut apdu_buf = BytesMut::new();
360 encode_apdu(&mut apdu_buf, &who_is_apdu);
361
362 net_a
363 .send_apdu(&apdu_buf, net_b.local_mac(), false, NetworkPriority::NORMAL)
364 .await
365 .unwrap();
366
367 let received = timeout(Duration::from_secs(2), rx_b.recv())
368 .await
369 .expect("Timed out waiting for APDU")
370 .expect("Channel closed");
371
372 let decoded_apdu = decode_apdu(received.apdu.clone()).unwrap();
373 match decoded_apdu {
374 Apdu::UnconfirmedRequest(req) => {
375 assert_eq!(req.service_choice, UnconfirmedServiceChoice::WHO_IS);
376 assert!(req.service_request.is_empty());
377 }
378 other => panic!("Expected UnconfirmedRequest, got {:?}", other),
379 }
380
381 net_a.stop().await.unwrap();
382 net_b.stop().await.unwrap();
383 }
384
385 #[test]
386 fn global_broadcast_npdu_has_dnet_ffff() {
387 use bacnet_encoding::npdu::{decode_npdu, encode_npdu, Npdu, NpduAddress};
388 use bacnet_types::enums::NetworkPriority;
389
390 let npdu = Npdu {
391 is_network_message: false,
392 expecting_reply: false,
393 priority: NetworkPriority::NORMAL,
394 destination: Some(NpduAddress {
395 network: 0xFFFF,
396 mac_address: MacAddr::new(),
397 }),
398 source: None,
399 hop_count: 255,
400 payload: Bytes::from_static(&[0xAA]),
401 ..Npdu::default()
402 };
403
404 let mut buf = bytes::BytesMut::new();
405 encode_npdu(&mut buf, &npdu).unwrap();
406 let decoded = decode_npdu(Bytes::from(buf)).unwrap();
407 let dest = decoded.destination.unwrap();
408 assert_eq!(dest.network, 0xFFFF);
409 assert!(dest.mac_address.is_empty());
410 assert_eq!(decoded.hop_count, 255);
411 }
412
413 #[test]
414 fn transport_accessor() {
415 let transport = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
416 let net = NetworkLayer::new(transport);
417 let mac = net.transport().local_mac();
418 assert_eq!(mac.len(), 6);
419 }
420
421 #[test]
422 fn routed_send_encodes_dnet_dadr() {
423 use bacnet_encoding::npdu::{decode_npdu, encode_npdu, Npdu, NpduAddress};
424 use bacnet_types::enums::NetworkPriority;
425
426 let npdu = Npdu {
427 is_network_message: false,
428 expecting_reply: true,
429 priority: NetworkPriority::NORMAL,
430 destination: Some(NpduAddress {
431 network: 100,
432 mac_address: MacAddr::from_slice(&[1, 2, 3, 4, 5, 6]),
433 }),
434 source: None,
435 hop_count: 255,
436 payload: Bytes::from_static(&[0xAA, 0xBB]),
437 ..Npdu::default()
438 };
439
440 let mut buf = bytes::BytesMut::new();
441 encode_npdu(&mut buf, &npdu).unwrap();
442 let decoded = decode_npdu(Bytes::from(buf)).unwrap();
443 let dest = decoded.destination.unwrap();
444 assert_eq!(dest.network, 100);
445 assert_eq!(dest.mac_address.as_slice(), &[1, 2, 3, 4, 5, 6]);
446 assert_eq!(decoded.hop_count, 255);
447 assert!(decoded.expecting_reply);
448 }
449
450 #[test]
451 fn broadcast_to_network_encodes_specific_dnet() {
452 use bacnet_encoding::npdu::{decode_npdu, encode_npdu, Npdu, NpduAddress};
453 use bacnet_types::enums::NetworkPriority;
454
455 let npdu = Npdu {
456 is_network_message: false,
457 expecting_reply: false,
458 priority: NetworkPriority::NORMAL,
459 destination: Some(NpduAddress {
460 network: 42,
461 mac_address: MacAddr::new(),
462 }),
463 source: None,
464 hop_count: 255,
465 payload: Bytes::from_static(&[0xCC]),
466 ..Npdu::default()
467 };
468
469 let mut buf = bytes::BytesMut::new();
470 encode_npdu(&mut buf, &npdu).unwrap();
471 let decoded = decode_npdu(Bytes::from(buf)).unwrap();
472 let dest = decoded.destination.unwrap();
473 assert_eq!(dest.network, 42);
474 assert!(dest.mac_address.is_empty());
475 assert_eq!(decoded.hop_count, 255);
476 assert!(!decoded.expecting_reply);
477 }
478
479 #[test]
480 fn broadcast_to_network_rejects_dnet_ffff() {
481 use bacnet_types::enums::NetworkPriority;
482
483 let transport = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
484 let net = NetworkLayer::new(transport);
485
486 let rt = tokio::runtime::Builder::new_current_thread()
487 .enable_all()
488 .build()
489 .unwrap();
490 let result = rt.block_on(async {
491 net.broadcast_to_network(&[0xAA], 0xFFFF, false, NetworkPriority::NORMAL)
492 .await
493 });
494 assert!(result.is_err());
495 let err_msg = format!("{}", result.unwrap_err());
496 assert!(
497 err_msg.contains("0xFFFF"),
498 "Error should mention 0xFFFF: {err_msg}"
499 );
500 }
501}