// ai_agent/services/analytics/mod.rs

//! Analytics service — public API for event logging.
//!
//! This module is the main entry point for analytics events.
//!
//! DESIGN: the logging API below depends on no other application code (only
//! the `AnalyticsSink` trait defined here), to avoid import cycles.
//! Events are queued until `attach_analytics_sink()` is called during app
//! initialization. The sink handles routing to Datadog and first-party (1P)
//! event logging.
8
9pub mod config;
10pub mod datadog;
11pub mod first_party_event_logger;
12pub mod first_party_event_logging_exporter;
13pub mod growthbook;
14pub mod metadata;
15pub mod sink;
16pub mod sink_killswitch;
17
18// Re-export all submodules
19pub use config::*;
20pub use datadog::*;
21pub use first_party_event_logger::*;
22pub use first_party_event_logging_exporter::*;
23pub use growthbook::*;
24pub use metadata::*;
25pub use sink::*;
26pub use sink_killswitch::*;
27
/// Marker type for analytics metadata values that have been checked for
/// sensitive content.
///
/// Annotating a value's type with this alias documents that whoever logs it has
/// verified it contains no code snippets, file paths, or other sensitive
/// information. It is an alias for `()`, so the compiler enforces nothing here:
/// the check is a code-review convention, not a type-level guarantee. (In
/// particular, `some_string as AnalyticsMetadataVerified` is not valid Rust.)
pub type AnalyticsMetadataVerified = ();

/// Marker type for metadata values routed to PII-tagged proto columns.
///
/// Like `AnalyticsMetadataVerified`, this is an alias for `()` and carries no
/// compile-time enforcement.
pub type AnalyticsMetadataPiiTagged = ();
36
37/// Log event metadata type
38pub type LogEventMetadata = std::collections::HashMap<String, serde_json::Value>;
39
40/// Queued event structure
41#[derive(Debug, Clone)]
42struct QueuedEvent {
43    event_name: String,
44    metadata: LogEventMetadata,
45    is_async: bool,
46}
47
48/// Sink interface for the analytics backend
49pub trait AnalyticsSink: Send + Sync {
50    fn log_event(&self, event_name: &str, metadata: &LogEventMetadata);
51    fn log_event_async(
52        &self,
53        event_name: &str,
54        metadata: &LogEventMetadata,
55    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>>;
56}
57
/// Return a copy of `metadata` with every `_PROTO_*` key removed.
///
/// Payloads bound for general-access storage must not carry the `_PROTO_*`
/// keys. Used by `sink.rs` before the Datadog fanout and by
/// `first_party_event_logging_exporter` as a defensive pass after hoisting the
/// known `_PROTO_*` keys.
///
/// The match is a case-sensitive prefix check: `_PROTO_PII` is dropped, while
/// `x_PROTO_` and `_proto_pii` are kept. The input map is never modified.
pub fn strip_proto_fields<V: Clone>(
    metadata: &std::collections::HashMap<String, V>,
) -> std::collections::HashMap<String, V> {
    // The result is always a freshly owned map, so copy only the surviving
    // entries instead of cloning everything and then deleting proto keys.
    metadata
        .iter()
        .filter(|(key, _)| !key.starts_with("_PROTO_"))
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}
79
80/// Internal event queue for events logged before sink is attached
81static EVENT_QUEUE: std::sync::OnceLock<std::sync::Mutex<Vec<QueuedEvent>>> =
82    std::sync::OnceLock::new();
83
84fn get_event_queue() -> &'static std::sync::Mutex<Vec<QueuedEvent>> {
85    EVENT_QUEUE.get_or_init(|| std::sync::Mutex::new(Vec::new()))
86}
87
88/// Sink - initialized during app startup
89static ANALYTICS_SINK: std::sync::Mutex<Option<std::sync::Arc<dyn AnalyticsSink>>> =
90    std::sync::Mutex::new(None);
91
92fn get_sink() -> Option<std::sync::Arc<dyn AnalyticsSink>> {
93    ANALYTICS_SINK.lock().unwrap().clone()
94}
95
96/// Attach the analytics sink that will receive all events.
97/// Queued events are drained asynchronously via microtask to avoid
98/// adding latency to the startup path.
99///
100/// Idempotent: if a sink is already attached, this is a no-op.
101pub fn attach_analytics_sink(new_sink: std::sync::Arc<dyn AnalyticsSink>) -> bool {
102    let mut guard = ANALYTICS_SINK.lock().unwrap();
103    if guard.is_some() {
104        return false; // Already attached
105    }
106
107    *guard = Some(new_sink);
108
109    // Drain the queue asynchronously
110    let queue = get_event_queue();
111    let mut queued_events = queue.lock().unwrap();
112
113    if !queued_events.is_empty() {
114        let events: Vec<QueuedEvent> = std::mem::take(&mut *queued_events);
115
116        // Log queue size for debugging analytics initialization timing
117        if let Some(sink) = get_sink() {
118            let mut metadata = LogEventMetadata::new();
119            metadata.insert(
120                "queued_event_count".to_string(),
121                serde_json::json!(events.len()),
122            );
123            sink.log_event("analytics_sink_attached", &metadata);
124        }
125
126        // Schedule async drain
127        let sink = ANALYTICS_SINK.lock().unwrap().clone().expect("sink just set");
128
129        // Use spawn to simulate queueMicrotask behavior
130        std::thread::spawn(move || {
131            for event in events {
132                if event.is_async {
133                    // For async events, we need to handle differently
134                    let metadata = &event.metadata;
135                    sink.log_event(&event.event_name, metadata);
136                } else {
137                    sink.log_event(&event.event_name, &event.metadata);
138                }
139            }
140        });
141    }
142
143    true
144}
145
146/// Log an event to analytics backends (synchronous)
147///
148/// Events may be sampled based on the 'tengu_event_sampling_config' dynamic config.
149/// When sampled, the sample_rate is added to the event metadata.
150///
151/// If no sink is attached, events are queued and drained when the sink attaches.
152pub fn log_event(event_name: &str, metadata: LogEventMetadata) {
153    if let Some(sink) = get_sink() {
154        sink.log_event(event_name, &metadata);
155    } else {
156        let mut queue = get_event_queue().lock().unwrap();
157        queue.push(QueuedEvent {
158            event_name: event_name.to_string(),
159            metadata,
160            is_async: false,
161        });
162    }
163}
164
165/// Log an event to analytics backends (asynchronous)
166///
167/// Events may be sampled based on the 'tengu_event_sampling_config' dynamic config.
168/// When sampled, the sample_rate is added to the event metadata.
169///
170/// If no sink is attached, events are queued and drained when the sink attaches.
171pub async fn log_event_async(event_name: &str, metadata: LogEventMetadata) {
172    if let Some(sink) = get_sink() {
173        sink.log_event_async(event_name, &metadata).await;
174    } else {
175        let mut queue = get_event_queue().lock().unwrap();
176        queue.push(QueuedEvent {
177            event_name: event_name.to_string(),
178            metadata,
179            is_async: true,
180        });
181    }
182}
183
184/// Reset analytics state for testing purposes only.
185pub fn reset_for_testing() {
186    let mut queue = get_event_queue().lock().unwrap();
187    queue.clear();
188    *ANALYTICS_SINK.lock().unwrap() = None;
189}
190
#[cfg(test)]
mod tests {
    use super::*;

