tesser_cli/
telemetry.rs

1use std::convert::Infallible;
2use std::fs::{self, OpenOptions};
3use std::net::SocketAddr;
4use std::path::Path;
5use std::sync::OnceLock;
6
7use anyhow::{Context, Result};
8use hyper::body::Body;
9use hyper::service::{make_service_fn, service_fn};
10use hyper::{Request, Response, StatusCode};
11use prometheus::{Encoder, Gauge, GaugeVec, IntCounter, Registry, TextEncoder};
12use tracing::{error, info};
13use tracing_appender::non_blocking::WorkerGuard;
14use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
15
16static FILE_GUARD: OnceLock<WorkerGuard> = OnceLock::new();
17
18/// Install the global tracing subscriber with optional JSON file logging.
19pub fn init_tracing(filter: &str, log_path: Option<&Path>) -> Result<()> {
20    if let Some(path) = log_path {
21        let stdout_layer = fmt::layer()
22            .with_target(false)
23            .with_filter(EnvFilter::new(filter));
24        if let Some(dir) = path.parent() {
25            fs::create_dir_all(dir)
26                .with_context(|| format!("failed to create log directory {dir:?}"))?;
27        }
28        let file = OpenOptions::new()
29            .create(true)
30            .append(true)
31            .open(path)
32            .with_context(|| format!("failed to open log file {}", path.display()))?;
33        let (writer, guard) = tracing_appender::non_blocking(file);
34        let _ = FILE_GUARD.set(guard);
35        let file_layer = fmt::layer()
36            .json()
37            .with_ansi(false)
38            .with_target(true)
39            .with_writer(writer)
40            .with_filter(EnvFilter::new(filter));
41        tracing_subscriber::registry()
42            .with(stdout_layer)
43            .with(file_layer)
44            .try_init()?;
45    } else {
46        let stdout_layer = fmt::layer()
47            .with_target(false)
48            .with_filter(EnvFilter::new(filter));
49        tracing_subscriber::registry()
50            .with(stdout_layer)
51            .try_init()?;
52    }
53
54    Ok(())
55}
56
57/// Prometheus metrics collected during live trading.
58pub struct LiveMetrics {
59    registry: Registry,
60    ticks_total: IntCounter,
61    candles_total: IntCounter,
62    signals_total: IntCounter,
63    orders_total: IntCounter,
64    order_failures: IntCounter,
65    equity_gauge: Gauge,
66    price_gauge: GaugeVec,
67    data_gap_gauge: Gauge,
68    reconciliation_position_diff: GaugeVec,
69    reconciliation_balance_diff: GaugeVec,
70    connection_status: GaugeVec,
71    last_data_timestamp: Gauge,
72}
73
74impl LiveMetrics {
75    pub fn new() -> Self {
76        let registry = Registry::new();
77        let ticks_total = IntCounter::new("ticks_total", "Number of ticks processed").unwrap();
78        let candles_total =
79            IntCounter::new("candles_total", "Number of candles processed").unwrap();
80        let signals_total =
81            IntCounter::new("signals_total", "Signals emitted by strategies").unwrap();
82        let orders_total =
83            IntCounter::new("orders_total", "Orders submitted to execution").unwrap();
84        let order_failures = IntCounter::new("order_failures_total", "Execution failures").unwrap();
85        let equity_gauge = Gauge::new("portfolio_equity", "Current portfolio equity").unwrap();
86        let price_gauge = GaugeVec::new(
87            prometheus::Opts::new("symbol_price", "Latest observed price per symbol"),
88            &["symbol"],
89        )
90        .unwrap();
91        let data_gap_gauge = Gauge::new(
92            "market_data_gap_seconds",
93            "Seconds since last market data heartbeat",
94        )
95        .unwrap();
96        let reconciliation_position_diff = GaugeVec::new(
97            prometheus::Opts::new(
98                "tesser_reconciliation_position_diff",
99                "Absolute quantity difference between local and remote positions",
100            ),
101            &["symbol"],
102        )
103        .unwrap();
104        let reconciliation_balance_diff = GaugeVec::new(
105            prometheus::Opts::new(
106                "tesser_reconciliation_balance_diff",
107                "Absolute balance difference between local and remote accounts",
108            ),
109            &["currency"],
110        )
111        .unwrap();
112        let connection_status = GaugeVec::new(
113            prometheus::Opts::new(
114                "tesser_exchange_connection_status",
115                "Status of exchange websocket connections (1=connected, 0=disconnected)",
116            ),
117            &["stream"],
118        )
119        .unwrap();
120        let last_data_timestamp = Gauge::new(
121            "tesser_market_data_last_received_timestamp_seconds",
122            "Unix timestamp of the last received market data event",
123        )
124        .unwrap();
125
126        registry.register(Box::new(ticks_total.clone())).unwrap();
127        registry.register(Box::new(candles_total.clone())).unwrap();
128        registry.register(Box::new(signals_total.clone())).unwrap();
129        registry.register(Box::new(orders_total.clone())).unwrap();
130        registry.register(Box::new(order_failures.clone())).unwrap();
131        registry.register(Box::new(equity_gauge.clone())).unwrap();
132        registry.register(Box::new(price_gauge.clone())).unwrap();
133        registry.register(Box::new(data_gap_gauge.clone())).unwrap();
134        registry
135            .register(Box::new(reconciliation_position_diff.clone()))
136            .unwrap();
137        registry
138            .register(Box::new(reconciliation_balance_diff.clone()))
139            .unwrap();
140        registry
141            .register(Box::new(connection_status.clone()))
142            .unwrap();
143        registry
144            .register(Box::new(last_data_timestamp.clone()))
145            .unwrap();
146
147        Self {
148            registry,
149            ticks_total,
150            candles_total,
151            signals_total,
152            orders_total,
153            order_failures,
154            equity_gauge,
155            price_gauge,
156            data_gap_gauge,
157            reconciliation_position_diff,
158            reconciliation_balance_diff,
159            connection_status,
160            last_data_timestamp,
161        }
162    }
163
164    pub fn registry(&self) -> Registry {
165        self.registry.clone()
166    }
167
168    pub fn inc_tick(&self) {
169        self.ticks_total.inc();
170    }
171
172    pub fn inc_candle(&self) {
173        self.candles_total.inc();
174    }
175
176    pub fn inc_signals(&self, count: usize) {
177        self.signals_total.inc_by(count as u64);
178    }
179
180    pub fn inc_order(&self) {
181        self.orders_total.inc();
182    }
183
184    pub fn inc_order_failure(&self) {
185        self.order_failures.inc();
186    }
187
188    pub fn update_equity(&self, equity: f64) {
189        self.equity_gauge.set(equity);
190    }
191
192    pub fn update_price(&self, symbol: &str, price: f64) {
193        self.price_gauge.with_label_values(&[symbol]).set(price);
194    }
195
196    pub fn update_staleness(&self, seconds: f64) {
197        self.data_gap_gauge.set(seconds);
198    }
199
200    pub fn update_position_diff(&self, symbol: &str, diff: f64) {
201        self.reconciliation_position_diff
202            .with_label_values(&[symbol])
203            .set(diff);
204    }
205
206    pub fn update_balance_diff(&self, currency: &str, diff: f64) {
207        self.reconciliation_balance_diff
208            .with_label_values(&[currency])
209            .set(diff);
210    }
211
212    pub fn update_connection_status(&self, stream: &str, connected: bool) {
213        let value = if connected { 1.0 } else { 0.0 };
214        self.connection_status
215            .with_label_values(&[stream])
216            .set(value);
217    }
218
219    pub fn update_last_data_timestamp(&self, timestamp_secs: f64) {
220        self.last_data_timestamp.set(timestamp_secs);
221    }
222}
223
224impl Default for LiveMetrics {
225    fn default() -> Self {
226        Self::new()
227    }
228}
229
230/// Launch a lightweight HTTP server that exposes Prometheus metrics.
231pub fn spawn_metrics_server(registry: Registry, addr: SocketAddr) -> tokio::task::JoinHandle<()> {
232    tokio::spawn(async move {
233        let make_svc = make_service_fn(move |_| {
234            let registry = registry.clone();
235            async move {
236                Ok::<_, Infallible>(service_fn(move |_req: Request<Body>| {
237                    let registry = registry.clone();
238                    async move {
239                        let encoder = TextEncoder::new();
240                        let metric_families = registry.gather();
241                        let mut buffer = Vec::new();
242                        if let Err(err) = encoder.encode(&metric_families, &mut buffer) {
243                            error!(error = %err, "failed to encode Prometheus metrics");
244                            return Ok::<_, Infallible>(
245                                Response::builder()
246                                    .status(StatusCode::INTERNAL_SERVER_ERROR)
247                                    .body(Body::from("failed to encode metrics"))
248                                    .unwrap(),
249                            );
250                        }
251                        Ok::<_, Infallible>(
252                            Response::builder()
253                                .status(StatusCode::OK)
254                                .header("Content-Type", encoder.format_type())
255                                .body(Body::from(buffer))
256                                .unwrap(),
257                        )
258                    }
259                }))
260            }
261        });
262
263        if let Err(err) = hyper::Server::bind(&addr).serve(make_svc).await {
264            error!(error = %err, %addr, "metrics server terminated");
265        } else {
266            info!(%addr, "metrics server shutdown");
267        }
268    })
269}