Skip to main content

mabi_knx/
server.rs

1//! KNXnet/IP Server with production-grade tunnel FSM, sequence validation, and L_Data.con.
2//!
3//! This module provides a complete KNXnet/IP server supporting:
4//! - knxd-compatible sequence validation (Duplicate/OutOfOrder/FatalDesync)
5//! - L_Data.con confirmation flow (MC=0x2E with Ctrl1 bit 0 success/failure)
6//! - 7-state per-connection FSM mapped to knxd mod 0-3
7//! - Configurable heartbeat status codes for reconnection testing
8//! - Server→client ACK tracking with timeout/retry
9//! - Bus Monitor frame generation (MC=0x2B) with Additional Info TLV
10//! - cEMI property service handling (M_PropRead, M_PropWrite, M_Reset)
11//! - L_Data.ind broadcast to other tunnel connections
12//! - Flow control filter chain (PaceFilter, QueueFilter, RetryFilter)
13//!   for realistic KNX bus timing simulation
14//! - Unified 62-field metrics collection with Prometheus exposition format
15//! - 9-rule automatic diagnostic analysis with configurable thresholds
16
17use std::net::SocketAddr;
18use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
19use std::sync::Arc;
20use std::time::Duration;
21
22use dashmap::DashMap;
23use tokio::net::UdpSocket;
24use tokio::sync::{broadcast, mpsc};
25use tracing::{debug, error, info, trace, warn};
26
27use crate::address::{GroupAddress, IndividualAddress};
28use crate::cemi::{Apci, CemiFrame, MessageCode};
29use crate::config::KnxServerConfig;
30use crate::error::{KnxError, KnxResult};
31use crate::error_tracker::{SendErrorTracker, ErrorCategory, TrackingResult};
32use crate::filter::{
33    FilterChain, FilterResult, FrameEnvelope,
34    CircuitBreakerState, PaceState,
35};
36use crate::frame::{
37    DibDeviceInfo, Hpai, KnxFrame, ServiceType, SupportedServiceFamilies,
38};
39use crate::group::GroupObjectTable;
40use crate::diagnostics::{KnxDiagnostics, DiagnosticConfig};
41use crate::group_cache::GroupValueCache;
42use crate::heartbeat::HeartbeatScheduler;
43use crate::metrics::{KnxMetricsCollector, KnxMetricsSnapshot, ConnectionMetricsSnapshot};
44use crate::tunnel::{
45    ConnectRequest, ConnectResponse, ConnectStatus, ConnectionResponseData,
46    ConnectionStateRequest, ConnectionStateResponse, DisconnectRequest, DisconnectResponse,
47    TunnelConnection, TunnellingAck, TunnellingRequest,
48    ReceivedValidation, AckMessage,
49};
50
51// ============================================================================
52// Server Event
53// ============================================================================
54
/// Server event broadcast to subscribers of [`KnxServer::subscribe`].
#[derive(Debug, Clone)]
pub enum ServerEvent {
    /// Server started and is listening on `address`.
    Started { address: SocketAddr },
    /// Server stopped (receive loop exited).
    Stopped,
    /// Client completed the CONNECT handshake on `channel_id`.
    ClientConnected {
        channel_id: u8,
        address: SocketAddr,
    },
    /// Client disconnected.
    ClientDisconnected { channel_id: u8 },
    /// Group value written.
    GroupValueWrite {
        address: GroupAddress,
        value: Vec<u8>,
        source: IndividualAddress,
    },
    /// Group value read.
    GroupValueRead {
        address: GroupAddress,
        source: IndividualAddress,
    },
    /// Sequence validation event (see [`SequenceEventType`]).
    SequenceEvent {
        channel_id: u8,
        validation: SequenceEventType,
    },
    /// L_Data.con sent; `success` mirrors Ctrl1 bit 0 of the confirmation
    /// (see module docs).
    ConfirmationSent {
        channel_id: u8,
        success: bool,
    },
    /// Bus monitor frame sent (MC=0x2B).
    BusMonitorFrameSent {
        channel_id: u8,
        message_code: u8,
        raw_frame_len: usize,
    },
    /// Property read request processed (M_PropRead).
    PropertyRead {
        channel_id: u8,
        object_index: u16,
        property_id: u16,
    },
    /// Property write request processed (M_PropWrite).
    PropertyWrite {
        channel_id: u8,
        object_index: u16,
        property_id: u16,
    },
    /// Device reset requested (M_Reset).
    DeviceReset { channel_id: u8 },
    /// L_Data.ind broadcast to other tunnel connections.
    DataIndBroadcast {
        source_channel_id: u8,
        target_channel_count: usize,
        group_address: GroupAddress,
    },
    /// Frame delayed by PaceFilter (bus timing simulation); delay is in
    /// milliseconds.
    FrameDelayed {
        channel_id: u8,
        delay_ms: u64,
        pace_state: String,
    },
    /// Frame queued by QueueFilter (backpressure active).
    FrameQueued {
        channel_id: u8,
        priority: String,
        queue_depth: usize,
    },
    /// Frame dropped by flow control filter.
    FrameDropped {
        channel_id: u8,
        reason: String,
    },
    /// Circuit breaker state changed.
    CircuitBreakerStateChanged {
        new_state: String,
        failure_count: u32,
    },
    /// Queued frames drained after ACK received.
    QueueDrained {
        channel_id: u8,
        drained_count: usize,
    },
    /// Heartbeat action taken (non-Continue); `status_code` is the
    /// KNXnet/IP status byte sent, if any.
    HeartbeatAction {
        channel_id: u8,
        action: String,
        status_code: Option<u8>,
    },
    /// Heartbeat suppressed (NoResponse action — no reply packet sent).
    HeartbeatSuppressed {
        channel_id: u8,
    },
    /// Group value cache updated; `cache_size` is the entry count after
    /// the update.
    GroupValueCacheUpdated {
        address: GroupAddress,
        source: String,
        cache_size: usize,
    },
    /// Send error threshold exceeded — tunnel restart recommended.
    SendErrorThreshold {
        channel_id: u8,
        consecutive_errors: u32,
        threshold: u32,
    },
    /// Send error rate warning over a sliding window of `window_ms`
    /// milliseconds.
    SendErrorRateWarning {
        channel_id: u8,
        error_count: usize,
        window_ms: u64,
        rate_percent: u32,
    },
    /// Error occurred.
    Error { message: String },
}
175
/// Sequence validation event type for observability.
///
/// Mirrors the knxd-compatible validation outcomes described in the
/// module docs: a received frame is valid, a duplicate (ACKed but not
/// processed), out of order (processed with warning), or a fatal desync
/// requiring a tunnel restart.
#[derive(Debug, Clone)]
pub enum SequenceEventType {
    /// Valid sequence processed.
    Valid { sequence: u8 },
    /// Duplicate frame detected (ACK sent, not processed).
    Duplicate { sequence: u8, expected: u8 },
    /// Out-of-order frame (processed with warning); `distance` is the
    /// gap between `sequence` and `expected`.
    OutOfOrder { sequence: u8, expected: u8, distance: u8 },
    /// Fatal desync — tunnel restart required.
    FatalDesync { sequence: u8, expected: u8, distance: u8 },
}
188
189// ============================================================================
190// Server State
191// ============================================================================
192
/// Server lifecycle state.
///
/// Exported as a numeric gauge 0-3 by [`KnxServer::metrics_snapshot`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ServerState {
    /// Server is stopped (initial/default state).
    #[default]
    Stopped,
    /// Server is starting (socket not yet bound).
    Starting,
    /// Server is running and processing packets.
    Running,
    /// Server is stopping (shutdown requested, loop winding down).
    Stopping,
}
206
207// ============================================================================
208// Connection Manager
209// ============================================================================
210
/// Manages tunnel connections with per-connection FSM and sequence tracking.
pub struct ConnectionManager {
    // Active connections keyed by channel ID (ID 0 is never handed out).
    connections: DashMap<u8, Arc<TunnelConnection>>,
    // Next candidate channel ID; AtomicU8 fetch_add wraps naturally.
    next_channel_id: AtomicU8,
    // Maximum simultaneous connections before allocation is refused.
    max_connections: usize,
    // Heartbeat timeout handed to every new TunnelConnection.
    heartbeat_timeout: Duration,
    // Base address from which per-channel individual addresses are derived.
    individual_address_base: IndividualAddress,
    // Out-of-order distance treated as fatal desync (default 5).
    desync_threshold: u8,
}
220
221impl ConnectionManager {
222    /// Create a new connection manager.
223    pub fn new(
224        max_connections: usize,
225        heartbeat_timeout: Duration,
226        individual_address_base: IndividualAddress,
227    ) -> Self {
228        Self {
229            connections: DashMap::new(),
230            next_channel_id: AtomicU8::new(1),
231            max_connections,
232            heartbeat_timeout,
233            individual_address_base,
234            desync_threshold: 5,
235        }
236    }
237
238    /// Create with custom fatal desync threshold.
239    pub fn with_desync_threshold(mut self, threshold: u8) -> Self {
240        self.desync_threshold = threshold;
241        self
242    }
243
244    /// Allocate a new channel.
245    pub fn allocate_channel(&self) -> Option<u8> {
246        if self.connections.len() >= self.max_connections {
247            return None;
248        }
249
250        // Find next available channel ID
251        for _ in 0..255 {
252            let channel_id = self.next_channel_id.fetch_add(1, Ordering::SeqCst);
253            if channel_id != 0 && !self.connections.contains_key(&channel_id) {
254                return Some(channel_id);
255            }
256        }
257
258        None
259    }
260
261    /// Create a new connection with Phase 1 components.
262    pub fn create_connection(
263        &self,
264        channel_id: u8,
265        client_addr: SocketAddr,
266        data_endpoint: SocketAddr,
267    ) -> Arc<TunnelConnection> {
268        // Assign individual address based on channel
269        let individual_address = IndividualAddress::new(
270            self.individual_address_base.area(),
271            self.individual_address_base.line(),
272            100 + channel_id,
273        );
274
275        let connection = Arc::new(TunnelConnection::with_desync_threshold(
276            channel_id,
277            client_addr,
278            data_endpoint,
279            individual_address,
280            self.heartbeat_timeout,
281            self.desync_threshold,
282        ));
283
284        // Transition FSM: Connecting → Idle (handshake completed at this point)
285        connection.fsm.on_connected();
286
287        self.connections.insert(channel_id, connection.clone());
288        connection
289    }
290
291    /// Get a connection.
292    pub fn get(&self, channel_id: u8) -> Option<Arc<TunnelConnection>> {
293        self.connections.get(&channel_id).map(|c| c.clone())
294    }
295
296    /// Remove a connection.
297    pub fn remove(&self, channel_id: u8) -> Option<Arc<TunnelConnection>> {
298        self.connections.remove(&channel_id).map(|(_, c)| {
299            c.fsm.on_disconnected();
300            c
301        })
302    }
303
304    /// Get all connections.
305    pub fn all(&self) -> Vec<Arc<TunnelConnection>> {
306        self.connections.iter().map(|r| r.value().clone()).collect()
307    }
308
309    /// Get connection count.
310    pub fn len(&self) -> usize {
311        self.connections.len()
312    }
313
314    /// Check if empty.
315    pub fn is_empty(&self) -> bool {
316        self.connections.is_empty()
317    }
318
319    /// Clean up timed out connections.
320    pub fn cleanup_timed_out(&self) -> Vec<u8> {
321        let timed_out: Vec<_> = self
322            .connections
323            .iter()
324            .filter(|r| r.value().is_timed_out())
325            .map(|r| *r.key())
326            .collect();
327
328        for channel_id in &timed_out {
329            if let Some((_, conn)) = self.connections.remove(channel_id) {
330                conn.fsm.on_disconnected();
331            }
332        }
333
334        timed_out
335    }
336}
337
338// ============================================================================
339// KNX Server
340// ============================================================================
341
/// KNXnet/IP Server with full Phase 1-5 support.
///
/// Phase 4 additions:
/// - HeartbeatScheduler: 5 heartbeat action types with configurable scheduling
/// - GroupValueCache: TTL/LRU cache with auto-update on indication
/// - SendErrorTracker: Consecutive and sliding window error rate monitoring
///
/// Phase 5 additions:
/// - KnxMetricsCollector: Unified 62-field metrics snapshot with Prometheus exposition
/// - KnxDiagnostics: 9-rule automatic health analysis with configurable thresholds
/// - ConnectionMetricsSnapshot: Per-connection tunnel/FSM/sequence detail export
pub struct KnxServer {
    // Server configuration (bind address, device identity, tunnel behavior).
    config: KnxServerConfig,
    // Lifecycle state; RwLock so frequent readers don't contend.
    state: parking_lot::RwLock<ServerState>,
    // Tunnel connection registry with per-connection FSM/sequence tracking.
    connections: ConnectionManager,
    // Group object table, shared with callers via Arc.
    group_objects: Arc<GroupObjectTable>,
    // Phase 3: flow control filter chain (pace/queue/retry).
    filter_chain: FilterChain,
    // Phase 4: heartbeat status-code scheduler.
    heartbeat_scheduler: HeartbeatScheduler,
    // Phase 4: TTL/LRU cache of group values.
    group_value_cache: GroupValueCache,
    // Phase 4: send error rate monitoring.
    error_tracker: SendErrorTracker,
    // Phase 5: unified metrics collector.
    metrics_collector: KnxMetricsCollector,
    // Broadcast channel for ServerEvent notifications (capacity 1000).
    event_tx: broadcast::Sender<ServerEvent>,
    // Sender used by stop() to break the receive loop; Some while running.
    shutdown_tx: parking_lot::Mutex<Option<mpsc::Sender<()>>>,
    // Fast is-running flag mirroring the state field.
    running: AtomicBool,
}
367
368impl KnxServer {
369    /// Create a new KNX server.
370    pub fn new(config: KnxServerConfig) -> Self {
371        let (event_tx, _) = broadcast::channel(1000);
372        let desync_threshold = config.tunnel_behavior.sequence_validation_enabled
373            .then_some(5u8)
374            .unwrap_or(255); // 255 effectively disables desync detection
375
376        let filter_chain = FilterChain::new(config.tunnel_behavior.flow_control.clone());
377
378        // Phase 4: initialize heartbeat scheduler
379        let heartbeat_scheduler = if config.tunnel_behavior.heartbeat_scheduler.enabled {
380            HeartbeatScheduler::new(
381                config.tunnel_behavior.heartbeat_scheduler.schedule.clone(),
382            )
383        } else {
384            HeartbeatScheduler::normal()
385        };
386
387        // Phase 4: initialize group value cache
388        let group_value_cache = GroupValueCache::new(
389            config.tunnel_behavior.group_value_cache.clone(),
390        );
391
392        // Phase 4: initialize send error tracker
393        let error_tracker = SendErrorTracker::new(
394            config.tunnel_behavior.send_error_tracker.clone(),
395        );
396
397        Self {
398            connections: ConnectionManager::new(
399                config.max_connections,
400                config.connection_timeout(),
401                config.individual_address,
402            ).with_desync_threshold(desync_threshold),
403            filter_chain,
404            heartbeat_scheduler,
405            group_value_cache,
406            error_tracker,
407            metrics_collector: KnxMetricsCollector::new(),
408            config,
409            state: parking_lot::RwLock::new(ServerState::Stopped),
410            group_objects: Arc::new(GroupObjectTable::new()),
411            event_tx,
412            shutdown_tx: parking_lot::Mutex::new(None),
413            running: AtomicBool::new(false),
414        }
415    }
416
417    /// Create with custom group object table.
418    pub fn with_group_objects(mut self, table: Arc<GroupObjectTable>) -> Self {
419        self.group_objects = table;
420        self
421    }
422
    /// Get server state.
    pub fn state(&self) -> ServerState {
        *self.state.read()
    }

    /// Check if running.
    ///
    /// Backed by an atomic flag, so this is cheap and lock-free.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::SeqCst)
    }

    /// Get config.
    pub fn config(&self) -> &KnxServerConfig {
        &self.config
    }

    /// Get group object table (clones the shared `Arc` handle only).
    pub fn group_objects(&self) -> Arc<GroupObjectTable> {
        self.group_objects.clone()
    }

    /// Subscribe to server events.
    ///
    /// The broadcast channel has capacity 1000; slow subscribers may
    /// observe lagged events (standard tokio broadcast semantics).
    pub fn subscribe(&self) -> broadcast::Receiver<ServerEvent> {
        self.event_tx.subscribe()
    }

    /// Get connection count.
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }

    /// Get a connection by channel ID (for testing/diagnostics).
    pub fn get_connection(&self, channel_id: u8) -> Option<Arc<TunnelConnection>> {
        self.connections.get(channel_id)
    }

    /// Get a reference to the flow control filter chain.
    pub fn filter_chain(&self) -> &FilterChain {
        &self.filter_chain
    }

    /// Get the current PaceFilter state.
    pub fn pace_state(&self) -> PaceState {
        self.filter_chain.pace_state()
    }

    /// Get the current CircuitBreaker state.
    pub fn circuit_breaker_state(&self) -> CircuitBreakerState {
        self.filter_chain.circuit_breaker_state()
    }

    /// Get a reference to the heartbeat scheduler.
    pub fn heartbeat_scheduler(&self) -> &HeartbeatScheduler {
        &self.heartbeat_scheduler
    }

    /// Get a reference to the group value cache.
    pub fn group_value_cache(&self) -> &GroupValueCache {
        &self.group_value_cache
    }

    /// Get a reference to the send error tracker.
    pub fn error_tracker(&self) -> &SendErrorTracker {
        &self.error_tracker
    }

    /// Get a reference to the metrics collector.
    pub fn metrics_collector(&self) -> &KnxMetricsCollector {
        &self.metrics_collector
    }
