ant_quic/relay/
connection.rs

1// Copyright 2024 Saorsa Labs Ltd.
2//
3// This Saorsa Network Software is licensed under the General Public License (GPL), version 3.
4// Please see the file LICENSE-GPL, or visit <http://www.gnu.org/licenses/> for the full text.
5//
6// Full details available at https://saorsalabs.com/licenses
7
8//! Relay connection implementation for bidirectional data forwarding.
9
10use crate::relay::{RelayError, RelayResult};
11use std::collections::VecDeque;
12use std::net::SocketAddr;
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc;
16
/// Tunable limits and timers applied to a single relay connection.
#[derive(Debug, Clone)]
pub struct RelayConnectionConfig {
    /// Largest payload (bytes) accepted for one outgoing data frame
    pub max_frame_size: usize,
    /// Upper bound (bytes) on data held in the connection's queues
    pub buffer_size: usize,
    /// Idle period after which the connection counts as timed out
    pub connection_timeout: Duration,
    /// Delay between consecutive keep-alive signals
    pub keep_alive_interval: Duration,
    /// Maximum bandwidth per connection (bytes/sec)
    pub bandwidth_limit: u64,
}

impl Default for RelayConnectionConfig {
    /// Defaults: 64 KiB frames, 1 MiB buffer, 5 minute timeout,
    /// 30 second keep-alive interval, 1 MiB/s bandwidth limit.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        const SECS_PER_MINUTE: u64 = 60;

        let connection_timeout = Duration::from_secs(5 * SECS_PER_MINUTE);
        let keep_alive_interval = Duration::from_secs(30);

