Skip to main content

ai_agent/services/analytics/
mod.rs

1//! Analytics service - public API for event logging
2//!
3//! This module serves as the main entry point for analytics events.
4//!
//! DESIGN: The logging functions in this file call no backend code directly (to avoid import cycles); they only talk to the attached sink.
6//! Events are queued until attach_analytics_sink() is called during app initialization.
7//! The sink handles routing to Datadog and 1P event logging.
8
9pub mod config;
10pub mod datadog;
11pub mod first_party_event_logger;
12pub mod first_party_event_logging_exporter;
13pub mod growthbook;
14pub mod metadata;
15pub mod sink;
16pub mod sink_killswitch;
17
18// Re-export all submodules
19pub use config::*;
20pub use datadog::*;
21pub use first_party_event_logger::*;
22pub use first_party_event_logging_exporter::*;
23pub use growthbook::*;
24pub use metadata::*;
25pub use sink::*;
26pub use sink_killswitch::*;
27
/// Marker type for verifying analytics metadata doesn't contain sensitive data.
///
/// The intent is to force explicit verification that string values being logged
/// don't contain code snippets, file paths, or other sensitive information.
///
/// NOTE(review): this is a plain alias for `()`, so it carries no value and the
/// compiler enforces nothing through it. The previously documented usage,
/// `my_string as AnalyticsMetadataVerified`, is not a valid Rust cast (`as`
/// cannot convert a string to `()`); confirm how callers are expected to use
/// this marker.
pub type AnalyticsMetadataVerified = ();
33
/// Marker type for values routed to PII-tagged proto columns.
///
/// NOTE(review): this is a plain alias for `()`, so it is interchangeable with
/// the unit type and is not a distinct type as far as the compiler is concerned.
pub type AnalyticsMetadataPiiTagged = ();
36
/// Metadata attached to a logged event: a map from string keys to arbitrary
/// JSON values.
pub type LogEventMetadata = std::collections::HashMap<String, serde_json::Value>;
39
/// An event captured before a sink was attached, held in the queue until it
/// can be replayed to the sink.
#[derive(Debug, Clone)]
struct QueuedEvent {
    /// Event name, handed to the sink as-is when the queue is drained.
    event_name: String,
    /// Metadata supplied by the caller at logging time.
    metadata: LogEventMetadata,
    /// `true` when the event was queued by `log_event_async`, `false` when it
    /// was queued by `log_event`.
    is_async: bool,
}
47
/// Sink interface for the analytics backend.
///
/// Implementations must be `Send + Sync`: the attached sink is stored in a
/// process-wide static and is also called from the thread that replays
/// queued events.
pub trait AnalyticsSink: Send + Sync {
    /// Record one event synchronously.
    fn log_event(&self, event_name: &str, metadata: &LogEventMetadata);
    /// Record one event asynchronously, returning a boxed future that
    /// resolves to `()`.
    ///
    /// The elided `'_` on the returned future resolves to the lifetime of
    /// `&self`, so the future may borrow from the sink but not from
    /// `event_name` or `metadata`; an implementation that needs those values
    /// inside the future has to copy them before building it.
    fn log_event_async(
        &self,
        event_name: &str,
        metadata: &LogEventMetadata,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>>;
}
57
/// Returns a copy of `metadata` with every key that starts with `_PROTO_`
/// removed, for payloads destined for general-access storage.
///
/// Used by sink.rs before Datadog fanout and by
/// first_party_event_logging_exporter for defensive stripping after hoisting
/// known `_PROTO_*` keys.
///
/// The match is a case-sensitive prefix test: keys that merely contain
/// `_PROTO_` somewhere else are kept. The input map is never modified, and a
/// new map is always returned, even when nothing had to be stripped.
pub fn strip_proto_fields<V: Clone>(
    metadata: &std::collections::HashMap<String, V>,
) -> std::collections::HashMap<String, V> {
    // Copy only the entries that survive instead of cloning the whole map and
    // deleting from the copy; values under stripped keys are never cloned.
    metadata
        .iter()
        .filter(|(key, _)| !key.starts_with("_PROTO_"))
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}
79
80/// Internal event queue for events logged before sink is attached
81static EVENT_QUEUE: std::sync::OnceLock<std::sync::Mutex<Vec<QueuedEvent>>> =
82    std::sync::OnceLock::new();
83
84fn get_event_queue() -> &'static std::sync::Mutex<Vec<QueuedEvent>> {
85    EVENT_QUEUE.get_or_init(|| std::sync::Mutex::new(Vec::new()))
86}
87
/// The attached sink, intended to be initialized during app startup.
///
/// Within this file it is only ever set by `attach_analytics_sink`. A shared
/// `OnceLock` static can be set once and never cleared, so an attached sink
/// stays attached for the rest of the process.
static ANALYTICS_SINK: std::sync::OnceLock<Box<dyn AnalyticsSink>> = std::sync::OnceLock::new();

