1use std::net::SocketAddr;
7use std::sync::atomic::{AtomicU32, Ordering};
8use std::time::SystemTime;
9
10use serde::{Deserialize, Serialize};
11use tokio::sync::broadcast;
12
13use crate::types::TorrentState;
14use irontide_core::Id20;
15
16pub use irontide_core::AlertCategory;
19
/// The event carried by an [`Alert`], one variant per kind of event.
///
/// [`AlertKind::category`] maps every variant to the [`AlertCategory`] flags
/// used for filtering. The one-line descriptions below restate the variant
/// and field names; the code that emits each alert is outside this file, so
/// exact emission conditions should be confirmed against the emitters.
///
/// Where present, `info_hash` presumably identifies the torrent the alert
/// refers to, and `addr` the remote peer or node involved.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertKind {
    // ---- Torrent lifecycle ----
    /// A torrent was added.
    TorrentAdded {
        info_hash: Id20,
        /// Name of the added torrent.
        name: String,
    },
    /// A torrent was removed.
    TorrentRemoved {
        info_hash: Id20,
    },
    /// A torrent was paused.
    TorrentPaused {
        info_hash: Id20,
    },
    /// A torrent was resumed.
    TorrentResumed {
        info_hash: Id20,
    },
    /// A torrent finished.
    TorrentFinished {
        info_hash: Id20,
    },
    /// A torrent moved from one [`TorrentState`] to another.
    StateChanged {
        info_hash: Id20,
        /// State before the transition.
        prev_state: TorrentState,
        /// State after the transition.
        new_state: TorrentState,
    },
    /// Torrent metadata was received.
    MetadataReceived {
        info_hash: Id20,
        /// Torrent name (presumably taken from the received metadata).
        name: String,
    },
    /// Obtaining torrent metadata failed.
    MetadataFailed {
        info_hash: Id20,
    },

    // ---- Checking ----
    /// A torrent check completed.
    TorrentChecked {
        info_hash: Id20,
        /// Number of pieces already present.
        pieces_have: u32,
        /// Total number of pieces in the torrent.
        pieces_total: u32,
    },
    /// Progress report for an ongoing check.
    CheckingProgress {
        info_hash: Id20,
        /// NOTE(review): presumably a fraction in `0.0..=1.0`; the scale is
        /// not established in this file -- confirm against the emitter.
        progress: f32,
    },

    // ---- Pieces and blocks ----
    /// A piece finished.
    PieceFinished {
        info_hash: Id20,
        /// Index of the piece.
        piece: u32,
    },
    /// A block within a piece finished.
    BlockFinished {
        info_hash: Id20,
        /// Index of the piece containing the block.
        piece: u32,
        /// Offset of the block (presumably in bytes within the piece --
        /// confirm).
        offset: u32,
    },
    /// A piece failed its hash check.
    HashFailed {
        info_hash: Id20,
        /// Index of the piece.
        piece: u32,
        /// IP addresses associated with the failed piece (presumably the
        /// peers that supplied its data -- confirm).
        contributors: Vec<std::net::IpAddr>,
    },

    // ---- Peers ----
    /// A peer connected.
    PeerConnected {
        info_hash: Id20,
        addr: SocketAddr,
    },
    /// A peer disconnected.
    PeerDisconnected {
        info_hash: Id20,
        addr: SocketAddr,
        /// Optional human-readable reason; `None` when no reason is given.
        reason: Option<String>,
    },
    /// A peer was banned.
    PeerBanned {
        info_hash: Id20,
        addr: SocketAddr,
    },

    // ---- Trackers ----
    /// A tracker replied.
    TrackerReply {
        info_hash: Id20,
        /// Tracker URL.
        url: String,
        /// Number of peers reported in the reply.
        num_peers: usize,
    },
    /// A tracker returned a warning.
    TrackerWarning {
        info_hash: Id20,
        /// Tracker URL.
        url: String,
        /// Warning text.
        message: String,
    },
    /// A tracker request failed.
    TrackerError {
        info_hash: Id20,
        /// Tracker URL.
        url: String,
        /// Error text.
        message: String,
    },
    /// A tracker scrape replied.
    ///
    /// NOTE(review): the three counters presumably follow the usual scrape
    /// response fields (seeders / leechers / completed downloads) -- confirm.
    ScrapeReply {
        info_hash: Id20,
        /// Tracker URL.
        url: String,
        complete: u32,
        incomplete: u32,
        downloaded: u32,
    },
    /// A tracker scrape failed.
    ScrapeError {
        info_hash: Id20,
        /// Tracker URL.
        url: String,
        /// Error text.
        message: String,
    },

    // ---- DHT ----
    /// DHT bootstrap completed.
    DhtBootstrapComplete,
    /// A DHT peer lookup produced peers.
    DhtGetPeers {
        info_hash: Id20,
        /// Number of peers found.
        num_peers: usize,
    },
    /// A DHT node's ID was judged to violate a node-ID rule.
    ///
    /// NOTE(review): which rule is enforced is not visible here -- confirm.
    DhtNodeIdViolation {
        /// ID reported by the offending node.
        node_id: Id20,
        addr: SocketAddr,
    },
    /// Result of sampling info-hashes from the DHT.
    DhtSampleInfohashes {
        /// Number of samples obtained.
        num_samples: usize,
        /// Estimated total (signed; whether negative values are meaningful is
        /// not established here -- confirm).
        total_estimate: i64,
    },

    // ---- Session ----
    /// Listening succeeded on `port`.
    ListenSucceeded {
        port: u16,
    },
    /// Listening on `port` failed.
    ListenFailed {
        port: u16,
        /// Error text.
        message: String,
    },
    /// Session statistics snapshot.
    SessionStatsUpdate(crate::types::SessionStats),

    // ---- Storage ----
    /// A file was renamed.
    FileRenamed {
        info_hash: Id20,
        /// Index of the file (presumably within the torrent's file list).
        index: usize,
        /// New path of the file.
        new_path: std::path::PathBuf,
    },
    /// A file completed.
    FileCompleted {
        info_hash: Id20,
        /// Index of the file (presumably within the torrent's file list).
        file_index: usize,
    },
    /// A torrent's storage was moved.
    StorageMoved {
        info_hash: Id20,
        /// New storage location.
        new_path: std::path::PathBuf,
    },
    /// A file operation failed.
    FileError {
        info_hash: Id20,
        /// Path the error relates to.
        path: std::path::PathBuf,
        /// Error text.
        message: String,
    },
    /// Disk statistics snapshot.
    DiskStatsUpdate(crate::disk::DiskStats),

    // ---- Resume data ----
    /// Resume data was saved.
    ResumeDataSaved {
        info_hash: Id20,
    },

    // ---- Errors and diagnostics ----
    /// A torrent-level error occurred.
    TorrentError {
        info_hash: Id20,
        /// Error text.
        message: String,
    },

    /// A performance warning.
    PerformanceWarning {
        info_hash: Id20,
        /// Warning text.
        message: String,
    },

    // ---- Queueing ----
    /// A torrent's queue position changed.
    TorrentQueuePositionChanged {
        info_hash: Id20,
        /// Previous position (signed; sentinel values, if any, are not
        /// established in this file).
        old_pos: i32,
        /// New position.
        new_pos: i32,
    },
    /// Auto-management acted on a torrent.
    TorrentAutoManaged {
        info_hash: Id20,
        /// NOTE(review): presumably `true` when the torrent was paused and
        /// `false` when it was resumed -- confirm.
        paused: bool,
    },

    /// A peer address was blocked (no torrent is attached to this alert).
    PeerBlocked {
        addr: SocketAddr,
    },

    /// Summary of a peer turnover round.
    PeerTurnover {
        info_hash: Id20,
        /// Number of peers disconnected.
        disconnected: usize,
        /// Number of peers replaced.
        replaced: usize,
    },

    /// A web seed was banned.
    WebSeedBanned {
        info_hash: Id20,
        /// URL of the web seed.
        url: String,
    },

    // ---- Port mapping ----
    /// A port mapping succeeded.
    PortMappingSucceeded {
        port: u16,
        /// Protocol name as a free-form string (the set of values is not
        /// established in this file).
        protocol: String,
    },
    /// A port mapping failed.
    PortMappingFailed {
        port: u16,
        /// Error text.
        message: String,
    },

    /// Inconsistent hashes were detected for a piece.
    InconsistentHashes {
        info_hash: Id20,
        /// Index of the piece.
        piece: u32,
    },

    // ---- DHT item storage ----
    /// A DHT put completed.
    DhtPutComplete {
        /// Target ID of the item.
        target: Id20,
    },
    /// A DHT put of a mutable item completed.
    DhtMutablePutComplete {
        /// Target ID of the item.
        target: Id20,
        /// Sequence number of the item.
        seq: i64,
    },
    /// Result of a DHT get.
    DhtGetResult {
        /// Target ID of the item.
        target: Id20,
        /// Retrieved bytes, or `None` (presumably when nothing was found).
        value: Option<Vec<u8>>,
    },
    /// Result of a DHT get for a mutable item.
    DhtMutableGetResult {
        /// Target ID of the item.
        target: Id20,
        /// Retrieved bytes, or `None` (presumably when nothing was found).
        value: Option<Vec<u8>>,
        /// Sequence number of the item, if any.
        seq: Option<i64>,
        /// 32-byte public key associated with the item.
        public_key: [u8; 32],
    },
    /// A DHT item operation failed.
    DhtItemError {
        /// Target ID of the item.
        target: Id20,
        /// Error text.
        message: String,
    },

    // ---- Holepunch ----
    /// A holepunch attempt succeeded.
    HolepunchSucceeded {
        info_hash: Id20,
        addr: SocketAddr,
    },
    /// A holepunch attempt failed.
    HolepunchFailed {
        info_hash: Id20,
        addr: SocketAddr,
        /// Optional numeric error code (its meaning is defined outside this
        /// file).
        error_code: Option<u32>,
        /// Error text.
        message: String,
    },

    // ---- I2P ----
    /// An I2P session was created.
    I2pSessionCreated {
        /// The session's `.b32.i2p`-style address string.
        b32_address: String,
    },
    /// An I2P error occurred.
    I2pError {
        /// Error text.
        message: String,
    },

    /// An SSL torrent error occurred.
    SslTorrentError {
        info_hash: Id20,
        /// Error text.
        message: String,
    },

    /// Session statistics as a flat list of counters.
    SessionStatsAlert {
        /// Counter values; the meaning and ordering of entries is defined by
        /// the emitter and is not visible in this file.
        values: Vec<i64>,
    },

    /// An external IP address was detected.
    ExternalIpDetected {
        ip: std::net::IpAddr,
    },

    /// Settings changed.
    SettingsChanged,
}
465
466impl AlertKind {
467 #[must_use]
469 pub fn category(&self) -> AlertCategory {
470 use AlertKind::{
471 BlockFinished, CheckingProgress, DhtBootstrapComplete, DhtGetPeers, DhtGetResult,
472 DhtItemError, DhtMutableGetResult, DhtMutablePutComplete, DhtNodeIdViolation,
473 DhtPutComplete, DhtSampleInfohashes, DiskStatsUpdate, ExternalIpDetected,
474 FileCompleted, FileError, FileRenamed, HashFailed, HolepunchFailed, HolepunchSucceeded,
475 I2pError, I2pSessionCreated, InconsistentHashes, ListenFailed, ListenSucceeded,
476 MetadataFailed, MetadataReceived, PeerBanned, PeerBlocked, PeerConnected,
477 PeerDisconnected, PeerTurnover, PerformanceWarning, PieceFinished, PortMappingFailed,
478 PortMappingSucceeded, ResumeDataSaved, ScrapeError, ScrapeReply, SessionStatsAlert,
479 SessionStatsUpdate, SettingsChanged, SslTorrentError, StateChanged, StorageMoved,
480 TorrentAdded, TorrentAutoManaged, TorrentChecked, TorrentError, TorrentFinished,
481 TorrentPaused, TorrentQueuePositionChanged, TorrentRemoved, TorrentResumed,
482 TrackerError, TrackerReply, TrackerWarning, WebSeedBanned,
483 };
484 match self {
485 TorrentAdded { .. }
487 | TorrentRemoved { .. }
488 | TorrentPaused { .. }
489 | TorrentResumed { .. }
490 | TorrentFinished { .. }
491 | StateChanged { .. }
492 | MetadataReceived { .. }
493 | ListenSucceeded { .. }
494 | ListenFailed { .. }
495 | ResumeDataSaved { .. }
496 | TorrentChecked { .. }
497 | CheckingProgress { .. }
498 | TorrentQueuePositionChanged { .. }
499 | TorrentAutoManaged { .. }
500 | WebSeedBanned { .. }
501 | ExternalIpDetected { .. }
502 | SettingsChanged => AlertCategory::STATUS,
503
504 MetadataFailed { .. } => AlertCategory::STATUS | AlertCategory::ERROR,
505
506 SessionStatsUpdate(_) | SessionStatsAlert { .. } => AlertCategory::STATS,
508
509 PieceFinished { .. } => AlertCategory::PIECE,
511 HashFailed { .. } => AlertCategory::PIECE | AlertCategory::ERROR,
512
513 BlockFinished { .. } => AlertCategory::BLOCK,
515
516 PeerConnected { .. }
518 | PeerDisconnected { .. }
519 | PeerBanned { .. }
520 | PeerBlocked { .. }
521 | PeerTurnover { .. }
522 | HolepunchSucceeded { .. } => AlertCategory::PEER,
523
524 HolepunchFailed { .. } => AlertCategory::PEER | AlertCategory::ERROR,
525
526 TrackerReply { .. } | TrackerWarning { .. } | ScrapeReply { .. } => {
528 AlertCategory::TRACKER
529 }
530 TrackerError { .. } | ScrapeError { .. } => {
531 AlertCategory::TRACKER | AlertCategory::ERROR
532 }
533
534 DhtBootstrapComplete
536 | DhtGetPeers { .. }
537 | DhtSampleInfohashes { .. }
538 | DhtPutComplete { .. }
539 | DhtMutablePutComplete { .. }
540 | DhtGetResult { .. }
541 | DhtMutableGetResult { .. } => AlertCategory::DHT,
542
543 DhtNodeIdViolation { .. } | DhtItemError { .. } => {
544 AlertCategory::DHT | AlertCategory::ERROR
545 }
546
547 FileRenamed { .. } | StorageMoved { .. } | FileCompleted { .. } => {
549 AlertCategory::STORAGE
550 }
551 FileError { .. } => AlertCategory::STORAGE | AlertCategory::ERROR,
552 DiskStatsUpdate(_) => AlertCategory::STATS | AlertCategory::STORAGE,
553
554 TorrentError { .. } | InconsistentHashes { .. } | SslTorrentError { .. } => {
556 AlertCategory::ERROR
557 }
558
559 PerformanceWarning { .. } => AlertCategory::PERFORMANCE,
561
562 PortMappingSucceeded { .. } => AlertCategory::PORT_MAPPING,
564 PortMappingFailed { .. } => AlertCategory::PORT_MAPPING | AlertCategory::ERROR,
565
566 I2pSessionCreated { .. } => AlertCategory::I2P | AlertCategory::STATUS,
568 I2pError { .. } => AlertCategory::I2P | AlertCategory::ERROR,
569 }
570 }
571}
572
/// A timestamped event: an [`AlertKind`] together with a creation time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    /// Wall-clock time of the alert; [`Alert::new`] sets it to
    /// `SystemTime::now()`.
    pub timestamp: SystemTime,
    /// What happened.
    pub kind: AlertKind,
}
583
584impl Alert {
585 #[must_use]
587 pub fn new(kind: AlertKind) -> Self {
588 Self {
589 timestamp: SystemTime::now(),
590 kind,
591 }
592 }
593
594 #[must_use]
596 pub fn category(&self) -> AlertCategory {
597 self.kind.category()
598 }
599}
600
/// Receiving side of the alert broadcast channel with a category filter.
///
/// Wraps a [`broadcast::Receiver`] of [`Alert`]s; [`AlertStream::recv`] only
/// yields alerts whose category shares at least one flag with `filter`.
pub struct AlertStream {
    /// Underlying broadcast receiver the alerts are read from.
    rx: broadcast::Receiver<Alert>,
    /// Category mask; alerts that do not intersect it are skipped by `recv`.
    filter: AlertCategory,
}
610
611impl AlertStream {
612 #[must_use]
614 pub fn new(rx: broadcast::Receiver<Alert>, filter: AlertCategory) -> Self {
615 Self { rx, filter }
616 }
617
618 pub async fn recv(&mut self) -> Result<Alert, broadcast::error::RecvError> {
626 loop {
627 let alert = self.rx.recv().await?;
628 if alert.category().intersects(self.filter) {
629 return Ok(alert);
630 }
631 }
632 }
633}
634
635pub fn post_alert(tx: &broadcast::Sender<Alert>, mask: &AtomicU32, kind: AlertKind) {
642 let alert = Alert::new(kind);
643 let m = AlertCategory::from_bits_truncate(mask.load(Ordering::Relaxed));
644 if alert.category().intersects(m) {
645 let _ = tx.send(alert);
646 }
647}
648
#[cfg(test)]
mod tests {
    use super::*;

