statsig_rust/event_logging/
exposure_sampling.rs

1use super::event_queue::queued_event::{EnqueueOperation, QueuedExposure};
2use crate::{
3    evaluation::evaluation_types::{BaseEvaluation, ExtraExposureInfo},
4    global_configs::GlobalConfigs,
5    log_d,
6    user::user_data::UserData,
7    write_lock_or_noop, write_lock_or_return, DynamicValue,
8};
9use ahash::AHashSet;
10use chrono::Utc;
11use std::sync::{
12    atomic::{AtomicU64, Ordering},
13    Arc, RwLock,
14};
15
/// Tag handed to the logging and lock helper macros used in this file.
const TAG: &str = "ExposureSampling";
/// Milliseconds that must elapse since the last reset before a set counts as expired.
const SAMPLING_TTL_MS: u64 = 60_000;
/// Number of keys a set may hold before it counts as full and is cleared on the next reset check.
const SAMPLING_MAX_KEYS: usize = 100_000;
19
/// Sampling mode parsed from the `sampling_mode` SDK config value.
///
/// The enum is fieldless, so it is cheap to copy and compare; `Clone`, `Copy`,
/// `PartialEq` and `Eq` are derived so callers and tests can do both.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvtSamplingMode {
    /// Parsed from `"on"`: exposures that fail the sampling check are reported as not sampled.
    On,
    /// Parsed from `"shadow"`: the sampling check result is recorded alongside the decision,
    /// but the exposure is still reported as sampled.
    Shadow,
}
25
/// Outcome of `ExposureSampling::get_sampling_decision` for a queued event.
#[derive(Debug)]
pub enum EvtSamplingDecision {
    /// The exposure's sampling key was already present in the dedupe set.
    Deduped,
    /// Sampling mode is `On` and the exposure did not pass the sampling check.
    NotSampled,
    /// Logged without a sampling check: the payload is not an exposure, no sampling mode is
    /// configured, the evaluation requested forwarding, or the spec/rule pair is seen for the
    /// first time.
    ForceSampled,
    /// The sampling check ran. Fields, in order: the sampling rate that was used (if any),
    /// the active sampling mode, and whether the exposure passed the sampling check.
    Sampled(Option<u64>, EvtSamplingMode, bool),
}
33
34impl EvtSamplingDecision {
35    pub fn should_log(&self) -> bool {
36        match self {
37            EvtSamplingDecision::Deduped | EvtSamplingDecision::NotSampled => false,
38            EvtSamplingDecision::ForceSampled | EvtSamplingDecision::Sampled(_, _, _) => true,
39        }
40    }
41}
42
/// `(spec_name_hash, rule_id_hash)` pair identifying a spec/rule combination.
type SpecAndRuleHashTuple = (u64, u64);
/// Decides, per queued event, whether it is logged, deduped, or sampled out.
pub struct ExposureSampling {
    /// Spec/rule pairs already recorded by the first-time-exposure check since the last reset.
    spec_sampling_set: RwLock<AHashSet<SpecAndRuleHashTuple>>,
    /// Millisecond timestamp of the last `spec_sampling_set` reset (initialized at construction).
    last_spec_sampling_reset: AtomicU64,

    /// Exposure keys recorded by the dedupe check since the last reset.
    exposure_dedupe_set: RwLock<AHashSet<ExposureSamplingKey>>,
    /// Millisecond timestamp of the last `exposure_dedupe_set` reset (initialized at construction).
    last_exposure_dedupe_reset: AtomicU64,