492
493    // ========================================================================
494    // Phase 5: Observability & Diagnostics
495    // ========================================================================
496
497    /// Collect a unified metrics snapshot from all server components.
498    ///
499    /// This gathers statistics from every subsystem (heartbeat, cache,
500    /// error tracker, filter chain, pace/queue/retry filters, tunnel
501    /// connections) and produces a single [`KnxMetricsSnapshot`] with
502    /// 62 fields. The snapshot can be exported as Prometheus text, JSON,
503    /// or YAML.
504    ///
505    /// # Performance
506    ///
507    /// All underlying counters use `Ordering::Relaxed` atomic loads,
508    /// so this method is lock-free and safe to call at any frequency
509    /// without impacting server performance.
510    ///
511    /// # Example
512    ///
513    /// ```rust,ignore
514    /// let snapshot = server.metrics_snapshot();
515    /// println!("{}", snapshot.to_prometheus());
516    /// println!("{}", snapshot.summary());
517    /// ```
518    pub fn metrics_snapshot(&self) -> KnxMetricsSnapshot {
519        // Server-level state
520        let server_state = match self.state() {
521            ServerState::Stopped => 0u8,
522            ServerState::Starting => 1,
523            ServerState::Running => 2,
524            ServerState::Stopping => 3,
525        };
526
527        // Phase 4 component snapshots
528        let heartbeat = self.heartbeat_scheduler.stats_snapshot();
529        let cache = self.group_value_cache.stats_snapshot();
530        let cache_entries = self.group_value_cache.len();
531        let error_tracker = self.error_tracker.stats_snapshot();
532
533        // Phase 3 filter chain snapshots
534        let filter_chain = self.filter_chain.stats_snapshot();
535        let pace = self.filter_chain.pace_filter().stats_snapshot();
536        let queue = self.filter_chain.queue_filter().stats_snapshot();
537        let retry = self.filter_chain.retry_filter().stats_snapshot();
538
539        // Per-connection tunnel snapshots (aggregated)
540        let connections = self.connections.all();
541        let seq_stats: Vec<_> = connections
542            .iter()
543            .map(|c| c.sequence_tracker.stats_snapshot())
544            .collect();
545        let fsm_stats: Vec<_> = connections
546            .iter()
547            .map(|c| c.fsm.stats_snapshot())
548            .collect();
549
550        self.metrics_collector.collect(
551            server_state,
552            connections.len(),
553            self.config.max_connections,
554            &heartbeat,
555            &cache,
556            cache_entries,
557            &error_tracker,
558            &filter_chain,
559            &pace,
560            &queue,
561            &retry,
562            &seq_stats,
563            &fsm_stats,
564        )
565    }
566
567    /// Run automatic diagnostic analysis on current server metrics.
568    ///
569    /// Evaluates all 9 diagnostic rules using default thresholds and
570    /// returns a [`KnxDiagnostics`] result with severity, messages,
571    /// and recommendations for each rule.
572    ///
573    /// # Example
574    ///
575    /// ```rust,ignore
576    /// let diag = server.diagnostics();
577    /// if !diag.is_healthy() {
578    ///     for problem in diag.problems() {
579    ///         eprintln!("[{}] {}: {}", problem.severity, problem.rule, problem.message);
580    ///         eprintln!("  → {}", problem.recommendation);
581    ///     }
582    /// }
583    /// ```
584    pub fn diagnostics(&self) -> KnxDiagnostics {
585        let snapshot = self.metrics_snapshot();
586        KnxDiagnostics::analyze(&snapshot)
587    }
588
589    /// Run diagnostic analysis with custom thresholds.
590    ///
591    /// Allows tuning warning/critical thresholds for each of the 9
592    /// diagnostic rules. See [`DiagnosticConfig`] for available options.
593    pub fn diagnostics_with_config(&self, config: &DiagnosticConfig) -> KnxDiagnostics {
594        let snapshot = self.metrics_snapshot();
595        KnxDiagnostics::analyze_with_config(&snapshot, config)
596    }
597
598    /// Collect per-connection metrics snapshots for detailed diagnostics.
599    ///
600    /// Returns a [`ConnectionMetricsSnapshot`] for each active tunnel
601    /// connection, containing FSM state, sequence statistics, idle
602    /// duration, and timeout status.
603    pub fn connection_metrics(&self) -> Vec<ConnectionMetricsSnapshot> {
604        self.connections
605            .all()
606            .iter()
607            .map(|conn| {
608                let seq = conn.sequence_tracker.stats_snapshot();
609                let fsm = conn.fsm.stats_snapshot();
610                ConnectionMetricsSnapshot {
611                    channel_id: conn.channel_id,
612                    individual_address: conn.individual_address.to_string(),
613                    fsm_state: conn.fsm.state().to_string(),
614                    fsm_transitions: fsm.transitions,
615                    frames_sent: seq.frames_sent,
616                    frames_received: seq.frames_received,
617                    duplicates_detected: seq.duplicates_detected,
618                    out_of_order_detected: seq.out_of_order_detected,
619                    fatal_desyncs: seq.fatal_desyncs,
620                    resets: seq.resets,
621                    idle_duration_ms: conn.idle_duration().as_millis() as u64,
622                    is_timed_out: conn.is_timed_out(),
623                }
624            })
625            .collect()
626    }
627
628    /// Start the server.
629    pub async fn start(&self) -> KnxResult<()> {
630        if self.is_running() {
631            return Err(KnxError::ServerAlreadyRunning);
632        }
633
634        *self.state.write() = ServerState::Starting;
635
636        // Bind UDP socket
637        let socket = UdpSocket::bind(&self.config.bind_addr).await.map_err(|e| {
638            KnxError::BindError {
639                address: self.config.bind_addr,
640                reason: e.to_string(),
641            }
642        })?;
643
644        let local_addr = socket.local_addr()?;
645        info!(address = %local_addr, "KNXnet/IP server started");
646
647        self.running.store(true, Ordering::SeqCst);
648        *self.state.write() = ServerState::Running;
649
650        let _ = self.event_tx.send(ServerEvent::Started {
651            address: local_addr,
652        });
653
654        // Create shutdown channel
655        let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
656        *self.shutdown_tx.lock() = Some(shutdown_tx);
657
658        let socket = Arc::new(socket);
659
660        // Start receive loop
661        let mut buf = vec![0u8; 1024];
662
663        loop {
664            tokio::select! {
665                result = socket.recv_from(&mut buf) => {
666                    match result {
667                        Ok((len, addr)) => {
668                            if let Err(e) = self.handle_packet(&socket, &buf[..len], addr).await {
669                                debug!(error = %e, "Error handling packet");
670                            }
671                        }
672                        Err(e) => {
673                            error!(error = %e, "Error receiving packet");
674                        }
675                    }
676                }
677                _ = shutdown_rx.recv() => {
678                    info!("Server shutdown requested");
679                    break;
680                }
681            }
682        }
683
684        self.running.store(false, Ordering::SeqCst);
685        *self.state.write() = ServerState::Stopped;
686        let _ = self.event_tx.send(ServerEvent::Stopped);
687
688        Ok(())
689    }
690
691    /// Stop the server.
692    pub async fn stop(&self) -> KnxResult<()> {
693        if !self.is_running() {
694            return Ok(());
695        }
696
697        *self.state.write() = ServerState::Stopping;
698
699        if let Some(tx) = self.shutdown_tx.lock().take() {
700            let _ = tx.send(()).await;
701        }
702
703        Ok(())
704    }
705
    /// Handle incoming packet.
    ///
    /// Decodes the KNXnet/IP frame header, then dispatches on the service
    /// type. Unknown service types are logged at debug level and ignored;
    /// decode errors propagate to the caller (which logs them).
    async fn handle_packet(
        &self,
        socket: &UdpSocket,
        data: &[u8],
        addr: SocketAddr,
    ) -> KnxResult<()> {
        let frame = KnxFrame::decode(data)?;

        trace!(
            service_type = ?frame.service_type,
            from = %addr,
            "Received KNXnet/IP frame"
        );

        match frame.service_type {
            ServiceType::SearchRequest => {
                self.handle_search_request(socket, addr).await?;
            }
            ServiceType::DescriptionRequest => {
                self.handle_description_request(socket, addr).await?;
            }
            ServiceType::ConnectRequest => {
                self.handle_connect_request(socket, &frame.body, addr).await?;
            }
            ServiceType::ConnectionStateRequest => {
                self.handle_connection_state_request(socket, &frame.body, addr)
                    .await?;
            }
            ServiceType::DisconnectRequest => {
                self.handle_disconnect_request(socket, &frame.body, addr)
                    .await?;
            }
            ServiceType::TunnellingRequest => {
                self.handle_tunnelling_request(socket, &frame.body, addr)
                    .await?;
            }
            ServiceType::TunnellingAck => {
                // The async path is used when flow-control filters are
                // active (queued frames may be drained after an ACK —
                // see the QueueDrained event).
                if self.filter_chain.is_enabled() {
                    self.handle_tunnelling_ack_async(socket, &frame.body).await?;
                } else {
                    self.handle_tunnelling_ack(&frame.body)?;
                }
            }
            _ => {
                debug!(service_type = ?frame.service_type, "Unhandled service type");
            }
        }

        Ok(())
    }
