1use std::time::{Duration, Instant};
40use tracing::{debug, info, warn};
41
42#[derive(Debug, Clone, PartialEq)]
44pub enum KeepalivePreset {
45 Mobile,
48 Home,
51 Corporate,
54 DataCenter,
57 Disabled,
59}
60
61#[derive(Debug, Clone)]
63pub struct KeepaliveConfig {
64 pub interval: Duration,
66 pub timeout: Duration,
68 pub max_missed: u32,
70 pub enabled: bool,
72 pub adaptive: bool,
74}
75
76impl KeepaliveConfig {
77 pub fn mobile() -> Self {
80 KeepaliveConfig {
81 interval: Duration::from_secs(20),
82 timeout: Duration::from_secs(5),
83 max_missed: 3,
84 enabled: true,
85 adaptive: true,
86 }
87 }
88
89 pub fn home() -> Self {
91 KeepaliveConfig {
92 interval: Duration::from_secs(60),
93 timeout: Duration::from_secs(10),
94 max_missed: 3,
95 enabled: true,
96 adaptive: true,
97 }
98 }
99
100 pub fn corporate() -> Self {
102 KeepaliveConfig {
103 interval: Duration::from_secs(120),
104 timeout: Duration::from_secs(15),
105 max_missed: 2,
106 enabled: true,
107 adaptive: false,
108 }
109 }
110
111 pub fn datacenter() -> Self {
113 KeepaliveConfig {
114 interval: Duration::from_secs(30),
115 timeout: Duration::from_secs(10),
116 max_missed: 5,
117 enabled: true,
118 adaptive: false,
119 }
120 }
121
122 pub fn aggressive() -> Self {
124 Self::mobile()
125 }
126
127 pub fn disabled() -> Self {
129 KeepaliveConfig {
130 interval: Duration::from_secs(u64::MAX / 2),
131 timeout: Duration::from_secs(30),
132 max_missed: u32::MAX,
133 enabled: false,
134 adaptive: false,
135 }
136 }
137
138 pub fn from_preset(preset: KeepalivePreset) -> Self {
140 match preset {
141 KeepalivePreset::Mobile => Self::mobile(),
142 KeepalivePreset::Home => Self::home(),
143 KeepalivePreset::Corporate => Self::corporate(),
144 KeepalivePreset::DataCenter => Self::datacenter(),
145 KeepalivePreset::Disabled => Self::disabled(),
146 }
147 }
148}
149
150impl Default for KeepaliveConfig {
151 fn default() -> Self {
152 Self::home()
153 }
154}
155
156#[derive(Debug, Clone, PartialEq)]
158pub enum KeepaliveAction {
159 Idle,
161 SendPing,
163 PongTimeout,
165 ConnectionDead,
167}
168
169pub struct KeepaliveManager {
171 config: KeepaliveConfig,
172 last_sent: Option<Instant>,
174 last_pong: Option<Instant>,
176 last_activity: Instant,
178 waiting_for_pong: bool,
180 ping_sent_at: Option<Instant>,
182 missed_pongs: u32,
184 total_sent: u64,
186 total_pongs: u64,
188 srtt: Option<Duration>,
190 adaptive_interval: Duration,
192}
193
194impl KeepaliveManager {
195 pub fn new(config: KeepaliveConfig) -> Self {
197 let adaptive_interval = config.interval;
198 info!(
199 interval_secs = config.interval.as_secs(),
200 max_missed = config.max_missed,
201 adaptive = config.adaptive,
202 enabled = config.enabled,
203 "KeepaliveManager created"
204 );
205 KeepaliveManager {
206 adaptive_interval,
207 config,
208 last_sent: None,
209 last_pong: None,
210 last_activity: Instant::now(),
211 waiting_for_pong: false,
212 ping_sent_at: None,
213 missed_pongs: 0,
214 total_sent: 0,
215 total_pongs: 0,
216 srtt: None,
217 }
218 }
219
220 pub fn from_preset(preset: KeepalivePreset) -> Self {
222 Self::new(KeepaliveConfig::from_preset(preset))
223 }
224
225 pub fn check(&mut self) -> KeepaliveAction {
243 if !self.config.enabled {
244 return KeepaliveAction::Idle;
245 }
246
247 if self.waiting_for_pong {
249 if let Some(sent_at) = self.ping_sent_at {
250 if sent_at.elapsed() > self.config.timeout {
251 self.waiting_for_pong = false;
252 self.ping_sent_at = None;
253 warn!(
254 missed = self.missed_pongs + 1,
255 max = self.config.max_missed,
256 "Keepalive pong timed out"
257 );
258 self.missed_pongs += 1;
259 if self.missed_pongs >= self.config.max_missed {
260 warn!("Too many missed keepalive pongs — connection declared dead");
261 return KeepaliveAction::ConnectionDead;
262 }
263 return KeepaliveAction::PongTimeout;
264 }
265 return KeepaliveAction::Idle;
267 }
268 }
269
270 if self.should_send_keepalive() {
272 return KeepaliveAction::SendPing;
273 }
274
275 KeepaliveAction::Idle
276 }
277
278 pub fn should_send_keepalive(&self) -> bool {
280 if !self.config.enabled || self.waiting_for_pong {
281 return false;
282 }
283 let since_activity = self.last_activity.elapsed();
284 let since_sent = self.last_sent
285 .map(|t| t.elapsed())
286 .unwrap_or(Duration::MAX);
287
288 since_activity >= self.adaptive_interval
290 || since_sent >= self.adaptive_interval
291 }
292
293 pub fn record_keepalive_sent(&mut self) {
295 let now = Instant::now();
296 self.last_sent = Some(now);
297 self.ping_sent_at = Some(now);
298 self.waiting_for_pong = true;
299 self.total_sent += 1;
300 debug!(total_sent = self.total_sent, "Keepalive ping sent");
301 }
302
303 pub fn record_pong_received(&mut self) {
307 let now = Instant::now();
308 self.last_pong = Some(now);
309 self.last_activity = now;
310 self.total_pongs += 1;
311
312 if let Some(sent_at) = self.ping_sent_at.take() {
314 let rtt = sent_at.elapsed();
315 self.update_srtt(rtt);
316
317 if self.config.adaptive {
318 self.adjust_interval(rtt);
319 }
320 }
321
322 self.waiting_for_pong = false;
323 self.missed_pongs = 0;
324 debug!(total_pongs = self.total_pongs, "Keepalive pong received");
325 }
326
327 pub fn record_pong_missed(&mut self) {
329 self.missed_pongs += 1;
330 warn!(
331 missed = self.missed_pongs,
332 max = self.config.max_missed,
333 "Keepalive pong missed"
334 );
335 }
336
337 pub fn record_activity(&mut self) {
339 self.last_activity = Instant::now();
340 if self.waiting_for_pong {
342 self.waiting_for_pong = false;
343 self.ping_sent_at = None;
344 self.missed_pongs = 0;
345 }
346 }
347
348 pub fn reset_misses(&mut self) {
350 self.missed_pongs = 0;
351 self.waiting_for_pong = false;
352 self.ping_sent_at = None;
353 self.last_activity = Instant::now();
354 info!("Keepalive miss counter reset");
355 }
356
357 fn update_srtt(&mut self, rtt: Duration) {
360 self.srtt = Some(match self.srtt {
361 None => rtt,
362 Some(srtt) => {
363 let s = srtt.as_nanos() as u64;
364 let r = rtt.as_nanos() as u64;
365 Duration::from_nanos(s / 8 * 7 + r / 8)
366 }
367 });
368 }
369
370 fn adjust_interval(&mut self, rtt: Duration) {
371 let min_interval = (rtt * 4).max(Duration::from_secs(10));
374 let new_interval = min_interval
375 .min(self.config.interval)
376 .max(Duration::from_secs(10));
377
378 if new_interval != self.adaptive_interval {
379 debug!(
380 old_secs = self.adaptive_interval.as_secs(),
381 new_secs = new_interval.as_secs(),
382 rtt_ms = rtt.as_millis(),
383 "Adaptive keepalive interval adjusted"
384 );
385 self.adaptive_interval = new_interval;
386 }
387 }
388
389 pub fn srtt(&self) -> Option<Duration> {
393 self.srtt
394 }
395
396 pub fn current_interval(&self) -> Duration {
398 self.adaptive_interval
399 }
400
401 pub fn missed_pongs(&self) -> u32 {
403 self.missed_pongs
404 }
405
406 pub fn is_waiting_for_pong(&self) -> bool {
408 self.waiting_for_pong
409 }
410
411 pub fn is_dead(&self) -> bool {
413 self.missed_pongs >= self.config.max_missed
414 }
415
416 pub fn total_sent(&self) -> u64 {
418 self.total_sent
419 }
420
421 pub fn total_pongs(&self) -> u64 {
423 self.total_pongs
424 }
425
426 pub fn last_pong(&self) -> Option<Instant> {
428 self.last_pong
429 }
430
431 pub fn config(&self) -> &KeepaliveConfig {
433 &self.config
434 }
435}
436
437#[cfg(test)]
438mod tests {
439 use super::*;
440
441 fn instant_manager() -> KeepaliveManager {
442 KeepaliveManager::new(KeepaliveConfig {
443 interval: Duration::from_millis(1),
444 timeout: Duration::from_millis(50),
445 max_missed: 3,
446 enabled: true,
447 adaptive: false,
448 })
449 }
450
451 #[test]
452 fn test_config_mobile() {
453 let c = KeepaliveConfig::mobile();
454 assert_eq!(c.interval, Duration::from_secs(20));
455 assert!(c.enabled);
456 assert!(c.adaptive);
457 }
458
459 #[test]
460 fn test_config_home() {
461 let c = KeepaliveConfig::home();
462 assert_eq!(c.interval, Duration::from_secs(60));
463 }
464
465 #[test]
466 fn test_config_corporate() {
467 let c = KeepaliveConfig::corporate();
468 assert_eq!(c.max_missed, 2);
469 assert!(!c.adaptive);
470 }
471
472 #[test]
473 fn test_config_disabled() {
474 let c = KeepaliveConfig::disabled();
475 assert!(!c.enabled);
476 }
477
478 #[test]
479 fn test_config_from_preset_mobile() {
480 let c = KeepaliveConfig::from_preset(KeepalivePreset::Mobile);
481 assert_eq!(c.interval, Duration::from_secs(20));
482 }
483
484 #[test]
485 fn test_config_default_is_home() {
486 let c = KeepaliveConfig::default();
487 assert_eq!(c.interval, Duration::from_secs(60));
488 }
489
490 #[test]
491 fn test_manager_new() {
492 let m = KeepaliveManager::new(KeepaliveConfig::default());
493 assert_eq!(m.missed_pongs(), 0);
494 assert!(!m.is_waiting_for_pong());
495 assert!(!m.is_dead());
496 assert_eq!(m.total_sent(), 0);
497 }
498
499 #[test]
500 fn test_should_send_keepalive_initially() {
501 let m = instant_manager();
502 std::thread::sleep(Duration::from_millis(5));
503 assert!(m.should_send_keepalive());
504 }
505
506 #[test]
507 fn test_should_not_send_while_waiting() {
508 let mut m = instant_manager();
509 std::thread::sleep(Duration::from_millis(5));
510 m.record_keepalive_sent();
511 assert!(!m.should_send_keepalive());
512 }
513
514 #[test]
515 fn test_record_keepalive_sent() {
516 let mut m = instant_manager();
517 m.record_keepalive_sent();
518 assert!(m.is_waiting_for_pong());
519 assert_eq!(m.total_sent(), 1);
520 }
521
522 #[test]
523 fn test_record_pong_received() {
524 let mut m = instant_manager();
525 m.record_keepalive_sent();
526 assert!(m.is_waiting_for_pong());
527 m.record_pong_received();
528 assert!(!m.is_waiting_for_pong());
529 assert_eq!(m.total_pongs(), 1);
530 assert_eq!(m.missed_pongs(), 0);
531 assert!(m.last_pong().is_some());
532 }
533
534 #[test]
535 fn test_srtt_updated_after_pong() {
536 let mut m = instant_manager();
537 m.record_keepalive_sent();
538 std::thread::sleep(Duration::from_millis(5));
539 m.record_pong_received();
540 assert!(m.srtt().is_some());
541 }
542
543 #[test]
544 fn test_record_activity_resets_wait() {
545 let mut m = instant_manager();
546 m.record_keepalive_sent();
547 assert!(m.is_waiting_for_pong());
548 m.record_activity();
549 assert!(!m.is_waiting_for_pong());
550 assert_eq!(m.missed_pongs(), 0);
551 }
552
553 #[test]
554 fn test_check_disabled() {
555 let mut m = KeepaliveManager::new(KeepaliveConfig::disabled());
556 assert_eq!(m.check(), KeepaliveAction::Idle);
557 }
558
559 #[test]
560 fn test_check_send_ping() {
561 let mut m = instant_manager();
562 std::thread::sleep(Duration::from_millis(5));
563 assert_eq!(m.check(), KeepaliveAction::SendPing);
564 }
565
566 #[test]
567 fn test_check_pong_timeout() {
568 let mut m = KeepaliveManager::new(KeepaliveConfig {
569 interval: Duration::from_millis(1),
570 timeout: Duration::from_millis(1),
571 max_missed: 3,
572 enabled: true,
573 adaptive: false,
574 });
575 std::thread::sleep(Duration::from_millis(5));
576 m.record_keepalive_sent();
577 std::thread::sleep(Duration::from_millis(5));
578 let action = m.check();
579 assert!(
580 action == KeepaliveAction::PongTimeout
581 || action == KeepaliveAction::ConnectionDead
582 );
583 }
584
585 #[test]
586 fn test_connection_dead_after_max_missed() {
587 let mut m = KeepaliveManager::new(KeepaliveConfig {
588 interval: Duration::from_millis(1),
589 timeout: Duration::from_millis(1),
590 max_missed: 2,
591 enabled: true,
592 adaptive: false,
593 });
594
595 for _ in 0..2 {
597 std::thread::sleep(Duration::from_millis(3));
598 m.record_keepalive_sent();
599 std::thread::sleep(Duration::from_millis(3));
600 let action = m.check();
601 if action == KeepaliveAction::ConnectionDead {
602 assert!(m.is_dead());
603 return;
604 }
605 }
606 assert!(m.missed_pongs() > 0);
608 }
609
610 #[test]
611 fn test_reset_misses() {
612 let mut m = instant_manager();
613 m.record_pong_missed();
614 m.record_pong_missed();
615 assert_eq!(m.missed_pongs(), 2);
616 m.reset_misses();
617 assert_eq!(m.missed_pongs(), 0);
618 assert!(!m.is_waiting_for_pong());
619 }
620
621 #[test]
622 fn test_is_dead() {
623 let mut m = KeepaliveManager::new(KeepaliveConfig {
624 max_missed: 2,
625 ..KeepaliveConfig::mobile()
626 });
627 assert!(!m.is_dead());
628 m.record_pong_missed();
629 assert!(!m.is_dead());
630 m.record_pong_missed();
631 assert!(m.is_dead());
632 }
633
634 #[test]
635 fn test_from_preset() {
636 let m = KeepaliveManager::from_preset(KeepalivePreset::Mobile);
637 assert_eq!(m.config().interval, Duration::from_secs(20));
638 }
639
640 #[test]
641 fn test_adaptive_interval_adjusts() {
642 let mut m = KeepaliveManager::new(KeepaliveConfig {
643 interval: Duration::from_secs(60),
644 timeout: Duration::from_secs(5),
645 max_missed: 3,
646 enabled: true,
647 adaptive: true,
648 });
649 m.record_keepalive_sent();
650 std::thread::sleep(Duration::from_millis(10));
651 m.record_pong_received();
652 assert!(m.current_interval() >= Duration::from_secs(10));
654 }
655
656 #[test]
657 fn test_current_interval_default() {
658 let m = KeepaliveManager::new(KeepaliveConfig::home());
659 assert_eq!(m.current_interval(), Duration::from_secs(60));
660 }
661}