1#![allow(dead_code)]
2#![allow(non_snake_case)]
3#![allow(clippy::needless_return)]
4#![allow(clippy::unnecessary_lazy_evaluations)]
5#![allow(clippy::new_without_default)]
6#![allow(clippy::derivable_impls)]
7#![allow(clippy::drain_collect)]
8#![allow(clippy::collapsible_match)]
9#![allow(clippy::collapsible_if)]
10
11pub mod api;
37pub mod batch;
38pub mod cache_metrics;
39pub mod config;
40pub mod error;
41pub mod events;
42pub mod response;
43mod transport;
44pub mod user;
45
46use std::collections::HashMap;
47use std::hash::Hash;
48
49use moka::future::Cache;
50use serde_json::Value;
51use tokio::sync::{mpsc, oneshot};
52
53pub use api::{ConfigEvaluationResult, GateEvaluationResult, StatsigMetadata};
54pub use batch::{BatchProcessor, BatchRequest};
55pub use cache_metrics::{CacheMetrics, CacheMetricsSummary};
56pub use config::StatsigClientConfig;
57pub use error::{Result, StatsigError};
58pub use events::{
59 ExposureEventMetadata, LogEventResponse, StatsigEvent, StatsigEventTime, StatsigEventValue,
60};
61pub use response::ApiResponseHandler;
62pub use user::{EnvironmentTier, StatsigEnvironment, User, UserBuilder};
63
64#[derive(Debug)]
79pub struct StatsigClient {
80 config: StatsigClientConfig,
81 transport: transport::StatsigTransport,
82 cache: Cache<CacheKey, CachedEvaluation>,
83 cache_metrics: CacheMetrics,
84 batch_sender: mpsc::Sender<BatchRequest>,
85 _shutdown_tx: tokio::sync::broadcast::Sender<()>,
86}
87
88#[derive(Debug, Clone, Hash, Eq, PartialEq)]
89struct CacheKey {
90 entity_type: EntityType,
91 entity_name: String,
92 user_hash: String,
93}
94
/// Kind of entity an evaluation (and therefore a cache entry) refers to.
///
/// The enum carries no data, so it is `Copy` and can be passed around by value freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum EntityType {
    /// A feature gate, evaluated to a boolean.
    Gate,
    /// A dynamic config, evaluated to a JSON value.
    Config,
}
100
101#[derive(Debug, Clone)]
102struct CachedEvaluation {
103 result: EvaluationResult,
104 timestamp: std::time::Instant,
105}
106
107#[derive(Debug, Clone)]
108enum EvaluationResult {
109 Gate(GateEvaluationResult),
110 Config(ConfigEvaluationResult),
111}
112
113impl StatsigClient {
114 pub async fn new(api_key: impl Into<String>) -> Result<Self> {
125 let config = StatsigClientConfig::new(api_key)?;
126 Self::with_config(config).await
127 }
128
129 pub async fn with_config(config: StatsigClientConfig) -> Result<Self> {
140 config.validate()?;
141
142 let transport = transport::StatsigTransport::new(&config)?;
143
144 let cache = Cache::builder()
145 .time_to_live(config.cache_ttl)
146 .max_capacity(config.cache_max_capacity)
147 .build();
148
149 let (batch_sender, batch_receiver) = mpsc::channel(1000);
150 let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);
151
152 let batch_processor = BatchProcessor::new(batch_receiver, shutdown_tx.subscribe());
153 let _handle = tokio::spawn(batch_processor.run(transport.clone(), config.clone()));
154
155 Ok(Self {
156 config,
157 transport,
158 cache,
159 cache_metrics: CacheMetrics::new(),
160 batch_sender,
161 _shutdown_tx: shutdown_tx,
162 })
163 }
164
165 pub async fn log_event(&self, event_name: impl Into<String>, user: &User) -> Result<bool> {
166 let event = StatsigEvent::builder()
167 .event_name(event_name.into())
168 .time(StatsigEventTime::UnixMillis(now_ms()))
169 .build();
170
171 Ok(self.log_events(vec![event], user).await?.success)
172 }
173
174 pub async fn log_events(
175 &self,
176 events: Vec<StatsigEvent>,
177 user: &User,
178 ) -> Result<LogEventResponse> {
179 if events.is_empty() {
180 return Err(StatsigError::validation(
181 "events must contain at least 1 item",
182 ));
183 }
184
185 user.validate_user()
186 .map_err(|e| e.with_context("User validation failed"))?;
187
188 self.transport.log_events(user, &events).await
189 }
190
191 pub async fn check_gate(&self, gate_name: impl Into<String>, user: &User) -> Result<bool> {
218 let gate_name = gate_name.into();
219 let results = self.check_gates(vec![gate_name], user).await?;
220 Ok(results.into_values().next().unwrap_or(false))
221 }
222
223 pub async fn check_gates(
239 &self,
240 gate_names: Vec<String>,
241 user: &User,
242 ) -> Result<HashMap<String, bool>> {
243 if gate_names.is_empty() {
244 return Ok(HashMap::new());
245 }
246
247 for gate_name in &gate_names {
248 validate_entity_name("gate", gate_name)?;
249 }
250
251 user.validate_user()
252 .map_err(|e| e.with_context("User validation failed"))?;
253
254 let mut results = HashMap::new();
255 let mut missing_gates = Vec::new();
256
257 for gate_name in &gate_names {
259 let cache_key = self.create_cache_key(EntityType::Gate, gate_name, user);
260 if let Some(cached) = self.cache.get(&cache_key).await {
261 self.cache_metrics.record_hit();
262 if let EvaluationResult::Gate(gate_result) = cached.result {
263 results.insert(gate_name.clone(), gate_result.value);
264 }
265 } else {
266 self.cache_metrics.record_miss();
267 missing_gates.push(gate_name.clone());
268 }
269 }
270
271 if missing_gates.is_empty() {
272 return Ok(results);
273 }
274
275 let gate_results = self.fetch_gates_batch(missing_gates, user).await?;
277
278 for gate_result in gate_results {
279 let cache_key = self.create_cache_key(EntityType::Gate, &gate_result.name, user);
280 let cached = CachedEvaluation {
281 result: EvaluationResult::Gate(gate_result.clone()),
282 timestamp: std::time::Instant::now(),
283 };
284 self.cache_metrics.record_insert();
285 self.cache.insert(cache_key, cached).await;
286 results.insert(gate_result.name, gate_result.value);
287 }
288
289 Ok(results)
290 }
291
292 pub async fn get_config(&self, config_name: impl Into<String>, user: &User) -> Result<Value> {
309 let config_name = config_name.into();
310 let results = self.get_configs(vec![config_name], user).await?;
311 Ok(results.into_values().next().unwrap_or(Value::Null))
312 }
313
314 pub async fn get_config_evaluation(
318 &self,
319 config_name: impl Into<String>,
320 user: &User,
321 ) -> Result<ConfigEvaluationResult> {
322 let config_name = config_name.into();
323 let mut results = self
324 .get_config_evaluations(vec![config_name.clone()], user)
325 .await?;
326 results
327 .remove(&config_name)
328 .ok_or_else(|| StatsigError::internal("Missing config evaluation in response"))
329 }
330
331 pub async fn get_configs(
347 &self,
348 config_names: Vec<String>,
349 user: &User,
350 ) -> Result<HashMap<String, Value>> {
351 let evaluations = self.get_config_evaluations(config_names, user).await?;
352 Ok(evaluations.into_iter().map(|(k, v)| (k, v.value)).collect())
353 }
354
355 pub async fn get_config_evaluations(
359 &self,
360 config_names: Vec<String>,
361 user: &User,
362 ) -> Result<HashMap<String, ConfigEvaluationResult>> {
363 if config_names.is_empty() {
364 return Ok(HashMap::new());
365 }
366
367 for config_name in &config_names {
368 validate_entity_name("config", config_name)?;
369 }
370
371 user.validate_user()
372 .map_err(|e| e.with_context("User validation failed"))?;
373
374 let mut results = HashMap::new();
375 let mut missing_configs = Vec::new();
376
377 for config_name in &config_names {
379 let cache_key = self.create_cache_key(EntityType::Config, config_name, user);
380 if let Some(cached) = self.cache.get(&cache_key).await {
381 self.cache_metrics.record_hit();
382 if let EvaluationResult::Config(config_result) = cached.result {
383 results.insert(config_name.clone(), config_result);
384 }
385 } else {
386 self.cache_metrics.record_miss();
387 missing_configs.push(config_name.clone());
388 }
389 }
390
391 if missing_configs.is_empty() {
392 return Ok(results);
393 }
394
395 let config_results = self.fetch_configs_batch(missing_configs, user).await?;
397
398 for config_result in config_results {
399 let cache_key = self.create_cache_key(EntityType::Config, &config_result.name, user);
400 let cached = CachedEvaluation {
401 result: EvaluationResult::Config(config_result.clone()),
402 timestamp: std::time::Instant::now(),
403 };
404 self.cache_metrics.record_insert();
405 self.cache.insert(cache_key, cached).await;
406 results.insert(config_result.name.clone(), config_result);
407 }
408
409 Ok(results)
410 }
411
412 fn create_cache_key(
413 &self,
414 entity_type: EntityType,
415 entity_name: &str,
416 user: &User,
417 ) -> CacheKey {
418 let user_hash = user.hash_for_cache();
419 CacheKey {
420 entity_type,
421 entity_name: entity_name.to_string(),
422 user_hash,
423 }
424 }
425
426 async fn fetch_gates_batch(
427 &self,
428 gate_names: Vec<String>,
429 user: &User,
430 ) -> Result<Vec<GateEvaluationResult>> {
431 let (response_tx, response_rx) = oneshot::channel();
432
433 let request = BatchRequest::CheckGates {
434 gate_names,
435 user: user.clone(),
436 response_tx,
437 };
438
439 self.batch_sender
440 .send(request)
441 .await
442 .map_err(|_| StatsigError::batch_processor("Batch processor channel closed"))?;
443
444 response_rx
445 .await
446 .map_err(|_| StatsigError::batch_processor("Batch processor response channel closed"))?
447 }
448
449 async fn fetch_configs_batch(
450 &self,
451 config_names: Vec<String>,
452 user: &User,
453 ) -> Result<Vec<ConfigEvaluationResult>> {
454 let (response_tx, response_rx) = oneshot::channel();
455
456 let request = BatchRequest::GetConfigs {
457 config_names,
458 user: user.clone(),
459 response_tx,
460 };
461
462 self.batch_sender
463 .send(request)
464 .await
465 .map_err(|_| StatsigError::batch_processor("Batch processor channel closed"))?;
466
467 response_rx
468 .await
469 .map_err(|_| StatsigError::batch_processor("Batch processor response channel closed"))?
470 }
471
472 pub fn cache_metrics(&self) -> CacheMetricsSummary {
480 self.cache_metrics.summary()
481 }
482
483 pub fn reset_cache_metrics(&self) {
488 self.cache_metrics.reset();
489 }
490}
491
492fn validate_entity_name(kind: &str, name: &str) -> Result<()> {
493 let len = name.chars().count();
494 if !(2..=100).contains(&len) {
495 return Err(StatsigError::validation(format!(
496 "{} name must be between 2 and 100 characters",
497 kind
498 )));
499 }
500 Ok(())
501}
502
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`. A value that does not fit in `i64` saturates
/// at `i64::MAX` instead of wrapping.
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();

    // Clamp before narrowing so the `u128 -> i64` cast below is lossless.
    since_epoch.as_millis().min(i64::MAX as u128) as i64
}
509
#[cfg(test)]
mod tests {
    use super::*;

    /// Building a client from a plain key string succeeds.
    #[tokio::test]
    async fn test_client_creation() {
        let _client = StatsigClient::new("test_key")
            .await
            .expect("client creation should succeed");
    }

    /// The user builder stores the fields it was given. No async work is involved, so this
    /// is a plain synchronous test.
    #[test]
    fn test_user_builder() {
        let user = User::builder()
            .user_id("test_user")
            .email("test@example.com")
            .country("US")
            .build()
            .unwrap();

        assert_eq!(user.user_id, Some("test_user".to_string()));
        assert_eq!(user.email, Some("test@example.com".to_string()));
        assert_eq!(user.country, Some("US".to_string()));
    }

    /// End-to-end gate check against the live service; ignored by default.
    #[tokio::test]
    #[ignore = "Network integration test (requires Statsig API access)"]
    async fn test_demo_gate() {
        let client = StatsigClient::new("client-PxavfBEvcE6M449BEtJyQe883t2StBbxwFCMpAuBnI")
            .await
            .unwrap();
        let user = User::builder().user_id("test_user").build().unwrap();

        let result = client.check_gate("demo-gate", &user).await;
        assert!(result.is_ok(), "Demo gate result: {:?}", result);
    }
}