757
758    // ========================================================================
759    // Discovery handlers (unchanged)
760    // ========================================================================
761
762    async fn handle_search_request(
763        &self,
764        socket: &UdpSocket,
765        addr: SocketAddr,
766    ) -> KnxResult<()> {
767        debug!(from = %addr, "Handling SearchRequest");
768
769        let local_addr = socket.local_addr()?;
770        let local_ip = match local_addr {
771            SocketAddr::V4(v4) => *v4.ip(),
772            _ => std::net::Ipv4Addr::UNSPECIFIED,
773        };
774
775        let hpai = Hpai::udp_ipv4(local_ip, local_addr.port());
776        let device_info = DibDeviceInfo::new(&self.config.device_name, self.config.individual_address)
777            .with_serial_number(self.config.serial_number)
778            .with_mac_address(self.config.mac_address);
779        let families = SupportedServiceFamilies::default_families();
780
781        let mut body = hpai.encode();
782        body.extend(device_info.encode());
783        body.extend(families.encode());
784
785        let response = KnxFrame::new(ServiceType::SearchResponse, body);
786        socket.send_to(&response.encode(), addr).await?;
787
788        Ok(())
789    }
790
791    async fn handle_description_request(
792        &self,
793        socket: &UdpSocket,
794        addr: SocketAddr,
795    ) -> KnxResult<()> {
796        debug!(from = %addr, "Handling DescriptionRequest");
797
798        let device_info = DibDeviceInfo::new(&self.config.device_name, self.config.individual_address)
799            .with_serial_number(self.config.serial_number)
800            .with_mac_address(self.config.mac_address);
801        let families = SupportedServiceFamilies::default_families();
802
803        let mut body = device_info.encode();
804        body.extend(families.encode());
805
806        let response = KnxFrame::new(ServiceType::DescriptionResponse, body);
807        socket.send_to(&response.encode(), addr).await?;
808
809        Ok(())
810    }
811
812    // ========================================================================
813    // Connection lifecycle
814    // ========================================================================
815
816    async fn handle_connect_request(
817        &self,
818        socket: &UdpSocket,
819        data: &[u8],
820        addr: SocketAddr,
821    ) -> KnxResult<()> {
822        let request = ConnectRequest::decode(data)?;
823        debug!(from = %addr, "Handling ConnectRequest");
824
825        let channel_id = match self.connections.allocate_channel() {
826            Some(id) => id,
827            None => {
828                let response = ConnectResponse::error(ConnectStatus::NoMoreConnections);
829                let frame = KnxFrame::new(ServiceType::ConnectResponse, response.encode());
830                socket.send_to(&frame.encode(), addr).await?;
831                return Ok(());
832            }
833        };
834
835        let data_endpoint = if request.data_endpoint.is_nat() {
836            addr
837        } else {
838            request.data_endpoint.to_socket_addr_v()
839        };
840
841        let connection = self.connections.create_connection(channel_id, addr, data_endpoint);
842
843        let local_addr = socket.local_addr()?;
844        let local_ip = match local_addr {
845            SocketAddr::V4(v4) => *v4.ip(),
846            _ => std::net::Ipv4Addr::UNSPECIFIED,
847        };
848
849        let response = ConnectResponse::success(
850            channel_id,
851            Hpai::udp_ipv4(local_ip, local_addr.port()),
852            ConnectionResponseData::new(connection.individual_address),
853        );
854
855        let frame = KnxFrame::new(ServiceType::ConnectResponse, response.encode());
856        socket.send_to(&frame.encode(), addr).await?;
857
858        info!(
859            channel_id,
860            client = %addr,
861            individual_address = %connection.individual_address,
862            "Client connected"
863        );
864
865        let _ = self.event_tx.send(ServerEvent::ClientConnected {
866            channel_id,
867            address: addr,
868        });
869
870        Ok(())
871    }
872
873    /// Handle heartbeat with configurable status code simulation.
874    ///
875    /// Heartbeat response determination priority:
876    /// 1. If `heartbeat_scheduler` is enabled → use scheduler to determine action
877    /// 2. If `heartbeat_status_override` is set → use static override (legacy)
878    /// 3. Otherwise → normal behavior (0x00 if connected, 0x21 if unknown)
879    ///
880    /// The HeartbeatScheduler supports 5 action types:
881    /// - Continue → 0x00 (normal)
882    /// - ImmediateReconnect → 0x21 (E_CONNECTION_ID)
883    /// - AbandonTunnel → 0x27 (E_KNX_CONNECTION)
884    /// - DelayedReconnect → 0x29 (E_TUNNELLING_LAYER)
885    /// - NoResponse → no packet sent (simulates timeout)
886    async fn handle_connection_state_request(
887        &self,
888        socket: &UdpSocket,
889        data: &[u8],
890        addr: SocketAddr,
891    ) -> KnxResult<()> {
892        let request = ConnectionStateRequest::decode(data)?;
893
894        // Phase 4: Use heartbeat scheduler if enabled
895        if self.config.tunnel_behavior.heartbeat_scheduler.enabled {
896            let action = self.heartbeat_scheduler.next_action(request.channel_id);
897
898            match action.status_code() {
899                Some(status) => {
900                    if !action.is_normal() {
901                        debug!(
902                            channel_id = request.channel_id,
903                            action = %action,
904                            status = status,
905                            "Heartbeat scheduler action"
906                        );
907                        let _ = self.event_tx.send(ServerEvent::HeartbeatAction {
908                            channel_id: request.channel_id,
909                            action: action.to_string(),
910                            status_code: Some(status),
911                        });
912                    }
913
914                    // Touch connection if it exists and action is Continue
915                    if action.is_normal() {
916                        if let Some(conn) = self.connections.get(request.channel_id) {
917                            conn.touch();
918                        }
919                    }
920
921                    let response = ConnectionStateResponse {
922                        channel_id: request.channel_id,
923                        status,
924                    };
925                    let frame = KnxFrame::new(ServiceType::ConnectionStateResponse, response.encode());
926                    socket.send_to(&frame.encode(), addr).await?;
927                }
928                None => {
929                    // NoResponse action — suppress the heartbeat reply
930                    debug!(
931                        channel_id = request.channel_id,
932                        "Heartbeat suppressed (NoResponse)"
933                    );
934                    let _ = self.event_tx.send(ServerEvent::HeartbeatSuppressed {
935                        channel_id: request.channel_id,
936                    });
937                    // Intentionally do NOT send any response
938                }
939            }
940
941            return Ok(());
942        }
943
944        // Legacy: static status override
945        let response = if let Some(status_override) = self.config.tunnel_behavior.heartbeat_status_override {
946            debug!(
947                channel_id = request.channel_id,
948                status = status_override,
949                "Heartbeat override"
950            );
951            ConnectionStateResponse {
952                channel_id: request.channel_id,
953                status: status_override,
954            }
955        } else if let Some(conn) = self.connections.get(request.channel_id) {
956            conn.touch();
957            ConnectionStateResponse::ok(request.channel_id)
958        } else {
959            // Unknown channel → 0x21 E_CONNECTION_ID (knxd standard)
960            ConnectionStateResponse {
961                channel_id: request.channel_id,
962                status: 0x21,
963            }
964        };
965
966        let frame = KnxFrame::new(ServiceType::ConnectionStateResponse, response.encode());
967        socket.send_to(&frame.encode(), addr).await?;
968
969        Ok(())
970    }
971
972    async fn handle_disconnect_request(
973        &self,
974        socket: &UdpSocket,
975        data: &[u8],
976        addr: SocketAddr,
977    ) -> KnxResult<()> {
978        let request = DisconnectRequest::decode(data)?;
979        debug!(channel_id = request.channel_id, "Handling DisconnectRequest");
980
981        self.connections.remove(request.channel_id);
982
983        // Clean up flow control queue state for this channel
984        if self.filter_chain.is_enabled() {
985            self.filter_chain.queue_filter().clear_channel(request.channel_id);
986        }
987
988        // Phase 4: Clean up error tracker state for this channel
989        self.error_tracker.remove_channel(request.channel_id);
990
991        let response = DisconnectResponse::ok(request.channel_id);
992        let frame = KnxFrame::new(ServiceType::DisconnectResponse, response.encode());
993        socket.send_to(&frame.encode(), addr).await?;
994
995        info!(channel_id = request.channel_id, "Client disconnected");
996
997        let _ = self.event_tx.send(ServerEvent::ClientDisconnected {
998            channel_id: request.channel_id,
999        });
1000
1001        Ok(())
1002    }
1003
1004    // ========================================================================
1005    // Tunnelling with Phase 1 sequence validation
1006    // ========================================================================
1007
    /// Handle TUNNELLING_REQUEST with knxd-compatible sequence validation.
    ///
    /// Implements the full validation chain:
    /// 1. Validate channel ID (unknown channel → error ACK with status 0x21)
    /// 2. Validate sequence number (Valid/Duplicate/OutOfOrder/FatalDesync)
    /// 3. Send ACK (with appropriate status; desynced frames get no ACK)
    /// 4. Process cEMI frame (if Valid or OutOfOrder)
    /// 5. Send L_Data.con (if L_Data.req and confirmation enabled)
    ///
    /// A FatalDesync additionally tears the connection down (queue and
    /// error-tracker state included) and emits `ClientDisconnected`.
    async fn handle_tunnelling_request(
        &self,
        socket: &UdpSocket,
        data: &[u8],
        addr: SocketAddr,
    ) -> KnxResult<()> {
        let request = TunnellingRequest::decode(data)?;

        let connection = match self.connections.get(request.channel_id) {
            Some(conn) => conn,
            None => {
                // Unknown channel → error ACK with 0x21 (E_CONNECTION_ID)
                let ack = TunnellingAck::error(request.channel_id, request.sequence_counter, 0x21);
                let frame = KnxFrame::new(ServiceType::TunnellingAck, ack.encode());
                socket.send_to(&frame.encode(), addr).await?;
                return Ok(());
            }
        };

        // Any traffic on a known channel refreshes the connection timeout,
        // even if the sequence number turns out to be wrong below.
        connection.touch();

        // Validate sequence number with knxd-compatible logic
        let validation = connection.validate_recv_sequence(request.sequence_counter);

        // Emit sequence event for observability. Log severity scales with
        // how bad the mismatch is: debug (duplicate) / warn (out-of-order)
        // / error (fatal desync).
        let seq_event = match &validation {
            ReceivedValidation::Valid { sequence } => {
                SequenceEventType::Valid { sequence: *sequence }
            }
            ReceivedValidation::Duplicate { sequence, expected } => {
                debug!(
                    channel_id = request.channel_id,
                    sequence, expected,
                    "Duplicate frame — ACK only"
                );
                SequenceEventType::Duplicate { sequence: *sequence, expected: *expected }
            }
            ReceivedValidation::OutOfOrder { sequence, expected, distance } => {
                warn!(
                    channel_id = request.channel_id,
                    sequence, expected, distance,
                    "Out-of-order frame"
                );
                SequenceEventType::OutOfOrder {
                    sequence: *sequence,
                    expected: *expected,
                    distance: *distance,
                }
            }
            ReceivedValidation::FatalDesync { sequence, expected, distance } => {
                error!(
                    channel_id = request.channel_id,
                    sequence, expected, distance,
                    "Fatal sequence desync — tunnel restart required"
                );
                SequenceEventType::FatalDesync {
                    sequence: *sequence,
                    expected: *expected,
                    distance: *distance,
                }
            }
        };

        let _ = self.event_tx.send(ServerEvent::SequenceEvent {
            channel_id: request.channel_id,
            validation: seq_event,
        });

        // Send ACK (unless FatalDesync — knxd does not ACK desynced frames).
        // Note: duplicates and out-of-order frames are ACKed with status OK.
        if validation.should_ack() {
            let ack = TunnellingAck::ok(request.channel_id, request.sequence_counter);
            let frame = KnxFrame::new(ServiceType::TunnellingAck, ack.encode());
            socket.send_to(&frame.encode(), addr).await?;
        }

        // Process frame only if it should be processed (duplicates are
        // ACKed above but not re-processed).
        if validation.should_process() {
            self.process_cemi(socket, addr, &request.cemi, &connection).await?;
        }

        // FatalDesync → trigger disconnect event. Per-channel auxiliary
        // state is cleared before the connection entry itself is removed.
        if validation.requires_restart() {
            // Clean up flow control queue state
            if self.filter_chain.is_enabled() {
                self.filter_chain.queue_filter().clear_channel(request.channel_id);
            }

            // Phase 4: Clean up error tracker
            self.error_tracker.remove_channel(request.channel_id);

            self.connections.remove(request.channel_id);
            let _ = self.event_tx.send(ServerEvent::ClientDisconnected {
                channel_id: request.channel_id,
            });
        }

        Ok(())
    }
