1use crate::relay::{RelayError, RelayResult};
11use std::collections::VecDeque;
12use std::net::SocketAddr;
13use std::sync::{Arc, Mutex};
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc;
16
/// Tunable limits and timers applied to a single [`RelayConnection`].
#[derive(Debug, Clone)]
pub struct RelayConnectionConfig {
    /// Largest payload, in bytes, that `RelayConnection::send_data` accepts.
    pub max_frame_size: usize,
    /// Upper bound, in bytes, on the data held in the connection's outgoing
    /// and incoming queues combined.
    pub buffer_size: usize,
    /// Idle time after which `RelayConnection::check_timeout` reports the
    /// session as expired.
    pub connection_timeout: Duration,
    /// Delay between keep-alive deadlines.
    pub keep_alive_interval: Duration,
    /// Number of bytes that may be sent per bandwidth-tracking window (the
    /// tracker in this file uses a one-second window).
    pub bandwidth_limit: u64,
}

impl Default for RelayConnectionConfig {
    /// Returns the stock configuration: 64 KiB frames, a 1 MiB buffer, a
    /// five-minute idle timeout, a 30-second keep-alive interval and a
    /// 1 MiB-per-window bandwidth limit.
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;

        Self {
            max_frame_size: 64 * KIB,
            buffer_size: MIB,
            connection_timeout: Duration::from_secs(5 * 60),
            keep_alive_interval: Duration::from_secs(30),
            bandwidth_limit: 1024 * 1024,
        }
    }
}
43
/// Notifications pushed onto a connection's event channel.
///
/// Within this file only `DataReceived`, `ConnectionTerminated` and
/// `KeepAlive` are emitted by `RelayConnection`; the remaining variants are
/// presumably produced by other parts of the relay (TODO confirm).
#[derive(Debug, Clone)]
pub enum RelayEvent {
    /// A session was established with a peer (not emitted in this file).
    ConnectionEstablished {
        /// Session the event refers to.
        session_id: u32,
        /// Address of the remote peer.
        peer_addr: SocketAddr,
    },
    /// A payload was queued on the connection.
    ///
    /// NOTE(review): `RelayConnection` emits this from both `receive_data`
    /// and `send_data`.
    DataReceived {
        /// Session the event refers to.
        session_id: u32,
        /// Copy of the queued payload.
        data: Vec<u8>,
    },
    /// The connection was marked inactive via `RelayConnection::terminate`.
    ConnectionTerminated {
        /// Session the event refers to.
        session_id: u32,
        /// Caller-supplied explanation passed to `terminate`.
        reason: String,
    },
    /// An error report (not emitted in this file).
    Error {
        /// Session the error relates to, if any.
        session_id: Option<u32>,
        /// The error being reported.
        error: RelayError,
    },
    /// A bandwidth limit was exceeded (not emitted in this file; `send_data`
    /// reports that condition through its return value instead).
    BandwidthLimitExceeded {
        /// Session the event refers to.
        session_id: u32,
        /// Usage at the time of the event; presumably in bytes.
        current_usage: u64,
        /// Limit that was exceeded; presumably in bytes.
        limit: u64,
    },
    /// A keep-alive was issued via `RelayConnection::send_keep_alive`.
    KeepAlive {
        /// Session the event refers to.
        session_id: u32,
    },
}
90
/// Commands delivered to a connection through its action channel.
///
/// NOTE(review): nothing in this file constructs or consumes these values;
/// `RelayConnection` only stores the receiving end of the channel. The
/// per-variant descriptions below are inferred from the names — confirm
/// against the code that drives the channel.
#[derive(Debug, Clone)]
pub enum RelayAction {
    /// Presumably asks the connection to transmit `data`.
    SendData {
        /// Target session.
        session_id: u32,
        /// Payload to transmit.
        data: Vec<u8>,
    },
    /// Presumably asks the connection to shut down.
    TerminateConnection {
        /// Target session.
        session_id: u32,
        /// Explanation for the shutdown.
        reason: String,
    },
    /// Presumably replaces the session's bandwidth limit.
    UpdateBandwidthLimit {
        /// Target session.
        session_id: u32,
        /// Replacement limit.
        new_limit: u64,
    },
    /// Presumably asks the connection to issue a keep-alive.
    SendKeepAlive {
        /// Target session.
        session_id: u32,
    },
}
121
/// A single relayed session: identity, configuration, shared mutable state
/// and the channel ends used to talk to the rest of the relay.
#[derive(Debug)]
pub struct RelayConnection {
    /// Identifier of the session this connection belongs to.
    session_id: u32,
    /// Address of the remote peer.
    peer_addr: SocketAddr,
    /// Limits and timers applied to this connection.
    config: RelayConnectionConfig,
    /// Mutable connection state, guarded by a mutex so `&self` methods can
    /// update it.
    state: Arc<Mutex<ConnectionState>>,
    /// Channel on which `RelayEvent`s are published.
    event_sender: mpsc::UnboundedSender<RelayEvent>,
    /// Receiving end of the action channel; stored but never read in this
    /// file, hence the `dead_code` allowance.
    #[allow(dead_code)]
    action_receiver: mpsc::UnboundedReceiver<RelayAction>,
}
139
/// Mutable per-connection state kept behind `RelayConnection::state`.
#[derive(Debug)]
struct ConnectionState {
    /// `true` until `RelayConnection::terminate` is called.
    is_active: bool,
    /// Payloads accepted by `send_data`, drained by `next_outgoing`.
    outgoing_queue: VecDeque<Vec<u8>>,
    /// Payloads accepted by `receive_data`, drained by `next_incoming`.
    incoming_queue: VecDeque<Vec<u8>>,
    /// Total bytes currently held across both queues.
    buffer_usage: usize,
    /// Per-window byte counters used to enforce the send limit.
    bandwidth_tracker: BandwidthTracker,
    /// Time of the most recent successful `send_data` / `receive_data`.
    last_activity: Instant,
    /// Instant at or after which a keep-alive is due.
    next_keep_alive: Instant,
}
158
/// Fixed-window byte counter used to enforce a per-window send limit.
///
/// Counters are cleared lazily: every accessor first checks whether the
/// current window has elapsed and, if so, starts a new one.
#[derive(Debug)]
struct BandwidthTracker {
    /// Bytes recorded as sent in the current window.
    bytes_sent: u64,
    /// Bytes recorded as received in the current window.
    bytes_received: u64,
    /// Start of the current window.
    window_start: Instant,
    /// Length of one window.
    window_duration: Duration,
    /// Maximum bytes that may be sent within one window.
    limit: u64,
}

