use prometheus::{Counter, Encoder, Gauge, Histogram, HistogramOpts, Opts, Registry, TextEncoder};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use http_body_util::Full;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use super::live_metrics::LiveMetrics;
/// Histogram bucket upper bounds, in milliseconds, shared by the order-submission and
/// fill-notification latency histograms.
const LATENCY_BUCKETS: &[f64] = &[
    0.1, 0.5, 1.0, 2.0, 5.0, // sub-millisecond up to 5 ms
    10.0, 25.0, 50.0, 100.0, // 10 ms up to 100 ms
    250.0, 500.0, 1000.0, // slow path, up to one second
];

/// Histogram bucket upper bounds, in basis points, for the spread distribution histogram.
const SPREAD_BUCKETS: &[f64] = &[
    1.0, 2.0, 5.0, 10.0, 20.0, //
    50.0, 100.0, 200.0, 500.0,
];
/// Prometheus counters, gauges and histograms describing market-maker activity.
///
/// All metrics are created under a caller-supplied namespace by [`PrometheusMetrics::new`]
/// and registered in the private `registry`, which [`PrometheusMetrics::encode`] renders in
/// the Prometheus text format. Exported names are `<namespace>_<subsystem>_<name>`.
#[derive(Debug)]
pub struct PrometheusMetrics {
/// Registry that owns every metric below; gathered by `encode`.
registry: Registry,
/// Counter `<ns>_quotes_quotes_total`.
quotes_total: Counter,
/// Counter `<ns>_orders_submitted_total`.
orders_submitted_total: Counter,
/// Counter `<ns>_orders_filled_total`.
orders_filled_total: Counter,
/// Counter `<ns>_orders_cancelled_total`.
orders_cancelled_total: Counter,
/// Counter `<ns>_orders_rejected_total`.
orders_rejected_total: Counter,
/// Counter `<ns>_orders_partial_fills_total`.
partial_fills_total: Counter,
/// Gauge `<ns>_orders_open_orders`.
open_orders: Gauge,
/// Gauge `<ns>_position_current`.
position_current: Gauge,
/// Gauge `<ns>_pnl_realized`.
pnl_realized: Gauge,
/// Gauge `<ns>_pnl_unrealized`.
pnl_unrealized: Gauge,
/// Gauge `<ns>_pnl_total`; `set_pnl` keeps it equal to realized + unrealized.
pnl_total: Gauge,
/// Gauge `<ns>_spread_current_bps`.
spread_current: Gauge,
/// Histogram `<ns>_latency_order_milliseconds` using `LATENCY_BUCKETS`.
order_latency: Histogram,
/// Histogram `<ns>_latency_fill_milliseconds` using `LATENCY_BUCKETS`.
fill_latency: Histogram,
/// Histogram `<ns>_spread_distribution_bps` using `SPREAD_BUCKETS`.
spread_histogram: Histogram,
}
impl PrometheusMetrics {
pub fn new(namespace: &str) -> Result<Self, prometheus::Error> {
let registry = Registry::new();
let quotes_total = Counter::with_opts(
Opts::new("quotes_total", "Total number of quotes generated")
.namespace(namespace)
.subsystem("quotes"),
)?;
let orders_submitted_total = Counter::with_opts(
Opts::new("submitted_total", "Total number of orders submitted")
.namespace(namespace)
.subsystem("orders"),
)?;
let orders_filled_total = Counter::with_opts(
Opts::new("filled_total", "Total number of orders filled")
.namespace(namespace)
.subsystem("orders"),
)?;
let orders_cancelled_total = Counter::with_opts(
Opts::new("cancelled_total", "Total number of orders cancelled")
.namespace(namespace)
.subsystem("orders"),
)?;
let orders_rejected_total = Counter::with_opts(
Opts::new("rejected_total", "Total number of orders rejected")
.namespace(namespace)
.subsystem("orders"),
)?;
let partial_fills_total = Counter::with_opts(
Opts::new("partial_fills_total", "Total number of partial fills")
.namespace(namespace)
.subsystem("orders"),
)?;
let open_orders = Gauge::with_opts(
Opts::new("open_orders", "Current number of open orders")
.namespace(namespace)
.subsystem("orders"),
)?;
let position_current = Gauge::with_opts(
Opts::new("current", "Current position size")
.namespace(namespace)
.subsystem("position"),
)?;
let pnl_realized = Gauge::with_opts(
Opts::new("realized", "Realized PnL")
.namespace(namespace)
.subsystem("pnl"),
)?;
let pnl_unrealized = Gauge::with_opts(
Opts::new("unrealized", "Unrealized PnL")
.namespace(namespace)
.subsystem("pnl"),
)?;
let pnl_total = Gauge::with_opts(
Opts::new("total", "Total PnL (realized + unrealized)")
.namespace(namespace)
.subsystem("pnl"),
)?;
let spread_current = Gauge::with_opts(
Opts::new("current_bps", "Current spread in basis points")
.namespace(namespace)
.subsystem("spread"),
)?;
let order_latency = Histogram::with_opts(
HistogramOpts::new(
"order_milliseconds",
"Order submission latency in milliseconds",
)
.namespace(namespace)
.subsystem("latency")
.buckets(LATENCY_BUCKETS.to_vec()),
)?;
let fill_latency = Histogram::with_opts(
HistogramOpts::new(
"fill_milliseconds",
"Fill notification latency in milliseconds",
)
.namespace(namespace)
.subsystem("latency")
.buckets(LATENCY_BUCKETS.to_vec()),
)?;
let spread_histogram = Histogram::with_opts(
HistogramOpts::new("distribution_bps", "Spread distribution in basis points")
.namespace(namespace)
.subsystem("spread")
.buckets(SPREAD_BUCKETS.to_vec()),
)?;
registry.register(Box::new(quotes_total.clone()))?;
registry.register(Box::new(orders_submitted_total.clone()))?;
registry.register(Box::new(orders_filled_total.clone()))?;
registry.register(Box::new(orders_cancelled_total.clone()))?;
registry.register(Box::new(orders_rejected_total.clone()))?;
registry.register(Box::new(partial_fills_total.clone()))?;
registry.register(Box::new(open_orders.clone()))?;
registry.register(Box::new(position_current.clone()))?;
registry.register(Box::new(pnl_realized.clone()))?;
registry.register(Box::new(pnl_unrealized.clone()))?;
registry.register(Box::new(pnl_total.clone()))?;
registry.register(Box::new(spread_current.clone()))?;
registry.register(Box::new(order_latency.clone()))?;
registry.register(Box::new(fill_latency.clone()))?;
registry.register(Box::new(spread_histogram.clone()))?;
Ok(Self {
registry,
quotes_total,
orders_submitted_total,
orders_filled_total,
orders_cancelled_total,
orders_rejected_total,
partial_fills_total,
open_orders,
position_current,
pnl_realized,
pnl_unrealized,
pnl_total,
spread_current,
order_latency,
fill_latency,
spread_histogram,
})
}
pub fn inc_quotes(&self) {
self.quotes_total.inc();
}
pub fn inc_quotes_by(&self, count: f64) {
self.quotes_total.inc_by(count);
}
pub fn inc_orders_submitted(&self) {
self.orders_submitted_total.inc();
}
pub fn inc_orders_filled(&self) {
self.orders_filled_total.inc();
}
pub fn inc_orders_cancelled(&self) {
self.orders_cancelled_total.inc();
}
pub fn inc_orders_rejected(&self) {
self.orders_rejected_total.inc();
}
pub fn inc_partial_fills(&self) {
self.partial_fills_total.inc();
}
pub fn set_open_orders(&self, count: f64) {
self.open_orders.set(count);
}
pub fn set_position(&self, position: f64) {
self.position_current.set(position);
}
pub fn set_pnl(&self, realized: f64, unrealized: f64) {
self.pnl_realized.set(realized);
self.pnl_unrealized.set(unrealized);
self.pnl_total.set(realized + unrealized);
}
pub fn set_spread(&self, spread_bps: f64) {
self.spread_current.set(spread_bps);
}
pub fn observe_order_latency(&self, latency_ms: f64) {
self.order_latency.observe(latency_ms);
}
pub fn observe_fill_latency(&self, latency_ms: f64) {
self.fill_latency.observe(latency_ms);
}
pub fn observe_spread(&self, spread_bps: f64) {
self.spread_histogram.observe(spread_bps);
}
#[must_use]
pub fn registry(&self) -> &Registry {
&self.registry
}
pub fn encode(&self) -> Result<String, prometheus::Error> {
let encoder = TextEncoder::new();
let metric_families = self.registry.gather();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer)?;
Ok(String::from_utf8(buffer).unwrap_or_default())
}
#[must_use]
pub fn get_quotes_total(&self) -> f64 {
self.quotes_total.get()
}
#[must_use]
pub fn get_orders_submitted_total(&self) -> f64 {
self.orders_submitted_total.get()
}
#[must_use]
pub fn get_orders_filled_total(&self) -> f64 {
self.orders_filled_total.get()
}
#[must_use]
pub fn get_orders_cancelled_total(&self) -> f64 {
self.orders_cancelled_total.get()
}
#[must_use]
pub fn get_orders_rejected_total(&self) -> f64 {
self.orders_rejected_total.get()
}
#[must_use]
pub fn get_partial_fills_total(&self) -> f64 {
self.partial_fills_total.get()
}
#[must_use]
pub fn get_open_orders(&self) -> f64 {
self.open_orders.get()
}
#[must_use]
pub fn get_position(&self) -> f64 {
self.position_current.get()
}
#[must_use]
pub fn get_pnl_realized(&self) -> f64 {
self.pnl_realized.get()
}
#[must_use]
pub fn get_pnl_unrealized(&self) -> f64 {
self.pnl_unrealized.get()
}
#[must_use]
pub fn get_pnl_total(&self) -> f64 {
self.pnl_total.get()
}
#[must_use]
pub fn get_spread(&self) -> f64 {
self.spread_current.get()
}
}
/// Minimal HTTP/1 server that exposes a shared [`PrometheusMetrics`] instance.
///
/// Requests are routed by `handle_request` (`/metrics`, `/health`, `/`).
pub struct MetricsServer {
/// Metrics rendered on every `/metrics` request; shared with the rest of the app via `Arc`.
metrics: Arc<PrometheusMetrics>,
/// Address string (e.g. `"127.0.0.1:9090"`), parsed as a `SocketAddr` when `run` starts.
bind_address: String,
}
impl MetricsServer {
    /// Creates a server that will expose `metrics` on `bind_address`.
    ///
    /// The address is only stored here; it is parsed and bound in [`MetricsServer::run`].
    #[must_use]
    pub fn new(metrics: Arc<PrometheusMetrics>, bind_address: &str) -> Self {
        Self {
            metrics,
            bind_address: bind_address.to_string(),
        }
    }

