// statsig_rust/event_logging/exposure_sampling.rs
use super::event_queue::queued_event::{EnqueueOperation, QueuedExposure};
use crate::{
    evaluation::evaluation_types::{BaseEvaluation, ExtraExposureInfo},
    global_configs::GlobalConfigs,
    log_d,
    user::user_data::UserData,
    write_lock_or_noop, write_lock_or_return, DynamicValue,
};
use ahash::AHashSet;
use chrono::Utc;
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc, RwLock,
};

/// Tag handed to the logging and lock helper macros used in this module.
const TAG: &str = "ExposureSampling";
/// Age, in milliseconds, after which a sampling/dedupe set is cleared (one minute).
const SAMPLING_TTL_MS: u64 = 60 * 1_000;
/// Entry count above which a sampling/dedupe set is cleared regardless of its age.
const SAMPLING_MAX_KEYS: usize = 100 * 1_000;

/// How exposure sampling is applied, as read from the `sampling_mode` SDK config value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EvtSamplingMode {
    /// Exposures that fail the sampling check are not logged.
    On,
    /// Exposures are logged either way; the sampling check result is carried in the decision.
    Shadow,
}

/// Outcome of `ExposureSampling::get_sampling_decision` for one queued event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EvtSamplingDecision {
    /// Same exposure already seen since the last dedupe reset; not logged.
    Deduped,
    /// Failed the sampling check while in `On` mode; not logged.
    NotSampled,
    /// Logged unconditionally, without consulting the sampling rate.
    ForceSampled,
    /// Logged after a sampling-rate check, as `(sampling_rate, mode, passed_check)`.
    /// In `Shadow` mode `passed_check` may be `false` and the event is still logged.
    Sampled(Option<u64>, EvtSamplingMode, bool),
}

impl EvtSamplingDecision {
    /// Returns whether the event should be logged.
    ///
    /// `ForceSampled` and every `Sampled(..)` (including shadow-mode ones that failed the
    /// check) are logged; `Deduped` and `NotSampled` are not. The match is kept exhaustive so
    /// adding a variant forces this decision to be revisited.
    pub fn should_log(&self) -> bool {
        match self {
            Self::Deduped | Self::NotSampled => false,
            Self::ForceSampled | Self::Sampled(..) => true,
        }
    }
}

