1use std::sync::Arc;
14use std::time::{Duration, Instant};
15
16use bacnet_encoding::npdu::{decode_npdu, encode_npdu, Npdu, NpduAddress};
17use bacnet_transport::port::TransportPort;
18use bacnet_types::enums::{NetworkMessageType, RejectMessageReason};
19use bacnet_types::error::Error;
20use bacnet_types::MacAddr;
21use bytes::{BufMut, Bytes, BytesMut};
22use tokio::sync::{mpsc, Mutex};
23use tokio::task::JoinHandle;
24use tracing::{debug, warn};
25
26use crate::layer::ReceivedApdu;
27use crate::router_table::{ReachabilityStatus, RouterTable};
28
/// A frame queued for transmission on one router port.
///
/// Each port's sender task receives these over its channel and hands the
/// already-encoded NPDU bytes to that port's transport.
#[derive(Debug)]
enum SendRequest {
    /// Send `npdu` to the single station addressed by `mac`.
    Unicast { npdu: Bytes, mac: MacAddr },
    /// Send `npdu` as a broadcast on the port.
    Broadcast { npdu: Bytes },
}
35
/// One attachment point of the router: a transport together with the network
/// number that is registered as directly connected through it.
pub struct RouterPort<T: TransportPort> {
    /// Transport used to send and receive frames on this port.
    pub transport: T,
    /// Network number added to the routing table as a direct route for this port.
    pub network_number: u16,
}
43
/// Handle to a running router: the shared routing table plus the background
/// tasks spawned by `BACnetRouter::start`.
pub struct BACnetRouter {
    /// Routing table shared with the dispatch and aging tasks.
    table: Arc<Mutex<RouterTable>>,
    /// One task per port that decodes and routes received NPDUs.
    dispatch_tasks: Vec<JoinHandle<()>>,
    /// One task per port that drains the port's send queue into its transport.
    sender_tasks: Vec<JoinHandle<()>>,
    /// Periodic task that purges stale routes; taken (set to `None`) by `stop`.
    aging_task: Option<JoinHandle<()>>,
}
59
60impl BACnetRouter {
    /// Starts a router over `ports` and returns its handle together with the
    /// receiver of APDUs addressed to the local application.
    ///
    /// Every port's network number is registered as a directly connected
    /// route, every transport is started, and per-port sender and dispatch
    /// tasks plus one route-aging task are spawned. An I-Am-Router-To-Network
    /// announcement listing the other ports' networks is queued on each port.
    ///
    /// # Errors
    ///
    /// Returns `Error::Encoding` when two ports share a network number, and
    /// propagates any error returned by a transport's `start()`.
    pub async fn start<T: TransportPort + 'static>(
        mut ports: Vec<RouterPort<T>>,
    ) -> Result<(Self, mpsc::Receiver<ReceivedApdu>), Error> {
        let mut table = RouterTable::new();

        // Refuse a configuration that attaches two ports to the same network.
        {
            let mut seen = std::collections::HashSet::new();
            for port in &ports {
                if !seen.insert(port.network_number) {
                    return Err(Error::Encoding(format!(
                        "Duplicate network number {} in router ports",
                        port.network_number
                    )));
                }
            }
        }

        // Port index == position in `ports`; it is the key used everywhere below.
        for (idx, port) in ports.iter().enumerate() {
            table.add_direct(port.network_number, idx);
        }

        let table = Arc::new(Mutex::new(table));
        // Channel carrying APDUs destined for the local application.
        let (local_tx, local_rx) = mpsc::channel(256);

        let mut port_receivers = Vec::new();
        let mut send_txs: Vec<mpsc::Sender<SendRequest>> = Vec::new();
        let mut sender_tasks = Vec::new();
        let mut port_networks = Vec::new();
        let mut port_local_macs = Vec::new();

        // Start all transports first; the local MAC is captured here because
        // the transports are moved into the sender tasks below.
        for port in &mut ports {
            let rx = port.transport.start().await?;
            port_receivers.push(rx);
            port_networks.push(port.network_number);
            port_local_macs.push(MacAddr::from_slice(port.transport.local_mac()));
        }

        // One sender task per port: it owns the transport and serialises all
        // outgoing frames queued on that port's channel.
        for port in ports {
            let (send_tx, mut send_rx) = mpsc::channel::<SendRequest>(256);
            send_txs.push(send_tx);

            let transport = port.transport;
            let task = tokio::spawn(async move {
                while let Some(req) = send_rx.recv().await {
                    match req {
                        SendRequest::Unicast { npdu, mac } => {
                            if let Err(e) = transport.send_unicast(&npdu, &mac).await {
                                warn!(error = %e, "Router send_unicast failed");
                            }
                        }
                        SendRequest::Broadcast { npdu } => {
                            if let Err(e) = transport.send_broadcast(&npdu).await {
                                warn!(error = %e, "Router send_broadcast failed");
                            }
                        }
                    }
                }
            });
            sender_tasks.push(task);
        }

        let send_txs = Arc::new(send_txs);

        // Announce on each port the networks reachable through the other ports.
        for (port_idx, tx) in send_txs.iter().enumerate() {
            let other_networks: Vec<u16> = port_networks
                .iter()
                .enumerate()
                .filter(|(idx, _)| *idx != port_idx)
                .map(|(_, net)| *net)
                .collect();

            // A single-port router has nothing to announce.
            if other_networks.is_empty() {
                continue;
            }

            let mut payload = BytesMut::with_capacity(other_networks.len() * 2);
            for net in &other_networks {
                payload.put_u16(*net);
            }

            let payload_len = payload.len();
            let response = Npdu {
                is_network_message: true,
                message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
                payload: payload.freeze(),
                ..Npdu::default()
            };

            let mut buf = BytesMut::with_capacity(8 + payload_len);
            if let Err(e) = encode_npdu(&mut buf, &response) {
                warn!("Failed to encode I-Am-Router NPDU: {e}");
                continue;
            }

            if let Err(e) = tx.try_send(SendRequest::Broadcast { npdu: buf.freeze() }) {
                warn!(%e, "Router dropped I-Am-Router announcement: output channel full");
            }
        }

        let mut dispatch_tasks = Vec::new();

        // One dispatch task per port: decode each received NPDU and either
        // handle it, deliver it locally, forward it, or reject it.
        for (port_idx, mut rx) in port_receivers.into_iter().enumerate() {
            let table = Arc::clone(&table);
            let local_tx = local_tx.clone();
            let send_txs = Arc::clone(&send_txs);
            let port_network = port_networks[port_idx];
            let local_mac = port_local_macs[port_idx].clone();

            let task = tokio::spawn(async move {
                while let Some(received) = rx.recv().await {
                    match decode_npdu(received.npdu.clone()) {
                        Ok(npdu) => {
                            if npdu.is_network_message {
                                // Message types >= 0x80 are treated as proprietary.
                                let is_proprietary =
                                    npdu.message_type.map(|t| t >= 0x80).unwrap_or(false);
                                let has_remote_dest = npdu
                                    .destination
                                    .as_ref()
                                    .is_some_and(|d| d.network != 0xFFFF);
                                if is_proprietary && has_remote_dest {
                                    // Proprietary message for a specific remote
                                    // network: fall through and route it like data.
                                } else {
                                    handle_network_message(
                                        &table,
                                        &send_txs,
                                        port_idx,
                                        port_network,
                                        &received.source_mac,
                                        &npdu,
                                    )
                                    .await;
                                    continue;
                                }
                            }

                            if let Some(ref dest) = npdu.destination {
                                let dest_net = dest.network;

                                // 0xFFFF is the global broadcast network: relay to
                                // every other port and also deliver locally.
                                if dest_net == 0xFFFF {
                                    forward_broadcast(
                                        &send_txs,
                                        port_idx,
                                        port_network,
                                        &received.source_mac,
                                        &npdu,
                                    );

                                    let apdu = ReceivedApdu {
                                        apdu: npdu.payload,
                                        source_mac: received.source_mac,
                                        source_network: npdu.source,
                                        reply_tx: received.reply_tx,
                                    };
                                    let _ = local_tx.send(apdu).await;
                                    continue;
                                }

                                // Keep the table lock only for the lookup; the
                                // route is cloned so it can be used after release.
                                let (route, reachability) = {
                                    let mut tbl = table.lock().await;
                                    let route = tbl.lookup(dest_net).cloned();
                                    let reachability = tbl.effective_reachability(dest_net);
                                    if route.is_some() {
                                        tbl.touch(dest_net);
                                    }
                                    (route, reachability)
                                };

                                if let Some(route) = route {
                                    // A missing reachability value is treated as reachable.
                                    match reachability.unwrap_or(ReachabilityStatus::Reachable) {
                                        ReachabilityStatus::Busy => {
                                            send_reject(
                                                &send_txs[port_idx],
                                                &received.source_mac,
                                                dest_net,
                                                RejectMessageReason::ROUTER_BUSY,
                                            );
                                            continue;
                                        }
                                        ReachabilityStatus::Unreachable => {
                                            send_reject(
                                                &send_txs[port_idx],
                                                &received.source_mac,
                                                dest_net,
                                                RejectMessageReason::NOT_DIRECTLY_CONNECTED,
                                            );
                                            continue;
                                        }
                                        ReachabilityStatus::Reachable => {}
                                    }
                                    // Destination is the network the frame arrived on.
                                    if route.port_index == port_idx && route.directly_connected {
                                        let dest_mac = npdu
                                            .destination
                                            .as_ref()
                                            .map(|d| &d.mac_address[..])
                                            .unwrap_or(&[]);
                                        if dest_mac == &local_mac[..] {
                                            // Addressed to this router's own MAC.
                                            let apdu = ReceivedApdu {
                                                apdu: npdu.payload,
                                                source_mac: received.source_mac,
                                                source_network: npdu.source,
                                                reply_tx: received.reply_tx,
                                            };
                                            let _ = local_tx.send(apdu).await;
                                        } else {
                                            // Empty destination MAC: deliver a copy
                                            // locally (without a reply channel) and
                                            // still forward it.
                                            if dest_mac.is_empty() {
                                                let apdu = ReceivedApdu {
                                                    apdu: npdu.payload.clone(),
                                                    source_mac: received.source_mac.clone(),
                                                    source_network: npdu.source.clone(),
                                                    reply_tx: None,
                                                };
                                                let _ = local_tx.send(apdu).await;
                                            }
                                            forward_unicast(
                                                &send_txs,
                                                &route,
                                                port_network,
                                                &received.source_mac,
                                                npdu,
                                                port_idx,
                                            );
                                        }
                                    } else {
                                        forward_unicast(
                                            &send_txs,
                                            &route,
                                            port_network,
                                            &received.source_mac,
                                            npdu,
                                            port_idx,
                                        );
                                    }
                                } else {
                                    // No route known: tell the sender on its own port.
                                    send_reject(
                                        &send_txs[port_idx],
                                        &received.source_mac,
                                        dest_net,
                                        RejectMessageReason::NOT_DIRECTLY_CONNECTED,
                                    );
                                }
                            } else {
                                // No destination specifier: the frame is for the
                                // local application.
                                let apdu = ReceivedApdu {
                                    apdu: npdu.payload,
                                    source_mac: received.source_mac,
                                    source_network: npdu.source,
                                    reply_tx: received.reply_tx,
                                };
                                let _ = local_tx.send(apdu).await;
                            }
                        }
                        Err(e) => {
                            warn!(error = %e, port = port_idx, "Router decode failed");
                        }
                    }
                }
            });

            dispatch_tasks.push(task);
        }

        let aging_table = Arc::clone(&table);
        // Every 60 s, purge routes older than 300 s and clear expired busy marks.
        let aging_task = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(60));
            let max_age = Duration::from_secs(300);
            loop {
                interval.tick().await;
                let mut tbl = aging_table.lock().await;
                let purged = tbl.purge_stale(max_age);
                tbl.clear_expired_busy();
                // Release the lock before logging.
                drop(tbl);
                for net in purged {
                    debug!(network = net, "Purged stale route");
                }
            }
        });

        Ok((
            Self {
                table,
                dispatch_tasks,
                sender_tasks,
                aging_task: Some(aging_task),
            },
            local_rx,
        ))
    }
