asic_rs/miners/
listener.rs

1use std::pin::Pin;
2
3use anyhow::Result;
4use async_stream::stream;
5use tokio::net::UdpSocket;
6use tokio_stream::{Stream, StreamExt};
7
8use super::backends::traits::*;
9use super::factory::MinerFactory;
10
11pub struct MinerListener {
12    antminer_listener: AntMinerListener,
13    whatsminer_listener: WhatsMinerListener,
14}
15
16impl Default for MinerListener {
17    fn default() -> Self {
18        Self::new()
19    }
20}
21
22impl MinerListener {
23    pub fn new() -> Self {
24        MinerListener {
25            antminer_listener: AntMinerListener::new(),
26            whatsminer_listener: WhatsMinerListener::new(),
27        }
28    }
29
30    /// Listen for miners on the network.
31    ///
32    /// # Examples
33    ///
34    /// ```no_run
35    /// use asic_rs::miners::listener::MinerListener;
36    /// use futures::pin_mut;
37    /// use tokio_stream::StreamExt;
38    ///
39    /// #[tokio::main]
40    /// async fn main() -> () {
41    ///     let listener = MinerListener::new();
42    ///     let stream = listener.listen().await;
43    ///     pin_mut!(stream);
44    ///
45    ///     while let Some(miner) = stream.next().await {
46    ///         println!("Found miner: {miner:?}")
47    ///     }
48    /// }
49    /// ```
50    pub async fn listen(&self) -> Pin<Box<dyn Stream<Item = Result<Option<Box<dyn Miner>>>> + '_>> {
51        let am_stream = self.antminer_listener.listen().await;
52        let wm_stream = self.whatsminer_listener.listen().await;
53
54        let stream = am_stream.merge(wm_stream);
55
56        Box::pin(stream)
57    }
58}
59
60struct AntMinerListener {}
61
62impl AntMinerListener {
63    pub fn new() -> Self {
64        AntMinerListener {}
65    }
66
67    pub(crate) async fn listen(&self) -> impl Stream<Item = Result<Option<Box<dyn Miner>>>> {
68        stream! {
69            let factory = MinerFactory::new();
70            let sock = UdpSocket::bind("0.0.0.0:14235").await.expect("Failed to bind to port 14235 to listen for AntMiners.");
71            let mut buf = Vec::with_capacity(256);
72
73            loop {
74                let (_len, addr) = sock.recv_buf_from(&mut buf).await.unwrap();
75                yield factory.get_miner(addr.ip()).await;
76            }
77        }
78    }
79}
80
81struct WhatsMinerListener {}
82
83impl WhatsMinerListener {
84    pub fn new() -> Self {
85        WhatsMinerListener {}
86    }
87
88    pub(crate) async fn listen(&self) -> impl Stream<Item = Result<Option<Box<dyn Miner>>>> {
89        stream! {
90            let factory = MinerFactory::new();
91            let sock = UdpSocket::bind("0.0.0.0:8888").await.expect("Failed to bind to port 8888 to listen for WhatsMiners.");
92            let mut buf = Vec::with_capacity(256);
93
94            loop {
95                let (_len, addr) = sock.recv_buf_from(&mut buf).await.unwrap();
96                yield factory.get_miner(addr.ip()).await;
97            }
98        }
99    }
100}