        RelayConnectionConfig {
            max_frame_size: 64 * KIB,
            buffer_size: MIB,
            connection_timeout,
            keep_alive_interval,
            bandwidth_limit: 1024 * 1024,
        }
    }
}
43
/// Events that can occur during relay operation.
///
/// Values of this type are pushed onto the connection's unbounded event
/// channel. Within this file only `DataReceived`, `KeepAlive` and
/// `ConnectionTerminated` are emitted; the remaining variants are presumably
/// produced by other parts of the relay (not visible here).
#[derive(Debug, Clone)]
pub enum RelayEvent {
    /// Connection established successfully
    ConnectionEstablished {
        /// Unique session identifier
        session_id: u32,
        /// Remote peer network address
        peer_addr: SocketAddr,
    },
    /// Data was queued on a connection.
    ///
    /// NOTE(review): in this file the event is emitted by both
    /// `RelayConnection::send_data` and `RelayConnection::receive_data`, so it
    /// does not by itself tell the direction of the payload.
    DataReceived {
        /// Session the data belongs to
        session_id: u32,
        /// Payload bytes received
        data: Vec<u8>,
    },
    /// Connection terminated (emitted by `RelayConnection::terminate`)
    ConnectionTerminated {
        /// Identifier of the terminated session
        session_id: u32,
        /// Human-readable reason for termination
        reason: String,
    },
    /// Error occurred during relay operation
    Error {
        /// Optional session context for the error
        session_id: Option<u32>,
        /// Underlying error detail
        error: RelayError,
    },
    /// Bandwidth limit exceeded
    BandwidthLimitExceeded {
        /// Identifier of the session exceeding bandwidth
        session_id: u32,
        /// Current bandwidth usage (bytes) within the window
        current_usage: u64,
        /// Configured limit (bytes) for the window
        limit: u64,
    },
    /// Keep-alive signal (emitted by `RelayConnection::send_keep_alive`)
    KeepAlive {
        /// Identifier of the session emitting keep-alive
        session_id: u32,
    },
}
90
/// Actions that can be taken in response to relay events.
///
/// A `RelayConnection` holds the receiving end of a channel carrying these
/// values, but nothing in this file reads from it yet; the variants describe
/// the intended control-plane commands.
#[derive(Debug, Clone)]
pub enum RelayAction {
    /// Send data to peer
    SendData {
        /// Target session
        session_id: u32,
        /// Payload to send
        data: Vec<u8>,
    },
    /// Terminate connection
    TerminateConnection {
        /// Target session to terminate
        session_id: u32,
        /// Reason for termination
        reason: String,
    },
    /// Update bandwidth limit
    UpdateBandwidthLimit {
        /// Target session
        session_id: u32,
        /// New bandwidth limit (bytes/sec)
        new_limit: u64,
    },
    /// Send keep-alive
    SendKeepAlive {
        /// Target session
        session_id: u32,
    },
}
121
/// Relay connection for bidirectional data forwarding.
///
/// Holds the immutable identity of a session (id, peer address, config)
/// together with mutex-guarded mutable state, so every method can take
/// `&self`.
#[derive(Debug)]
pub struct RelayConnection {
    /// Unique session identifier
    session_id: u32,
    /// Peer address
    peer_addr: SocketAddr,
    /// Configuration (frame/buffer/bandwidth limits and timers)
    config: RelayConnectionConfig,
    /// Connection state: queues, buffer accounting, bandwidth window and
    /// activity timestamps, all behind one mutex
    state: Arc<Mutex<ConnectionState>>,
    /// Event sender; sends are best-effort (a closed receiver is ignored)
    event_sender: mpsc::UnboundedSender<RelayEvent>,
    /// Action receiver (reserved for future control-plane); stored but never
    /// polled in this file, hence the `dead_code` allowance
    #[allow(dead_code)]
    action_receiver: mpsc::UnboundedReceiver<RelayAction>,
}
139
/// Internal connection state.
///
/// Everything that changes after construction lives here so it can be
/// protected by a single mutex in `RelayConnection`.
#[derive(Debug)]
struct ConnectionState {
    /// Whether connection is active; cleared by `terminate`
    is_active: bool,
    /// Data queue for outgoing packets (FIFO)
    outgoing_queue: VecDeque<Vec<u8>>,
    /// Data queue for incoming packets (FIFO)
    incoming_queue: VecDeque<Vec<u8>>,
    /// Current buffer usage in bytes, shared by both queues: increased when
    /// a packet is enqueued and decreased when one is dequeued
    buffer_usage: usize,
    /// Bandwidth tracking for the current rate-limit window
    bandwidth_tracker: BandwidthTracker,
    /// Last activity timestamp; refreshed when data is sent or received
    last_activity: Instant,
    /// Keep-alive timer: the instant at which the next keep-alive is due
    next_keep_alive: Instant,
}
158
/// Bandwidth usage tracker.
///
/// Counts bytes sent and received inside a fixed-length window. Once the
/// window has elapsed, the next call to any method zeroes both counters and
/// starts a new window. All arithmetic is overflow-safe: counters saturate at
/// `u64::MAX` instead of panicking (debug) or wrapping (release).
#[derive(Debug)]
struct BandwidthTracker {
    /// Bytes sent in current window
    bytes_sent: u64,
    /// Bytes received in current window
    bytes_received: u64,
    /// Window start time
    window_start: Instant,
    /// Window duration (1 second)
    window_duration: Duration,
    /// Rate limit: maximum bytes that may be sent per window
    limit: u64,
}

impl BandwidthTracker {
    /// Creates a tracker with empty counters, a one-second window starting
    /// now, and the given per-window send `limit` in bytes.
    fn new(limit: u64) -> Self {
        Self {
            bytes_sent: 0,
            bytes_received: 0,
            window_start: Instant::now(),
            window_duration: Duration::from_secs(1),
            limit,
        }
    }

    /// Starts a fresh window (zeroing both counters) if the current one has
    /// run for at least `window_duration`.
    fn reset_if_needed(&mut self) {
        let now = Instant::now();
        if now.duration_since(self.window_start) >= self.window_duration {
            self.bytes_sent = 0;
            self.bytes_received = 0;
            self.window_start = now;
        }
    }

