Skip to main content

statsig_rust/event_logging/
exposure_sampling.rs

1use super::event_queue::queued_event::{EnqueueOperation, QueuedExposure};
2use crate::{
3    evaluation::evaluation_types::{BaseEvaluation, ExtraExposureInfo},
4    global_configs::GlobalConfigs,
5    log_d, log_e,
6    user::user_data::UserData,
7    write_lock_or_noop, write_lock_or_return, DynamicValue,
8};
9use ahash::AHashSet;
10use chrono::Utc;
11use parking_lot::RwLock;
12use std::sync::{
13    atomic::{AtomicU64, Ordering},
14    Arc,
15};
16
/// Tag passed to the logging and lock macros used in this file.
const TAG: &str = "ExposureSampling";
/// Fallback TTL (one minute, in milliseconds) used for both the spec sampling
/// set and the exposure dedupe set when the SDK config provides no value.
const DEFAULT_EXPOSURE_SAMPLING_TTL_MS: u64 = 60 * 1_000;
/// Size threshold for the in-memory sets; a set holding more keys than this is
/// cleared on the next reset attempt.
const SAMPLING_MAX_KEYS: usize = 100_000;
20
/// Sampling mode read from the `sampling_mode` SDK config value
/// (`"on"` or `"shadow"`).
#[derive(Debug)]
pub enum EvtSamplingMode {
    /// Sampling is enforced: exposures that are not sampled are dropped.
    On,
    /// Sampling is only recorded: the decision is carried along with the
    /// exposure, which is still logged.
    Shadow,
}

/// Outcome of evaluating a payload against the dedupe and sampling rules.
#[derive(Debug)]
pub enum EvtSamplingDecision {
    /// The exposure's key was already present in the dedupe set.
    Deduped,
    /// Sampling is on and the exposure fell outside the sample.
    NotSampled,
    /// The payload bypasses sampling and must be logged.
    ForceSampled,
    /// The exposure went through sampling. Fields, in order: the sampling rate
    /// that was applied (if any), the active mode, and whether the key was
    /// selected by the rate.
    Sampled(Option<u64>, EvtSamplingMode, bool),
}

