tesser_cli/
telemetry.rs

1use std::convert::Infallible;
2use std::fs::{self, OpenOptions};
3use std::net::SocketAddr;
4use std::path::Path;
5use std::sync::OnceLock;
6
7use anyhow::{Context, Result};
8use hyper::body::Body;
9use hyper::service::{make_service_fn, service_fn};
10use hyper::{Request, Response, StatusCode};
11use prometheus::{Encoder, Gauge, GaugeVec, IntCounter, IntCounterVec, Registry, TextEncoder};
12use tracing::{error, info};
13use tracing_appender::non_blocking::WorkerGuard;
14use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
15
16static FILE_GUARD: OnceLock<WorkerGuard> = OnceLock::new();
17
18/// Install the global tracing subscriber with optional JSON file logging.
19pub fn init_tracing(filter: &str, log_path: Option<&Path>) -> Result<()> {
20    if let Some(path) = log_path {
21        let stdout_layer = fmt::layer()
22            .with_target(false)
23            .with_filter(EnvFilter::new(filter));
24        if let Some(dir) = path.parent() {
25            fs::create_dir_all(dir)
26                .with_context(|| format!("failed to create log directory {dir:?}"))?;
27        }
28        let file = OpenOptions::new()
29            .create(true)
30            .append(true)
31            .open(path)
32            .with_context(|| format!("failed to open log file {}", path.display()))?;
33        let (writer, guard) = tracing_appender::non_blocking(file);
34        let _ = FILE_GUARD.set(guard);
35        let file_layer = fmt::layer()
36            .json()
37            .with_ansi(false)
38            .with_target(true)
39            .with_writer(writer)
40            .with_filter(EnvFilter::new(filter));
41        tracing_subscriber::registry()
42            .with(stdout_layer)
43            .with(file_layer)
44            .try_init()?;
45    } else {
46        let stdout_layer = fmt::layer()
47            .with_target(false)
48            .with_filter(EnvFilter::new(filter));
49        tracing_subscriber::registry()
50            .with(stdout_layer)
51            .try_init()?;
52    }
53
54    Ok(())
55}
56
57/// Prometheus metrics collected during live trading.
58pub struct LiveMetrics {
59    registry: Registry,
60    ticks_total: IntCounter,
61    candles_total: IntCounter,
62    signals_total: IntCounter,
63    orders_total: IntCounter,
64    order_failures: IntCounter,
65    panic_closes: IntCounter,
66    router_failures: IntCounterVec,
67    equity_gauge: Gauge,
68    price_gauge: GaugeVec,
69    data_gap_gauge: Gauge,
70    reconciliation_position_diff: GaugeVec,
71    reconciliation_balance_diff: GaugeVec,
72    connection_status: GaugeVec,
73    last_data_timestamp: Gauge,
74    checksum_mismatches: IntCounterVec,
75}
76
77impl LiveMetrics {
78    pub fn new() -> Self {
79        let registry = Registry::new();
80        let ticks_total = IntCounter::new("ticks_total", "Number of ticks processed").unwrap();
81        let candles_total =
82            IntCounter::new("candles_total", "Number of candles processed").unwrap();
83        let signals_total =
84            IntCounter::new("signals_total", "Signals emitted by strategies").unwrap();
85        let orders_total =
86            IntCounter::new("orders_total", "Orders submitted to execution").unwrap();
87        let order_failures = IntCounter::new("order_failures_total", "Execution failures").unwrap();
88        let panic_closes =
89            IntCounter::new("tesser_panic_closes_total", "Execution group panic closes").unwrap();
90        let equity_gauge = Gauge::new("portfolio_equity", "Current portfolio equity").unwrap();
91        let price_gauge = GaugeVec::new(
92            prometheus::Opts::new("symbol_price", "Latest observed price per symbol"),
93            &["symbol"],
94        )
95        .unwrap();
96        let data_gap_gauge = Gauge::new(
97            "market_data_gap_seconds",
98            "Seconds since last market data heartbeat",
99        )
100        .unwrap();
101        let reconciliation_position_diff = GaugeVec::new(
102            prometheus::Opts::new(
103                "tesser_reconciliation_position_diff",
104                "Absolute quantity difference between local and remote positions",
105            ),
106            &["symbol"],
107        )
108        .unwrap();
109        let reconciliation_balance_diff = GaugeVec::new(
110            prometheus::Opts::new(
111                "tesser_reconciliation_balance_diff",
112                "Absolute balance difference between local and remote accounts",
113            ),
114            &["currency"],
115        )
116        .unwrap();
117        let connection_status = GaugeVec::new(
118            prometheus::Opts::new(
119                "tesser_exchange_connection_status",
120                "Status of exchange websocket connections (1=connected, 0=disconnected)",
121            ),
122            &["stream"],
123        )
124        .unwrap();
125        let last_data_timestamp = Gauge::new(
126            "tesser_market_data_last_received_timestamp_seconds",
127            "Unix timestamp of the last received market data event",
128        )
129        .unwrap();
130        let checksum_mismatches = IntCounterVec::new(
131            prometheus::Opts::new(
132                "tesser_order_book_checksum_mismatches_total",
133                "Count of order book checksum mismatches detected",
134            ),
135            &["driver", "symbol"],
136        )
137        .unwrap();
138        let router_failures = IntCounterVec::new(
139            prometheus::Opts::new(
140                "tesser_router_failures_total",
141                "Router-level execution failures grouped by reason",
142            ),
143            &["reason"],
144        )
145        .unwrap();
146
147        registry.register(Box::new(ticks_total.clone())).unwrap();
148        registry.register(Box::new(candles_total.clone())).unwrap();
149        registry.register(Box::new(signals_total.clone())).unwrap();
150        registry.register(Box::new(orders_total.clone())).unwrap();
151        registry.register(Box::new(order_failures.clone())).unwrap();
152        registry.register(Box::new(panic_closes.clone())).unwrap();
153        registry.register(Box::new(equity_gauge.clone())).unwrap();
154        registry.register(Box::new(price_gauge.clone())).unwrap();
155        registry.register(Box::new(data_gap_gauge.clone())).unwrap();
156        registry
157            .register(Box::new(reconciliation_position_diff.clone()))
158            .unwrap();
159        registry
160            .register(Box::new(reconciliation_balance_diff.clone()))
161            .unwrap();
162        registry
163            .register(Box::new(connection_status.clone()))
164            .unwrap();
165        registry
166            .register(Box::new(last_data_timestamp.clone()))
167            .unwrap();
168        registry
169            .register(Box::new(checksum_mismatches.clone()))
170            .unwrap();
171        registry
172            .register(Box::new(router_failures.clone()))
173            .unwrap();
174
175        Self {
176            registry,
177            ticks_total,
178            candles_total,
179            signals_total,
180            orders_total,
181            order_failures,
182            equity_gauge,
183            price_gauge,
184            data_gap_gauge,
185            reconciliation_position_diff,
186            reconciliation_balance_diff,
187            connection_status,
188            last_data_timestamp,
189            checksum_mismatches,
190            panic_closes,
191            router_failures,
192        }
193    }
194
195    pub fn registry(&self) -> Registry {
196        self.registry.clone()
197    }
198
199    pub fn inc_tick(&self) {
200        self.ticks_total.inc();
201    }
202
203    pub fn inc_candle(&self) {
204        self.candles_total.inc();
205    }
206
207    pub fn inc_signals(&self, count: usize) {
208        self.signals_total.inc_by(count as u64);
209    }
210
211    pub fn inc_order(&self) {
212        self.orders_total.inc();
213    }
214
215    pub fn inc_order_failure(&self) {
216        self.order_failures.inc();
217    }
218
219    pub fn inc_panic_close(&self) {
220        self.panic_closes.inc();
221    }
222
223    pub fn inc_router_failure(&self, reason: &str) {
224        self.router_failures.with_label_values(&[reason]).inc();
225    }
226
227    pub fn update_equity(&self, equity: f64) {
228        self.equity_gauge.set(equity);
229    }
230
231    pub fn update_price(&self, symbol: &str, price: f64) {
232        self.price_gauge.with_label_values(&[symbol]).set(price);
233    }
234
235    pub fn update_staleness(&self, seconds: f64) {
236        self.data_gap_gauge.set(seconds);
237    }
238
239    pub fn update_position_diff(&self, symbol: &str, diff: f64) {
240        self.reconciliation_position_diff
241            .with_label_values(&[symbol])
242            .set(diff);
243    }
244
245    pub fn inc_checksum_mismatch(&self, driver: &str, symbol: &str) {
246        self.checksum_mismatches
247            .with_label_values(&[driver, symbol])
248            .inc();
249    }
250
251    pub fn update_balance_diff(&self, currency: &str, diff: f64) {
252        self.reconciliation_balance_diff
253            .with_label_values(&[currency])
254            .set(diff);
255    }
256
257    pub fn update_connection_status(&self, stream: &str, connected: bool) {
258        let value = if connected { 1.0 } else { 0.0 };
259        self.connection_status
260            .with_label_values(&[stream])
261            .set(value);
262    }
263
264    pub fn update_last_data_timestamp(&self, timestamp_secs: f64) {
265        self.last_data_timestamp.set(timestamp_secs);
266    }
267}
268
269impl Default for LiveMetrics {
270    fn default() -> Self {
271        Self::new()
272    }
273}
274
275/// Launch a lightweight HTTP server that exposes Prometheus metrics.
276pub fn spawn_metrics_server(registry: Registry, addr: SocketAddr) -> tokio::task::JoinHandle<()> {
277    tokio::spawn(async move {
278        let make_svc = make_service_fn(move |_| {
279            let registry = registry.clone();
280            async move {
281                Ok::<_, Infallible>(service_fn(move |_req: Request<Body>| {
282                    let registry = registry.clone();
283                    async move {
284                        let encoder = TextEncoder::new();
285                        let metric_families = registry.gather();
286                        let mut buffer = Vec::new();
287                        if let Err(err) = encoder.encode(&metric_families, &mut buffer) {
288                            error!(error = %err, "failed to encode Prometheus metrics");
289                            return Ok::<_, Infallible>(
290                                Response::builder()
291                                    .status(StatusCode::INTERNAL_SERVER_ERROR)
292                                    .body(Body::from("failed to encode metrics"))
293                                    .unwrap(),
294                            );
295                        }
296                        Ok::<_, Infallible>(
297                            Response::builder()
298                                .status(StatusCode::OK)
299                                .header("Content-Type", encoder.format_type())
300                                .body(Body::from(buffer))
301                                .unwrap(),
302                        )
303                    }
304                }))
305            }
306        });
307
308        if let Err(err) = hyper::Server::bind(&addr).serve(make_svc).await {
309            error!(error = %err, %addr, "metrics server terminated");
310        } else {
311            info!(%addr, "metrics server shutdown");
312        }
313    })
314}