    /// All-zero hash built through the `From<[u8; 20]>` conversion.
    fn zero_hash() -> Id20 {
        Id20::from([0u8; 20])
    }

    /// All-zero hash built through the fallible `from_bytes` constructor.
    fn zero_hash_from_bytes() -> Id20 {
        Id20::from_bytes(&[0u8; 20]).unwrap()
    }

    /// Category of `kind` as seen through a freshly built [`Alert`].
    fn category_of(kind: AlertKind) -> AlertCategory {
        Alert::new(kind).category()
    }

    /// Serializes `alert` to JSON and parses it back.
    fn json_round_trip(alert: &Alert) -> Alert {
        let json = serde_json::to_string(alert).unwrap();
        serde_json::from_str(&json).unwrap()
    }

    #[test]
    fn alert_category_all_includes_every_flag() {
        let every_flag = [
            AlertCategory::STATUS,
            AlertCategory::ERROR,
            AlertCategory::PEER,
            AlertCategory::TRACKER,
            AlertCategory::STORAGE,
            AlertCategory::DHT,
            AlertCategory::STATS,
            AlertCategory::PIECE,
            AlertCategory::BLOCK,
            AlertCategory::PERFORMANCE,
            AlertCategory::PORT_MAPPING,
            AlertCategory::I2P,
        ];
        for flag in every_flag {
            assert!(AlertCategory::ALL.contains(flag), "ALL is missing {:?}", flag);
        }
    }