43type SpecAndRuleHashTuple = (u64, u64);
44pub struct ExposureSampling {
45 spec_sampling_set: RwLock<AHashSet<SpecAndRuleHashTuple>>,
46 last_spec_sampling_reset: AtomicU64,
47
48 exposure_dedupe_set: RwLock<AHashSet<ExposureSamplingKey>>,
49 last_exposure_dedupe_reset: AtomicU64,
50
51 global_configs: Arc<GlobalConfigs>,
52}
53
54impl ExposureSampling {
55 pub fn new(sdk_key: &str) -> Self {
56 let now = Utc::now().timestamp_millis() as u64;
57
58 Self {
59 spec_sampling_set: RwLock::from(AHashSet::default()),
60 last_spec_sampling_reset: AtomicU64::from(now),
61
62 exposure_dedupe_set: RwLock::from(AHashSet::default()),
63 last_exposure_dedupe_reset: AtomicU64::from(now),
64
65 global_configs: GlobalConfigs::get_instance(sdk_key),
66 }
67 }
68
69 pub fn get_sampling_decision(&self, payload: &impl EnqueueOperation) -> EvtSamplingDecision {
70 let exposure = match payload.as_exposure() {
71 Some(exposure) => exposure,
72 None => return EvtSamplingDecision::ForceSampled,
73 };
74
75 let expo_sampling_key = exposure.create_exposure_sampling_key();
76 if self.should_dedupe_exposure(&expo_sampling_key) {
77 return EvtSamplingDecision::Deduped;
78 }
79
80 let sampling_mode = match self.global_configs.get_sampling_mode() {
81 Some(sampling_mode) => sampling_mode,
82 None => return EvtSamplingDecision::ForceSampled,
83 };
84
85 let extra_info = exposure.get_extra_exposure_info_ref();
86 if self.should_sample_based_on_evaluation(extra_info) {
87 return EvtSamplingDecision::ForceSampled;
88 }
89
90 if self.should_sample_first_time_exposure(&expo_sampling_key) {
91 return EvtSamplingDecision::ForceSampled;
92 }
93
94 let sampling_rate = self
95 .get_special_case_sampling_rate(exposure)
96 .or_else(|| extra_info.and_then(|info| info.sampling_rate));
97
98 let is_sampled = expo_sampling_key.is_sampled(sampling_rate);
99
100 match sampling_mode {
101 EvtSamplingMode::On if is_sampled => {
102 EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::On, true)
103 }
104 EvtSamplingMode::Shadow => {
105 EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::Shadow, is_sampled)
106 }
107 _ => EvtSamplingDecision::NotSampled,
108 }
109 }
110
111 pub fn try_reset_all_sampling(&self) {
112 self.try_reset_exposure_dedupe_set();
113 self.try_reset_spec_sampling_set();
114 }
115
116 fn should_dedupe_exposure(&self, sampling_key: &ExposureSamplingKey) -> bool {
117 let mut dedupe_set = write_lock_or_return!(TAG, self.exposure_dedupe_set, false);
118 if dedupe_set.contains(sampling_key) {
119 return true;
120 }
121
122 dedupe_set.insert(sampling_key.clone());
123 false
124 }
125
126 fn should_sample_based_on_evaluation(&self, extra_info: Option<&ExtraExposureInfo>) -> bool {
127 let exposure_info = match extra_info {
128 Some(exposure_info) => exposure_info,
129 None => return false,
130 };
131
132 if exposure_info.forward_all_exposures == Some(true) {
133 return true;
134 }
135
136 if exposure_info.has_seen_analytical_gates == Some(true) {
137 return true;
138 }
139
140 false
141 }
142
143 fn should_sample_first_time_exposure(&self, exposure: &ExposureSamplingKey) -> bool {
144 let sampling_key: SpecAndRuleHashTuple = (exposure.spec_name_hash, exposure.rule_id_hash);
145 if self.sample_key_exists(&sampling_key) {
146 return false;
147 }
148
149 if let Ok(mut sampling_map) = self.spec_sampling_set.write() {
150 sampling_map.insert(sampling_key);
151 }
152
153 true
154 }
155
156 fn try_reset_spec_sampling_set(&self) {
157 let now = Utc::now().timestamp_millis() as u64;
158 let last_sampling_reset = self.last_spec_sampling_reset.load(Ordering::Relaxed);
159 let mut sampling_map = write_lock_or_noop!(TAG, self.spec_sampling_set);
160
161 let has_expired = now - last_sampling_reset > SAMPLING_TTL_MS;
162 let is_full = sampling_map.len() > SAMPLING_MAX_KEYS;
163
164 if has_expired || is_full {
165 log_d!(
166 TAG,
167 "Resetting spec sampling set. has_expired: {:?}, is_full: {:?}",
168 has_expired,
169 is_full
170 );
171 sampling_map.clear();
172 self.last_spec_sampling_reset.store(now, Ordering::Relaxed);
173 }
174 }
175
176 fn try_reset_exposure_dedupe_set(&self) {
177 let now = Utc::now().timestamp_millis() as u64;
178 let last_dedupe_reset = self.last_exposure_dedupe_reset.load(Ordering::Relaxed);
179 let mut dedupe_map = self.exposure_dedupe_set.write().unwrap();
180
181 let has_expired = now - last_dedupe_reset > SAMPLING_TTL_MS;
182 let is_full = dedupe_map.len() > SAMPLING_MAX_KEYS;
183
184 if has_expired || is_full {
185 log_d!(
186 TAG,
187 "Resetting exposure dedupe set. has_expired: {:?}, is_full: {:?}",
188 has_expired,
189 is_full
190 );
191 dedupe_map.clear();
192 self.last_exposure_dedupe_reset
193 .store(now, Ordering::Relaxed);
194 }
195 }
196
197 fn sample_key_exists(&self, key: &SpecAndRuleHashTuple) -> bool {
198 match self.spec_sampling_set.read() {
199 Ok(map) => map.contains(key),
200 _ => false,
201 }
202 }
203
204 fn get_special_case_sampling_rate<'a>(
205 &self,
206 exposure: &'a impl QueuedExposure<'a>,
207 ) -> Option<u64> {
208 let rule_id = exposure.get_rule_id_ref();
209 match rule_id {
210 "default" | "disabled" | "" => self.global_configs.get_special_case_sampling_rate(),
211 _ => None,
212 }
213 }
214}
215
216impl GlobalConfigs {
217 fn get_sampling_mode(&self) -> Option<EvtSamplingMode> {
218 fn parse_sampling_mode(sampling_mode: Option<&DynamicValue>) -> Option<EvtSamplingMode> {
219 let v = sampling_mode?.string_value.as_ref()?.value.as_str();
220
221 match v {
222 "on" => Some(EvtSamplingMode::On),
223 "shadow" => Some(EvtSamplingMode::Shadow),
224 _ => None,
225 }
226 }
227
228 self.use_sdk_config_value("sampling_mode", parse_sampling_mode)
229 }
230
231 fn get_special_case_sampling_rate(&self) -> Option<u64> {
232 fn parse_special_case_sampling_rate(value: Option<&DynamicValue>) -> Option<u64> {
233 match value {
234 Some(value) => value.float_value.map(|rate| rate as u64),
235 None => None,
236 }
237 }
238
239 self.use_sdk_config_value(
240 "special_case_sampling_rate",
241 parse_special_case_sampling_rate,
242 )
243 }
244}
245
246#[derive(Debug, PartialEq, Eq, Hash, Clone)]
247pub struct ExposureSamplingKey {
248 pub spec_name_hash: u64,
249 pub rule_id_hash: u64,
250 pub user_values_hash: u64,
251 pub additional_hash: u64,
252}
253
254impl ExposureSamplingKey {
255 pub fn new(evaluation: Option<&BaseEvaluation>, user: &UserData, additional_hash: u64) -> Self {
256 let spec_name_hash = evaluation.as_ref().map_or(0, |e| e.name.hash_value);
257 let rule_id_hash = evaluation.as_ref().map_or(0, |e| e.rule_id.hash_value);
258
259 let user_values_hash = user.create_user_values_hash();
260
261 Self {
262 spec_name_hash,
263 rule_id_hash,
264 user_values_hash,
265 additional_hash,
266 }
267 }
268
269 pub fn is_sampled(&self, sampling_rate: Option<u64>) -> bool {
270 let sampling_rate = match sampling_rate {
271 Some(rate) => rate,
272 None => return true, };
274
275 let final_hash =
276 self.spec_name_hash ^ self.rule_id_hash ^ self.user_values_hash ^ self.additional_hash;
277
278 final_hash % sampling_rate == 0
279 }
280}