1114
1115    /// Handle TUNNELLING_ACK from client (for server→client frame delivery tracking).
1116    ///
1117    /// When flow control is enabled, this also:
1118    /// - Releases QueueFilter backpressure for the channel
1119    /// - Feeds success/failure to the RetryFilter circuit breaker
1120    /// - Does NOT drain queued frames here (the async version does)
1121    fn handle_tunnelling_ack(&self, data: &[u8]) -> KnxResult<()> {
1122        let ack = TunnellingAck::decode(data)?;
1123
1124        if let Some(conn) = self.connections.get(ack.channel_id) {
1125            conn.touch();
1126
1127            // Feed ACK to the connection's ACK channel for AckWaiter
1128            conn.feed_ack(AckMessage {
1129                channel_id: ack.channel_id,
1130                sequence: ack.sequence_counter,
1131                status: ack.status,
1132            });
1133
1134            // Transition FSM: WaitingForAck → Idle (simple ACK case)
1135            conn.fsm.on_ack_received_simple();
1136
1137            // Release flow control backpressure
1138            if self.filter_chain.is_enabled() {
1139                if ack.status == 0 {
1140                    self.filter_chain.on_send_success(conn.channel_id);
1141                } else {
1142                    self.filter_chain.on_send_failure(
1143                        conn.channel_id,
1144                        &format!("ACK error status: {:#04x}", ack.status),
1145                    );
1146                }
1147            }
1148
1149            // Phase 4: Track ACK errors
1150            if ack.status == 0 {
1151                self.error_tracker.on_send_success(conn.channel_id);
1152            } else {
1153                self.handle_send_error_tracking(
1154                    conn.channel_id,
1155                    ErrorCategory::AckError,
1156                );
1157            }
1158        }
1159
1160        Ok(())
1161    }
1162
1163    /// Handle TUNNELLING_ACK from client with async queue draining.
1164    ///
1165    /// This is the async variant that also drains queued frames after
1166    /// backpressure is released.
1167    async fn handle_tunnelling_ack_async(
1168        &self,
1169        socket: &UdpSocket,
1170        data: &[u8],
1171    ) -> KnxResult<()> {
1172        let ack = TunnellingAck::decode(data)?;
1173
1174        if let Some(conn) = self.connections.get(ack.channel_id) {
1175            conn.touch();
1176
1177            // Feed ACK to the connection's ACK channel for AckWaiter
1178            conn.feed_ack(AckMessage {
1179                channel_id: ack.channel_id,
1180                sequence: ack.sequence_counter,
1181                status: ack.status,
1182            });
1183
1184            // Transition FSM: WaitingForAck → Idle (simple ACK case)
1185            conn.fsm.on_ack_received_simple();
1186
1187            // Release flow control backpressure and drain queued frames
1188            if self.filter_chain.is_enabled() {
1189                if ack.status == 0 {
1190                    self.filter_chain.on_send_success(conn.channel_id);
1191                    // Drain queued frames now that backpressure is released
1192                    self.drain_queued_frames(socket, &conn).await;
1193                } else {
1194                    self.filter_chain.on_send_failure(
1195                        conn.channel_id,
1196                        &format!("ACK error status: {:#04x}", ack.status),
1197                    );
1198                }
1199            }
1200
1201            // Phase 4: Track ACK errors
1202            if ack.status == 0 {
1203                self.error_tracker.on_send_success(conn.channel_id);
1204            } else {
1205                self.handle_send_error_tracking(
1206                    conn.channel_id,
1207                    ErrorCategory::AckError,
1208                );
1209            }
1210        }
1211
1212        Ok(())
1213    }
1214
1215    // ========================================================================
1216    // cEMI processing — full message code dispatcher
1217    // ========================================================================
1218
1219    /// Process cEMI frame — dispatches all message codes with L_Data.con support.
1220    ///
1221    /// This is the central cEMI processing pipeline:
1222    /// 1. Dispatch based on message_code (L_Data, M_Prop*, M_Reset, L_Busmon, L_Raw)
1223    /// 2. For L_Data.req: process group services, send L_Data.con, broadcast L_Data.ind
1224    /// 3. For M_PropRead.req: respond with M_PropRead.con
1225    /// 4. For M_PropWrite.req: respond with M_PropWrite.con
1226    /// 5. For M_Reset.req: respond with M_Reset.ind
1227    /// 6. Optionally generate Bus Monitor frames for all bus traffic
1228    async fn process_cemi(
1229        &self,
1230        socket: &UdpSocket,
1231        client_addr: SocketAddr,
1232        cemi: &CemiFrame,
1233        connection: &TunnelConnection,
1234    ) -> KnxResult<()> {
1235        match cemi.message_code {
1236            MessageCode::LDataReq => {
1237                self.handle_ldata_req(socket, client_addr, cemi, connection).await?;
1238            }
1239            MessageCode::LDataInd => {
1240                // Client sending L_Data.ind — unusual but process group services
1241                self.handle_ldata_data(socket, client_addr, cemi, connection).await?;
1242            }
1243            MessageCode::MPropReadReq => {
1244                if self.config.tunnel_behavior.property_service_enabled {
1245                    self.handle_prop_read_req(socket, client_addr, cemi, connection).await?;
1246                }
1247            }
1248            MessageCode::MPropWriteReq => {
1249                if self.config.tunnel_behavior.property_service_enabled {
1250                    self.handle_prop_write_req(socket, client_addr, cemi, connection).await?;
1251                }
1252            }
1253            MessageCode::MResetReq => {
1254                if self.config.tunnel_behavior.reset_service_enabled {
1255                    self.handle_reset_req(socket, client_addr, connection).await?;
1256                }
1257            }
1258            MessageCode::LRawReq => {
1259                // L_Raw.req: send L_Raw.con (success) — raw bus access
1260                self.send_ldata_con(socket, client_addr, connection, cemi, true).await?;
1261            }
1262            _ => {
1263                debug!(
1264                    message_code = ?cemi.message_code,
1265                    channel_id = connection.channel_id,
1266                    "Unhandled cEMI message code"
1267                );
1268            }
1269        }
1270
1271        // Generate Bus Monitor frame if enabled
1272        if self.config.tunnel_behavior.bus_monitor_enabled {
1273            self.emit_bus_monitor_frame(socket, cemi, connection).await?;
1274        }
1275
1276        Ok(())
1277    }
1278
1279    /// Handle L_Data.req — the primary data path.
1280    ///
1281    /// 1. Process group value services (Write/Read/Response)
1282    /// 2. Send L_Data.con (MC=0x2E) with success/failure status
1283    /// 3. Broadcast L_Data.ind to other tunnel connections
1284    async fn handle_ldata_req(
1285        &self,
1286        socket: &UdpSocket,
1287        client_addr: SocketAddr,
1288        cemi: &CemiFrame,
1289        connection: &TunnelConnection,
1290    ) -> KnxResult<()> {
1291        // Process group value services if applicable
1292        let bus_success = self.handle_ldata_data(socket, client_addr, cemi, connection).await?;
1293
1294        // Always send L_Data.con for L_Data.req when enabled
1295        if self.config.tunnel_behavior.ldata_con_enabled {
1296            let confirm_success = self.compute_confirmation_success(bus_success);
1297
1298            // Optional bus delivery delay
1299            let delay_ms = self.config.tunnel_behavior.bus_delivery_delay_ms;
1300            if delay_ms > 0 {
1301                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1302            }
1303
1304            self.send_ldata_con(socket, client_addr, connection, cemi, confirm_success).await?;
1305
1306            let _ = self.event_tx.send(ServerEvent::ConfirmationSent {
1307                channel_id: connection.channel_id,
1308                success: confirm_success,
1309            });
1310        }
1311
1312        // Broadcast L_Data.ind to other tunnel connections
1313        if self.config.tunnel_behavior.ldata_ind_broadcast_enabled {
1314            if let Some(group_addr) = cemi.destination_group() {
1315                self.broadcast_ldata_ind(socket, cemi, connection, group_addr).await?;
1316            }
1317        }
1318
1319        Ok(())
1320    }
1321
1322    /// Handle L_Data group value services.
1323    ///
1324    /// Returns true if bus delivery succeeded.
1325    async fn handle_ldata_data(
1326        &self,
1327        socket: &UdpSocket,
1328        client_addr: SocketAddr,
1329        cemi: &CemiFrame,
1330        connection: &TunnelConnection,
1331    ) -> KnxResult<bool> {
1332        if !cemi.apci.is_group_value() {
1333            // Non-group-value APCIs (IndividualAddressRead, MemoryRead, etc.)
1334            // are still valid L_Data frames — bus delivery succeeds.
1335            return Ok(true);
1336        }
1337
1338        let group_addr = match cemi.destination_group() {
1339            Some(addr) => addr,
1340            None => return Ok(true),
1341        };
1342
1343        match cemi.apci {
1344            Apci::GroupValueWrite => {
1345                debug!(
1346                    address = %group_addr,
1347                    source = %cemi.source,
1348                    "Group Value Write"
1349                );
1350
1351                let write_ok = self.group_objects.write(
1352                    &group_addr,
1353                    &cemi.data,
1354                    Some(cemi.source.to_string()),
1355                ).is_ok();
1356
1357                // Phase 4: Update group value cache on write
1358                self.group_value_cache.on_write(
1359                    group_addr,
1360                    cemi.data.clone(),
1361                    Some(cemi.source.to_string()),
1362                );
1363
1364                let _ = self.event_tx.send(ServerEvent::GroupValueWrite {
1365                    address: group_addr,
1366                    value: cemi.data.clone(),
1367                    source: cemi.source,
1368                });
1369
1370                Ok(write_ok)
1371            }
1372            Apci::GroupValueRead => {
1373                debug!(
1374                    address = %group_addr,
1375                    source = %cemi.source,
1376                    "Group Value Read"
1377                );
1378
1379                let _ = self.event_tx.send(ServerEvent::GroupValueRead {
1380                    address: group_addr,
1381                    source: cemi.source,
1382                });
1383
1384                // Send GroupValueResponse back via tunnelling
1385                let response_data = match self.group_objects.read(&group_addr) {
1386                    Ok(data) => data,
1387                    Err(_e) => vec![0u8],
1388                };
1389
1390                let response_cemi = CemiFrame::group_value_response(
1391                    self.config.individual_address,
1392                    group_addr,
1393                    response_data,
1394                );
1395
1396                self.send_tunnelling_request(socket, client_addr, connection, response_cemi).await?;
1397                Ok(true)
1398            }
1399            Apci::GroupValueResponse => {
1400                debug!(
1401                    address = %group_addr,
1402                    source = %cemi.source,
1403                    "Group Value Response"
1404                );
1405
1406                // Store the response value in the group object table
1407                let _ = self.group_objects.write(
1408                    &group_addr,
1409                    &cemi.data,
1410                    Some(cemi.source.to_string()),
1411                );
1412
1413                Ok(true)
1414            }
1415            _ => Ok(true),
1416        }
1417    }
1418
1419    /// Compute confirmation success based on bus result and configured success rate.
1420    fn compute_confirmation_success(&self, bus_success: bool) -> bool {
1421        let rate = self.config.tunnel_behavior.confirmation_success_rate;
1422        if rate >= 1.0 {
1423            bus_success
1424        } else if rate <= 0.0 {
1425            false
1426        } else {
1427            bus_success && (rand_simple() < rate)
1428        }
1429    }
1430
1431    // ========================================================================
1432    // Property service handlers (M_PropRead, M_PropWrite, M_Reset)
1433    // ========================================================================
1434
1435    /// Handle M_PropRead.req — respond with M_PropRead.con.
1436    ///
1437    /// Simulates property read by returning well-known property values
1438    /// or a default response for unknown properties.
1439    async fn handle_prop_read_req(
1440        &self,
1441        socket: &UdpSocket,
1442        client_addr: SocketAddr,
1443        cemi: &CemiFrame,
1444        connection: &TunnelConnection,
1445    ) -> KnxResult<()> {
1446        let (object_index, property_id, count, start_index, _) =
1447            match cemi.parse_property_request() {
1448                Some(parsed) => parsed,
1449                None => {
1450                    debug!(
1451                        channel_id = connection.channel_id,
1452                        "M_PropRead.req with invalid data format"
1453                    );
1454                    return Ok(());
1455                }
1456            };
1457
1458        debug!(
1459            channel_id = connection.channel_id,
1460            object_index,
1461            property_id,
1462            count,
1463            start_index,
1464            "M_PropRead.req"
1465        );
1466
1467        // Simulate well-known interface object properties (KNX AN033)
1468        let response_value = self.read_property(object_index, property_id);
1469
1470        let response_cemi = CemiFrame::prop_read_con(
1471            object_index,
1472            property_id,
1473            count,
1474            start_index,
1475            response_value,
1476        );
1477
1478        self.send_tunnelling_request(socket, client_addr, connection, response_cemi).await?;
1479
1480        let _ = self.event_tx.send(ServerEvent::PropertyRead {
1481            channel_id: connection.channel_id,
1482            object_index,
1483            property_id,
1484        });
1485
1486        Ok(())
1487    }
1488
1489    /// Handle M_PropWrite.req — respond with M_PropWrite.con.
1490    async fn handle_prop_write_req(
1491        &self,
1492        socket: &UdpSocket,
1493        client_addr: SocketAddr,
1494        cemi: &CemiFrame,
1495        connection: &TunnelConnection,
1496    ) -> KnxResult<()> {
1497        let (object_index, property_id, count, start_index, write_data) =
1498            match cemi.parse_property_request() {
1499                Some(parsed) => parsed,
1500                None => {
1501                    debug!(
1502                        channel_id = connection.channel_id,
1503                        "M_PropWrite.req with invalid data format"
1504                    );
1505                    return Ok(());
1506                }
1507            };
1508
1509        debug!(
1510            channel_id = connection.channel_id,
1511            object_index,
1512            property_id,
1513            count,
1514            start_index,
1515            data_len = write_data.len(),
1516            "M_PropWrite.req"
1517        );
1518
1519        // Property writes succeed for simulation purposes
1520        let response_cemi = CemiFrame::prop_write_con(
1521            object_index,
1522            property_id,
1523            count,
1524            start_index,
1525            true, // success
1526        );
1527
1528        self.send_tunnelling_request(socket, client_addr, connection, response_cemi).await?;
1529
1530        let _ = self.event_tx.send(ServerEvent::PropertyWrite {
1531            channel_id: connection.channel_id,
1532            object_index,
1533            property_id,
1534        });
1535
1536        Ok(())
1537    }
1538
1539    /// Handle M_Reset.req — respond with M_Reset.ind.
1540    async fn handle_reset_req(
1541        &self,
1542        socket: &UdpSocket,
1543        client_addr: SocketAddr,
1544        connection: &TunnelConnection,
1545    ) -> KnxResult<()> {
1546        debug!(
1547            channel_id = connection.channel_id,
1548            "M_Reset.req — sending M_Reset.ind"
1549        );
1550
1551        let response_cemi = CemiFrame::reset_ind();
1552        self.send_tunnelling_request(socket, client_addr, connection, response_cemi).await?;
1553
1554        let _ = self.event_tx.send(ServerEvent::DeviceReset {
1555            channel_id: connection.channel_id,
1556        });
1557
1558        Ok(())
1559    }
1560
1561    /// Read a simulated interface object property.
1562    ///
1563    /// Returns well-known property values for standard KNX interface objects.
1564    /// For unknown properties, returns a default 2-byte zero value.
1565    fn read_property(&self, object_index: u16, property_id: u16) -> Vec<u8> {
1566        match (object_index, property_id) {
1567            // Object 0 = Device Object
1568            // PID_OBJECT_TYPE (1)
1569            (0, 1) => vec![0x00, 0x00], // Device Object
1570            // PID_SERIAL_NUMBER (11)
1571            (0, 11) => self.config.serial_number.to_vec(),
1572            // PID_MANUFACTURER_ID (12)
1573            (0, 12) => vec![0x00, 0x00],
1574            // PID_DEVICE_CONTROL (14)
1575            (0, 14) => vec![0x00],
1576            // PID_ORDER_INFO (15)
1577            (0, 15) => vec![0x00; 10],
1578            // PID_FIRMWARE_REVISION (9)
1579            (0, 9) => vec![0x01],
1580            // PID_HARDWARE_TYPE (7)
1581            (0, 7) => vec![0x00; 6],
1582            // PID_SUBNET_ADDRESS (57)
1583            (0, 57) => vec![self.config.individual_address.area()],
1584            // PID_DEVICE_ADDRESS (58)
1585            (0, 58) => vec![self.config.individual_address.device()],
1586            // PID_MAX_APDULENGTH (56)
1587            (0, 56) => vec![0x00, 0xFE], // 254 bytes max APDU
1588
1589            // Object 11 = KNXnet/IP Parameter Object
1590            // PID_KNX_INDIVIDUAL_ADDRESS (52)
1591            (11, 52) => self.config.individual_address.to_bytes().to_vec(),
1592            // PID_CURRENT_IP_ADDRESS (55)
1593            (11, 55) => {
1594                match self.config.bind_addr {
1595                    SocketAddr::V4(v4) => v4.ip().octets().to_vec(),
1596                    _ => vec![0, 0, 0, 0],
1597                }
1598            }
1599            // PID_FRIENDLY_NAME (7)
1600            (11, 7) => {
1601                let mut name = self.config.device_name.as_bytes().to_vec();
1602                name.truncate(30);
1603                name
1604            }
1605            // PID_MAC_ADDRESS (8)
1606            (11, 8) => self.config.mac_address.to_vec(),
1607
1608            // Default: return 2-byte zero for unknown properties
1609            _ => vec![0x00, 0x00],
1610        }
1611    }
1612
1613    // ========================================================================
1614    // Bus Monitor frame generation
1615    // ========================================================================
1616
1617    /// Generate and emit a Bus Monitor indication (MC=0x2B) frame.
1618    ///
1619    /// Bus Monitor frames carry:
1620    /// - Additional Info TLV with BusMonitorInfo (0x03) status byte
1621    /// - Additional Info TLV with TimestampRelative (0x04) in ms
1622    /// - The raw bus frame data
1623    async fn emit_bus_monitor_frame(
1624        &self,
1625        socket: &UdpSocket,
1626        original_cemi: &CemiFrame,
1627        source_connection: &TunnelConnection,
1628    ) -> KnxResult<()> {
1629        // Encode the original cEMI to get raw bus frame bytes
1630        let raw_frame = original_cemi.encode();
1631
1632        // Status byte: 0x00 = OK, bit 7 = frame error, bit 0 = parity error
1633        let status: u8 = if original_cemi.confirm { 0x80 } else { 0x00 };
1634
1635        // Relative timestamp: 0 ms (we don't track inter-frame timing)
1636        let busmon_cemi = CemiFrame::bus_monitor_indication(&raw_frame, status, 0);
1637        let raw_frame_len = raw_frame.len();
1638
1639        // Send bus monitor frame to ALL connections (including the source)
1640        let all_connections = self.connections.all();
1641        for conn in &all_connections {
1642            let target_addr = conn.data_endpoint;
1643            if let Err(e) = self.send_tunnelling_request(
1644                socket,
1645                target_addr,
1646                conn,
1647                busmon_cemi.clone(),
1648            ).await {
1649                debug!(
1650                    error = %e,
1651                    channel_id = conn.channel_id,
1652                    "Failed to send bus monitor frame"
1653                );
1654            }
1655        }
1656
1657        let _ = self.event_tx.send(ServerEvent::BusMonitorFrameSent {
1658            channel_id: source_connection.channel_id,
1659            message_code: original_cemi.message_code as u8,
1660            raw_frame_len,
1661        });
1662
1663        Ok(())
1664    }
1665
1666    // ========================================================================
1667    // L_Data.ind broadcast to other tunnel connections
1668    // ========================================================================
1669
1670    /// Broadcast L_Data.ind to all other tunnel connections.
1671    ///
1672    /// When a client sends a group value write, all other connected tunnels
1673    /// should receive the same data as L_Data.ind, simulating bus-level
1674    /// multicast behavior.
1675    async fn broadcast_ldata_ind(
1676        &self,
1677        socket: &UdpSocket,
1678        original_cemi: &CemiFrame,
1679        source_connection: &TunnelConnection,
1680        group_addr: GroupAddress,
1681    ) -> KnxResult<()> {
1682        let all_connections = self.connections.all();
1683        let mut target_count = 0usize;
1684
1685        for conn in &all_connections {
1686            // Skip the source connection
1687            if conn.channel_id == source_connection.channel_id {
1688                continue;
1689            }
1690
1691            // Create L_Data.ind frame from the original request
1692            let ind_cemi = CemiFrame {
1693                message_code: MessageCode::LDataInd,
1694                additional_info: Vec::new(),
1695                source: original_cemi.source,
1696                destination: original_cemi.destination,
1697                address_type: original_cemi.address_type,
1698                hop_count: original_cemi.hop_count,
1699                priority: original_cemi.priority,
1700                confirm: false,
1701                ack_request: false,
1702                system_broadcast: original_cemi.system_broadcast,
1703                apci: original_cemi.apci,
1704                data: original_cemi.data.clone(),
1705            };
1706
1707            let target_addr = conn.data_endpoint;
1708            if let Err(e) = self.send_tunnelling_request(socket, target_addr, conn, ind_cemi).await {
1709                debug!(
1710                    error = %e,
1711                    channel_id = conn.channel_id,
1712                    "Failed to broadcast L_Data.ind"
1713                );
1714            } else {
1715                target_count += 1;
1716            }
1717        }
1718
1719        if target_count > 0 {
1720            // Phase 4: Update group value cache on indication broadcast
1721            self.group_value_cache.on_indication(
1722                group_addr,
1723                original_cemi.data.clone(),
1724                Some(original_cemi.source.to_string()),
1725            );
1726
1727            let _ = self.event_tx.send(ServerEvent::DataIndBroadcast {
1728                source_channel_id: source_connection.channel_id,
1729                target_channel_count: target_count,
1730                group_address: group_addr,
1731            });
1732
1733            let _ = self.event_tx.send(ServerEvent::GroupValueCacheUpdated {
1734                address: group_addr,
1735                source: format!("indication from {}", original_cemi.source),
1736                cache_size: self.group_value_cache.len(),
1737            });
1738        }
1739
1740        Ok(())
1741    }
1742
1743    // ========================================================================
1744    // Frame transmission helpers
1745    // ========================================================================
1746
    /// Send a TUNNELLING_REQUEST frame through the flow control filter chain.
    ///
    /// This is the primary transmission path. Frames are processed through:
    /// 1. QueueFilter — may queue if backpressure is active
    /// 2. PaceFilter — may delay for bus timing simulation
    /// 3. RetryFilter — may drop if circuit breaker is open
    ///
    /// If the filter chain imposes a delay, the transmission is deferred.
    /// If the frame is queued, it will be sent later when the queue is drained.
    ///
    /// Returns `Ok(())` for queued and intentionally-dropped frames as well as
    /// successful sends; only socket errors and `FilterResult::Error` surface
    /// as `Err`. When the filter chain is disabled, this falls straight
    /// through to `send_tunnelling_request_raw`.
    async fn send_tunnelling_request(
        &self,
        socket: &UdpSocket,
        client_addr: SocketAddr,
        connection: &TunnelConnection,
        cemi: CemiFrame,
    ) -> KnxResult<()> {
        // If filter chain is enabled, process through it
        if self.filter_chain.is_enabled() {
            // The envelope gets its own clone of the frame; the original
            // `cemi` is kept for the actual transmission below.
            let mut envelope = FrameEnvelope::new(
                cemi.clone(),
                connection.channel_id,
                client_addr,
            );

            let result = self.filter_chain.send(&mut envelope);

            match result {
                FilterResult::Pass { delay } => {
                    // PaceFilter may impose a bus-timing delay; announce it
                    // before sleeping so observers see the deferral.
                    if delay > Duration::ZERO {
                        let _ = self.event_tx.send(ServerEvent::FrameDelayed {
                            channel_id: connection.channel_id,
                            delay_ms: delay.as_millis() as u64,
                            pace_state: format!("{}", self.filter_chain.pace_state()),
                        });

                        tokio::time::sleep(delay).await;
                    }

                    // Proceed with actual transmission
                    // NOTE(review): the original `cemi` is sent here, not
                    // `envelope.cemi` — if any filter mutates the envelope's
                    // payload, that mutation is discarded. Confirm filters
                    // only inspect (never rewrite) the frame.
                    let result = self.send_tunnelling_request_raw(
                        socket, client_addr, connection, cemi,
                    ).await;

                    // Feed the outcome back into the filter chain (circuit
                    // breaker) and the error tracker (threshold events).
                    match &result {
                        Ok(()) => {
                            self.filter_chain.on_send_success(connection.channel_id);
                            self.error_tracker.on_send_success(connection.channel_id);
                        }
                        Err(e) => {
                            self.filter_chain.on_send_failure(
                                connection.channel_id,
                                &e.to_string(),
                            );
                            self.handle_send_error_tracking(
                                connection.channel_id,
                                ErrorCategory::SendFailure,
                            );
                        }
                    }

                    return result;
                }
                FilterResult::Queued => {
                    // Backpressure: the filter chain retained the envelope;
                    // it will be transmitted later by drain_queued_frames.
                    let _ = self.event_tx.send(ServerEvent::FrameQueued {
                        channel_id: connection.channel_id,
                        priority: format!("{}", envelope.priority),
                        queue_depth: self.filter_chain.pending_count(),
                    });
                    return Ok(());
                }
                FilterResult::Dropped { reason } => {
                    let _ = self.event_tx.send(ServerEvent::FrameDropped {
                        channel_id: connection.channel_id,
                        reason: reason.clone(),
                    });
                    debug!(
                        channel_id = connection.channel_id,
                        reason = %reason,
                        "Frame dropped by flow control"
                    );
                    return Ok(()); // Not an error — intentional drop
                }
                FilterResult::Error { message } => {
                    // Unlike Dropped, a filter error is surfaced to the caller.
                    let _ = self.event_tx.send(ServerEvent::FrameDropped {
                        channel_id: connection.channel_id,
                        reason: message.clone(),
                    });
                    return Err(KnxError::FlowControlDrop { reason: message });
                }
            }
        }

        // Filter chain not enabled — direct send
        self.send_tunnelling_request_raw(socket, client_addr, connection, cemi).await
    }