impl EvtSamplingDecision {
    /// Returns `true` when the decision means the event should be logged.
    pub fn should_log(&self) -> bool {
        match self {
            Self::ForceSampled | Self::Sampled(..) => true,
            Self::NotSampled | Self::Deduped => false,
        }
    }
}
43
44type SpecAndRuleHashTuple = (u64, u64);
45pub struct ExposureSampling {
46    spec_sampling_set: RwLock<AHashSet<SpecAndRuleHashTuple>>,
47    last_spec_sampling_reset: AtomicU64,
48
49    exposure_dedupe_set: RwLock<AHashSet<ExposureSamplingKey>>,
50    last_exposure_dedupe_reset: AtomicU64,
51
52    global_configs: Arc<GlobalConfigs>,
53}
54
55impl ExposureSampling {
56    pub fn new(sdk_key: &str) -> Self {
57        let now = Utc::now().timestamp_millis() as u64;
58
59        Self {
60            spec_sampling_set: RwLock::from(AHashSet::default()),
61            last_spec_sampling_reset: AtomicU64::from(now),
62
63            exposure_dedupe_set: RwLock::from(AHashSet::default()),
64            last_exposure_dedupe_reset: AtomicU64::from(now),
65
66            global_configs: GlobalConfigs::get_instance(sdk_key),
67        }
68    }
69
70    pub fn get_sampling_decision(
71        &self,
72        payload: &impl EnqueueOperation,
73        ignore_analytical_gate_force_sampling: bool,
74    ) -> EvtSamplingDecision {
75        let exposure = match payload.as_exposure() {
76            Some(exposure) => exposure,
77            None => return EvtSamplingDecision::ForceSampled,
78        };
79
80        let expo_sampling_key = exposure.create_exposure_sampling_key();
81        if self.should_dedupe_exposure(&expo_sampling_key) {
82            return EvtSamplingDecision::Deduped;
83        }
84
85        let sampling_mode = match self.global_configs.get_sampling_mode() {
86            Some(sampling_mode) => sampling_mode,
87            None => return EvtSamplingDecision::ForceSampled,
88        };
89
90        let extra_info = exposure.get_extra_exposure_info_ref();
91        if self.should_sample_based_on_evaluation(extra_info, ignore_analytical_gate_force_sampling)
92        {
93            return EvtSamplingDecision::ForceSampled;
94        }
95
96        if self.should_sample_first_time_exposure(&expo_sampling_key) {
97            return EvtSamplingDecision::ForceSampled;
98        }
99
100        let sampling_rate = self
101            .get_special_case_sampling_rate(exposure)
102            .or_else(|| extra_info.and_then(|info| info.sampling_rate));
103
104        let is_sampled = expo_sampling_key.is_sampled(sampling_rate);
105
106        match sampling_mode {
107            EvtSamplingMode::On if is_sampled => {
108                EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::On, true)
109            }
110            EvtSamplingMode::Shadow => {
111                EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::Shadow, is_sampled)
112            }
113            _ => EvtSamplingDecision::NotSampled,
114        }
115    }
116
117    pub fn try_reset_all_sampling(&self) {
118        self.try_reset_exposure_dedupe_set();
119        self.try_reset_spec_sampling_set();
120    }
121
122    fn should_dedupe_exposure(&self, sampling_key: &ExposureSamplingKey) -> bool {
123        let mut dedupe_set = write_lock_or_return!(TAG, self.exposure_dedupe_set, false);
124        if dedupe_set.contains(sampling_key) {
125            return true;
126        }
127
128        dedupe_set.insert(sampling_key.clone());
129        false
130    }
131
132    fn should_sample_based_on_evaluation(
133        &self,
134        extra_info: Option<&ExtraExposureInfo>,
135        ignore_analytical_gate_force_sampling: bool,
136    ) -> bool {
137        let exposure_info = match extra_info {
138            Some(exposure_info) => exposure_info,
139            None => return false,
140        };
141
142        if exposure_info.forward_all_exposures == Some(true) {
143            return true;
144        }
145
146        if !ignore_analytical_gate_force_sampling
147            && exposure_info.has_seen_analytical_gates == Some(true)
148        {
149            return true;
150        }
151
152        false
153    }
154
155    fn should_sample_first_time_exposure(&self, exposure: &ExposureSamplingKey) -> bool {
156        let sampling_key: SpecAndRuleHashTuple = (exposure.spec_name_hash, exposure.rule_id_hash);
157        if self.sample_key_exists(&sampling_key) {
158            return false;
159        }
160
161        match self
162            .spec_sampling_set
163            .try_write_for(std::time::Duration::from_secs(5))
164        {
165            Some(mut sampling_map) => {
166                sampling_map.insert(sampling_key);
167            }
168            None => {
169                log_e!(TAG, "Failed to acquire write lock for spec sampling set");
170            }
171        }
172
173        true
174    }
175
176    fn try_reset_spec_sampling_set(&self) {
177        let ttl_ms = self.global_configs.get_exposure_spec_sampling_ttl_ms();
178        let now = Utc::now().timestamp_millis() as u64;
179        let last_sampling_reset = self.last_spec_sampling_reset.load(Ordering::Relaxed);
180        let mut sampling_map = write_lock_or_noop!(TAG, self.spec_sampling_set);
181
182        let has_expired = now - last_sampling_reset > ttl_ms;
183        let is_full = sampling_map.len() > SAMPLING_MAX_KEYS;
184
185        if has_expired || is_full {
186            log_d!(
187                TAG,
188                "Resetting spec sampling set. has_expired: {:?}, is_full: {:?}",
189                has_expired,
190                is_full
191            );
192            sampling_map.clear();
193            self.last_spec_sampling_reset.store(now, Ordering::Relaxed);
194        }
195    }
196
197    fn try_reset_exposure_dedupe_set(&self) {
198        let ttl_ms = self.global_configs.get_exposure_dedupe_ttl_ms();
199        let now = Utc::now().timestamp_millis() as u64;
200        let last_dedupe_reset = self.last_exposure_dedupe_reset.load(Ordering::Relaxed);
201        let mut dedupe_map = match self
202            .exposure_dedupe_set
203            .try_write_for(std::time::Duration::from_secs(5))
204        {
205            Some(map) => map,
206            None => {
207                log_e!(TAG, "Failed to acquire write lock for exposure dedupe set");
208                return;
209            }
210        };
211
212        let has_expired = now - last_dedupe_reset > ttl_ms;
213        let is_full = dedupe_map.len() > SAMPLING_MAX_KEYS;
214
215        if has_expired || is_full {
216            log_d!(
217                TAG,
218                "Resetting exposure dedupe set. has_expired: {:?}, is_full: {:?}",
219                has_expired,
220                is_full
221            );
222            dedupe_map.clear();
223            self.last_exposure_dedupe_reset
224                .store(now, Ordering::Relaxed);
225        }
226    }
227
228    fn sample_key_exists(&self, key: &SpecAndRuleHashTuple) -> bool {
229        match self
230            .spec_sampling_set
231            .try_read_for(std::time::Duration::from_secs(5))
232        {
233            Some(map) => map.contains(key),
234            None => false,
235        }
236    }
237
238    fn get_special_case_sampling_rate<'a>(
239        &self,
240        exposure: &'a impl QueuedExposure<'a>,
241    ) -> Option<u64> {
242        let rule_id = exposure.get_rule_id_ref();
243        match rule_id {
244            "default" | "disabled" | "" => self.global_configs.get_special_case_sampling_rate(),
245            _ => None,
246        }
247    }
248}
249
250impl GlobalConfigs {
251    fn get_sampling_mode(&self) -> Option<EvtSamplingMode> {
252        fn parse_sampling_mode(sampling_mode: Option<&DynamicValue>) -> Option<EvtSamplingMode> {
253            let v = sampling_mode?.string_value.as_ref()?.value.as_str();
254
255            match v {
256                "on" => Some(EvtSamplingMode::On),
257                "shadow" => Some(EvtSamplingMode::Shadow),
258                _ => None,
259            }
260        }
261
262        self.use_sdk_config_value("sampling_mode", parse_sampling_mode)
263    }
264
265    fn get_special_case_sampling_rate(&self) -> Option<u64> {
266        fn parse_special_case_sampling_rate(value: Option<&DynamicValue>) -> Option<u64> {
267            match value {
268                Some(value) => value.float_value.map(|rate| rate as u64),
269                None => None,
270            }
271        }
272
273        self.use_sdk_config_value(
274            "special_case_sampling_rate",
275            parse_special_case_sampling_rate,
276        )
277    }
278
279    fn get_exposure_spec_sampling_ttl_ms(&self) -> u64 {
280        fn parse_exposure_spec_sampling_ttl_ms(value: Option<&DynamicValue>) -> u64 {
281            match value {
282                Some(value) => value
283                    .float_value
284                    .map(|ttl_ms| ttl_ms as u64)
285                    .unwrap_or(DEFAULT_EXPOSURE_SAMPLING_TTL_MS),
286                None => DEFAULT_EXPOSURE_SAMPLING_TTL_MS,
287            }
288        }
289
290        self.use_sdk_config_value(
291            "exposure_spec_sampling_ttl_ms",
292            parse_exposure_spec_sampling_ttl_ms,
293        )
294    }
295
296    fn get_exposure_dedupe_ttl_ms(&self) -> u64 {
297        fn parse_exposure_dedupe_ttl_ms(value: Option<&DynamicValue>) -> u64 {
298            match value {
299                Some(value) => value
300                    .float_value
301                    .map(|ttl_ms| ttl_ms as u64)
302                    .unwrap_or(DEFAULT_EXPOSURE_SAMPLING_TTL_MS),
303                None => DEFAULT_EXPOSURE_SAMPLING_TTL_MS,
304            }
305        }
306
307        self.use_sdk_config_value("exposure_dedupe_ttl_ms", parse_exposure_dedupe_ttl_ms)
308    }
309}
310
/// Hash-based identity of a single exposure, used as the dedupe-set key and as
/// the input to the sampling decision.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ExposureSamplingKey {
    /// Hash of the evaluated spec's name (0 when there is no evaluation).
    pub spec_name_hash: u64,
    /// Hash of the evaluation's rule id (0 when there is no evaluation).
    pub rule_id_hash: u64,
    /// Hash derived from the user for exposure dedupe purposes.
    pub user_values_hash: u64,
    /// Extra caller-supplied hash mixed into the key.
    pub additional_hash: u64,
}
318
319impl ExposureSamplingKey {
320    pub fn new(
321        evaluation: Option<&BaseEvaluation>,
322        user: &UserData,
323        additional_hash: u64,
324        unit_id_type: Option<&str>,
325    ) -> Self {
326        let spec_name_hash = evaluation.as_ref().map_or(0, |e| e.name.hash);
327        let rule_id_hash = evaluation.as_ref().map_or(0, |e| e.rule_id.hash);
328        let user_values_hash = user.create_exposure_dedupe_user_hash(unit_id_type);
329
330        Self {
331            spec_name_hash,
332            rule_id_hash,
333            user_values_hash,
334            additional_hash,
335        }
336    }
337
338    pub fn is_sampled(&self, sampling_rate: Option<u64>) -> bool {
339        let sampling_rate = match sampling_rate {
340            Some(rate) => rate,
341            None => return true, // without a sampling rate, we should sample
342        };
343
344        let final_hash =
345            self.spec_name_hash ^ self.rule_id_hash ^ self.user_values_hash ^ self.additional_hash;
346
347        final_hash.is_multiple_of(sampling_rate)
348    }
349}