368
    /// Returns the shared routing table handle; callers lock it to inspect or
    /// modify routes.
    pub fn table(&self) -> &Arc<Mutex<RouterTable>> {
        &self.table
    }
373
374 pub async fn stop(&mut self) {
376 for task in self.dispatch_tasks.drain(..) {
377 task.abort();
378 let _ = task.await;
379 }
380 for task in self.sender_tasks.drain(..) {
381 task.abort();
382 let _ = task.await;
383 }
384 if let Some(task) = self.aging_task.take() {
385 task.abort();
386 let _ = task.await;
387 }
388 }
389}
390
391fn build_source(npdu: &Npdu, source_network: u16, source_mac: &[u8]) -> NpduAddress {
393 npdu.source.clone().unwrap_or(NpduAddress {
394 network: source_network,
395 mac_address: MacAddr::from_slice(source_mac),
396 })
397}
398
399fn forward_unicast(
401 send_txs: &[mpsc::Sender<SendRequest>],
402 route: &crate::router_table::RouteEntry,
403 source_network: u16,
404 source_mac: &[u8],
405 npdu: Npdu,
406 _source_port_idx: usize,
407) {
408 if npdu.hop_count == 0 {
409 warn!("Discarding NPDU with hop_count=0");
410 return;
411 }
412
413 let payload_len = npdu.payload.len();
414 let source = build_source(&npdu, source_network, source_mac);
415 let dest_mac;
416 let forwarded_dest;
417 let forwarded_hop_count;
418
419 if route.directly_connected {
420 dest_mac = npdu
422 .destination
423 .as_ref()
424 .map(|d| d.mac_address.clone())
425 .unwrap_or_default();
426 forwarded_dest = None;
427 forwarded_hop_count = 0; } else {
429 dest_mac = route.next_hop_mac.clone();
430 forwarded_dest = npdu.destination;
431 forwarded_hop_count = npdu.hop_count - 1;
432 };
433
434 let forwarded = Npdu {
435 is_network_message: npdu.is_network_message,
436 expecting_reply: npdu.expecting_reply,
437 priority: npdu.priority,
438 destination: forwarded_dest,
439 source: Some(source),
440 hop_count: forwarded_hop_count,
441 message_type: None,
442 vendor_id: None,
443 payload: npdu.payload,
444 };
445
446 let mut buf = BytesMut::with_capacity(32 + payload_len);
447 if let Err(e) = encode_npdu(&mut buf, &forwarded) {
448 warn!("Failed to encode forwarded NPDU: {e}");
449 return;
450 }
451
452 if route.port_index >= send_txs.len() {
453 warn!(
454 port = route.port_index,
455 "Route references invalid port index"
456 );
457 return;
458 }
459 if dest_mac.is_empty() {
460 if let Err(e) =
461 send_txs[route.port_index].try_send(SendRequest::Broadcast { npdu: buf.freeze() })
462 {
463 warn!(%e, "Router dropped message: output channel full");
464 }
465 } else if let Err(e) = send_txs[route.port_index].try_send(SendRequest::Unicast {
466 npdu: buf.freeze(),
467 mac: dest_mac,
468 }) {
469 warn!(%e, "Router dropped message: output channel full");
470 }
471}
472
473fn forward_broadcast(
475 send_txs: &[mpsc::Sender<SendRequest>],
476 source_port: usize,
477 source_network: u16,
478 source_mac: &[u8],
479 npdu: &Npdu,
480) {
481 if npdu.hop_count == 0 {
482 warn!("Discarding NPDU with hop_count=0");
483 return;
484 }
485
486 let forwarded = Npdu {
487 is_network_message: npdu.is_network_message,
488 expecting_reply: npdu.expecting_reply,
489 priority: npdu.priority,
490 destination: npdu.destination.clone(),
491 source: Some(build_source(npdu, source_network, source_mac)),
492 hop_count: npdu.hop_count - 1,
493 message_type: npdu.message_type,
494 vendor_id: npdu.vendor_id,
495 payload: npdu.payload.clone(),
496 };
497
498 let mut buf = BytesMut::with_capacity(32 + npdu.payload.len());
499 if let Err(e) = encode_npdu(&mut buf, &forwarded) {
500 warn!("Failed to encode forwarded broadcast NPDU: {e}");
501 return;
502 }
503
504 let encoded = buf.freeze();
505 for (idx, tx) in send_txs.iter().enumerate() {
506 if idx == source_port {
507 continue;
508 }
509 if let Err(e) = tx.try_send(SendRequest::Broadcast {
510 npdu: encoded.clone(),
511 }) {
512 warn!(%e, "Router dropped broadcast: output channel full");
513 }
514 }
515}
516
517async fn handle_network_message(
519 table: &Arc<Mutex<RouterTable>>,
520 send_txs: &[mpsc::Sender<SendRequest>],
521 port_idx: usize,
522 port_network: u16,
523 source_mac: &[u8],
524 npdu: &Npdu,
525) {
526 const MAX_LEARNED_ROUTES: usize = 256;
527
528 let msg_type = match npdu.message_type {
529 Some(t) => t,
530 None => return,
531 };
532
533 if msg_type == NetworkMessageType::WHO_IS_ROUTER_TO_NETWORK.to_raw() {
534 let table = table.lock().await;
535
536 let requested_network = if npdu.payload.len() >= 2 {
537 Some(u16::from_be_bytes([npdu.payload[0], npdu.payload[1]]))
538 } else {
539 None
540 };
541
542 let networks: Vec<u16> = if let Some(net) = requested_network {
543 match table.lookup(net) {
545 Some(entry) if entry.port_index != port_idx => vec![net],
546 _ => {
547 drop(table);
549 let forward = Npdu {
550 is_network_message: true,
551 message_type: Some(NetworkMessageType::WHO_IS_ROUTER_TO_NETWORK.to_raw()),
552 payload: npdu.payload.clone(),
553 ..Npdu::default()
554 };
555 let mut fwd_buf = BytesMut::with_capacity(8);
556 if let Ok(()) = encode_npdu(&mut fwd_buf, &forward) {
557 let frozen = fwd_buf.freeze();
558 for (i, tx) in send_txs.iter().enumerate() {
559 if i != port_idx {
560 let _ = tx.try_send(SendRequest::Broadcast {
561 npdu: frozen.clone(),
562 });
563 }
564 }
565 }
566 return;
567 }
568 }
569 } else {
570 table.networks_not_on_port(port_idx)
571 };
572
573 if networks.is_empty() {
574 return;
575 }
576
577 let mut payload = BytesMut::with_capacity(networks.len() * 2);
578 for net in &networks {
579 payload.put_u16(*net);
580 }
581
582 let payload_len = payload.len();
583 let response = Npdu {
584 is_network_message: true,
585 message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
586 payload: payload.freeze(),
587 ..Npdu::default()
588 };
589
590 let mut buf = BytesMut::with_capacity(4 + payload_len);
591 if let Err(e) = encode_npdu(&mut buf, &response) {
592 warn!("Failed to encode I-Am-Router response NPDU: {e}");
593 return;
594 }
595
596 if let Err(e) = send_txs[port_idx].try_send(SendRequest::Broadcast { npdu: buf.freeze() }) {
598 warn!(%e, "Router dropped I-Am-Router response: output channel full");
599 }
600 } else if msg_type == NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw() {
601 let data = &npdu.payload;
602 let mut offset = 0;
603 let mut table = table.lock().await;
604
605 while offset + 2 <= data.len() {
606 let net = u16::from_be_bytes([data[offset], data[offset + 1]]);
607 offset += 2;
608
609 if table.len() >= MAX_LEARNED_ROUTES && table.lookup(net).is_none() {
610 warn!("Router table learned routes cap ({MAX_LEARNED_ROUTES}) reached, ignoring further networks");
611 break;
612 }
613
614 if table.add_learned_with_flap_detection(net, port_idx, MacAddr::from_slice(source_mac))
615 {
616 debug!(
617 network = net,
618 port = port_idx,
619 "Learned route from I-Am-Router-To-Network"
620 );
621 }
622 }
623 drop(table);
624
625 if !npdu.payload.is_empty() {
627 let rebroadcast = Npdu {
628 is_network_message: true,
629 message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
630 payload: npdu.payload.clone(),
631 ..Npdu::default()
632 };
633 let mut buf = BytesMut::with_capacity(4 + npdu.payload.len());
634 if let Ok(()) = encode_npdu(&mut buf, &rebroadcast) {
635 let frozen = buf.freeze();
636 for (i, tx) in send_txs.iter().enumerate() {
637 if i != port_idx {
638 let _ = tx.try_send(SendRequest::Broadcast {
639 npdu: frozen.clone(),
640 });
641 }
642 }
643 }
644 }
645 } else if msg_type == NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw() {
646 if npdu.payload.len() >= 3 {
647 let reason = npdu.payload[0];
648 let rejected_net = u16::from_be_bytes([npdu.payload[1], npdu.payload[2]]);
649 warn!(
650 network = rejected_net,
651 reason = reason,
652 "Received Reject-Message-To-Network"
653 );
654 {
655 let mut tbl = table.lock().await;
656 if let Some(entry) = tbl.lookup(rejected_net) {
657 if !entry.directly_connected {
658 match reason {
659 1 => tbl.mark_unreachable(rejected_net),
660 2 => tbl
661 .mark_busy(rejected_net, Instant::now() + Duration::from_secs(30)),
662 _ => {
663 tbl.remove(rejected_net);
664 }
665 }
666 }
667 }
668 }
669
670 if let Some(ref source) = npdu.source {
672 let tbl = table.lock().await;
673 if let Some(route) = tbl.lookup(source.network) {
674 let dest_port = route.port_index;
675 let dest_mac = if route.directly_connected {
676 source.mac_address.clone()
677 } else {
678 route.next_hop_mac.clone()
679 };
680 drop(tbl);
681
682 let forwarded = Npdu {
683 is_network_message: true,
684 message_type: Some(NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw()),
685 destination: Some(NpduAddress {
686 network: source.network,
687 mac_address: source.mac_address.clone(),
688 }),
689 hop_count: 255,
690 payload: npdu.payload.clone(),
691 ..Npdu::default()
692 };
693 let mut buf = BytesMut::with_capacity(32);
694 if let Ok(()) = encode_npdu(&mut buf, &forwarded) {
695 if dest_mac.is_empty() {
696 let _ = send_txs[dest_port]
697 .try_send(SendRequest::Broadcast { npdu: buf.freeze() });
698 } else {
699 let _ = send_txs[dest_port].try_send(SendRequest::Unicast {
700 npdu: buf.freeze(),
701 mac: dest_mac,
702 });
703 }
704 }
705 }
706 }
707 }
708 } else if msg_type == NetworkMessageType::ROUTER_BUSY_TO_NETWORK.to_raw() {
709 let data = &npdu.payload;
710 let mut offset = 0;
711 let deadline = Instant::now() + Duration::from_secs(30);
712 let mut tbl = table.lock().await;
713 while offset + 2 <= data.len() {
714 let net = u16::from_be_bytes([data[offset], data[offset + 1]]);
715 offset += 2;
716 tbl.mark_busy(net, deadline);
717 debug!(
718 network = net,
719 "Router busy — marked network as congested (30s timer)"
720 );
721 }
722 drop(tbl);
723 let rebroadcast = Npdu {
725 is_network_message: true,
726 message_type: Some(NetworkMessageType::ROUTER_BUSY_TO_NETWORK.to_raw()),
727 payload: npdu.payload.clone(),
728 ..Npdu::default()
729 };
730 let mut buf = BytesMut::with_capacity(4 + npdu.payload.len());
731 if let Ok(()) = encode_npdu(&mut buf, &rebroadcast) {
732 let frozen = buf.freeze();
733 for (i, tx) in send_txs.iter().enumerate() {
734 if i != port_idx {
735 let _ = tx.try_send(SendRequest::Broadcast {
736 npdu: frozen.clone(),
737 });
738 }
739 }
740 }
741 } else if msg_type == NetworkMessageType::ROUTER_AVAILABLE_TO_NETWORK.to_raw() {
742 let data = &npdu.payload;
743 let mut offset = 0;
744 let mut tbl = table.lock().await;
745 while offset + 2 <= data.len() {
746 let net = u16::from_be_bytes([data[offset], data[offset + 1]]);
747 offset += 2;
748 tbl.mark_available(net);
749 debug!(network = net, "Router available — cleared congestion");
750 }
751 drop(tbl);
752 let rebroadcast = Npdu {
754 is_network_message: true,
755 message_type: Some(NetworkMessageType::ROUTER_AVAILABLE_TO_NETWORK.to_raw()),
756 payload: npdu.payload.clone(),
757 ..Npdu::default()
758 };
759 let mut buf = BytesMut::with_capacity(4 + npdu.payload.len());
760 if let Ok(()) = encode_npdu(&mut buf, &rebroadcast) {
761 let frozen = buf.freeze();
762 for (i, tx) in send_txs.iter().enumerate() {
763 if i != port_idx {
764 let _ = tx.try_send(SendRequest::Broadcast {
765 npdu: frozen.clone(),
766 });
767 }
768 }
769 }
770 } else if msg_type == NetworkMessageType::INITIALIZE_ROUTING_TABLE.to_raw() {
771 let data = &npdu.payload;
772 let count = if data.is_empty() { 0 } else { data[0] as usize };
773
774 let is_query = count == 0;
775
776 if !is_query {
777 let mut offset = 1usize;
778 let mut tbl = table.lock().await;
779 for _ in 0..count {
780 if offset + 4 > data.len() {
781 break;
782 }
783 let net = u16::from_be_bytes([data[offset], data[offset + 1]]);
784 let info_len = data[offset + 3] as usize;
786 if offset + 4 + info_len > data.len() {
787 break;
788 }
789 offset += 4 + info_len;
790
791 if net == 0 || net == 0xFFFF {
792 continue;
793 }
794 if tbl.lookup(net).is_some() {
795 continue; }
797 if tbl.len() >= MAX_LEARNED_ROUTES {
798 warn!("Init-Routing-Table: route cap reached, ignoring further entries");
799 break;
800 }
801 tbl.add_learned(net, port_idx, MacAddr::from_slice(source_mac));
802 debug!(
803 network = net,
804 port = port_idx,
805 "Learned route from Init-Routing-Table"
806 );
807 }
808 }
809
810 let mut payload = BytesMut::new();
811 if is_query {
812 let tbl = table.lock().await;
813 let networks = tbl.networks();
814 let count = networks.len().min(255);
815 payload.put_u8(count as u8);
816 for net in networks.iter().take(count) {
817 if let Some(route) = tbl.lookup(*net) {
818 payload.put_u16(*net);
819 payload.put_u8(route.port_index as u8); payload.put_u8(0); }
822 }
823 } else {
824 payload.put_u8(0);
825 }
826
827 let payload_len = payload.len();
828 let response = Npdu {
829 is_network_message: true,
830 message_type: Some(NetworkMessageType::INITIALIZE_ROUTING_TABLE_ACK.to_raw()),
831 payload: payload.freeze(),
832 ..Npdu::default()
833 };
834
835 let mut buf = BytesMut::with_capacity(8 + payload_len);
836 if let Err(e) = encode_npdu(&mut buf, &response) {
837 warn!("Failed to encode Init-Routing-Table-ACK NPDU: {e}");
838 return;
839 }
840
841 if let Err(e) = send_txs[port_idx].try_send(SendRequest::Unicast {
842 npdu: buf.freeze(),
843 mac: MacAddr::from_slice(source_mac),
844 }) {
845 warn!(%e, "Router dropped Init-Routing-Table-ACK: output channel full");
846 }
847 } else if msg_type == NetworkMessageType::I_COULD_BE_ROUTER_TO_NETWORK.to_raw() {
848 if npdu.payload.len() >= 3 {
849 let net = u16::from_be_bytes([npdu.payload[0], npdu.payload[1]]);
850 let performance_index = npdu.payload[2];
851 debug!(
852 network = net,
853 performance_index = performance_index,
854 port = port_idx,
855 "Received I-Could-Be-Router-To-Network"
856 );
857 let mut tbl = table.lock().await;
859 if tbl.lookup(net).is_none() {
860 tbl.add_learned(net, port_idx, MacAddr::from_slice(source_mac));
861 debug!(
862 network = net,
863 port = port_idx,
864 "Stored potential route from I-Could-Be-Router-To-Network"
865 );
866 }
867 }
868 } else if msg_type == NetworkMessageType::ESTABLISH_CONNECTION_TO_NETWORK.to_raw() {
869 if npdu.payload.len() >= 3 {
870 let net = u16::from_be_bytes([npdu.payload[0], npdu.payload[1]]);
871 let termination_time_min = npdu.payload[2];
872 tracing::info!(
873 network = net,
874 termination_time_minutes = termination_time_min,
875 "Received Establish-Connection-To-Network (PTP not implemented)"
876 );
877 }
878 } else if msg_type == NetworkMessageType::DISCONNECT_CONNECTION_TO_NETWORK.to_raw() {
879 if npdu.payload.len() >= 2 {
880 let net = u16::from_be_bytes([npdu.payload[0], npdu.payload[1]]);
881 debug!(network = net, "Received Disconnect-Connection-To-Network");
882 let mut tbl = table.lock().await;
883 if let Some(entry) = tbl.lookup(net) {
884 if !entry.directly_connected {
885 tbl.remove(net);
886 debug!(
887 network = net,
888 "Removed dynamically established route on disconnect"
889 );
890 }
891 }
892 }
893 } else if msg_type == NetworkMessageType::WHAT_IS_NETWORK_NUMBER.to_raw() {
894 if npdu.source.is_some() || npdu.destination.is_some() {
896 return;
897 }
898 let mut payload = BytesMut::with_capacity(3);
899 payload.put_u16(port_network);
900 payload.put_u8(1); let response = Npdu {
903 is_network_message: true,
904 message_type: Some(NetworkMessageType::NETWORK_NUMBER_IS.to_raw()),
905 payload: payload.freeze(),
906 ..Npdu::default()
907 };
908
909 let mut buf = BytesMut::with_capacity(8);
910 if let Err(e) = encode_npdu(&mut buf, &response) {
911 warn!("Failed to encode Network-Number-Is NPDU: {e}");
912 return;
913 }
914
915 if let Err(e) = send_txs[port_idx].try_send(SendRequest::Broadcast { npdu: buf.freeze() }) {
916 warn!(%e, "Router dropped Network-Number-Is: output channel full");
917 }
918 } else if msg_type == NetworkMessageType::NETWORK_NUMBER_IS.to_raw() {
919 if npdu.payload.len() >= 3 {
921 let net = u16::from_be_bytes([npdu.payload[0], npdu.payload[1]]);
922 let configured = npdu.payload[2];
923 if net != port_network {
924 if configured == 1 {
925 warn!(
926 local_network = port_network,
927 peer_network = net,
928 "Network number conflict: port configured as {} but peer reports {} (configured)",
929 port_network, net
930 );
931 } else {
932 debug!(
933 local_network = port_network,
934 peer_network = net,
935 "Network-Number-Is from peer (learned, differs from local)"
936 );
937 }
938 }
939 }
940 } else if msg_type == NetworkMessageType::INITIALIZE_ROUTING_TABLE_ACK.to_raw() {
941 let data = &npdu.payload;
943 if data.is_empty() {
944 return;
945 }
946 let count = data[0] as usize;
947 let mut offset = 1usize;
948 let mut table = table.lock().await;
949 for _ in 0..count {
950 if offset + 4 > data.len() {
951 break;
952 }
953 let net = u16::from_be_bytes([data[offset], data[offset + 1]]);
954 let info_len = data[offset + 3] as usize;
955 if offset + 4 + info_len > data.len() {
956 break;
957 }
958 offset += 4 + info_len;
959 if net == 0 || net == 0xFFFF {
960 continue;
961 }
962 if table.len() >= MAX_LEARNED_ROUTES {
963 break;
964 }
965 table.add_learned_with_flap_detection(net, port_idx, MacAddr::from_slice(source_mac));
966 debug!(
967 network = net,
968 port = port_idx,
969 "Learned route from Init-Routing-Table-Ack"
970 );
971 }
972 } else if (0x0A..=0x11).contains(&msg_type) {
973 debug!(
975 message_type = msg_type,
976 "Router received security network message (not implemented)"
977 );
978 } else {
979 debug!(
981 message_type = msg_type,
982 "Router rejecting unknown network message type"
983 );
984 send_reject(
985 &send_txs[port_idx],
986 source_mac,
987 0,
988 RejectMessageReason::UNKNOWN_MESSAGE_TYPE,
989 );
990 }
991}
992
993fn send_reject(
995 send_tx: &mpsc::Sender<SendRequest>,
996 source_mac: &[u8],
997 rejected_network: u16,
998 reason: RejectMessageReason,
999) {
1000 let mut payload = BytesMut::with_capacity(3);
1001 payload.put_u8(reason.to_raw());
1002 payload.put_u16(rejected_network);
1003
1004 let reject = Npdu {
1005 is_network_message: true,
1006 message_type: Some(NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw()),
1007 payload: payload.freeze(),
1008 ..Npdu::default()
1009 };
1010
1011 let mut buf = BytesMut::with_capacity(8);
1012 if let Err(e) = encode_npdu(&mut buf, &reject) {
1013 warn!("Failed to encode Reject-Message NPDU: {e}");
1014 return;
1015 }
1016
1017 if let Err(e) = send_tx.try_send(SendRequest::Unicast {
1018 npdu: buf.freeze(),
1019 mac: MacAddr::from_slice(source_mac),
1020 }) {
1021 warn!(%e, "Router dropped reject message: output channel full");
1022 }
1023}
1024
1025#[cfg(test)]
1026mod tests {
1027 use super::*;
1028 use bacnet_transport::bip::BipTransport;
1029 use std::net::Ipv4Addr;
1030 use tokio::time::Duration;
1031
    /// Starts a two-port router (networks 1000 and 2000) next to two
    /// stand-alone transports and checks that both networks show up in the
    /// routing table as directly connected.
    ///
    /// NOTE(review): the NPDU built below is encoded but never sent, so the
    /// forwarding path named by this test is not actually exercised.
    #[tokio::test]
    async fn router_forwards_between_networks() {
        let transport_a = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);

        let mut device_a = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
        let mut device_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);

        // The devices are started first so `device_b.local_mac()` can be read below.
        let _rx_b = device_b.start().await.unwrap();
        let _rx_a = device_a.start().await.unwrap();

        let port_a = RouterPort {
            transport: transport_a,
            network_number: 1000,
        };
        let port_b = RouterPort {
            transport: transport_b,
            network_number: 2000,
        };

        let (mut router, _local_rx) = BACnetRouter::start(vec![port_a, port_b]).await.unwrap();

        // Give the spawned router tasks a moment to run.
        tokio::time::sleep(Duration::from_millis(50)).await;

        let apdu = vec![0x10, 0x08];
        let npdu = Npdu {
            is_network_message: false,
            expecting_reply: false,
            priority: bacnet_types::enums::NetworkPriority::NORMAL,
            destination: Some(NpduAddress {
                network: 2000,
                mac_address: MacAddr::from_slice(device_b.local_mac()),
            }),
            source: None,
            hop_count: 255,
            payload: Bytes::copy_from_slice(&apdu),
            ..Npdu::default()
        };

        let mut buf = BytesMut::new();
        encode_npdu(&mut buf, &npdu).unwrap();

        let table = router.table().lock().await;
        assert_eq!(table.len(), 2);
        assert!(table.lookup(1000).unwrap().directly_connected);
        assert!(table.lookup(2000).unwrap().directly_connected);
        // Release the guard before `stop` needs `&mut router`.
        drop(table);

        router.stop().await;
    }
