#![allow(clippy::redundant_async_block)]
use std::{net::SocketAddr, sync::Once, time::Duration};
use stratum_server::{Result, Session, SessionList, StratumRequest, StratumServer};
use tokio::{io::AsyncReadExt, net::TcpStream, task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken;
use tracing::subscriber::set_global_default;
use tracing_subscriber::{fmt, prelude::*, EnvFilter, Registry};
pub const STARTUP_TIME: Duration = Duration::from_secs(2);
pub const CONNECTION_DELAY: Duration = Duration::from_secs(1);
pub fn init_telemetry() {
let fmt_layer = fmt::layer();
let filter_layer = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("stratum_server=trace"))
.unwrap();
let subscriber = Registry::default().with(filter_layer).with(fmt_layer);
set_global_default(subscriber).expect("Failed to set subscriber");
}
static LOGGER: Once = Once::new();
pub fn init() {
LOGGER.call_once(|| {
init_telemetry();
});
}
#[derive(Clone)]
pub struct AuthProvider {}
impl AuthProvider {
pub async fn login(&self) -> bool {
true
}
}
#[derive(Clone)]
pub struct State {
auth: AuthProvider,
}
#[derive(Clone, Default)]
pub struct ConnectionState {}
pub async fn handle_auth(
req: StratumRequest<State>,
_connection: Session<ConnectionState>,
) -> Result<bool> {
let state = req.state();
let login = state.auth.login().await;
Ok(login)
}
pub async fn poll_global(
_state: State,
_connection_list: SessionList<ConnectionState>,
) -> Result<()> {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
pub async fn spawn_full_server() -> Result<(SocketAddr, JoinHandle<Result<()>>, CancellationToken)>
{
let cancel_token = CancellationToken::new();
let auth = AuthProvider {};
let state = State { auth };
let builder = StratumServer::builder(state, 1)
.with_host("0.0.0.0")
.with_port(0)
.with_cancel_token(cancel_token.clone());
#[cfg(feature = "api")]
let builder = builder.with_api_host("0.0.0.0").with_api_port(0);
let mut server = builder.build().await?;
let address = server.get_address();
server.add("auth", handle_auth);
server.global("Poll Global", poll_global);
let handle = tokio::spawn(async move { server.start().await });
sleep(STARTUP_TIME).await;
Ok((address, handle, cancel_token))
}
pub async fn generate_connections<A: Into<SocketAddr>>(
num: usize,
url: A,
sleep_duration: u64,
) -> Vec<JoinHandle<usize>> {
let addrs = url.into();
let mut connections = Vec::new();
for i in 0..num {
let client = tokio::task::spawn({
async move {
let mut stream = TcpStream::connect(addrs).await.unwrap();
let mut buffer = [0; 10];
let sleep = tokio::time::sleep(Duration::from_secs(sleep_duration));
tokio::pin!(sleep);
loop {
tokio::select! {
res = stream.read(&mut buffer) => {
if let Ok(n) = res {
match n {
0 => {
break;
},
_ => {
println!("{:?}", buffer);
continue;
},
}
} else {
break;
}
},
_ = &mut sleep => {
break;
}
}
}
i
}
});
connections.push(client);
}
sleep(CONNECTION_DELAY).await;
connections
}