1use std::{
2 collections::HashMap,
3 sync::{Arc, Mutex},
4 time::SystemTime,
5};
6
7use anyhow::{anyhow, bail, Result};
8use serde::de::DeserializeOwned;
9use tokio::{time, time::Duration};
10use tracing::{event, Level};
11
12use crate::{
13 evaluator::{models::EvalResult, Evaluator},
14 http::StatsigHttpClient,
15 models::{StatsigConfig, StatsigEvent, StatsigOptions, StatsigPost, StatsigUser},
16};
17
18const GATE_EXPOSURE_EVENT: &str = "statsig::gate_exposure";
19const CONFIG_EXPOSURE_EVENT: &str = "statsig::config_exposure";
20const MAX_LOG_EVENTS: usize = 950;
21
22pub struct Client {
24 disable_cache: bool,
25 http_client: StatsigHttpClient,
26 evaluator: Evaluator,
27 event_logs: Mutex<Vec<StatsigEvent>>,
28}
29
30impl Client {
31 pub async fn new(api_key: String, options: StatsigOptions) -> Result<Arc<Self>> {
32 let http_client = StatsigHttpClient::new(
33 api_key,
34 options.api_url,
35 options.cdn_url,
36 options.events_url,
37 );
38
39 let evaluator = Evaluator::new();
40 if !options.disable_cache {
41 let initial_data = http_client.fetch_state_from_source().await?;
42 evaluator.refresh_configs(initial_data);
43 }
44
45 let s = Arc::new(Self {
46 disable_cache: options.disable_cache,
47 evaluator,
48 http_client,
49 event_logs: Mutex::new(vec![]),
50 });
51
52 if !options.disable_cache {
53 tokio::spawn(s.clone().poll_for_changes(options.config_sync_interval));
54 tokio::spawn(s.clone().background_logs_flush());
55 }
56
57 Ok(s)
58 }
59
60 pub async fn check_gate(self: Arc<Self>, gate: String, user: StatsigUser) -> Result<bool> {
61 if user.user_id.is_empty() {
62 bail!("statsig: missing user id");
63 }
64
65 if self.disable_cache {
66 return self.http_client.check_gate(gate, user).await;
67 }
68
69 let res = self.evaluator.check_gate_internal(&user, &gate);
70 if res.fetch_from_server {
71 self.http_client.check_gate(gate, user).await
72 } else {
73 let pass = res.pass;
74 self.log_gate_exposure(gate, user, res);
75 Ok(pass)
76 }
77 }
78
79 pub async fn get_dynamic_config<T: DeserializeOwned>(
80 self: Arc<Self>,
81 config: String,
82 user: StatsigUser,
83 ) -> Result<T> {
84 if user.user_id.is_empty() {
85 bail!("statsig: missing user id");
86 }
87
88 if self.disable_cache {
89 return self.http_client.get_dynamic_config(config, user).await;
90 }
91
92 let mut res = self.evaluator.get_dynamic_config_internal(&user, &config);
93 if res.fetch_from_server {
94 self.http_client.get_dynamic_config(config, user).await
95 } else {
96 let val = res.config_value.take();
97 self.log_config_exposure(config, user, res);
98 let val = val.ok_or_else(|| anyhow!("empty config"))?;
99 Ok(serde_json::from_value(val)?)
100 }
101 }
102
103 pub async fn get_config<T: DeserializeOwned>(
105 self: Arc<Self>,
106 config: String,
107 user: StatsigUser,
108 ) -> Result<StatsigConfig<T>> {
109 if user.user_id.is_empty() {
110 bail!("statsig: missing user id");
111 }
112
113 if self.disable_cache {
114 return self.http_client.get_config(config, user).await;
115 }
116
117 let res = self.evaluator.get_dynamic_config_internal(&user, &config);
118 if res.fetch_from_server {
119 self.http_client.get_config(config, user).await
120 } else {
121 let value: Option<T> = serde_json::from_value(
122 res.config_value.clone().unwrap_or(serde_json::Value::Null),
123 )?;
124
125 let val = StatsigConfig {
126 value,
127 name: config.clone(),
128 group_name: res.group_name.clone(),
129 rule_id: res.rule_id.clone(),
130 group: res.group.clone(),
131 };
132
133 self.log_config_exposure(config, user, res);
134
135 Ok(val)
136 }
137 }
138
139 pub async fn log_event(&self, statsig_post: &StatsigPost) -> Result<()> {
140 self.http_client.log_event(statsig_post).await
141 }
142}
143
144impl Client {
146 async fn poll_for_changes(self: Arc<Self>, config_sync_interval: Option<Duration>) {
147 let mut interval =
148 time::interval(config_sync_interval.unwrap_or_else(|| Duration::from_secs(20)));
149 loop {
150 interval.tick().await;
151 event!(Level::DEBUG, "Refreshing statsig configs");
152 let new_state = match self.http_client.fetch_state_from_source().await {
153 Ok(s) => s,
154 Err(e) => {
155 event!(Level::ERROR, "Failed to fetch state: {}", e);
156 continue;
157 }
158 };
159 if new_state.has_updates {
160 event!(Level::DEBUG, "Statsig state has changed");
161 self.evaluator.refresh_configs(new_state);
162 }
163 }
164 }
165
166 async fn background_logs_flush(self: Arc<Self>) {
167 let mut interval = time::interval(Duration::from_secs(60));
168 loop {
169 interval.tick().await;
171 event!(Level::DEBUG, "Flushing logs");
172
173 self.clone().flush_logs().await;
174 }
175 }
176
177 async fn flush_logs(self: Arc<Self>) {
178 let events;
179 {
180 let mut logs = self
181 .event_logs
182 .lock()
183 .expect("should always be able to acquire lock");
184 events = std::mem::take(&mut *logs);
185 }
186
187 if !events.is_empty() {
188 match self
189 .http_client
190 .log_event_internal(StatsigPost { events })
191 .await
192 {
193 Ok(_) => (),
194 Err(e) => {
195 event!(Level::ERROR, "Failed to log events: {}", e);
196 }
197 };
198 }
199 }
200
201 fn log_gate_exposure(
202 self: Arc<Self>,
203 gate: String,
204 user: StatsigUser,
205 eval_result: EvalResult,
206 ) {
207 let event = StatsigEvent {
208 event_name: GATE_EXPOSURE_EVENT.to_string(),
209 value: eval_result.pass.to_string(),
210 time: SystemTime::now()
211 .duration_since(SystemTime::UNIX_EPOCH)
212 .unwrap_or_else(|_| Duration::from_secs(0))
213 .as_secs()
214 .to_string(),
215 user,
216 metadata: HashMap::from([
217 ("gate".to_string(), gate),
218 ("gateValue".to_string(), eval_result.pass.to_string()),
219 ("ruleID".to_string(), eval_result.id),
220 ]),
221 };
222 let mut events = self
223 .event_logs
224 .lock()
225 .expect("should always be able to acquire lock");
226 events.push(event);
227 if events.len() >= MAX_LOG_EVENTS {
228 tokio::spawn(self.clone().flush_logs());
229 }
230 }
231
232 fn log_config_exposure(
233 self: Arc<Self>,
234 config: String,
235 user: StatsigUser,
236 eval_result: EvalResult,
237 ) {
238 let event = StatsigEvent {
239 event_name: CONFIG_EXPOSURE_EVENT.to_string(),
240 value: eval_result.pass.to_string(),
241 time: SystemTime::now()
242 .duration_since(SystemTime::UNIX_EPOCH)
243 .unwrap_or_else(|_| Duration::from_secs(0))
244 .as_secs()
245 .to_string(),
246 user,
247 metadata: HashMap::from([
248 ("config".to_string(), config),
249 ("ruleID".to_string(), eval_result.id),
250 ]),
251 };
252 let mut events = self
253 .event_logs
254 .lock()
255 .expect("should always be able to acquire lock");
256 events.push(event);
257 if events.len() >= MAX_LOG_EVENTS {
258 tokio::spawn(self.clone().flush_logs());
259 }
260 }
261}