1use std::net::SocketAddr;
18use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
19use std::sync::Arc;
20use std::time::Duration;
21
22use dashmap::DashMap;
23use tokio::net::UdpSocket;
24use tokio::sync::{broadcast, mpsc};
25use tracing::{debug, error, info, trace, warn};
26
27use crate::address::{GroupAddress, IndividualAddress};
28use crate::cemi::{Apci, CemiFrame, MessageCode};
29use crate::config::KnxServerConfig;
30use crate::error::{KnxError, KnxResult};
31use crate::error_tracker::{SendErrorTracker, ErrorCategory, TrackingResult};
32use crate::filter::{
33 FilterChain, FilterResult, FrameEnvelope,
34 CircuitBreakerState, PaceState,
35};
36use crate::frame::{
37 DibDeviceInfo, Hpai, KnxFrame, ServiceType, SupportedServiceFamilies,
38};
39use crate::group::GroupObjectTable;
40use crate::diagnostics::{KnxDiagnostics, DiagnosticConfig};
41use crate::group_cache::GroupValueCache;
42use crate::heartbeat::HeartbeatScheduler;
43use crate::metrics::{KnxMetricsCollector, KnxMetricsSnapshot, ConnectionMetricsSnapshot};
44use crate::tunnel::{
45 ConnectRequest, ConnectResponse, ConnectStatus, ConnectionResponseData,
46 ConnectionStateRequest, ConnectionStateResponse, DisconnectRequest, DisconnectResponse,
47 TunnelConnection, TunnellingAck, TunnellingRequest,
48 ReceivedValidation, AckMessage,
49};
50
51#[derive(Debug, Clone)]
57pub enum ServerEvent {
58 Started { address: SocketAddr },
60 Stopped,
62 ClientConnected {
64 channel_id: u8,
65 address: SocketAddr,
66 },
67 ClientDisconnected { channel_id: u8 },
69 GroupValueWrite {
71 address: GroupAddress,
72 value: Vec<u8>,
73 source: IndividualAddress,
74 },
75 GroupValueRead {
77 address: GroupAddress,
78 source: IndividualAddress,
79 },
80 SequenceEvent {
82 channel_id: u8,
83 validation: SequenceEventType,
84 },
85 ConfirmationSent {
87 channel_id: u8,
88 success: bool,
89 },
90 BusMonitorFrameSent {
92 channel_id: u8,
93 message_code: u8,
94 raw_frame_len: usize,
95 },
96 PropertyRead {
98 channel_id: u8,
99 object_index: u16,
100 property_id: u16,
101 },
102 PropertyWrite {
104 channel_id: u8,
105 object_index: u16,
106 property_id: u16,
107 },
108 DeviceReset { channel_id: u8 },
110 DataIndBroadcast {
112 source_channel_id: u8,
113 target_channel_count: usize,
114 group_address: GroupAddress,
115 },
116 FrameDelayed {
118 channel_id: u8,
119 delay_ms: u64,
120 pace_state: String,
121 },
122 FrameQueued {
124 channel_id: u8,
125 priority: String,
126 queue_depth: usize,
127 },
128 FrameDropped {
130 channel_id: u8,
131 reason: String,
132 },
133 CircuitBreakerStateChanged {
135 new_state: String,
136 failure_count: u32,
137 },
138 QueueDrained {
140 channel_id: u8,
141 drained_count: usize,
142 },
143 HeartbeatAction {
145 channel_id: u8,
146 action: String,
147 status_code: Option<u8>,
148 },
149 HeartbeatSuppressed {
151 channel_id: u8,
152 },
153 GroupValueCacheUpdated {
155 address: GroupAddress,
156 source: String,
157 cache_size: usize,
158 },
159 SendErrorThreshold {
161 channel_id: u8,
162 consecutive_errors: u32,
163 threshold: u32,
164 },
165 SendErrorRateWarning {
167 channel_id: u8,
168 error_count: usize,
169 window_ms: u64,
170 rate_percent: u32,
171 },
172 Error { message: String },
174}
175
/// Classification of a received tunnelling sequence counter, as carried by
/// `ServerEvent::SequenceEvent`.
#[derive(Debug, Clone)]
pub enum SequenceEventType {
    /// The counter was accepted.
    Valid {
        /// Sequence counter of the frame.
        sequence: u8,
    },
    /// The frame repeats an already seen counter.
    Duplicate {
        /// Sequence counter of the frame.
        sequence: u8,
        /// Counter the server was expecting.
        expected: u8,
    },
    /// The counter differs from the expected one.
    OutOfOrder {
        /// Sequence counter of the frame.
        sequence: u8,
        /// Counter the server was expecting.
        expected: u8,
        /// Reported distance between the two counters.
        distance: u8,
    },
    /// The counters diverged so far that the tunnel has to be restarted.
    FatalDesync {
        /// Sequence counter of the frame.
        sequence: u8,
        /// Counter the server was expecting.
        expected: u8,
        /// Reported distance between the two counters.
        distance: u8,
    },
}
188
/// Lifecycle phase of the server.
///
/// The default is [`ServerState::Stopped`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ServerState {
    /// Not running; this is the initial state.
    #[default]
    Stopped,
    /// Start-up is in progress.
    Starting,
    /// The receive loop is active.
    Running,
    /// Shutdown has been requested.
    Stopping,
}
206
207pub struct ConnectionManager {
213 connections: DashMap<u8, Arc<TunnelConnection>>,
214 next_channel_id: AtomicU8,
215 max_connections: usize,
216 heartbeat_timeout: Duration,
217 individual_address_base: IndividualAddress,
218 desync_threshold: u8,
219}
220
221impl ConnectionManager {
222 pub fn new(
224 max_connections: usize,
225 heartbeat_timeout: Duration,
226 individual_address_base: IndividualAddress,
227 ) -> Self {
228 Self {
229 connections: DashMap::new(),
230 next_channel_id: AtomicU8::new(1),
231 max_connections,
232 heartbeat_timeout,
233 individual_address_base,
234 desync_threshold: 5,
235 }
236 }
237
238 pub fn with_desync_threshold(mut self, threshold: u8) -> Self {
240 self.desync_threshold = threshold;
241 self
242 }
243
244 pub fn allocate_channel(&self) -> Option<u8> {
246 if self.connections.len() >= self.max_connections {
247 return None;
248 }
249
250 for _ in 0..255 {
252 let channel_id = self.next_channel_id.fetch_add(1, Ordering::SeqCst);
253 if channel_id != 0 && !self.connections.contains_key(&channel_id) {
254 return Some(channel_id);
255 }
256 }
257
258 None
259 }
260
261 pub fn create_connection(
263 &self,
264 channel_id: u8,
265 client_addr: SocketAddr,
266 data_endpoint: SocketAddr,
267 ) -> Arc<TunnelConnection> {
268 let individual_address = IndividualAddress::new(
270 self.individual_address_base.area(),
271 self.individual_address_base.line(),
272 100 + channel_id,
273 );
274
275 let connection = Arc::new(TunnelConnection::with_desync_threshold(
276 channel_id,
277 client_addr,
278 data_endpoint,
279 individual_address,
280 self.heartbeat_timeout,
281 self.desync_threshold,
282 ));
283
284 connection.fsm.on_connected();
286
287 self.connections.insert(channel_id, connection.clone());
288 connection
289 }
290
291 pub fn get(&self, channel_id: u8) -> Option<Arc<TunnelConnection>> {
293 self.connections.get(&channel_id).map(|c| c.clone())
294 }
295
296 pub fn remove(&self, channel_id: u8) -> Option<Arc<TunnelConnection>> {
298 self.connections.remove(&channel_id).map(|(_, c)| {
299 c.fsm.on_disconnected();
300 c
301 })
302 }
303
304 pub fn all(&self) -> Vec<Arc<TunnelConnection>> {
306 self.connections.iter().map(|r| r.value().clone()).collect()
307 }
308
309 pub fn len(&self) -> usize {
311 self.connections.len()
312 }
313
314 pub fn is_empty(&self) -> bool {
316 self.connections.is_empty()
317 }
318
319 pub fn cleanup_timed_out(&self) -> Vec<u8> {
321 let timed_out: Vec<_> = self
322 .connections
323 .iter()
324 .filter(|r| r.value().is_timed_out())
325 .map(|r| *r.key())
326 .collect();
327
328 for channel_id in &timed_out {
329 if let Some((_, conn)) = self.connections.remove(channel_id) {
330 conn.fsm.on_disconnected();
331 }
332 }
333
334 timed_out
335 }
336}
337
338pub struct KnxServer {
354 config: KnxServerConfig,
355 state: parking_lot::RwLock<ServerState>,
356 connections: ConnectionManager,
357 group_objects: Arc<GroupObjectTable>,
358 filter_chain: FilterChain,
359 heartbeat_scheduler: HeartbeatScheduler,
360 group_value_cache: GroupValueCache,
361 error_tracker: SendErrorTracker,
362 metrics_collector: KnxMetricsCollector,
363 event_tx: broadcast::Sender<ServerEvent>,
364 shutdown_tx: parking_lot::Mutex<Option<mpsc::Sender<()>>>,
365 running: AtomicBool,
366}
367
368impl KnxServer {
369 pub fn new(config: KnxServerConfig) -> Self {
371 let (event_tx, _) = broadcast::channel(1000);
372 let desync_threshold = config.tunnel_behavior.sequence_validation_enabled
373 .then_some(5u8)
374 .unwrap_or(255); let filter_chain = FilterChain::new(config.tunnel_behavior.flow_control.clone());
377
378 let heartbeat_scheduler = if config.tunnel_behavior.heartbeat_scheduler.enabled {
380 HeartbeatScheduler::new(
381 config.tunnel_behavior.heartbeat_scheduler.schedule.clone(),
382 )
383 } else {
384 HeartbeatScheduler::normal()
385 };
386
387 let group_value_cache = GroupValueCache::new(
389 config.tunnel_behavior.group_value_cache.clone(),
390 );
391
392 let error_tracker = SendErrorTracker::new(
394 config.tunnel_behavior.send_error_tracker.clone(),
395 );
396
397 Self {
398 connections: ConnectionManager::new(
399 config.max_connections,
400 config.connection_timeout(),
401 config.individual_address,
402 ).with_desync_threshold(desync_threshold),
403 filter_chain,
404 heartbeat_scheduler,
405 group_value_cache,
406 error_tracker,
407 metrics_collector: KnxMetricsCollector::new(),
408 config,
409 state: parking_lot::RwLock::new(ServerState::Stopped),
410 group_objects: Arc::new(GroupObjectTable::new()),
411 event_tx,
412 shutdown_tx: parking_lot::Mutex::new(None),
413 running: AtomicBool::new(false),
414 }
415 }
416
417 pub fn with_group_objects(mut self, table: Arc<GroupObjectTable>) -> Self {
419 self.group_objects = table;
420 self
421 }
422
423 pub fn state(&self) -> ServerState {
425 *self.state.read()
426 }
427
428 pub fn is_running(&self) -> bool {
430 self.running.load(Ordering::SeqCst)
431 }
432
433 pub fn config(&self) -> &KnxServerConfig {
435 &self.config
436 }
437
438 pub fn group_objects(&self) -> Arc<GroupObjectTable> {
440 self.group_objects.clone()
441 }
442
443 pub fn subscribe(&self) -> broadcast::Receiver<ServerEvent> {
445 self.event_tx.subscribe()
446 }
447
448 pub fn connection_count(&self) -> usize {
450 self.connections.len()
451 }
452
453 pub fn get_connection(&self, channel_id: u8) -> Option<Arc<TunnelConnection>> {
455 self.connections.get(channel_id)
456 }
457
458 pub fn filter_chain(&self) -> &FilterChain {
460 &self.filter_chain
461 }
462
463 pub fn pace_state(&self) -> PaceState {
465 self.filter_chain.pace_state()
466 }
467
468 pub fn circuit_breaker_state(&self) -> CircuitBreakerState {
470 self.filter_chain.circuit_breaker_state()
471 }
472
473 pub fn heartbeat_scheduler(&self) -> &HeartbeatScheduler {
475 &self.heartbeat_scheduler
476 }
477
478 pub fn group_value_cache(&self) -> &GroupValueCache {
480 &self.group_value_cache
481 }
482
483 pub fn error_tracker(&self) -> &SendErrorTracker {
485 &self.error_tracker
486 }
487
488 pub fn metrics_collector(&self) -> &KnxMetricsCollector {
490 &self.metrics_collector
491 }
492
493 pub fn metrics_snapshot(&self) -> KnxMetricsSnapshot {
519 let server_state = match self.state() {
521 ServerState::Stopped => 0u8,
522 ServerState::Starting => 1,
523 ServerState::Running => 2,
524 ServerState::Stopping => 3,
525 };
526
527 let heartbeat = self.heartbeat_scheduler.stats_snapshot();
529 let cache = self.group_value_cache.stats_snapshot();
530 let cache_entries = self.group_value_cache.len();
531 let error_tracker = self.error_tracker.stats_snapshot();
532
533 let filter_chain = self.filter_chain.stats_snapshot();
535 let pace = self.filter_chain.pace_filter().stats_snapshot();
536 let queue = self.filter_chain.queue_filter().stats_snapshot();
537 let retry = self.filter_chain.retry_filter().stats_snapshot();
538
539 let connections = self.connections.all();
541 let seq_stats: Vec<_> = connections
542 .iter()
543 .map(|c| c.sequence_tracker.stats_snapshot())
544 .collect();
545 let fsm_stats: Vec<_> = connections
546 .iter()
547 .map(|c| c.fsm.stats_snapshot())
548 .collect();
549
550 self.metrics_collector.collect(
551 server_state,
552 connections.len(),
553 self.config.max_connections,
554 &heartbeat,
555 &cache,
556 cache_entries,
557 &error_tracker,
558 &filter_chain,
559 &pace,
560 &queue,
561 &retry,
562 &seq_stats,
563 &fsm_stats,
564 )
565 }
566
567 pub fn diagnostics(&self) -> KnxDiagnostics {
585 let snapshot = self.metrics_snapshot();
586 KnxDiagnostics::analyze(&snapshot)
587 }
588
589 pub fn diagnostics_with_config(&self, config: &DiagnosticConfig) -> KnxDiagnostics {
594 let snapshot = self.metrics_snapshot();
595 KnxDiagnostics::analyze_with_config(&snapshot, config)
596 }
597
598 pub fn connection_metrics(&self) -> Vec<ConnectionMetricsSnapshot> {
604 self.connections
605 .all()
606 .iter()
607 .map(|conn| {
608 let seq = conn.sequence_tracker.stats_snapshot();
609 let fsm = conn.fsm.stats_snapshot();
610 ConnectionMetricsSnapshot {
611 channel_id: conn.channel_id,
612 individual_address: conn.individual_address.to_string(),
613 fsm_state: conn.fsm.state().to_string(),
614 fsm_transitions: fsm.transitions,
615 frames_sent: seq.frames_sent,
616 frames_received: seq.frames_received,
617 duplicates_detected: seq.duplicates_detected,
618 out_of_order_detected: seq.out_of_order_detected,
619 fatal_desyncs: seq.fatal_desyncs,
620 resets: seq.resets,
621 idle_duration_ms: conn.idle_duration().as_millis() as u64,
622 is_timed_out: conn.is_timed_out(),
623 }
624 })
625 .collect()
626 }
627
628 pub async fn start(&self) -> KnxResult<()> {
630 if self.is_running() {
631 return Err(KnxError::ServerAlreadyRunning);
632 }
633
634 *self.state.write() = ServerState::Starting;
635
636 let socket = UdpSocket::bind(&self.config.bind_addr).await.map_err(|e| {
638 KnxError::BindError {
639 address: self.config.bind_addr,
640 reason: e.to_string(),
641 }
642 })?;
643
644 let local_addr = socket.local_addr()?;
645 info!(address = %local_addr, "KNXnet/IP server started");
646
647 self.running.store(true, Ordering::SeqCst);
648 *self.state.write() = ServerState::Running;
649
650 let _ = self.event_tx.send(ServerEvent::Started {
651 address: local_addr,
652 });
653
654 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
656 *self.shutdown_tx.lock() = Some(shutdown_tx);
657
658 let socket = Arc::new(socket);
659
660 let mut buf = vec![0u8; 1024];
662
663 loop {
664 tokio::select! {
665 result = socket.recv_from(&mut buf) => {
666 match result {
667 Ok((len, addr)) => {
668 if let Err(e) = self.handle_packet(&socket, &buf[..len], addr).await {
669 debug!(error = %e, "Error handling packet");
670 }
671 }
672 Err(e) => {
673 error!(error = %e, "Error receiving packet");
674 }
675 }
676 }
677 _ = shutdown_rx.recv() => {
678 info!("Server shutdown requested");
679 break;
680 }
681 }
682 }
683
684 self.running.store(false, Ordering::SeqCst);
685 *self.state.write() = ServerState::Stopped;
686 let _ = self.event_tx.send(ServerEvent::Stopped);
687
688 Ok(())
689 }
690
691 pub async fn stop(&self) -> KnxResult<()> {
693 if !self.is_running() {
694 return Ok(());
695 }
696
697 *self.state.write() = ServerState::Stopping;
698
699 if let Some(tx) = self.shutdown_tx.lock().take() {
700 let _ = tx.send(()).await;
701 }
702
703 Ok(())
704 }
705
706 async fn handle_packet(
708 &self,
709 socket: &UdpSocket,
710 data: &[u8],
711 addr: SocketAddr,
712 ) -> KnxResult<()> {
713 let frame = KnxFrame::decode(data)?;
714
715 trace!(
716 service_type = ?frame.service_type,
717 from = %addr,
718 "Received KNXnet/IP frame"
719 );
720
721 match frame.service_type {
722 ServiceType::SearchRequest => {
723 self.handle_search_request(socket, addr).await?;
724 }
725 ServiceType::DescriptionRequest => {
726 self.handle_description_request(socket, addr).await?;
727 }
728 ServiceType::ConnectRequest => {
729 self.handle_connect_request(socket, &frame.body, addr).await?;
730 }
731 ServiceType::ConnectionStateRequest => {
732 self.handle_connection_state_request(socket, &frame.body, addr)
733 .await?;
734 }
735 ServiceType::DisconnectRequest => {
736 self.handle_disconnect_request(socket, &frame.body, addr)
737 .await?;
738 }
739 ServiceType::TunnellingRequest => {
740 self.handle_tunnelling_request(socket, &frame.body, addr)
741 .await?;
742 }
743 ServiceType::TunnellingAck => {
744 if self.filter_chain.is_enabled() {
745 self.handle_tunnelling_ack_async(socket, &frame.body).await?;
746 } else {
747 self.handle_tunnelling_ack(&frame.body)?;
748 }
749 }
750 _ => {
751 debug!(service_type = ?frame.service_type, "Unhandled service type");
752 }
753 }
754
755 Ok(())
756 }
757
758 async fn handle_search_request(
763 &self,
764 socket: &UdpSocket,
765 addr: SocketAddr,
766 ) -> KnxResult<()> {
767 debug!(from = %addr, "Handling SearchRequest");
768
769 let local_addr = socket.local_addr()?;
770 let local_ip = match local_addr {
771 SocketAddr::V4(v4) => *v4.ip(),
772 _ => std::net::Ipv4Addr::UNSPECIFIED,
773 };
774
775 let hpai = Hpai::udp_ipv4(local_ip, local_addr.port());
776 let device_info = DibDeviceInfo::new(&self.config.device_name, self.config.individual_address)
777 .with_serial_number(self.config.serial_number)
778 .with_mac_address(self.config.mac_address);
779 let families = SupportedServiceFamilies::default_families();
780
781 let mut body = hpai.encode();
782 body.extend(device_info.encode());
783 body.extend(families.encode());
784
785 let response = KnxFrame::new(ServiceType::SearchResponse, body);
786 socket.send_to(&response.encode(), addr).await?;
787
788 Ok(())
789 }
790
791 async fn handle_description_request(
792 &self,
793 socket: &UdpSocket,
794 addr: SocketAddr,
795 ) -> KnxResult<()> {
796 debug!(from = %addr, "Handling DescriptionRequest");
797
798 let device_info = DibDeviceInfo::new(&self.config.device_name, self.config.individual_address)
799 .with_serial_number(self.config.serial_number)
800 .with_mac_address(self.config.mac_address);
801 let families = SupportedServiceFamilies::default_families();
802
803 let mut body = device_info.encode();
804 body.extend(families.encode());
805
806 let response = KnxFrame::new(ServiceType::DescriptionResponse, body);
807 socket.send_to(&response.encode(), addr).await?;
808
809 Ok(())
810 }
811
812 async fn handle_connect_request(
817 &self,
818 socket: &UdpSocket,
819 data: &[u8],
820 addr: SocketAddr,
821 ) -> KnxResult<()> {
822 let request = ConnectRequest::decode(data)?;
823 debug!(from = %addr, "Handling ConnectRequest");
824
825 let channel_id = match self.connections.allocate_channel() {
826 Some(id) => id,
827 None => {
828 let response = ConnectResponse::error(ConnectStatus::NoMoreConnections);
829 let frame = KnxFrame::new(ServiceType::ConnectResponse, response.encode());
830 socket.send_to(&frame.encode(), addr).await?;
831 return Ok(());
832 }
833 };
834
835 let data_endpoint = if request.data_endpoint.is_nat() {
836 addr
837 } else {
838 request.data_endpoint.to_socket_addr_v()
839 };
840
841 let connection = self.connections.create_connection(channel_id, addr, data_endpoint);
842
843 let local_addr = socket.local_addr()?;
844 let local_ip = match local_addr {
845 SocketAddr::V4(v4) => *v4.ip(),
846 _ => std::net::Ipv4Addr::UNSPECIFIED,
847 };
848
849 let response = ConnectResponse::success(
850 channel_id,
851 Hpai::udp_ipv4(local_ip, local_addr.port()),
852 ConnectionResponseData::new(connection.individual_address),
853 );
854
855 let frame = KnxFrame::new(ServiceType::ConnectResponse, response.encode());
856 socket.send_to(&frame.encode(), addr).await?;
857
858 info!(
859 channel_id,
860 client = %addr,
861 individual_address = %connection.individual_address,
862 "Client connected"
863 );
864
865 let _ = self.event_tx.send(ServerEvent::ClientConnected {
866 channel_id,
867 address: addr,
868 });
869
870 Ok(())
871 }
872
873 async fn handle_connection_state_request(
887 &self,
888 socket: &UdpSocket,
889 data: &[u8],
890 addr: SocketAddr,
891 ) -> KnxResult<()> {
892 let request = ConnectionStateRequest::decode(data)?;
893
894 if self.config.tunnel_behavior.heartbeat_scheduler.enabled {
896 let action = self.heartbeat_scheduler.next_action(request.channel_id);
897
898 match action.status_code() {
899 Some(status) => {
900 if !action.is_normal() {
901 debug!(
902 channel_id = request.channel_id,
903 action = %action,
904 status = status,
905 "Heartbeat scheduler action"
906 );
907 let _ = self.event_tx.send(ServerEvent::HeartbeatAction {
908 channel_id: request.channel_id,
909 action: action.to_string(),
910 status_code: Some(status),
911 });
912 }
913
914 if action.is_normal() {
916 if let Some(conn) = self.connections.get(request.channel_id) {
917 conn.touch();
918 }
919 }
920
921 let response = ConnectionStateResponse {
922 channel_id: request.channel_id,
923 status,
924 };
925 let frame = KnxFrame::new(ServiceType::ConnectionStateResponse, response.encode());
926 socket.send_to(&frame.encode(), addr).await?;
927 }
928 None => {
929 debug!(
931 channel_id = request.channel_id,
932 "Heartbeat suppressed (NoResponse)"
933 );
934 let _ = self.event_tx.send(ServerEvent::HeartbeatSuppressed {
935 channel_id: request.channel_id,
936 });
937 }
939 }
940
941 return Ok(());
942 }
943
944 let response = if let Some(status_override) = self.config.tunnel_behavior.heartbeat_status_override {
946 debug!(
947 channel_id = request.channel_id,
948 status = status_override,
949 "Heartbeat override"
950 );
951 ConnectionStateResponse {
952 channel_id: request.channel_id,
953 status: status_override,
954 }
955 } else if let Some(conn) = self.connections.get(request.channel_id) {
956 conn.touch();
957 ConnectionStateResponse::ok(request.channel_id)
958 } else {
959 ConnectionStateResponse {
961 channel_id: request.channel_id,
962 status: 0x21,
963 }
964 };
965
966 let frame = KnxFrame::new(ServiceType::ConnectionStateResponse, response.encode());
967 socket.send_to(&frame.encode(), addr).await?;
968
969 Ok(())
970 }
971
972 async fn handle_disconnect_request(
973 &self,
974 socket: &UdpSocket,
975 data: &[u8],
976 addr: SocketAddr,
977 ) -> KnxResult<()> {
978 let request = DisconnectRequest::decode(data)?;
979 debug!(channel_id = request.channel_id, "Handling DisconnectRequest");
980
981 self.connections.remove(request.channel_id);
982
983 if self.filter_chain.is_enabled() {
985 self.filter_chain.queue_filter().clear_channel(request.channel_id);
986 }
987
988 self.error_tracker.remove_channel(request.channel_id);
990
991 let response = DisconnectResponse::ok(request.channel_id);
992 let frame = KnxFrame::new(ServiceType::DisconnectResponse, response.encode());
993 socket.send_to(&frame.encode(), addr).await?;
994
995 info!(channel_id = request.channel_id, "Client disconnected");
996
997 let _ = self.event_tx.send(ServerEvent::ClientDisconnected {
998 channel_id: request.channel_id,
999 });
1000
1001 Ok(())
1002 }
1003
1004 async fn handle_tunnelling_request(
1017 &self,
1018 socket: &UdpSocket,
1019 data: &[u8],
1020 addr: SocketAddr,
1021 ) -> KnxResult<()> {
1022 let request = TunnellingRequest::decode(data)?;
1023
1024 let connection = match self.connections.get(request.channel_id) {
1025 Some(conn) => conn,
1026 None => {
1027 let ack = TunnellingAck::error(request.channel_id, request.sequence_counter, 0x21);
1029 let frame = KnxFrame::new(ServiceType::TunnellingAck, ack.encode());
1030 socket.send_to(&frame.encode(), addr).await?;
1031 return Ok(());
1032 }
1033 };
1034
1035 connection.touch();
1036
1037 let validation = connection.validate_recv_sequence(request.sequence_counter);
1039
1040 let seq_event = match &validation {
1042 ReceivedValidation::Valid { sequence } => {
1043 SequenceEventType::Valid { sequence: *sequence }
1044 }
1045 ReceivedValidation::Duplicate { sequence, expected } => {
1046 debug!(
1047 channel_id = request.channel_id,
1048 sequence, expected,
1049 "Duplicate frame — ACK only"
1050 );
1051 SequenceEventType::Duplicate { sequence: *sequence, expected: *expected }
1052 }
1053 ReceivedValidation::OutOfOrder { sequence, expected, distance } => {
1054 warn!(
1055 channel_id = request.channel_id,
1056 sequence, expected, distance,
1057 "Out-of-order frame"
1058 );
1059 SequenceEventType::OutOfOrder {
1060 sequence: *sequence,
1061 expected: *expected,
1062 distance: *distance,
1063 }
1064 }
1065 ReceivedValidation::FatalDesync { sequence, expected, distance } => {
1066 error!(
1067 channel_id = request.channel_id,
1068 sequence, expected, distance,
1069 "Fatal sequence desync — tunnel restart required"
1070 );
1071 SequenceEventType::FatalDesync {
1072 sequence: *sequence,
1073 expected: *expected,
1074 distance: *distance,
1075 }
1076 }
1077 };
1078
1079 let _ = self.event_tx.send(ServerEvent::SequenceEvent {
1080 channel_id: request.channel_id,
1081 validation: seq_event,
1082 });
1083
1084 if validation.should_ack() {
1086 let ack = TunnellingAck::ok(request.channel_id, request.sequence_counter);
1087 let frame = KnxFrame::new(ServiceType::TunnellingAck, ack.encode());
1088 socket.send_to(&frame.encode(), addr).await?;
1089 }
1090
1091 if validation.should_process() {
1093 self.process_cemi(socket, addr, &request.cemi, &connection).await?;
1094 }
1095
1096 if validation.requires_restart() {
1098 if self.filter_chain.is_enabled() {
1100 self.filter_chain.queue_filter().clear_channel(request.channel_id);
1101 }
1102
1103 self.error_tracker.remove_channel(request.channel_id);
1105
1106 self.connections.remove(request.channel_id);
1107 let _ = self.event_tx.send(ServerEvent::ClientDisconnected {
1108 channel_id: request.channel_id,
1109 });
1110 }
1111
1112 Ok(())
1113 }
1114
1115 fn handle_tunnelling_ack(&self, data: &[u8]) -> KnxResult<()> {
1122 let ack = TunnellingAck::decode(data)?;
1123
1124 if let Some(conn) = self.connections.get(ack.channel_id) {
1125 conn.touch();
1126
1127 conn.feed_ack(AckMessage {
1129 channel_id: ack.channel_id,
1130 sequence: ack.sequence_counter,
1131 status: ack.status,
1132 });
1133
1134 conn.fsm.on_ack_received_simple();
1136
1137 if self.filter_chain.is_enabled() {
1139 if ack.status == 0 {
1140 self.filter_chain.on_send_success(conn.channel_id);
1141 } else {
1142 self.filter_chain.on_send_failure(
1143 conn.channel_id,
1144 &format!("ACK error status: {:#04x}", ack.status),
1145 );
1146 }
1147 }
1148
1149 if ack.status == 0 {
1151 self.error_tracker.on_send_success(conn.channel_id);
1152 } else {
1153 self.handle_send_error_tracking(
1154 conn.channel_id,
1155 ErrorCategory::AckError,
1156 );
1157 }
1158 }
1159
1160 Ok(())
1161 }
1162
1163 async fn handle_tunnelling_ack_async(
1168 &self,
1169 socket: &UdpSocket,
1170 data: &[u8],
1171 ) -> KnxResult<()> {
1172 let ack = TunnellingAck::decode(data)?;
1173
1174 if let Some(conn) = self.connections.get(ack.channel_id) {
1175 conn.touch();
1176
1177 conn.feed_ack(AckMessage {
1179 channel_id: ack.channel_id,
1180 sequence: ack.sequence_counter,
1181 status: ack.status,
1182 });
1183
1184 conn.fsm.on_ack_received_simple();
1186
1187 if self.filter_chain.is_enabled() {
1189 if ack.status == 0 {
1190 self.filter_chain.on_send_success(conn.channel_id);
1191 self.drain_queued_frames(socket, &conn).await;
1193 } else {
1194 self.filter_chain.on_send_failure(
1195 conn.channel_id,
1196 &format!("ACK error status: {:#04x}", ack.status),
1197 );
1198 }
1199 }
1200
1201 if ack.status == 0 {
1203 self.error_tracker.on_send_success(conn.channel_id);
1204 } else {
1205 self.handle_send_error_tracking(
1206 conn.channel_id,
1207 ErrorCategory::AckError,
1208 );
1209 }
1210 }
1211
1212 Ok(())
1213 }
1214
1215 async fn process_cemi(
1229 &self,
1230 socket: &UdpSocket,
1231 client_addr: SocketAddr,
1232 cemi: &CemiFrame,
1233 connection: &TunnelConnection,
1234 ) -> KnxResult<()> {
1235 match cemi.message_code {
1236 MessageCode::LDataReq => {
1237 self.handle_ldata_req(socket, client_addr, cemi, connection).await?;
1238 }
1239 MessageCode::LDataInd => {
1240 self.handle_ldata_data(socket, client_addr, cemi, connection).await?;
1242 }
1243 MessageCode::MPropReadReq => {
1244 if self.config.tunnel_behavior.property_service_enabled {
1245 self.handle_prop_read_req(socket, client_addr, cemi, connection).await?;
1246 }
1247 }
1248 MessageCode::MPropWriteReq => {
1249 if self.config.tunnel_behavior.property_service_enabled {
1250 self.handle_prop_write_req(socket, client_addr, cemi, connection).await?;
1251 }
1252 }
1253 MessageCode::MResetReq => {
1254 if self.config.tunnel_behavior.reset_service_enabled {
1255 self.handle_reset_req(socket, client_addr, connection).await?;
1256 }
1257 }
1258 MessageCode::LRawReq => {
1259 self.send_ldata_con(socket, client_addr, connection, cemi, true).await?;
1261 }
1262 _ => {
1263 debug!(
1264 message_code = ?cemi.message_code,
1265 channel_id = connection.channel_id,
1266 "Unhandled cEMI message code"
1267 );
1268 }
1269 }
1270
1271 if self.config.tunnel_behavior.bus_monitor_enabled {
1273 self.emit_bus_monitor_frame(socket, cemi, connection).await?;
1274 }
1275
1276 Ok(())
1277 }
1278
1279 async fn handle_ldata_req(
1285 &self,
1286 socket: &UdpSocket,
1287 client_addr: SocketAddr,
1288 cemi: &CemiFrame,
1289 connection: &TunnelConnection,
1290 ) -> KnxResult<()> {
1291 let bus_success = self.handle_ldata_data(socket, client_addr, cemi, connection).await?;
1293
1294 if self.config.tunnel_behavior.ldata_con_enabled {
1296 let confirm_success = self.compute_confirmation_success(bus_success);
1297
1298 let delay_ms = self.config.tunnel_behavior.bus_delivery_delay_ms;
1300 if delay_ms > 0 {
1301 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1302 }
1303
1304 self.send_ldata_con(socket, client_addr, connection, cemi, confirm_success).await?;
1305
1306 let _ = self.event_tx.send(ServerEvent::ConfirmationSent {
1307 channel_id: connection.channel_id,
1308 success: confirm_success,
1309 });
1310 }
1311
1312 if self.config.tunnel_behavior.ldata_ind_broadcast_enabled {
1314 if let Some(group_addr) = cemi.destination_group() {
1315 self.broadcast_ldata_ind(socket, cemi, connection, group_addr).await?;
1316 }
1317 }
1318
1319 Ok(())
1320 }
1321
1322 async fn handle_ldata_data(
1326 &self,
1327 socket: &UdpSocket,
1328 client_addr: SocketAddr,
1329 cemi: &CemiFrame,
1330 connection: &TunnelConnection,
1331 ) -> KnxResult<bool> {
1332 if !cemi.apci.is_group_value() {
1333 return Ok(true);
1336 }
1337
1338 let group_addr = match cemi.destination_group() {
1339 Some(addr) => addr,
1340 None => return Ok(true),
1341 };
1342
1343 match cemi.apci {
1344 Apci::GroupValueWrite => {
1345 debug!(
1346 address = %group_addr,
1347 source = %cemi.source,
1348 "Group Value Write"
1349 );
1350
1351 let write_ok = self.group_objects.write(
1352 &group_addr,
1353 &cemi.data,
1354 Some(cemi.source.to_string()),
1355 ).is_ok();
1356
1357 self.group_value_cache.on_write(
1359 group_addr,
1360 cemi.data.clone(),
1361 Some(cemi.source.to_string()),
1362 );
1363
1364 let _ = self.event_tx.send(ServerEvent::GroupValueWrite {
1365 address: group_addr,
1366 value: cemi.data.clone(),
1367 source: cemi.source,
1368 });
1369
1370 Ok(write_ok)
1371 }
1372 Apci::GroupValueRead => {
1373 debug!(
1374 address = %group_addr,
1375 source = %cemi.source,
1376 "Group Value Read"
1377 );
1378
1379 let _ = self.event_tx.send(ServerEvent::GroupValueRead {
1380 address: group_addr,
1381 source: cemi.source,
1382 });
1383
1384 let response_data = match self.group_objects.read(&group_addr) {
1386 Ok(data) => data,
1387 Err(_e) => vec![0u8],
1388 };
1389
1390 let response_cemi = CemiFrame::group_value_response(
1391 self.config.individual_address,
1392 group_addr,
1393 response_data,
1394 );
1395
1396 self.send_tunnelling_request(socket, client_addr, connection, response_cemi).await?;
1397 Ok(true)
1398 }
1399 Apci::GroupValueResponse => {
1400 debug!(
1401 address = %group_addr,
1402 source = %cemi.source,
1403 "Group Value Response"
1404 );
1405
1406 let _ = self.group_objects.write(
1408 &group_addr,
1409 &cemi.data,
1410 Some(cemi.source.to_string()),
1411 );
1412
1413 Ok(true)
1414 }
1415 _ => Ok(true),
1416 }
1417 }
1418
1419 fn compute_confirmation_success(&self, bus_success: bool) -> bool {
1421 let rate = self.config.tunnel_behavior.confirmation_success_rate;
1422 if rate >= 1.0 {
1423 bus_success
1424 } else if rate <= 0.0 {
1425 false
1426 } else {
1427 bus_success && (rand_simple() < rate)
1428 }
1429 }
1430
1431 async fn handle_prop_read_req(
1440 &self,
1441 socket: &UdpSocket,
1442 client_addr: SocketAddr,
1443 cemi: &CemiFrame,
1444 connection: &TunnelConnection,
1445 ) -> KnxResult<()> {
1446 let (object_index, property_id, count, start_index, _) =
1447 match cemi.parse_property_request() {
1448 Some(parsed) => parsed,
1449 None => {
1450 debug!(
1451 channel_id = connection.channel_id,
1452 "M_PropRead.req with invalid data format"
1453 );
1454 return Ok(());
1455 }
1456 };
1457
1458 debug!(
1459 channel_id = connection.channel_id,
1460 object_index,
1461 property_id,
1462 count,
1463 start_index,
1464 "M_PropRead.req"
1465 );
1466
1467 let response_value = self.read_property(object_index, property_id);
1469
1470 let response_cemi = CemiFrame::prop_read_con(
1471 object_index,
1472 property_id,
1473 count,
1474 start_index,
1475 response_value,
1476 );
1477
1478 self.send_tunnelling_request(socket, client_addr, connection, response_cemi).await?;
1479
1480 let _ = self.event_tx.send(ServerEvent::PropertyRead {
1481 channel_id: connection.channel_id,
1482 object_index,
1483 property_id,
1484 });
1485
1486 Ok(())
1487 }
1488
1489 async fn handle_prop_write_req(
1491 &self,
1492 socket: &UdpSocket,
1493 client_addr: SocketAddr,
1494 cemi: &CemiFrame,
1495 connection: &TunnelConnection,
1496 ) -> KnxResult<()> {
1497 let (object_index, property_id, count, start_index, write_data) =
1498 match cemi.parse_property_request() {
1499 Some(parsed) => parsed,
1500 None => {
1501 debug!(
1502 channel_id = connection.channel_id,
1503 "M_PropWrite.req with invalid data format"
1504 );
1505 return Ok(());
1506 }
1507 };
1508
1509 debug!(
1510 channel_id = connection.channel_id,
1511 object_index,
1512 property_id,
1513 count,
1514 start_index,
1515 data_len = write_data.len(),
1516 "M_PropWrite.req"
1517 );
1518
1519 let response_cemi = CemiFrame::prop_write_con(
1521 object_index,
1522 property_id,
1523 count,
1524 start_index,
1525 true, );
1527
1528 self.send_tunnelling_request(socket, client_addr, connection, response_cemi).await?;
1529
1530 let _ = self.event_tx.send(ServerEvent::PropertyWrite {
1531 channel_id: connection.channel_id,
1532 object_index,
1533 property_id,
1534 });
1535
1536 Ok(())
1537 }
1538
1539 async fn handle_reset_req(
1541 &self,
1542 socket: &UdpSocket,
1543 client_addr: SocketAddr,
1544 connection: &TunnelConnection,
1545 ) -> KnxResult<()> {
1546 debug!(
1547 channel_id = connection.channel_id,
1548 "M_Reset.req — sending M_Reset.ind"
1549 );
1550
1551 let response_cemi = CemiFrame::reset_ind();
1552 self.send_tunnelling_request(socket, client_addr, connection, response_cemi).await?;
1553
1554 let _ = self.event_tx.send(ServerEvent::DeviceReset {
1555 channel_id: connection.channel_id,
1556 });
1557
1558 Ok(())
1559 }
1560
1561 fn read_property(&self, object_index: u16, property_id: u16) -> Vec<u8> {
1566 match (object_index, property_id) {
1567 (0, 1) => vec![0x00, 0x00], (0, 11) => self.config.serial_number.to_vec(),
1572 (0, 12) => vec![0x00, 0x00],
1574 (0, 14) => vec![0x00],
1576 (0, 15) => vec![0x00; 10],
1578 (0, 9) => vec![0x01],
1580 (0, 7) => vec![0x00; 6],
1582 (0, 57) => vec![self.config.individual_address.area()],
1584 (0, 58) => vec![self.config.individual_address.device()],
1586 (0, 56) => vec![0x00, 0xFE], (11, 52) => self.config.individual_address.to_bytes().to_vec(),
1592 (11, 55) => {
1594 match self.config.bind_addr {
1595 SocketAddr::V4(v4) => v4.ip().octets().to_vec(),
1596 _ => vec![0, 0, 0, 0],
1597 }
1598 }
1599 (11, 7) => {
1601 let mut name = self.config.device_name.as_bytes().to_vec();
1602 name.truncate(30);
1603 name
1604 }
1605 (11, 8) => self.config.mac_address.to_vec(),
1607
1608 _ => vec![0x00, 0x00],
1610 }
1611 }
1612
1613 async fn emit_bus_monitor_frame(
1624 &self,
1625 socket: &UdpSocket,
1626 original_cemi: &CemiFrame,
1627 source_connection: &TunnelConnection,
1628 ) -> KnxResult<()> {
1629 let raw_frame = original_cemi.encode();
1631
1632 let status: u8 = if original_cemi.confirm { 0x80 } else { 0x00 };
1634
1635 let busmon_cemi = CemiFrame::bus_monitor_indication(&raw_frame, status, 0);
1637 let raw_frame_len = raw_frame.len();
1638
1639 let all_connections = self.connections.all();
1641 for conn in &all_connections {
1642 let target_addr = conn.data_endpoint;
1643 if let Err(e) = self.send_tunnelling_request(
1644 socket,
1645 target_addr,
1646 conn,
1647 busmon_cemi.clone(),
1648 ).await {
1649 debug!(
1650 error = %e,
1651 channel_id = conn.channel_id,
1652 "Failed to send bus monitor frame"
1653 );
1654 }
1655 }
1656
1657 let _ = self.event_tx.send(ServerEvent::BusMonitorFrameSent {
1658 channel_id: source_connection.channel_id,
1659 message_code: original_cemi.message_code as u8,
1660 raw_frame_len,
1661 });
1662
1663 Ok(())
1664 }
1665
1666 async fn broadcast_ldata_ind(
1676 &self,
1677 socket: &UdpSocket,
1678 original_cemi: &CemiFrame,
1679 source_connection: &TunnelConnection,
1680 group_addr: GroupAddress,
1681 ) -> KnxResult<()> {
1682 let all_connections = self.connections.all();
1683 let mut target_count = 0usize;
1684
1685 for conn in &all_connections {
1686 if conn.channel_id == source_connection.channel_id {
1688 continue;
1689 }
1690
1691 let ind_cemi = CemiFrame {
1693 message_code: MessageCode::LDataInd,
1694 additional_info: Vec::new(),
1695 source: original_cemi.source,
1696 destination: original_cemi.destination,
1697 address_type: original_cemi.address_type,
1698 hop_count: original_cemi.hop_count,
1699 priority: original_cemi.priority,
1700 confirm: false,
1701 ack_request: false,
1702 system_broadcast: original_cemi.system_broadcast,
1703 apci: original_cemi.apci,
1704 data: original_cemi.data.clone(),
1705 };
1706
1707 let target_addr = conn.data_endpoint;
1708 if let Err(e) = self.send_tunnelling_request(socket, target_addr, conn, ind_cemi).await {
1709 debug!(
1710 error = %e,
1711 channel_id = conn.channel_id,
1712 "Failed to broadcast L_Data.ind"
1713 );
1714 } else {
1715 target_count += 1;
1716 }
1717 }
1718
1719 if target_count > 0 {
1720 self.group_value_cache.on_indication(
1722 group_addr,
1723 original_cemi.data.clone(),
1724 Some(original_cemi.source.to_string()),
1725 );
1726
1727 let _ = self.event_tx.send(ServerEvent::DataIndBroadcast {
1728 source_channel_id: source_connection.channel_id,
1729 target_channel_count: target_count,
1730 group_address: group_addr,
1731 });
1732
1733 let _ = self.event_tx.send(ServerEvent::GroupValueCacheUpdated {
1734 address: group_addr,
1735 source: format!("indication from {}", original_cemi.source),
1736 cache_size: self.group_value_cache.len(),
1737 });
1738 }
1739
1740 Ok(())
1741 }
1742
1743 async fn send_tunnelling_request(
1757 &self,
1758 socket: &UdpSocket,
1759 client_addr: SocketAddr,
1760 connection: &TunnelConnection,
1761 cemi: CemiFrame,
1762 ) -> KnxResult<()> {
1763 if self.filter_chain.is_enabled() {
1765 let mut envelope = FrameEnvelope::new(
1766 cemi.clone(),
1767 connection.channel_id,
1768 client_addr,
1769 );
1770
1771 let result = self.filter_chain.send(&mut envelope);
1772
1773 match result {
1774 FilterResult::Pass { delay } => {
1775 if delay > Duration::ZERO {
1776 let _ = self.event_tx.send(ServerEvent::FrameDelayed {
1777 channel_id: connection.channel_id,
1778 delay_ms: delay.as_millis() as u64,
1779 pace_state: format!("{}", self.filter_chain.pace_state()),
1780 });
1781
1782 tokio::time::sleep(delay).await;
1783 }
1784
1785 let result = self.send_tunnelling_request_raw(
1787 socket, client_addr, connection, cemi,
1788 ).await;
1789
1790 match &result {
1791 Ok(()) => {
1792 self.filter_chain.on_send_success(connection.channel_id);
1793 self.error_tracker.on_send_success(connection.channel_id);
1794 }
1795 Err(e) => {
1796 self.filter_chain.on_send_failure(
1797 connection.channel_id,
1798 &e.to_string(),
1799 );
1800 self.handle_send_error_tracking(
1801 connection.channel_id,
1802 ErrorCategory::SendFailure,
1803 );
1804 }
1805 }
1806
1807 return result;
1808 }
1809 FilterResult::Queued => {
1810 let _ = self.event_tx.send(ServerEvent::FrameQueued {
1811 channel_id: connection.channel_id,
1812 priority: format!("{}", envelope.priority),
1813 queue_depth: self.filter_chain.pending_count(),
1814 });
1815 return Ok(());
1816 }
1817 FilterResult::Dropped { reason } => {
1818 let _ = self.event_tx.send(ServerEvent::FrameDropped {
1819 channel_id: connection.channel_id,
1820 reason: reason.clone(),
1821 });
1822 debug!(
1823 channel_id = connection.channel_id,
1824 reason = %reason,
1825 "Frame dropped by flow control"
1826 );
1827 return Ok(()); }
1829 FilterResult::Error { message } => {
1830 let _ = self.event_tx.send(ServerEvent::FrameDropped {
1831 channel_id: connection.channel_id,
1832 reason: message.clone(),
1833 });
1834 return Err(KnxError::FlowControlDrop { reason: message });
1835 }
1836 }
1837 }
1838
1839 self.send_tunnelling_request_raw(socket, client_addr, connection, cemi).await
1841 }
1842
1843 async fn send_tunnelling_request_raw(
1849 &self,
1850 socket: &UdpSocket,
1851 client_addr: SocketAddr,
1852 connection: &TunnelConnection,
1853 cemi: CemiFrame,
1854 ) -> KnxResult<()> {
1855 let seq = connection.next_send_sequence();
1856 let tunnel_req = TunnellingRequest::new(
1857 connection.channel_id,
1858 seq,
1859 cemi,
1860 );
1861 let frame = KnxFrame::new(ServiceType::TunnellingRequest, tunnel_req.encode());
1862
1863 if self.filter_chain.is_enabled() {
1865 self.filter_chain.queue_filter().set_waiting_for_ack(
1866 connection.channel_id,
1867 true,
1868 );
1869 }
1870
1871 if let Err(e) = socket.send_to(&frame.encode(), client_addr).await {
1872 debug!(error = %e, channel_id = connection.channel_id, "Failed to send tunnelling request");
1873 return Err(e.into());
1874 }
1875
1876 Ok(())
1877 }
1878
1879 async fn drain_queued_frames(
1885 &self,
1886 socket: &UdpSocket,
1887 connection: &TunnelConnection,
1888 ) {
1889 if !self.filter_chain.is_enabled() {
1890 return;
1891 }
1892
1893 let envelopes = self.filter_chain.drain_pending(connection.channel_id, 16);
1894
1895 if envelopes.is_empty() {
1896 return;
1897 }
1898
1899 let drained_count = envelopes.len();
1900
1901 for envelope in envelopes {
1902 let env = envelope;
1904 let pace_result = self.filter_chain.pace_filter().process_send(&env);
1905
1906 if let FilterResult::Pass { delay } = pace_result {
1907 if delay > Duration::ZERO {
1908 tokio::time::sleep(delay).await;
1909 }
1910 }
1911
1912 if let Err(e) = self.send_tunnelling_request_raw(
1913 socket,
1914 env.target_addr,
1915 connection,
1916 env.cemi,
1917 ).await {
1918 debug!(
1919 error = %e,
1920 channel_id = connection.channel_id,
1921 "Failed to send drained frame"
1922 );
1923 self.filter_chain.on_send_failure(
1924 connection.channel_id,
1925 &e.to_string(),
1926 );
1927 self.handle_send_error_tracking(
1928 connection.channel_id,
1929 ErrorCategory::SendFailure,
1930 );
1931 break;
1932 } else {
1933 self.filter_chain.on_send_success(connection.channel_id);
1934 self.error_tracker.on_send_success(connection.channel_id);
1935 }
1936 }
1937
1938 if drained_count > 0 {
1939 let _ = self.event_tx.send(ServerEvent::QueueDrained {
1940 channel_id: connection.channel_id,
1941 drained_count,
1942 });
1943 }
1944 }
1945
1946 fn handle_send_error_tracking(
1948 &self,
1949 channel_id: u8,
1950 category: ErrorCategory,
1951 ) {
1952 let result = self.error_tracker.on_send_failure(channel_id, category);
1953
1954 match result {
1955 TrackingResult::Recorded => {}
1956 TrackingResult::ConsecutiveThresholdExceeded {
1957 consecutive_errors,
1958 threshold,
1959 } => {
1960 warn!(
1961 channel_id,
1962 consecutive_errors,
1963 threshold,
1964 "Send error threshold exceeded — tunnel restart recommended"
1965 );
1966 let _ = self.event_tx.send(ServerEvent::SendErrorThreshold {
1967 channel_id,
1968 consecutive_errors,
1969 threshold,
1970 });
1971 }
1972 TrackingResult::RateThresholdExceeded {
1973 error_count,
1974 window_ms,
1975 rate,
1976 } => {
1977 warn!(
1978 channel_id,
1979 error_count,
1980 window_ms,
1981 rate_percent = rate,
1982 "Send error rate threshold exceeded"
1983 );
1984 let _ = self.event_tx.send(ServerEvent::SendErrorRateWarning {
1985 channel_id,
1986 error_count,
1987 window_ms,
1988 rate_percent: rate,
1989 });
1990 }
1991 }
1992 }
1993
1994 async fn send_ldata_con(
2001 &self,
2002 socket: &UdpSocket,
2003 client_addr: SocketAddr,
2004 connection: &TunnelConnection,
2005 original_cemi: &CemiFrame,
2006 success: bool,
2007 ) -> KnxResult<()> {
2008 let con_message_code = original_cemi.message_code
2010 .to_confirmation()
2011 .unwrap_or(MessageCode::LDataCon);
2012
2013 let con_cemi = CemiFrame {
2014 message_code: con_message_code,
2015 additional_info: Vec::new(),
2016 source: original_cemi.source,
2017 destination: original_cemi.destination,
2018 address_type: original_cemi.address_type,
2019 hop_count: original_cemi.hop_count,
2020 priority: original_cemi.priority,
2021 confirm: !success, ack_request: false,
2023 system_broadcast: original_cemi.system_broadcast,
2024 apci: original_cemi.apci,
2025 data: original_cemi.data.clone(),
2026 };
2027
2028 debug!(
2029 channel_id = connection.channel_id,
2030 success,
2031 message_code = ?con_message_code,
2032 destination = original_cemi.destination,
2033 "Sending confirmation frame"
2034 );
2035
2036 self.send_tunnelling_request(socket, client_addr, connection, con_cemi).await
2037 }
2038}
2039
/// Returns the next value of a small thread-local pseudo-random sequence,
/// scaled into the closed range `0.0..=1.0`.
///
/// The generator is a 64-bit xorshift step (shifts 13, 7, 17) over a state
/// that every thread starts from the same fixed seed, so the sequence is
/// deterministic per thread. It is not suitable for anything
/// security-related.
fn rand_simple() -> f64 {
    use std::cell::Cell;

    thread_local! {
        static STATE: Cell<u64> = Cell::new(0x12345678_9ABCDEF0);
    }

    STATE.with(|state| {
        let seed = state.get();
        let step_one = seed ^ (seed << 13);
        let step_two = step_one ^ (step_one >> 7);
        let next = step_two ^ (step_two << 17);
        state.set(next);

        // Dividing by `u64::MAX` maps the full state range onto 0.0..=1.0.
        (next as f64) / (u64::MAX as f64)
    })
}
2056
#[cfg(test)]
mod tests {
    use super::*;

