Skip to main content

jflow_core/
session_metrics.rs

1//! JanusAI session metrics push client (JFLOW-A).
2//!
3//! Forward signal pipeline rolls up per-cycle metrics (signal counts, win
4//! rate, average confidence, latency) and pushes them to JanusAI via
5//! `POST {JANUS_AI_URL}/api/janus-ai/sessions/{session_id}/metrics`.
6//!
7//! The client is a no-op when `JANUS_AI_URL` is unset so the binary can boot
8//! and the signal pipeline can run without a JanusAI sidecar (dev / backtest
9//! / standalone compose).
10
11use serde::{Deserialize, Serialize};
12use std::time::Duration;
13use tracing::{debug, warn};
14
15/// Per-session metrics snapshot pushed to JanusAI.
16///
17/// The shape is intentionally narrow — the JanusAI service owns the wider
18/// schema and is free to ignore unknown fields. Add fields here only when
19/// JanusAI is ready to consume them.
20#[derive(Debug, Clone, Default, Serialize, Deserialize)]
21pub struct SessionMetrics {
22    /// Signals generated since the last push.
23    pub signals_generated: u64,
24    /// Signals filtered out by gates / risk before publish.
25    pub signals_filtered: u64,
26    /// Mean confidence of generated signals (0.0..1.0).
27    pub avg_confidence: f64,
28    /// P50 end-to-end signal generation latency in microseconds.
29    pub p50_latency_us: u64,
30    /// P99 end-to-end signal generation latency in microseconds.
31    pub p99_latency_us: u64,
32    /// Optional human-readable regime label at snapshot time.
33    pub regime: Option<String>,
34}
35
36/// HTTP client for pushing session metrics to JanusAI.
37///
38/// Construct once at startup with `from_env()` and share via `Arc`.
39#[derive(Debug, Clone)]
40pub struct SessionMetricsClient {
41    inner: Option<ActiveClient>,
42}
43
44#[derive(Debug, Clone)]
45struct ActiveClient {
46    base_url: String,
47    http: reqwest::Client,
48}
49
50impl SessionMetricsClient {
51    /// Build a client from environment variables.
52    ///
53    /// Reads `JANUS_AI_URL` (e.g. `http://fks_janus_ai:8000`). When unset,
54    /// returns a disabled client whose `push()` is a debug-logged no-op.
55    pub fn from_env() -> Self {
56        let url = std::env::var("JANUS_AI_URL").ok().filter(|s| !s.is_empty());
57
58        let inner = url.map(|base_url| {
59            let timeout_secs = std::env::var("JANUS_AI_TIMEOUT_SECS")
60                .ok()
61                .and_then(|v| v.parse().ok())
62                .unwrap_or(2);
63            let http = reqwest::Client::builder()
64                .timeout(Duration::from_secs(timeout_secs))
65                .build()
66                .unwrap_or_else(|_| reqwest::Client::new());
67            ActiveClient {
68                base_url: base_url.trim_end_matches('/').to_string(),
69                http,
70            }
71        });
72
73        Self { inner }
74    }
75
76    /// True when the client will actually attempt a network call.
77    pub fn is_enabled(&self) -> bool {
78        self.inner.is_some()
79    }
80
81    /// Push a metrics snapshot for `session_id`. Errors are logged and
82    /// swallowed — metric push must never break the signal pipeline.
83    pub async fn push(&self, session_id: &str, metrics: &SessionMetrics) {
84        let Some(client) = &self.inner else {
85            debug!(
86                session_id,
87                "session metrics push skipped — JANUS_AI_URL not set"
88            );
89            return;
90        };
91
92        let url = format!(
93            "{base}/api/janus-ai/sessions/{session_id}/metrics",
94            base = client.base_url
95        );
96
97        match client.http.post(&url).json(metrics).send().await {
98            Ok(resp) if resp.status().is_success() => {
99                debug!(session_id, status = %resp.status(), "session metrics pushed");
100            }
101            Ok(resp) => {
102                warn!(
103                    session_id,
104                    status = %resp.status(),
105                    "session metrics push: non-success response"
106                );
107            }
108            Err(e) => {
109                warn!(session_id, error = %e, "session metrics push failed");
110            }
111        }
112    }
113}
114
115impl Default for SessionMetricsClient {
116    fn default() -> Self {
117        Self::from_env()
118    }
119}
120
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes the tests in this module that mutate the process
    /// environment. The test harness runs tests on parallel threads by
    /// default, so unsynchronized `set_var` / `remove_var` calls would race.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Holds `ENV_LOCK` while one environment variable is overridden and
    /// puts the previous value back on drop, including when the test panics.
    struct EnvGuard {
        key: &'static str,
        previous: Option<String>,
        _lock: MutexGuard<'static, ()>,
    }

    impl EnvGuard {
        /// Take the lock and remember the current value of `key`.
        fn acquire(key: &'static str) -> Self {
            // A poisoned lock only means another test panicked; the
            // environment was still restored by that test's guard.
            let lock = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
            Self {
                key,
                previous: std::env::var(key).ok(),
                _lock: lock,
            }
        }

        /// Remove `key` from the environment for the lifetime of the guard.
        fn unset(key: &'static str) -> Self {
            let guard = Self::acquire(key);
            // SAFETY: `ENV_LOCK` is held, so no other test in this module
            // touches the environment concurrently. Tests elsewhere in the
            // crate are not covered by this lock.
            unsafe { std::env::remove_var(key) };
            guard
        }

        /// Set `key` to `value` for the lifetime of the guard.
        fn set(key: &'static str, value: &str) -> Self {
            let guard = Self::acquire(key);
            // SAFETY: see `unset`.
            unsafe { std::env::set_var(key, value) };
            guard
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // SAFETY: `_lock` is still held here; fields are dropped only
            // after this body returns.
            match self.previous.take() {
                Some(value) => unsafe { std::env::set_var(self.key, value) },
                None => unsafe { std::env::remove_var(self.key) },
            }
        }
    }

    #[test]
    fn disabled_when_env_unset() {
        let _env = EnvGuard::unset("JANUS_AI_URL");
        let client = SessionMetricsClient::from_env();
        assert!(!client.is_enabled());
    }

    #[test]
    fn enabled_when_env_set() {
        let _env = EnvGuard::set("JANUS_AI_URL", "http://127.0.0.1:1/");
        let client = SessionMetricsClient::from_env();
        assert!(client.is_enabled());
    }

    #[tokio::test]
    async fn push_no_ops_when_disabled() {
        // Build the client under the guard, then release the lock before
        // awaiting: `push` only uses state captured by `from_env()`.
        let client = {
            let _env = EnvGuard::unset("JANUS_AI_URL");
            SessionMetricsClient::from_env()
        };
        // Should return without error / panic even with no JanusAI running.
        client.push("session-abc", &SessionMetrics::default()).await;
    }
}