Skip to main content

bacnet_network/
router.rs

//! BACnet half-router — forwards APDUs between BACnet networks.
//!
//! Per ASHRAE 135-2020 Clause 6.4, a BACnet router connects two or more
//! BACnet networks. It forwards messages between them by manipulating
//! the NPDU source/destination fields and decrementing the hop count.
//!
//! This implementation supports:
//! - Forwarding APDUs between directly-connected networks
//! - Who-Is-Router-To-Network / I-Am-Router-To-Network messages
//! - Reject-Message-To-Network for unknown routes
//! - Learned routes from I-Am-Router-To-Network announcements
12
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use bacnet_encoding::npdu::{decode_npdu, encode_npdu, Npdu, NpduAddress};
17use bacnet_transport::port::TransportPort;
18use bacnet_types::enums::{NetworkMessageType, RejectMessageReason};
19use bacnet_types::error::Error;
20use bacnet_types::MacAddr;
21use bytes::{BufMut, Bytes, BytesMut};
22use tokio::sync::{mpsc, Mutex};
23use tokio::task::JoinHandle;
24use tracing::{debug, warn};
25
26use crate::layer::ReceivedApdu;
27use crate::router_table::{ReachabilityStatus, RouterTable};
28
/// A send request to be forwarded on a port.
///
/// Requests are queued on a per-port `mpsc` channel and executed by that
/// port's sender task, which owns the transport.
#[derive(Debug)]
enum SendRequest {
    /// Send the encoded `npdu` to the station addressed by `mac`
    /// (executed through the transport's `send_unicast`).
    Unicast { npdu: Bytes, mac: MacAddr },
    /// Send the encoded `npdu` through the transport's `send_broadcast`.
    Broadcast { npdu: Bytes },
}
35
/// A router port: a transport bound to a specific BACnet network number.
///
/// The position of a port in the list handed to `BACnetRouter::start` becomes
/// its port index in the routing table.
pub struct RouterPort<T: TransportPort> {
    /// The transport for this port.
    pub transport: T,
    /// The network number assigned to this port.
    ///
    /// Must be unique among the ports of one router; `BACnetRouter::start`
    /// returns an error when two ports share a network number.
    pub network_number: u16,
}
43
/// BACnet router connecting multiple networks.
///
/// The router holds multiple ports, each bound to a different BACnet network.
/// When an NPDU arrives on one port with a destination network that maps to
/// another port, the router forwards the message.
///
/// All work happens in background tasks spawned by `start`; call `stop` to
/// abort them.
pub struct BACnetRouter {
    /// Shared routing table.
    ///
    /// Also held by every dispatch task and by the aging task.
    table: Arc<Mutex<RouterTable>>,
    /// Dispatch tasks (one per port).
    ///
    /// Each one decodes NPDUs received on its port and routes them.
    dispatch_tasks: Vec<JoinHandle<()>>,
    /// Sender tasks (one per port, owns the transport for outgoing messages).
    sender_tasks: Vec<JoinHandle<()>>,
    /// Background task that purges stale learned routes.
    ///
    /// `None` once `stop` has taken and aborted it.
    aging_task: Option<JoinHandle<()>>,
}
59
impl BACnetRouter {
    /// Create and start a router from a list of ports.
    ///
    /// Returns the router and a receiver for APDUs destined to local
    /// applications (messages without remote destination or where this
    /// router is the final hop).
    ///
    /// Startup spawns one sender task and one dispatch task per port plus a
    /// single route-aging task, and queues an initial I-Am-Router-To-Network
    /// broadcast on every port that has at least one sibling port.
    ///
    /// # Errors
    ///
    /// Returns `Error::Encoding` when two ports share a network number, and
    /// propagates any error returned by a port's `transport.start()`.
    pub async fn start<T: TransportPort + 'static>(
        mut ports: Vec<RouterPort<T>>,
    ) -> Result<(Self, mpsc::Receiver<ReceivedApdu>), Error> {
        let mut table = RouterTable::new();

        // Reject duplicate network numbers
        {
            let mut seen = std::collections::HashSet::new();
            for port in &ports {
                // `insert` returns false when the number was already present.
                if !seen.insert(port.network_number) {
                    return Err(Error::Encoding(format!(
                        "Duplicate network number {} in router ports",
                        port.network_number
                    )));
                }
            }
        }

        // Register directly-connected networks
        // (the port's position in `ports` is used as its port index).
        for (idx, port) in ports.iter().enumerate() {
            table.add_direct(port.network_number, idx);
        }

        let table = Arc::new(Mutex::new(table));
        // Channel carrying APDUs for the local application; the receiving
        // half is returned to the caller.
        let (local_tx, local_rx) = mpsc::channel(256);

        // Start each transport, set up send channels
        let mut port_receivers = Vec::new();
        let mut send_txs: Vec<mpsc::Sender<SendRequest>> = Vec::new();
        let mut sender_tasks = Vec::new();
        let mut port_networks = Vec::new();
        let mut port_local_macs = Vec::new();

        // First pass borrows the ports: start the transport and record the
        // per-port data the dispatch tasks need once the transport has been
        // moved into its sender task.
        for port in &mut ports {
            let rx = port.transport.start().await?;
            port_receivers.push(rx);
            port_networks.push(port.network_number);
            port_local_macs.push(MacAddr::from_slice(port.transport.local_mac()));
        }

        // Move transports into sender tasks
        for port in ports {
            let (send_tx, mut send_rx) = mpsc::channel::<SendRequest>(256);
            send_txs.push(send_tx);

            let transport = port.transport;
            let task = tokio::spawn(async move {
                // Runs until `recv` yields `None` or the task is aborted.
                // Send failures are logged and do not end the loop.
                while let Some(req) = send_rx.recv().await {
                    match req {
                        SendRequest::Unicast { npdu, mac } => {
                            if let Err(e) = transport.send_unicast(&npdu, &mac).await {
                                warn!(error = %e, "Router send_unicast failed");
                            }
                        }
                        SendRequest::Broadcast { npdu } => {
                            if let Err(e) = transport.send_broadcast(&npdu).await {
                                warn!(error = %e, "Router send_broadcast failed");
                            }
                        }
                    }
                }
            });
            sender_tasks.push(task);
        }

        // Shared by all dispatch tasks; indexed by port index.
        let send_txs = Arc::new(send_txs);

        // Announce I-Am-Router-To-Network on each port listing networks reachable via other ports.
        for (port_idx, tx) in send_txs.iter().enumerate() {
            let other_networks: Vec<u16> = port_networks
                .iter()
                .enumerate()
                .filter(|(idx, _)| *idx != port_idx)
                .map(|(_, net)| *net)
                .collect();

            // Single-port router: nothing to announce.
            if other_networks.is_empty() {
                continue;
            }

            // Payload is the list of network numbers, two bytes each
            // (`put_u16` writes big-endian).
            let mut payload = BytesMut::with_capacity(other_networks.len() * 2);
            for net in &other_networks {
                payload.put_u16(*net);
            }

            let payload_len = payload.len();
            let response = Npdu {
                is_network_message: true,
                message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
                payload: payload.freeze(),
                ..Npdu::default()
            };

            let mut buf = BytesMut::with_capacity(8 + payload_len);
            if let Err(e) = encode_npdu(&mut buf, &response) {
                warn!("Failed to encode I-Am-Router NPDU: {e}");
                continue;
            }

            // Non-blocking enqueue: on failure the announcement is dropped
            // and only logged.
            if let Err(e) = tx.try_send(SendRequest::Broadcast { npdu: buf.freeze() }) {
                warn!(%e, "Router dropped I-Am-Router announcement: output channel full");
            }
        }

        let mut dispatch_tasks = Vec::new();

        // One dispatch task per port: decode each received NPDU and either
        // handle it as a network-layer message, deliver it locally, forward
        // it, or answer with a reject.
        for (port_idx, mut rx) in port_receivers.into_iter().enumerate() {
            let table = Arc::clone(&table);
            let local_tx = local_tx.clone();
            let send_txs = Arc::clone(&send_txs);
            let port_network = port_networks[port_idx];
            let local_mac = port_local_macs[port_idx].clone();

            let task = tokio::spawn(async move {
                while let Some(received) = rx.recv().await {
                    match decode_npdu(received.npdu.clone()) {
                        Ok(npdu) => {
                            if npdu.is_network_message {
                                // Proprietary network messages (type >= 0x80) with DNET
                                // should be forwarded, not processed locally.
                                let is_proprietary =
                                    npdu.message_type.map(|t| t >= 0x80).unwrap_or(false);
                                // 0xFFFF is the global-broadcast network number and
                                // does not count as a specific remote destination.
                                let has_remote_dest = npdu
                                    .destination
                                    .as_ref()
                                    .is_some_and(|d| d.network != 0xFFFF);
                                if is_proprietary && has_remote_dest {
                                    // Fall through to normal DNET routing below
                                } else {
                                    handle_network_message(
                                        &table,
                                        &send_txs,
                                        port_idx,
                                        port_network,
                                        &received.source_mac,
                                        &npdu,
                                    )
                                    .await;
                                    continue;
                                }
                            }

                            if let Some(ref dest) = npdu.destination {
                                let dest_net = dest.network;

                                // Global broadcast — forward to all other ports
                                if dest_net == 0xFFFF {
                                    forward_broadcast(
                                        &send_txs,
                                        port_idx,
                                        port_network,
                                        &received.source_mac,
                                        &npdu,
                                    );

                                    // Deliver locally as well
                                    let apdu = ReceivedApdu {
                                        apdu: npdu.payload,
                                        source_mac: received.source_mac,
                                        source_network: npdu.source,
                                        reply_tx: received.reply_tx,
                                    };
                                    // A failed send (local receiver gone) is ignored.
                                    let _ = local_tx.send(apdu).await;
                                    continue;
                                }

                                // Route lookup for destination network
                                // The block scope releases the table lock before any
                                // forwarding or rejecting happens.
                                let (route, reachability) = {
                                    let mut tbl = table.lock().await;
                                    let route = tbl.lookup(dest_net).cloned();
                                    let reachability = tbl.effective_reachability(dest_net);
                                    if route.is_some() {
                                        // NOTE(review): presumably refreshes the route's
                                        // last-used time for aging — confirm in RouterTable.
                                        tbl.touch(dest_net);
                                    }
                                    (route, reachability)
                                };

                                if let Some(route) = route {
                                    // Check reachability before forwarding (spec 6.6.3.6)
                                    // A missing status is treated as reachable.
                                    match reachability.unwrap_or(ReachabilityStatus::Reachable) {
                                        ReachabilityStatus::Busy => {
                                            send_reject(
                                                &send_txs[port_idx],
                                                &received.source_mac,
                                                dest_net,
                                                RejectMessageReason::ROUTER_BUSY,
                                            );
                                            continue;
                                        }
                                        ReachabilityStatus::Unreachable => {
                                            send_reject(
                                                &send_txs[port_idx],
                                                &received.source_mac,
                                                dest_net,
                                                RejectMessageReason::NOT_DIRECTLY_CONNECTED,
                                            );
                                            continue;
                                        }
                                        ReachabilityStatus::Reachable => {}
                                    }
                                    // The NPDU arrived on the very port that is directly
                                    // attached to its destination network.
                                    if route.port_index == port_idx && route.directly_connected {
                                        let dest_mac = npdu
                                            .destination
                                            .as_ref()
                                            .map(|d| &d.mac_address[..])
                                            .unwrap_or(&[]);
                                        if dest_mac == &local_mac[..] {
                                            // DADR matches our MAC: deliver locally
                                            let apdu = ReceivedApdu {
                                                apdu: npdu.payload,
                                                source_mac: received.source_mac,
                                                source_network: npdu.source,
                                                reply_tx: received.reply_tx,
                                            };
                                            let _ = local_tx.send(apdu).await;
                                        } else {
                                            // Remote broadcast to our network (DLEN=0):
                                            // deliver locally AND forward
                                            if dest_mac.is_empty() {
                                                // The local copy is built from clones because
                                                // `npdu` is still needed for forwarding; it
                                                // carries no reply channel.
                                                let apdu = ReceivedApdu {
                                                    apdu: npdu.payload.clone(),
                                                    source_mac: received.source_mac.clone(),
                                                    source_network: npdu.source.clone(),
                                                    reply_tx: None,
                                                };
                                                let _ = local_tx.send(apdu).await;
                                            }
                                            forward_unicast(
                                                &send_txs,
                                                &route,
                                                port_network,
                                                &received.source_mac,
                                                npdu,
                                                port_idx,
                                            );
                                        }
                                    } else {
                                        // Destination lies behind another port or a
                                        // next-hop router.
                                        forward_unicast(
                                            &send_txs,
                                            &route,
                                            port_network,
                                            &received.source_mac,
                                            npdu,
                                            port_idx,
                                        );
                                    }
                                } else {
                                    // Unknown network: send reject
                                    send_reject(
                                        &send_txs[port_idx],
                                        &received.source_mac,
                                        dest_net,
                                        RejectMessageReason::NOT_DIRECTLY_CONNECTED,
                                    );
                                }
                            } else {
                                // No destination in the NPDU: hand the APDU to the
                                // local application.
                                let apdu = ReceivedApdu {
                                    apdu: npdu.payload,
                                    source_mac: received.source_mac,
                                    source_network: npdu.source,
                                    reply_tx: received.reply_tx,
                                };
                                let _ = local_tx.send(apdu).await;
                            }
                        }
                        Err(e) => {
                            // Undecodable frames are logged and skipped.
                            warn!(error = %e, port = port_idx, "Router decode failed");
                        }
                    }
                }
            });

            dispatch_tasks.push(task);
        }

        // Periodically purge stale learned routes.
        let aging_table = Arc::clone(&table);
        let aging_task = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(60));
            let max_age = Duration::from_secs(300); // 5 minutes
            loop {
                interval.tick().await;
                let mut tbl = aging_table.lock().await;
                let purged = tbl.purge_stale(max_age);
                tbl.clear_expired_busy();
                // Release the lock before logging.
                drop(tbl);
                for net in purged {
                    debug!(network = net, "Purged stale route");
                }
            }
        });

        Ok((
            Self {
                table,
                dispatch_tasks,
                sender_tasks,
                aging_task: Some(aging_task),
            },
            local_rx,
        ))
    }

    /// Get a reference to the routing table.
    ///
    /// The table sits behind an async `Mutex` that the router's background
    /// tasks also lock.
    pub fn table(&self) -> &Arc<Mutex<RouterTable>> {
        &self.table
    }

    /// Stop the router.
    ///
    /// Aborts the dispatch tasks, then the sender tasks, then the aging task,
    /// awaiting each one after aborting it. The join results are ignored.
    /// A second call does nothing, because the task lists are drained and the
    /// aging handle is taken.
    pub async fn stop(&mut self) {
        for task in self.dispatch_tasks.drain(..) {
            task.abort();
            let _ = task.await;
        }
        for task in self.sender_tasks.drain(..) {
            task.abort();
            let _ = task.await;
        }
        if let Some(task) = self.aging_task.take() {
            task.abort();
            let _ = task.await;
        }
    }
}
390
391/// Build the source NpduAddress for a forwarded message.
392fn build_source(npdu: &Npdu, source_network: u16, source_mac: &[u8]) -> NpduAddress {
393    npdu.source.clone().unwrap_or(NpduAddress {
394        network: source_network,
395        mac_address: MacAddr::from_slice(source_mac),
396    })
397}
398
399/// Forward a message to a specific destination via the route entry.
400fn forward_unicast(
401    send_txs: &[mpsc::Sender<SendRequest>],
402    route: &crate::router_table::RouteEntry,
403    source_network: u16,
404    source_mac: &[u8],
405    npdu: Npdu,
406    _source_port_idx: usize,
407) {
408    if npdu.hop_count == 0 {
409        warn!("Discarding NPDU with hop_count=0");
410        return;
411    }
412
413    let payload_len = npdu.payload.len();
414    let source = build_source(&npdu, source_network, source_mac);
415    let dest_mac;
416    let forwarded_dest;
417    let forwarded_hop_count;
418
419    if route.directly_connected {
420        // Directly connected: strip DNET/DADR/Hop Count from NPCI, send to DADR.
421        dest_mac = npdu
422            .destination
423            .as_ref()
424            .map(|d| d.mac_address.clone())
425            .unwrap_or_default();
426        forwarded_dest = None;
427        forwarded_hop_count = 0; // not used without destination
428    } else {
429        dest_mac = route.next_hop_mac.clone();
430        forwarded_dest = npdu.destination;
431        forwarded_hop_count = npdu.hop_count - 1;
432    };
433
434    let forwarded = Npdu {
435        is_network_message: npdu.is_network_message,
436        expecting_reply: npdu.expecting_reply,
437        priority: npdu.priority,
438        destination: forwarded_dest,
439        source: Some(source),
440        hop_count: forwarded_hop_count,
441        message_type: None,
442        vendor_id: None,
443        payload: npdu.payload,
444    };
445
446    let mut buf = BytesMut::with_capacity(32 + payload_len);
447    if let Err(e) = encode_npdu(&mut buf, &forwarded) {
448        warn!("Failed to encode forwarded NPDU: {e}");
449        return;
450    }
451
452    if route.port_index >= send_txs.len() {
453        warn!(
454            port = route.port_index,
455            "Route references invalid port index"
456        );
457        return;
458    }
459    if dest_mac.is_empty() {
460        if let Err(e) =
461            send_txs[route.port_index].try_send(SendRequest::Broadcast { npdu: buf.freeze() })
462        {
463            warn!(%e, "Router dropped message: output channel full");
464        }
465    } else if let Err(e) = send_txs[route.port_index].try_send(SendRequest::Unicast {
466        npdu: buf.freeze(),
467        mac: dest_mac,
468    }) {
469        warn!(%e, "Router dropped message: output channel full");
470    }
471}
472
473/// Forward a global broadcast to all ports except the source port.
474fn forward_broadcast(
475    send_txs: &[mpsc::Sender<SendRequest>],
476    source_port: usize,
477    source_network: u16,
478    source_mac: &[u8],
479    npdu: &Npdu,
480) {
481    if npdu.hop_count == 0 {
482        warn!("Discarding NPDU with hop_count=0");
483        return;
484    }
485
486    let forwarded = Npdu {
487        is_network_message: npdu.is_network_message,
488        expecting_reply: npdu.expecting_reply,
489        priority: npdu.priority,
490        destination: npdu.destination.clone(),
491        source: Some(build_source(npdu, source_network, source_mac)),
492        hop_count: npdu.hop_count - 1,
493        message_type: npdu.message_type,
494        vendor_id: npdu.vendor_id,
495        payload: npdu.payload.clone(),
496    };
497
498    let mut buf = BytesMut::with_capacity(32 + npdu.payload.len());
499    if let Err(e) = encode_npdu(&mut buf, &forwarded) {
500        warn!("Failed to encode forwarded broadcast NPDU: {e}");
501        return;
502    }
503
504    let encoded = buf.freeze();
505    for (idx, tx) in send_txs.iter().enumerate() {
506        if idx == source_port {
507            continue;
508        }
509        if let Err(e) = tx.try_send(SendRequest::Broadcast {
510            npdu: encoded.clone(),
511        }) {
512            warn!(%e, "Router dropped broadcast: output channel full");
513        }
514    }
515}
516
517/// Handle a network-layer message.
518async fn handle_network_message(
519    table: &Arc<Mutex<RouterTable>>,
520    send_txs: &[mpsc::Sender<SendRequest>],
521    port_idx: usize,
522    port_network: u16,
523    source_mac: &[u8],
524    npdu: &Npdu,
525) {
526    const MAX_LEARNED_ROUTES: usize = 256;
527
528    let msg_type = match npdu.message_type {
529        Some(t) => t,
530        None => return,
531    };
532
533    if msg_type == NetworkMessageType::WHO_IS_ROUTER_TO_NETWORK.to_raw() {
534        let table = table.lock().await;
535
536        let requested_network = if npdu.payload.len() >= 2 {
537            Some(u16::from_be_bytes([npdu.payload[0], npdu.payload[1]]))
538        } else {
539            None
540        };
541
542        let networks: Vec<u16> = if let Some(net) = requested_network {
543            // Only respond if the network is reachable via a different port.
544            match table.lookup(net) {
545                Some(entry) if entry.port_index != port_idx => vec![net],
546                _ => {
547                    // Unknown: forward Who-Is-Router to all other ports to discover the path.
548                    drop(table);
549                    let forward = Npdu {
550                        is_network_message: true,
551                        message_type: Some(NetworkMessageType::WHO_IS_ROUTER_TO_NETWORK.to_raw()),
552                        payload: npdu.payload.clone(),
553                        ..Npdu::default()
554                    };
555                    let mut fwd_buf = BytesMut::with_capacity(8);
556                    if let Ok(()) = encode_npdu(&mut fwd_buf, &forward) {
557                        let frozen = fwd_buf.freeze();
558                        for (i, tx) in send_txs.iter().enumerate() {
559                            if i != port_idx {
560                                let _ = tx.try_send(SendRequest::Broadcast {
561                                    npdu: frozen.clone(),
562                                });
563                            }
564                        }
565                    }
566                    return;
567                }
568            }
569        } else {
570            table.networks_not_on_port(port_idx)
571        };
572
573        if networks.is_empty() {
574            return;
575        }
576
577        let mut payload = BytesMut::with_capacity(networks.len() * 2);
578        for net in &networks {
579            payload.put_u16(*net);
580        }
581
582        let payload_len = payload.len();
583        let response = Npdu {
584            is_network_message: true,
585            message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
586            payload: payload.freeze(),
587            ..Npdu::default()
588        };
589
590        let mut buf = BytesMut::with_capacity(4 + payload_len);
591        if let Err(e) = encode_npdu(&mut buf, &response) {
592            warn!("Failed to encode I-Am-Router response NPDU: {e}");
593            return;
594        }
595
596        // I-Am-Router-To-Network is always broadcast.
597        if let Err(e) = send_txs[port_idx].try_send(SendRequest::Broadcast { npdu: buf.freeze() }) {
598            warn!(%e, "Router dropped I-Am-Router response: output channel full");
599        }
600    } else if msg_type == NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw() {
601        let data = &npdu.payload;
602        let mut offset = 0;
603        let mut table = table.lock().await;
604
605        while offset + 2 <= data.len() {
606            let net = u16::from_be_bytes([data[offset], data[offset + 1]]);
607            offset += 2;
608
609            if table.len() >= MAX_LEARNED_ROUTES && table.lookup(net).is_none() {
610                warn!("Router table learned routes cap ({MAX_LEARNED_ROUTES}) reached, ignoring further networks");
611                break;
612            }
613
614            if table.add_learned_with_flap_detection(net, port_idx, MacAddr::from_slice(source_mac))
615            {
616                debug!(
617                    network = net,
618                    port = port_idx,
619                    "Learned route from I-Am-Router-To-Network"
620                );
621            }
622        }
623        drop(table);
624
625        // Re-broadcast to all other ports unconditionally (spec 6.6.3.3).
626        if !npdu.payload.is_empty() {
627            let rebroadcast = Npdu {
628                is_network_message: true,
629                message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
630                payload: npdu.payload.clone(),
631                ..Npdu::default()
632            };
633            let mut buf = BytesMut::with_capacity(4 + npdu.payload.len());
634            if let Ok(()) = encode_npdu(&mut buf, &rebroadcast) {
635                let frozen = buf.freeze();
636                for (i, tx) in send_txs.iter().enumerate() {
637                    if i != port_idx {
638                        let _ = tx.try_send(SendRequest::Broadcast {
639                            npdu: frozen.clone(),
640                        });
641                    }
642                }
643            }
644        }
645    } else if msg_type == NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw() {
646        if npdu.payload.len() >= 3 {
647            let reason = npdu.payload[0];
648            let rejected_net = u16::from_be_bytes([npdu.payload[1], npdu.payload[2]]);
649            warn!(
650                network = rejected_net,
651                reason = reason,
652                "Received Reject-Message-To-Network"
653            );
654            {
655                let mut tbl = table.lock().await;
656                if let Some(entry) = tbl.lookup(rejected_net) {
657                    if !entry.directly_connected {
658                        match reason {
659                            1 => tbl.mark_unreachable(rejected_net),
660                            2 => tbl
661                                .mark_busy(rejected_net, Instant::now() + Duration::from_secs(30)),
662                            _ => {
663                                tbl.remove(rejected_net);
664                            }
665                        }
666                    }
667                }
668            }
669
670            // Relay the reject to the originating node if SNET/SADR is present.
671            if let Some(ref source) = npdu.source {
672                let tbl = table.lock().await;
673                if let Some(route) = tbl.lookup(source.network) {
674                    let dest_port = route.port_index;
675                    let dest_mac = if route.directly_connected {
676                        source.mac_address.clone()
677                    } else {
678                        route.next_hop_mac.clone()
679                    };
680                    drop(tbl);
681
682                    let forwarded = Npdu {
683                        is_network_message: true,
684                        message_type: Some(NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw()),
685                        destination: Some(NpduAddress {
686                            network: source.network,
687                            mac_address: source.mac_address.clone(),
688                        }),
689                        hop_count: 255,
690                        payload: npdu.payload.clone(),
691                        ..Npdu::default()
692                    };
693                    let mut buf = BytesMut::with_capacity(32);
694                    if let Ok(()) = encode_npdu(&mut buf, &forwarded) {
695                        if dest_mac.is_empty() {
696                            let _ = send_txs[dest_port]
697                                .try_send(SendRequest::Broadcast { npdu: buf.freeze() });
698                        } else {
699                            let _ = send_txs[dest_port].try_send(SendRequest::Unicast {
700                                npdu: buf.freeze(),
701                                mac: dest_mac,
702                            });
703                        }
704                    }
705                }
706            }
707        }
708    } else if msg_type == NetworkMessageType::ROUTER_BUSY_TO_NETWORK.to_raw() {
709        let data = &npdu.payload;
710        let mut offset = 0;
711        let deadline = Instant::now() + Duration::from_secs(30);
712        let mut tbl = table.lock().await;
713        while offset + 2 <= data.len() {
714            let net = u16::from_be_bytes([data[offset], data[offset + 1]]);
715            offset += 2;
716            tbl.mark_busy(net, deadline);
717            debug!(
718                network = net,
719                "Router busy — marked network as congested (30s timer)"
720            );
721        }
722        drop(tbl);
723        // Re-broadcast to all other ports (spec 6.6.3.6)
724        let rebroadcast = Npdu {
725            is_network_message: true,
726            message_type: Some(NetworkMessageType::ROUTER_BUSY_TO_NETWORK.to_raw()),
727            payload: npdu.payload.clone(),
728            ..Npdu::default()
729        };
730        let mut buf = BytesMut::with_capacity(4 + npdu.payload.len());
731        if let Ok(()) = encode_npdu(&mut buf, &rebroadcast) {
732            let frozen = buf.freeze();
733            for (i, tx) in send_txs.iter().enumerate() {
734                if i != port_idx {
735                    let _ = tx.try_send(SendRequest::Broadcast {
736                        npdu: frozen.clone(),
737                    });
738                }
739            }
740        }
741    } else if msg_type == NetworkMessageType::ROUTER_AVAILABLE_TO_NETWORK.to_raw() {
742        let data = &npdu.payload;
743        let mut offset = 0;
744        let mut tbl = table.lock().await;
745        while offset + 2 <= data.len() {
746            let net = u16::from_be_bytes([data[offset], data[offset + 1]]);
747            offset += 2;
748            tbl.mark_available(net);
749            debug!(network = net, "Router available — cleared congestion");
750        }
751        drop(tbl);
752        // Re-broadcast to all other ports (spec 6.6.3.7)
753        let rebroadcast = Npdu {
754            is_network_message: true,
755            message_type: Some(NetworkMessageType::ROUTER_AVAILABLE_TO_NETWORK.to_raw()),
756            payload: npdu.payload.clone(),
757            ..Npdu::default()
758        };
759        let mut buf = BytesMut::with_capacity(4 + npdu.payload.len());
760        if let Ok(()) = encode_npdu(&mut buf, &rebroadcast) {
761            let frozen = buf.freeze();
762            for (i, tx) in send_txs.iter().enumerate() {
763                if i != port_idx {
764                    let _ = tx.try_send(SendRequest::Broadcast {
765                        npdu: frozen.clone(),
766                    });
767                }
768            }
769        }
770    } else if msg_type == NetworkMessageType::INITIALIZE_ROUTING_TABLE.to_raw() {
771        let data = &npdu.payload;
772        let count = if data.is_empty() { 0 } else { data[0] as usize };
773
774        let is_query = count == 0;
775
776        if !is_query {
777            let mut offset = 1usize;
778            let mut tbl = table.lock().await;
779            for _ in 0..count {
780                if offset + 4 > data.len() {
781                    break;
782                }
783                let net = u16::from_be_bytes([data[offset], data[offset + 1]]);
784                // skip port_id (1 byte)
785                let info_len = data[offset + 3] as usize;
786                if offset + 4 + info_len > data.len() {
787                    break;
788                }
789                offset += 4 + info_len;
790
791                if net == 0 || net == 0xFFFF {
792                    continue;
793                }
794                if tbl.lookup(net).is_some() {
795                    continue; // don't overwrite existing routes
796                }
797                if tbl.len() >= MAX_LEARNED_ROUTES {
798                    warn!("Init-Routing-Table: route cap reached, ignoring further entries");
799                    break;
800                }
801                tbl.add_learned(net, port_idx, MacAddr::from_slice(source_mac));
802                debug!(
803                    network = net,
804                    port = port_idx,
805                    "Learned route from Init-Routing-Table"
806                );
807            }
808        }
809
810        let mut payload = BytesMut::new();
811        if is_query {
812            let tbl = table.lock().await;
813            let networks = tbl.networks();
814            let count = networks.len().min(255);
815            payload.put_u8(count as u8);
816            for net in networks.iter().take(count) {
817                if let Some(route) = tbl.lookup(*net) {
818                    payload.put_u16(*net);
819                    payload.put_u8(route.port_index as u8); // Port ID
820                    payload.put_u8(0); // Port info length
821                }
822            }
823        } else {
824            payload.put_u8(0);
825        }
826
827        let payload_len = payload.len();
828        let response = Npdu {
829            is_network_message: true,
830            message_type: Some(NetworkMessageType::INITIALIZE_ROUTING_TABLE_ACK.to_raw()),
831            payload: payload.freeze(),
832            ..Npdu::default()
833        };
834
835        let mut buf = BytesMut::with_capacity(8 + payload_len);
836        if let Err(e) = encode_npdu(&mut buf, &response) {
837            warn!("Failed to encode Init-Routing-Table-ACK NPDU: {e}");
838            return;
839        }
840
841        if let Err(e) = send_txs[port_idx].try_send(SendRequest::Unicast {
842            npdu: buf.freeze(),
843            mac: MacAddr::from_slice(source_mac),
844        }) {
845            warn!(%e, "Router dropped Init-Routing-Table-ACK: output channel full");
846        }
847    } else if msg_type == NetworkMessageType::I_COULD_BE_ROUTER_TO_NETWORK.to_raw() {
848        if npdu.payload.len() >= 3 {
849            let net = u16::from_be_bytes([npdu.payload[0], npdu.payload[1]]);
850            let performance_index = npdu.payload[2];
851            debug!(
852                network = net,
853                performance_index = performance_index,
854                port = port_idx,
855                "Received I-Could-Be-Router-To-Network"
856            );
857            // Store only if no existing route (lower priority than direct/learned).
858            let mut tbl = table.lock().await;
859            if tbl.lookup(net).is_none() {
860                tbl.add_learned(net, port_idx, MacAddr::from_slice(source_mac));
861                debug!(
862                    network = net,
863                    port = port_idx,
864                    "Stored potential route from I-Could-Be-Router-To-Network"
865                );
866            }
867        }
868    } else if msg_type == NetworkMessageType::ESTABLISH_CONNECTION_TO_NETWORK.to_raw() {
869        if npdu.payload.len() >= 3 {
870            let net = u16::from_be_bytes([npdu.payload[0], npdu.payload[1]]);
871            let termination_time_min = npdu.payload[2];
872            tracing::info!(
873                network = net,
874                termination_time_minutes = termination_time_min,
875                "Received Establish-Connection-To-Network (PTP not implemented)"
876            );
877        }
878    } else if msg_type == NetworkMessageType::DISCONNECT_CONNECTION_TO_NETWORK.to_raw() {
879        if npdu.payload.len() >= 2 {
880            let net = u16::from_be_bytes([npdu.payload[0], npdu.payload[1]]);
881            debug!(network = net, "Received Disconnect-Connection-To-Network");
882            let mut tbl = table.lock().await;
883            if let Some(entry) = tbl.lookup(net) {
884                if !entry.directly_connected {
885                    tbl.remove(net);
886                    debug!(
887                        network = net,
888                        "Removed dynamically established route on disconnect"
889                    );
890                }
891            }
892        }
893    } else if msg_type == NetworkMessageType::WHAT_IS_NETWORK_NUMBER.to_raw() {
894        // Ignore if SNET/SADR or DNET/DADR is present.
895        if npdu.source.is_some() || npdu.destination.is_some() {
896            return;
897        }
898        let mut payload = BytesMut::with_capacity(3);
899        payload.put_u16(port_network);
900        payload.put_u8(1); // configured
901
902        let response = Npdu {
903            is_network_message: true,
904            message_type: Some(NetworkMessageType::NETWORK_NUMBER_IS.to_raw()),
905            payload: payload.freeze(),
906            ..Npdu::default()
907        };
908
909        let mut buf = BytesMut::with_capacity(8);
910        if let Err(e) = encode_npdu(&mut buf, &response) {
911            warn!("Failed to encode Network-Number-Is NPDU: {e}");
912            return;
913        }
914
915        if let Err(e) = send_txs[port_idx].try_send(SendRequest::Broadcast { npdu: buf.freeze() }) {
916            warn!(%e, "Router dropped Network-Number-Is: output channel full");
917        }
918    } else if msg_type == NetworkMessageType::NETWORK_NUMBER_IS.to_raw() {
919        // Spec 6.6.3.12: process Network-Number-Is for conflict detection.
920        if npdu.payload.len() >= 3 {
921            let net = u16::from_be_bytes([npdu.payload[0], npdu.payload[1]]);
922            let configured = npdu.payload[2];
923            if net != port_network {
924                if configured == 1 {
925                    warn!(
926                        local_network = port_network,
927                        peer_network = net,
928                        "Network number conflict: port configured as {} but peer reports {} (configured)",
929                        port_network, net
930                    );
931                } else {
932                    debug!(
933                        local_network = port_network,
934                        peer_network = net,
935                        "Network-Number-Is from peer (learned, differs from local)"
936                    );
937                }
938            }
939        }
940    } else if msg_type == NetworkMessageType::INITIALIZE_ROUTING_TABLE_ACK.to_raw() {
941        // Spec 6.4.8: process Init-Routing-Table-Ack — learn routes from peer.
942        let data = &npdu.payload;
943        if data.is_empty() {
944            return;
945        }
946        let count = data[0] as usize;
947        let mut offset = 1usize;
948        let mut table = table.lock().await;
949        for _ in 0..count {
950            if offset + 4 > data.len() {
951                break;
952            }
953            let net = u16::from_be_bytes([data[offset], data[offset + 1]]);
954            let info_len = data[offset + 3] as usize;
955            if offset + 4 + info_len > data.len() {
956                break;
957            }
958            offset += 4 + info_len;
959            if net == 0 || net == 0xFFFF {
960                continue;
961            }
962            if table.len() >= MAX_LEARNED_ROUTES {
963                break;
964            }
965            table.add_learned_with_flap_detection(net, port_idx, MacAddr::from_slice(source_mac));
966            debug!(
967                network = net,
968                port = port_idx,
969                "Learned route from Init-Routing-Table-Ack"
970            );
971        }
972    } else if (0x0A..=0x11).contains(&msg_type) {
973        // Security messages — acknowledge but do not reject.
974        debug!(
975            message_type = msg_type,
976            "Router received security network message (not implemented)"
977        );
978    } else {
979        // Unknown message type — reject with reason 3 (spec 6.6.3.5).
980        debug!(
981            message_type = msg_type,
982            "Router rejecting unknown network message type"
983        );
984        send_reject(
985            &send_txs[port_idx],
986            source_mac,
987            0,
988            RejectMessageReason::UNKNOWN_MESSAGE_TYPE,
989        );
990    }
991}
992
993/// Send a Reject-Message-To-Network.
994fn send_reject(
995    send_tx: &mpsc::Sender<SendRequest>,
996    source_mac: &[u8],
997    rejected_network: u16,
998    reason: RejectMessageReason,
999) {
1000    let mut payload = BytesMut::with_capacity(3);
1001    payload.put_u8(reason.to_raw());
1002    payload.put_u16(rejected_network);
1003
1004    let reject = Npdu {
1005        is_network_message: true,
1006        message_type: Some(NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw()),
1007        payload: payload.freeze(),
1008        ..Npdu::default()
1009    };
1010
1011    let mut buf = BytesMut::with_capacity(8);
1012    if let Err(e) = encode_npdu(&mut buf, &reject) {
1013        warn!("Failed to encode Reject-Message NPDU: {e}");
1014        return;
1015    }
1016
1017    if let Err(e) = send_tx.try_send(SendRequest::Unicast {
1018        npdu: buf.freeze(),
1019        mac: MacAddr::from_slice(source_mac),
1020    }) {
1021        warn!(%e, "Router dropped reject message: output channel full");
1022    }
1023}
1024
1025#[cfg(test)]
1026mod tests {
1027    use super::*;
1028    use bacnet_transport::bip::BipTransport;
1029    use std::net::Ipv4Addr;
1030    use tokio::time::Duration;
1031
1032    #[tokio::test]
1033    async fn router_forwards_between_networks() {
1034        let transport_a = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1035        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1036
1037        let mut device_a = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1038        let mut device_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1039
1040        let _rx_b = device_b.start().await.unwrap();
1041        let _rx_a = device_a.start().await.unwrap();
1042
1043        let port_a = RouterPort {
1044            transport: transport_a,
1045            network_number: 1000,
1046        };
1047        let port_b = RouterPort {
1048            transport: transport_b,
1049            network_number: 2000,
1050        };
1051
1052        let (mut router, _local_rx) = BACnetRouter::start(vec![port_a, port_b]).await.unwrap();
1053
1054        tokio::time::sleep(Duration::from_millis(50)).await;
1055
1056        let apdu = vec![0x10, 0x08];
1057        let npdu = Npdu {
1058            is_network_message: false,
1059            expecting_reply: false,
1060            priority: bacnet_types::enums::NetworkPriority::NORMAL,
1061            destination: Some(NpduAddress {
1062                network: 2000,
1063                mac_address: MacAddr::from_slice(device_b.local_mac()),
1064            }),
1065            source: None,
1066            hop_count: 255,
1067            payload: Bytes::copy_from_slice(&apdu),
1068            ..Npdu::default()
1069        };
1070
1071        let mut buf = BytesMut::new();
1072        encode_npdu(&mut buf, &npdu).unwrap();
1073
1074        let table = router.table().lock().await;
1075        assert_eq!(table.len(), 2);
1076        assert!(table.lookup(1000).unwrap().directly_connected);
1077        assert!(table.lookup(2000).unwrap().directly_connected);
1078        drop(table);
1079
1080        router.stop().await;
1081    }
1082
1083    #[tokio::test]
1084    async fn router_table_populated_on_start() {
1085        let transport_a = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1086        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1087        let transport_c = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1088
1089        let ports = vec![
1090            RouterPort {
1091                transport: transport_a,
1092                network_number: 100,
1093            },
1094            RouterPort {
1095                transport: transport_b,
1096                network_number: 200,
1097            },
1098            RouterPort {
1099                transport: transport_c,
1100                network_number: 300,
1101            },
1102        ];
1103
1104        let (mut router, _local_rx) = BACnetRouter::start(ports).await.unwrap();
1105
1106        let table = router.table().lock().await;
1107        assert_eq!(table.len(), 3);
1108        assert_eq!(table.lookup(100).unwrap().port_index, 0);
1109        assert_eq!(table.lookup(200).unwrap().port_index, 1);
1110        assert_eq!(table.lookup(300).unwrap().port_index, 2);
1111        drop(table);
1112
1113        router.stop().await;
1114    }
1115
1116    #[tokio::test]
1117    async fn local_message_delivered_to_application() {
1118        let transport = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1119        let mut sender = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1120        let _sender_rx = sender.start().await.unwrap();
1121
1122        let router_port = RouterPort {
1123            transport,
1124            network_number: 1000,
1125        };
1126
1127        let (mut router, _local_rx) = BACnetRouter::start(vec![router_port]).await.unwrap();
1128
1129        tokio::time::sleep(Duration::from_millis(50)).await;
1130
1131        router.stop().await;
1132    }
1133
/// A unicast NPDU that arrives with hop count 0 must not be queued on any
/// port's send channel.
#[test]
fn forward_unicast_drops_hop_count_zero() {
    let (tx_a, mut rx_a) = mpsc::channel::<SendRequest>(256);
    let (tx_b, mut rx_b) = mpsc::channel::<SendRequest>(256);
    let send_txs = vec![tx_a, tx_b];

    let target = NpduAddress {
        network: 2000,
        mac_address: MacAddr::from_slice(&[0x01, 0x02]),
    };
    let expired = Npdu {
        is_network_message: false,
        expecting_reply: false,
        priority: bacnet_types::enums::NetworkPriority::NORMAL,
        destination: Some(target),
        source: None,
        // Hop count exhausted: forwarding is expected to drop the message.
        hop_count: 0,
        payload: Bytes::from_static(&[0x10, 0x08]),
        ..Npdu::default()
    };

    let via_port_one = crate::router_table::RouteEntry {
        port_index: 1,
        directly_connected: true,
        next_hop_mac: MacAddr::new(),
        last_seen: None,
        reachability: crate::router_table::ReachabilityStatus::Reachable,
        busy_until: None,
        flap_count: 0,
        last_port_change: None,
    };

    forward_unicast(&send_txs, &via_port_one, 1000, &[0x0A], expired, 0);

    // Neither channel may have received anything.
    for rx in [&mut rx_a, &mut rx_b] {
        assert!(rx.try_recv().is_err());
    }
}
1170
/// A global-broadcast NPDU (DNET 0xFFFF) that arrives with hop count 0 must
/// not be queued on any port's send channel.
#[test]
fn forward_broadcast_drops_hop_count_zero() {
    let (tx_a, mut rx_a) = mpsc::channel::<SendRequest>(256);
    let (tx_b, mut rx_b) = mpsc::channel::<SendRequest>(256);
    let send_txs = vec![tx_a, tx_b];

    let everywhere = NpduAddress {
        network: 0xFFFF,
        mac_address: MacAddr::new(),
    };
    let expired = Npdu {
        is_network_message: false,
        expecting_reply: false,
        priority: bacnet_types::enums::NetworkPriority::NORMAL,
        destination: Some(everywhere),
        source: None,
        // Hop count exhausted: the broadcast is expected to be dropped.
        hop_count: 0,
        payload: Bytes::from_static(&[0x10, 0x08]),
        ..Npdu::default()
    };

    forward_broadcast(&send_txs, 0, 1000, &[0x0A], &expired);

    // Neither channel may have received anything.
    for rx in [&mut rx_a, &mut rx_b] {
        assert!(rx.try_recv().is_err());
    }
}
1196
/// Forwards a unicast NPDU with hop count 10 over a directly-connected route
/// on port 1 and inspects what was queued there.
///
/// The assertions cover the addressing of the forwarded NPDU: the destination
/// must be absent, and for a unicast send the source must be present. The hop
/// count of the forwarded message is not itself asserted.
#[test]
fn forward_unicast_decrements_hop_count() {
    let (tx_a, _rx_a) = mpsc::channel::<SendRequest>(256);
    let (tx_b, mut rx_b) = mpsc::channel::<SendRequest>(256);
    let send_txs = vec![tx_a, tx_b];

    let incoming = Npdu {
        is_network_message: false,
        expecting_reply: false,
        priority: bacnet_types::enums::NetworkPriority::NORMAL,
        destination: Some(NpduAddress {
            network: 2000,
            mac_address: MacAddr::from_slice(&[0x01, 0x02]),
        }),
        source: None,
        hop_count: 10,
        payload: Bytes::from_static(&[0x10, 0x08]),
        ..Npdu::default()
    };

    let via_port_one = crate::router_table::RouteEntry {
        port_index: 1,
        directly_connected: true,
        next_hop_mac: MacAddr::new(),
        last_seen: None,
        reachability: crate::router_table::ReachabilityStatus::Reachable,
        busy_until: None,
        flap_count: 0,
        last_port_change: None,
    };

    forward_unicast(&send_txs, &via_port_one, 1000, &[0x0A], incoming, 0);

    // Pull the queued frame off port 1 and remember which kind of send it was.
    let (frame, was_unicast) = match rx_b.try_recv().unwrap() {
        SendRequest::Unicast { npdu, .. } => (npdu, true),
        SendRequest::Broadcast { npdu } => (npdu, false),
    };

    let decoded = decode_npdu(frame).unwrap();
    assert!(decoded.destination.is_none());
    if was_unicast {
        assert!(decoded.source.is_some());
    }
}
1243
/// `send_reject` must queue a unicast to the original sender carrying a
/// Reject-Message-To-Network whose payload is the reason byte followed by the
/// rejected network number in big-endian order.
#[test]
fn send_reject_generates_reject_message() {
    let (tx, mut rx) = mpsc::channel::<SendRequest>(256);

    let source_mac: [u8; 4] = [0x0A, 0x00, 0x01, 0x01];
    let unknown_network: u16 = 9999;

    send_reject(
        &tx,
        &source_mac,
        unknown_network,
        RejectMessageReason::NOT_DIRECTLY_CONNECTED,
    );

    let (frame, mac) = match rx.try_recv().unwrap() {
        SendRequest::Unicast { npdu, mac } => (npdu, mac),
        SendRequest::Broadcast { .. } => panic!("Expected Unicast send for reject message"),
    };

    // The reject goes back to whoever sent the offending message.
    assert_eq!(mac.as_slice(), &source_mac[..]);

    let decoded = decode_npdu(frame).unwrap();
    assert!(decoded.is_network_message);
    assert_eq!(
        decoded.message_type,
        Some(NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw())
    );

    let payload = &decoded.payload;
    assert_eq!(payload.len(), 3);
    assert_eq!(
        payload[0],
        RejectMessageReason::NOT_DIRECTLY_CONNECTED.to_raw()
    );
    let rejected_net = u16::from_be_bytes([payload[1], payload[2]]);
    assert_eq!(rejected_net, 9999);
}
1279
1280    #[tokio::test]
1281    async fn single_port_router_no_i_am_router_announcement() {
1282        let (send_tx, mut send_rx) = mpsc::channel::<SendRequest>(256);
1283
1284        let port_networks: Vec<u16> = vec![1000];
1285        let send_txs = [send_tx];
1286
1287        for (port_idx, tx) in send_txs.iter().enumerate() {
1288            let other_networks: Vec<u16> = port_networks
1289                .iter()
1290                .enumerate()
1291                .filter(|(idx, _)| *idx != port_idx)
1292                .map(|(_, net)| *net)
1293                .collect();
1294
1295            if other_networks.is_empty() {
1296                continue;
1297            }
1298
1299            let mut payload = BytesMut::with_capacity(other_networks.len() * 2);
1300            for net in &other_networks {
1301                payload.put_u16(*net);
1302            }
1303
1304            let payload_len = payload.len();
1305            let response = Npdu {
1306                is_network_message: true,
1307                message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
1308                payload: payload.freeze(),
1309                ..Npdu::default()
1310            };
1311
1312            let mut buf = BytesMut::with_capacity(8 + payload_len);
1313            encode_npdu(&mut buf, &response).unwrap();
1314
1315            let _ = tx.try_send(SendRequest::Broadcast { npdu: buf.freeze() });
1316        }
1317
1318        assert!(send_rx.try_recv().is_err());
1319    }
1320
1321    #[tokio::test]
1322    async fn two_port_router_sends_i_am_router_announcement() {
1323        let (tx_a, mut rx_a) = mpsc::channel::<SendRequest>(256);
1324        let (tx_b, mut rx_b) = mpsc::channel::<SendRequest>(256);
1325
1326        let port_networks: Vec<u16> = vec![1000, 2000];
1327        let send_txs = [tx_a, tx_b];
1328
1329        for (port_idx, tx) in send_txs.iter().enumerate() {
1330            let other_networks: Vec<u16> = port_networks
1331                .iter()
1332                .enumerate()
1333                .filter(|(idx, _)| *idx != port_idx)
1334                .map(|(_, net)| *net)
1335                .collect();
1336
1337            if other_networks.is_empty() {
1338                continue;
1339            }
1340
1341            let mut payload = BytesMut::with_capacity(other_networks.len() * 2);
1342            for net in &other_networks {
1343                payload.put_u16(*net);
1344            }
1345
1346            let payload_len = payload.len();
1347            let response = Npdu {
1348                is_network_message: true,
1349                message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
1350                payload: payload.freeze(),
1351                ..Npdu::default()
1352            };
1353
1354            let mut buf = BytesMut::with_capacity(8 + payload_len);
1355            encode_npdu(&mut buf, &response).unwrap();
1356
1357            let _ = tx.try_send(SendRequest::Broadcast { npdu: buf.freeze() });
1358        }
1359
1360        let sent_a = rx_a.try_recv().unwrap();
1361        match sent_a {
1362            SendRequest::Broadcast { npdu: data } => {
1363                let decoded = decode_npdu(data.clone()).unwrap();
1364                assert!(decoded.is_network_message);
1365                assert_eq!(
1366                    decoded.message_type,
1367                    Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw())
1368                );
1369                assert_eq!(decoded.payload.len(), 2);
1370                let net = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
1371                assert_eq!(net, 2000);
1372            }
1373            _ => panic!("Expected Broadcast for I-Am-Router announcement on port A"),
1374        }
1375
1376        let sent_b = rx_b.try_recv().unwrap();
1377        match sent_b {
1378            SendRequest::Broadcast { npdu: data } => {
1379                let decoded = decode_npdu(data.clone()).unwrap();
1380                assert!(decoded.is_network_message);
1381                assert_eq!(
1382                    decoded.message_type,
1383                    Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw())
1384                );
1385                assert_eq!(decoded.payload.len(), 2);
1386                let net = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
1387                assert_eq!(net, 1000);
1388            }
1389            _ => panic!("Expected Broadcast for I-Am-Router announcement on port B"),
1390        }
1391    }
1392
1393    #[tokio::test]
1394    async fn three_port_router_announces_multiple_networks() {
1395        let (tx_a, mut rx_a) = mpsc::channel::<SendRequest>(256);
1396        let (tx_b, mut rx_b) = mpsc::channel::<SendRequest>(256);
1397        let (tx_c, mut rx_c) = mpsc::channel::<SendRequest>(256);
1398
1399        let port_networks: Vec<u16> = vec![100, 200, 300];
1400        let send_txs = [tx_a, tx_b, tx_c];
1401
1402        for (port_idx, tx) in send_txs.iter().enumerate() {
1403            let other_networks: Vec<u16> = port_networks
1404                .iter()
1405                .enumerate()
1406                .filter(|(idx, _)| *idx != port_idx)
1407                .map(|(_, net)| *net)
1408                .collect();
1409
1410            if other_networks.is_empty() {
1411                continue;
1412            }
1413
1414            let mut payload = BytesMut::with_capacity(other_networks.len() * 2);
1415            for net in &other_networks {
1416                payload.put_u16(*net);
1417            }
1418
1419            let payload_len = payload.len();
1420            let response = Npdu {
1421                is_network_message: true,
1422                message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
1423                payload: payload.freeze(),
1424                ..Npdu::default()
1425            };
1426
1427            let mut buf = BytesMut::with_capacity(8 + payload_len);
1428            encode_npdu(&mut buf, &response).unwrap();
1429
1430            let _ = tx.try_send(SendRequest::Broadcast { npdu: buf.freeze() });
1431        }
1432
1433        let sent_a = rx_a.try_recv().unwrap();
1434        match sent_a {
1435            SendRequest::Broadcast { npdu: data } => {
1436                let decoded = decode_npdu(data.clone()).unwrap();
1437                assert!(decoded.is_network_message);
1438                assert_eq!(decoded.payload.len(), 4); // two u16 values
1439                let net1 = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
1440                let net2 = u16::from_be_bytes([decoded.payload[2], decoded.payload[3]]);
1441                assert_eq!(net1, 200);
1442                assert_eq!(net2, 300);
1443            }
1444            _ => panic!("Expected Broadcast on port A"),
1445        }
1446
1447        let sent_b = rx_b.try_recv().unwrap();
1448        match sent_b {
1449            SendRequest::Broadcast { npdu: data } => {
1450                let decoded = decode_npdu(data.clone()).unwrap();
1451                assert_eq!(decoded.payload.len(), 4);
1452                let net1 = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
1453                let net2 = u16::from_be_bytes([decoded.payload[2], decoded.payload[3]]);
1454                assert_eq!(net1, 100);
1455                assert_eq!(net2, 300);
1456            }
1457            _ => panic!("Expected Broadcast on port B"),
1458        }
1459
1460        let sent_c = rx_c.try_recv().unwrap();
1461        match sent_c {
1462            SendRequest::Broadcast { npdu: data } => {
1463                let decoded = decode_npdu(data.clone()).unwrap();
1464                assert_eq!(decoded.payload.len(), 4);
1465                let net1 = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
1466                let net2 = u16::from_be_bytes([decoded.payload[2], decoded.payload[3]]);
1467                assert_eq!(net1, 100);
1468                assert_eq!(net2, 200);
1469            }
1470            _ => panic!("Expected Broadcast on port C"),
1471        }
1472    }
1473
/// A unicast NPDU arriving with hop count 1 must still be forwarded: a frame
/// has to be queued on port 1, with no destination and, for a unicast send,
/// with a source address filled in.
#[test]
fn forward_unicast_with_hop_count_one_still_forwards() {
    let (tx_a, _rx_a) = mpsc::channel::<SendRequest>(256);
    let (tx_b, mut rx_b) = mpsc::channel::<SendRequest>(256);
    let send_txs = vec![tx_a, tx_b];

    let incoming = Npdu {
        is_network_message: false,
        expecting_reply: false,
        priority: bacnet_types::enums::NetworkPriority::NORMAL,
        destination: Some(NpduAddress {
            network: 2000,
            mac_address: MacAddr::from_slice(&[0x01, 0x02]),
        }),
        source: None,
        // Lowest hop count that is still non-zero on arrival.
        hop_count: 1,
        payload: Bytes::from_static(&[0x10, 0x08]),
        ..Npdu::default()
    };

    let via_port_one = crate::router_table::RouteEntry {
        port_index: 1,
        directly_connected: true,
        next_hop_mac: MacAddr::new(),
        last_seen: None,
        reachability: crate::router_table::ReachabilityStatus::Reachable,
        busy_until: None,
        flap_count: 0,
        last_port_change: None,
    };

    forward_unicast(&send_txs, &via_port_one, 1000, &[0x0A], incoming, 0);

    // Pull the queued frame off port 1 and remember which kind of send it was.
    let (frame, was_unicast) = match rx_b.try_recv().unwrap() {
        SendRequest::Unicast { npdu, .. } => (npdu, true),
        SendRequest::Broadcast { npdu } => (npdu, false),
    };

    let decoded = decode_npdu(frame).unwrap();
    assert!(decoded.destination.is_none());
    if was_unicast {
        assert!(decoded.source.is_some());
    }
}
1520
1521    #[tokio::test]
1522    async fn received_reject_removes_learned_route() {
1523        let mut table = RouterTable::new();
1524        table.add_direct(1000, 0);
1525        table.add_learned(3000, 0, MacAddr::from_slice(&[10, 0, 1, 1]));
1526        assert!(table.lookup(3000).is_some());
1527
1528        let table = Arc::new(Mutex::new(table));
1529
1530        let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1531        let send_txs = vec![tx];
1532
1533        let mut payload = BytesMut::with_capacity(3);
1534        payload.put_u8(RejectMessageReason::OTHER.to_raw());
1535        payload.put_u16(3000);
1536
1537        let npdu = Npdu {
1538            is_network_message: true,
1539            message_type: Some(NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw()),
1540            payload: payload.freeze(),
1541            ..Npdu::default()
1542        };
1543
1544        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1545
1546        let tbl = table.lock().await;
1547        assert!(tbl.lookup(3000).is_none());
1548        assert!(tbl.lookup(1000).is_some());
1549    }
1550
1551    #[tokio::test]
1552    async fn received_reject_does_not_remove_direct_route() {
1553        let mut table = RouterTable::new();
1554        table.add_direct(1000, 0);
1555
1556        let table = Arc::new(Mutex::new(table));
1557
1558        let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1559        let send_txs = vec![tx];
1560        let mut payload = BytesMut::with_capacity(3);
1561        payload.put_u8(RejectMessageReason::OTHER.to_raw());
1562        payload.put_u16(1000);
1563
1564        let npdu = Npdu {
1565            is_network_message: true,
1566            message_type: Some(NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw()),
1567            payload: payload.freeze(),
1568            ..Npdu::default()
1569        };
1570
1571        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1572
1573        let tbl = table.lock().await;
1574        assert!(tbl.lookup(1000).is_some());
1575    }
1576
1577    #[tokio::test]
1578    async fn who_is_router_with_specific_network() {
1579        let mut table = RouterTable::new();
1580        table.add_direct(1000, 0);
1581        table.add_direct(2000, 1);
1582        table.add_direct(3000, 2);
1583
1584        let table = Arc::new(Mutex::new(table));
1585
1586        let (tx, mut rx) = mpsc::channel::<SendRequest>(256);
1587        let send_txs = vec![tx];
1588
1589        let mut req_payload = BytesMut::with_capacity(2);
1590        req_payload.put_u16(2000);
1591
1592        let npdu = Npdu {
1593            is_network_message: true,
1594            message_type: Some(NetworkMessageType::WHO_IS_ROUTER_TO_NETWORK.to_raw()),
1595            payload: req_payload.freeze(),
1596            ..Npdu::default()
1597        };
1598
1599        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1600
1601        let sent = rx.try_recv().unwrap();
1602        match sent {
1603            SendRequest::Broadcast { npdu: data } => {
1604                let decoded = decode_npdu(data.clone()).unwrap();
1605                assert!(decoded.is_network_message);
1606                assert_eq!(
1607                    decoded.message_type,
1608                    Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw())
1609                );
1610                assert_eq!(decoded.payload.len(), 2);
1611                let net = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
1612                assert_eq!(net, 2000);
1613            }
1614            _ => panic!("Expected Broadcast response for I-Am-Router"),
1615        }
1616    }
1617
1618    #[tokio::test]
1619    async fn who_is_router_with_unknown_network_no_response() {
1620        let mut table = RouterTable::new();
1621        table.add_direct(1000, 0);
1622
1623        let table = Arc::new(Mutex::new(table));
1624
1625        let (tx, mut rx) = mpsc::channel::<SendRequest>(256);
1626        let send_txs = vec![tx];
1627
1628        let mut req_payload = BytesMut::with_capacity(2);
1629        req_payload.put_u16(9999);
1630
1631        let npdu = Npdu {
1632            is_network_message: true,
1633            message_type: Some(NetworkMessageType::WHO_IS_ROUTER_TO_NETWORK.to_raw()),
1634            payload: req_payload.freeze(),
1635            ..Npdu::default()
1636        };
1637
1638        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1639
1640        assert!(rx.try_recv().is_err());
1641    }
1642
1643    #[tokio::test]
1644    async fn initialize_routing_table_ack() {
1645        let mut table = RouterTable::new();
1646        table.add_direct(1000, 0);
1647        table.add_direct(2000, 1);
1648
1649        let table = Arc::new(Mutex::new(table));
1650
1651        let (tx, mut rx) = mpsc::channel::<SendRequest>(256);
1652        let send_txs = vec![tx];
1653
1654        let npdu = Npdu {
1655            is_network_message: true,
1656            message_type: Some(NetworkMessageType::INITIALIZE_ROUTING_TABLE.to_raw()),
1657            payload: Bytes::new(),
1658            ..Npdu::default()
1659        };
1660
1661        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1662
1663        let sent = rx.try_recv().unwrap();
1664        match sent {
1665            SendRequest::Unicast { npdu: data, mac } => {
1666                assert_eq!(mac.as_slice(), &[0x0A]);
1667                let decoded = decode_npdu(data.clone()).unwrap();
1668                assert!(decoded.is_network_message);
1669                assert_eq!(
1670                    decoded.message_type,
1671                    Some(NetworkMessageType::INITIALIZE_ROUTING_TABLE_ACK.to_raw())
1672                );
1673                assert_eq!(decoded.payload.len(), 9);
1674                assert_eq!(decoded.payload[0], 2);
1675            }
1676            _ => panic!("Expected Unicast response for Init-Routing-Table"),
1677        }
1678    }
1679
1680    #[tokio::test]
1681    async fn router_busy_does_not_crash() {
1682        let table = RouterTable::new();
1683        let table = Arc::new(Mutex::new(table));
1684
1685        let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1686        let send_txs = vec![tx];
1687
1688        let mut payload = BytesMut::with_capacity(4);
1689        payload.put_u16(1000);
1690        payload.put_u16(2000);
1691
1692        let npdu = Npdu {
1693            is_network_message: true,
1694            message_type: Some(NetworkMessageType::ROUTER_BUSY_TO_NETWORK.to_raw()),
1695            payload: payload.freeze(),
1696            ..Npdu::default()
1697        };
1698
1699        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1700    }
1701
1702    #[tokio::test]
1703    async fn router_available_does_not_crash() {
1704        let table = RouterTable::new();
1705        let table = Arc::new(Mutex::new(table));
1706
1707        let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1708        let send_txs = vec![tx];
1709
1710        let mut payload = BytesMut::with_capacity(4);
1711        payload.put_u16(1000);
1712        payload.put_u16(2000);
1713
1714        let npdu = Npdu {
1715            is_network_message: true,
1716            message_type: Some(NetworkMessageType::ROUTER_AVAILABLE_TO_NETWORK.to_raw()),
1717            payload: payload.freeze(),
1718            ..Npdu::default()
1719        };
1720
1721        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1722    }
1723
1724    #[tokio::test]
1725    async fn i_could_be_router_stores_potential_route() {
1726        let table = RouterTable::new();
1727        let table = Arc::new(Mutex::new(table));
1728
1729        let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1730        let send_txs = vec![tx];
1731
1732        let mut payload = BytesMut::with_capacity(3);
1733        payload.put_u16(5000);
1734        payload.put_u8(50);
1735
1736        let npdu = Npdu {
1737            is_network_message: true,
1738            message_type: Some(NetworkMessageType::I_COULD_BE_ROUTER_TO_NETWORK.to_raw()),
1739            payload: payload.freeze(),
1740            ..Npdu::default()
1741        };
1742
1743        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A, 0x0B], &npdu).await;
1744
1745        let tbl = table.lock().await;
1746        let entry = tbl.lookup(5000).unwrap();
1747        assert!(!entry.directly_connected);
1748        assert_eq!(entry.port_index, 0);
1749        assert_eq!(entry.next_hop_mac.as_slice(), &[0x0A, 0x0B]);
1750    }
1751
1752    #[tokio::test]
1753    async fn i_could_be_router_does_not_overwrite_existing_route() {
1754        let mut table = RouterTable::new();
1755        table.add_direct(5000, 1);
1756        let table = Arc::new(Mutex::new(table));
1757
1758        let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1759        let send_txs = vec![tx];
1760
1761        let mut payload = BytesMut::with_capacity(3);
1762        payload.put_u16(5000);
1763        payload.put_u8(50);
1764
1765        let npdu = Npdu {
1766            is_network_message: true,
1767            message_type: Some(NetworkMessageType::I_COULD_BE_ROUTER_TO_NETWORK.to_raw()),
1768            payload: payload.freeze(),
1769            ..Npdu::default()
1770        };
1771
1772        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1773
1774        let tbl = table.lock().await;
1775        let entry = tbl.lookup(5000).unwrap();
1776        assert!(entry.directly_connected);
1777        assert_eq!(entry.port_index, 1);
1778    }
1779
1780    #[tokio::test]
1781    async fn establish_connection_does_not_crash() {
1782        let table = RouterTable::new();
1783        let table = Arc::new(Mutex::new(table));
1784
1785        let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1786        let send_txs = vec![tx];
1787
1788        let mut payload = BytesMut::with_capacity(3);
1789        payload.put_u16(6000);
1790        payload.put_u8(30);
1791
1792        let npdu = Npdu {
1793            is_network_message: true,
1794            message_type: Some(NetworkMessageType::ESTABLISH_CONNECTION_TO_NETWORK.to_raw()),
1795            payload: payload.freeze(),
1796            ..Npdu::default()
1797        };
1798
1799        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1800    }
1801
1802    #[tokio::test]
1803    async fn disconnect_removes_learned_route() {
1804        let mut table = RouterTable::new();
1805        table.add_learned(7000, 0, MacAddr::from_slice(&[10, 0, 1, 1]));
1806        let table = Arc::new(Mutex::new(table));
1807
1808        let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1809        let send_txs = vec![tx];
1810
1811        let mut payload = BytesMut::with_capacity(2);
1812        payload.put_u16(7000);
1813
1814        let npdu = Npdu {
1815            is_network_message: true,
1816            message_type: Some(NetworkMessageType::DISCONNECT_CONNECTION_TO_NETWORK.to_raw()),
1817            payload: payload.freeze(),
1818            ..Npdu::default()
1819        };
1820
1821        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1822
1823        let tbl = table.lock().await;
1824        assert!(tbl.lookup(7000).is_none());
1825    }
1826
1827    #[tokio::test]
1828    async fn disconnect_does_not_remove_direct_route() {
1829        let mut table = RouterTable::new();
1830        table.add_direct(1000, 0);
1831        let table = Arc::new(Mutex::new(table));
1832
1833        let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1834        let send_txs = vec![tx];
1835
1836        let mut payload = BytesMut::with_capacity(2);
1837        payload.put_u16(1000);
1838
1839        let npdu = Npdu {
1840            is_network_message: true,
1841            message_type: Some(NetworkMessageType::DISCONNECT_CONNECTION_TO_NETWORK.to_raw()),
1842            payload: payload.freeze(),
1843            ..Npdu::default()
1844        };
1845
1846        handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1847
1848        let tbl = table.lock().await;
1849        assert!(tbl.lookup(1000).is_some());
1850        assert!(tbl.lookup(1000).unwrap().directly_connected);
1851    }
1852}