1082
1083 #[tokio::test]
1084 async fn router_table_populated_on_start() {
1085 let transport_a = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1086 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1087 let transport_c = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1088
1089 let ports = vec![
1090 RouterPort {
1091 transport: transport_a,
1092 network_number: 100,
1093 },
1094 RouterPort {
1095 transport: transport_b,
1096 network_number: 200,
1097 },
1098 RouterPort {
1099 transport: transport_c,
1100 network_number: 300,
1101 },
1102 ];
1103
1104 let (mut router, _local_rx) = BACnetRouter::start(ports).await.unwrap();
1105
1106 let table = router.table().lock().await;
1107 assert_eq!(table.len(), 3);
1108 assert_eq!(table.lookup(100).unwrap().port_index, 0);
1109 assert_eq!(table.lookup(200).unwrap().port_index, 1);
1110 assert_eq!(table.lookup(300).unwrap().port_index, 2);
1111 drop(table);
1112
1113 router.stop().await;
1114 }
1115
    /// Starts and stops a single-port router next to a started sender transport.
    ///
    /// NOTE(review): no frame is sent and nothing is asserted, so this only
    /// checks that start-up and shutdown complete without panicking.
    #[tokio::test]
    async fn local_message_delivered_to_application() {
        let transport = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
        let mut sender = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
        let _sender_rx = sender.start().await.unwrap();

        let router_port = RouterPort {
            transport,
            network_number: 1000,
        };

        let (mut router, _local_rx) = BACnetRouter::start(vec![router_port]).await.unwrap();

        // Give the spawned router tasks a moment to run.
        tokio::time::sleep(Duration::from_millis(50)).await;

        router.stop().await;
    }
