1use std::convert::Infallible;
2use std::fs::{self, OpenOptions};
3use std::net::SocketAddr;
4use std::path::Path;
5use std::sync::OnceLock;
6
7use anyhow::{Context, Result};
8use hyper::body::Body;
9use hyper::service::{make_service_fn, service_fn};
10use hyper::{Request, Response, StatusCode};
11use prometheus::{Encoder, Gauge, GaugeVec, IntCounter, Registry, TextEncoder};
12use tracing::{error, info};
13use tracing_appender::non_blocking::WorkerGuard;
14use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
15
/// Process-wide slot for the `WorkerGuard` returned by
/// `tracing_appender::non_blocking` in `init_tracing`.
///
/// Storing the guard in a `static` keeps it from being dropped when
/// `init_tracing` returns. tracing-appender documents that the guard must be
/// kept alive for buffered log lines to be flushed to the underlying writer.
static FILE_GUARD: OnceLock<WorkerGuard> = OnceLock::new();
17
18pub fn init_tracing(filter: &str, log_path: Option<&Path>) -> Result<()> {
20 if let Some(path) = log_path {
21 let stdout_layer = fmt::layer()
22 .with_target(false)
23 .with_filter(EnvFilter::new(filter));
24 if let Some(dir) = path.parent() {
25 fs::create_dir_all(dir)
26 .with_context(|| format!("failed to create log directory {dir:?}"))?;
27 }
28 let file = OpenOptions::new()
29 .create(true)
30 .append(true)
31 .open(path)
32 .with_context(|| format!("failed to open log file {}", path.display()))?;
33 let (writer, guard) = tracing_appender::non_blocking(file);
34 let _ = FILE_GUARD.set(guard);
35 let file_layer = fmt::layer()
36 .json()
37 .with_ansi(false)
38 .with_target(true)
39 .with_writer(writer)
40 .with_filter(EnvFilter::new(filter));
41 tracing_subscriber::registry()
42 .with(stdout_layer)
43 .with(file_layer)
44 .try_init()?;
45 } else {
46 let stdout_layer = fmt::layer()
47 .with_target(false)
48 .with_filter(EnvFilter::new(filter));
49 tracing_subscriber::registry()
50 .with(stdout_layer)
51 .try_init()?;
52 }
53
54 Ok(())
55}
56
/// Bundle of Prometheus metric handles together with the [`Registry`] they
/// are registered on (see `LiveMetrics::new`).
pub struct LiveMetrics {
    /// Registry that every metric below is registered on.
    registry: Registry,
    /// Counter `ticks_total`: number of ticks processed.
    ticks_total: IntCounter,
    /// Counter `candles_total`: number of candles processed.
    candles_total: IntCounter,
    /// Counter `signals_total`: signals emitted by strategies.
    signals_total: IntCounter,
    /// Counter `orders_total`: orders submitted to execution.
    orders_total: IntCounter,
    /// Counter `order_failures_total`: execution failures.
    order_failures: IntCounter,
    /// Gauge `portfolio_equity`: current portfolio equity.
    equity_gauge: Gauge,
    /// Gauge `symbol_price`, labelled by `symbol`: latest observed price.
    price_gauge: GaugeVec,
    /// Gauge `market_data_gap_seconds`: seconds since the last market data
    /// heartbeat.
    data_gap_gauge: Gauge,
    /// Gauge `tesser_reconciliation_position_diff`, labelled by `symbol`:
    /// absolute quantity difference between local and remote positions.
    reconciliation_position_diff: GaugeVec,
    /// Gauge `tesser_reconciliation_balance_diff`, labelled by `currency`:
    /// absolute balance difference between local and remote accounts.
    reconciliation_balance_diff: GaugeVec,
    /// Gauge `tesser_exchange_connection_status`, labelled by `stream`:
    /// 1 when connected, 0 when disconnected.
    connection_status: GaugeVec,
    /// Gauge `tesser_market_data_last_received_timestamp_seconds`: Unix
    /// timestamp of the last received market data event.
    last_data_timestamp: Gauge,
}
73
74impl LiveMetrics {
75 pub fn new() -> Self {
76 let registry = Registry::new();
77 let ticks_total = IntCounter::new("ticks_total", "Number of ticks processed").unwrap();
78 let candles_total =
79 IntCounter::new("candles_total", "Number of candles processed").unwrap();
80 let signals_total =
81 IntCounter::new("signals_total", "Signals emitted by strategies").unwrap();
82 let orders_total =
83 IntCounter::new("orders_total", "Orders submitted to execution").unwrap();
84 let order_failures = IntCounter::new("order_failures_total", "Execution failures").unwrap();
85 let equity_gauge = Gauge::new("portfolio_equity", "Current portfolio equity").unwrap();
86 let price_gauge = GaugeVec::new(
87 prometheus::Opts::new("symbol_price", "Latest observed price per symbol"),
88 &["symbol"],
89 )
90 .unwrap();
91 let data_gap_gauge = Gauge::new(
92 "market_data_gap_seconds",
93 "Seconds since last market data heartbeat",
94 )
95 .unwrap();
96 let reconciliation_position_diff = GaugeVec::new(
97 prometheus::Opts::new(
98 "tesser_reconciliation_position_diff",
99 "Absolute quantity difference between local and remote positions",
100 ),
101 &["symbol"],
102 )
103 .unwrap();
104 let reconciliation_balance_diff = GaugeVec::new(
105 prometheus::Opts::new(
106 "tesser_reconciliation_balance_diff",
107 "Absolute balance difference between local and remote accounts",
108 ),
109 &["currency"],
110 )
111 .unwrap();
112 let connection_status = GaugeVec::new(
113 prometheus::Opts::new(
114 "tesser_exchange_connection_status",
115 "Status of exchange websocket connections (1=connected, 0=disconnected)",
116 ),
117 &["stream"],
118 )
119 .unwrap();
120 let last_data_timestamp = Gauge::new(
121 "tesser_market_data_last_received_timestamp_seconds",
122 "Unix timestamp of the last received market data event",
123 )
124 .unwrap();
125
126 registry.register(Box::new(ticks_total.clone())).unwrap();
127 registry.register(Box::new(candles_total.clone())).unwrap();
128 registry.register(Box::new(signals_total.clone())).unwrap();
129 registry.register(Box::new(orders_total.clone())).unwrap();
130 registry.register(Box::new(order_failures.clone())).unwrap();
131 registry.register(Box::new(equity_gauge.clone())).unwrap();
132 registry.register(Box::new(price_gauge.clone())).unwrap();
133 registry.register(Box::new(data_gap_gauge.clone())).unwrap();
134 registry
135 .register(Box::new(reconciliation_position_diff.clone()))
136 .unwrap();
137 registry
138 .register(Box::new(reconciliation_balance_diff.clone()))
139 .unwrap();
140 registry
141 .register(Box::new(connection_status.clone()))
142 .unwrap();
143 registry
144 .register(Box::new(last_data_timestamp.clone()))
145 .unwrap();
146
147 Self {
148 registry,
149 ticks_total,
150 candles_total,
151 signals_total,
152 orders_total,
153 order_failures,
154 equity_gauge,
155 price_gauge,
156 data_gap_gauge,
157 reconciliation_position_diff,
158 reconciliation_balance_diff,
159 connection_status,
160 last_data_timestamp,
161 }
162 }
163
164 pub fn registry(&self) -> Registry {
165 self.registry.clone()
166 }
167
168 pub fn inc_tick(&self) {
169 self.ticks_total.inc();
170 }
171
172 pub fn inc_candle(&self) {
173 self.candles_total.inc();
174 }
175
176 pub fn inc_signals(&self, count: usize) {
177 self.signals_total.inc_by(count as u64);
178 }
179
180 pub fn inc_order(&self) {
181 self.orders_total.inc();
182 }
183
184 pub fn inc_order_failure(&self) {
185 self.order_failures.inc();
186 }
187
188 pub fn update_equity(&self, equity: f64) {
189 self.equity_gauge.set(equity);
190 }
191
192 pub fn update_price(&self, symbol: &str, price: f64) {
193 self.price_gauge.with_label_values(&[symbol]).set(price);
194 }
195
196 pub fn update_staleness(&self, seconds: f64) {
197 self.data_gap_gauge.set(seconds);
198 }
199
200 pub fn update_position_diff(&self, symbol: &str, diff: f64) {
201 self.reconciliation_position_diff
202 .with_label_values(&[symbol])
203 .set(diff);
204 }
205
206 pub fn update_balance_diff(&self, currency: &str, diff: f64) {
207 self.reconciliation_balance_diff
208 .with_label_values(&[currency])
209 .set(diff);
210 }
211
212 pub fn update_connection_status(&self, stream: &str, connected: bool) {
213 let value = if connected { 1.0 } else { 0.0 };
214 self.connection_status
215 .with_label_values(&[stream])
216 .set(value);
217 }
218
219 pub fn update_last_data_timestamp(&self, timestamp_secs: f64) {
220 self.last_data_timestamp.set(timestamp_secs);
221 }
222}
223
224impl Default for LiveMetrics {
225 fn default() -> Self {
226 Self::new()
227 }
228}
229
230pub fn spawn_metrics_server(registry: Registry, addr: SocketAddr) -> tokio::task::JoinHandle<()> {
232 tokio::spawn(async move {
233 let make_svc = make_service_fn(move |_| {
234 let registry = registry.clone();
235 async move {
236 Ok::<_, Infallible>(service_fn(move |_req: Request<Body>| {
237 let registry = registry.clone();
238 async move {
239 let encoder = TextEncoder::new();
240 let metric_families = registry.gather();
241 let mut buffer = Vec::new();
242 if let Err(err) = encoder.encode(&metric_families, &mut buffer) {
243 error!(error = %err, "failed to encode Prometheus metrics");
244 return Ok::<_, Infallible>(
245 Response::builder()
246 .status(StatusCode::INTERNAL_SERVER_ERROR)
247 .body(Body::from("failed to encode metrics"))
248 .unwrap(),
249 );
250 }
251 Ok::<_, Infallible>(
252 Response::builder()
253 .status(StatusCode::OK)
254 .header("Content-Type", encoder.format_type())
255 .body(Body::from(buffer))
256 .unwrap(),
257 )
258 }
259 }))
260 }
261 });
262
263 if let Err(err) = hyper::Server::bind(&addr).serve(make_svc).await {
264 error!(error = %err, %addr, "metrics server terminated");
265 } else {
266 info!(%addr, "metrics server shutdown");
267 }
268 })
269}