    /// Binds `bind_address` and serves HTTP/1 connections until a fatal error occurs.
    ///
    /// Each accepted connection is served on its own Tokio task, and per-connection serve
    /// errors are printed to stderr. Accept errors that only concern the single incoming
    /// connection (refused, aborted or reset) are logged and skipped, so one misbehaving
    /// client cannot take the endpoint down.
    ///
    /// # Errors
    ///
    /// Returns an error if `bind_address` is not a valid `SocketAddr` or binding fails. It
    /// also returns an error if `accept` fails with anything other than a per-connection
    /// error.
    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let addr: SocketAddr = self.bind_address.parse()?;
        let listener = TcpListener::bind(addr).await?;
        loop {
            let stream = match listener.accept().await {
                Ok((stream, _peer)) => stream,
                Err(err) if Self::is_connection_error(&err) => {
                    eprintln!("Error accepting connection: {:?}", err);
                    continue;
                }
                Err(err) => return Err(err.into()),
            };
            let io = TokioIo::new(stream);
            let metrics = Arc::clone(&self.metrics);
            tokio::spawn(async move {
                let service = service_fn(move |req| {
                    let metrics = Arc::clone(&metrics);
                    async move { handle_request(req, metrics).await }
                });
                if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
                    eprintln!("Error serving connection: {:?}", err);
                }
            });
        }
    }

    /// Returns `true` for accept errors tied to a single incoming connection.
    ///
    /// The listener itself stays usable after these errors.
    fn is_connection_error(err: &std::io::Error) -> bool {
        matches!(
            err.kind(),
            std::io::ErrorKind::ConnectionRefused
                | std::io::ErrorKind::ConnectionAborted
                | std::io::ErrorKind::ConnectionReset
        )
    }

    /// Runs [`MetricsServer::run`] on a new Tokio task.
    ///
    /// A fatal server error is printed to stderr and ends the task.
    #[must_use]
    pub fn spawn(self) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            if let Err(e) = self.run().await {
                eprintln!("Metrics server error: {}", e);
            }
        })
    }

    /// The address string this server was configured with.
    #[must_use]
    pub fn bind_address(&self) -> &str {
        &self.bind_address
    }
}
async fn handle_request(
req: Request<hyper::body::Incoming>,
metrics: Arc<PrometheusMetrics>,
) -> Result<Response<Full<Bytes>>, Infallible> {
let response = match req.uri().path() {
"/metrics" => match metrics.encode() {
Ok(body) => Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/plain; charset=utf-8")
.body(Full::new(Bytes::from(body)))
.unwrap_or_else(|_| {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Full::new(Bytes::from("Failed to build response")))
.unwrap()
}),
Err(e) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Full::new(Bytes::from(format!(
"Error encoding metrics: {}",
e
))))
.unwrap(),
},
"/health" => Response::builder()
.status(StatusCode::OK)
.body(Full::new(Bytes::from("OK")))
.unwrap(),
"/" => Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "text/html")
.body(Full::new(Bytes::from(
r#"<html>
<head><title>Market Maker Metrics</title></head>
<body>
<h1>Market Maker Metrics</h1>
<p><a href="/metrics">Metrics</a></p>
<p><a href="/health">Health</a></p>
</body>
</html>"#,
)))
.unwrap(),
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Full::new(Bytes::from("Not Found")))
.unwrap(),
};
Ok(response)
}
/// Copies values from a [`LiveMetrics`] snapshot into [`PrometheusMetrics`] when
/// `MetricsBridge::sync` is called.
pub struct MetricsBridge {
/// Source of truth read via `snapshot` on every sync.
live_metrics: Arc<LiveMetrics>,
/// Prometheus metrics updated from the snapshot.
prom_metrics: Arc<PrometheusMetrics>,
}
impl MetricsBridge {
    /// Pairs a live-metrics source with the Prometheus metrics it should be mirrored into.
    #[must_use]
    pub fn new(live_metrics: Arc<LiveMetrics>, prom_metrics: Arc<PrometheusMetrics>) -> Self {
        Self { live_metrics, prom_metrics }
    }