    /// First address string passed to `create_connection` in the connection tests.
    const ENDPOINT_3671: &str = "192.168.1.100:3671";
    /// Second address string passed to `create_connection` in the connection tests.
    const ENDPOINT_3672: &str = "192.168.1.100:3672";

    /// Builds the connection manager used by the connection tests: the
    /// arguments are `10`, a 60 second duration and individual address 1.1.0.
    fn test_manager() -> ConnectionManager {
        ConnectionManager::new(
            10,
            Duration::from_secs(60),
            IndividualAddress::new(1, 1, 0),
        )
    }

    /// A connection can be allocated, created, inspected and removed again.
    #[test]
    fn test_connection_manager() {
        let manager = test_manager();

        let channel_id = manager.allocate_channel().unwrap();
        assert!(channel_id > 0);

        let conn = manager.create_connection(
            channel_id,
            ENDPOINT_3671.parse().unwrap(),
            ENDPOINT_3672.parse().unwrap(),
        );

        assert_eq!(conn.channel_id, channel_id);
        assert_eq!(manager.len(), 1);

        // A freshly created connection is connected and allowed to send.
        assert!(conn.fsm.is_connected());
        assert!(conn.fsm.can_send());

        manager.remove(channel_id);
        assert!(manager.is_empty());
    }

