mockforge_foundation/pillar_tracking.rs
1//! Pillar usage tracking utilities
2//!
3//! Provides helper functions for recording pillar usage events throughout the codebase.
4//! These events are used for analytics and understanding which pillars are most used.
5
6use crate::pillars::Pillar;
7use chrono::Utc;
8use once_cell::sync::Lazy;
9use serde_json::Value;
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, Instant};
13use tokio::sync::RwLock;
14
15/// Optional analytics database for recording pillar usage
16/// This is set globally and can be None if analytics is not enabled
17#[allow(clippy::type_complexity)]
18static ANALYTICS_DB: Lazy<Arc<RwLock<Option<Arc<dyn PillarUsageRecorder>>>>> =
19 Lazy::new(|| Arc::new(RwLock::new(None)));
20
21/// Tracks dropped/failed pillar events so we can emit a rate-limited
22/// aggregate WARN instead of one WARN per event under load.
23///
24/// Issue #79 — Srikanth's bench at `--rps 100` for 600s flooded the log
25/// with hundreds of `WARN ... Failed to record pillar usage event: pool
26/// timed out` lines (one per failed event). The events themselves are
27/// best-effort metrics — losing them under sustained load doesn't break
28/// anything functional — so the right behaviour is to drop with low-
29/// volume reporting rather than spam.
30static FAILED_EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
31static LAST_FAILURE_WARN_AT: Lazy<RwLock<Instant>> = Lazy::new(|| RwLock::new(Instant::now()));
32
33/// How often we emit the aggregated "X pillar events dropped" warning.
34const FAILURE_WARN_INTERVAL: Duration = Duration::from_secs(60);
35
/// Upper bound on recorder tasks that may run at the same time.
///
/// Chosen as twice the analytics SqlitePool's `max_connections(10)`, so a
/// healthy pool can use its full connection budget; submissions beyond this
/// bound are dropped rather than queued.
const IN_FLIGHT_LIMIT: u64 = 20;