    /// Mirrors the current live snapshot into the Prometheus metrics.
    ///
    /// The quotes counter is only advanced by the gap between the live quote count and the
    /// counter's current value, so repeated calls do not double-count. If the live count is
    /// behind, the counter is left alone. The open-orders, position and PnL gauges are
    /// overwritten. Position and PnL are converted through their string form and fall back
    /// to `0.0` if that string does not parse as `f64`.
    pub fn sync(&self) {
        // NOTE(review): meaning of the `0` argument is defined by `LiveMetrics::snapshot`.
        let snap = self.live_metrics.snapshot(0);
        let prom = &self.prom_metrics;

        let missing_quotes = snap.quotes_generated as f64 - prom.get_quotes_total();
        if missing_quotes > 0.0 {
            prom.inc_quotes_by(missing_quotes);
        }

        let position: f64 = snap.current_position.to_string().parse().unwrap_or(0.0);
        let realized: f64 = snap.realized_pnl.to_string().parse().unwrap_or(0.0);
        let unrealized: f64 = snap.unrealized_pnl.to_string().parse().unwrap_or(0.0);

        prom.set_open_orders(snap.open_orders as f64);
        prom.set_position(position);
        prom.set_pnl(realized, unrealized);
    }

    /// The live-metrics source this bridge reads from.
    #[must_use]
    pub fn live_metrics(&self) -> &Arc<LiveMetrics> {
        &self.live_metrics
    }