    /// Source of the `sampling_mode` and `special_case_sampling_rate` SDK config values.
    global_configs: Arc<GlobalConfigs>,
}
53
54impl ExposureSampling {
55    pub fn new(sdk_key: &str) -> Self {
56        let now = Utc::now().timestamp_millis() as u64;
57
58        Self {
59            spec_sampling_set: RwLock::from(AHashSet::default()),
60            last_spec_sampling_reset: AtomicU64::from(now),
61
62            exposure_dedupe_set: RwLock::from(AHashSet::default()),
63            last_exposure_dedupe_reset: AtomicU64::from(now),
64
65            global_configs: GlobalConfigs::get_instance(sdk_key),
66        }
67    }
68
69    pub fn get_sampling_decision(&self, payload: &impl EnqueueOperation) -> EvtSamplingDecision {
70        let exposure = match payload.as_exposure() {
71            Some(exposure) => exposure,
72            None => return EvtSamplingDecision::ForceSampled,
73        };
74
75        let expo_sampling_key = exposure.create_exposure_sampling_key();
76        if self.should_dedupe_exposure(&expo_sampling_key) {
77            return EvtSamplingDecision::Deduped;
78        }
79
80        let sampling_mode = match self.global_configs.get_sampling_mode() {
81            Some(sampling_mode) => sampling_mode,
82            None => return EvtSamplingDecision::ForceSampled,
83        };
84
85        let extra_info = exposure.get_extra_exposure_info_ref();
86        if self.should_sample_based_on_evaluation(extra_info) {
87            return EvtSamplingDecision::ForceSampled;
88        }
89
90        if self.should_sample_first_time_exposure(&expo_sampling_key) {
91            return EvtSamplingDecision::ForceSampled;
92        }
93
94        let sampling_rate = self
95            .get_special_case_sampling_rate(exposure)
96            .or_else(|| extra_info.and_then(|info| info.sampling_rate));
97
98        let is_sampled = expo_sampling_key.is_sampled(sampling_rate);
99
100        match sampling_mode {
101            EvtSamplingMode::On if is_sampled => {
102                EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::On, true)
103            }
104            EvtSamplingMode::Shadow => {
105                EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::Shadow, is_sampled)
106            }
107            _ => EvtSamplingDecision::NotSampled,
108        }
109    }
110
    /// Runs the reset check for the exposure dedupe set and then for the spec sampling set.
    /// Each set is cleared only if its own expiry or size condition is met.
    pub fn try_reset_all_sampling(&self) {
        self.try_reset_exposure_dedupe_set();
        self.try_reset_spec_sampling_set();
    }