    #[test]
    fn alert_category_mapping() {
        let info_hash = zero_hash_from_bytes();

        let added = category_of(AlertKind::TorrentAdded {
            info_hash,
            name: String::new(),
        });
        assert!(added.contains(AlertCategory::STATUS));

        let piece_done = category_of(AlertKind::PieceFinished {
            info_hash,
            piece: 0,
        });
        assert!(piece_done.contains(AlertCategory::PIECE));

        let connected = category_of(AlertKind::PeerConnected {
            info_hash,
            addr: "127.0.0.1:6881".parse().unwrap(),
        });
        assert!(connected.contains(AlertCategory::PEER));

        let tracker_err = category_of(AlertKind::TrackerError {
            info_hash,
            url: String::new(),
            message: String::new(),
        });
        assert!(tracker_err.contains(AlertCategory::TRACKER));
        assert!(tracker_err.contains(AlertCategory::ERROR));
    }

    #[test]
    fn alert_has_timestamp() {
        let earliest = SystemTime::now();
        let stamped = Alert::new(AlertKind::DhtBootstrapComplete).timestamp;
        assert!(earliest <= stamped);
    }

    #[test]
    fn alert_is_send_and_sync() {
        fn require_send_sync<T>()
        where
            T: Send + Sync,
        {
        }
        require_send_sync::<Alert>();
    }

