ai_agent/services/analytics/
mod.rs1pub mod config;
10pub mod datadog;
11pub mod first_party_event_logger;
12pub mod first_party_event_logging_exporter;
13pub mod growthbook;
14pub mod metadata;
15pub mod sink;
16pub mod sink_killswitch;
17
18pub use config::*;
20pub use datadog::*;
21pub use first_party_event_logger::*;
22pub use first_party_event_logging_exporter::*;
23pub use growthbook::*;
24pub use metadata::*;
25pub use sink::*;
26pub use sink_killswitch::*;
27
28pub type AnalyticsMetadataVerified = ();
33
34pub type AnalyticsMetadataPiiTagged = ();
36
37pub type LogEventMetadata = std::collections::HashMap<String, serde_json::Value>;
39
40#[derive(Debug, Clone)]
42struct QueuedEvent {
43 event_name: String,
44 metadata: LogEventMetadata,
45 is_async: bool,
46}
47
48pub trait AnalyticsSink: Send + Sync {
50 fn log_event(&self, event_name: &str, metadata: &LogEventMetadata);
51 fn log_event_async(
52 &self,
53 event_name: &str,
54 metadata: &LogEventMetadata,
55 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>>;
56}
57
58pub fn strip_proto_fields<V: Clone>(
62 metadata: &std::collections::HashMap<String, V>,
63) -> std::collections::HashMap<String, V> {
64 let mut result: Option<std::collections::HashMap<String, V>> = None;
65
66 for key in metadata.keys() {
67 if key.starts_with("_PROTO_") {
68 if result.is_none() {
69 result = Some(metadata.clone());
70 }
71 if let Some(ref mut r) = result {
72 r.remove(key);
73 }
74 }
75 }
76
77 result.unwrap_or_else(|| metadata.clone())
78}
79
80static EVENT_QUEUE: std::sync::OnceLock<std::sync::Mutex<Vec<QueuedEvent>>> =
82 std::sync::OnceLock::new();
83
84fn get_event_queue() -> &'static std::sync::Mutex<Vec<QueuedEvent>> {
85 EVENT_QUEUE.get_or_init(|| std::sync::Mutex::new(Vec::new()))
86}
87
88static ANALYTICS_SINK: std::sync::Mutex<Option<std::sync::Arc<dyn AnalyticsSink>>> =
90 std::sync::Mutex::new(None);
91
92fn get_sink() -> Option<std::sync::Arc<dyn AnalyticsSink>> {
93 ANALYTICS_SINK.lock().unwrap().clone()
94}
95
96pub fn attach_analytics_sink(new_sink: std::sync::Arc<dyn AnalyticsSink>) -> bool {
102 let mut guard = ANALYTICS_SINK.lock().unwrap();
103 if guard.is_some() {
104 return false; }
106
107 *guard = Some(new_sink);
108
109 let queue = get_event_queue();
111 let mut queued_events = queue.lock().unwrap();
112
113 if !queued_events.is_empty() {
114 let events: Vec<QueuedEvent> = std::mem::take(&mut *queued_events);
115
116 if let Some(sink) = get_sink() {
118 let mut metadata = LogEventMetadata::new();
119 metadata.insert(
120 "queued_event_count".to_string(),
121 serde_json::json!(events.len()),
122 );
123 sink.log_event("analytics_sink_attached", &metadata);
124 }
125
126 let sink = ANALYTICS_SINK.lock().unwrap().clone().expect("sink just set");
128
129 std::thread::spawn(move || {
131 for event in events {
132 if event.is_async {
133 let metadata = &event.metadata;
135 sink.log_event(&event.event_name, metadata);
136 } else {
137 sink.log_event(&event.event_name, &event.metadata);
138 }
139 }
140 });
141 }
142
143 true
144}
145
146pub fn log_event(event_name: &str, metadata: LogEventMetadata) {
153 if let Some(sink) = get_sink() {
154 sink.log_event(event_name, &metadata);
155 } else {
156 let mut queue = get_event_queue().lock().unwrap();
157 queue.push(QueuedEvent {
158 event_name: event_name.to_string(),
159 metadata,
160 is_async: false,
161 });
162 }
163}
164
165pub async fn log_event_async(event_name: &str, metadata: LogEventMetadata) {
172 if let Some(sink) = get_sink() {
173 sink.log_event_async(event_name, &metadata).await;
174 } else {
175 let mut queue = get_event_queue().lock().unwrap();
176 queue.push(QueuedEvent {
177 event_name: event_name.to_string(),
178 metadata,
179 is_async: true,
180 });
181 }
182}
183
184pub fn reset_for_testing() {
186 let mut queue = get_event_queue().lock().unwrap();
187 queue.clear();
188 *ANALYTICS_SINK.lock().unwrap() = None;
189}
190
191#[cfg(test)]
192mod tests {
193 use super::*;
194
195 #[test]
196 fn test_strip_proto_fields_no_change() {
197 let mut metadata: std::collections::HashMap<String, serde_json::Value> =
198 std::collections::HashMap::new();
199 metadata.insert("event_name".to_string(), serde_json::json!("test"));
200 metadata.insert("count".to_string(), serde_json::json!(42));
201
202 let result = strip_proto_fields(&metadata);
203
204 assert_eq!(result.len(), 2);
205 assert!(result.contains_key("event_name"));
206 assert!(result.contains_key("count"));
207 }
208
209 #[test]
210 fn test_strip_proto_fields_removes_proto() {
211 let mut metadata: std::collections::HashMap<String, serde_json::Value> =
212 std::collections::HashMap::new();
213 metadata.insert("event_name".to_string(), serde_json::json!("test"));
214 metadata.insert("_PROTO_PII".to_string(), serde_json::json!("sensitive"));
215
216 let result = strip_proto_fields(&metadata);
217
218 assert_eq!(result.len(), 1);
219 assert!(result.contains_key("event_name"));
220 assert!(!result.contains_key("_PROTO_PII"));
221 }
222
223 #[test]
224 fn test_attach_analytics_sink_idempotent() {
225 struct TestSink;
226 impl AnalyticsSink for TestSink {
227 fn log_event(&self, _event_name: &str, _metadata: &LogEventMetadata) {}
228 fn log_event_async(
229 &self,
230 _event_name: &str,
231 _metadata: &LogEventMetadata,
232 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
233 Box::pin(async {})
234 }
235 }
236
237 let sink1 = std::sync::Arc::new(TestSink);
238 let sink2 = std::sync::Arc::new(TestSink);
239
240 let result1 = attach_analytics_sink(sink1);
241 let result2 = attach_analytics_sink(sink2);
242
243 assert!(result1);
244 assert!(!result2); }
246}