1use std::convert::Infallible;
2use std::fs::{self, OpenOptions};
3use std::net::SocketAddr;
4use std::path::Path;
5use std::sync::OnceLock;
6
7use anyhow::{Context, Result};
8use hyper::body::Body;
9use hyper::service::{make_service_fn, service_fn};
10use hyper::{Request, Response, StatusCode};
11use prometheus::{Encoder, Gauge, GaugeVec, IntCounter, IntCounterVec, Registry, TextEncoder};
12use tracing::{error, info};
13use tracing_appender::non_blocking::WorkerGuard;
14use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
15
16static FILE_GUARD: OnceLock<WorkerGuard> = OnceLock::new();
17
18pub fn init_tracing(filter: &str, log_path: Option<&Path>) -> Result<()> {
20 if let Some(path) = log_path {
21 let stdout_layer = fmt::layer()
22 .with_target(false)
23 .with_filter(EnvFilter::new(filter));
24 if let Some(dir) = path.parent() {
25 fs::create_dir_all(dir)
26 .with_context(|| format!("failed to create log directory {dir:?}"))?;
27 }
28 let file = OpenOptions::new()
29 .create(true)
30 .append(true)
31 .open(path)
32 .with_context(|| format!("failed to open log file {}", path.display()))?;
33 let (writer, guard) = tracing_appender::non_blocking(file);
34 let _ = FILE_GUARD.set(guard);
35 let file_layer = fmt::layer()
36 .json()
37 .with_ansi(false)
38 .with_target(true)
39 .with_writer(writer)
40 .with_filter(EnvFilter::new(filter));
41 tracing_subscriber::registry()
42 .with(stdout_layer)
43 .with(file_layer)
44 .try_init()?;
45 } else {
46 let stdout_layer = fmt::layer()
47 .with_target(false)
48 .with_filter(EnvFilter::new(filter));
49 tracing_subscriber::registry()
50 .with(stdout_layer)
51 .try_init()?;
52 }
53
54 Ok(())
55}
56
57pub struct LiveMetrics {
59 registry: Registry,
60 ticks_total: IntCounter,
61 candles_total: IntCounter,
62 signals_total: IntCounter,
63 orders_total: IntCounter,
64 order_failures: IntCounter,
65 equity_gauge: Gauge,
66 price_gauge: GaugeVec,
67 data_gap_gauge: Gauge,
68 reconciliation_position_diff: GaugeVec,
69 reconciliation_balance_diff: GaugeVec,
70 connection_status: GaugeVec,
71 last_data_timestamp: Gauge,
72 checksum_mismatches: IntCounterVec,
73}
74
75impl LiveMetrics {
76 pub fn new() -> Self {
77 let registry = Registry::new();
78 let ticks_total = IntCounter::new("ticks_total", "Number of ticks processed").unwrap();
79 let candles_total =
80 IntCounter::new("candles_total", "Number of candles processed").unwrap();
81 let signals_total =
82 IntCounter::new("signals_total", "Signals emitted by strategies").unwrap();
83 let orders_total =
84 IntCounter::new("orders_total", "Orders submitted to execution").unwrap();
85 let order_failures = IntCounter::new("order_failures_total", "Execution failures").unwrap();
86 let equity_gauge = Gauge::new("portfolio_equity", "Current portfolio equity").unwrap();
87 let price_gauge = GaugeVec::new(
88 prometheus::Opts::new("symbol_price", "Latest observed price per symbol"),
89 &["symbol"],
90 )
91 .unwrap();
92 let data_gap_gauge = Gauge::new(
93 "market_data_gap_seconds",
94 "Seconds since last market data heartbeat",
95 )
96 .unwrap();
97 let reconciliation_position_diff = GaugeVec::new(
98 prometheus::Opts::new(
99 "tesser_reconciliation_position_diff",
100 "Absolute quantity difference between local and remote positions",
101 ),
102 &["symbol"],
103 )
104 .unwrap();
105 let reconciliation_balance_diff = GaugeVec::new(
106 prometheus::Opts::new(
107 "tesser_reconciliation_balance_diff",
108 "Absolute balance difference between local and remote accounts",
109 ),
110 &["currency"],
111 )
112 .unwrap();
113 let connection_status = GaugeVec::new(
114 prometheus::Opts::new(
115 "tesser_exchange_connection_status",
116 "Status of exchange websocket connections (1=connected, 0=disconnected)",
117 ),
118 &["stream"],
119 )
120 .unwrap();
121 let last_data_timestamp = Gauge::new(
122 "tesser_market_data_last_received_timestamp_seconds",
123 "Unix timestamp of the last received market data event",
124 )
125 .unwrap();
126 let checksum_mismatches = IntCounterVec::new(
127 prometheus::Opts::new(
128 "tesser_order_book_checksum_mismatches_total",
129 "Count of order book checksum mismatches detected",
130 ),
131 &["driver", "symbol"],
132 )
133 .unwrap();
134
135 registry.register(Box::new(ticks_total.clone())).unwrap();
136 registry.register(Box::new(candles_total.clone())).unwrap();
137 registry.register(Box::new(signals_total.clone())).unwrap();
138 registry.register(Box::new(orders_total.clone())).unwrap();
139 registry.register(Box::new(order_failures.clone())).unwrap();
140 registry.register(Box::new(equity_gauge.clone())).unwrap();
141 registry.register(Box::new(price_gauge.clone())).unwrap();
142 registry.register(Box::new(data_gap_gauge.clone())).unwrap();
143 registry
144 .register(Box::new(reconciliation_position_diff.clone()))
145 .unwrap();
146 registry
147 .register(Box::new(reconciliation_balance_diff.clone()))
148 .unwrap();
149 registry
150 .register(Box::new(connection_status.clone()))
151 .unwrap();
152 registry
153 .register(Box::new(last_data_timestamp.clone()))
154 .unwrap();
155 registry
156 .register(Box::new(checksum_mismatches.clone()))
157 .unwrap();
158
159 Self {
160 registry,
161 ticks_total,
162 candles_total,
163 signals_total,
164 orders_total,
165 order_failures,
166 equity_gauge,
167 price_gauge,
168 data_gap_gauge,
169 reconciliation_position_diff,
170 reconciliation_balance_diff,
171 connection_status,
172 last_data_timestamp,
173 checksum_mismatches,
174 }
175 }
176
177 pub fn registry(&self) -> Registry {
178 self.registry.clone()
179 }
180
181 pub fn inc_tick(&self) {
182 self.ticks_total.inc();
183 }
184
185 pub fn inc_candle(&self) {
186 self.candles_total.inc();
187 }
188
189 pub fn inc_signals(&self, count: usize) {
190 self.signals_total.inc_by(count as u64);
191 }
192
193 pub fn inc_order(&self) {
194 self.orders_total.inc();
195 }
196
197 pub fn inc_order_failure(&self) {
198 self.order_failures.inc();
199 }
200
201 pub fn update_equity(&self, equity: f64) {
202 self.equity_gauge.set(equity);
203 }
204
205 pub fn update_price(&self, symbol: &str, price: f64) {
206 self.price_gauge.with_label_values(&[symbol]).set(price);
207 }
208
209 pub fn update_staleness(&self, seconds: f64) {
210 self.data_gap_gauge.set(seconds);
211 }
212
213 pub fn update_position_diff(&self, symbol: &str, diff: f64) {
214 self.reconciliation_position_diff
215 .with_label_values(&[symbol])
216 .set(diff);
217 }
218
219 pub fn inc_checksum_mismatch(&self, driver: &str, symbol: &str) {
220 self.checksum_mismatches
221 .with_label_values(&[driver, symbol])
222 .inc();
223 }
224
225 pub fn update_balance_diff(&self, currency: &str, diff: f64) {
226 self.reconciliation_balance_diff
227 .with_label_values(&[currency])
228 .set(diff);
229 }
230
231 pub fn update_connection_status(&self, stream: &str, connected: bool) {
232 let value = if connected { 1.0 } else { 0.0 };
233 self.connection_status
234 .with_label_values(&[stream])
235 .set(value);
236 }
237
238 pub fn update_last_data_timestamp(&self, timestamp_secs: f64) {
239 self.last_data_timestamp.set(timestamp_secs);
240 }
241}
242
243impl Default for LiveMetrics {
244 fn default() -> Self {
245 Self::new()
246 }
247}
248
249pub fn spawn_metrics_server(registry: Registry, addr: SocketAddr) -> tokio::task::JoinHandle<()> {
251 tokio::spawn(async move {
252 let make_svc = make_service_fn(move |_| {
253 let registry = registry.clone();
254 async move {
255 Ok::<_, Infallible>(service_fn(move |_req: Request<Body>| {
256 let registry = registry.clone();
257 async move {
258 let encoder = TextEncoder::new();
259 let metric_families = registry.gather();
260 let mut buffer = Vec::new();
261 if let Err(err) = encoder.encode(&metric_families, &mut buffer) {
262 error!(error = %err, "failed to encode Prometheus metrics");
263 return Ok::<_, Infallible>(
264 Response::builder()
265 .status(StatusCode::INTERNAL_SERVER_ERROR)
266 .body(Body::from("failed to encode metrics"))
267 .unwrap(),
268 );
269 }
270 Ok::<_, Infallible>(
271 Response::builder()
272 .status(StatusCode::OK)
273 .header("Content-Type", encoder.format_type())
274 .body(Body::from(buffer))
275 .unwrap(),
276 )
277 }
278 }))
279 }
280 });
281
282 if let Err(err) = hyper::Server::bind(&addr).serve(make_svc).await {
283 error!(error = %err, %addr, "metrics server terminated");
284 } else {
285 info!(%addr, "metrics server shutdown");
286 }
287 })
288}