1use std::collections::HashMap;
28use std::net::SocketAddr;
29use std::sync::RwLock;
30use std::time::Instant;
31
32use bytes::Bytes;
33
34use crate::error::{Error, Result};
35use crate::v3::UsmSecurityParams;
36
37pub const TIME_WINDOW: u32 = 150;
39
40pub mod report_oids {
42 use crate::Oid;
43 use crate::oid;
44
45 pub fn unsupported_sec_levels() -> Oid {
47 oid!(1, 3, 6, 1, 6, 3, 15, 1, 1, 1, 0)
48 }
49
50 pub fn not_in_time_windows() -> Oid {
52 oid!(1, 3, 6, 1, 6, 3, 15, 1, 1, 2, 0)
53 }
54
55 pub fn unknown_user_names() -> Oid {
57 oid!(1, 3, 6, 1, 6, 3, 15, 1, 1, 3, 0)
58 }
59
60 pub fn unknown_engine_ids() -> Oid {
62 oid!(1, 3, 6, 1, 6, 3, 15, 1, 1, 4, 0)
63 }
64
65 pub fn wrong_digests() -> Oid {
67 oid!(1, 3, 6, 1, 6, 3, 15, 1, 1, 5, 0)
68 }
69
70 pub fn decryption_errors() -> Oid {
72 oid!(1, 3, 6, 1, 6, 3, 15, 1, 1, 6, 0)
73 }
74}
75
76#[derive(Debug, Clone)]
78pub struct EngineState {
79 pub engine_id: Bytes,
81 pub engine_boots: u32,
83 pub engine_time: u32,
85 pub synced_at: Instant,
87 pub latest_received_engine_time: u32,
89}
90
91impl EngineState {
92 pub fn new(engine_id: Bytes, engine_boots: u32, engine_time: u32) -> Self {
94 Self {
95 engine_id,
96 engine_boots,
97 engine_time,
98 synced_at: Instant::now(),
99 latest_received_engine_time: engine_time,
100 }
101 }
102
103 pub fn estimated_time(&self) -> u32 {
107 let elapsed = self.synced_at.elapsed().as_secs() as u32;
108 self.engine_time.saturating_add(elapsed)
109 }
110
111 pub fn update_time(&mut self, response_boots: u32, response_time: u32) -> bool {
117 if response_boots > self.engine_boots {
118 self.engine_boots = response_boots;
120 self.engine_time = response_time;
121 self.synced_at = Instant::now();
122 self.latest_received_engine_time = response_time;
123 true
124 } else if response_boots == self.engine_boots
125 && response_time > self.latest_received_engine_time
126 {
127 self.engine_time = response_time;
129 self.synced_at = Instant::now();
130 self.latest_received_engine_time = response_time;
131 true
132 } else {
133 false
134 }
135 }
136
137 pub fn is_in_time_window(&self, msg_boots: u32, msg_time: u32) -> bool {
144 if self.engine_boots == 2147483647 {
146 return false;
147 }
148
149 if msg_boots != self.engine_boots {
151 return false;
152 }
153
154 let local_time = self.estimated_time();
156 let diff = msg_time.abs_diff(local_time);
157
158 diff <= TIME_WINDOW
159 }
160}
161
162#[derive(Debug, Default)]
187pub struct EngineCache {
188 engines: RwLock<HashMap<SocketAddr, EngineState>>,
189}
190
191impl EngineCache {
192 pub fn new() -> Self {
194 Self {
195 engines: RwLock::new(HashMap::new()),
196 }
197 }
198
199 pub fn get(&self, target: &SocketAddr) -> Option<EngineState> {
201 self.engines.read().ok()?.get(target).cloned()
202 }
203
204 pub fn insert(&self, target: SocketAddr, state: EngineState) {
206 if let Ok(mut engines) = self.engines.write() {
207 engines.insert(target, state);
208 }
209 }
210
211 pub fn update_time(
215 &self,
216 target: &SocketAddr,
217 response_boots: u32,
218 response_time: u32,
219 ) -> bool {
220 if let Ok(mut engines) = self.engines.write()
221 && let Some(state) = engines.get_mut(target)
222 {
223 return state.update_time(response_boots, response_time);
224 }
225 false
226 }
227
228 pub fn remove(&self, target: &SocketAddr) -> Option<EngineState> {
230 self.engines.write().ok()?.remove(target)
231 }
232
233 pub fn clear(&self) {
235 if let Ok(mut engines) = self.engines.write() {
236 engines.clear();
237 }
238 }
239
240 pub fn len(&self) -> usize {
242 self.engines.read().map(|e| e.len()).unwrap_or(0)
243 }
244
245 pub fn is_empty(&self) -> bool {
247 self.len() == 0
248 }
249}
250
251impl Clone for EngineCache {
252 fn clone(&self) -> Self {
253 let engines = self.engines.read().map(|e| e.clone()).unwrap_or_default();
255 Self {
256 engines: RwLock::new(engines),
257 }
258 }
259}
260
261pub fn parse_discovery_response(security_params: &Bytes) -> Result<EngineState> {
266 let usm = UsmSecurityParams::decode(security_params.clone())?;
267
268 if usm.engine_id.is_empty() {
269 return Err(Error::UnknownEngineId { target: None });
270 }
271
272 Ok(EngineState::new(
273 usm.engine_id,
274 usm.engine_boots,
275 usm.engine_time,
276 ))
277}
278
279pub fn is_unknown_engine_id_report(pdu: &crate::pdu::Pdu) -> bool {
283 use crate::pdu::PduType;
284
285 if pdu.pdu_type != PduType::Report {
286 return false;
287 }
288
289 let unknown_engine_ids_oid = report_oids::unknown_engine_ids();
290 pdu.varbinds
291 .iter()
292 .any(|vb| vb.oid == unknown_engine_ids_oid)
293}
294
295pub fn is_not_in_time_window_report(pdu: &crate::pdu::Pdu) -> bool {
299 use crate::pdu::PduType;
300
301 if pdu.pdu_type != PduType::Report {
302 return false;
303 }
304
305 let not_in_time_windows_oid = report_oids::not_in_time_windows();
306 pdu.varbinds
307 .iter()
308 .any(|vb| vb.oid == not_in_time_windows_oid)
309}
310
311pub fn is_wrong_digest_report(pdu: &crate::pdu::Pdu) -> bool {
315 use crate::pdu::PduType;
316
317 if pdu.pdu_type != PduType::Report {
318 return false;
319 }
320
321 let wrong_digests_oid = report_oids::wrong_digests();
322 pdu.varbinds.iter().any(|vb| vb.oid == wrong_digests_oid)
323}
324
325pub fn is_unsupported_sec_level_report(pdu: &crate::pdu::Pdu) -> bool {
329 use crate::pdu::PduType;
330
331 if pdu.pdu_type != PduType::Report {
332 return false;
333 }
334
335 let oid = report_oids::unsupported_sec_levels();
336 pdu.varbinds.iter().any(|vb| vb.oid == oid)
337}
338
339pub fn is_unknown_user_name_report(pdu: &crate::pdu::Pdu) -> bool {
343 use crate::pdu::PduType;
344
345 if pdu.pdu_type != PduType::Report {
346 return false;
347 }
348
349 let oid = report_oids::unknown_user_names();
350 pdu.varbinds.iter().any(|vb| vb.oid == oid)
351}
352
353pub fn is_decryption_error_report(pdu: &crate::pdu::Pdu) -> bool {
357 use crate::pdu::PduType;
358
359 if pdu.pdu_type != PduType::Report {
360 return false;
361 }
362
363 let oid = report_oids::decryption_errors();
364 pdu.varbinds.iter().any(|vb| vb.oid == oid)
365}
366
367#[cfg(test)]
368mod tests {
369 use super::*;
370
371 #[test]
372 fn test_engine_state_estimated_time() {
373 let state = EngineState::new(Bytes::from_static(b"engine"), 1, 1000);
374
375 let estimated = state.estimated_time();
377 assert!(estimated >= 1000);
378 }
379
380 #[test]
381 fn test_engine_state_update_time() {
382 let mut state = EngineState::new(Bytes::from_static(b"engine"), 1, 1000);
383
384 assert!(state.update_time(1, 1100));
386 assert_eq!(state.latest_received_engine_time, 1100);
387
388 assert!(!state.update_time(1, 1050));
390 assert_eq!(state.latest_received_engine_time, 1100);
391
392 assert!(state.update_time(2, 500));
394 assert_eq!(state.engine_boots, 2);
395 assert_eq!(state.latest_received_engine_time, 500);
396 }
397
398 #[test]
404 fn test_anti_replay_rejects_old_time() {
405 let mut state = EngineState::new(Bytes::from_static(b"engine"), 1, 1000);
406 state.latest_received_engine_time = 1500; assert!(
411 !state.update_time(1, 1400),
412 "Should reject replay: time 1400 < latest 1500"
413 );
414 assert_eq!(
415 state.latest_received_engine_time, 1500,
416 "Latest should not change"
417 );
418
419 assert!(
421 !state.update_time(1, 1500),
422 "Should reject replay: time 1500 == latest 1500"
423 );
424 assert_eq!(state.latest_received_engine_time, 1500);
425
426 assert!(
428 state.update_time(1, 1501),
429 "Should accept: time 1501 > latest 1500"
430 );
431 assert_eq!(state.latest_received_engine_time, 1501);
432 }
433
434 #[test]
439 fn test_anti_replay_new_boot_cycle_resets() {
440 let mut state = EngineState::new(Bytes::from_static(b"engine"), 1, 1000);
441 state.latest_received_engine_time = 5000; assert!(
446 state.update_time(2, 100),
447 "New boot cycle should accept even with lower time"
448 );
449 assert_eq!(state.engine_boots, 2);
450 assert_eq!(state.engine_time, 100);
451 assert_eq!(
452 state.latest_received_engine_time, 100,
453 "Latest should reset to new time"
454 );
455
456 assert!(
458 !state.update_time(2, 50),
459 "Should reject older time in same boot cycle"
460 );
461 assert!(state.update_time(2, 150), "Should accept newer time");
462 assert_eq!(state.latest_received_engine_time, 150);
463 }
464
465 #[test]
469 fn test_anti_replay_rejects_old_boot_cycle() {
470 let mut state = EngineState::new(Bytes::from_static(b"engine"), 5, 1000);
471 state.latest_received_engine_time = 1000;
472
473 assert!(
475 !state.update_time(4, 9999),
476 "Should reject old boot cycle even with high time"
477 );
478 assert_eq!(state.engine_boots, 5, "Boots should not change");
479 assert_eq!(
480 state.latest_received_engine_time, 1000,
481 "Latest should not change"
482 );
483
484 assert!(!state.update_time(0, 9999), "Should reject boots=0 replay");
486 }
487
488 #[test]
490 fn test_anti_replay_boundary_values() {
491 let mut state = EngineState::new(Bytes::from_static(b"engine"), 1, 0);
492
493 assert_eq!(state.latest_received_engine_time, 0);
495
496 assert!(state.update_time(1, 1));
498 assert_eq!(state.latest_received_engine_time, 1);
499
500 assert!(!state.update_time(1, 0));
502
503 assert!(state.update_time(1, u32::MAX - 1));
505 assert_eq!(state.latest_received_engine_time, u32::MAX - 1);
506
507 assert!(state.update_time(1, u32::MAX));
509 assert_eq!(state.latest_received_engine_time, u32::MAX);
510
511 assert!(!state.update_time(1, u32::MAX));
513 }
514
515 #[test]
516 fn test_engine_state_time_window() {
517 let state = EngineState::new(Bytes::from_static(b"engine"), 1, 1000);
518
519 assert!(state.is_in_time_window(1, 1000));
521 assert!(state.is_in_time_window(1, 1100)); assert!(state.is_in_time_window(1, 900)); assert!(!state.is_in_time_window(2, 1000));
526 assert!(!state.is_in_time_window(0, 1000));
527
528 assert!(!state.is_in_time_window(1, 2000)); }
531
532 #[test]
537 fn test_time_window_150s_exact_boundary() {
538 let state = EngineState::new(Bytes::from_static(b"engine"), 1, 10000);
540
541 assert!(
546 state.is_in_time_window(1, 10150),
547 "Message at exactly +150s boundary should be in window"
548 );
549
550 assert!(
552 !state.is_in_time_window(1, 10151),
553 "Message at +151s should be outside window"
554 );
555
556 assert!(
558 state.is_in_time_window(1, 9850),
559 "Message at exactly -150s boundary should be in window"
560 );
561
562 assert!(
564 !state.is_in_time_window(1, 9849),
565 "Message at -151s should be outside window"
566 );
567 }
568
569 #[test]
574 fn test_time_window_boots_latched() {
575 let state = EngineState::new(Bytes::from_static(b"engine"), 2147483647, 1000);
578
579 assert!(
581 !state.is_in_time_window(2147483647, 1000),
582 "Latched boots should reject all messages"
583 );
584
585 assert!(!state.is_in_time_window(2147483647, 1100));
587 assert!(!state.is_in_time_window(2147483647, 900));
588 }
589
590 #[test]
594 fn test_time_window_boots_mismatch() {
595 let state = EngineState::new(Bytes::from_static(b"engine"), 100, 1000);
596
597 assert!(!state.is_in_time_window(101, 1000));
599 assert!(!state.is_in_time_window(200, 1000));
600
601 assert!(!state.is_in_time_window(99, 1000));
603 assert!(!state.is_in_time_window(0, 1000));
604 }
605
606 #[test]
607 fn test_engine_cache_basic_operations() {
608 let cache = EngineCache::new();
609 let addr: SocketAddr = "192.168.1.1:161".parse().unwrap();
610
611 assert!(cache.is_empty());
613 assert!(cache.get(&addr).is_none());
614
615 let state = EngineState::new(Bytes::from_static(b"engine1"), 1, 1000);
617 cache.insert(addr, state);
618
619 assert_eq!(cache.len(), 1);
620 assert!(!cache.is_empty());
621
622 let retrieved = cache.get(&addr).unwrap();
624 assert_eq!(retrieved.engine_id.as_ref(), b"engine1");
625 assert_eq!(retrieved.engine_boots, 1);
626
627 assert!(cache.update_time(&addr, 1, 1100));
629
630 let removed = cache.remove(&addr).unwrap();
632 assert_eq!(removed.latest_received_engine_time, 1100);
633 assert!(cache.is_empty());
634 }
635
636 #[test]
637 fn test_engine_cache_clone() {
638 let cache1 = EngineCache::new();
639 let addr: SocketAddr = "192.168.1.1:161".parse().unwrap();
640
641 cache1.insert(
642 addr,
643 EngineState::new(Bytes::from_static(b"engine1"), 1, 1000),
644 );
645
646 let cache2 = cache1.clone();
648 assert_eq!(cache2.len(), 1);
649 assert!(cache2.get(&addr).is_some());
650
651 cache2.clear();
653 assert_eq!(cache1.len(), 1);
654 assert_eq!(cache2.len(), 0);
655 }
656
657 #[test]
658 fn test_parse_discovery_response() {
659 let usm = UsmSecurityParams::new(b"test-engine-id".as_slice(), 42, 12345, b"".as_slice());
660 let encoded = usm.encode();
661
662 let state = parse_discovery_response(&encoded).unwrap();
663 assert_eq!(state.engine_id.as_ref(), b"test-engine-id");
664 assert_eq!(state.engine_boots, 42);
665 assert_eq!(state.engine_time, 12345);
666 }
667
668 #[test]
669 fn test_parse_discovery_response_empty_engine_id() {
670 let usm = UsmSecurityParams::empty();
671 let encoded = usm.encode();
672
673 let result = parse_discovery_response(&encoded);
674 assert!(matches!(result, Err(Error::UnknownEngineId { .. })));
675 }
676
677 #[test]
678 fn test_is_unknown_engine_id_report() {
679 use crate::Value;
680 use crate::VarBind;
681 use crate::pdu::{Pdu, PduType};
682
683 let mut pdu = Pdu {
685 pdu_type: PduType::Report,
686 request_id: 1,
687 error_status: 0,
688 error_index: 0,
689 varbinds: vec![VarBind {
690 oid: report_oids::unknown_engine_ids(),
691 value: Value::Counter32(1),
692 }],
693 };
694
695 assert!(is_unknown_engine_id_report(&pdu));
696
697 pdu.varbinds[0].oid = report_oids::not_in_time_windows();
699 assert!(!is_unknown_engine_id_report(&pdu));
700
701 pdu.pdu_type = PduType::Response;
703 assert!(!is_unknown_engine_id_report(&pdu));
704 }
705
706 #[test]
715 fn test_engine_boots_transition_to_max() {
716 let mut state = EngineState::new(Bytes::from_static(b"engine"), 2147483646, 1000);
717
718 assert!(
720 state.update_time(2147483647, 100),
721 "Transition to boots=2147483647 should be accepted"
722 );
723 assert_eq!(state.engine_boots, 2147483647);
724 assert_eq!(state.engine_time, 100);
725 }
726
727 #[test]
734 fn test_engine_boots_latched_update_behavior() {
735 let mut state = EngineState::new(Bytes::from_static(b"engine"), 2147483647, 1000);
736
737 assert!(
739 state.update_time(2147483647, 2000),
740 "Time tracking updates should still work"
741 );
742 assert_eq!(state.latest_received_engine_time, 2000);
743
744 assert!(!state.update_time(2147483647, 1500));
746 assert_eq!(state.latest_received_engine_time, 2000);
747
748 assert!(
750 !state.is_in_time_window(2147483647, 2000),
751 "Latched state should still reject all messages"
752 );
753 }
754
755 #[test]
761 fn test_engine_boots_latched_time_window_always_fails() {
762 let state = EngineState::new(Bytes::from_static(b"engine"), 2147483647, 1000);
763
764 assert!(!state.is_in_time_window(2147483647, 0));
766 assert!(!state.is_in_time_window(2147483647, 1000));
767 assert!(!state.is_in_time_window(2147483647, 1001));
768 assert!(!state.is_in_time_window(2147483647, u32::MAX));
769
770 assert!(!state.is_in_time_window(2147483646, 1000));
772 assert!(!state.is_in_time_window(0, 1000));
773 }
774
775 #[test]
780 fn test_engine_state_created_latched() {
781 let state = EngineState::new(Bytes::from_static(b"engine"), 2147483647, 5000);
782
783 assert_eq!(state.engine_boots, 2147483647);
784 assert_eq!(state.engine_time, 5000);
785 assert_eq!(state.latest_received_engine_time, 5000);
786
787 assert!(
789 !state.is_in_time_window(2147483647, 5000),
790 "Newly created latched engine should reject all messages"
791 );
792 }
793
794 #[test]
798 fn test_engine_boots_near_max_operates_normally() {
799 let mut state = EngineState::new(Bytes::from_static(b"engine"), 2147483645, 1000);
800
801 assert!(state.is_in_time_window(2147483645, 1000));
803 assert!(state.is_in_time_window(2147483645, 1100));
804 assert!(!state.is_in_time_window(2147483645, 1200)); assert!(state.update_time(2147483646, 500));
808 assert_eq!(state.engine_boots, 2147483646);
809 assert!(state.is_in_time_window(2147483646, 500));
810
811 assert!(state.update_time(2147483647, 100));
813 assert_eq!(state.engine_boots, 2147483647);
814
815 assert!(!state.is_in_time_window(2147483647, 100));
817 }
818
819 #[test]
822 fn test_engine_boots_high_value_update_logic() {
823 let mut state = EngineState::new(Bytes::from_static(b"engine"), 2147483640, 1000);
824
825 assert!(!state.update_time(2147483639, 9999));
827 assert!(!state.update_time(0, 9999));
828
829 assert!(!state.update_time(2147483640, 500));
831
832 assert!(state.update_time(2147483640, 1500));
834 assert_eq!(state.latest_received_engine_time, 1500);
835
836 assert!(state.update_time(2147483641, 100));
838 assert_eq!(state.engine_boots, 2147483641);
839 }
840
841 #[test]
846 fn test_engine_cache_latched_engine() {
847 let cache = EngineCache::new();
848 let addr: SocketAddr = "192.168.1.1:161".parse().unwrap();
849
850 cache.insert(
852 addr,
853 EngineState::new(Bytes::from_static(b"latched"), 2147483647, 1000),
854 );
855
856 assert!(
858 cache.update_time(&addr, 2147483647, 2000),
859 "Time tracking should update even for latched engine"
860 );
861
862 let state = cache.get(&addr).unwrap();
864 assert_eq!(state.latest_received_engine_time, 2000);
865
866 assert!(
868 !state.is_in_time_window(2147483647, 2000),
869 "Latched engine should reject all time window checks"
870 );
871 }
872}