// ai_agent/services/analytics/mod.rs
pub mod config;
10pub mod datadog;
11pub mod first_party_event_logger;
12pub mod first_party_event_logging_exporter;
13pub mod growthbook;
14pub mod metadata;
15pub mod sink;
16pub mod sink_killswitch;
17
18pub use config::*;
20pub use datadog::*;
21pub use first_party_event_logger::*;
22pub use first_party_event_logging_exporter::*;
23pub use growthbook::*;
24pub use metadata::*;
25pub use sink::*;
26pub use sink_killswitch::*;
27
/// Alias for the unit type `()`; a value of this type carries no data.
///
/// NOTE(review): the name suggests a marker for metadata that has been
/// verified as safe to log — confirm the intended contract with callers.
pub type AnalyticsMetadataVerified = ();

/// Alias for the unit type `()`; a value of this type carries no data.
///
/// NOTE(review): the name suggests a marker for metadata tagged as PII —
/// confirm the intended contract with callers.
pub type AnalyticsMetadataPiiTagged = ();
36
37pub type LogEventMetadata = std::collections::HashMap<String, serde_json::Value>;
39
40#[derive(Debug, Clone)]
42struct QueuedEvent {
43 event_name: String,
44 metadata: LogEventMetadata,
45 is_async: bool,
46}
47
48pub trait AnalyticsSink: Send + Sync {
50 fn log_event(&self, event_name: &str, metadata: &LogEventMetadata);
51 fn log_event_async(
52 &self,
53 event_name: &str,
54 metadata: &LogEventMetadata,
55 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>>;
56}
57
/// Returns a copy of `metadata` without any entry whose key starts with
/// `"_PROTO_"`.
///
/// The input map is never modified; the result is always a fresh map, even
/// when there is nothing to strip.
pub fn strip_proto_fields<V: Clone>(
    metadata: &std::collections::HashMap<String, V>,
) -> std::collections::HashMap<String, V> {
    let mut stripped = metadata.clone();
    // Keep only the entries whose key lacks the reserved prefix.
    stripped.retain(|field, _| !field.starts_with("_PROTO_"));
    stripped
}
79
80static EVENT_QUEUE: std::sync::OnceLock<std::sync::Mutex<Vec<QueuedEvent>>> =
82 std::sync::OnceLock::new();
83
84fn get_event_queue() -> &'static std::sync::Mutex<Vec<QueuedEvent>> {
85 EVENT_QUEUE.get_or_init(|| std::sync::Mutex::new(Vec::new()))
86}
87
88static ANALYTICS_SINK: std::sync::OnceLock<Box<dyn AnalyticsSink>> = std::sync::OnceLock::new();
90
91fn get_sink() -> Option<&'static Box<dyn AnalyticsSink>> {
92 ANALYTICS_SINK.get()
93}
94
95pub fn attach_analytics_sink(new_sink: Box<dyn AnalyticsSink>) -> bool {
101 if ANALYTICS_SINK.get().is_some() {
102 return false; }
104
105 let _ = ANALYTICS_SINK.set(new_sink);
106
107 let queue = get_event_queue();
109 let mut queued_events = queue.lock().unwrap();
110
111 if !queued_events.is_empty() {
112 let events: Vec<QueuedEvent> = std::mem::take(&mut *queued_events);
113
114 if let Some(sink) = get_sink() {
116 let mut metadata = LogEventMetadata::new();
117 metadata.insert(
118 "queued_event_count".to_string(),
119 serde_json::json!(events.len()),
120 );
121 sink.log_event("analytics_sink_attached", &metadata);
122 }
123
124 let sink = ANALYTICS_SINK.get().expect("sink just set");
126
127 std::thread::spawn(move || {
129 for event in events {
130 if event.is_async {
131 let metadata = &event.metadata;
133 sink.log_event(&event.event_name, metadata);
134 } else {
135 sink.log_event(&event.event_name, &event.metadata);
136 }
137 }
138 });
139 }
140
141 true
142}
143
144pub fn log_event(event_name: &str, metadata: LogEventMetadata) {
151 if let Some(sink) = get_sink() {
152 sink.log_event(event_name, &metadata);
153 } else {
154 let mut queue = get_event_queue().lock().unwrap();
155 queue.push(QueuedEvent {
156 event_name: event_name.to_string(),
157 metadata,
158 is_async: false,
159 });
160 }
161}
162
163pub async fn log_event_async(event_name: &str, metadata: LogEventMetadata) {
170 if let Some(sink) = get_sink() {
171 sink.log_event_async(event_name, &metadata).await;
172 } else {
173 let mut queue = get_event_queue().lock().unwrap();
174 queue.push(QueuedEvent {
175 event_name: event_name.to_string(),
176 metadata,
177 is_async: true,
178 });
179 }
180}
181
/// Test-only helper that discards every event currently waiting in the
/// queue.
///
/// Only the queue is cleared; this function does not touch the attached
/// sink.
///
/// # Panics
///
/// Panics if the queue mutex is poisoned.
#[cfg(test)]
pub fn reset_for_testing() {
    get_event_queue().lock().unwrap().clear();
}
190
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a metadata map from `(key, value)` pairs.
    fn metadata_from(entries: Vec<(&str, serde_json::Value)>) -> LogEventMetadata {
        entries
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect()
    }

    /// A map without `_PROTO_` keys comes back with all entries intact.
    #[test]
    fn test_strip_proto_fields_no_change() {
        let input = metadata_from(vec![
            ("event_name", serde_json::json!("test")),
            ("count", serde_json::json!(42)),
        ]);

        let stripped = strip_proto_fields(&input);

        assert_eq!(stripped.len(), 2);
        assert!(stripped.contains_key("event_name"));
        assert!(stripped.contains_key("count"));
    }

    /// Keys carrying the `_PROTO_` prefix are dropped; other keys survive.
    #[test]
    fn test_strip_proto_fields_removes_proto() {
        let input = metadata_from(vec![
            ("event_name", serde_json::json!("test")),
            ("_PROTO_PII", serde_json::json!("sensitive")),
        ]);

        let stripped = strip_proto_fields(&input);

        assert_eq!(stripped.len(), 1);
        assert!(stripped.contains_key("event_name"));
        assert!(!stripped.contains_key("_PROTO_PII"));
    }

    /// Only the first attach succeeds; a second attach reports `false`.
    #[test]
    fn test_attach_analytics_sink_idempotent() {
        /// Sink that discards everything it receives.
        struct NoopSink;

        impl AnalyticsSink for NoopSink {
            fn log_event(&self, _event_name: &str, _metadata: &LogEventMetadata) {}

            fn log_event_async(
                &self,
                _event_name: &str,
                _metadata: &LogEventMetadata,
            ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        // Perform both attaches before asserting, so the second call always
        // runs regardless of the first outcome.
        let first_attached = attach_analytics_sink(Box::new(NoopSink));
        let second_attached = attach_analytics_sink(Box::new(NoopSink));

        assert!(first_attached);
        assert!(!second_attached);
    }
}