    /// Receive sequence validation accepts 0, flags a repeated 0 as a
    /// duplicate and accepts 1; send sequence numbers count up from 0.
    #[test]
    fn test_connection_with_sequence_tracker() {
        let manager = test_manager();
        let channel_id = manager.allocate_channel().unwrap();
        let conn = manager.create_connection(
            channel_id,
            ENDPOINT_3671.parse().unwrap(),
            ENDPOINT_3672.parse().unwrap(),
        );

        assert!(matches!(
            conn.validate_recv_sequence(0),
            ReceivedValidation::Valid { sequence: 0 }
        ));
        assert!(matches!(
            conn.validate_recv_sequence(0),
            ReceivedValidation::Duplicate { .. }
        ));
        assert!(matches!(
            conn.validate_recv_sequence(1),
            ReceivedValidation::Valid { sequence: 1 }
        ));

        assert_eq!(conn.next_send_sequence(), 0);
        assert_eq!(conn.next_send_sequence(), 1);
    }

    /// The default configuration validates and has the expected tunnel defaults.
    #[test]
    fn test_server_config() {
        let defaults = KnxServerConfig::default();

        assert!(defaults.validate().is_ok());
        assert!(defaults.tunneling_enabled);
        assert!(defaults.tunnel_behavior.ldata_con_enabled);
        assert_eq!(defaults.tunnel_behavior.confirmation_success_rate, 1.0);
    }