115
116    fn should_dedupe_exposure(&self, sampling_key: &ExposureSamplingKey) -> bool {
117        let mut dedupe_set = write_lock_or_return!(TAG, self.exposure_dedupe_set, false);
118        if dedupe_set.contains(sampling_key) {
119            return true;
120        }
121
122        dedupe_set.insert(sampling_key.clone());
123        false
124    }
125
126    fn should_sample_based_on_evaluation(&self, extra_info: Option<&ExtraExposureInfo>) -> bool {
127        let exposure_info = match extra_info {
128            Some(exposure_info) => exposure_info,
129            None => return false,
130        };
131
132        if exposure_info.forward_all_exposures == Some(true) {
133            return true;
134        }
135
136        if exposure_info.has_seen_analytical_gates == Some(true) {
137            return true;
138        }
139
140        false
141    }
142
143    fn should_sample_first_time_exposure(&self, exposure: &ExposureSamplingKey) -> bool {
144        let sampling_key: SpecAndRuleHashTuple = (exposure.spec_name_hash, exposure.rule_id_hash);
145        if self.sample_key_exists(&sampling_key) {
146            return false;
147        }
148
149        if let Ok(mut sampling_map) = self.spec_sampling_set.write() {
150            sampling_map.insert(sampling_key);
151        }
152
153        true
154    }
155
156    fn try_reset_spec_sampling_set(&self) {
157        let now = Utc::now().timestamp_millis() as u64;
158        let last_sampling_reset = self.last_spec_sampling_reset.load(Ordering::Relaxed);
159        let mut sampling_map = write_lock_or_noop!(TAG, self.spec_sampling_set);
160
161        let has_expired = now - last_sampling_reset > SAMPLING_TTL_MS;
162        let is_full = sampling_map.len() > SAMPLING_MAX_KEYS;
163
164        if has_expired || is_full {
165            log_d!(
166                TAG,
167                "Resetting spec sampling set. has_expired: {:?}, is_full: {:?}",
168                has_expired,
169                is_full
170            );
171            sampling_map.clear();
172            self.last_spec_sampling_reset.store(now, Ordering::Relaxed);
173        }
174    }
175
176    fn try_reset_exposure_dedupe_set(&self) {
177        let now = Utc::now().timestamp_millis() as u64;
178        let last_dedupe_reset = self.last_exposure_dedupe_reset.load(Ordering::Relaxed);
179        let mut dedupe_map = self.exposure_dedupe_set.write().unwrap();
180
181        let has_expired = now - last_dedupe_reset > SAMPLING_TTL_MS;
182        let is_full = dedupe_map.len() > SAMPLING_MAX_KEYS;
183
184        if has_expired || is_full {
185            log_d!(
186                TAG,
187                "Resetting exposure dedupe set. has_expired: {:?}, is_full: {:?}",
188                has_expired,
189                is_full
190            );
191            dedupe_map.clear();
192            self.last_exposure_dedupe_reset
193                .store(now, Ordering::Relaxed);
194        }
195    }
196
197    fn sample_key_exists(&self, key: &SpecAndRuleHashTuple) -> bool {
198        match self.spec_sampling_set.read() {
199            Ok(map) => map.contains(key),
200            _ => false,
201        }
202    }
203
204    fn get_special_case_sampling_rate<'a>(
205        &self,
206        exposure: &'a impl QueuedExposure<'a>,
207    ) -> Option<u64> {
208        let rule_id = exposure.get_rule_id_ref();
209        match rule_id {
210            "default" | "disabled" | "" => self.global_configs.get_special_case_sampling_rate(),
211            _ => None,
212        }
213    }
214}
215
216impl GlobalConfigs {
217    fn get_sampling_mode(&self) -> Option<EvtSamplingMode> {
218        fn parse_sampling_mode(sampling_mode: Option<&DynamicValue>) -> Option<EvtSamplingMode> {
219            let v = sampling_mode?.string_value.as_ref()?.value.as_str();
220
221            match v {
222                "on" => Some(EvtSamplingMode::On),
223                "shadow" => Some(EvtSamplingMode::Shadow),
224                _ => None,
225            }
226        }
227
228        self.use_sdk_config_value("sampling_mode", parse_sampling_mode)
229    }
230
231    fn get_special_case_sampling_rate(&self) -> Option<u64> {
232        fn parse_special_case_sampling_rate(value: Option<&DynamicValue>) -> Option<u64> {
233            match value {
234                Some(value) => value.float_value.map(|rate| rate as u64),
235                None => None,
236            }
237        }
238
239        self.use_sdk_config_value(
240            "special_case_sampling_rate",
241            parse_special_case_sampling_rate,
242        )
243    }
244}
245
/// Hash-based identity of an exposure, used as the dedupe-set key and as the input to the
/// sampling check.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct ExposureSamplingKey {
    /// Hash of the evaluated spec's name (`0` when no evaluation was supplied to `new`).
    pub spec_name_hash: u64,
    /// Hash of the evaluation's rule id (`0` when no evaluation was supplied to `new`).
    pub rule_id_hash: u64,
    /// Hash produced by `UserData::create_user_values_hash` for the exposed user.
    pub user_values_hash: u64,
    /// Caller-supplied extra hash that is part of the key's identity.
    pub additional_hash: u64,
}
253
254impl ExposureSamplingKey {
255    pub fn new(evaluation: Option<&BaseEvaluation>, user: &UserData, additional_hash: u64) -> Self {
256        let spec_name_hash = evaluation.as_ref().map_or(0, |e| e.name.hash_value);
257        let rule_id_hash = evaluation.as_ref().map_or(0, |e| e.rule_id.hash_value);
258
259        let user_values_hash = user.create_user_values_hash();
260
261        Self {
262            spec_name_hash,
263            rule_id_hash,
264            user_values_hash,
265            additional_hash,
266        }
267    }
268
269    pub fn is_sampled(&self, sampling_rate: Option<u64>) -> bool {
270        let sampling_rate = match sampling_rate {
271            Some(rate) => rate,
272            None => return true, // without a sampling rate, we should sample
273        };
274
275        let final_hash =
276            self.spec_name_hash ^ self.rule_id_hash ^ self.user_values_hash ^ self.additional_hash;
277
278        final_hash % sampling_rate == 0
279    }
280}