use std::net::IpAddr;
use std::pin::Pin;
use anyhow;
use asic_rs_core::traits::miner::*;
use async_stream::stream;
use tokio::net::UdpSocket;
use tokio_stream::{Stream, StreamExt};
use crate::factory::MinerFactory;
/// Aggregates the AntMiner and WhatsMiner UDP listeners so callers can consume
/// both as a single merged stream.
pub struct MinerListener {
/// Listener bound to UDP port 14235.
antminer_listener: AntMinerListener,
/// Listener bound to UDP port 8888.
whatsminer_listener: WhatsMinerListener,
}
impl Default for MinerListener {
fn default() -> Self {
Self::new()
}
}
impl MinerListener {
    /// Creates a listener holding one AntMiner and one WhatsMiner sub-listener.
    ///
    /// No sockets are opened here; binding happens when a returned stream is
    /// first polled.
    pub fn new() -> Self {
        let antminer_listener = AntMinerListener::new();
        let whatsminer_listener = WhatsMinerListener::new();
        Self {
            antminer_listener,
            whatsminer_listener,
        }
    }

    /// Returns a merged stream of the results produced by both sub-listeners'
    /// `listen` streams.
    ///
    /// Each item is whatever `MinerFactory::get_miner` returned for the source
    /// address of a received datagram, or an `Err` if a sub-listener could not
    /// bind its UDP port.
    pub async fn listen(
        &self,
    ) -> Pin<Box<dyn Stream<Item = anyhow::Result<Option<Box<dyn Miner>>>> + '_>> {
        let antminers = self.antminer_listener.listen().await;
        let whatsminers = self.whatsminer_listener.listen().await;
        Box::pin(antminers.merge(whatsminers))
    }

    /// Returns a merged stream of the sender IP addresses reported by both
    /// sub-listeners' `listen_ip_only` streams, without resolving them to
    /// miner objects.
    pub async fn listen_ip_only(
        &self,
    ) -> Pin<Box<dyn Stream<Item = anyhow::Result<Option<IpAddr>>> + '_>> {
        let antminers = self.antminer_listener.listen_ip_only().await;
        let whatsminers = self.whatsminer_listener.listen_ip_only().await;
        Box::pin(antminers.merge(whatsminers))
    }
}
/// Listens on UDP port 14235 and treats the sender of every received datagram
/// as a candidate AntMiner.
struct AntMinerListener {}

impl AntMinerListener {
    /// Creates the (stateless) listener. The socket is bound lazily by the
    /// streams returned from [`Self::listen`] / [`Self::listen_ip_only`].
    pub fn new() -> Self {
        AntMinerListener {}
    }

    /// Returns a stream that binds `0.0.0.0:14235` on first poll and, for each
    /// datagram received, yields the result of `MinerFactory::get_miner` for
    /// the sender's IP.
    ///
    /// If binding fails, a single `Err` is yielded and the stream ends.
    /// Receive errors are logged with `tracing::warn!` and skipped.
    pub(crate) async fn listen(
        &self,
    ) -> impl Stream<Item = anyhow::Result<Option<Box<dyn Miner>>>> {
        stream! {
            let factory = MinerFactory::new();
            let sock = match UdpSocket::bind("0.0.0.0:14235").await {
                Ok(s) => s,
                Err(e) => {
                    yield Err(anyhow::anyhow!("Failed to bind UDP port 14235: {e}"));
                    return;
                }
            };
            let mut buf = Vec::with_capacity(256);
            loop {
                // `recv_buf_from` appends to the buffer, so reset it each time;
                // the payload itself is not inspected, only the sender address.
                buf.clear();
                match sock.recv_buf_from(&mut buf).await {
                    Ok((_len, addr)) => yield factory.get_miner(addr.ip()).await,
                    Err(e) => {
                        tracing::warn!("UDP recv error on port 14235: {e}");
                        continue;
                    }
                }
            }
        }
    }

