1use std::net::SocketAddr;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::time::SystemTime;
9
10use serde::{Deserialize, Serialize};
11use tokio::sync::broadcast;
12
13use crate::types::TorrentState;
14use irontide_core::Id20;
15
16bitflags::bitflags! {
19 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21 pub struct AlertCategory: u32 {
22 const STATUS = 0x001;
24 const ERROR = 0x002;
26 const PEER = 0x004;
28 const TRACKER = 0x008;
30 const STORAGE = 0x010;
32 const DHT = 0x020;
34 const STATS = 0x040;
36 const PIECE = 0x080;
38 const BLOCK = 0x100;
40 const PERFORMANCE = 0x200;
42 const PORT_MAPPING = 0x400;
44 const I2P = 0x800;
46 const ALL = 0xFFF;
48 }
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
55pub enum AlertKind {
56 TorrentAdded {
59 info_hash: Id20,
61 name: String,
63 },
64 TorrentRemoved {
66 info_hash: Id20,
68 },
69 TorrentPaused {
71 info_hash: Id20,
73 },
74 TorrentResumed {
76 info_hash: Id20,
78 },
79 TorrentFinished {
81 info_hash: Id20,
83 },
84 StateChanged {
86 info_hash: Id20,
88 prev_state: TorrentState,
90 new_state: TorrentState,
92 },
93 MetadataReceived {
95 info_hash: Id20,
97 name: String,
99 },
100 MetadataFailed {
102 info_hash: Id20,
104 },
105
106 TorrentChecked {
109 info_hash: Id20,
111 pieces_have: u32,
113 pieces_total: u32,
115 },
116 CheckingProgress {
118 info_hash: Id20,
120 progress: f32,
122 },
123
124 PieceFinished {
127 info_hash: Id20,
129 piece: u32,
131 },
132 BlockFinished {
134 info_hash: Id20,
136 piece: u32,
138 offset: u32,
140 },
141 HashFailed {
143 info_hash: Id20,
145 piece: u32,
147 contributors: Vec<std::net::IpAddr>,
149 },
150
151 PeerConnected {
154 info_hash: Id20,
156 addr: SocketAddr,
158 },
159 PeerDisconnected {
161 info_hash: Id20,
163 addr: SocketAddr,
165 reason: Option<String>,
167 },
168 PeerBanned {
170 info_hash: Id20,
172 addr: SocketAddr,
174 },
175
176 TrackerReply {
179 info_hash: Id20,
181 url: String,
183 num_peers: usize,
185 },
186 TrackerWarning {
188 info_hash: Id20,
190 url: String,
192 message: String,
194 },
195 TrackerError {
197 info_hash: Id20,
199 url: String,
201 message: String,
203 },
204 ScrapeReply {
206 info_hash: Id20,
208 url: String,
210 complete: u32,
212 incomplete: u32,
214 downloaded: u32,
216 },
217 ScrapeError {
219 info_hash: Id20,
221 url: String,
223 message: String,
225 },
226
227 DhtBootstrapComplete,
230 DhtGetPeers {
232 info_hash: Id20,
234 num_peers: usize,
236 },
237 DhtNodeIdViolation {
239 node_id: Id20,
241 addr: SocketAddr,
243 },
244 DhtSampleInfohashes {
246 num_samples: usize,
248 total_estimate: i64,
250 },
251
252 ListenSucceeded {
255 port: u16,
257 },
258 ListenFailed {
260 port: u16,
262 message: String,
264 },
265 SessionStatsUpdate(crate::types::SessionStats),
267
268 FileRenamed {
271 info_hash: Id20,
273 index: usize,
275 new_path: std::path::PathBuf,
277 },
278 FileCompleted {
280 info_hash: Id20,
282 file_index: usize,
284 },
285 StorageMoved {
287 info_hash: Id20,
289 new_path: std::path::PathBuf,
291 },
292 FileError {
294 info_hash: Id20,
296 path: std::path::PathBuf,
298 message: String,
300 },
301 DiskStatsUpdate(crate::disk::DiskStats),
303
304 ResumeDataSaved {
307 info_hash: Id20,
309 },
310
311 TorrentError {
314 info_hash: Id20,
316 message: String,
318 },
319
320 PerformanceWarning {
323 info_hash: Id20,
325 message: String,
327 },
328
329 TorrentQueuePositionChanged {
332 info_hash: Id20,
334 old_pos: i32,
336 new_pos: i32,
338 },
339 TorrentAutoManaged {
341 info_hash: Id20,
343 paused: bool,
345 },
346
347 PeerBlocked {
350 addr: SocketAddr,
352 },
353
354 PeerTurnover {
356 info_hash: Id20,
358 disconnected: usize,
360 replaced: usize,
362 },
363
364 WebSeedBanned {
367 info_hash: Id20,
369 url: String,
371 },
372
373 PortMappingSucceeded {
376 port: u16,
378 protocol: String,
380 },
381 PortMappingFailed {
383 port: u16,
385 message: String,
387 },
388
389 InconsistentHashes {
392 info_hash: Id20,
394 piece: u32,
396 },
397
398 DhtPutComplete {
401 target: Id20,
403 },
404 DhtMutablePutComplete {
406 target: Id20,
408 seq: i64,
410 },
411 DhtGetResult {
413 target: Id20,
415 value: Option<Vec<u8>>,
417 },
418 DhtMutableGetResult {
420 target: Id20,
422 value: Option<Vec<u8>>,
424 seq: Option<i64>,
426 public_key: [u8; 32],
428 },
429 DhtItemError {
431 target: Id20,
433 message: String,
435 },
436
437 HolepunchSucceeded {
440 info_hash: Id20,
442 addr: SocketAddr,
444 },
445 HolepunchFailed {
447 info_hash: Id20,
449 addr: SocketAddr,
451 error_code: Option<u32>,
453 message: String,
455 },
456
457 I2pSessionCreated {
460 b32_address: String,
462 },
463 I2pError {
465 message: String,
467 },
468
469 SslTorrentError {
472 info_hash: Id20,
474 message: String,
476 },
477
478 SessionStatsAlert {
481 values: Vec<i64>,
483 },
484
485 ExternalIpDetected {
488 ip: std::net::IpAddr,
490 },
491
492 SettingsChanged,
495}
496
497impl AlertKind {
498 pub fn category(&self) -> AlertCategory {
500 use AlertKind::*;
501 match self {
502 TorrentAdded { .. }
504 | TorrentRemoved { .. }
505 | TorrentPaused { .. }
506 | TorrentResumed { .. }
507 | TorrentFinished { .. }
508 | StateChanged { .. }
509 | MetadataReceived { .. }
510 | ListenSucceeded { .. }
511 | ListenFailed { .. }
512 | ResumeDataSaved { .. }
513 | TorrentChecked { .. }
514 | CheckingProgress { .. } => AlertCategory::STATUS,
515
516 MetadataFailed { .. } => AlertCategory::STATUS | AlertCategory::ERROR,
517
518 SessionStatsUpdate(_) => AlertCategory::STATS,
519
520 PieceFinished { .. } => AlertCategory::PIECE,
522 HashFailed { .. } => AlertCategory::PIECE | AlertCategory::ERROR,
523
524 BlockFinished { .. } => AlertCategory::BLOCK,
526
527 PeerConnected { .. } | PeerDisconnected { .. } | PeerBanned { .. } => {
529 AlertCategory::PEER
530 }
531
532 TrackerReply { .. } => AlertCategory::TRACKER,
534 TrackerWarning { .. } => AlertCategory::TRACKER,
535 TrackerError { .. } => AlertCategory::TRACKER | AlertCategory::ERROR,
536 ScrapeReply { .. } => AlertCategory::TRACKER,
537 ScrapeError { .. } => AlertCategory::TRACKER | AlertCategory::ERROR,
538
539 DhtBootstrapComplete | DhtGetPeers { .. } | DhtSampleInfohashes { .. } => {
541 AlertCategory::DHT
542 }
543 DhtNodeIdViolation { .. } => AlertCategory::DHT | AlertCategory::ERROR,
544 DhtPutComplete { .. }
545 | DhtMutablePutComplete { .. }
546 | DhtGetResult { .. }
547 | DhtMutableGetResult { .. } => AlertCategory::DHT,
548 DhtItemError { .. } => AlertCategory::DHT | AlertCategory::ERROR,
549
550 FileRenamed { .. } | StorageMoved { .. } | FileCompleted { .. } => {
552 AlertCategory::STORAGE
553 }
554 FileError { .. } => AlertCategory::STORAGE | AlertCategory::ERROR,
555 DiskStatsUpdate(_) => AlertCategory::STATS | AlertCategory::STORAGE,
556
557 TorrentError { .. } => AlertCategory::ERROR,
559 InconsistentHashes { .. } => AlertCategory::ERROR,
560
561 PerformanceWarning { .. } => AlertCategory::PERFORMANCE,
563
564 TorrentQueuePositionChanged { .. } => AlertCategory::STATUS,
566 TorrentAutoManaged { .. } => AlertCategory::STATUS,
567
568 PeerBlocked { .. } => AlertCategory::PEER,
570 PeerTurnover { .. } => AlertCategory::PEER,
571
572 WebSeedBanned { .. } => AlertCategory::STATUS,
574
575 PortMappingSucceeded { .. } => AlertCategory::PORT_MAPPING,
577 PortMappingFailed { .. } => AlertCategory::PORT_MAPPING | AlertCategory::ERROR,
578
579 HolepunchSucceeded { .. } => AlertCategory::PEER,
581 HolepunchFailed { .. } => AlertCategory::PEER | AlertCategory::ERROR,
582
583 I2pSessionCreated { .. } => AlertCategory::I2P | AlertCategory::STATUS,
585 I2pError { .. } => AlertCategory::I2P | AlertCategory::ERROR,
586
587 SslTorrentError { .. } => AlertCategory::ERROR,
589
590 SessionStatsAlert { .. } => AlertCategory::STATS,
592
593 ExternalIpDetected { .. } => AlertCategory::STATUS,
595
596 SettingsChanged => AlertCategory::STATUS,
598 }
599 }
600}
601
602#[derive(Debug, Clone, Serialize, Deserialize)]
606pub struct Alert {
607 pub timestamp: SystemTime,
609 pub kind: AlertKind,
611}
612
613impl Alert {
614 pub fn new(kind: AlertKind) -> Self {
616 Self {
617 timestamp: SystemTime::now(),
618 kind,
619 }
620 }
621
622 pub fn category(&self) -> AlertCategory {
624 self.kind.category()
625 }
626}
627
628pub struct AlertStream {
634 rx: broadcast::Receiver<Alert>,
635 filter: AlertCategory,
636}
637
638impl AlertStream {
639 pub fn new(rx: broadcast::Receiver<Alert>, filter: AlertCategory) -> Self {
641 Self { rx, filter }
642 }
643
644 pub async fn recv(&mut self) -> Result<Alert, broadcast::error::RecvError> {
648 loop {
649 let alert = self.rx.recv().await?;
650 if alert.category().intersects(self.filter) {
651 return Ok(alert);
652 }
653 }
654 }
655}
656
657pub(crate) fn post_alert(tx: &broadcast::Sender<Alert>, mask: &AtomicU32, kind: AlertKind) {
664 let alert = Alert::new(kind);
665 let m = AlertCategory::from_bits_truncate(mask.load(Ordering::Relaxed));
666 if alert.category().intersects(m) {
667 let _ = tx.send(alert);
668 }
669}
670
#[cfg(test)]
mod tests {
    use super::*;

    /// All-zero identifier used wherever a test needs a placeholder `Id20`.
    fn zero_id() -> Id20 {
        Id20::from([0u8; 20])
    }

    /// Placeholder remote address for peer/DHT alerts.
    fn remote_addr() -> SocketAddr {
        "203.0.113.5:6881".parse().unwrap()
    }

    /// Category bits of an alert built from `kind`.
    fn category_of(kind: AlertKind) -> AlertCategory {
        Alert::new(kind).category()
    }

    /// Wraps `kind` in an alert, serializes it to JSON, parses it back and
    /// returns the decoded kind.
    fn json_roundtrip(kind: AlertKind) -> AlertKind {
        let json = serde_json::to_string(&Alert::new(kind)).unwrap();
        let decoded: Alert = serde_json::from_str(&json).unwrap();
        decoded.kind
    }

    #[test]
    fn alert_category_all_includes_every_flag() {
        let every_flag = [
            AlertCategory::STATUS,
            AlertCategory::ERROR,
            AlertCategory::PEER,
            AlertCategory::TRACKER,
            AlertCategory::STORAGE,
            AlertCategory::DHT,
            AlertCategory::STATS,
            AlertCategory::PIECE,
            AlertCategory::BLOCK,
            AlertCategory::PERFORMANCE,
            AlertCategory::PORT_MAPPING,
            AlertCategory::I2P,
        ];
        for flag in every_flag {
            assert!(AlertCategory::ALL.contains(flag));
        }
    }

    #[test]
    fn alert_category_mapping() {
        let info_hash = zero_id();

        let added = category_of(AlertKind::TorrentAdded {
            info_hash,
            name: String::new(),
        });
        assert!(added.contains(AlertCategory::STATUS));

        let piece = category_of(AlertKind::PieceFinished {
            info_hash,
            piece: 0,
        });
        assert!(piece.contains(AlertCategory::PIECE));

        let peer = category_of(AlertKind::PeerConnected {
            info_hash,
            addr: "127.0.0.1:6881".parse().unwrap(),
        });
        assert!(peer.contains(AlertCategory::PEER));

        let tracker = category_of(AlertKind::TrackerError {
            info_hash,
            url: String::new(),
            message: String::new(),
        });
        assert!(tracker.contains(AlertCategory::TRACKER));
        assert!(tracker.contains(AlertCategory::ERROR));
    }

    #[test]
    fn alert_has_timestamp() {
        let before = SystemTime::now();
        let alert = Alert::new(AlertKind::DhtBootstrapComplete);
        assert!(before <= alert.timestamp);
    }

    #[test]
    fn alert_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<Alert>();
    }

    #[test]
    fn post_alert_respects_mask() {
        let (tx, mut rx) = broadcast::channel(16);
        let mask = AtomicU32::new(AlertCategory::STATUS.bits());

        // STATUS alert: enabled by the mask, so it must be delivered.
        let status_kind = AlertKind::TorrentAdded {
            info_hash: zero_id(),
            name: "test".into(),
        };
        post_alert(&tx, &mask, status_kind);
        assert!(rx.try_recv().is_ok());

        // PIECE alert: not in the mask, so nothing may arrive.
        let piece_kind = AlertKind::PieceFinished {
            info_hash: zero_id(),
            piece: 0,
        };
        post_alert(&tx, &mask, piece_kind);
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn post_alert_empty_mask_blocks_all() {
        let (tx, mut rx) = broadcast::channel(16);
        let mask = AtomicU32::new(AlertCategory::empty().bits());

        let kind = AlertKind::TorrentAdded {
            info_hash: zero_id(),
            name: "test".into(),
        };
        post_alert(&tx, &mask, kind);
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn alert_serializes_to_json() {
        let decoded = json_roundtrip(AlertKind::TorrentAdded {
            info_hash: zero_id(),
            name: "test".into(),
        });
        assert!(matches!(decoded, AlertKind::TorrentAdded { .. }));
    }

    #[test]
    fn alert_category_serializes_as_u32() {
        let mask = AlertCategory::STATUS | AlertCategory::ERROR;
        let json = serde_json::to_string(&mask).unwrap();
        let decoded: AlertCategory = serde_json::from_str(&json).unwrap();
        assert_eq!(decoded, mask);
    }

    #[test]
    fn queue_position_changed_alert_has_status_category() {
        let cat = category_of(AlertKind::TorrentQueuePositionChanged {
            info_hash: zero_id(),
            old_pos: 3,
            new_pos: 0,
        });
        assert!(cat.contains(AlertCategory::STATUS));
    }

    #[test]
    fn torrent_auto_managed_alert_has_status_category() {
        let cat = category_of(AlertKind::TorrentAutoManaged {
            info_hash: zero_id(),
            paused: true,
        });
        assert!(cat.contains(AlertCategory::STATUS));
    }

    #[test]
    fn scrape_reply_alert_has_tracker_category() {
        let cat = category_of(AlertKind::ScrapeReply {
            info_hash: zero_id(),
            url: "http://tracker.example.com/announce".into(),
            complete: 10,
            incomplete: 3,
            downloaded: 50,
        });
        assert!(cat.contains(AlertCategory::TRACKER));
    }

    #[test]
    fn scrape_error_alert_has_tracker_and_error_category() {
        let cat = category_of(AlertKind::ScrapeError {
            info_hash: zero_id(),
            url: "http://tracker.example.com/announce".into(),
            message: "connection refused".into(),
        });
        assert!(cat.contains(AlertCategory::TRACKER));
        assert!(cat.contains(AlertCategory::ERROR));
    }

    #[test]
    fn web_seed_banned_alert_has_status_category() {
        let cat = category_of(AlertKind::WebSeedBanned {
            info_hash: zero_id(),
            url: "http://example.com/files".into(),
        });
        assert!(cat.contains(AlertCategory::STATUS));
    }

    #[test]
    fn dht_node_id_violation_alert_category() {
        let cat = category_of(AlertKind::DhtNodeIdViolation {
            node_id: zero_id(),
            addr: remote_addr(),
        });
        assert!(cat.contains(AlertCategory::DHT));
        assert!(cat.contains(AlertCategory::ERROR));
    }

    #[test]
    fn dht_put_complete_alert_has_dht_category() {
        let cat = category_of(AlertKind::DhtPutComplete { target: zero_id() });
        assert!(cat.contains(AlertCategory::DHT));
    }

    #[test]
    fn dht_sample_infohashes_alert_has_dht_category() {
        let cat = category_of(AlertKind::DhtSampleInfohashes {
            num_samples: 15,
            total_estimate: 500,
        });
        assert!(cat.contains(AlertCategory::DHT));
    }

    #[test]
    fn dht_item_error_alert_has_dht_and_error_category() {
        let cat = category_of(AlertKind::DhtItemError {
            target: zero_id(),
            message: "test".into(),
        });
        assert!(cat.contains(AlertCategory::DHT));
        assert!(cat.contains(AlertCategory::ERROR));
    }

    #[test]
    fn holepunch_succeeded_alert_has_peer_category() {
        let cat = category_of(AlertKind::HolepunchSucceeded {
            info_hash: zero_id(),
            addr: remote_addr(),
        });
        assert!(cat.contains(AlertCategory::PEER));
        assert!(!cat.contains(AlertCategory::ERROR));
    }

    #[test]
    fn holepunch_failed_alert_has_peer_and_error_category() {
        let cat = category_of(AlertKind::HolepunchFailed {
            info_hash: zero_id(),
            addr: remote_addr(),
            error_code: Some(1),
            message: "no support".into(),
        });
        assert!(cat.contains(AlertCategory::PEER));
        assert!(cat.contains(AlertCategory::ERROR));
    }

    #[test]
    fn i2p_session_created_alert_category() {
        let cat = category_of(AlertKind::I2pSessionCreated {
            b32_address: "abcdef1234567890abcdef1234567890abcdef1234567890abcd.b32.i2p".into(),
        });
        assert!(cat.contains(AlertCategory::I2P));
        assert!(cat.contains(AlertCategory::STATUS));
    }

    #[test]
    fn i2p_error_alert_category() {
        let cat = category_of(AlertKind::I2pError {
            message: "SAM bridge unreachable".into(),
        });
        assert!(cat.contains(AlertCategory::I2P));
        assert!(cat.contains(AlertCategory::ERROR));
    }

    #[test]
    fn alert_category_all_includes_i2p() {
        assert!(AlertCategory::ALL.contains(AlertCategory::I2P));
    }

    #[test]
    fn ssl_torrent_error_alert_has_error_category() {
        let cat = category_of(AlertKind::SslTorrentError {
            info_hash: zero_id(),
            message: "handshake failed".into(),
        });
        assert!(cat.contains(AlertCategory::ERROR));
    }

    #[test]
    fn ssl_torrent_error_alert_serializes_to_json() {
        let decoded = json_roundtrip(AlertKind::SslTorrentError {
            info_hash: zero_id(),
            message: "cert validation failed".into(),
        });
        assert!(matches!(decoded, AlertKind::SslTorrentError { .. }));
    }

    #[test]
    fn i2p_alert_serializes_to_json() {
        let decoded = json_roundtrip(AlertKind::I2pSessionCreated {
            b32_address: "test.b32.i2p".into(),
        });
        assert!(matches!(decoded, AlertKind::I2pSessionCreated { .. }));
    }

    #[test]
    fn peer_turnover_alert_has_peer_category() {
        let cat = category_of(AlertKind::PeerTurnover {
            info_hash: zero_id(),
            disconnected: 2,
            replaced: 1,
        });
        assert!(cat.contains(AlertCategory::PEER));
    }

    #[test]
    fn session_stats_alert_has_stats_category() {
        let cat = category_of(AlertKind::SessionStatsAlert {
            values: vec![100, 200, 300],
        });
        assert!(cat.contains(AlertCategory::STATS));
    }

    #[test]
    fn session_stats_alert_serializes_to_json() {
        let decoded = json_roundtrip(AlertKind::SessionStatsAlert {
            values: vec![1, 2, 3, 4, 5],
        });
        if let AlertKind::SessionStatsAlert { values } = decoded {
            assert_eq!(values, vec![1, 2, 3, 4, 5]);
        } else {
            panic!("expected SessionStatsAlert");
        }
    }
}