    /// The Prometheus metrics this bridge writes into.
    #[must_use]
    pub fn prom_metrics(&self) -> &Arc<PrometheusMetrics> {
        &self.prom_metrics
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_prometheus_metrics_new() {
        let metrics = PrometheusMetrics::new("test").unwrap();
        assert_eq!(metrics.get_quotes_total(), 0.0);
        assert_eq!(metrics.get_orders_submitted_total(), 0.0);
    }

    #[test]
    fn test_counter_increments() {
        let metrics = PrometheusMetrics::new("test").unwrap();
        metrics.inc_quotes();
        metrics.inc_quotes();
        assert_eq!(metrics.get_quotes_total(), 2.0);
        metrics.inc_orders_submitted();
        assert_eq!(metrics.get_orders_submitted_total(), 1.0);
        metrics.inc_orders_filled();
        assert_eq!(metrics.get_orders_filled_total(), 1.0);
        metrics.inc_orders_cancelled();
        assert_eq!(metrics.get_orders_cancelled_total(), 1.0);
        metrics.inc_orders_rejected();
        assert_eq!(metrics.get_orders_rejected_total(), 1.0);
        metrics.inc_partial_fills();
        assert_eq!(metrics.get_partial_fills_total(), 1.0);
    }

    #[test]
    fn test_gauge_updates() {
        let metrics = PrometheusMetrics::new("test").unwrap();
        metrics.set_open_orders(5.0);
        assert_eq!(metrics.get_open_orders(), 5.0);
        metrics.set_position(100.5);
        assert_eq!(metrics.get_position(), 100.5);
        metrics.set_pnl(1000.0, 500.0);
        assert_eq!(metrics.get_pnl_realized(), 1000.0);
        assert_eq!(metrics.get_pnl_unrealized(), 500.0);
        assert_eq!(metrics.get_pnl_total(), 1500.0);
        metrics.set_spread(10.5);
        assert_eq!(metrics.get_spread(), 10.5);
    }

    /// Observations must land in the right histogram and show up in the exposition.
    #[test]
    fn test_histogram_observations() {
        let metrics = PrometheusMetrics::new("test").unwrap();
        metrics.observe_order_latency(5.0);
        metrics.observe_fill_latency(10.0);
        metrics.observe_fill_latency(20.0);
        metrics.observe_spread(15.0);

        assert_eq!(metrics.order_latency.get_sample_count(), 1);
        assert_eq!(metrics.order_latency.get_sample_sum(), 5.0);
        assert_eq!(metrics.fill_latency.get_sample_count(), 2);
        assert_eq!(metrics.fill_latency.get_sample_sum(), 30.0);
        assert_eq!(metrics.spread_histogram.get_sample_count(), 1);
        assert_eq!(metrics.spread_histogram.get_sample_sum(), 15.0);

        let encoded = metrics.encode().unwrap();
        assert!(encoded.contains("test_latency_order_milliseconds_count 1"));
        assert!(encoded.contains("test_latency_fill_milliseconds_count 2"));
        assert!(encoded.contains("test_spread_distribution_bps_count 1"));
    }

    #[test]
    fn test_encode() {
        let metrics = PrometheusMetrics::new("test").unwrap();
        metrics.inc_quotes();
        metrics.set_position(100.0);
        let encoded = metrics.encode().unwrap();
        assert!(encoded.contains("test_quotes_quotes_total"));
        assert!(encoded.contains("test_position_current"));
    }

    #[test]
    fn test_metrics_server_new() {
        let metrics = Arc::new(PrometheusMetrics::new("test").unwrap());
        let server = MetricsServer::new(Arc::clone(&metrics), "127.0.0.1:9090");
        assert_eq!(server.bind_address(), "127.0.0.1:9090");
    }

    #[test]
    fn test_metrics_bridge() {
        let live_metrics = Arc::new(LiveMetrics::new(0));
        let prom_metrics = Arc::new(PrometheusMetrics::new("test").unwrap());
        let bridge = MetricsBridge::new(Arc::clone(&live_metrics), Arc::clone(&prom_metrics));
        live_metrics.record_quote(1);
        live_metrics.record_quote(2);
        live_metrics.update_position(crate::dec!(50.0));
        live_metrics.update_pnl(crate::dec!(100.0), crate::dec!(25.0));
        bridge.sync();
        assert_eq!(prom_metrics.get_quotes_total(), 2.0);
        assert_eq!(prom_metrics.get_position(), 50.0);
        assert_eq!(prom_metrics.get_pnl_realized(), 100.0);
        assert_eq!(prom_metrics.get_pnl_unrealized(), 25.0);

        // Syncing again without new quotes must not double-count the counter.
        bridge.sync();
        assert_eq!(prom_metrics.get_quotes_total(), 2.0);
    }

    #[test]
    fn test_registry_access() {
        let metrics = PrometheusMetrics::new("test").unwrap();
        let registry = metrics.registry();
        let families = registry.gather();
        assert!(!families.is_empty());
        // One family per registered collector.
        assert_eq!(families.len(), 15);
    }

    /// Checks the exposition text that `/metrics` serves; `handle_request` itself needs a
    /// live hyper connection and is not driven here.
    #[test]
    fn test_encode_includes_counter_value() {
        let metrics = Arc::new(PrometheusMetrics::new("test").unwrap());
        metrics.inc_quotes();
        let encoded = metrics.encode().unwrap();
        assert!(encoded.contains("test_quotes_quotes_total 1"));
    }

    #[tokio::test]
    async fn test_metrics_server_spawn() {
        let metrics = Arc::new(PrometheusMetrics::new("test").unwrap());
        let server = MetricsServer::new(Arc::clone(&metrics), "127.0.0.1:0");
        assert_eq!(server.bind_address(), "127.0.0.1:0");

        // The server loops forever; spawning and aborting must not panic.
        let handle = server.spawn();
        handle.abort();
        match handle.await {
            Ok(()) => {}
            Err(e) => assert!(e.is_cancelled(), "server task panicked: {:?}", e),
        }
    }
}