    /// The connection state machine walks Idle → WaitingForAck →
    /// WaitingForConfirmation → Idle for one sent frame.
    #[test]
    fn test_connection_fsm_lifecycle() {
        use crate::tunnel::TunnelState;

        let manager = test_manager();
        let channel_id = manager.allocate_channel().unwrap();
        let conn = manager.create_connection(
            channel_id,
            ENDPOINT_3671.parse().unwrap(),
            ENDPOINT_3672.parse().unwrap(),
        );
        let fsm = &conn.fsm;

        assert!(matches!(fsm.state(), TunnelState::Idle));

        fsm.on_frame_sent(0);
        assert!(matches!(
            fsm.state(),
            TunnelState::WaitingForAck { sequence: 0, .. }
        ));

        fsm.on_ack_received(0);
        assert!(matches!(
            fsm.state(),
            TunnelState::WaitingForConfirmation { .. }
        ));

        fsm.on_confirmation_received();
        assert!(matches!(fsm.state(), TunnelState::Idle));
    }

    /// Resetting a connection puts both sequence counters back to 0.
    #[test]
    fn test_connection_reset() {
        let manager = test_manager();
        let channel_id = manager.allocate_channel().unwrap();
        let conn = manager.create_connection(
            channel_id,
            ENDPOINT_3671.parse().unwrap(),
            ENDPOINT_3672.parse().unwrap(),
        );

        // Advance both counters before resetting.
        let _ = conn.validate_recv_sequence(0);
        let _ = conn.validate_recv_sequence(1);
        let _ = conn.next_send_sequence();

        conn.reset();

        assert_eq!(conn.sequence_tracker.current_rno(), 0);
        assert_eq!(conn.sequence_tracker.current_sno(), 0);
    }

