statsig_rust/event_logging/
exposure_sampling.rs

1use super::event_queue::queued_event::{EnqueueOperation, QueuedExposure};
2use crate::{
3    evaluation::evaluation_types::{BaseEvaluation, ExtraExposureInfo},
4    global_configs::GlobalConfigs,
5    log_d, log_e,
6    user::user_data::UserData,
7    write_lock_or_noop, write_lock_or_return, DynamicValue,
8};
9use ahash::AHashSet;
10use chrono::Utc;
11use parking_lot::RwLock;
12use std::sync::{
13    atomic::{AtomicU64, Ordering},
14    Arc,
15};
16
/// Tag passed to the logging and lock-helper macros used in this file.
const TAG: &str = "ExposureSampling";
/// Age threshold in milliseconds: the reset routines clear a tracking set once
/// more than this much time has passed since that set's last reset.
const SAMPLING_TTL_MS: u64 = 60_000;
/// Size threshold: the reset routines clear a tracking set once it holds more
/// than this many keys.
const SAMPLING_MAX_KEYS: usize = 100_000;
20
/// Sampling mode parsed from the `sampling_mode` SDK config value.
///
/// The type is a fieldless enum, so it derives `Clone`, `Copy`, `PartialEq`
/// and `Eq` in addition to `Debug`; this lets callers compare and pass modes
/// by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvtSamplingMode {
    /// Parsed from `"on"`: an exposure that fails the sampling check yields
    /// `EvtSamplingDecision::NotSampled`.
    On,
    /// Parsed from `"shadow"`: the decision is always
    /// `EvtSamplingDecision::Sampled`, with the outcome of the sampling check
    /// carried as a flag.
    Shadow,
}
26
/// Outcome of `ExposureSampling::get_sampling_decision` for a queued event.
#[derive(Debug)]
pub enum EvtSamplingDecision {
    /// The exposure's sampling key was already present in the dedupe set.
    Deduped,
    /// Sampling mode is `On` and the exposure failed the sampling check.
    NotSampled,
    /// Logged without consulting a sampling rate: the payload is not an
    /// exposure, no sampling mode is configured, the evaluation flags request
    /// it, or the spec/rule pair has not been seen since the last reset.
    ForceSampled,
    /// The sampling check ran. Fields, in order: the sampling rate that was
    /// used (if any), the active mode, and whether the check passed.
    Sampled(Option<u64>, EvtSamplingMode, bool),
}
34
35impl EvtSamplingDecision {
36    pub fn should_log(&self) -> bool {
37        match self {
38            EvtSamplingDecision::Deduped | EvtSamplingDecision::NotSampled => false,
39            EvtSamplingDecision::ForceSampled | EvtSamplingDecision::Sampled(_, _, _) => true,
40        }
41    }
42}
43
/// `(spec_name_hash, rule_id_hash)` pair used as the key of the spec sampling set.
type SpecAndRuleHashTuple = (u64, u64);
/// Decides, per queued event, whether an exposure is deduped, force-sampled,
/// sampled, or dropped, using two in-memory sets that are reset periodically.
pub struct ExposureSampling {
    /// Spec/rule hash pairs seen since the last reset; a pair that is not yet
    /// present causes the exposure to be force-sampled.
    spec_sampling_set: RwLock<AHashSet<SpecAndRuleHashTuple>>,
    /// Millisecond timestamp of the last `spec_sampling_set` reset.
    last_spec_sampling_reset: AtomicU64,

    /// Full exposure sampling keys seen since the last reset; a key that is
    /// already present causes the exposure to be deduped.
    exposure_dedupe_set: RwLock<AHashSet<ExposureSamplingKey>>,
    /// Millisecond timestamp of the last `exposure_dedupe_set` reset.
    last_exposure_dedupe_reset: AtomicU64,