1842
1843    /// Raw TUNNELLING_REQUEST send without filter chain processing.
1844    ///
1845    /// This bypasses the filter chain and sends directly to the UDP socket.
1846    /// Used internally by `send_tunnelling_request` after filtering, and also
1847    /// for queue draining where frames have already been filtered.
1848    async fn send_tunnelling_request_raw(
1849        &self,
1850        socket: &UdpSocket,
1851        client_addr: SocketAddr,
1852        connection: &TunnelConnection,
1853        cemi: CemiFrame,
1854    ) -> KnxResult<()> {
1855        let seq = connection.next_send_sequence();
1856        let tunnel_req = TunnellingRequest::new(
1857            connection.channel_id,
1858            seq,
1859            cemi,
1860        );
1861        let frame = KnxFrame::new(ServiceType::TunnellingRequest, tunnel_req.encode());
1862
1863        // Activate backpressure before sending (we're now waiting for ACK)
1864        if self.filter_chain.is_enabled() {
1865            self.filter_chain.queue_filter().set_waiting_for_ack(
1866                connection.channel_id,
1867                true,
1868            );
1869        }
1870
1871        if let Err(e) = socket.send_to(&frame.encode(), client_addr).await {
1872            debug!(error = %e, channel_id = connection.channel_id, "Failed to send tunnelling request");
1873            return Err(e.into());
1874        }
1875
1876        Ok(())
1877    }
1878
1879    /// Drain and send queued frames for a channel after ACK received.
1880    ///
1881    /// When the QueueFilter has been holding frames due to WaitingForAck
1882    /// backpressure, this method drains and sends them in priority order
1883    /// once the ACK is received.
1884    async fn drain_queued_frames(
1885        &self,
1886        socket: &UdpSocket,
1887        connection: &TunnelConnection,
1888    ) {
1889        if !self.filter_chain.is_enabled() {
1890            return;
1891        }
1892
1893        let envelopes = self.filter_chain.drain_pending(connection.channel_id, 16);
1894
1895        if envelopes.is_empty() {
1896            return;
1897        }
1898
1899        let drained_count = envelopes.len();
1900
1901        for envelope in envelopes {
1902            // Re-apply pace filter timing for each drained frame
1903            let env = envelope;
1904            let pace_result = self.filter_chain.pace_filter().process_send(&env);
1905
1906            if let FilterResult::Pass { delay } = pace_result {
1907                if delay > Duration::ZERO {
1908                    tokio::time::sleep(delay).await;
1909                }
1910            }
1911
1912            if let Err(e) = self.send_tunnelling_request_raw(
1913                socket,
1914                env.target_addr,
1915                connection,
1916                env.cemi,
1917            ).await {
1918                debug!(
1919                    error = %e,
1920                    channel_id = connection.channel_id,
1921                    "Failed to send drained frame"
1922                );
1923                self.filter_chain.on_send_failure(
1924                    connection.channel_id,
1925                    &e.to_string(),
1926                );
1927                self.handle_send_error_tracking(
1928                    connection.channel_id,
1929                    ErrorCategory::SendFailure,
1930                );
1931                break;
1932            } else {
1933                self.filter_chain.on_send_success(connection.channel_id);
1934                self.error_tracker.on_send_success(connection.channel_id);
1935            }
1936        }
1937
1938        if drained_count > 0 {
1939            let _ = self.event_tx.send(ServerEvent::QueueDrained {
1940                channel_id: connection.channel_id,
1941                drained_count,
1942            });
1943        }
1944    }
1945
1946    /// Handle send error tracking result — emit events if thresholds are exceeded.
1947    fn handle_send_error_tracking(
1948        &self,
1949        channel_id: u8,
1950        category: ErrorCategory,
1951    ) {
1952        let result = self.error_tracker.on_send_failure(channel_id, category);
1953
1954        match result {
1955            TrackingResult::Recorded => {}
1956            TrackingResult::ConsecutiveThresholdExceeded {
1957                consecutive_errors,
1958                threshold,
1959            } => {
1960                warn!(
1961                    channel_id,
1962                    consecutive_errors,
1963                    threshold,
1964                    "Send error threshold exceeded — tunnel restart recommended"
1965                );
1966                let _ = self.event_tx.send(ServerEvent::SendErrorThreshold {
1967                    channel_id,
1968                    consecutive_errors,
1969                    threshold,
1970                });
1971            }
1972            TrackingResult::RateThresholdExceeded {
1973                error_count,
1974                window_ms,
1975                rate,
1976            } => {
1977                warn!(
1978                    channel_id,
1979                    error_count,
1980                    window_ms,
1981                    rate_percent = rate,
1982                    "Send error rate threshold exceeded"
1983                );
1984                let _ = self.event_tx.send(ServerEvent::SendErrorRateWarning {
1985                    channel_id,
1986                    error_count,
1987                    window_ms,
1988                    rate_percent: rate,
1989                });
1990            }
1991        }
1992    }
1993
1994    /// Send L_Data.con (MC=0x2E) confirmation frame.
1995    ///
1996    /// After the server processes an L_Data.req, it sends back an L_Data.con with:
1997    /// - Same source/destination as the original frame
1998    /// - MC=0x2E (or the appropriate confirmation code for the message type)
1999    /// - Ctrl1 bit 0: 0=success, 1=failure (NACK)
2000    async fn send_ldata_con(
2001        &self,
2002        socket: &UdpSocket,
2003        client_addr: SocketAddr,
2004        connection: &TunnelConnection,
2005        original_cemi: &CemiFrame,
2006        success: bool,
2007    ) -> KnxResult<()> {
2008        // Use the appropriate confirmation message code
2009        let con_message_code = original_cemi.message_code
2010            .to_confirmation()
2011            .unwrap_or(MessageCode::LDataCon);
2012
2013        let con_cemi = CemiFrame {
2014            message_code: con_message_code,
2015            additional_info: Vec::new(),
2016            source: original_cemi.source,
2017            destination: original_cemi.destination,
2018            address_type: original_cemi.address_type,
2019            hop_count: original_cemi.hop_count,
2020            priority: original_cemi.priority,
2021            confirm: !success, // Ctrl1 bit 0: 0=success, 1=failure
2022            ack_request: false,
2023            system_broadcast: original_cemi.system_broadcast,
2024            apci: original_cemi.apci,
2025            data: original_cemi.data.clone(),
2026        };
2027
2028        debug!(
2029            channel_id = connection.channel_id,
2030            success,
2031            message_code = ?con_message_code,
2032            destination = original_cemi.destination,
2033            "Sending confirmation frame"
2034        );
2035
2036        self.send_tunnelling_request(socket, client_addr, connection, con_cemi).await
2037    }
2038}
2039
/// Simple deterministic pseudo-random for confirmation success rate.
///
/// Implements the xorshift64 generator (shifts 13/7/17) over thread-local
/// state, so no external RNG crate is needed and results are reproducible
/// per thread. Returns a value in `[0.0, 1.0]`.
fn rand_simple() -> f64 {
    use std::cell::Cell;
    thread_local! {
        static STATE: Cell<u64> = Cell::new(0x12345678_9ABCDEF0);
    }
    STATE.with(|state| {
        // Advance the xorshift64 state and persist it for the next call.
        let mut seed = state.get();
        seed ^= seed << 13;
        seed ^= seed >> 7;
        seed ^= seed << 17;
        state.set(seed);
        // Normalize into [0, 1].
        seed as f64 / u64::MAX as f64
    })
}
2056
#[cfg(test)]
mod tests {
    use super::*;

