pub mod config;
pub mod datadog;
pub mod first_party_event_logger;
pub mod first_party_event_logging_exporter;
pub mod growthbook;
pub mod metadata;
pub mod sink;
pub mod sink_killswitch;
pub use config::*;
pub use datadog::*;
pub use first_party_event_logger::*;
pub use first_party_event_logging_exporter::*;
pub use growthbook::*;
pub use metadata::*;
pub use sink::*;
pub use sink_killswitch::*;
/// Alias for the unit type `()`.
/// NOTE(review): presumably a marker for metadata that has been verified; confirm the intended use.
pub type AnalyticsMetadataVerified = ();
/// Alias for the unit type `()`.
/// NOTE(review): presumably a marker for metadata that has been PII-tagged; confirm the intended use.
pub type AnalyticsMetadataPiiTagged = ();
/// Metadata attached to a logged event: a map from string keys to arbitrary JSON values.
pub type LogEventMetadata = std::collections::HashMap<String, serde_json::Value>;
/// An event buffered by [`log_event`] or [`log_event_async`] because no sink was attached yet.
#[derive(Debug, Clone)]
struct QueuedEvent {
/// Event name exactly as passed by the caller.
event_name: String,
/// Metadata exactly as passed by the caller.
metadata: LogEventMetadata,
/// `true` when the event was queued by [`log_event_async`], `false` when queued by [`log_event`].
is_async: bool,
}
/// Destination that analytics events are delivered to.
///
/// Implementations must be `Send + Sync`: the attached sink is stored in a process-wide static
/// and is invoked both from callers' threads and from a background replay thread.
pub trait AnalyticsSink: Send + Sync {
/// Records `event_name` together with `metadata` synchronously.
fn log_event(&self, event_name: &str, metadata: &LogEventMetadata);
/// Records `event_name` together with `metadata`, returning a boxed `Send` future that
/// completes when the sink is done with the event.
///
/// By lifetime elision the `'_` bound ties the future to the borrow of `self` only, so the
/// future cannot hold on to `event_name` or `metadata`; implementations must copy whatever
/// they need out of the arguments before returning.
fn log_event_async(
&self,
event_name: &str,
metadata: &LogEventMetadata,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>>;
}
/// Returns a copy of `metadata` without the entries whose key starts with `_PROTO_`.
///
/// The input map is never modified. Keys and values of the surviving entries are cloned;
/// entries that are filtered out are not cloned at all. The prefix match is case-sensitive.
pub fn strip_proto_fields<V: Clone>(
    metadata: &std::collections::HashMap<String, V>,
) -> std::collections::HashMap<String, V> {
    metadata
        .iter()
        .filter(|(key, _)| !key.starts_with("_PROTO_"))
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect()
}
/// Process-wide buffer for events logged while no sink is attached; created lazily.
static EVENT_QUEUE: std::sync::OnceLock<std::sync::Mutex<Vec<QueuedEvent>>> =
    std::sync::OnceLock::new();

/// Returns the shared pending-event queue, creating it as an empty `Vec` on first access.
fn get_event_queue() -> &'static std::sync::Mutex<Vec<QueuedEvent>> {
    // `Mutex<Vec<_>>::default()` is a mutex wrapping an empty vector.
    EVENT_QUEUE.get_or_init(Default::default)
}
/// Process-wide analytics sink. A `OnceLock` can be set at most once, so the first sink stored
/// here stays in place for the rest of the process.
static ANALYTICS_SINK: std::sync::OnceLock<Box<dyn AnalyticsSink>> = std::sync::OnceLock::new();
/// Returns the attached sink, or `None` if no sink has been stored in [`ANALYTICS_SINK`] yet.
fn get_sink() -> Option<&'static Box<dyn AnalyticsSink>> {
ANALYTICS_SINK.get()
}
/// Installs `new_sink` as the process-wide analytics sink and replays queued events to it.
///
/// Only the first call installs a sink. Returns `true` if `new_sink` was installed and `false`
/// if a sink was already attached, in which case `new_sink` is dropped and nothing else happens.
///
/// If events were queued before attachment, an `analytics_sink_attached` event carrying
/// `queued_event_count` is sent to the sink synchronously, and the queued events are then
/// replayed in order on a detached background thread. Because of that, replayed events may
/// reach the sink after events that are logged once this function has returned.
pub fn attach_analytics_sink(new_sink: Box<dyn AnalyticsSink>) -> bool {
    // `OnceLock::set` is an atomic check-and-install. A separate `get().is_some()` check
    // followed by an ignored `set` lets two racing callers both report success.
    if ANALYTICS_SINK.set(new_sink).is_err() {
        return false;
    }
    let sink = ANALYTICS_SINK
        .get()
        .expect("sink was installed by the `set` call above");

    // Take the backlog inside a short scope so the queue lock is released before any sink
    // code runs. A poisoned lock is recovered: the queue is a plain `Vec` and stays usable.
    let events: Vec<QueuedEvent> = {
        let mut queued_events = get_event_queue()
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        std::mem::take(&mut *queued_events)
    };
    if events.is_empty() {
        return true;
    }

    let mut metadata = LogEventMetadata::new();
    metadata.insert(
        "queued_event_count".to_string(),
        serde_json::json!(events.len()),
    );
    sink.log_event("analytics_sink_attached", &metadata);

    // Replay off the caller's thread; the handle is intentionally detached.
    std::thread::spawn(move || {
        for event in events {
            if event.is_async {
                // NOTE(review): events queued via `log_event_async` are replayed through the
                // synchronous `log_event` too, presumably because this plain thread has no
                // executor to drive the future from `AnalyticsSink::log_event_async`.
                // Confirm this is intended.
                sink.log_event(&event.event_name, &event.metadata);
            } else {
                sink.log_event(&event.event_name, &event.metadata);
            }
        }
    });
    true
}
/// Logs `event_name` with `metadata` through the attached sink's synchronous `log_event`.
///
/// If no sink is attached yet, the event is queued (marked as non-async) and is replayed when
/// [`attach_analytics_sink`] installs a sink.
pub fn log_event(event_name: &str, metadata: LogEventMetadata) {
    let sink = match get_sink() {
        Some(sink) => sink,
        None => {
            // A poisoned lock is recovered: the queue is a plain `Vec` and stays usable.
            let mut queue = get_event_queue()
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            // Re-check while holding the queue lock. `attach_analytics_sink` installs the sink
            // before it locks and drains the queue, so seeing no sink here means the drain has
            // not happened yet and this push will be picked up. Without the re-check an event
            // pushed just after the drain would sit in the queue forever.
            match get_sink() {
                Some(sink) => sink,
                None => {
                    queue.push(QueuedEvent {
                        event_name: event_name.to_string(),
                        metadata,
                        is_async: false,
                    });
                    return;
                }
            }
            // The queue guard is dropped here, before the sink is invoked.
        }
    };
    sink.log_event(event_name, &metadata);
}
/// Logs `event_name` with `metadata` through the attached sink's `log_event_async` and awaits
/// the future it returns.
///
/// If no sink is attached yet, the event is queued (marked as async) and this function returns
/// without awaiting anything; the event is replayed when [`attach_analytics_sink`] installs a
/// sink.
pub async fn log_event_async(event_name: &str, metadata: LogEventMetadata) {
    let sink = match get_sink() {
        Some(sink) => sink,
        None => {
            // A poisoned lock is recovered: the queue is a plain `Vec` and stays usable.
            let mut queue = get_event_queue()
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            // Re-check while holding the queue lock. `attach_analytics_sink` installs the sink
            // before it locks and drains the queue, so seeing no sink here means the drain has
            // not happened yet and this push will be picked up. Without the re-check an event
            // pushed just after the drain would sit in the queue forever.
            match get_sink() {
                Some(sink) => sink,
                None => {
                    queue.push(QueuedEvent {
                        event_name: event_name.to_string(),
                        metadata,
                        is_async: true,
                    });
                    return;
                }
            }
            // The guard's scope ends here, so the std mutex is never held across the `.await`.
        }
    };
    sink.log_event_async(event_name, &metadata).await;
}
/// Test-only helper that discards every event still waiting in the queue.
///
/// Only the queue is cleared; a sink that has already been attached stays attached.
#[cfg(test)]
pub fn reset_for_testing() {
    // Recover from a poisoned lock so that one panicking test does not make every later
    // test that resets the queue panic as well.
    get_event_queue()
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .clear();
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A map without `_PROTO_` keys comes back with all of its entries.
    #[test]
    fn test_strip_proto_fields_no_change() {
        let input = LogEventMetadata::from([
            ("event_name".to_string(), serde_json::json!("test")),
            ("count".to_string(), serde_json::json!(42)),
        ]);

        let stripped = strip_proto_fields(&input);

        assert_eq!(stripped.len(), 2);
        assert!(stripped.contains_key("event_name"));
        assert!(stripped.contains_key("count"));
    }

    /// A `_PROTO_`-prefixed key is dropped while ordinary keys are kept.
    #[test]
    fn test_strip_proto_fields_removes_proto() {
        let input = LogEventMetadata::from([
            ("event_name".to_string(), serde_json::json!("test")),
            ("_PROTO_PII".to_string(), serde_json::json!("sensitive")),
        ]);

        let stripped = strip_proto_fields(&input);

        assert_eq!(stripped.len(), 1);
        assert!(stripped.contains_key("event_name"));
        assert!(!stripped.contains_key("_PROTO_PII"));
    }

    /// The first attach succeeds; a second attach is rejected.
    #[test]
    fn test_attach_analytics_sink_idempotent() {
        /// Sink that ignores every event it receives.
        struct TestSink;

        impl AnalyticsSink for TestSink {
            fn log_event(&self, _event_name: &str, _metadata: &LogEventMetadata) {}

            fn log_event_async(
                &self,
                _event_name: &str,
                _metadata: &LogEventMetadata,
            ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send + '_>> {
                Box::pin(async {})
            }
        }

        let first_attach = attach_analytics_sink(Box::new(TestSink));
        let second_attach = attach_analytics_sink(Box::new(TestSink));

        assert!(first_attach);
        assert!(!second_attach);
    }
}