use crate::core::portfolio::PortfolioManager;
use crate::core::strategy::StrategyStats;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
pub async fn spawn_metrics_server(
port: u16,
stats: Arc<StrategyStats>,
portfolio: Arc<PortfolioManager>,
) -> Option<SocketAddr> {
let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listener = match TcpListener::bind(addr).await {
Ok(l) => l,
Err(e) => {
tracing::warn!("Metrics server failed to bind: {}", e);
return None;
}
};
let local = listener.local_addr().ok();
if let Some(addr) = local {
tracing::info!("Metrics server listening on {}", addr);
}
tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((mut socket, _)) => {
let body = render_metrics(&stats, &portfolio);
let response = format!(
"HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: {}\r\n\r\n{}",
body.len(),
body
);
let _ = socket.write_all(response.as_bytes()).await;
}
Err(e) => {
tracing::warn!("Metrics accept error: {}", e);
continue;
}
}
}
});
local
}
fn render_metrics(stats: &Arc<StrategyStats>, portfolio: &Arc<PortfolioManager>) -> String {
let processed = stats.processed.load(std::sync::atomic::Ordering::Relaxed);
let submitted = stats.submitted.load(std::sync::atomic::Ordering::Relaxed);
let skipped = stats.skipped.load(std::sync::atomic::Ordering::Relaxed);
let failed = stats.failed.load(std::sync::atomic::Ordering::Relaxed);
let mut body = format!(
concat!(
"# TYPE strategy_processed counter\nstrategy_processed {}\n",
"# TYPE strategy_submitted counter\nstrategy_submitted {}\n",
"# TYPE strategy_skipped counter\nstrategy_skipped {}\n",
"# TYPE strategy_failed counter\nstrategy_failed {}\n"
),
processed, submitted, skipped, failed
);
for (chain, profit) in portfolio.net_profit_all() {
body.push_str(&format!(
"# TYPE net_profit_eth gauge\nnet_profit_eth{{chain=\"{}\"}} {}\n",
chain, profit
));
}
for (chain, token, profit) in portfolio.token_profit_all() {
body.push_str(&format!(
"# TYPE token_profit gauge\ntoken_profit{{chain=\"{}\",token=\"{:#x}\"}} {}\n",
chain, token, profit
));
}
body
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::portfolio::PortfolioManager;
use crate::core::strategy::StrategyStats;
use crate::network::provider::HttpProvider;
use alloy::primitives::Address;
use url::Url;
#[tokio::test]
async fn metrics_endpoint_serves() {
let provider = HttpProvider::new_http(Url::parse("http://localhost:8545").unwrap());
let portfolio = Arc::new(PortfolioManager::new(provider, Address::ZERO));
let stats = Arc::new(StrategyStats::default());
let addr = spawn_metrics_server(0, stats.clone(), portfolio.clone())
.await
.expect("bind metrics");
let body = reqwest::get(format!("http://{}", addr))
.await
.unwrap()
.text()
.await
.unwrap();
assert!(body.contains("strategy_processed"));
}
}