    #[test]
    fn post_alert_respects_mask() {
        let (sender, mut receiver) = broadcast::channel(16);
        let status_only = AtomicU32::new(AlertCategory::STATUS.bits());

        // STATUS alert passes the mask.
        let status_kind = AlertKind::TorrentAdded {
            info_hash: zero_hash_from_bytes(),
            name: "test".into(),
        };
        post_alert(&sender, &status_only, status_kind);
        assert!(receiver.try_recv().is_ok());

        // PIECE alert is filtered out.
        let piece_kind = AlertKind::PieceFinished {
            info_hash: zero_hash_from_bytes(),
            piece: 0,
        };
        post_alert(&sender, &status_only, piece_kind);
        assert!(receiver.try_recv().is_err());
    }

    #[test]
    fn post_alert_empty_mask_blocks_all() {
        let (sender, mut receiver) = broadcast::channel(16);
        let nothing_enabled = AtomicU32::new(AlertCategory::empty().bits());

        let kind = AlertKind::TorrentAdded {
            info_hash: zero_hash_from_bytes(),
            name: "test".into(),
        };
        post_alert(&sender, &nothing_enabled, kind);
        assert!(receiver.try_recv().is_err());
    }

    #[test]
    fn alert_serializes_to_json() {
        let original = Alert::new(AlertKind::TorrentAdded {
            info_hash: zero_hash_from_bytes(),
            name: "test".into(),
        });
        let decoded = json_round_trip(&original);
        assert!(matches!(decoded.kind, AlertKind::TorrentAdded { .. }));
    }

