Skip to main content

bacnet_network/
layer.rs

1//! NetworkLayer for local BACnet packet assembly and dispatch.
2//!
3//! The network layer wraps a transport and provides APDU-level send/receive
4//! by handling NPDU encoding/decoding. This is a non-router implementation:
5//! it does not forward messages between networks, but it can address remote
6//! devices through local routers via NPDU destination fields (DNET/DADR).
7
8use bacnet_encoding::npdu::{decode_npdu, encode_npdu, Npdu, NpduAddress};
9use bacnet_transport::port::TransportPort;
10use bacnet_types::enums::NetworkPriority;
11use bacnet_types::error::Error;
12use bacnet_types::MacAddr;
13use bytes::{Bytes, BytesMut};
14use tokio::sync::{mpsc, oneshot};
15use tokio::task::JoinHandle;
16use tracing::{debug, warn};
17
18/// A received APDU with source addressing information.
19pub struct ReceivedApdu {
20    /// Raw APDU bytes.
21    pub apdu: Bytes,
22    /// Source MAC address in transport-native format.
23    pub source_mac: MacAddr,
24    /// Source network address if the APDU was routed (NPDU had source field).
25    pub source_network: Option<NpduAddress>,
26    /// Optional reply channel for MS/TP DataExpectingReply flows.
27    /// The application layer can send NPDU-wrapped reply bytes through this channel.
28    pub reply_tx: Option<oneshot::Sender<Bytes>>,
29}
30
31impl Clone for ReceivedApdu {
32    fn clone(&self) -> Self {
33        Self {
34            apdu: self.apdu.clone(),
35            source_mac: self.source_mac.clone(),
36            source_network: self.source_network.clone(),
37            reply_tx: None,
38        }
39    }
40}
41
42impl std::fmt::Debug for ReceivedApdu {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        f.debug_struct("ReceivedApdu")
45            .field("apdu", &self.apdu)
46            .field("source_mac", &self.source_mac)
47            .field("source_network", &self.source_network)
48            .field("reply_tx", &self.reply_tx.as_ref().map(|_| "Some(...)"))
49            .finish()
50    }
51}
52
53/// Non-router BACnet network layer.
54///
55/// Wraps a [`TransportPort`] and provides APDU-level send/receive by handling
56/// NPDU framing. This layer does not act as a router (it does not forward
57/// messages between networks), but it can send to remote devices through
58/// local routers using NPDU destination addressing.
59pub struct NetworkLayer<T: TransportPort> {
60    transport: T,
61    dispatch_task: Option<JoinHandle<()>>,
62}
63
64impl<T: TransportPort + 'static> NetworkLayer<T> {
65    /// Create a new network layer wrapping the given transport.
66    pub fn new(transport: T) -> Self {
67        Self {
68            transport,
69            dispatch_task: None,
70        }
71    }
72
73    /// Start the network layer. Returns a receiver for incoming APDUs.
74    ///
75    /// This starts the underlying transport and spawns a dispatch task that
76    /// decodes incoming NPDUs and extracts APDUs.
77    pub async fn start(&mut self) -> Result<mpsc::Receiver<ReceivedApdu>, Error> {
78        let mut npdu_rx = self.transport.start().await?;
79
80        let (apdu_tx, apdu_rx) = mpsc::channel(256);
81
82        let dispatch_task = tokio::spawn(async move {
83            while let Some(received) = npdu_rx.recv().await {
84                match decode_npdu(received.npdu.clone()) {
85                    Ok(npdu) => {
86                        if npdu.is_network_message {
87                            debug!(
88                                message_type = npdu.message_type,
89                                "Ignoring network layer message (non-router mode)"
90                            );
91                            continue;
92                        }
93
94                        // Non-routing node: discard messages with a specific DNET.
95                        if let Some(ref dest) = npdu.destination {
96                            if dest.network != 0xFFFF {
97                                debug!(
98                                    dnet = dest.network,
99                                    "Discarding routed message (non-router)"
100                                );
101                                continue;
102                            }
103                        }
104
105                        let source_network = npdu.source.clone();
106
107                        let apdu = ReceivedApdu {
108                            apdu: npdu.payload,
109                            source_mac: received.source_mac,
110                            source_network,
111                            reply_tx: received.reply_tx,
112                        };
113
114                        if apdu_tx.send(apdu).await.is_err() {
115                            break;
116                        }
117                    }
118                    Err(e) => {
119                        warn!(error = %e, "Failed to decode NPDU");
120                    }
121                }
122            }
123        });
124
125        self.dispatch_task = Some(dispatch_task);
126
127        Ok(apdu_rx)
128    }
129
130    /// Send an APDU to a specific local destination by MAC address.
131    pub async fn send_apdu(
132        &self,
133        apdu: &[u8],
134        destination_mac: &[u8],
135        expecting_reply: bool,
136        priority: NetworkPriority,
137    ) -> Result<(), Error> {
138        let npdu = Npdu {
139            is_network_message: false,
140            expecting_reply,
141            priority,
142            destination: None,
143            source: None,
144            payload: Bytes::copy_from_slice(apdu),
145            ..Npdu::default()
146        };
147
148        let mut buf = BytesMut::with_capacity(2 + apdu.len());
149        encode_npdu(&mut buf, &npdu)?;
150
151        self.transport.send_unicast(&buf, destination_mac).await
152    }
153
154    /// Broadcast an APDU on the local network.
155    pub async fn broadcast_apdu(
156        &self,
157        apdu: &[u8],
158        expecting_reply: bool,
159        priority: NetworkPriority,
160    ) -> Result<(), Error> {
161        let npdu = Npdu {
162            is_network_message: false,
163            expecting_reply,
164            priority,
165            destination: None,
166            source: None,
167            payload: Bytes::copy_from_slice(apdu),
168            ..Npdu::default()
169        };
170
171        let mut buf = BytesMut::with_capacity(2 + apdu.len());
172        encode_npdu(&mut buf, &npdu)?;
173
174        self.transport.send_broadcast(&buf).await
175    }
176
177    /// Broadcast an APDU globally (DNET=0xFFFF, hop_count=255).
178    ///
179    /// Unlike `broadcast_apdu()` which only reaches the local subnet, this
180    /// sets DNET=0xFFFF so routers will forward to all reachable networks.
181    pub async fn broadcast_global_apdu(
182        &self,
183        apdu: &[u8],
184        expecting_reply: bool,
185        priority: NetworkPriority,
186    ) -> Result<(), Error> {
187        let npdu = Npdu {
188            is_network_message: false,
189            expecting_reply,
190            priority,
191            destination: Some(NpduAddress {
192                network: 0xFFFF,
193                mac_address: MacAddr::new(),
194            }),
195            source: None,
196            hop_count: 255,
197            payload: Bytes::copy_from_slice(apdu),
198            ..Npdu::default()
199        };
200
201        let mut buf = BytesMut::with_capacity(8 + apdu.len());
202        encode_npdu(&mut buf, &npdu)?;
203        self.transport.send_broadcast(&buf).await
204    }
205
206    /// Broadcast an APDU to a specific remote network via routers.
207    ///
208    /// Like `broadcast_global_apdu()` but targets a single network number
209    /// instead of all networks (DNET=0xFFFF).
210    pub async fn broadcast_to_network(
211        &self,
212        apdu: &[u8],
213        dest_network: u16,
214        expecting_reply: bool,
215        priority: NetworkPriority,
216    ) -> Result<(), Error> {
217        if dest_network == 0xFFFF {
218            return Err(Error::Encoding(
219                "dest_network 0xFFFF is reserved for global broadcasts; use broadcast_global_apdu instead".into(),
220            ));
221        }
222        let npdu = Npdu {
223            is_network_message: false,
224            expecting_reply,
225            priority,
226            destination: Some(NpduAddress {
227                network: dest_network,
228                mac_address: MacAddr::new(),
229            }),
230            source: None,
231            hop_count: 255,
232            payload: Bytes::copy_from_slice(apdu),
233            ..Npdu::default()
234        };
235
236        let mut buf = BytesMut::with_capacity(8 + apdu.len());
237        encode_npdu(&mut buf, &npdu)?;
238        self.transport.send_broadcast(&buf).await
239    }
240
241    /// Send an APDU to a remote device through a local router.
242    ///
243    /// The NPDU is sent via unicast to `router_mac` (the next-hop router on
244    /// the local network), but the NPDU header addresses the final destination
245    /// with `dest_network` / `dest_mac`.
246    pub async fn send_apdu_routed(
247        &self,
248        apdu: &[u8],
249        dest_network: u16,
250        dest_mac: &[u8],
251        router_mac: &[u8],
252        expecting_reply: bool,
253        priority: NetworkPriority,
254    ) -> Result<(), Error> {
255        let npdu = Npdu {
256            is_network_message: false,
257            expecting_reply,
258            priority,
259            destination: Some(NpduAddress {
260                network: dest_network,
261                mac_address: MacAddr::from_slice(dest_mac),
262            }),
263            source: None,
264            hop_count: 255,
265            payload: Bytes::copy_from_slice(apdu),
266            ..Npdu::default()
267        };
268
269        let mut buf = BytesMut::with_capacity(8 + dest_mac.len() + apdu.len());
270        encode_npdu(&mut buf, &npdu)?;
271
272        self.transport.send_unicast(&buf, router_mac).await
273    }
274
275    /// Access the underlying transport.
276    ///
277    /// Useful for transport-specific operations like BBMD registration
278    /// after the network layer has been started.
279    pub fn transport(&self) -> &T {
280        &self.transport
281    }
282
283    /// Get the transport's local MAC address.
284    pub fn local_mac(&self) -> &[u8] {
285        self.transport.local_mac()
286    }
287
288    /// Stop the network layer and underlying transport.
289    pub async fn stop(&mut self) -> Result<(), Error> {
290        if let Some(task) = self.dispatch_task.take() {
291            task.abort();
292            let _ = task.await;
293        }
294        self.transport.stop().await
295    }
296}
297
298#[cfg(test)]
299mod tests {
300    use super::*;
301    use bacnet_transport::bip::BipTransport;
302    use std::net::Ipv4Addr;
303    use tokio::time::{timeout, Duration};
304
305    #[tokio::test]
306    async fn send_receive_apdu_unicast() {
307        let transport_a = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
308        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
309
310        let mut net_a = NetworkLayer::new(transport_a);
311        let mut net_b = NetworkLayer::new(transport_b);
312
313        let _rx_a = net_a.start().await.unwrap();
314        let mut rx_b = net_b.start().await.unwrap();
315
316        let test_apdu = vec![0x10, 0x08];
317
318        net_a
319            .send_apdu(
320                &test_apdu,
321                net_b.local_mac(),
322                false,
323                NetworkPriority::NORMAL,
324            )
325            .await
326            .unwrap();
327
328        let received = timeout(Duration::from_secs(2), rx_b.recv())
329            .await
330            .expect("Timed out waiting for APDU")
331            .expect("Channel closed");
332
333        assert_eq!(received.apdu, test_apdu);
334        assert_eq!(received.source_mac.as_slice(), net_a.local_mac());
335        assert!(received.source_network.is_none());
336
337        net_a.stop().await.unwrap();
338        net_b.stop().await.unwrap();
339    }
340
341    #[tokio::test]
342    async fn end_to_end_who_is() {
343        use bacnet_encoding::apdu::{decode_apdu, encode_apdu, Apdu, UnconfirmedRequest};
344        use bacnet_types::enums::UnconfirmedServiceChoice;
345
346        let transport_a = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
347        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
348
349        let mut net_a = NetworkLayer::new(transport_a);
350        let mut net_b = NetworkLayer::new(transport_b);
351
352        let _rx_a = net_a.start().await.unwrap();
353        let mut rx_b = net_b.start().await.unwrap();
354
355        let who_is_apdu = Apdu::UnconfirmedRequest(UnconfirmedRequest {
356            service_choice: UnconfirmedServiceChoice::WHO_IS,
357            service_request: Bytes::new(),
358        });
359        let mut apdu_buf = BytesMut::new();
360        encode_apdu(&mut apdu_buf, &who_is_apdu);
361
362        net_a
363            .send_apdu(&apdu_buf, net_b.local_mac(), false, NetworkPriority::NORMAL)
364            .await
365            .unwrap();
366
367        let received = timeout(Duration::from_secs(2), rx_b.recv())
368            .await
369            .expect("Timed out waiting for APDU")
370            .expect("Channel closed");
371
372        let decoded_apdu = decode_apdu(received.apdu.clone()).unwrap();
373        match decoded_apdu {
374            Apdu::UnconfirmedRequest(req) => {
375                assert_eq!(req.service_choice, UnconfirmedServiceChoice::WHO_IS);
376                assert!(req.service_request.is_empty());
377            }
378            other => panic!("Expected UnconfirmedRequest, got {:?}", other),
379        }
380
381        net_a.stop().await.unwrap();
382        net_b.stop().await.unwrap();
383    }
384
385    #[test]
386    fn global_broadcast_npdu_has_dnet_ffff() {
387        use bacnet_encoding::npdu::{decode_npdu, encode_npdu, Npdu, NpduAddress};
388        use bacnet_types::enums::NetworkPriority;
389
390        let npdu = Npdu {
391            is_network_message: false,
392            expecting_reply: false,
393            priority: NetworkPriority::NORMAL,
394            destination: Some(NpduAddress {
395                network: 0xFFFF,
396                mac_address: MacAddr::new(),
397            }),
398            source: None,
399            hop_count: 255,
400            payload: Bytes::from_static(&[0xAA]),
401            ..Npdu::default()
402        };
403
404        let mut buf = bytes::BytesMut::new();
405        encode_npdu(&mut buf, &npdu).unwrap();
406        let decoded = decode_npdu(Bytes::from(buf)).unwrap();
407        let dest = decoded.destination.unwrap();
408        assert_eq!(dest.network, 0xFFFF);
409        assert!(dest.mac_address.is_empty());
410        assert_eq!(decoded.hop_count, 255);
411    }
412
413    #[test]
414    fn transport_accessor() {
415        let transport = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
416        let net = NetworkLayer::new(transport);
417        let mac = net.transport().local_mac();
418        assert_eq!(mac.len(), 6);
419    }
420
421    #[test]
422    fn routed_send_encodes_dnet_dadr() {
423        use bacnet_encoding::npdu::{decode_npdu, encode_npdu, Npdu, NpduAddress};
424        use bacnet_types::enums::NetworkPriority;
425
426        let npdu = Npdu {
427            is_network_message: false,
428            expecting_reply: true,
429            priority: NetworkPriority::NORMAL,
430            destination: Some(NpduAddress {
431                network: 100,
432                mac_address: MacAddr::from_slice(&[1, 2, 3, 4, 5, 6]),
433            }),
434            source: None,
435            hop_count: 255,
436            payload: Bytes::from_static(&[0xAA, 0xBB]),
437            ..Npdu::default()
438        };
439
440        let mut buf = bytes::BytesMut::new();
441        encode_npdu(&mut buf, &npdu).unwrap();
442        let decoded = decode_npdu(Bytes::from(buf)).unwrap();
443        let dest = decoded.destination.unwrap();
444        assert_eq!(dest.network, 100);
445        assert_eq!(dest.mac_address.as_slice(), &[1, 2, 3, 4, 5, 6]);
446        assert_eq!(decoded.hop_count, 255);
447        assert!(decoded.expecting_reply);
448    }
449
450    #[test]
451    fn broadcast_to_network_encodes_specific_dnet() {
452        use bacnet_encoding::npdu::{decode_npdu, encode_npdu, Npdu, NpduAddress};
453        use bacnet_types::enums::NetworkPriority;
454
455        let npdu = Npdu {
456            is_network_message: false,
457            expecting_reply: false,
458            priority: NetworkPriority::NORMAL,
459            destination: Some(NpduAddress {
460                network: 42,
461                mac_address: MacAddr::new(),
462            }),
463            source: None,
464            hop_count: 255,
465            payload: Bytes::from_static(&[0xCC]),
466            ..Npdu::default()
467        };
468
469        let mut buf = bytes::BytesMut::new();
470        encode_npdu(&mut buf, &npdu).unwrap();
471        let decoded = decode_npdu(Bytes::from(buf)).unwrap();
472        let dest = decoded.destination.unwrap();
473        assert_eq!(dest.network, 42);
474        assert!(dest.mac_address.is_empty());
475        assert_eq!(decoded.hop_count, 255);
476        assert!(!decoded.expecting_reply);
477    }
478
479    #[test]
480    fn broadcast_to_network_rejects_dnet_ffff() {
481        use bacnet_types::enums::NetworkPriority;
482
483        let transport = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
484        let net = NetworkLayer::new(transport);
485
486        let rt = tokio::runtime::Builder::new_current_thread()
487            .enable_all()
488            .build()
489            .unwrap();
490        let result = rt.block_on(async {
491            net.broadcast_to_network(&[0xAA], 0xFFFF, false, NetworkPriority::NORMAL)
492                .await
493        });
494        assert!(result.is_err());
495        let err_msg = format!("{}", result.unwrap_err());
496        assert!(
497            err_msg.contains("0xFFFF"),
498            "Error should mention 0xFFFF: {err_msg}"
499        );
500    }
501}