    /// With a desync threshold of 10, receiving sequence 5 first is reported
    /// as out of order.
    #[test]
    fn test_desync_threshold_custom() {
        let manager = test_manager().with_desync_threshold(10);
        let channel_id = manager.allocate_channel().unwrap();
        let conn = manager.create_connection(
            channel_id,
            ENDPOINT_3671.parse().unwrap(),
            ENDPOINT_3672.parse().unwrap(),
        );

        assert!(matches!(
            conn.validate_recv_sequence(5),
            ReceivedValidation::OutOfOrder { .. }
        ));
    }

    /// Every sample of `rand_simple` lies within `0.0..=1.0`.
    #[test]
    fn test_rand_simple_range() {
        for _ in 0..100 {
            let sample = rand_simple();
            assert!((0.0..=1.0).contains(&sample));
        }
    }

    /// Defaults of the tunnel behaviour switches.
    #[test]
    fn test_server_config_new_fields() {
        let behavior = KnxServerConfig::default().tunnel_behavior;

        assert!(!behavior.bus_monitor_enabled);
        assert!(behavior.ldata_ind_broadcast_enabled);
        assert!(behavior.property_service_enabled);
        assert!(behavior.reset_service_enabled);
    }

    /// Property reads on object index 0 for a server at address 1.2.3.
    #[test]
    fn test_read_property_device_object() {
        let server = KnxServer::new(
            KnxServerConfig::default().with_individual_address(IndividualAddress::new(1, 2, 3)),
        );

        // Serial number is six bytes long.
        assert_eq!(server.read_property(0, 11).len(), 6);

        // Area and device components of the individual address.
        assert_eq!(server.read_property(0, 57), vec![1]);
        assert_eq!(server.read_property(0, 58), vec![3]);

        assert_eq!(server.read_property(0, 56), vec![0x00, 0xFE]);

        // Unknown object/property pairs read as two zero bytes.
        assert_eq!(server.read_property(99, 99), vec![0x00, 0x00]);
    }