    // Channel allocation, connection creation/lookup, and removal through
    // ConnectionManager, plus the initial FSM state.
    #[test]
    fn test_connection_manager() {
        let manager = ConnectionManager::new(
            10,
            Duration::from_secs(60),
            IndividualAddress::new(1, 1, 0),
        );

        let channel_id = manager.allocate_channel().unwrap();
        assert!(channel_id > 0);

        let conn = manager.create_connection(
            channel_id,
            "192.168.1.100:3671".parse().unwrap(),
            "192.168.1.100:3672".parse().unwrap(),
        );

        assert_eq!(conn.channel_id, channel_id);
        assert_eq!(manager.len(), 1);

        // Connection should be in Idle state (Connecting → Idle in create_connection)
        assert!(conn.fsm.is_connected());
        assert!(conn.fsm.can_send());

        manager.remove(channel_id);
        assert!(manager.is_empty());
    }

    // Receive-sequence validation (valid / duplicate / next) and the
    // monotonically increasing send-sequence counter.
    #[test]
    fn test_connection_with_sequence_tracker() {
        let manager = ConnectionManager::new(
            10,
            Duration::from_secs(60),
            IndividualAddress::new(1, 1, 0),
        );

        let channel_id = manager.allocate_channel().unwrap();
        let conn = manager.create_connection(
            channel_id,
            "192.168.1.100:3671".parse().unwrap(),
            "192.168.1.100:3672".parse().unwrap(),
        );

        // Test sequence tracking
        let validation = conn.validate_recv_sequence(0);
        assert!(matches!(validation, ReceivedValidation::Valid { sequence: 0 }));

        // Re-delivering sequence 0 must be flagged as a duplicate.
        let validation = conn.validate_recv_sequence(0);
        assert!(matches!(validation, ReceivedValidation::Duplicate { .. }));

        let validation = conn.validate_recv_sequence(1);
        assert!(matches!(validation, ReceivedValidation::Valid { sequence: 1 }));

        // Test send sequence
        assert_eq!(conn.next_send_sequence(), 0);
        assert_eq!(conn.next_send_sequence(), 1);
    }

