tesser_cli/
telemetry.rs

1use std::convert::Infallible;
2use std::fs::{self, OpenOptions};
3use std::net::SocketAddr;
4use std::path::Path;
5use std::sync::OnceLock;
6
7use anyhow::{Context, Result};
8use hyper::body::Body;
9use hyper::service::{make_service_fn, service_fn};
10use hyper::{Request, Response, StatusCode};
11use prometheus::{Encoder, Gauge, GaugeVec, IntCounter, IntCounterVec, Registry, TextEncoder};
12use tracing::{error, info};
13use tracing_appender::non_blocking::WorkerGuard;
14use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
15
16static FILE_GUARD: OnceLock<WorkerGuard> = OnceLock::new();
17
18/// Install the global tracing subscriber with optional JSON file logging.
19pub fn init_tracing(filter: &str, log_path: Option<&Path>) -> Result<()> {
20    if let Some(path) = log_path {
21        let stdout_layer = fmt::layer()
22            .with_target(false)
23            .with_filter(EnvFilter::new(filter));
24        if let Some(dir) = path.parent() {
25            fs::create_dir_all(dir)
26                .with_context(|| format!("failed to create log directory {dir:?}"))?;
27        }
28        let file = OpenOptions::new()
29            .create(true)
30            .append(true)
31            .open(path)
32            .with_context(|| format!("failed to open log file {}", path.display()))?;
33        let (writer, guard) = tracing_appender::non_blocking(file);
34        let _ = FILE_GUARD.set(guard);
35        let file_layer = fmt::layer()
36            .json()
37            .with_ansi(false)
38            .with_target(true)
39            .with_writer(writer)
40            .with_filter(EnvFilter::new(filter));
41        tracing_subscriber::registry()
42            .with(stdout_layer)
43            .with(file_layer)
44            .try_init()?;
45    } else {
46        let stdout_layer = fmt::layer()
47            .with_target(false)
48            .with_filter(EnvFilter::new(filter));
49        tracing_subscriber::registry()
50            .with(stdout_layer)
51            .try_init()?;
52    }
53
54    Ok(())
55}
56
57/// Prometheus metrics collected during live trading.
58pub struct LiveMetrics {
59    registry: Registry,
60    ticks_total: IntCounter,
61    candles_total: IntCounter,
62    signals_total: IntCounter,
63    orders_total: IntCounter,
64    order_failures: IntCounter,
65    equity_gauge: Gauge,
66    price_gauge: GaugeVec,
67    data_gap_gauge: Gauge,
68    reconciliation_position_diff: GaugeVec,
69    reconciliation_balance_diff: GaugeVec,
70    connection_status: GaugeVec,
71    last_data_timestamp: Gauge,
72    checksum_mismatches: IntCounterVec,
73}
74
75impl LiveMetrics {
76    pub fn new() -> Self {
77        let registry = Registry::new();
78        let ticks_total = IntCounter::new("ticks_total", "Number of ticks processed").unwrap();
79        let candles_total =
80            IntCounter::new("candles_total", "Number of candles processed").unwrap();
81        let signals_total =
82            IntCounter::new("signals_total", "Signals emitted by strategies").unwrap();
83        let orders_total =
84            IntCounter::new("orders_total", "Orders submitted to execution").unwrap();
85        let order_failures = IntCounter::new("order_failures_total", "Execution failures").unwrap();
86        let equity_gauge = Gauge::new("portfolio_equity", "Current portfolio equity").unwrap();
87        let price_gauge = GaugeVec::new(
88            prometheus::Opts::new("symbol_price", "Latest observed price per symbol"),
89            &["symbol"],
90        )
91        .unwrap();
92        let data_gap_gauge = Gauge::new(
93            "market_data_gap_seconds",
94            "Seconds since last market data heartbeat",
95        )
96        .unwrap();
97        let reconciliation_position_diff = GaugeVec::new(
98            prometheus::Opts::new(
99                "tesser_reconciliation_position_diff",
100                "Absolute quantity difference between local and remote positions",
101            ),
102            &["symbol"],
103        )
104        .unwrap();
105        let reconciliation_balance_diff = GaugeVec::new(
106            prometheus::Opts::new(
107                "tesser_reconciliation_balance_diff",
108                "Absolute balance difference between local and remote accounts",
109            ),
110            &["currency"],
111        )
112        .unwrap();
113        let connection_status = GaugeVec::new(
114            prometheus::Opts::new(
115                "tesser_exchange_connection_status",
116                "Status of exchange websocket connections (1=connected, 0=disconnected)",
117            ),
118            &["stream"],
119        )
120        .unwrap();
121        let last_data_timestamp = Gauge::new(
122            "tesser_market_data_last_received_timestamp_seconds",
123            "Unix timestamp of the last received market data event",
124        )
125        .unwrap();
126        let checksum_mismatches = IntCounterVec::new(
127            prometheus::Opts::new(
128                "tesser_order_book_checksum_mismatches_total",
129                "Count of order book checksum mismatches detected",
130            ),
131            &["driver", "symbol"],
132        )
133        .unwrap();
134
135        registry.register(Box::new(ticks_total.clone())).unwrap();
136        registry.register(Box::new(candles_total.clone())).unwrap();
137        registry.register(Box::new(signals_total.clone())).unwrap();
138        registry.register(Box::new(orders_total.clone())).unwrap();
139        registry.register(Box::new(order_failures.clone())).unwrap();
140        registry.register(Box::new(equity_gauge.clone())).unwrap();
141        registry.register(Box::new(price_gauge.clone())).unwrap();
142        registry.register(Box::new(data_gap_gauge.clone())).unwrap();
143        registry
144            .register(Box::new(reconciliation_position_diff.clone()))
145            .unwrap();
146        registry
147            .register(Box::new(reconciliation_balance_diff.clone()))
148            .unwrap();
149        registry
150            .register(Box::new(connection_status.clone()))
151            .unwrap();
152        registry
153            .register(Box::new(last_data_timestamp.clone()))
154            .unwrap();
155        registry
156            .register(Box::new(checksum_mismatches.clone()))
157            .unwrap();
158
159        Self {
160            registry,
161            ticks_total,
162            candles_total,
163            signals_total,
164            orders_total,
165            order_failures,
166            equity_gauge,
167            price_gauge,
168            data_gap_gauge,
169            reconciliation_position_diff,
170            reconciliation_balance_diff,
171            connection_status,
172            last_data_timestamp,
173            checksum_mismatches,
174        }
175    }
176
177    pub fn registry(&self) -> Registry {
178        self.registry.clone()
179    }
180
181    pub fn inc_tick(&self) {
182        self.ticks_total.inc();
183    }
184
185    pub fn inc_candle(&self) {
186        self.candles_total.inc();
187    }
188
189    pub fn inc_signals(&self, count: usize) {
190        self.signals_total.inc_by(count as u64);
191    }
192
193    pub fn inc_order(&self) {
194        self.orders_total.inc();
195    }
196
197    pub fn inc_order_failure(&self) {
198        self.order_failures.inc();
199    }
200
201    pub fn update_equity(&self, equity: f64) {
202        self.equity_gauge.set(equity);
203    }
204
205    pub fn update_price(&self, symbol: &str, price: f64) {
206        self.price_gauge.with_label_values(&[symbol]).set(price);
207    }
208
209    pub fn update_staleness(&self, seconds: f64) {
210        self.data_gap_gauge.set(seconds);
211    }
212
213    pub fn update_position_diff(&self, symbol: &str, diff: f64) {
214        self.reconciliation_position_diff
215            .with_label_values(&[symbol])
216            .set(diff);
217    }
218
219    pub fn inc_checksum_mismatch(&self, driver: &str, symbol: &str) {
220        self.checksum_mismatches
221            .with_label_values(&[driver, symbol])
222            .inc();
223    }
224
225    pub fn update_balance_diff(&self, currency: &str, diff: f64) {
226        self.reconciliation_balance_diff
227            .with_label_values(&[currency])
228            .set(diff);
229    }
230
231    pub fn update_connection_status(&self, stream: &str, connected: bool) {
232        let value = if connected { 1.0 } else { 0.0 };
233        self.connection_status
234            .with_label_values(&[stream])
235            .set(value);
236    }
237
238    pub fn update_last_data_timestamp(&self, timestamp_secs: f64) {
239        self.last_data_timestamp.set(timestamp_secs);
240    }
241}
242
243impl Default for LiveMetrics {
244    fn default() -> Self {
245        Self::new()
246    }
247}
248
249/// Launch a lightweight HTTP server that exposes Prometheus metrics.
250pub fn spawn_metrics_server(registry: Registry, addr: SocketAddr) -> tokio::task::JoinHandle<()> {
251    tokio::spawn(async move {
252        let make_svc = make_service_fn(move |_| {
253            let registry = registry.clone();
254            async move {
255                Ok::<_, Infallible>(service_fn(move |_req: Request<Body>| {
256                    let registry = registry.clone();
257                    async move {
258                        let encoder = TextEncoder::new();
259                        let metric_families = registry.gather();
260                        let mut buffer = Vec::new();
261                        if let Err(err) = encoder.encode(&metric_families, &mut buffer) {
262                            error!(error = %err, "failed to encode Prometheus metrics");
263                            return Ok::<_, Infallible>(
264                                Response::builder()
265                                    .status(StatusCode::INTERNAL_SERVER_ERROR)
266                                    .body(Body::from("failed to encode metrics"))
267                                    .unwrap(),
268                            );
269                        }
270                        Ok::<_, Infallible>(
271                            Response::builder()
272                                .status(StatusCode::OK)
273                                .header("Content-Type", encoder.format_type())
274                                .body(Body::from(buffer))
275                                .unwrap(),
276                        )
277                    }
278                }))
279            }
280        });
281
282        if let Err(err) = hyper::Server::bind(&addr).serve(make_svc).await {
283            error!(error = %err, %addr, "metrics server terminated");
284        } else {
285            info!(%addr, "metrics server shutdown");
286        }
287    })
288}