Skip to main content

mockforge_foundation/
pillar_tracking.rs

//! Pillar usage tracking utilities
//!
//! Provides helper functions for recording pillar usage events throughout the codebase.
//! These events feed analytics that show which pillars are used most.
5
6use crate::pillars::Pillar;
7use chrono::Utc;
8use once_cell::sync::Lazy;
9use serde_json::Value;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::RwLock;
14
15/// Optional analytics database for recording pillar usage
16/// This is set globally and can be None if analytics is not enabled
17#[allow(clippy::type_complexity)]
18static ANALYTICS_DB: Lazy<Arc<RwLock<Option<Arc<dyn PillarUsageRecorder>>>>> =
19    Lazy::new(|| Arc::new(RwLock::new(None)));
20
21/// Tracks dropped/failed pillar events so we can emit a rate-limited
22/// aggregate WARN instead of one WARN per event under load.
23///
24/// Issue #79 — Srikanth's bench at `--rps 100` for 600s flooded the log
25/// with hundreds of `WARN ... Failed to record pillar usage event: pool
26/// timed out` lines (one per failed event). The events themselves are
27/// best-effort metrics — losing them under sustained load doesn't break
28/// anything functional — so the right behaviour is to drop with low-
29/// volume reporting rather than spam.
30static FAILED_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
31static LAST_FAILURE_WARN_AT: Lazy<RwLock<Instant>> = Lazy::new(|| RwLock::new(Instant::now()));
32
33/// How often we emit the aggregated "X pillar events dropped" warning.
34const FAILURE_WARN_INTERVAL: Duration = Duration::from_secs(60);
35
/// Maximum number of recorder tasks allowed to run at once.
///
/// This is twice the analytics `SqlitePool`'s `max_connections(10)`, so a
/// healthy pool can use its whole connection budget. Bursts beyond the cap
/// are dropped immediately instead of being queued.
const IN_FLIGHT_LIMIT: u64 = 20;