/// Returns the attached sink, or `None` if no sink has been attached yet.
// NOTE(review): returning `&Box<dyn AnalyticsSink>` exposes the box to callers
// (clippy's `borrowed_box` lint); `&dyn AnalyticsSink` would be the usual
// shape. Left unchanged here because it would alter the signature.
fn get_sink() -> Option<&'static Box<dyn AnalyticsSink>> {
    ANALYTICS_SINK.get()
}
94
95/// Attach the analytics sink that will receive all events.
96/// Queued events are drained asynchronously via microtask to avoid
97/// adding latency to the startup path.
98///
99/// Idempotent: if a sink is already attached, this is a no-op.
100pub fn attach_analytics_sink(new_sink: Box<dyn AnalyticsSink>) -> bool {
101    if ANALYTICS_SINK.get().is_some() {
102        return false; // Already attached
103    }
104
105    let _ = ANALYTICS_SINK.set(new_sink);
106
107    // Drain the queue asynchronously
108    let queue = get_event_queue();
109    let mut queued_events = queue.lock().unwrap();
110
111    if !queued_events.is_empty() {
112        let events: Vec<QueuedEvent> = std::mem::take(&mut *queued_events);
113
114        // Log queue size for debugging analytics initialization timing
115        if let Some(sink) = get_sink() {
116            let mut metadata = LogEventMetadata::new();
117            metadata.insert(
118                "queued_event_count".to_string(),
119                serde_json::json!(events.len()),
120            );
121            sink.log_event("analytics_sink_attached", &metadata);
122        }
123
124        // Schedule async drain
125        let sink = ANALYTICS_SINK.get().expect("sink just set");
126
127        // Use spawn to simulate queueMicrotask behavior
128        std::thread::spawn(move || {
129            for event in events {
130                if event.is_async {
131                    // For async events, we need to handle differently
132                    let metadata = &event.metadata;
133                    sink.log_event(&event.event_name, metadata);
134                } else {
135                    sink.log_event(&event.event_name, &event.metadata);
136                }
137            }
138        });
139    }
140
141    true
142}
143
144/// Log an event to analytics backends (synchronous)
145///
146/// Events may be sampled based on the 'tengu_event_sampling_config' dynamic config.
147/// When sampled, the sample_rate is added to the event metadata.
148///
149/// If no sink is attached, events are queued and drained when the sink attaches.
150pub fn log_event(event_name: &str, metadata: LogEventMetadata) {
151    if let Some(sink) = get_sink() {
152        sink.log_event(event_name, &metadata);
153    } else {
154        let mut queue = get_event_queue().lock().unwrap();
155        queue.push(QueuedEvent {
156            event_name: event_name.to_string(),
157            metadata,
158            is_async: false,
159        });
160    }
161}
162
163/// Log an event to analytics backends (asynchronous)
164///
165/// Events may be sampled based on the 'tengu_event_sampling_config' dynamic config.
166/// When sampled, the sample_rate is added to the event metadata.
167///
168/// If no sink is attached, events are queued and drained when the sink attaches.
169pub async fn log_event_async(event_name: &str, metadata: LogEventMetadata) {
170    if let Some(sink) = get_sink() {
171        sink.log_event_async(event_name, &metadata).await;
172    } else {
173        let mut queue = get_event_queue().lock().unwrap();
174        queue.push(QueuedEvent {
175            event_name: event_name.to_string(),
176            metadata,
177            is_async: true,
178        });
179    }
180}
181
/// Clears the queue of not-yet-delivered events. For tests only.
///
/// Only the queue is reset. The sink lives in a shared `OnceLock` static,
/// which cannot be cleared, so a sink attached earlier stays attached.
#[cfg(test)]
pub fn reset_for_testing() {
    get_event_queue().lock().unwrap().clear();
}
190
#[cfg(test)]
mod tests {
    use super::*;