    #[test]
    fn alert_category_serializes_as_u32() {
        let mask = AlertCategory::STATUS | AlertCategory::ERROR;
        let encoded = serde_json::to_string(&mask).unwrap();
        let decoded: AlertCategory = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, mask);
    }

    #[test]
    fn queue_position_changed_alert_has_status_category() {
        let category = category_of(AlertKind::TorrentQueuePositionChanged {
            info_hash: zero_hash(),
            old_pos: 3,
            new_pos: 0,
        });
        assert!(category.contains(AlertCategory::STATUS));
    }

    #[test]
    fn torrent_auto_managed_alert_has_status_category() {
        let category = category_of(AlertKind::TorrentAutoManaged {
            info_hash: zero_hash(),
            paused: true,
        });
        assert!(category.contains(AlertCategory::STATUS));
    }

    #[test]
    fn scrape_reply_alert_has_tracker_category() {
        let category = category_of(AlertKind::ScrapeReply {
            info_hash: zero_hash(),
            url: "http://tracker.example.com/announce".into(),
            complete: 10,
            incomplete: 3,
            downloaded: 50,
        });
        assert!(category.contains(AlertCategory::TRACKER));
    }

    #[test]
    fn scrape_error_alert_has_tracker_and_error_category() {
        let category = category_of(AlertKind::ScrapeError {
            info_hash: zero_hash(),
            url: "http://tracker.example.com/announce".into(),
            message: "connection refused".into(),
        });
        assert!(category.contains(AlertCategory::TRACKER));
        assert!(category.contains(AlertCategory::ERROR));
    }

    #[test]
    fn web_seed_banned_alert_has_status_category() {
        let category = category_of(AlertKind::WebSeedBanned {
            info_hash: zero_hash(),
            url: "http://example.com/files".into(),
        });
        assert!(category.contains(AlertCategory::STATUS));
    }

    #[test]
    fn dht_node_id_violation_alert_category() {
        let category = category_of(AlertKind::DhtNodeIdViolation {
            node_id: zero_hash(),
            addr: "203.0.113.5:6881".parse().unwrap(),
        });
        assert!(category.contains(AlertCategory::DHT));
        assert!(category.contains(AlertCategory::ERROR));
    }

    #[test]
    fn dht_put_complete_alert_has_dht_category() {
        let category = category_of(AlertKind::DhtPutComplete {
            target: zero_hash(),
        });
        assert!(category.contains(AlertCategory::DHT));
    }

    #[test]
    fn dht_sample_infohashes_alert_has_dht_category() {
        let category = category_of(AlertKind::DhtSampleInfohashes {
            num_samples: 15,
            total_estimate: 500,
        });
        assert!(category.contains(AlertCategory::DHT));
    }

    #[test]
    fn dht_item_error_alert_has_dht_and_error_category() {
        let category = category_of(AlertKind::DhtItemError {
            target: zero_hash(),
            message: "test".into(),
        });
        assert!(category.contains(AlertCategory::DHT));
        assert!(category.contains(AlertCategory::ERROR));
    }

    #[test]
    fn holepunch_succeeded_alert_has_peer_category() {
        let category = category_of(AlertKind::HolepunchSucceeded {
            info_hash: zero_hash(),
            addr: "203.0.113.5:6881".parse().unwrap(),
        });
        assert!(category.contains(AlertCategory::PEER));
        assert!(!category.contains(AlertCategory::ERROR));
    }

    #[test]
    fn holepunch_failed_alert_has_peer_and_error_category() {
        let category = category_of(AlertKind::HolepunchFailed {
            info_hash: zero_hash(),
            addr: "203.0.113.5:6881".parse().unwrap(),
            error_code: Some(1),
            message: "no support".into(),
        });
        assert!(category.contains(AlertCategory::PEER));
        assert!(category.contains(AlertCategory::ERROR));
    }

    #[test]
    fn i2p_session_created_alert_category() {
        let category = category_of(AlertKind::I2pSessionCreated {
            b32_address: "abcdef1234567890abcdef1234567890abcdef1234567890abcd.b32.i2p".into(),
        });
        assert!(category.contains(AlertCategory::I2P));
        assert!(category.contains(AlertCategory::STATUS));
    }

    #[test]
    fn i2p_error_alert_category() {
        let category = category_of(AlertKind::I2pError {
            message: "SAM bridge unreachable".into(),
        });
        assert!(category.contains(AlertCategory::I2P));
        assert!(category.contains(AlertCategory::ERROR));
    }

    #[test]
    fn alert_category_all_includes_i2p() {
        assert!(AlertCategory::ALL.contains(AlertCategory::I2P));
    }

    #[test]
    fn ssl_torrent_error_alert_has_error_category() {
        let category = category_of(AlertKind::SslTorrentError {
            info_hash: zero_hash(),
            message: "handshake failed".into(),
        });
        assert!(category.contains(AlertCategory::ERROR));
    }

    #[test]
    fn ssl_torrent_error_alert_serializes_to_json() {
        let original = Alert::new(AlertKind::SslTorrentError {
            info_hash: zero_hash(),
            message: "cert validation failed".into(),
        });
        let decoded = json_round_trip(&original);
        assert!(matches!(decoded.kind, AlertKind::SslTorrentError { .. }));
    }

    #[test]
    fn i2p_alert_serializes_to_json() {
        let original = Alert::new(AlertKind::I2pSessionCreated {
            b32_address: "test.b32.i2p".into(),
        });
        let decoded = json_round_trip(&original);
        assert!(matches!(decoded.kind, AlertKind::I2pSessionCreated { .. }));
    }

    #[test]
    fn peer_turnover_alert_has_peer_category() {
        let category = category_of(AlertKind::PeerTurnover {
            info_hash: zero_hash(),
            disconnected: 2,
            replaced: 1,
        });
        assert!(category.contains(AlertCategory::PEER));
    }

    #[test]
    fn session_stats_alert_has_stats_category() {
        let category = category_of(AlertKind::SessionStatsAlert {
            values: vec![100, 200, 300],
        });
        assert!(category.contains(AlertCategory::STATS));
    }

    #[test]
    fn session_stats_alert_serializes_to_json() {
        let original = Alert::new(AlertKind::SessionStatsAlert {
            values: vec![1, 2, 3, 4, 5],
        });
        let decoded = json_round_trip(&original);
        if let AlertKind::SessionStatsAlert { values } = decoded.kind {
            assert_eq!(values, vec![1, 2, 3, 4, 5]);
        } else {
            panic!("expected SessionStatsAlert");
        }
    }
}