    // Default server configuration validates and has tunneling plus
    // L_Data.con confirmation enabled with a 100% success rate.
    #[test]
    fn test_server_config() {
        let config = KnxServerConfig::default();
        assert!(config.validate().is_ok());
        assert!(config.tunneling_enabled);
        assert!(config.tunnel_behavior.ldata_con_enabled);
        assert_eq!(config.tunnel_behavior.confirmation_success_rate, 1.0);
    }

    // Full FSM round trip: Idle → WaitingForAck → WaitingForConfirmation → Idle.
    #[test]
    fn test_connection_fsm_lifecycle() {
        let manager = ConnectionManager::new(
            10,
            Duration::from_secs(60),
            IndividualAddress::new(1, 1, 0),
        );

        let channel_id = manager.allocate_channel().unwrap();
        let conn = manager.create_connection(
            channel_id,
            "192.168.1.100:3671".parse().unwrap(),
            "192.168.1.100:3672".parse().unwrap(),
        );

        // Should be Idle after creation
        use crate::tunnel::TunnelState;
        assert!(matches!(conn.fsm.state(), TunnelState::Idle));

        // Send frame → WaitingForAck
        conn.fsm.on_frame_sent(0);
        assert!(matches!(conn.fsm.state(), TunnelState::WaitingForAck { sequence: 0, .. }));

        // ACK received → WaitingForConfirmation (for L_Data.req)
        conn.fsm.on_ack_received(0);
        assert!(matches!(conn.fsm.state(), TunnelState::WaitingForConfirmation { .. }));

        // Confirmation received → Idle
        conn.fsm.on_confirmation_received();
        assert!(matches!(conn.fsm.state(), TunnelState::Idle));
    }