    /// A map with no `_PROTO_` keys comes back with the same entries.
    #[test]
    fn test_strip_proto_fields_no_change() {
        let mut metadata = LogEventMetadata::new();
        metadata.insert("event_name".to_string(), serde_json::json!("test"));
        metadata.insert("count".to_string(), serde_json::json!(42));

        let result = strip_proto_fields(&metadata);

        assert_eq!(result.len(), 2);
        assert!(result.contains_key("event_name"));
        assert!(result.contains_key("count"));
        assert_eq!(result, metadata);
    }

    /// `_PROTO_`-prefixed keys are dropped; everything else is kept.
    #[test]
    fn test_strip_proto_fields_removes_proto() {
        let mut metadata = LogEventMetadata::new();
        metadata.insert("event_name".to_string(), serde_json::json!("test"));
        metadata.insert("_PROTO_PII".to_string(), serde_json::json!("sensitive"));

        let result = strip_proto_fields(&metadata);

        assert_eq!(result.len(), 1);
        assert!(result.contains_key("event_name"));
        assert!(!result.contains_key("_PROTO_PII"));
        // The caller's map is not modified.
        assert!(metadata.contains_key("_PROTO_PII"));
    }

    /// An empty map stays empty.
    #[test]
    fn test_strip_proto_fields_empty_map() {
        let metadata = LogEventMetadata::new();

        assert!(strip_proto_fields(&metadata).is_empty());
    }

    /// Only a leading, exactly-cased `_PROTO_` counts as a match.
    #[test]
    fn test_strip_proto_fields_prefix_only() {
        let mut metadata = LogEventMetadata::new();
        metadata.insert("x_PROTO_y".to_string(), serde_json::json!(1));
        metadata.insert("_proto_lower".to_string(), serde_json::json!(2));
        metadata.insert("_PROTO_".to_string(), serde_json::json!(3));

        let result = strip_proto_fields(&metadata);

        assert_eq!(result.len(), 2);
        assert!(result.contains_key("x_PROTO_y"));
        assert!(result.contains_key("_proto_lower"));
        assert!(!result.contains_key("_PROTO_"));
    }

    /// The first attach wins; a second attach is refused.
    // NOTE(review): the sink is process-global and cannot be reset, so the
    // first assertion assumes no other test in this binary attaches a sink
    // before this one runs; confirm against the sibling modules' tests.
    #[test]
    fn test_attach_analytics_sink_idempotent() {
        struct TestSink;
        impl AnalyticsSink for TestSink {
            fn log_event(&self, _event_name: &str, _metadata: &LogEventMetadata) {}
            fn log_event_async(
                &self,
                _event_name: &str,
                _metadata: &LogEventMetadata,
            ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let sink1 = Box::new(TestSink);
        let sink2 = Box::new(TestSink);

        let result1 = attach_analytics_sink(sink1);
        let result2 = attach_analytics_sink(sink2);

        assert!(result1);
        assert!(!result2); // Second attach should fail
    }
}