statsig_rdp/
client.rs

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
    time::SystemTime,
};

use anyhow::{anyhow, bail, Result};
use serde::de::DeserializeOwned;
use tokio::{time, time::Duration};
use tracing::{event, Level};

use crate::{
    evaluator::{models::EvalResult, Evaluator},
    http::StatsigHttpClient,
    models::{StatsigConfig, StatsigEvent, StatsigOptions, StatsigPost, StatsigUser},
};
17
/// Event name attached to exposure events produced by locally evaluated gate checks.
const GATE_EXPOSURE_EVENT: &str = "statsig::gate_exposure";
/// Event name attached to exposure events produced by locally evaluated config lookups.
const CONFIG_EXPOSURE_EVENT: &str = "statsig::config_exposure";
/// Buffered exposure-event count at (or above) which an extra flush task is spawned
/// rather than waiting for the periodic background flush.
const MAX_LOG_EVENTS: usize = 950;
21
22/// Statsig client that has a local cache and syncs with the API periodically.
23pub struct Client {
24    disable_cache: bool,
25    http_client: StatsigHttpClient,
26    evaluator: Evaluator,
27    event_logs: Mutex<Vec<StatsigEvent>>,
28}
29
30impl Client {
31    pub async fn new(api_key: String, options: StatsigOptions) -> Result<Arc<Self>> {
32        let http_client = StatsigHttpClient::new(
33            api_key,
34            options.api_url,
35            options.cdn_url,
36            options.events_url,
37        );
38
39        let evaluator = Evaluator::new();
40        if !options.disable_cache {
41            let initial_data = http_client.fetch_state_from_source().await?;
42            evaluator.refresh_configs(initial_data);
43        }
44
45        let s = Arc::new(Self {
46            disable_cache: options.disable_cache,
47            evaluator,
48            http_client,
49            event_logs: Mutex::new(vec![]),
50        });
51
52        if !options.disable_cache {
53            tokio::spawn(s.clone().poll_for_changes(options.config_sync_interval));
54            tokio::spawn(s.clone().background_logs_flush());
55        }
56
57        Ok(s)
58    }
59
60    pub async fn check_gate(self: Arc<Self>, gate: String, user: StatsigUser) -> Result<bool> {
61        if user.user_id.is_empty() {
62            bail!("statsig: missing user id");
63        }
64
65        if self.disable_cache {
66            return self.http_client.check_gate(gate, user).await;
67        }
68
69        let res = self.evaluator.check_gate_internal(&user, &gate);
70        if res.fetch_from_server {
71            self.http_client.check_gate(gate, user).await
72        } else {
73            let pass = res.pass;
74            self.log_gate_exposure(gate, user, res);
75            Ok(pass)
76        }
77    }
78
79    pub async fn get_dynamic_config<T: DeserializeOwned>(
80        self: Arc<Self>,
81        config: String,
82        user: StatsigUser,
83    ) -> Result<T> {
84        if user.user_id.is_empty() {
85            bail!("statsig: missing user id");
86        }
87
88        if self.disable_cache {
89            return self.http_client.get_dynamic_config(config, user).await;
90        }
91
92        let mut res = self.evaluator.get_dynamic_config_internal(&user, &config);
93        if res.fetch_from_server {
94            self.http_client.get_dynamic_config(config, user).await
95        } else {
96            let val = res.config_value.take();
97            self.log_config_exposure(config, user, res);
98            let val = val.ok_or_else(|| anyhow!("empty config"))?;
99            Ok(serde_json::from_value(val)?)
100        }
101    }
102
103    /// Returns the value, together with the metadata about the group that matched the check
104    pub async fn get_config<T: DeserializeOwned>(
105        self: Arc<Self>,
106        config: String,
107        user: StatsigUser,
108    ) -> Result<StatsigConfig<T>> {
109        if user.user_id.is_empty() {
110            bail!("statsig: missing user id");
111        }
112
113        if self.disable_cache {
114            return self.http_client.get_config(config, user).await;
115        }
116
117        let res = self.evaluator.get_dynamic_config_internal(&user, &config);
118        if res.fetch_from_server {
119            self.http_client.get_config(config, user).await
120        } else {
121            let value: Option<T> = serde_json::from_value(
122                res.config_value.clone().unwrap_or(serde_json::Value::Null),
123            )?;
124
125            let val = StatsigConfig {
126                value,
127                name: config.clone(),
128                group_name: res.group_name.clone(),
129                rule_id: res.rule_id.clone(),
130                group: res.group.clone(),
131            };
132
133            self.log_config_exposure(config, user, res);
134
135            Ok(val)
136        }
137    }
138
139    pub async fn log_event(&self, statsig_post: &StatsigPost) -> Result<()> {
140        self.http_client.log_event(statsig_post).await
141    }
142}
143
144// Private methods
145impl Client {
146    async fn poll_for_changes(self: Arc<Self>, config_sync_interval: Option<Duration>) {
147        let mut interval =
148            time::interval(config_sync_interval.unwrap_or_else(|| Duration::from_secs(20)));
149        loop {
150            interval.tick().await;
151            event!(Level::DEBUG, "Refreshing statsig configs");
152            let new_state = match self.http_client.fetch_state_from_source().await {
153                Ok(s) => s,
154                Err(e) => {
155                    event!(Level::ERROR, "Failed to fetch state: {}", e);
156                    continue;
157                }
158            };
159            if new_state.has_updates {
160                event!(Level::DEBUG, "Statsig state has changed");
161                self.evaluator.refresh_configs(new_state);
162            }
163        }
164    }
165
166    async fn background_logs_flush(self: Arc<Self>) {
167        let mut interval = time::interval(Duration::from_secs(60));
168        loop {
169            // TODO: Graceful shutdown, listen for signals, flush logs before exiting
170            interval.tick().await;
171            event!(Level::DEBUG, "Flushing logs");
172
173            self.clone().flush_logs().await;
174        }
175    }
176
177    async fn flush_logs(self: Arc<Self>) {
178        let events;
179        {
180            let mut logs = self
181                .event_logs
182                .lock()
183                .expect("should always be able to acquire lock");
184            events = std::mem::take(&mut *logs);
185        }
186
187        if !events.is_empty() {
188            match self
189                .http_client
190                .log_event_internal(StatsigPost { events })
191                .await
192            {
193                Ok(_) => (),
194                Err(e) => {
195                    event!(Level::ERROR, "Failed to log events: {}", e);
196                }
197            };
198        }
199    }
200
201    fn log_gate_exposure(
202        self: Arc<Self>,
203        gate: String,
204        user: StatsigUser,
205        eval_result: EvalResult,
206    ) {
207        let event = StatsigEvent {
208            event_name: GATE_EXPOSURE_EVENT.to_string(),
209            value: eval_result.pass.to_string(),
210            time: SystemTime::now()
211                .duration_since(SystemTime::UNIX_EPOCH)
212                .unwrap_or_else(|_| Duration::from_secs(0))
213                .as_secs()
214                .to_string(),
215            user,
216            metadata: HashMap::from([
217                ("gate".to_string(), gate),
218                ("gateValue".to_string(), eval_result.pass.to_string()),
219                ("ruleID".to_string(), eval_result.id),
220            ]),
221        };
222        let mut events = self
223            .event_logs
224            .lock()
225            .expect("should always be able to acquire lock");
226        events.push(event);
227        if events.len() >= MAX_LOG_EVENTS {
228            tokio::spawn(self.clone().flush_logs());
229        }
230    }
231
232    fn log_config_exposure(
233        self: Arc<Self>,
234        config: String,
235        user: StatsigUser,
236        eval_result: EvalResult,
237    ) {
238        let event = StatsigEvent {
239            event_name: CONFIG_EXPOSURE_EVENT.to_string(),
240            value: eval_result.pass.to_string(),
241            time: SystemTime::now()
242                .duration_since(SystemTime::UNIX_EPOCH)
243                .unwrap_or_else(|_| Duration::from_secs(0))
244                .as_secs()
245                .to_string(),
246            user,
247            metadata: HashMap::from([
248                ("config".to_string(), config),
249                ("ruleID".to_string(), eval_result.id),
250            ]),
251        };
252        let mut events = self
253            .event_logs
254            .lock()
255            .expect("should always be able to acquire lock");
256        events.push(event);
257        if events.len() >= MAX_LOG_EVENTS {
258            tokio::spawn(self.clone().flush_logs());
259        }
260    }
261}