/// Number of recorder tasks that have been spawned and have not yet finished.
///
/// Issue #79 round 12 — round 11 silenced the per-event WARN spam, but the
/// `sqlx::pool::acquire` "slow acquire" WARNs kept firing, because every event
/// spawned a task that sat on the pool's 30s acquire timeout. Checking this
/// counter at the entry point lets over-pressure events be dropped at once
/// (they are added to `FAILED_EVENT_COUNT` for the aggregate WARN) while the
/// pool serves the events already in flight without a pile-up.
static IN_FLIGHT_RECORDS: AtomicU64 = AtomicU64::new(0);
54
55/// Trait for recording pillar usage events
56/// This allows different implementations (analytics DB, API endpoint, etc.)
57#[async_trait::async_trait]
58pub trait PillarUsageRecorder: Send + Sync {
59 /// Record a pillar usage event
60 async fn record(
61 &self,
62 event: PillarUsageEvent,
63 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
64}
65
66/// Pillar usage event (simplified version for internal use)
67#[derive(Debug, Clone)]
68pub struct PillarUsageEvent {
69 /// Workspace ID where the event occurred
70 pub workspace_id: Option<String>,
71 /// Organization ID (if applicable)
72 pub org_id: Option<String>,
73 /// The pillar this event relates to
74 pub pillar: Pillar,
75 /// Name of the metric being recorded
76 pub metric_name: String,
77 /// Value of the metric (JSON)
78 pub metric_value: Value,
79 /// Timestamp when the event occurred
80 pub timestamp: chrono::DateTime<Utc>,
81}
82
83/// Initialize the pillar usage tracker with a recorder
84pub async fn init(recorder: Arc<dyn PillarUsageRecorder>) {
85 let mut db = ANALYTICS_DB.write().await;
86 *db = Some(recorder);
87}
88
89/// Record a reality pillar usage event
90///
91/// This should be called when:
92/// - Reality continuum blend ratio is used
93/// - Smart personas are activated
94/// - Chaos is enabled/used
95/// - Reality level changes
96pub async fn record_reality_usage(
97 workspace_id: Option<String>,
98 org_id: Option<String>,
99 metric_name: &str,
100 metric_value: Value,
101) {
102 record_pillar_usage(workspace_id, org_id, Pillar::Reality, metric_name, metric_value).await;
103}
104
105/// Record a contracts pillar usage event
106///
107/// This should be called when:
108/// - Contract validation is performed
109/// - Drift detection occurs
110/// - Contract sync happens
111/// - Validation mode changes
112pub async fn record_contracts_usage(
113 workspace_id: Option<String>,
114 org_id: Option<String>,
115 metric_name: &str,
116 metric_value: Value,
117) {
118 record_pillar_usage(workspace_id, org_id, Pillar::Contracts, metric_name, metric_value).await;
119}
120
121/// Record a DevX pillar usage event
122///
123/// This should be called when:
124/// - SDK is installed/used
125/// - Client code is generated
126/// - Playground session starts
127/// - CLI command is executed
128pub async fn record_devx_usage(
129 workspace_id: Option<String>,
130 org_id: Option<String>,
131 metric_name: &str,
132 metric_value: Value,
133) {
134 record_pillar_usage(workspace_id, org_id, Pillar::DevX, metric_name, metric_value).await;
135}
136
137/// Record a cloud pillar usage event
138///
139/// This should be called when:
140/// - Scenario is shared
141/// - Marketplace download occurs
142/// - Workspace is created/shared
143/// - Organization template is used
144pub async fn record_cloud_usage(
145 workspace_id: Option<String>,
146 org_id: Option<String>,
147 metric_name: &str,
148 metric_value: Value,
149) {
150 record_pillar_usage(workspace_id, org_id, Pillar::Cloud, metric_name, metric_value).await;
151}
152
153/// Record an AI pillar usage event
154///
155/// This should be called when:
156/// - AI mock generation occurs
157/// - AI contract diff is performed
158/// - Voice command is executed
159/// - LLM-assisted operation happens
160pub async fn record_ai_usage(
161 workspace_id: Option<String>,
162 org_id: Option<String>,
163 metric_name: &str,
164 metric_value: Value,
165) {
166 record_pillar_usage(workspace_id, org_id, Pillar::Ai, metric_name, metric_value).await;
167}
168
169/// Record a pillar usage event (internal helper)
170async fn record_pillar_usage(
171 workspace_id: Option<String>,
172 org_id: Option<String>,
173 pillar: Pillar,
174 metric_name: &str,
175 metric_value: Value,
176) {
177 let db = ANALYTICS_DB.read().await;
178 if let Some(recorder) = db.as_ref() {
179 let event = PillarUsageEvent {
180 workspace_id,
181 org_id,
182 pillar,
183 metric_name: metric_name.to_string(),
184 metric_value,
185 timestamp: Utc::now(),
186 };
187
188 // Issue #79 round 12 — short-circuit when the recorder is already
189 // saturated. Spawning more tasks just bunches them up on the
190 // analytics-DB pool's 30s acquire timeout, producing the
191 // `sqlx::pool::acquire` "slow acquire" WARN spam Srikanth still
192 // saw on v0.3.144. Cap in-flight tasks; over-cap submissions
193 // count toward the aggregated drop warning and return without
194 // spawning.
195 let current = IN_FLIGHT_RECORDS.load(Ordering::Relaxed);
196 if current >= IN_FLIGHT_LIMIT {
197 FAILED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
198 maybe_flush_dropped_warning().await;
199 return;
200 }
201 IN_FLIGHT_RECORDS.fetch_add(1, Ordering::Relaxed);
202
203 // Record asynchronously without blocking
204 let recorder = recorder.clone();
205 tokio::spawn(async move {
206 let result = recorder.record(event).await;
207 IN_FLIGHT_RECORDS.fetch_sub(1, Ordering::Relaxed);
208 if let Err(e) = result {
209 // Issue #79 — under high load (Srikanth's `--rps 100`
210 // for 600s) the analytics DB pool gets saturated and
211 // every event spawns a task that times out and logs a
212 // WARN. Pillar tracking is best-effort metrics; losing
213 // events under load is acceptable, but spamming the log
214 // with one WARN per dropped event is not. Demote per-
215 // event failures to DEBUG and emit one aggregated WARN
216 // at most every FAILURE_WARN_INTERVAL.
217 tracing::debug!("Failed to record pillar usage event: {}", e);
218 FAILED_EVENT_COUNT.fetch_add(1, Ordering::Relaxed);
219 maybe_flush_dropped_warning().await;
220 }
221 });
222 }
223}
224
225/// Emit a single aggregated WARN summarising dropped pillar events when
226/// at least `FAILURE_WARN_INTERVAL` has elapsed since the last summary.
227/// The check is racy by design — under contention we'd rather skip a
228/// summary than serialize on a mutex. Counts not surfaced by one race
229/// roll into the next interval's summary.
230async fn maybe_flush_dropped_warning() {
231 let last = *LAST_FAILURE_WARN_AT.read().await;
232 if last.elapsed() < FAILURE_WARN_INTERVAL {
233 return;
234 }
235 // Race-aware swap: take the count we'll report, leave the rest for
236 // the next interval. Another task may have already flushed — we
237 // double-check the timestamp under the write lock and bail if so.
238 let mut last_w = LAST_FAILURE_WARN_AT.write().await;
239 if last_w.elapsed() < FAILURE_WARN_INTERVAL {
240 return;
241 }
242 let dropped = FAILED_EVENT_COUNT.swap(0, Ordering::Relaxed);
243 if dropped > 0 {
244 tracing::warn!(
245 dropped_events = dropped,
246 interval_secs = FAILURE_WARN_INTERVAL.as_secs(),
247 "pillar_tracking: dropped events in the last {}s due to analytics-DB pressure \
248 (analytics is best-effort; bench / serve behaviour is unaffected)",
249 FAILURE_WARN_INTERVAL.as_secs(),
250 );
251 }
252 *last_w = Instant::now();
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Recorder that keeps every event it receives, for inspection by tests.
    struct TestRecorder {
        events: Arc<RwLock<Vec<PillarUsageEvent>>>,
    }

    #[async_trait::async_trait]
    impl PillarUsageRecorder for TestRecorder {
        async fn record(
            &self,
            event: PillarUsageEvent,
        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            self.events.write().await.push(event);
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_record_reality_usage() {
        let events = Arc::new(RwLock::new(Vec::new()));
        init(Arc::new(TestRecorder {
            events: Arc::clone(&events),
        }))
        .await;

        record_reality_usage(
            Some("workspace-1".to_string()),
            None,
            "blended_reality_ratio",
            json!({"ratio": 0.5}),
        )
        .await;

        // Recording happens on a spawned task. Poll for the event with an upper
        // bound instead of sleeping a fixed (and possibly too short) 100ms.
        tokio::time::timeout(Duration::from_secs(5), async {
            while events.read().await.is_empty() {
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
        })
        .await
        .expect("pillar usage event was not recorded within 5s");

        let recorded = events.read().await;
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].pillar, Pillar::Reality);
        assert_eq!(recorded[0].metric_name, "blended_reality_ratio");
        assert_eq!(recorded[0].workspace_id.as_deref(), Some("workspace-1"));
        assert_eq!(recorded[0].org_id, None);
        assert_eq!(recorded[0].metric_value, json!({"ratio": 0.5}));
    }
}