    // reset() must zero both the receive (rno) and send (sno) sequence
    // counters after they have advanced.
    #[test]
    fn test_connection_reset() {
        let manager = ConnectionManager::new(
            10,
            Duration::from_secs(60),
            IndividualAddress::new(1, 1, 0),
        );

        let channel_id = manager.allocate_channel().unwrap();
        let conn = manager.create_connection(
            channel_id,
            "192.168.1.100:3671".parse().unwrap(),
            "192.168.1.100:3672".parse().unwrap(),
        );

        conn.validate_recv_sequence(0);
        conn.validate_recv_sequence(1);
        conn.next_send_sequence();

        conn.reset();

        assert_eq!(conn.sequence_tracker.current_rno(), 0);
        assert_eq!(conn.sequence_tracker.current_sno(), 0);
    }

    // With a custom desync threshold of 10, a sequence gap of 5 is
    // classified as OutOfOrder rather than FatalDesync.
    #[test]
    fn test_desync_threshold_custom() {
        let manager = ConnectionManager::new(
            10,
            Duration::from_secs(60),
            IndividualAddress::new(1, 1, 0),
        ).with_desync_threshold(10);

        let channel_id = manager.allocate_channel().unwrap();
        let conn = manager.create_connection(
            channel_id,
            "192.168.1.100:3671".parse().unwrap(),
            "192.168.1.100:3672".parse().unwrap(),
        );

        // Distance 5 should be OutOfOrder (not FatalDesync) with threshold 10
        let validation = conn.validate_recv_sequence(5);
        assert!(matches!(validation, ReceivedValidation::OutOfOrder { .. }));
    }

    // rand_simple() outputs must always lie in [0, 1].
    #[test]
    fn test_rand_simple_range() {
        for _ in 0..100 {
            let v = rand_simple();
            assert!(v >= 0.0 && v <= 1.0);
        }
    }

    // Defaults for the newer tunnel_behavior flags: bus monitor off,
    // indication broadcast / property service / reset service on.
    #[test]
    fn test_server_config_new_fields() {
        let config = KnxServerConfig::default();
        assert!(!config.tunnel_behavior.bus_monitor_enabled);
        assert!(config.tunnel_behavior.ldata_ind_broadcast_enabled);
        assert!(config.tunnel_behavior.property_service_enabled);
        assert!(config.tunnel_behavior.reset_service_enabled);
    }

    // Device-object (object index 0) property reads: serial number,
    // subnet/device address split from 1.2.3, max APDU, and the unknown
    // property fallback.
    #[test]
    fn test_read_property_device_object() {
        let config = KnxServerConfig::default()
            .with_individual_address(IndividualAddress::new(1, 2, 3));
        let server = KnxServer::new(config);

        // PID_SERIAL_NUMBER
        let serial = server.read_property(0, 11);
        assert_eq!(serial.len(), 6);

        // PID_SUBNET_ADDRESS
        let subnet = server.read_property(0, 57);
        assert_eq!(subnet, vec![1]); // area = 1

        // PID_DEVICE_ADDRESS
        let device = server.read_property(0, 58);
        assert_eq!(device, vec![3]); // device = 3

        // PID_MAX_APDULENGTH
        let max_apdu = server.read_property(0, 56);
        assert_eq!(max_apdu, vec![0x00, 0xFE]); // 254

        // Unknown property
        let unknown = server.read_property(99, 99);
        assert_eq!(unknown, vec![0x00, 0x00]);
    }

    // KNXnet/IP-object (object index 11) property reads: individual
    // address, friendly name, and MAC address length.
    #[test]
    fn test_read_property_knxnet_ip_object() {
        let config = KnxServerConfig::default()
            .with_individual_address(IndividualAddress::new(1, 2, 3))
            .with_device_name("Test Device");
        let server = KnxServer::new(config);

        // PID_KNX_INDIVIDUAL_ADDRESS
        let addr = server.read_property(11, 52);
        assert_eq!(addr, IndividualAddress::new(1, 2, 3).to_bytes().to_vec());

        // PID_FRIENDLY_NAME
        let name = server.read_property(11, 7);
        assert_eq!(name, b"Test Device".to_vec());

        // PID_MAC_ADDRESS
        let mac = server.read_property(11, 8);
        assert_eq!(mac.len(), 6);
    }

    // Confirmation success at the rate extremes: 1.0 passes through the
    // bus result unchanged; 0.0 always reports failure.
    #[test]
    fn test_compute_confirmation_success() {
        let mut config = KnxServerConfig::default();
        config.tunnel_behavior.confirmation_success_rate = 1.0;
        let server = KnxServer::new(config.clone());

        // Rate 1.0 — always follows bus_success
        assert!(server.compute_confirmation_success(true));
        assert!(!server.compute_confirmation_success(false));

        // Rate 0.0 — always fails
        config.tunnel_behavior.confirmation_success_rate = 0.0;
        let server = KnxServer::new(config);
        assert!(!server.compute_confirmation_success(true));
        assert!(!server.compute_confirmation_success(false));
    }

    // Smoke test: the newer ServerEvent variants construct and Debug-format.
    #[test]
    fn test_server_event_variants() {
        // Ensure all new ServerEvent variants are constructible
        let events: Vec<ServerEvent> = vec![
            ServerEvent::BusMonitorFrameSent {
                channel_id: 1,
                message_code: 0x11,
                raw_frame_len: 10,
            },
            ServerEvent::PropertyRead {
                channel_id: 1,
                object_index: 0,
                property_id: 11,
            },
            ServerEvent::PropertyWrite {
                channel_id: 1,
                object_index: 0,
                property_id: 14,
            },
            ServerEvent::DeviceReset { channel_id: 1 },
            ServerEvent::DataIndBroadcast {
                source_channel_id: 1,
                target_channel_count: 3,
                group_address: GroupAddress::three_level(1, 0, 1),
            },
        ];

        for event in &events {
            // Just ensure Debug formatting works
            let _ = format!("{:?}", event);
        }
        assert_eq!(events.len(), 5);
    }
}