1use std::net::SocketAddr;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::time::SystemTime;
9
10use serde::{Deserialize, Serialize};
11use tokio::sync::broadcast;
12
13use crate::types::TorrentState;
14use irontide_core::Id20;
15
16bitflags::bitflags! {
19 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21 pub struct AlertCategory: u32 {
22 const STATUS = 0x001;
24 const ERROR = 0x002;
26 const PEER = 0x004;
28 const TRACKER = 0x008;
30 const STORAGE = 0x010;
32 const DHT = 0x020;
34 const STATS = 0x040;
36 const PIECE = 0x080;
38 const BLOCK = 0x100;
40 const PERFORMANCE = 0x200;
42 const PORT_MAPPING = 0x400;
44 const I2P = 0x800;
46 const ALL = 0xFFF;
48 }
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
55pub enum AlertKind {
56 TorrentAdded {
59 info_hash: Id20,
61 name: String,
63 },
64 TorrentRemoved {
66 info_hash: Id20,
68 },
69 TorrentPaused {
71 info_hash: Id20,
73 },
74 TorrentResumed {
76 info_hash: Id20,
78 },
79 TorrentFinished {
81 info_hash: Id20,
83 },
84 StateChanged {
86 info_hash: Id20,
88 prev_state: TorrentState,
90 new_state: TorrentState,
92 },
93 MetadataReceived {
95 info_hash: Id20,
97 name: String,
99 },
100 MetadataFailed {
102 info_hash: Id20,
104 },
105
106 TorrentChecked {
109 info_hash: Id20,
111 pieces_have: u32,
113 pieces_total: u32,
115 },
116 CheckingProgress {
118 info_hash: Id20,
120 progress: f32,
122 },
123
124 PieceFinished {
127 info_hash: Id20,
129 piece: u32,
131 },
132 BlockFinished {
134 info_hash: Id20,
136 piece: u32,
138 offset: u32,
140 },
141 HashFailed {
143 info_hash: Id20,
145 piece: u32,
147 contributors: Vec<std::net::IpAddr>,
149 },
150
151 PeerConnected {
154 info_hash: Id20,
156 addr: SocketAddr,
158 },
159 PeerDisconnected {
161 info_hash: Id20,
163 addr: SocketAddr,
165 reason: Option<String>,
167 },
168 PeerBanned {
170 info_hash: Id20,
172 addr: SocketAddr,
174 },
175
176 TrackerReply {
179 info_hash: Id20,
181 url: String,
183 num_peers: usize,
185 },
186 TrackerWarning {
188 info_hash: Id20,
190 url: String,
192 message: String,
194 },
195 TrackerError {
197 info_hash: Id20,
199 url: String,
201 message: String,
203 },
204 ScrapeReply {
206 info_hash: Id20,
208 url: String,
210 complete: u32,
212 incomplete: u32,
214 downloaded: u32,
216 },
217 ScrapeError {
219 info_hash: Id20,
221 url: String,
223 message: String,
225 },
226
227 DhtBootstrapComplete,
230 DhtGetPeers {
232 info_hash: Id20,
234 num_peers: usize,
236 },
237 DhtNodeIdViolation {
239 node_id: Id20,
241 addr: SocketAddr,
243 },
244 DhtSampleInfohashes {
246 num_samples: usize,
248 total_estimate: i64,
250 },
251
252 ListenSucceeded {
255 port: u16,
257 },
258 ListenFailed {
260 port: u16,
262 message: String,
264 },
265 SessionStatsUpdate(crate::types::SessionStats),
267
268 FileRenamed {
271 info_hash: Id20,
273 index: usize,
275 new_path: std::path::PathBuf,
277 },
278 FileCompleted {
280 info_hash: Id20,
282 file_index: usize,
284 },
285 StorageMoved {
287 info_hash: Id20,
289 new_path: std::path::PathBuf,
291 },
292 FileError {
294 info_hash: Id20,
296 path: std::path::PathBuf,
298 message: String,
300 },
301 DiskStatsUpdate(crate::disk::DiskStats),
303
304 ResumeDataSaved {
307 info_hash: Id20,
309 },
310
311 TorrentError {
314 info_hash: Id20,
316 message: String,
318 },
319
320 PerformanceWarning {
323 info_hash: Id20,
325 message: String,
327 },
328
329 TorrentQueuePositionChanged {
332 info_hash: Id20,
334 old_pos: i32,
336 new_pos: i32,
338 },
339 TorrentAutoManaged {
341 info_hash: Id20,
343 paused: bool,
345 },
346
347 PeerBlocked {
350 addr: SocketAddr,
352 },
353
354 PeerTurnover {
356 info_hash: Id20,
358 disconnected: usize,
360 replaced: usize,
362 },
363
364 WebSeedBanned {
367 info_hash: Id20,
369 url: String,
371 },
372
373 PortMappingSucceeded {
376 port: u16,
378 protocol: String,
380 },
381 PortMappingFailed {
383 port: u16,
385 message: String,
387 },
388
389 InconsistentHashes {
392 info_hash: Id20,
394 piece: u32,
396 },
397
398 DhtPutComplete {
401 target: Id20,
403 },
404 DhtMutablePutComplete {
406 target: Id20,
408 seq: i64,
410 },
411 DhtGetResult {
413 target: Id20,
415 value: Option<Vec<u8>>,
417 },
418 DhtMutableGetResult {
420 target: Id20,
422 value: Option<Vec<u8>>,
424 seq: Option<i64>,
426 public_key: [u8; 32],
428 },
429 DhtItemError {
431 target: Id20,
433 message: String,
435 },
436
437 HolepunchSucceeded {
440 info_hash: Id20,
442 addr: SocketAddr,
444 },
445 HolepunchFailed {
447 info_hash: Id20,
449 addr: SocketAddr,
451 error_code: Option<u32>,
453 message: String,
455 },
456
457 I2pSessionCreated {
460 b32_address: String,
462 },
463 I2pError {
465 message: String,
467 },
468
469 SslTorrentError {
472 info_hash: Id20,
474 message: String,
476 },
477
478 SessionStatsAlert {
481 values: Vec<i64>,
483 },
484
485 ExternalIpDetected {
488 ip: std::net::IpAddr,
490 },
491
492 SettingsChanged,
495}
496
497impl AlertKind {
498 #[must_use]
500 pub fn category(&self) -> AlertCategory {
501 use AlertKind::{
502 BlockFinished, CheckingProgress, DhtBootstrapComplete, DhtGetPeers, DhtGetResult,
503 DhtItemError, DhtMutableGetResult, DhtMutablePutComplete, DhtNodeIdViolation,
504 DhtPutComplete, DhtSampleInfohashes, DiskStatsUpdate, ExternalIpDetected,
505 FileCompleted, FileError, FileRenamed, HashFailed, HolepunchFailed, HolepunchSucceeded,
506 I2pError, I2pSessionCreated, InconsistentHashes, ListenFailed, ListenSucceeded,
507 MetadataFailed, MetadataReceived, PeerBanned, PeerBlocked, PeerConnected,
508 PeerDisconnected, PeerTurnover, PerformanceWarning, PieceFinished, PortMappingFailed,
509 PortMappingSucceeded, ResumeDataSaved, ScrapeError, ScrapeReply, SessionStatsAlert,
510 SessionStatsUpdate, SettingsChanged, SslTorrentError, StateChanged, StorageMoved,
511 TorrentAdded, TorrentAutoManaged, TorrentChecked, TorrentError, TorrentFinished,
512 TorrentPaused, TorrentQueuePositionChanged, TorrentRemoved, TorrentResumed,
513 TrackerError, TrackerReply, TrackerWarning, WebSeedBanned,
514 };
515 match self {
516 TorrentAdded { .. }
518 | TorrentRemoved { .. }
519 | TorrentPaused { .. }
520 | TorrentResumed { .. }
521 | TorrentFinished { .. }
522 | StateChanged { .. }
523 | MetadataReceived { .. }
524 | ListenSucceeded { .. }
525 | ListenFailed { .. }
526 | ResumeDataSaved { .. }
527 | TorrentChecked { .. }
528 | CheckingProgress { .. }
529 | TorrentQueuePositionChanged { .. }
530 | TorrentAutoManaged { .. }
531 | WebSeedBanned { .. }
532 | ExternalIpDetected { .. }
533 | SettingsChanged => AlertCategory::STATUS,
534
535 MetadataFailed { .. } => AlertCategory::STATUS | AlertCategory::ERROR,
536
537 SessionStatsUpdate(_) | SessionStatsAlert { .. } => AlertCategory::STATS,
539
540 PieceFinished { .. } => AlertCategory::PIECE,
542 HashFailed { .. } => AlertCategory::PIECE | AlertCategory::ERROR,
543
544 BlockFinished { .. } => AlertCategory::BLOCK,
546
547 PeerConnected { .. }
549 | PeerDisconnected { .. }
550 | PeerBanned { .. }
551 | PeerBlocked { .. }
552 | PeerTurnover { .. }
553 | HolepunchSucceeded { .. } => AlertCategory::PEER,
554
555 HolepunchFailed { .. } => AlertCategory::PEER | AlertCategory::ERROR,
556
557 TrackerReply { .. } | TrackerWarning { .. } | ScrapeReply { .. } => {
559 AlertCategory::TRACKER
560 }
561 TrackerError { .. } | ScrapeError { .. } => {
562 AlertCategory::TRACKER | AlertCategory::ERROR
563 }
564
565 DhtBootstrapComplete
567 | DhtGetPeers { .. }
568 | DhtSampleInfohashes { .. }
569 | DhtPutComplete { .. }
570 | DhtMutablePutComplete { .. }
571 | DhtGetResult { .. }
572 | DhtMutableGetResult { .. } => AlertCategory::DHT,
573
574 DhtNodeIdViolation { .. } | DhtItemError { .. } => {
575 AlertCategory::DHT | AlertCategory::ERROR
576 }
577
578 FileRenamed { .. } | StorageMoved { .. } | FileCompleted { .. } => {
580 AlertCategory::STORAGE
581 }
582 FileError { .. } => AlertCategory::STORAGE | AlertCategory::ERROR,
583 DiskStatsUpdate(_) => AlertCategory::STATS | AlertCategory::STORAGE,
584
585 TorrentError { .. } | InconsistentHashes { .. } | SslTorrentError { .. } => {
587 AlertCategory::ERROR
588 }
589
590 PerformanceWarning { .. } => AlertCategory::PERFORMANCE,
592
593 PortMappingSucceeded { .. } => AlertCategory::PORT_MAPPING,
595 PortMappingFailed { .. } => AlertCategory::PORT_MAPPING | AlertCategory::ERROR,
596
597 I2pSessionCreated { .. } => AlertCategory::I2P | AlertCategory::STATUS,
599 I2pError { .. } => AlertCategory::I2P | AlertCategory::ERROR,
600 }
601 }
602}
603
604#[derive(Debug, Clone, Serialize, Deserialize)]
608pub struct Alert {
609 pub timestamp: SystemTime,
611 pub kind: AlertKind,
613}
614
615impl Alert {
616 #[must_use]
618 pub fn new(kind: AlertKind) -> Self {
619 Self {
620 timestamp: SystemTime::now(),
621 kind,
622 }
623 }
624
625 #[must_use]
627 pub fn category(&self) -> AlertCategory {
628 self.kind.category()
629 }
630}
631
632pub struct AlertStream {
638 rx: broadcast::Receiver<Alert>,
639 filter: AlertCategory,
640}
641
642impl AlertStream {
643 #[must_use]
645 pub fn new(rx: broadcast::Receiver<Alert>, filter: AlertCategory) -> Self {
646 Self { rx, filter }
647 }
648
649 pub async fn recv(&mut self) -> Result<Alert, broadcast::error::RecvError> {
657 loop {
658 let alert = self.rx.recv().await?;
659 if alert.category().intersects(self.filter) {
660 return Ok(alert);
661 }
662 }
663 }
664}
665
666pub(crate) fn post_alert(tx: &broadcast::Sender<Alert>, mask: &AtomicU32, kind: AlertKind) {
673 let alert = Alert::new(kind);
674 let m = AlertCategory::from_bits_truncate(mask.load(Ordering::Relaxed));
675 if alert.category().intersects(m) {
676 let _ = tx.send(alert);
677 }
678}
679
680#[cfg(test)]
683mod tests {
684 use super::*;
685
686 #[test]
687 fn alert_category_all_includes_every_flag() {
688 let all = AlertCategory::ALL;
689 assert!(all.contains(AlertCategory::STATUS));
690 assert!(all.contains(AlertCategory::ERROR));
691 assert!(all.contains(AlertCategory::PEER));
692 assert!(all.contains(AlertCategory::TRACKER));
693 assert!(all.contains(AlertCategory::STORAGE));
694 assert!(all.contains(AlertCategory::DHT));
695 assert!(all.contains(AlertCategory::STATS));
696 assert!(all.contains(AlertCategory::PIECE));
697 assert!(all.contains(AlertCategory::BLOCK));
698 assert!(all.contains(AlertCategory::PERFORMANCE));
699 assert!(all.contains(AlertCategory::PORT_MAPPING));
700 assert!(all.contains(AlertCategory::I2P));
701 }
702
703 #[test]
704 fn alert_category_mapping() {
705 use AlertKind::*;
706 let info_hash = Id20::from_bytes(&[0u8; 20]).unwrap();
707
708 let a = Alert::new(TorrentAdded {
709 info_hash,
710 name: String::new(),
711 });
712 assert!(a.category().contains(AlertCategory::STATUS));
713
714 let a = Alert::new(PieceFinished {
715 info_hash,
716 piece: 0,
717 });
718 assert!(a.category().contains(AlertCategory::PIECE));
719
720 let a = Alert::new(PeerConnected {
721 info_hash,
722 addr: "127.0.0.1:6881".parse().unwrap(),
723 });
724 assert!(a.category().contains(AlertCategory::PEER));
725
726 let a = Alert::new(TrackerError {
728 info_hash,
729 url: String::new(),
730 message: String::new(),
731 });
732 assert!(a.category().contains(AlertCategory::TRACKER));
733 assert!(a.category().contains(AlertCategory::ERROR));
734 }
735
736 #[test]
737 fn alert_has_timestamp() {
738 let before = SystemTime::now();
739 let alert = Alert::new(AlertKind::DhtBootstrapComplete);
740 assert!(alert.timestamp >= before);
741 }
742
743 #[test]
744 fn alert_is_send_and_sync() {
745 fn assert_send_sync<T: Send + Sync>() {}
746 assert_send_sync::<Alert>();
747 }
748
749 #[test]
750 fn post_alert_respects_mask() {
751 let (tx, mut rx) = broadcast::channel(16);
752 let mask = AtomicU32::new(AlertCategory::STATUS.bits());
753
754 post_alert(
756 &tx,
757 &mask,
758 AlertKind::TorrentAdded {
759 info_hash: Id20::from_bytes(&[0u8; 20]).unwrap(),
760 name: "test".into(),
761 },
762 );
763 assert!(rx.try_recv().is_ok());
764
765 post_alert(
767 &tx,
768 &mask,
769 AlertKind::PieceFinished {
770 info_hash: Id20::from_bytes(&[0u8; 20]).unwrap(),
771 piece: 0,
772 },
773 );
774 assert!(rx.try_recv().is_err());
775 }
776
777 #[test]
778 fn post_alert_empty_mask_blocks_all() {
779 let (tx, mut rx) = broadcast::channel(16);
780 let mask = AtomicU32::new(AlertCategory::empty().bits());
781
782 post_alert(
783 &tx,
784 &mask,
785 AlertKind::TorrentAdded {
786 info_hash: Id20::from_bytes(&[0u8; 20]).unwrap(),
787 name: "test".into(),
788 },
789 );
790 assert!(rx.try_recv().is_err());
791 }
792
793 #[test]
794 fn alert_serializes_to_json() {
795 let alert = Alert::new(AlertKind::TorrentAdded {
796 info_hash: Id20::from_bytes(&[0u8; 20]).unwrap(),
797 name: "test".into(),
798 });
799 let json = serde_json::to_string(&alert).unwrap();
800 let decoded: Alert = serde_json::from_str(&json).unwrap();
801 assert!(matches!(decoded.kind, AlertKind::TorrentAdded { .. }));
802 }
803
804 #[test]
805 fn alert_category_serializes_as_u32() {
806 let mask = AlertCategory::STATUS | AlertCategory::ERROR;
807 let json = serde_json::to_string(&mask).unwrap();
808 let decoded: AlertCategory = serde_json::from_str(&json).unwrap();
809 assert_eq!(decoded, mask);
810 }
811
812 #[test]
813 fn queue_position_changed_alert_has_status_category() {
814 let alert = Alert::new(AlertKind::TorrentQueuePositionChanged {
815 info_hash: Id20::from([0u8; 20]),
816 old_pos: 3,
817 new_pos: 0,
818 });
819 assert!(alert.category().contains(AlertCategory::STATUS));
820 }
821
822 #[test]
823 fn torrent_auto_managed_alert_has_status_category() {
824 let alert = Alert::new(AlertKind::TorrentAutoManaged {
825 info_hash: Id20::from([0u8; 20]),
826 paused: true,
827 });
828 assert!(alert.category().contains(AlertCategory::STATUS));
829 }
830
831 #[test]
832 fn scrape_reply_alert_has_tracker_category() {
833 let alert = Alert::new(AlertKind::ScrapeReply {
834 info_hash: Id20::from([0u8; 20]),
835 url: "http://tracker.example.com/announce".into(),
836 complete: 10,
837 incomplete: 3,
838 downloaded: 50,
839 });
840 assert!(alert.category().contains(AlertCategory::TRACKER));
841 }
842
843 #[test]
844 fn scrape_error_alert_has_tracker_and_error_category() {
845 let alert = Alert::new(AlertKind::ScrapeError {
846 info_hash: Id20::from([0u8; 20]),
847 url: "http://tracker.example.com/announce".into(),
848 message: "connection refused".into(),
849 });
850 assert!(alert.category().contains(AlertCategory::TRACKER));
851 assert!(alert.category().contains(AlertCategory::ERROR));
852 }
853
854 #[test]
855 fn web_seed_banned_alert_has_status_category() {
856 let alert = Alert::new(AlertKind::WebSeedBanned {
857 info_hash: Id20::from([0u8; 20]),
858 url: "http://example.com/files".into(),
859 });
860 assert!(alert.category().contains(AlertCategory::STATUS));
861 }
862
863 #[test]
864 fn dht_node_id_violation_alert_category() {
865 let alert = Alert::new(AlertKind::DhtNodeIdViolation {
866 node_id: Id20::from([0u8; 20]),
867 addr: "203.0.113.5:6881".parse().unwrap(),
868 });
869 assert!(alert.category().contains(AlertCategory::DHT));
870 assert!(alert.category().contains(AlertCategory::ERROR));
871 }
872
873 #[test]
874 fn dht_put_complete_alert_has_dht_category() {
875 let alert = Alert::new(AlertKind::DhtPutComplete {
876 target: Id20::from([0u8; 20]),
877 });
878 assert!(alert.category().contains(AlertCategory::DHT));
879 }
880
881 #[test]
882 fn dht_sample_infohashes_alert_has_dht_category() {
883 let alert = Alert::new(AlertKind::DhtSampleInfohashes {
884 num_samples: 15,
885 total_estimate: 500,
886 });
887 assert!(alert.category().contains(AlertCategory::DHT));
888 }
889
890 #[test]
891 fn dht_item_error_alert_has_dht_and_error_category() {
892 let alert = Alert::new(AlertKind::DhtItemError {
893 target: Id20::from([0u8; 20]),
894 message: "test".into(),
895 });
896 assert!(alert.category().contains(AlertCategory::DHT));
897 assert!(alert.category().contains(AlertCategory::ERROR));
898 }
899
900 #[test]
901 fn holepunch_succeeded_alert_has_peer_category() {
902 let alert = Alert::new(AlertKind::HolepunchSucceeded {
903 info_hash: Id20::from([0u8; 20]),
904 addr: "203.0.113.5:6881".parse().unwrap(),
905 });
906 assert!(alert.category().contains(AlertCategory::PEER));
907 assert!(!alert.category().contains(AlertCategory::ERROR));
908 }
909
910 #[test]
911 fn holepunch_failed_alert_has_peer_and_error_category() {
912 let alert = Alert::new(AlertKind::HolepunchFailed {
913 info_hash: Id20::from([0u8; 20]),
914 addr: "203.0.113.5:6881".parse().unwrap(),
915 error_code: Some(1),
916 message: "no support".into(),
917 });
918 assert!(alert.category().contains(AlertCategory::PEER));
919 assert!(alert.category().contains(AlertCategory::ERROR));
920 }
921
922 #[test]
923 fn i2p_session_created_alert_category() {
924 let alert = Alert::new(AlertKind::I2pSessionCreated {
925 b32_address: "abcdef1234567890abcdef1234567890abcdef1234567890abcd.b32.i2p".into(),
926 });
927 assert!(alert.category().contains(AlertCategory::I2P));
928 assert!(alert.category().contains(AlertCategory::STATUS));
929 }
930
931 #[test]
932 fn i2p_error_alert_category() {
933 let alert = Alert::new(AlertKind::I2pError {
934 message: "SAM bridge unreachable".into(),
935 });
936 assert!(alert.category().contains(AlertCategory::I2P));
937 assert!(alert.category().contains(AlertCategory::ERROR));
938 }
939
940 #[test]
941 fn alert_category_all_includes_i2p() {
942 let all = AlertCategory::ALL;
943 assert!(all.contains(AlertCategory::I2P));
944 }
945
946 #[test]
947 fn ssl_torrent_error_alert_has_error_category() {
948 let alert = Alert::new(AlertKind::SslTorrentError {
949 info_hash: Id20::from([0u8; 20]),
950 message: "handshake failed".into(),
951 });
952 assert!(alert.category().contains(AlertCategory::ERROR));
953 }
954
955 #[test]
956 fn ssl_torrent_error_alert_serializes_to_json() {
957 let alert = Alert::new(AlertKind::SslTorrentError {
958 info_hash: Id20::from([0u8; 20]),
959 message: "cert validation failed".into(),
960 });
961 let json = serde_json::to_string(&alert).unwrap();
962 let decoded: Alert = serde_json::from_str(&json).unwrap();
963 assert!(matches!(decoded.kind, AlertKind::SslTorrentError { .. }));
964 }
965
966 #[test]
967 fn i2p_alert_serializes_to_json() {
968 let alert = Alert::new(AlertKind::I2pSessionCreated {
969 b32_address: "test.b32.i2p".into(),
970 });
971 let json = serde_json::to_string(&alert).unwrap();
972 let decoded: Alert = serde_json::from_str(&json).unwrap();
973 assert!(matches!(decoded.kind, AlertKind::I2pSessionCreated { .. }));
974 }
975
976 #[test]
977 fn peer_turnover_alert_has_peer_category() {
978 let alert = Alert::new(AlertKind::PeerTurnover {
979 info_hash: Id20::from([0u8; 20]),
980 disconnected: 2,
981 replaced: 1,
982 });
983 assert!(alert.category().contains(AlertCategory::PEER));
984 }
985
986 #[test]
987 fn session_stats_alert_has_stats_category() {
988 let alert = Alert::new(AlertKind::SessionStatsAlert {
989 values: vec![100, 200, 300],
990 });
991 assert!(alert.category().contains(AlertCategory::STATS));
992 }
993
994 #[test]
995 fn session_stats_alert_serializes_to_json() {
996 let alert = Alert::new(AlertKind::SessionStatsAlert {
997 values: vec![1, 2, 3, 4, 5],
998 });
999 let json = serde_json::to_string(&alert).unwrap();
1000 let decoded: Alert = serde_json::from_str(&json).unwrap();
1001 match decoded.kind {
1002 AlertKind::SessionStatsAlert { values } => {
1003 assert_eq!(values, vec![1, 2, 3, 4, 5]);
1004 }
1005 _ => panic!("expected SessionStatsAlert"),
1006 }
1007 }
1008}