use std::pin::Pin;
use anyhow;
use asic_rs_core::traits::miner::*;
use async_stream::stream;
use tokio::net::UdpSocket;
use tokio_stream::{Stream, StreamExt};
use crate::factory::MinerFactory;
pub struct MinerListener {
antminer_listener: AntMinerListener,
whatsminer_listener: WhatsMinerListener,
}
impl Default for MinerListener {
fn default() -> Self {
Self::new()
}
}
impl MinerListener {
pub fn new() -> Self {
MinerListener {
antminer_listener: AntMinerListener::new(),
whatsminer_listener: WhatsMinerListener::new(),
}
}
pub async fn listen(
&self,
) -> Pin<Box<dyn Stream<Item = anyhow::Result<Option<Box<dyn Miner>>>> + '_>> {
let am_stream = self.antminer_listener.listen().await;
let wm_stream = self.whatsminer_listener.listen().await;
let stream = am_stream.merge(wm_stream);
Box::pin(stream)
}
}
struct AntMinerListener {}
impl AntMinerListener {
pub fn new() -> Self {
AntMinerListener {}
}
pub(crate) async fn listen(
&self,
) -> impl Stream<Item = anyhow::Result<Option<Box<dyn Miner>>>> {
stream! {
let factory = MinerFactory::new();
let sock = match UdpSocket::bind("0.0.0.0:14235").await {
Ok(s) => s,
Err(e) => {
yield Err(anyhow::anyhow!("Failed to bind UDP port 14235: {e}"));
return;
}
};
let mut buf = Vec::with_capacity(256);
loop {
buf.clear();
match sock.recv_buf_from(&mut buf).await {
Ok((_len, addr)) => yield factory.get_miner(addr.ip()).await,
Err(e) => {
tracing::warn!("UDP recv error on port 14235: {e}");
continue;
}
}
}
}
}
}
struct WhatsMinerListener {}
impl WhatsMinerListener {
pub fn new() -> Self {
WhatsMinerListener {}
}
pub(crate) async fn listen(
&self,
) -> impl Stream<Item = anyhow::Result<Option<Box<dyn Miner>>>> {
stream! {
let factory = MinerFactory::new();
let sock = match UdpSocket::bind("0.0.0.0:8888").await {
Ok(s) => s,
Err(e) => {
yield Err(anyhow::anyhow!("Failed to bind UDP port 8888: {e}"));
return;
}
};
let mut buf = Vec::with_capacity(256);
loop {
buf.clear();
match sock.recv_buf_from(&mut buf).await {
Ok((_len, addr)) => yield factory.get_miner(addr.ip()).await,
Err(e) => {
tracing::warn!("UDP recv error on port 8888: {e}");
continue;
}
}
}
}
}
}