    /// Returns `true` if sending `bytes` more bytes keeps the sent counter
    /// within `limit` for the current window.
    ///
    /// Only sent bytes count against the limit. A sum that would overflow
    /// `u64` is treated as exceeding the limit.
    fn can_send(&mut self, bytes: u64) -> bool {
        self.reset_if_needed();
        match self.bytes_sent.checked_add(bytes) {
            Some(total) => total <= self.limit,
            // The true total is above u64::MAX and therefore above any limit.
            None => false,
        }
    }

    /// Adds `bytes` to the sent counter of the current window.
    fn record_sent(&mut self, bytes: u64) {
        self.reset_if_needed();
        self.bytes_sent = self.bytes_sent.saturating_add(bytes);
    }

    /// Adds `bytes` to the received counter of the current window.
    fn record_received(&mut self, bytes: u64) {
        self.reset_if_needed();
        self.bytes_received = self.bytes_received.saturating_add(bytes);
    }

    /// Returns sent plus received bytes for the current window.
    fn current_usage(&mut self) -> u64 {
        self.reset_if_needed();
        self.bytes_sent.saturating_add(self.bytes_received)
    }
}
214
215impl RelayConnection {
216    /// Create a new relay connection
217    pub fn new(
218        session_id: u32,
219        peer_addr: SocketAddr,
220        config: RelayConnectionConfig,
221        event_sender: mpsc::UnboundedSender<RelayEvent>,
222        action_receiver: mpsc::UnboundedReceiver<RelayAction>,
223    ) -> Self {
224        let now = Instant::now();
225        let state = ConnectionState {
226            is_active: true,
227            outgoing_queue: VecDeque::new(),
228            incoming_queue: VecDeque::new(),
229            buffer_usage: 0,
230            bandwidth_tracker: BandwidthTracker::new(config.bandwidth_limit),
231            last_activity: now,
232            next_keep_alive: now + config.keep_alive_interval,
233        };
234
235        Self {
236            session_id,
237            peer_addr,
238            config,
239            state: Arc::new(Mutex::new(state)),
240            event_sender,
241            action_receiver,
242        }
243    }
244
245    /// Get session ID
246    pub fn session_id(&self) -> u32 {
247        self.session_id
248    }
249
250    /// Get peer address
251    pub fn peer_addr(&self) -> SocketAddr {
252        self.peer_addr
253    }
254
255    /// Check if connection is active
256    #[allow(clippy::unwrap_used, clippy::expect_used)]
257    pub fn is_active(&self) -> bool {
258        let state = self
259            .state
260            .lock()
261            .expect("Mutex poisoning is unexpected in normal operation");
262        state.is_active
263    }
264
265    /// Send data through the relay
266    #[allow(clippy::unwrap_used, clippy::expect_used)]
267    pub fn send_data(&self, data: Vec<u8>) -> RelayResult<()> {
268        if data.len() > self.config.max_frame_size {
269            return Err(RelayError::ProtocolError {
270                frame_type: 0x46, // RELAY_DATA
271                reason: format!(
272                    "Data size {} exceeds maximum {}",
273                    data.len(),
274                    self.config.max_frame_size
275                ),
276            });
277        }
278
279        let mut state = self
280            .state
281            .lock()
282            .expect("Mutex poisoning is unexpected in normal operation");
283
284        if !state.is_active {
285            return Err(RelayError::SessionError {
286                session_id: Some(self.session_id),
287                kind: crate::relay::error::SessionErrorKind::Terminated,
288            });
289        }
290
291        // Check bandwidth limit
292        if !state.bandwidth_tracker.can_send(data.len() as u64) {
293            let current_usage = state.bandwidth_tracker.current_usage();
294            return Err(RelayError::SessionError {
295                session_id: Some(self.session_id),
296                kind: crate::relay::error::SessionErrorKind::BandwidthExceeded {
297                    used: current_usage,
298                    limit: self.config.bandwidth_limit,
299                },
300            });
301        }
302
303        // Check buffer space
304        if state.buffer_usage + data.len() > self.config.buffer_size {
305            return Err(RelayError::ResourceExhausted {
306                resource_type: "buffer".to_string(),
307                current_usage: state.buffer_usage as u64,
308                limit: self.config.buffer_size as u64,
309            });
310        }
311
312        // Queue data and update tracking
313        state.bandwidth_tracker.record_sent(data.len() as u64);
314        state.buffer_usage += data.len();
315        state.outgoing_queue.push_back(data.clone());
316        state.last_activity = Instant::now();
317
318        // Send event
319        let _ = self.event_sender.send(RelayEvent::DataReceived {
320            session_id: self.session_id,
321            data,
322        });
323
324        Ok(())
325    }
326
327    /// Receive data from the relay
328    #[allow(clippy::unwrap_used)]
329    pub fn receive_data(&self, data: Vec<u8>) -> RelayResult<()> {
330        let mut state = self.state.lock().unwrap();
331
332        if !state.is_active {
333            return Err(RelayError::SessionError {
334                session_id: Some(self.session_id),
335                kind: crate::relay::error::SessionErrorKind::Terminated,
336            });
337        }
338
339        // Check buffer space
340        if state.buffer_usage + data.len() > self.config.buffer_size {
341            return Err(RelayError::ResourceExhausted {
342                resource_type: "buffer".to_string(),
343                current_usage: state.buffer_usage as u64,
344                limit: self.config.buffer_size as u64,
345            });
346        }
347
348        // Queue data and update tracking
349        state.bandwidth_tracker.record_received(data.len() as u64);
350        state.buffer_usage += data.len();
351        state.incoming_queue.push_back(data.clone());
352        state.last_activity = Instant::now();
353
354        // Send event
355        let _ = self.event_sender.send(RelayEvent::DataReceived {
356            session_id: self.session_id,
357            data,
358        });
359
360        Ok(())
361    }
362
363    /// Get next outgoing data packet
364    #[allow(clippy::unwrap_used)]
365    pub fn next_outgoing(&self) -> Option<Vec<u8>> {
366        let mut state = self.state.lock().unwrap();
367        if let Some(data) = state.outgoing_queue.pop_front() {
368            state.buffer_usage = state.buffer_usage.saturating_sub(data.len());
369            Some(data)
370        } else {
371            None
372        }
373    }
374
375    /// Get next incoming data packet
376    #[allow(clippy::unwrap_used)]
377    pub fn next_incoming(&self) -> Option<Vec<u8>> {
378        let mut state = self.state.lock().unwrap();
379        if let Some(data) = state.incoming_queue.pop_front() {
380            state.buffer_usage = state.buffer_usage.saturating_sub(data.len());
381            Some(data)
382        } else {
383            None
384        }
385    }
386
387    /// Check if connection has timed out
388    #[allow(clippy::unwrap_used)]
389    pub fn check_timeout(&self) -> RelayResult<()> {
390        let state = self.state.lock().unwrap();
391        let now = Instant::now();
392
393        if now.duration_since(state.last_activity) > self.config.connection_timeout {
394            return Err(RelayError::SessionError {
395                session_id: Some(self.session_id),
396                kind: crate::relay::error::SessionErrorKind::Expired,
397            });
398        }
399
400        Ok(())
401    }
402
403    /// Check if keep-alive should be sent
404    #[allow(clippy::unwrap_used)]
405    pub fn should_send_keep_alive(&self) -> bool {
406        let state = self.state.lock().unwrap();
407        Instant::now() >= state.next_keep_alive
408    }
409
410    /// Send keep-alive
411    #[allow(clippy::unwrap_used)]
412    pub fn send_keep_alive(&self) -> RelayResult<()> {
413        let mut state = self.state.lock().unwrap();
414        state.next_keep_alive = Instant::now() + self.config.keep_alive_interval;
415
416        let _ = self.event_sender.send(RelayEvent::KeepAlive {
417            session_id: self.session_id,
418        });
419
420        Ok(())
421    }
422
423    /// Terminate the connection
424    #[allow(clippy::unwrap_used)]
425    pub fn terminate(&self, reason: String) -> RelayResult<()> {
426        let mut state = self.state.lock().unwrap();
427        state.is_active = false;
428
429        let _ = self.event_sender.send(RelayEvent::ConnectionTerminated {
430            session_id: self.session_id,
431            reason: reason.clone(),
432        });
433
434        Ok(())
435    }
436
437    /// Get connection statistics
438    #[allow(clippy::unwrap_used)]
439    pub fn get_stats(&self) -> ConnectionStats {
440        let state = self.state.lock().unwrap();
441        ConnectionStats {
442            session_id: self.session_id,
443            peer_addr: self.peer_addr,
444            is_active: state.is_active,
445            bytes_sent: state.bandwidth_tracker.bytes_sent,
446            bytes_received: state.bandwidth_tracker.bytes_received,
447            buffer_usage: state.buffer_usage,
448            outgoing_queue_size: state.outgoing_queue.len(),
449            incoming_queue_size: state.incoming_queue.len(),
450            last_activity: state.last_activity,
451        }
452    }
453}
454
/// Connection statistics.
///
/// A point-in-time snapshot produced by `RelayConnection::get_stats`; the
/// values are copies and do not update afterwards.
#[derive(Debug, Clone)]
pub struct ConnectionStats {
    /// Unique session identifier
    pub session_id: u32,
    /// Remote peer address
    pub peer_addr: SocketAddr,
    /// Whether the connection is currently active
    pub is_active: bool,
    /// Bytes counted as sent by the bandwidth tracker; a per-window counter
    /// that is zeroed when a new window starts, not a lifetime total
    pub bytes_sent: u64,
    /// Bytes counted as received by the bandwidth tracker; a per-window
    /// counter that is zeroed when a new window starts, not a lifetime total
    pub bytes_received: u64,
    /// Current buffer usage (bytes), covering both queues
    pub buffer_usage: usize,
    /// Number of queued outgoing packets
    pub outgoing_queue_size: usize,
    /// Number of queued incoming packets
    pub incoming_queue_size: usize,
    /// Timestamp of last activity (last successful send or receive)
    pub last_activity: Instant,
}
477
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use tokio::sync::mpsc;

    /// Session id used by every test connection.
    const SESSION_ID: u32 = 123;

    fn test_addr() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080)
    }

    /// Builds a connection with the given config.
    ///
    /// The far ends of both channels are returned so the caller can keep
    /// them alive for the duration of the test.
    fn connect(
        config: RelayConnectionConfig,
    ) -> (
        RelayConnection,
        mpsc::UnboundedReceiver<RelayEvent>,
        mpsc::UnboundedSender<RelayAction>,
    ) {
        let (event_tx, event_rx) = mpsc::unbounded_channel();
        let (action_tx, action_rx) = mpsc::unbounded_channel();
        let connection =
            RelayConnection::new(SESSION_ID, test_addr(), config, event_tx, action_rx);
        (connection, event_rx, action_tx)
    }

    #[test]
    fn test_relay_connection_creation() {
        let (connection, _event_rx, _action_tx) = connect(RelayConnectionConfig::default());

        assert_eq!(connection.session_id(), SESSION_ID);
        assert_eq!(connection.peer_addr(), test_addr());
        assert!(connection.is_active());
    }

    #[test]
    fn test_send_data_within_limits() {
        let (connection, _event_rx, _action_tx) = connect(RelayConnectionConfig::default());

        let data = vec![1, 2, 3, 4];
        assert!(connection.send_data(data.clone()).is_ok());

        // Check that data is queued
        assert_eq!(connection.next_outgoing(), Some(data));
    }

    #[test]
    fn test_send_data_exceeds_frame_size() {
        let config = RelayConnectionConfig {
            max_frame_size: 10,
            ..RelayConnectionConfig::default()
        };
        let (connection, _event_rx, _action_tx) = connect(config);

        let large_data = vec![0; 20];
        assert!(connection.send_data(large_data).is_err());
    }

    #[test]
    fn test_bandwidth_limiting() {
        let config = RelayConnectionConfig {
            bandwidth_limit: 100, // Very low limit
            ..RelayConnectionConfig::default()
        };
        let (connection, _event_rx, _action_tx) = connect(config);

        // First small packet should succeed
        assert!(connection.send_data(vec![0; 50]).is_ok());

        // Second packet should exceed bandwidth limit
        assert!(connection.send_data(vec![0; 60]).is_err());
    }

    #[test]
    fn test_buffer_size_limiting() {
        let config = RelayConnectionConfig {
            buffer_size: 100, // Very small buffer
            ..RelayConnectionConfig::default()
        };
        let (connection, _event_rx, _action_tx) = connect(config);

        // Fill buffer
        assert!(connection.send_data(vec![0; 80]).is_ok());

        // Should fail to add more data
        assert!(connection.send_data(vec![0; 30]).is_err());
    }

    #[test]
    fn test_receive_data_queues_and_releases_buffer() {
        let config = RelayConnectionConfig {
            buffer_size: 100,
            ..RelayConnectionConfig::default()
        };
        let (connection, _event_rx, _action_tx) = connect(config);

        assert!(connection.receive_data(vec![7; 80]).is_ok());
        // Buffer is nearly full, so a further packet is rejected
        assert!(connection.receive_data(vec![7; 30]).is_err());

        // Dequeuing hands the packet back and frees its buffer space
        assert_eq!(connection.next_incoming(), Some(vec![7; 80]));
        assert_eq!(connection.get_stats().buffer_usage, 0);

        assert!(connection.receive_data(vec![7; 30]).is_ok());
        assert_eq!(connection.next_incoming(), Some(vec![7; 30]));
        assert_eq!(connection.next_incoming(), None);
    }

    #[test]
    fn test_connection_termination() {
        let (connection, _event_rx, _action_tx) = connect(RelayConnectionConfig::default());

        assert!(connection.is_active());

        assert!(connection.terminate("Test termination".to_string()).is_ok());

        assert!(!connection.is_active());

        // Should not be able to send data after termination
        assert!(connection.send_data(vec![1, 2, 3]).is_err());
    }

    #[test]
    fn test_keep_alive() {
        // The interval is long enough that scheduler jitter between the
        // calls below cannot flip the "not yet due" assertions, yet short
        // enough to keep the test quick.
        let interval = Duration::from_millis(50);
        let config = RelayConnectionConfig {
            keep_alive_interval: interval,
            ..RelayConnectionConfig::default()
        };
        let (connection, _event_rx, _action_tx) = connect(config);

        // Initially should not need keep-alive
        assert!(!connection.should_send_keep_alive());

        // Wait for keep-alive interval
        std::thread::sleep(interval + Duration::from_millis(10));

        // Should need keep-alive now
        assert!(connection.should_send_keep_alive());

        // Send keep-alive
        assert!(connection.send_keep_alive().is_ok());

        // Should not need keep-alive immediately after sending
        assert!(!connection.should_send_keep_alive());
    }

    #[test]
    fn test_connection_stats() {
        let (connection, _event_rx, _action_tx) = connect(RelayConnectionConfig::default());

        // Send some data
        assert!(connection.send_data(vec![1, 2, 3]).is_ok());
        assert!(connection.receive_data(vec![4, 5, 6, 7]).is_ok());

        let stats = connection.get_stats();
        assert_eq!(stats.session_id, SESSION_ID);
        assert_eq!(stats.peer_addr, test_addr());
        assert!(stats.is_active);
        assert_eq!(stats.bytes_sent, 3);
        assert_eq!(stats.bytes_received, 4);
        assert_eq!(stats.outgoing_queue_size, 1);
        assert_eq!(stats.incoming_queue_size, 1);
    }
}