    /// Sink that accepts and discards every event.
    struct NullSink;

    impl AnalyticsSink for NullSink {
        fn log_event(&self, _event_name: &str, _metadata: &LogEventMetadata) {}

        fn log_event_async(
            &self,
            _event_name: &str,
            _metadata: &LogEventMetadata,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
            Box::pin(async {})
        }
    }

    #[test]
    fn test_strip_proto_fields_no_change() {
        let mut metadata: std::collections::HashMap<String, serde_json::Value> =
            std::collections::HashMap::new();
        metadata.insert("event_name".to_string(), serde_json::json!("test"));
        metadata.insert("count".to_string(), serde_json::json!(42));

        let result = strip_proto_fields(&metadata);

        assert_eq!(result.len(), 2);
        assert!(result.contains_key("event_name"));
        assert!(result.contains_key("count"));
    }

    #[test]
    fn test_strip_proto_fields_removes_proto() {
        let mut metadata: std::collections::HashMap<String, serde_json::Value> =
            std::collections::HashMap::new();
        metadata.insert("event_name".to_string(), serde_json::json!("test"));
        metadata.insert("_PROTO_PII".to_string(), serde_json::json!("sensitive"));

        let result = strip_proto_fields(&metadata);

        assert_eq!(result.len(), 1);
        assert!(result.contains_key("event_name"));
        assert!(!result.contains_key("_PROTO_PII"));
    }

    #[test]
    fn test_strip_proto_fields_matches_prefix_only() {
        let mut metadata: std::collections::HashMap<String, serde_json::Value> =
            std::collections::HashMap::new();
        metadata.insert("_PROTO_A".to_string(), serde_json::json!(1));
        metadata.insert("_PROTO_B".to_string(), serde_json::json!(2));
        metadata.insert("x_PROTO_".to_string(), serde_json::json!(3));
        metadata.insert("_proto_lower".to_string(), serde_json::json!(4));

        let result = strip_proto_fields(&metadata);

        assert_eq!(result.len(), 2);
        assert!(result.contains_key("x_PROTO_"));
        assert!(result.contains_key("_proto_lower"));
        // The input map is left untouched.
        assert_eq!(metadata.len(), 4);
    }

    #[test]
    fn test_attach_analytics_sink_idempotent() {
        // The sink slot is process-global and shared by every test in the
        // binary. Start from a clean slate so an earlier attach cannot make the
        // first call below return false, and leave a clean slate behind.
        reset_for_testing();

        let result1 = attach_analytics_sink(std::sync::Arc::new(NullSink));
        let result2 = attach_analytics_sink(std::sync::Arc::new(NullSink));

        reset_for_testing();

        assert!(result1);
        assert!(!result2); // Second attach should fail
    }
}