1use std::convert::Infallible;
2use std::fs::{self, OpenOptions};
3use std::net::SocketAddr;
4use std::path::Path;
5use std::sync::OnceLock;
6
7use anyhow::{Context, Result};
8use hyper::body::Body;
9use hyper::service::{make_service_fn, service_fn};
10use hyper::{Request, Response, StatusCode};
11use prometheus::{Encoder, Gauge, GaugeVec, IntCounter, IntCounterVec, Registry, TextEncoder};
12use tracing::{error, info};
13use tracing_appender::non_blocking::WorkerGuard;
14use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
15
/// Holds the [`WorkerGuard`] returned by `tracing_appender::non_blocking` so it is not dropped
/// when `init_tracing` returns. Written at most once (it is a [`OnceLock`]).
static FILE_GUARD: OnceLock<WorkerGuard> = OnceLock::new();
17
18pub fn init_tracing(filter: &str, log_path: Option<&Path>) -> Result<()> {
20 if let Some(path) = log_path {
21 let stdout_layer = fmt::layer()
22 .with_target(false)
23 .with_filter(EnvFilter::new(filter));
24 if let Some(dir) = path.parent() {
25 fs::create_dir_all(dir)
26 .with_context(|| format!("failed to create log directory {dir:?}"))?;
27 }
28 let file = OpenOptions::new()
29 .create(true)
30 .append(true)
31 .open(path)
32 .with_context(|| format!("failed to open log file {}", path.display()))?;
33 let (writer, guard) = tracing_appender::non_blocking(file);
34 let _ = FILE_GUARD.set(guard);
35 let file_layer = fmt::layer()
36 .json()
37 .with_ansi(false)
38 .with_target(true)
39 .with_writer(writer)
40 .with_filter(EnvFilter::new(filter));
41 tracing_subscriber::registry()
42 .with(stdout_layer)
43 .with(file_layer)
44 .try_init()?;
45 } else {
46 let stdout_layer = fmt::layer()
47 .with_target(false)
48 .with_filter(EnvFilter::new(filter));
49 tracing_subscriber::registry()
50 .with(stdout_layer)
51 .try_init()?;
52 }
53
54 Ok(())
55}
56
/// A Prometheus [`Registry`] together with handles to every collector registered in it by
/// `LiveMetrics::new`.
pub struct LiveMetrics {
    /// Registry that all collectors below are registered with.
    registry: Registry,
    /// `ticks_total`: number of ticks processed.
    ticks_total: IntCounter,
    /// `candles_total`: number of candles processed.
    candles_total: IntCounter,
    /// `signals_total`: signals emitted by strategies.
    signals_total: IntCounter,
    /// `orders_total`: orders submitted to execution.
    orders_total: IntCounter,
    /// `order_failures_total`: execution failures.
    order_failures: IntCounter,
    /// `tesser_panic_closes_total`: execution group panic closes.
    panic_closes: IntCounter,
    /// `tesser_router_failures_total`: router-level execution failures, labelled by `reason`.
    router_failures: IntCounterVec,
    /// `portfolio_equity`: current portfolio equity.
    equity_gauge: Gauge,
    /// `symbol_price`: latest observed price, labelled by `symbol`.
    price_gauge: GaugeVec,
    /// `market_data_gap_seconds`: seconds since the last market data heartbeat.
    data_gap_gauge: Gauge,
    /// `tesser_reconciliation_position_diff`: absolute quantity difference between local and
    /// remote positions, labelled by `symbol`.
    reconciliation_position_diff: GaugeVec,
    /// `tesser_reconciliation_balance_diff`: absolute balance difference between local and
    /// remote accounts, labelled by `currency`.
    reconciliation_balance_diff: GaugeVec,
    /// `tesser_exchange_connection_status`: websocket connection state (1 = connected,
    /// 0 = disconnected), labelled by `stream`.
    connection_status: GaugeVec,
    /// `tesser_market_data_last_received_timestamp_seconds`: Unix timestamp of the last
    /// received market data event.
    last_data_timestamp: Gauge,
    /// `tesser_order_book_checksum_mismatches_total`: order book checksum mismatches,
    /// labelled by `driver` and `symbol`.
    checksum_mismatches: IntCounterVec,
}
76
77impl LiveMetrics {
78 pub fn new() -> Self {
79 let registry = Registry::new();
80 let ticks_total = IntCounter::new("ticks_total", "Number of ticks processed").unwrap();
81 let candles_total =
82 IntCounter::new("candles_total", "Number of candles processed").unwrap();
83 let signals_total =
84 IntCounter::new("signals_total", "Signals emitted by strategies").unwrap();
85 let orders_total =
86 IntCounter::new("orders_total", "Orders submitted to execution").unwrap();
87 let order_failures = IntCounter::new("order_failures_total", "Execution failures").unwrap();
88 let panic_closes =
89 IntCounter::new("tesser_panic_closes_total", "Execution group panic closes").unwrap();
90 let equity_gauge = Gauge::new("portfolio_equity", "Current portfolio equity").unwrap();
91 let price_gauge = GaugeVec::new(
92 prometheus::Opts::new("symbol_price", "Latest observed price per symbol"),
93 &["symbol"],
94 )
95 .unwrap();
96 let data_gap_gauge = Gauge::new(
97 "market_data_gap_seconds",
98 "Seconds since last market data heartbeat",
99 )
100 .unwrap();
101 let reconciliation_position_diff = GaugeVec::new(
102 prometheus::Opts::new(
103 "tesser_reconciliation_position_diff",
104 "Absolute quantity difference between local and remote positions",
105 ),
106 &["symbol"],
107 )
108 .unwrap();
109 let reconciliation_balance_diff = GaugeVec::new(
110 prometheus::Opts::new(
111 "tesser_reconciliation_balance_diff",
112 "Absolute balance difference between local and remote accounts",
113 ),
114 &["currency"],
115 )
116 .unwrap();
117 let connection_status = GaugeVec::new(
118 prometheus::Opts::new(
119 "tesser_exchange_connection_status",
120 "Status of exchange websocket connections (1=connected, 0=disconnected)",
121 ),
122 &["stream"],
123 )
124 .unwrap();
125 let last_data_timestamp = Gauge::new(
126 "tesser_market_data_last_received_timestamp_seconds",
127 "Unix timestamp of the last received market data event",
128 )
129 .unwrap();
130 let checksum_mismatches = IntCounterVec::new(
131 prometheus::Opts::new(
132 "tesser_order_book_checksum_mismatches_total",
133 "Count of order book checksum mismatches detected",
134 ),
135 &["driver", "symbol"],
136 )
137 .unwrap();
138 let router_failures = IntCounterVec::new(
139 prometheus::Opts::new(
140 "tesser_router_failures_total",
141 "Router-level execution failures grouped by reason",
142 ),
143 &["reason"],
144 )
145 .unwrap();
146
147 registry.register(Box::new(ticks_total.clone())).unwrap();
148 registry.register(Box::new(candles_total.clone())).unwrap();
149 registry.register(Box::new(signals_total.clone())).unwrap();
150 registry.register(Box::new(orders_total.clone())).unwrap();
151 registry.register(Box::new(order_failures.clone())).unwrap();
152 registry.register(Box::new(panic_closes.clone())).unwrap();
153 registry.register(Box::new(equity_gauge.clone())).unwrap();
154 registry.register(Box::new(price_gauge.clone())).unwrap();
155 registry.register(Box::new(data_gap_gauge.clone())).unwrap();
156 registry
157 .register(Box::new(reconciliation_position_diff.clone()))
158 .unwrap();
159 registry
160 .register(Box::new(reconciliation_balance_diff.clone()))
161 .unwrap();
162 registry
163 .register(Box::new(connection_status.clone()))
164 .unwrap();
165 registry
166 .register(Box::new(last_data_timestamp.clone()))
167 .unwrap();
168 registry
169 .register(Box::new(checksum_mismatches.clone()))
170 .unwrap();
171 registry
172 .register(Box::new(router_failures.clone()))
173 .unwrap();
174
175 Self {
176 registry,
177 ticks_total,
178 candles_total,
179 signals_total,
180 orders_total,
181 order_failures,
182 equity_gauge,
183 price_gauge,
184 data_gap_gauge,
185 reconciliation_position_diff,
186 reconciliation_balance_diff,
187 connection_status,
188 last_data_timestamp,
189 checksum_mismatches,
190 panic_closes,
191 router_failures,
192 }
193 }
194
195 pub fn registry(&self) -> Registry {
196 self.registry.clone()
197 }
198
199 pub fn inc_tick(&self) {
200 self.ticks_total.inc();
201 }
202
203 pub fn inc_candle(&self) {
204 self.candles_total.inc();
205 }
206
207 pub fn inc_signals(&self, count: usize) {
208 self.signals_total.inc_by(count as u64);
209 }
210
211 pub fn inc_order(&self) {
212 self.orders_total.inc();
213 }
214
215 pub fn inc_order_failure(&self) {
216 self.order_failures.inc();
217 }
218
219 pub fn inc_panic_close(&self) {
220 self.panic_closes.inc();
221 }
222
223 pub fn inc_router_failure(&self, reason: &str) {
224 self.router_failures.with_label_values(&[reason]).inc();
225 }
226
227 pub fn update_equity(&self, equity: f64) {
228 self.equity_gauge.set(equity);
229 }
230
231 pub fn update_price(&self, symbol: &str, price: f64) {
232 self.price_gauge.with_label_values(&[symbol]).set(price);
233 }
234
235 pub fn update_staleness(&self, seconds: f64) {
236 self.data_gap_gauge.set(seconds);
237 }
238
239 pub fn update_position_diff(&self, symbol: &str, diff: f64) {
240 self.reconciliation_position_diff
241 .with_label_values(&[symbol])
242 .set(diff);
243 }
244
245 pub fn inc_checksum_mismatch(&self, driver: &str, symbol: &str) {
246 self.checksum_mismatches
247 .with_label_values(&[driver, symbol])
248 .inc();
249 }
250
251 pub fn update_balance_diff(&self, currency: &str, diff: f64) {
252 self.reconciliation_balance_diff
253 .with_label_values(&[currency])
254 .set(diff);
255 }
256
257 pub fn update_connection_status(&self, stream: &str, connected: bool) {
258 let value = if connected { 1.0 } else { 0.0 };
259 self.connection_status
260 .with_label_values(&[stream])
261 .set(value);
262 }
263
264 pub fn update_last_data_timestamp(&self, timestamp_secs: f64) {
265 self.last_data_timestamp.set(timestamp_secs);
266 }
267}
268
269impl Default for LiveMetrics {
270 fn default() -> Self {
271 Self::new()
272 }
273}
274
275pub fn spawn_metrics_server(registry: Registry, addr: SocketAddr) -> tokio::task::JoinHandle<()> {
277 tokio::spawn(async move {
278 let make_svc = make_service_fn(move |_| {
279 let registry = registry.clone();
280 async move {
281 Ok::<_, Infallible>(service_fn(move |_req: Request<Body>| {
282 let registry = registry.clone();
283 async move {
284 let encoder = TextEncoder::new();
285 let metric_families = registry.gather();
286 let mut buffer = Vec::new();
287 if let Err(err) = encoder.encode(&metric_families, &mut buffer) {
288 error!(error = %err, "failed to encode Prometheus metrics");
289 return Ok::<_, Infallible>(
290 Response::builder()
291 .status(StatusCode::INTERNAL_SERVER_ERROR)
292 .body(Body::from("failed to encode metrics"))
293 .unwrap(),
294 );
295 }
296 Ok::<_, Infallible>(
297 Response::builder()
298 .status(StatusCode::OK)
299 .header("Content-Type", encoder.format_type())
300 .body(Body::from(buffer))
301 .unwrap(),
302 )
303 }
304 }))
305 }
306 });
307
308 if let Err(err) = hyper::Server::bind(&addr).serve(make_svc).await {
309 error!(error = %err, %addr, "metrics server terminated");
310 } else {
311 info!(%addr, "metrics server shutdown");
312 }
313 })
314}