    /// Returns a stream that binds `0.0.0.0:14235` on first poll and yields
    /// `Ok(Some(ip))` for the sender of each datagram received.
    ///
    /// Error handling mirrors [`Self::listen`]: a bind failure is yielded as a
    /// single `Err` (after which the stream ends) instead of panicking, and
    /// receive errors are logged and skipped.
    pub(crate) async fn listen_ip_only(
        &self,
    ) -> impl Stream<Item = anyhow::Result<Option<IpAddr>>> {
        stream! {
            let sock = match UdpSocket::bind("0.0.0.0:14235").await {
                Ok(s) => s,
                Err(e) => {
                    yield Err(anyhow::anyhow!("Failed to bind UDP port 14235: {e}"));
                    return;
                }
            };
            let mut buf = Vec::with_capacity(256);
            loop {
                // Without this reset every datagram would be appended to the
                // buffer and it would grow without bound.
                buf.clear();
                match sock.recv_buf_from(&mut buf).await {
                    Ok((_len, addr)) => yield Ok(Some(addr.ip())),
                    Err(e) => {
                        tracing::warn!("UDP recv error on port 14235: {e}");
                        continue;
                    }
                }
            }
        }
    }
}
/// Listens on UDP port 8888 and treats the sender of every received datagram
/// as a candidate WhatsMiner.
struct WhatsMinerListener {}

impl WhatsMinerListener {
    /// Creates the (stateless) listener. The socket is bound lazily by the
    /// streams returned from [`Self::listen`] / [`Self::listen_ip_only`].
    pub fn new() -> Self {
        WhatsMinerListener {}
    }

    /// Returns a stream that binds `0.0.0.0:8888` on first poll and, for each
    /// datagram received, yields the result of `MinerFactory::get_miner` for
    /// the sender's IP.
    ///
    /// If binding fails, a single `Err` is yielded and the stream ends.
    /// Receive errors are logged with `tracing::warn!` and skipped.
    pub(crate) async fn listen(
        &self,
    ) -> impl Stream<Item = anyhow::Result<Option<Box<dyn Miner>>>> {
        stream! {
            let factory = MinerFactory::new();
            let sock = match UdpSocket::bind("0.0.0.0:8888").await {
                Ok(s) => s,
                Err(e) => {
                    yield Err(anyhow::anyhow!("Failed to bind UDP port 8888: {e}"));
                    return;
                }
            };
            let mut buf = Vec::with_capacity(256);
            loop {
                // `recv_buf_from` appends to the buffer, so reset it each time;
                // the payload itself is not inspected, only the sender address.
                buf.clear();
                match sock.recv_buf_from(&mut buf).await {
                    Ok((_len, addr)) => yield factory.get_miner(addr.ip()).await,
                    Err(e) => {
                        tracing::warn!("UDP recv error on port 8888: {e}");
                        continue;
                    }
                }
            }
        }
    }

    /// Returns a stream that binds `0.0.0.0:8888` on first poll and yields
    /// `Ok(Some(ip))` for the sender of each datagram received.
    ///
    /// Error handling mirrors [`Self::listen`]: a bind failure is yielded as a
    /// single `Err` (after which the stream ends) instead of panicking, and
    /// receive errors are logged and skipped.
    pub(crate) async fn listen_ip_only(
        &self,
    ) -> impl Stream<Item = anyhow::Result<Option<IpAddr>>> {
        stream! {
            let sock = match UdpSocket::bind("0.0.0.0:8888").await {
                Ok(s) => s,
                Err(e) => {
                    yield Err(anyhow::anyhow!("Failed to bind UDP port 8888: {e}"));
                    return;
                }
            };
            let mut buf = Vec::with_capacity(256);
            loop {
                // Without this reset every datagram would be appended to the
                // buffer and it would grow without bound.
                buf.clear();
                match sock.recv_buf_from(&mut buf).await {
                    Ok((_len, addr)) => yield Ok(Some(addr.ip())),
                    Err(e) => {
                        tracing::warn!("UDP recv error on port 8888: {e}");
                        continue;
                    }
                }
            }
        }
    }
}