/// Number of recorder tasks currently spawned and not yet finished.
///
/// Issue #79 round 12: silencing the per-event WARN (round 11) still left
/// `sqlx::pool::acquire` "slow acquire" WARNs. Every event spawned a task
/// that waited out the pool's 30s acquire timeout. Checking this counter at
/// the entry point lets over-pressure events be discarded up front, counted
/// toward `FAILED_EVENT_COUNT` for the aggregate WARN. The pool can then
/// serve the in-flight ones without a pile-up.
static IN_FLIGHT_RECORDS: AtomicU64 = AtomicU64::new(0);
54
55/// Trait for recording pillar usage events
56/// This allows different implementations (analytics DB, API endpoint, etc.)
57#[async_trait::async_trait]
58pub trait PillarUsageRecorder: Send + Sync {
59    /// Record a pillar usage event
60    async fn record(
61        &self,
62        event: PillarUsageEvent,
63    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
64}
65
66/// Pillar usage event (simplified version for internal use)
67#[derive(Debug, Clone)]
68pub struct PillarUsageEvent {
69    /// Workspace ID where the event occurred
70    pub workspace_id: Option<String>,
71    /// Organization ID (if applicable)
72    pub org_id: Option<String>,
73    /// The pillar this event relates to
74    pub pillar: Pillar,
75    /// Name of the metric being recorded
76    pub metric_name: String,
77    /// Value of the metric (JSON)
78    pub metric_value: Value,
79    /// Timestamp when the event occurred
80    pub timestamp: chrono::DateTime<Utc>,
81}
82
83/// Initialize the pillar usage tracker with a recorder
84pub async fn init(recorder: Arc<dyn PillarUsageRecorder>) {
85    let mut db = ANALYTICS_DB.write().await;
86    *db = Some(recorder);
87}
88
89/// Record a reality pillar usage event
90///
91/// This should be called when:
92/// - Reality continuum blend ratio is used
93/// - Smart personas are activated
94/// - Chaos is enabled/used
95/// - Reality level changes
96pub async fn record_reality_usage(
97    workspace_id: Option<String>,
98    org_id: Option<String>,
99    metric_name: &str,
100    metric_value: Value,
101) {
102    record_pillar_usage(workspace_id, org_id, Pillar::Reality, metric_name, metric_value).await;
103}
104
105/// Record a contracts pillar usage event
106///
107/// This should be called when:
108/// - Contract validation is performed
109/// - Drift detection occurs
110/// - Contract sync happens
111/// - Validation mode changes
112pub async fn record_contracts_usage(
113    workspace_id: Option<String>,
114    org_id: Option<String>,
115    metric_name: &str,
116    metric_value: Value,
117) {
118    record_pillar_usage(workspace_id, org_id, Pillar::Contracts, metric_name, metric_value).await;
119}
120
121/// Record a DevX pillar usage event
122///
123/// This should be called when:
124/// - SDK is installed/used
125/// - Client code is generated
126/// - Playground session starts
127/// - CLI command is executed
128pub async fn record_devx_usage(
129    workspace_id: Option<String>,
130    org_id: Option<String>,
131    metric_name: &str,
132    metric_value: Value,
133) {
134    record_pillar_usage(workspace_id, org_id, Pillar::DevX, metric_name, metric_value).await;
135}
136
137/// Record a cloud pillar usage event
138///
139/// This should be called when:
140/// - Scenario is shared
141/// - Marketplace download occurs
142/// - Workspace is created/shared
143/// - Organization template is used
144pub async fn record_cloud_usage(
145    workspace_id: Option<String>,
146    org_id: Option<String>,
147    metric_name: &str,
148    metric_value: Value,
149) {
150    record_pillar_usage(workspace_id, org_id, Pillar::Cloud, metric_name, metric_value).await;
151}
152
153/// Record an AI pillar usage event
154///
155/// This should be called when:
156/// - AI mock generation occurs
157/// - AI contract diff is performed
158/// - Voice command is executed
159/// - LLM-assisted operation happens
160pub async fn record_ai_usage(
161    workspace_id: Option<String>,
162    org_id: Option<String>,
163    metric_name: &str,
164    metric_value: Value,
165) {
166    record_pillar_usage(workspace_id, org_id, Pillar::Ai, metric_name, metric_value).await;
167}
168
169/// Record a pillar usage event (internal helper)
170async fn record_pillar_usage(
171    workspace_id: Option<String>,
172    org_id: Option<String>,
173    pillar: Pillar,
174    metric_name: &str,
175    metric_value: Value,
176) {
177    let db = ANALYTICS_DB.read().await;
178    if let Some(recorder) = db.as_ref() {
179        let event = PillarUsageEvent {
180            workspace_id,
181            org_id,
182            pillar,
183            metric_name: metric_name.to_string(),
184            metric_value,
185            timestamp: Utc::now(),
186        };
187
188        // Issue #79 round 12 — short-circuit when the recorder is already
189        // saturated. Spawning more tasks just bunches them up on the
190        // analytics-DB pool's 30s acquire timeout, producing the
191        // `sqlx::pool::acquire` "slow acquire" WARN spam Srikanth still
192        // saw on v0.3.144. Cap in-flight tasks; over-cap submissions
193        // count toward the aggregated drop warning and return without
194        // spawning.
195        let current = IN_FLIGHT_RECORDS.load(Ordering::Relaxed);
196        if current >= IN_FLIGHT_LIMIT {
197            FAILED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
198            maybe_flush_dropped_warning().await;
199            return;
200        }
201        IN_FLIGHT_RECORDS.fetch_add(1, Ordering::Relaxed);
202
203        // Record asynchronously without blocking
204        let recorder = recorder.clone();
205        tokio::spawn(async move {
206            let result = recorder.record(event).await;
207            IN_FLIGHT_RECORDS.fetch_sub(1, Ordering::Relaxed);
208            if let Err(e) = result {
209                // Issue #79 — under high load (Srikanth's `--rps 100`
210                // for 600s) the analytics DB pool gets saturated and
211                // every event spawns a task that times out and logs a
212                // WARN. Pillar tracking is best-effort metrics; losing
213                // events under load is acceptable, but spamming the log
214                // with one WARN per dropped event is not. Demote per-
215                // event failures to DEBUG and emit one aggregated WARN
216                // at most every FAILURE_WARN_INTERVAL.
217                tracing::debug!("Failed to record pillar usage event: {}", e);
218                FAILED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
219                maybe_flush_dropped_warning().await;
220            }
221        });
222    }
223}
224
225/// Emit a single aggregated WARN summarising dropped pillar events when
226/// at least `FAILURE_WARN_INTERVAL` has elapsed since the last summary.
227/// The check is racy by design — under contention we'd rather skip a
228/// summary than serialize on a mutex. Counts not surfaced by one race
229/// roll into the next interval's summary.
230async fn maybe_flush_dropped_warning() {
231    let last = *LAST_FAILURE_WARN_AT.read().await;
232    if last.elapsed() < FAILURE_WARN_INTERVAL {
233        return;
234    }
235    // Race-aware swap: take the count we'll report, leave the rest for
236    // the next interval. Another task may have already flushed — we
237    // double-check the timestamp under the write lock and bail if so.
238    let mut last_w = LAST_FAILURE_WARN_AT.write().await;
239    if last_w.elapsed() < FAILURE_WARN_INTERVAL {
240        return;
241    }
242    let dropped = FAILED_EVENT_COUNT.swap(0, Ordering::Relaxed);
243    if dropped > 0 {
244        tracing::warn!(
245            dropped_events = dropped,
246            interval_secs = FAILURE_WARN_INTERVAL.as_secs(),
247            "pillar_tracking: dropped events in the last {}s due to analytics-DB pressure \
248             (analytics is best-effort; bench / serve behaviour is unaffected)",
249            FAILURE_WARN_INTERVAL.as_secs(),
250        );
251    }
252    *last_w = Instant::now();
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Recorder that keeps every event in memory so tests can inspect them.
    struct TestRecorder {
        events: Arc<RwLock<Vec<PillarUsageEvent>>>,
    }

    #[async_trait::async_trait]
    impl PillarUsageRecorder for TestRecorder {
        async fn record(
            &self,
            event: PillarUsageEvent,
        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            self.events.write().await.push(event);
            Ok(())
        }
    }

    /// Wait until `events` holds at least `expected` entries, giving up after
    /// `timeout`.
    ///
    /// Recording happens on a spawned task, so tests must wait for it.
    /// Polling returns as soon as the event lands, and it tolerates slow CI
    /// machines better than a single fixed sleep.
    async fn wait_for_events(
        events: &RwLock<Vec<PillarUsageEvent>>,
        expected: usize,
        timeout: Duration,
    ) {
        let deadline = Instant::now() + timeout;
        while events.read().await.len() < expected && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    #[tokio::test]
    async fn test_record_reality_usage() {
        let events = Arc::new(RwLock::new(Vec::new()));
        let recorder = Arc::new(TestRecorder {
            events: events.clone(),
        });
        init(recorder).await;

        record_reality_usage(
            Some("workspace-1".to_string()),
            None,
            "blended_reality_ratio",
            json!({"ratio": 0.5}),
        )
        .await;

        wait_for_events(&events, 1, Duration::from_secs(5)).await;

        let recorded = events.read().await;
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].pillar, Pillar::Reality);
        assert_eq!(recorded[0].metric_name, "blended_reality_ratio");
    }
}