1133
1134 #[test]
1135 fn forward_unicast_drops_hop_count_zero() {
1136 let (tx_a, mut rx_a) = mpsc::channel::<SendRequest>(256);
1137 let (tx_b, mut rx_b) = mpsc::channel::<SendRequest>(256);
1138 let send_txs = vec![tx_a, tx_b];
1139
1140 let route = crate::router_table::RouteEntry {
1141 port_index: 1,
1142 directly_connected: true,
1143 next_hop_mac: MacAddr::new(),
1144 last_seen: None,
1145 reachability: crate::router_table::ReachabilityStatus::Reachable,
1146 busy_until: None,
1147 flap_count: 0,
1148 last_port_change: None,
1149 };
1150
1151 let npdu = Npdu {
1152 is_network_message: false,
1153 expecting_reply: false,
1154 priority: bacnet_types::enums::NetworkPriority::NORMAL,
1155 destination: Some(NpduAddress {
1156 network: 2000,
1157 mac_address: MacAddr::from_slice(&[0x01, 0x02]),
1158 }),
1159 source: None,
1160 hop_count: 0, payload: Bytes::from_static(&[0x10, 0x08]),
1162 ..Npdu::default()
1163 };
1164
1165 forward_unicast(&send_txs, &route, 1000, &[0x0A], npdu, 0);
1166
1167 assert!(rx_a.try_recv().is_err());
1168 assert!(rx_b.try_recv().is_err());
1169 }
1170
1171 #[test]
1172 fn forward_broadcast_drops_hop_count_zero() {
1173 let (tx_a, mut rx_a) = mpsc::channel::<SendRequest>(256);
1174 let (tx_b, mut rx_b) = mpsc::channel::<SendRequest>(256);
1175 let send_txs = vec![tx_a, tx_b];
1176
1177 let npdu = Npdu {
1178 is_network_message: false,
1179 expecting_reply: false,
1180 priority: bacnet_types::enums::NetworkPriority::NORMAL,
1181 destination: Some(NpduAddress {
1182 network: 0xFFFF,
1183 mac_address: MacAddr::new(),
1184 }),
1185 source: None,
1186 hop_count: 0, payload: Bytes::from_static(&[0x10, 0x08]),
1188 ..Npdu::default()
1189 };
1190
1191 forward_broadcast(&send_txs, 0, 1000, &[0x0A], &npdu);
1192
1193 assert!(rx_a.try_recv().is_err());
1194 assert!(rx_b.try_recv().is_err());
1195 }
1196
/// Forwards a unicast NPDU carrying hop count 10 over a directly connected
/// route on port 1 and inspects what was queued for that port.
///
/// NOTE(review): despite the test name, no hop-count value is asserted here;
/// only the destination/source fields of the queued NPDU are checked. Confirm
/// whether a hop-count assertion is possible for this route type.
#[test]
fn forward_unicast_decrements_hop_count() {
    // The receiver for port 0 is kept alive so its channel stays open.
    let (idle_tx, _idle_rx) = mpsc::channel::<SendRequest>(256);
    let (egress_tx, mut egress_rx) = mpsc::channel::<SendRequest>(256);
    let send_txs = vec![idle_tx, egress_tx];

    let route = crate::router_table::RouteEntry {
        last_port_change: None,
        flap_count: 0,
        busy_until: None,
        reachability: crate::router_table::ReachabilityStatus::Reachable,
        last_seen: None,
        next_hop_mac: MacAddr::new(),
        directly_connected: true,
        port_index: 1,
    };

    let target = NpduAddress {
        network: 2000,
        mac_address: MacAddr::from_slice(&[0x01, 0x02]),
    };
    let npdu = Npdu {
        payload: Bytes::from_static(&[0x10, 0x08]),
        hop_count: 10,
        source: None,
        destination: Some(target),
        priority: bacnet_types::enums::NetworkPriority::NORMAL,
        expecting_reply: false,
        is_network_message: false,
        ..Npdu::default()
    };

    forward_unicast(&send_txs, &route, 1000, &[0x0A], npdu, 0);

    // Something must have been queued on port 1; either request kind is accepted.
    match egress_rx.try_recv().unwrap() {
        SendRequest::Broadcast { npdu: frame } => {
            let decoded = decode_npdu(frame).unwrap();
            assert!(decoded.destination.is_none());
        }
        SendRequest::Unicast { npdu: frame, .. } => {
            let decoded = decode_npdu(frame).unwrap();
            assert!(decoded.destination.is_none());
            assert!(decoded.source.is_some());
        }
    }
}
1243
/// `send_reject` must queue a unicast Reject-Message-To-Network back to the
/// given MAC, with a 3-byte payload: the reason octet followed by the
/// rejected network number in big-endian order.
#[test]
fn send_reject_generates_reject_message() {
    let (reject_tx, mut reject_rx) = mpsc::channel::<SendRequest>(256);

    let requester_mac = vec![0x0A, 0x00, 0x01, 0x01];
    let unreachable_net: u16 = 9999;

    send_reject(
        &reject_tx,
        &requester_mac,
        unreachable_net,
        RejectMessageReason::NOT_DIRECTLY_CONNECTED,
    );

    let (frame, mac) = match reject_rx.try_recv().unwrap() {
        SendRequest::Unicast { npdu, mac } => (npdu, mac),
        SendRequest::Broadcast { .. } => panic!("Expected Unicast send for reject message"),
    };

    // The reject goes back to whoever sent the offending message.
    assert_eq!(mac.as_slice(), requester_mac.as_slice());

    let decoded = decode_npdu(frame).unwrap();
    assert!(decoded.is_network_message);
    assert_eq!(
        decoded.message_type,
        Some(NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw())
    );

    assert_eq!(decoded.payload.len(), 3);
    let reason = decoded.payload[0];
    let rejected_net = u16::from_be_bytes([decoded.payload[1], decoded.payload[2]]);
    assert_eq!(reason, RejectMessageReason::NOT_DIRECTLY_CONNECTED.to_raw());
    assert_eq!(rejected_net, 9999);
}
1279
/// With a single port there is no "other" network to advertise, so the
/// announcement loop must leave the port's send queue empty.
///
/// The announcement is assembled by the loop in this test; it does not call
/// into the router itself, so it pins the expected outcome of that
/// construction rather than the router's own code path.
///
/// Runs as a plain `#[test]`: nothing here awaits, and `try_send` /
/// `try_recv` are non-blocking, so no Tokio runtime is needed.
#[test]
fn single_port_router_no_i_am_router_announcement() {
    let (send_tx, mut send_rx) = mpsc::channel::<SendRequest>(256);

    let port_networks: Vec<u16> = vec![1000];
    let send_txs = [send_tx];

    for (port_idx, tx) in send_txs.iter().enumerate() {
        // Networks attached to every port except the one being announced on.
        let other_networks: Vec<u16> = port_networks
            .iter()
            .enumerate()
            .filter(|&(idx, _)| idx != port_idx)
            .map(|(_, &net)| net)
            .collect();

        // Nothing to advertise on this port.
        if other_networks.is_empty() {
            continue;
        }

        // Payload is the list of network numbers, two big-endian bytes each.
        let mut payload = BytesMut::with_capacity(other_networks.len() * 2);
        for &net in &other_networks {
            payload.put_u16(net);
        }

        let payload_len = payload.len();
        let response = Npdu {
            is_network_message: true,
            message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
            payload: payload.freeze(),
            ..Npdu::default()
        };

        let mut buf = BytesMut::with_capacity(8 + payload_len);
        encode_npdu(&mut buf, &response).unwrap();

        // Fail loudly instead of silently discarding an enqueue error.
        tx.try_send(SendRequest::Broadcast { npdu: buf.freeze() })
            .expect("announcement should fit in the port's send queue");
    }

    assert!(send_rx.try_recv().is_err());
}
1320
/// Checks the I-Am-Router-To-Network announcement for a two-port setup:
/// each port's broadcast must list only the network attached to the other
/// port (port A on 1000 announces 2000, port B on 2000 announces 1000).
///
/// The announcement is assembled by the loop in this test; it does not call
/// into the router itself, so it pins the encoded format rather than the
/// router's own code path.
///
/// Runs as a plain `#[test]`: nothing here awaits, and `try_send` /
/// `try_recv` are non-blocking, so no Tokio runtime is needed.
#[test]
fn two_port_router_sends_i_am_router_announcement() {
    let (tx_a, mut rx_a) = mpsc::channel::<SendRequest>(256);
    let (tx_b, mut rx_b) = mpsc::channel::<SendRequest>(256);

    let port_networks: Vec<u16> = vec![1000, 2000];
    let send_txs = [tx_a, tx_b];

    for (port_idx, tx) in send_txs.iter().enumerate() {
        // Networks attached to every port except the one being announced on.
        let other_networks: Vec<u16> = port_networks
            .iter()
            .enumerate()
            .filter(|&(idx, _)| idx != port_idx)
            .map(|(_, &net)| net)
            .collect();

        if other_networks.is_empty() {
            continue;
        }

        // Payload is the list of network numbers, two big-endian bytes each.
        let mut payload = BytesMut::with_capacity(other_networks.len() * 2);
        for &net in &other_networks {
            payload.put_u16(net);
        }

        let payload_len = payload.len();
        let response = Npdu {
            is_network_message: true,
            message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
            payload: payload.freeze(),
            ..Npdu::default()
        };

        let mut buf = BytesMut::with_capacity(8 + payload_len);
        encode_npdu(&mut buf, &response).unwrap();

        // Fail here, with a clear message, rather than later at `try_recv`.
        tx.try_send(SendRequest::Broadcast { npdu: buf.freeze() })
            .expect("announcement should fit in the port's send queue");
    }

    // Port A (network 1000) must advertise network 2000.
    match rx_a.try_recv().unwrap() {
        SendRequest::Broadcast { npdu: data } => {
            let decoded = decode_npdu(data).unwrap();
            assert!(decoded.is_network_message);
            assert_eq!(
                decoded.message_type,
                Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw())
            );
            assert_eq!(decoded.payload.len(), 2);
            let net = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
            assert_eq!(net, 2000);
        }
        SendRequest::Unicast { .. } => {
            panic!("Expected Broadcast for I-Am-Router announcement on port A")
        }
    }

    // Port B (network 2000) must advertise network 1000.
    match rx_b.try_recv().unwrap() {
        SendRequest::Broadcast { npdu: data } => {
            let decoded = decode_npdu(data).unwrap();
            assert!(decoded.is_network_message);
            assert_eq!(
                decoded.message_type,
                Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw())
            );
            assert_eq!(decoded.payload.len(), 2);
            let net = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
            assert_eq!(net, 1000);
        }
        SendRequest::Unicast { .. } => {
            panic!("Expected Broadcast for I-Am-Router announcement on port B")
        }
    }
}
1392
/// Checks the I-Am-Router-To-Network announcement for a three-port setup
/// (networks 100, 200, 300): each port's broadcast must carry the two
/// networks attached to the other ports, in port order.
///
/// The announcement is assembled by the loop in this test; it does not call
/// into the router itself, so it pins the encoded format rather than the
/// router's own code path.
///
/// Runs as a plain `#[test]`: nothing here awaits, and `try_send` /
/// `try_recv` are non-blocking, so no Tokio runtime is needed.
#[test]
fn three_port_router_announces_multiple_networks() {
    let (tx_a, mut rx_a) = mpsc::channel::<SendRequest>(256);
    let (tx_b, mut rx_b) = mpsc::channel::<SendRequest>(256);
    let (tx_c, mut rx_c) = mpsc::channel::<SendRequest>(256);

    let port_networks: Vec<u16> = vec![100, 200, 300];
    let send_txs = [tx_a, tx_b, tx_c];

    for (port_idx, tx) in send_txs.iter().enumerate() {
        // Networks attached to every port except the one being announced on.
        let other_networks: Vec<u16> = port_networks
            .iter()
            .enumerate()
            .filter(|&(idx, _)| idx != port_idx)
            .map(|(_, &net)| net)
            .collect();

        if other_networks.is_empty() {
            continue;
        }

        // Payload is the list of network numbers, two big-endian bytes each.
        let mut payload = BytesMut::with_capacity(other_networks.len() * 2);
        for &net in &other_networks {
            payload.put_u16(net);
        }

        let payload_len = payload.len();
        let response = Npdu {
            is_network_message: true,
            message_type: Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw()),
            payload: payload.freeze(),
            ..Npdu::default()
        };

        let mut buf = BytesMut::with_capacity(8 + payload_len);
        encode_npdu(&mut buf, &response).unwrap();

        // Fail here, with a clear message, rather than later at `try_recv`.
        tx.try_send(SendRequest::Broadcast { npdu: buf.freeze() })
            .expect("announcement should fit in the port's send queue");
    }

    // Port A (network 100) must advertise 200 and 300.
    match rx_a.try_recv().unwrap() {
        SendRequest::Broadcast { npdu: data } => {
            let decoded = decode_npdu(data).unwrap();
            assert!(decoded.is_network_message);
            assert_eq!(decoded.payload.len(), 4);
            let net1 = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
            let net2 = u16::from_be_bytes([decoded.payload[2], decoded.payload[3]]);
            assert_eq!(net1, 200);
            assert_eq!(net2, 300);
        }
        SendRequest::Unicast { .. } => panic!("Expected Broadcast on port A"),
    }

    // Port B (network 200) must advertise 100 and 300.
    match rx_b.try_recv().unwrap() {
        SendRequest::Broadcast { npdu: data } => {
            let decoded = decode_npdu(data).unwrap();
            assert_eq!(decoded.payload.len(), 4);
            let net1 = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
            let net2 = u16::from_be_bytes([decoded.payload[2], decoded.payload[3]]);
            assert_eq!(net1, 100);
            assert_eq!(net2, 300);
        }
        SendRequest::Unicast { .. } => panic!("Expected Broadcast on port B"),
    }

    // Port C (network 300) must advertise 100 and 200.
    match rx_c.try_recv().unwrap() {
        SendRequest::Broadcast { npdu: data } => {
            let decoded = decode_npdu(data).unwrap();
            assert_eq!(decoded.payload.len(), 4);
            let net1 = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
            let net2 = u16::from_be_bytes([decoded.payload[2], decoded.payload[3]]);
            assert_eq!(net1, 100);
            assert_eq!(net2, 200);
        }
        SendRequest::Unicast { .. } => panic!("Expected Broadcast on port C"),
    }
}
1473
/// A unicast NPDU with hop count 1, routed over a directly connected route on
/// port 1, must still result in something being queued for that port.
#[test]
fn forward_unicast_with_hop_count_one_still_forwards() {
    // The receiver for port 0 is kept alive so its channel stays open.
    let (first_tx, _first_rx) = mpsc::channel::<SendRequest>(256);
    let (second_tx, mut second_rx) = mpsc::channel::<SendRequest>(256);
    let send_txs = vec![first_tx, second_tx];

    let direct_route = crate::router_table::RouteEntry {
        port_index: 1,
        directly_connected: true,
        reachability: crate::router_table::ReachabilityStatus::Reachable,
        next_hop_mac: MacAddr::new(),
        flap_count: 0,
        last_seen: None,
        busy_until: None,
        last_port_change: None,
    };

    let last_hop = Npdu {
        hop_count: 1,
        destination: Some(NpduAddress {
            mac_address: MacAddr::from_slice(&[0x01, 0x02]),
            network: 2000,
        }),
        source: None,
        payload: Bytes::from_static(&[0x10, 0x08]),
        is_network_message: false,
        expecting_reply: false,
        priority: bacnet_types::enums::NetworkPriority::NORMAL,
        ..Npdu::default()
    };

    forward_unicast(&send_txs, &direct_route, 1000, &[0x0A], last_hop, 0);

    // `unwrap` is the real assertion: a request must be waiting on port 1.
    let queued = second_rx.try_recv().unwrap();
    match queued {
        SendRequest::Unicast { npdu: bytes, .. } => {
            let parsed = decode_npdu(bytes).unwrap();
            assert!(parsed.destination.is_none());
            assert!(parsed.source.is_some());
        }
        SendRequest::Broadcast { npdu: bytes } => {
            let parsed = decode_npdu(bytes).unwrap();
            assert!(parsed.destination.is_none());
        }
    }
}
1520
1521 #[tokio::test]
1522 async fn received_reject_removes_learned_route() {
1523 let mut table = RouterTable::new();
1524 table.add_direct(1000, 0);
1525 table.add_learned(3000, 0, MacAddr::from_slice(&[10, 0, 1, 1]));
1526 assert!(table.lookup(3000).is_some());
1527
1528 let table = Arc::new(Mutex::new(table));
1529
1530 let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1531 let send_txs = vec![tx];
1532
1533 let mut payload = BytesMut::with_capacity(3);
1534 payload.put_u8(RejectMessageReason::OTHER.to_raw());
1535 payload.put_u16(3000);
1536
1537 let npdu = Npdu {
1538 is_network_message: true,
1539 message_type: Some(NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw()),
1540 payload: payload.freeze(),
1541 ..Npdu::default()
1542 };
1543
1544 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1545
1546 let tbl = table.lock().await;
1547 assert!(tbl.lookup(3000).is_none());
1548 assert!(tbl.lookup(1000).is_some());
1549 }
1550
1551 #[tokio::test]
1552 async fn received_reject_does_not_remove_direct_route() {
1553 let mut table = RouterTable::new();
1554 table.add_direct(1000, 0);
1555
1556 let table = Arc::new(Mutex::new(table));
1557
1558 let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1559 let send_txs = vec![tx];
1560 let mut payload = BytesMut::with_capacity(3);
1561 payload.put_u8(RejectMessageReason::OTHER.to_raw());
1562 payload.put_u16(1000);
1563
1564 let npdu = Npdu {
1565 is_network_message: true,
1566 message_type: Some(NetworkMessageType::REJECT_MESSAGE_TO_NETWORK.to_raw()),
1567 payload: payload.freeze(),
1568 ..Npdu::default()
1569 };
1570
1571 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1572
1573 let tbl = table.lock().await;
1574 assert!(tbl.lookup(1000).is_some());
1575 }
1576
1577 #[tokio::test]
1578 async fn who_is_router_with_specific_network() {
1579 let mut table = RouterTable::new();
1580 table.add_direct(1000, 0);
1581 table.add_direct(2000, 1);
1582 table.add_direct(3000, 2);
1583
1584 let table = Arc::new(Mutex::new(table));
1585
1586 let (tx, mut rx) = mpsc::channel::<SendRequest>(256);
1587 let send_txs = vec![tx];
1588
1589 let mut req_payload = BytesMut::with_capacity(2);
1590 req_payload.put_u16(2000);
1591
1592 let npdu = Npdu {
1593 is_network_message: true,
1594 message_type: Some(NetworkMessageType::WHO_IS_ROUTER_TO_NETWORK.to_raw()),
1595 payload: req_payload.freeze(),
1596 ..Npdu::default()
1597 };
1598
1599 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1600
1601 let sent = rx.try_recv().unwrap();
1602 match sent {
1603 SendRequest::Broadcast { npdu: data } => {
1604 let decoded = decode_npdu(data.clone()).unwrap();
1605 assert!(decoded.is_network_message);
1606 assert_eq!(
1607 decoded.message_type,
1608 Some(NetworkMessageType::I_AM_ROUTER_TO_NETWORK.to_raw())
1609 );
1610 assert_eq!(decoded.payload.len(), 2);
1611 let net = u16::from_be_bytes([decoded.payload[0], decoded.payload[1]]);
1612 assert_eq!(net, 2000);
1613 }
1614 _ => panic!("Expected Broadcast response for I-Am-Router"),
1615 }
1616 }
1617
1618 #[tokio::test]
1619 async fn who_is_router_with_unknown_network_no_response() {
1620 let mut table = RouterTable::new();
1621 table.add_direct(1000, 0);
1622
1623 let table = Arc::new(Mutex::new(table));
1624
1625 let (tx, mut rx) = mpsc::channel::<SendRequest>(256);
1626 let send_txs = vec![tx];
1627
1628 let mut req_payload = BytesMut::with_capacity(2);
1629 req_payload.put_u16(9999);
1630
1631 let npdu = Npdu {
1632 is_network_message: true,
1633 message_type: Some(NetworkMessageType::WHO_IS_ROUTER_TO_NETWORK.to_raw()),
1634 payload: req_payload.freeze(),
1635 ..Npdu::default()
1636 };
1637
1638 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1639
1640 assert!(rx.try_recv().is_err());
1641 }
1642
1643 #[tokio::test]
1644 async fn initialize_routing_table_ack() {
1645 let mut table = RouterTable::new();
1646 table.add_direct(1000, 0);
1647 table.add_direct(2000, 1);
1648
1649 let table = Arc::new(Mutex::new(table));
1650
1651 let (tx, mut rx) = mpsc::channel::<SendRequest>(256);
1652 let send_txs = vec![tx];
1653
1654 let npdu = Npdu {
1655 is_network_message: true,
1656 message_type: Some(NetworkMessageType::INITIALIZE_ROUTING_TABLE.to_raw()),
1657 payload: Bytes::new(),
1658 ..Npdu::default()
1659 };
1660
1661 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1662
1663 let sent = rx.try_recv().unwrap();
1664 match sent {
1665 SendRequest::Unicast { npdu: data, mac } => {
1666 assert_eq!(mac.as_slice(), &[0x0A]);
1667 let decoded = decode_npdu(data.clone()).unwrap();
1668 assert!(decoded.is_network_message);
1669 assert_eq!(
1670 decoded.message_type,
1671 Some(NetworkMessageType::INITIALIZE_ROUTING_TABLE_ACK.to_raw())
1672 );
1673 assert_eq!(decoded.payload.len(), 9);
1674 assert_eq!(decoded.payload[0], 2);
1675 }
1676 _ => panic!("Expected Unicast response for Init-Routing-Table"),
1677 }
1678 }
1679
1680 #[tokio::test]
1681 async fn router_busy_does_not_crash() {
1682 let table = RouterTable::new();
1683 let table = Arc::new(Mutex::new(table));
1684
1685 let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1686 let send_txs = vec![tx];
1687
1688 let mut payload = BytesMut::with_capacity(4);
1689 payload.put_u16(1000);
1690 payload.put_u16(2000);
1691
1692 let npdu = Npdu {
1693 is_network_message: true,
1694 message_type: Some(NetworkMessageType::ROUTER_BUSY_TO_NETWORK.to_raw()),
1695 payload: payload.freeze(),
1696 ..Npdu::default()
1697 };
1698
1699 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1700 }
1701
1702 #[tokio::test]
1703 async fn router_available_does_not_crash() {
1704 let table = RouterTable::new();
1705 let table = Arc::new(Mutex::new(table));
1706
1707 let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1708 let send_txs = vec![tx];
1709
1710 let mut payload = BytesMut::with_capacity(4);
1711 payload.put_u16(1000);
1712 payload.put_u16(2000);
1713
1714 let npdu = Npdu {
1715 is_network_message: true,
1716 message_type: Some(NetworkMessageType::ROUTER_AVAILABLE_TO_NETWORK.to_raw()),
1717 payload: payload.freeze(),
1718 ..Npdu::default()
1719 };
1720
1721 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1722 }
1723
1724 #[tokio::test]
1725 async fn i_could_be_router_stores_potential_route() {
1726 let table = RouterTable::new();
1727 let table = Arc::new(Mutex::new(table));
1728
1729 let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1730 let send_txs = vec![tx];
1731
1732 let mut payload = BytesMut::with_capacity(3);
1733 payload.put_u16(5000);
1734 payload.put_u8(50);
1735
1736 let npdu = Npdu {
1737 is_network_message: true,
1738 message_type: Some(NetworkMessageType::I_COULD_BE_ROUTER_TO_NETWORK.to_raw()),
1739 payload: payload.freeze(),
1740 ..Npdu::default()
1741 };
1742
1743 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A, 0x0B], &npdu).await;
1744
1745 let tbl = table.lock().await;
1746 let entry = tbl.lookup(5000).unwrap();
1747 assert!(!entry.directly_connected);
1748 assert_eq!(entry.port_index, 0);
1749 assert_eq!(entry.next_hop_mac.as_slice(), &[0x0A, 0x0B]);
1750 }
1751
1752 #[tokio::test]
1753 async fn i_could_be_router_does_not_overwrite_existing_route() {
1754 let mut table = RouterTable::new();
1755 table.add_direct(5000, 1);
1756 let table = Arc::new(Mutex::new(table));
1757
1758 let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1759 let send_txs = vec![tx];
1760
1761 let mut payload = BytesMut::with_capacity(3);
1762 payload.put_u16(5000);
1763 payload.put_u8(50);
1764
1765 let npdu = Npdu {
1766 is_network_message: true,
1767 message_type: Some(NetworkMessageType::I_COULD_BE_ROUTER_TO_NETWORK.to_raw()),
1768 payload: payload.freeze(),
1769 ..Npdu::default()
1770 };
1771
1772 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1773
1774 let tbl = table.lock().await;
1775 let entry = tbl.lookup(5000).unwrap();
1776 assert!(entry.directly_connected);
1777 assert_eq!(entry.port_index, 1);
1778 }
1779
1780 #[tokio::test]
1781 async fn establish_connection_does_not_crash() {
1782 let table = RouterTable::new();
1783 let table = Arc::new(Mutex::new(table));
1784
1785 let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1786 let send_txs = vec![tx];
1787
1788 let mut payload = BytesMut::with_capacity(3);
1789 payload.put_u16(6000);
1790 payload.put_u8(30);
1791
1792 let npdu = Npdu {
1793 is_network_message: true,
1794 message_type: Some(NetworkMessageType::ESTABLISH_CONNECTION_TO_NETWORK.to_raw()),
1795 payload: payload.freeze(),
1796 ..Npdu::default()
1797 };
1798
1799 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1800 }
1801
1802 #[tokio::test]
1803 async fn disconnect_removes_learned_route() {
1804 let mut table = RouterTable::new();
1805 table.add_learned(7000, 0, MacAddr::from_slice(&[10, 0, 1, 1]));
1806 let table = Arc::new(Mutex::new(table));
1807
1808 let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1809 let send_txs = vec![tx];
1810
1811 let mut payload = BytesMut::with_capacity(2);
1812 payload.put_u16(7000);
1813
1814 let npdu = Npdu {
1815 is_network_message: true,
1816 message_type: Some(NetworkMessageType::DISCONNECT_CONNECTION_TO_NETWORK.to_raw()),
1817 payload: payload.freeze(),
1818 ..Npdu::default()
1819 };
1820
1821 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1822
1823 let tbl = table.lock().await;
1824 assert!(tbl.lookup(7000).is_none());
1825 }
1826
1827 #[tokio::test]
1828 async fn disconnect_does_not_remove_direct_route() {
1829 let mut table = RouterTable::new();
1830 table.add_direct(1000, 0);
1831 let table = Arc::new(Mutex::new(table));
1832
1833 let (tx, _rx) = mpsc::channel::<SendRequest>(256);
1834 let send_txs = vec![tx];
1835
1836 let mut payload = BytesMut::with_capacity(2);
1837 payload.put_u16(1000);
1838
1839 let npdu = Npdu {
1840 is_network_message: true,
1841 message_type: Some(NetworkMessageType::DISCONNECT_CONNECTION_TO_NETWORK.to_raw()),
1842 payload: payload.freeze(),
1843 ..Npdu::default()
1844 };
1845
1846 handle_network_message(&table, &send_txs, 0, 1000, &[0x0A], &npdu).await;
1847
1848 let tbl = table.lock().await;
1849 assert!(tbl.lookup(1000).is_some());
1850 assert!(tbl.lookup(1000).unwrap().directly_connected);
1851 }
1852}