    /// Property reads on object index 11 for a named server at address 1.2.3.
    #[test]
    fn test_read_property_knxnet_ip_object() {
        let address = IndividualAddress::new(1, 2, 3);
        let server = KnxServer::new(
            KnxServerConfig::default()
                .with_individual_address(IndividualAddress::new(1, 2, 3))
                .with_device_name("Test Device"),
        );

        assert_eq!(server.read_property(11, 52), address.to_bytes().to_vec());
        assert_eq!(server.read_property(11, 7), b"Test Device".to_vec());
        assert_eq!(server.read_property(11, 8).len(), 6);
    }

    /// A rate of 1.0 passes the bus outcome through; a rate of 0.0 always fails.
    #[test]
    fn test_compute_confirmation_success() {
        let server_with_rate = |rate: f64| {
            let mut config = KnxServerConfig::default();
            config.tunnel_behavior.confirmation_success_rate = rate;
            KnxServer::new(config)
        };

        let always = server_with_rate(1.0);
        assert!(always.compute_confirmation_success(true));
        assert!(!always.compute_confirmation_success(false));

        let never = server_with_rate(0.0);
        assert!(!never.compute_confirmation_success(true));
        assert!(!never.compute_confirmation_success(false));
    }

    /// The listed event variants can be constructed and debug-formatted.
    #[test]
    fn test_server_event_variants() {
        let events: Vec<ServerEvent> = vec![
            ServerEvent::BusMonitorFrameSent {
                channel_id: 1,
                message_code: 0x11,
                raw_frame_len: 10,
            },
            ServerEvent::PropertyRead {
                channel_id: 1,
                object_index: 0,
                property_id: 11,
            },
            ServerEvent::PropertyWrite {
                channel_id: 1,
                object_index: 0,
                property_id: 14,
            },
            ServerEvent::DeviceReset { channel_id: 1 },
            ServerEvent::DataIndBroadcast {
                source_channel_id: 1,
                target_channel_count: 3,
                group_address: GroupAddress::three_level(1, 0, 1),
            },
        ];

        events.iter().for_each(|event| {
            let _ = format!("{:?}", event);
        });
        assert_eq!(events.len(), 5);
    }
}