asic_rs/miners/
listener.rs

1use std::pin::Pin;
2
3use anyhow;
4use async_stream::stream;
5use tokio::net::UdpSocket;
6use tokio_stream::{Stream, StreamExt};
7
8use super::backends::traits::*;
9use super::factory::MinerFactory;
10
11pub struct MinerListener {
12    antminer_listener: AntMinerListener,
13    whatsminer_listener: WhatsMinerListener,
14}
15
16impl Default for MinerListener {
17    fn default() -> Self {
18        Self::new()
19    }
20}
21
22impl MinerListener {
23    pub fn new() -> Self {
24        MinerListener {
25            antminer_listener: AntMinerListener::new(),
26            whatsminer_listener: WhatsMinerListener::new(),
27        }
28    }
29
30    /// Listen for miners on the network.
31    ///
32    /// # Examples
33    ///
34    /// ```no_run
35    /// use asic_rs::miners::listener::MinerListener;
36    /// use futures::pin_mut;
37    /// use tokio_stream::StreamExt;
38    ///
39    /// #[tokio::main]
40    /// async fn main() -> () {
41    ///     let listener = MinerListener::new();
42    ///     let stream = listener.listen().await;
43    ///     pin_mut!(stream);
44    ///
45    ///     while let Some(miner) = stream.next().await {
46    ///         println!("Found miner: {miner:?}")
47    ///     }
48    /// }
49    /// ```
50    pub async fn listen(
51        &self,
52    ) -> Pin<Box<dyn Stream<Item = anyhow::Result<Option<Box<dyn Miner>>>> + '_>> {
53        let am_stream = self.antminer_listener.listen().await;
54        let wm_stream = self.whatsminer_listener.listen().await;
55
56        let stream = am_stream.merge(wm_stream);
57
58        Box::pin(stream)
59    }
60}
61
/// Stateless listener for AntMiner devices.
struct AntMinerListener;
63
64impl AntMinerListener {
65    pub fn new() -> Self {
66        AntMinerListener {}
67    }
68
69    pub(crate) async fn listen(
70        &self,
71    ) -> impl Stream<Item = anyhow::Result<Option<Box<dyn Miner>>>> {
72        stream! {
73            let factory = MinerFactory::new();
74            let sock = UdpSocket::bind("0.0.0.0:14235").await.expect("Failed to bind to port 14235 to listen for AntMiners.");
75            let mut buf = Vec::with_capacity(256);
76
77            loop {
78                let (_len, addr) = sock.recv_buf_from(&mut buf).await.unwrap();
79                yield factory.get_miner(addr.ip()).await;
80            }
81        }
82    }
83}
84
/// Stateless listener for WhatsMiner devices.
struct WhatsMinerListener;
86
87impl WhatsMinerListener {
88    pub fn new() -> Self {
89        WhatsMinerListener {}
90    }
91
92    pub(crate) async fn listen(
93        &self,
94    ) -> impl Stream<Item = anyhow::Result<Option<Box<dyn Miner>>>> {
95        stream! {
96            let factory = MinerFactory::new();
97            let sock = UdpSocket::bind("0.0.0.0:8888").await.expect("Failed to bind to port 8888 to listen for WhatsMiners.");
98            let mut buf = Vec::with_capacity(256);
99
100            loop {
101                let (_len, addr) = sock.recv_buf_from(&mut buf).await.unwrap();
102                yield factory.get_miner(addr.ip()).await;
103            }
104        }
105    }
106}