impl BandwidthTracker {
    /// Creates a tracker with zeroed counters, a one-second window starting
    /// now, and the given per-window send `limit`.
    fn new(limit: u64) -> Self {
        Self {
            bytes_sent: 0,
            bytes_received: 0,
            window_start: Instant::now(),
            window_duration: Duration::from_secs(1),
            limit,
        }
    }

    /// Starts a fresh window (zeroing both counters) once the current one
    /// has lasted at least `window_duration`.
    fn reset_if_needed(&mut self) {
        let now = Instant::now();
        if now.duration_since(self.window_start) >= self.window_duration {
            self.bytes_sent = 0;
            self.bytes_received = 0;
            self.window_start = now;
        }
    }

    /// Returns `true` when sending `bytes` more would keep the current
    /// window's sent total at or below the limit.
    ///
    /// A total that does not fit in `u64` is treated as over the limit
    /// instead of overflowing.
    fn can_send(&mut self, bytes: u64) -> bool {
        self.reset_if_needed();
        matches!(
            self.bytes_sent.checked_add(bytes),
            Some(total) if total <= self.limit
        )
    }

    /// Adds `bytes` to the current window's sent counter, saturating at
    /// `u64::MAX`.
    fn record_sent(&mut self, bytes: u64) {
        self.reset_if_needed();
        self.bytes_sent = self.bytes_sent.saturating_add(bytes);
    }

    /// Adds `bytes` to the current window's received counter, saturating at
    /// `u64::MAX`.
    fn record_received(&mut self, bytes: u64) {
        self.reset_if_needed();
        self.bytes_received = self.bytes_received.saturating_add(bytes);
    }