    /// Source of the `sampling_mode` and `special_case_sampling_rate` values.
    global_configs: Arc<GlobalConfigs>,
}
54
55impl ExposureSampling {
56    pub fn new(sdk_key: &str) -> Self {
57        let now = Utc::now().timestamp_millis() as u64;
58
59        Self {
60            spec_sampling_set: RwLock::from(AHashSet::default()),
61            last_spec_sampling_reset: AtomicU64::from(now),
62
63            exposure_dedupe_set: RwLock::from(AHashSet::default()),
64            last_exposure_dedupe_reset: AtomicU64::from(now),
65
66            global_configs: GlobalConfigs::get_instance(sdk_key),
67        }
68    }
69
70    pub fn get_sampling_decision(&self, payload: &impl EnqueueOperation) -> EvtSamplingDecision {
71        let exposure = match payload.as_exposure() {
72            Some(exposure) => exposure,
73            None => return EvtSamplingDecision::ForceSampled,
74        };
75
76        let expo_sampling_key = exposure.create_exposure_sampling_key();
77        if self.should_dedupe_exposure(&expo_sampling_key) {
78            return EvtSamplingDecision::Deduped;
79        }
80
81        let sampling_mode = match self.global_configs.get_sampling_mode() {
82            Some(sampling_mode) => sampling_mode,
83            None => return EvtSamplingDecision::ForceSampled,
84        };
85
86        let extra_info = exposure.get_extra_exposure_info_ref();
87        if self.should_sample_based_on_evaluation(extra_info) {
88            return EvtSamplingDecision::ForceSampled;
89        }
90
91        if self.should_sample_first_time_exposure(&expo_sampling_key) {
92            return EvtSamplingDecision::ForceSampled;
93        }
94
95        let sampling_rate = self
96            .get_special_case_sampling_rate(exposure)
97            .or_else(|| extra_info.and_then(|info| info.sampling_rate));
98
99        let is_sampled = expo_sampling_key.is_sampled(sampling_rate);
100
101        match sampling_mode {
102            EvtSamplingMode::On if is_sampled => {
103                EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::On, true)
104            }
105            EvtSamplingMode::Shadow => {
106                EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::Shadow, is_sampled)
107            }
108            _ => EvtSamplingDecision::NotSampled,
109        }
110    }
111
112    pub fn try_reset_all_sampling(&self) {
113        self.try_reset_exposure_dedupe_set();
114        self.try_reset_spec_sampling_set();
115    }
116
117    fn should_dedupe_exposure(&self, sampling_key: &ExposureSamplingKey) -> bool {
118        let mut dedupe_set = write_lock_or_return!(TAG, self.exposure_dedupe_set, false);
119        if dedupe_set.contains(sampling_key) {
120            return true;
121        }
122
123        dedupe_set.insert(sampling_key.clone());
124        false
125    }
126
127    fn should_sample_based_on_evaluation(&self, extra_info: Option<&ExtraExposureInfo>) -> bool {
128        let exposure_info = match extra_info {
129            Some(exposure_info) => exposure_info,
130            None => return false,
131        };
132
133        if exposure_info.forward_all_exposures == Some(true) {
134            return true;
135        }
136
137        if exposure_info.has_seen_analytical_gates == Some(true) {
138            return true;
139        }
140
141        false
142    }
143
144    fn should_sample_first_time_exposure(&self, exposure: &ExposureSamplingKey) -> bool {
145        let sampling_key: SpecAndRuleHashTuple = (exposure.spec_name_hash, exposure.rule_id_hash);
146        if self.sample_key_exists(&sampling_key) {
147            return false;
148        }
149
150        match self
151            .spec_sampling_set
152            .try_write_for(std::time::Duration::from_secs(5))
153        {
154            Some(mut sampling_map) => {
155                sampling_map.insert(sampling_key);
156            }
157            None => {
158                log_e!(TAG, "Failed to acquire write lock for spec sampling set");
159            }
160        }
161
162        true
163    }
164
165    fn try_reset_spec_sampling_set(&self) {
166        let now = Utc::now().timestamp_millis() as u64;
167        let last_sampling_reset = self.last_spec_sampling_reset.load(Ordering::Relaxed);
168        let mut sampling_map = write_lock_or_noop!(TAG, self.spec_sampling_set);
169
170        let has_expired = now - last_sampling_reset > SAMPLING_TTL_MS;
171        let is_full = sampling_map.len() > SAMPLING_MAX_KEYS;
172
173        if has_expired || is_full {
174            log_d!(
175                TAG,
176                "Resetting spec sampling set. has_expired: {:?}, is_full: {:?}",
177                has_expired,
178                is_full
179            );
180            sampling_map.clear();
181            self.last_spec_sampling_reset.store(now, Ordering::Relaxed);
182        }
183    }
184
185    fn try_reset_exposure_dedupe_set(&self) {
186        let now = Utc::now().timestamp_millis() as u64;
187        let last_dedupe_reset = self.last_exposure_dedupe_reset.load(Ordering::Relaxed);
188        let mut dedupe_map = match self
189            .exposure_dedupe_set
190            .try_write_for(std::time::Duration::from_secs(5))
191        {
192            Some(map) => map,
193            None => {
194                log_e!(TAG, "Failed to acquire write lock for exposure dedupe set");
195                return;
196            }
197        };
198
199        let has_expired = now - last_dedupe_reset > SAMPLING_TTL_MS;
200        let is_full = dedupe_map.len() > SAMPLING_MAX_KEYS;
201
202        if has_expired || is_full {
203            log_d!(
204                TAG,
205                "Resetting exposure dedupe set. has_expired: {:?}, is_full: {:?}",
206                has_expired,
207                is_full
208            );
209            dedupe_map.clear();
210            self.last_exposure_dedupe_reset
211                .store(now, Ordering::Relaxed);
212        }
213    }
214
215    fn sample_key_exists(&self, key: &SpecAndRuleHashTuple) -> bool {
216        match self
217            .spec_sampling_set
218            .try_read_for(std::time::Duration::from_secs(5))
219        {
220            Some(map) => map.contains(key),
221            None => false,
222        }
223    }
224
225    fn get_special_case_sampling_rate<'a>(
226        &self,
227        exposure: &'a impl QueuedExposure<'a>,
228    ) -> Option<u64> {
229        let rule_id = exposure.get_rule_id_ref();
230        match rule_id {
231            "default" | "disabled" | "" => self.global_configs.get_special_case_sampling_rate(),
232            _ => None,
233        }
234    }
235}
236
237impl GlobalConfigs {
238    fn get_sampling_mode(&self) -> Option<EvtSamplingMode> {
239        fn parse_sampling_mode(sampling_mode: Option<&DynamicValue>) -> Option<EvtSamplingMode> {
240            let v = sampling_mode?.string_value.as_ref()?.value.as_str();
241
242            match v {
243                "on" => Some(EvtSamplingMode::On),
244                "shadow" => Some(EvtSamplingMode::Shadow),
245                _ => None,
246            }
247        }
248
249        self.use_sdk_config_value("sampling_mode", parse_sampling_mode)
250    }
251
252    fn get_special_case_sampling_rate(&self) -> Option<u64> {
253        fn parse_special_case_sampling_rate(value: Option<&DynamicValue>) -> Option<u64> {
254            match value {
255                Some(value) => value.float_value.map(|rate| rate as u64),
256                None => None,
257            }
258        }
259
260        self.use_sdk_config_value(
261            "special_case_sampling_rate",
262            parse_special_case_sampling_rate,
263        )
264    }
265}
266
/// Hash-based identity of an exposure, used as the dedupe-set key and as the
/// input to the sampling check.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct ExposureSamplingKey {
    /// Hash of the evaluated spec's name; `0` when built without an evaluation.
    pub spec_name_hash: u64,
    /// Hash of the evaluation's rule id; `0` when built without an evaluation.
    pub rule_id_hash: u64,
    /// Value of `UserData::create_user_values_hash` for the exposed user.
    pub user_values_hash: u64,
    /// Caller-supplied extra hash that is mixed into the key.
    pub additional_hash: u64,
}
274
275impl ExposureSamplingKey {
276    pub fn new(evaluation: Option<&BaseEvaluation>, user: &UserData, additional_hash: u64) -> Self {
277        let spec_name_hash = evaluation.as_ref().map_or(0, |e| e.name.hash);
278        let rule_id_hash = evaluation.as_ref().map_or(0, |e| e.rule_id.hash);
279
280        let user_values_hash = user.create_user_values_hash();
281
282        Self {
283            spec_name_hash,
284            rule_id_hash,
285            user_values_hash,
286            additional_hash,
287        }
288    }
289
290    pub fn is_sampled(&self, sampling_rate: Option<u64>) -> bool {
291        let sampling_rate = match sampling_rate {
292            Some(rate) => rate,
293            None => return true, // without a sampling rate, we should sample
294        };
295
296        let final_hash =
297            self.spec_name_hash ^ self.rule_id_hash ^ self.user_values_hash ^ self.additional_hash;
298
299        final_hash.is_multiple_of(sampling_rate)
300    }
301}