    /// Returns sent plus received bytes for the current window, saturating
    /// at `u64::MAX`.
    fn current_usage(&mut self) -> u64 {
        self.reset_if_needed();
        self.bytes_sent.saturating_add(self.bytes_received)
    }
}
214
215impl RelayConnection {
216 pub fn new(
218 session_id: u32,
219 peer_addr: SocketAddr,
220 config: RelayConnectionConfig,
221 event_sender: mpsc::UnboundedSender<RelayEvent>,
222 action_receiver: mpsc::UnboundedReceiver<RelayAction>,
223 ) -> Self {
224 let now = Instant::now();
225 let state = ConnectionState {
226 is_active: true,
227 outgoing_queue: VecDeque::new(),
228 incoming_queue: VecDeque::new(),
229 buffer_usage: 0,
230 bandwidth_tracker: BandwidthTracker::new(config.bandwidth_limit),
231 last_activity: now,
232 next_keep_alive: now + config.keep_alive_interval,
233 };
234
235 Self {
236 session_id,
237 peer_addr,
238 config,
239 state: Arc::new(Mutex::new(state)),
240 event_sender,
241 action_receiver,
242 }
243 }
244
245 pub fn session_id(&self) -> u32 {
247 self.session_id
248 }
249
250 pub fn peer_addr(&self) -> SocketAddr {
252 self.peer_addr
253 }
254
255 #[allow(clippy::unwrap_used, clippy::expect_used)]
257 pub fn is_active(&self) -> bool {
258 let state = self
259 .state
260 .lock()
261 .expect("Mutex poisoning is unexpected in normal operation");
262 state.is_active
263 }
264
265 #[allow(clippy::unwrap_used, clippy::expect_used)]
267 pub fn send_data(&self, data: Vec<u8>) -> RelayResult<()> {
268 if data.len() > self.config.max_frame_size {
269 return Err(RelayError::ProtocolError {
270 frame_type: 0x46, reason: format!(
272 "Data size {} exceeds maximum {}",
273 data.len(),
274 self.config.max_frame_size
275 ),
276 });
277 }
278
279 let mut state = self
280 .state
281 .lock()
282 .expect("Mutex poisoning is unexpected in normal operation");
283
284 if !state.is_active {
285 return Err(RelayError::SessionError {
286 session_id: Some(self.session_id),
287 kind: crate::relay::error::SessionErrorKind::Terminated,
288 });
289 }
290
291 if !state.bandwidth_tracker.can_send(data.len() as u64) {
293 let current_usage = state.bandwidth_tracker.current_usage();
294 return Err(RelayError::SessionError {
295 session_id: Some(self.session_id),
296 kind: crate::relay::error::SessionErrorKind::BandwidthExceeded {
297 used: current_usage,
298 limit: self.config.bandwidth_limit,
299 },
300 });
301 }
302
303 if state.buffer_usage + data.len() > self.config.buffer_size {
305 return Err(RelayError::ResourceExhausted {
306 resource_type: "buffer".to_string(),
307 current_usage: state.buffer_usage as u64,
308 limit: self.config.buffer_size as u64,
309 });
310 }
311
312 state.bandwidth_tracker.record_sent(data.len() as u64);
314 state.buffer_usage += data.len();
315 state.outgoing_queue.push_back(data.clone());
316 state.last_activity = Instant::now();
317
318 let _ = self.event_sender.send(RelayEvent::DataReceived {
320 session_id: self.session_id,
321 data,
322 });
323
324 Ok(())
325 }
326
327 #[allow(clippy::unwrap_used)]
329 pub fn receive_data(&self, data: Vec<u8>) -> RelayResult<()> {
330 let mut state = self.state.lock().unwrap();
331
332 if !state.is_active {
333 return Err(RelayError::SessionError {
334 session_id: Some(self.session_id),
335 kind: crate::relay::error::SessionErrorKind::Terminated,
336 });
337 }
338
339 if state.buffer_usage + data.len() > self.config.buffer_size {
341 return Err(RelayError::ResourceExhausted {
342 resource_type: "buffer".to_string(),
343 current_usage: state.buffer_usage as u64,
344 limit: self.config.buffer_size as u64,
345 });
346 }
347
348 state.bandwidth_tracker.record_received(data.len() as u64);
350 state.buffer_usage += data.len();
351 state.incoming_queue.push_back(data.clone());
352 state.last_activity = Instant::now();
353
354 let _ = self.event_sender.send(RelayEvent::DataReceived {
356 session_id: self.session_id,
357 data,
358 });
359
360 Ok(())
361 }
362
363 #[allow(clippy::unwrap_used)]
365 pub fn next_outgoing(&self) -> Option<Vec<u8>> {
366 let mut state = self.state.lock().unwrap();
367 if let Some(data) = state.outgoing_queue.pop_front() {
368 state.buffer_usage = state.buffer_usage.saturating_sub(data.len());
369 Some(data)
370 } else {
371 None
372 }
373 }
374
375 #[allow(clippy::unwrap_used)]
377 pub fn next_incoming(&self) -> Option<Vec<u8>> {
378 let mut state = self.state.lock().unwrap();
379 if let Some(data) = state.incoming_queue.pop_front() {
380 state.buffer_usage = state.buffer_usage.saturating_sub(data.len());
381 Some(data)
382 } else {
383 None
384 }
385 }
386
387 #[allow(clippy::unwrap_used)]
389 pub fn check_timeout(&self) -> RelayResult<()> {
390 let state = self.state.lock().unwrap();
391 let now = Instant::now();
392
393 if now.duration_since(state.last_activity) > self.config.connection_timeout {
394 return Err(RelayError::SessionError {
395 session_id: Some(self.session_id),
396 kind: crate::relay::error::SessionErrorKind::Expired,
397 });
398 }
399
400 Ok(())
401 }
402
403 #[allow(clippy::unwrap_used)]
405 pub fn should_send_keep_alive(&self) -> bool {
406 let state = self.state.lock().unwrap();
407 Instant::now() >= state.next_keep_alive
408 }
409
410 #[allow(clippy::unwrap_used)]
412 pub fn send_keep_alive(&self) -> RelayResult<()> {
413 let mut state = self.state.lock().unwrap();
414 state.next_keep_alive = Instant::now() + self.config.keep_alive_interval;
415
416 let _ = self.event_sender.send(RelayEvent::KeepAlive {
417 session_id: self.session_id,
418 });
419
420 Ok(())
421 }
422
423 #[allow(clippy::unwrap_used)]
425 pub fn terminate(&self, reason: String) -> RelayResult<()> {
426 let mut state = self.state.lock().unwrap();
427 state.is_active = false;
428
429 let _ = self.event_sender.send(RelayEvent::ConnectionTerminated {
430 session_id: self.session_id,
431 reason: reason.clone(),
432 });
433
434 Ok(())
435 }
436
437 #[allow(clippy::unwrap_used)]
439 pub fn get_stats(&self) -> ConnectionStats {
440 let state = self.state.lock().unwrap();
441 ConnectionStats {
442 session_id: self.session_id,
443 peer_addr: self.peer_addr,
444 is_active: state.is_active,
445 bytes_sent: state.bandwidth_tracker.bytes_sent,
446 bytes_received: state.bandwidth_tracker.bytes_received,
447 buffer_usage: state.buffer_usage,
448 outgoing_queue_size: state.outgoing_queue.len(),
449 incoming_queue_size: state.incoming_queue.len(),
450 last_activity: state.last_activity,
451 }
452 }
453}
454
/// Point-in-time snapshot produced by `RelayConnection::get_stats`.
#[derive(Debug, Clone)]
pub struct ConnectionStats {
    /// Identifier of the session.
    pub session_id: u32,
    /// Address of the remote peer.
    pub peer_addr: SocketAddr,
    /// Whether the connection was still active when sampled.
    pub is_active: bool,
    /// Bytes recorded as sent in the bandwidth tracker's window at sampling
    /// time (the counter is cleared when the window rolls over).
    pub bytes_sent: u64,
    /// Bytes recorded as received in the bandwidth tracker's window at
    /// sampling time (the counter is cleared when the window rolls over).
    pub bytes_received: u64,
    /// Bytes held across the outgoing and incoming queues.
    pub buffer_usage: usize,
    /// Number of payloads waiting in the outgoing queue.
    pub outgoing_queue_size: usize,
    /// Number of payloads waiting in the incoming queue.
    pub incoming_queue_size: usize,
    /// Time of the most recent successful send or receive.
    pub last_activity: Instant,
}
477
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};
    use tokio::sync::mpsc;

    type EventRx = mpsc::UnboundedReceiver<RelayEvent>;
    type ActionTx = mpsc::UnboundedSender<RelayAction>;

    /// Loopback address shared by every test connection.
    fn test_addr() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080)
    }

    /// Builds session 123 with `config`. The far channel ends are returned so
    /// callers can keep them alive for the duration of the test.
    fn connect(config: RelayConnectionConfig) -> (RelayConnection, EventRx, ActionTx) {
        let (event_tx, event_rx) = mpsc::unbounded_channel();
        let (action_tx, action_rx) = mpsc::unbounded_channel();
        let connection = RelayConnection::new(123, test_addr(), config, event_tx, action_rx);
        (connection, event_rx, action_tx)
    }

    #[test]
    fn test_relay_connection_creation() {
        let (connection, _event_rx, _action_tx) = connect(RelayConnectionConfig::default());

        assert_eq!(connection.session_id(), 123);
        assert_eq!(connection.peer_addr(), test_addr());
        assert!(connection.is_active());
    }

    #[test]
    fn test_send_data_within_limits() {
        let (connection, _event_rx, _action_tx) = connect(RelayConnectionConfig::default());

        let data = vec![1, 2, 3, 4];
        assert!(connection.send_data(data.clone()).is_ok());

        assert_eq!(connection.next_outgoing(), Some(data));
        assert_eq!(connection.next_outgoing(), None);
    }

    #[test]
    fn test_send_data_exceeds_frame_size() {
        let config = RelayConnectionConfig {
            max_frame_size: 10,
            ..RelayConnectionConfig::default()
        };
        let (connection, _event_rx, _action_tx) = connect(config);

        let large_data = vec![0; 20];
        assert!(connection.send_data(large_data).is_err());
    }

    #[test]
    fn test_bandwidth_limiting() {
        let config = RelayConnectionConfig {
            bandwidth_limit: 100,
            ..RelayConnectionConfig::default()
        };
        let (connection, _event_rx, _action_tx) = connect(config);

        // 50 bytes fit in the window; a further 60 would exceed 100.
        assert!(connection.send_data(vec![0; 50]).is_ok());
        assert!(connection.send_data(vec![0; 60]).is_err());
    }

    #[test]
    fn test_buffer_size_limiting() {
        let config = RelayConnectionConfig {
            buffer_size: 100,
            ..RelayConnectionConfig::default()
        };
        let (connection, _event_rx, _action_tx) = connect(config);

        // 80 bytes fit in the buffer; a further 30 would exceed 100.
        assert!(connection.send_data(vec![0; 80]).is_ok());
        assert!(connection.send_data(vec![0; 30]).is_err());
    }

    #[test]
    fn test_connection_termination() {
        let (connection, _event_rx, _action_tx) = connect(RelayConnectionConfig::default());

        assert!(connection.is_active());

        let reason = "Test termination".to_string();
        assert!(connection.terminate(reason).is_ok());

        assert!(!connection.is_active());

        // A terminated connection rejects further sends.
        assert!(connection.send_data(vec![1, 2, 3]).is_err());
    }

    #[test]
    fn test_keep_alive() {
        // A long interval keeps the "not yet due" assertions independent of
        // scheduler timing.
        let config = RelayConnectionConfig {
            keep_alive_interval: Duration::from_secs(3600),
            ..RelayConnectionConfig::default()
        };
        let (connection, _event_rx, _action_tx) = connect(config);

        assert!(!connection.should_send_keep_alive());

        // Move the deadline to "now" instead of sleeping past it, so the
        // test is deterministic and fast.
        connection.state.lock().unwrap().next_keep_alive = Instant::now();
        assert!(connection.should_send_keep_alive());

        // Sending a keep-alive pushes the deadline one interval ahead.
        assert!(connection.send_keep_alive().is_ok());
        assert!(!connection.should_send_keep_alive());
    }

    #[test]
    fn test_connection_stats() {
        let (connection, _event_rx, _action_tx) = connect(RelayConnectionConfig::default());

        connection.send_data(vec![1, 2, 3]).unwrap();
        connection.receive_data(vec![4, 5, 6, 7]).unwrap();

        let stats = connection.get_stats();
        assert_eq!(stats.session_id, 123);
        assert_eq!(stats.peer_addr, test_addr());
        assert!(stats.is_active);
        assert_eq!(stats.bytes_sent, 3);
        assert_eq!(stats.bytes_received, 4);
        assert_eq!(stats.buffer_usage, 7);
        assert_eq!(stats.outgoing_queue_size, 1);
        assert_eq!(stats.incoming_queue_size, 1);
    }
}