statsig_rust/event_logging/
exposure_sampling.rs1use super::event_queue::queued_event::{EnqueueOperation, QueuedExposure};
2use crate::{
3 evaluation::evaluation_types::{BaseEvaluation, ExtraExposureInfo},
4 global_configs::GlobalConfigs,
5 log_d, log_e,
6 user::user_data::UserData,
7 write_lock_or_noop, write_lock_or_return, DynamicValue,
8};
9use ahash::AHashSet;
10use chrono::Utc;
11use parking_lot::RwLock;
12use std::sync::{
13 atomic::{AtomicU64, Ordering},
14 Arc,
15};
16
/// Tag passed to this module's logging and lock macros.
const TAG: &str = "ExposureSampling";
/// Fallback TTL, in milliseconds (one minute), used for both the spec-sampling set and the
/// exposure-dedupe set when the SDK config does not provide a float value.
const DEFAULT_EXPOSURE_SAMPLING_TTL_MS: u64 = 60 * 1000;
/// Either set is cleared on the next reset check once its length exceeds this bound.
const SAMPLING_MAX_KEYS: usize = 100 * 1000;
20
/// Sampling mode parsed from the `sampling_mode` SDK config value.
#[derive(Debug)]
pub enum EvtSamplingMode {
    /// Exposures that miss the sampling hash are dropped (`NotSampled`).
    On,
    /// The sampling hash is evaluated, but the exposure is still logged; the outcome is
    /// carried in the `bool` of [`EvtSamplingDecision::Sampled`].
    Shadow,
}

/// Result of deciding whether a queued event should be logged.
#[derive(Debug)]
pub enum EvtSamplingDecision {
    /// The exposure key was already present in the dedupe set; not logged.
    Deduped,
    /// Dropped by sampling in `On` mode; not logged.
    NotSampled,
    /// Logged unconditionally, bypassing the sampling hash.
    ForceSampled,
    /// Logged under sampling: `(sampling_rate, mode, hash_sampled)`.
    Sampled(Option<u64>, EvtSamplingMode, bool),
}

impl EvtSamplingDecision {
    /// Returns `true` when the event carrying this decision should be logged.
    ///
    /// Only `Deduped` and `NotSampled` suppress logging; every `Sampled` decision
    /// (including shadow-mode misses) is logged.
    pub fn should_log(&self) -> bool {
        match self {
            Self::ForceSampled | Self::Sampled(..) => true,
            Self::Deduped | Self::NotSampled => false,
        }
    }
}
43
44type SpecAndRuleHashTuple = (u64, u64);
45pub struct ExposureSampling {
46 spec_sampling_set: RwLock<AHashSet<SpecAndRuleHashTuple>>,
47 last_spec_sampling_reset: AtomicU64,
48
49 exposure_dedupe_set: RwLock<AHashSet<ExposureSamplingKey>>,
50 last_exposure_dedupe_reset: AtomicU64,
51
52 global_configs: Arc<GlobalConfigs>,
53}
54
55impl ExposureSampling {
56 pub fn new(sdk_key: &str) -> Self {
57 let now = Utc::now().timestamp_millis() as u64;
58
59 Self {
60 spec_sampling_set: RwLock::from(AHashSet::default()),
61 last_spec_sampling_reset: AtomicU64::from(now),
62
63 exposure_dedupe_set: RwLock::from(AHashSet::default()),
64 last_exposure_dedupe_reset: AtomicU64::from(now),
65
66 global_configs: GlobalConfigs::get_instance(sdk_key),
67 }
68 }
69
70 pub fn get_sampling_decision(
71 &self,
72 payload: &impl EnqueueOperation,
73 ignore_analytical_gate_force_sampling: bool,
74 ) -> EvtSamplingDecision {
75 let exposure = match payload.as_exposure() {
76 Some(exposure) => exposure,
77 None => return EvtSamplingDecision::ForceSampled,
78 };
79
80 let expo_sampling_key = exposure.create_exposure_sampling_key();
81 if self.should_dedupe_exposure(&expo_sampling_key) {
82 return EvtSamplingDecision::Deduped;
83 }
84
85 let sampling_mode = match self.global_configs.get_sampling_mode() {
86 Some(sampling_mode) => sampling_mode,
87 None => return EvtSamplingDecision::ForceSampled,
88 };
89
90 let extra_info = exposure.get_extra_exposure_info_ref();
91 if self.should_sample_based_on_evaluation(extra_info, ignore_analytical_gate_force_sampling)
92 {
93 return EvtSamplingDecision::ForceSampled;
94 }
95
96 if self.should_sample_first_time_exposure(&expo_sampling_key) {
97 return EvtSamplingDecision::ForceSampled;
98 }
99
100 let sampling_rate = self
101 .get_special_case_sampling_rate(exposure)
102 .or_else(|| extra_info.and_then(|info| info.sampling_rate));
103
104 let is_sampled = expo_sampling_key.is_sampled(sampling_rate);
105
106 match sampling_mode {
107 EvtSamplingMode::On if is_sampled => {
108 EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::On, true)
109 }
110 EvtSamplingMode::Shadow => {
111 EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::Shadow, is_sampled)
112 }
113 _ => EvtSamplingDecision::NotSampled,
114 }
115 }
116
117 pub fn try_reset_all_sampling(&self) {
118 self.try_reset_exposure_dedupe_set();
119 self.try_reset_spec_sampling_set();
120 }
121
122 fn should_dedupe_exposure(&self, sampling_key: &ExposureSamplingKey) -> bool {
123 let mut dedupe_set = write_lock_or_return!(TAG, self.exposure_dedupe_set, false);
124 if dedupe_set.contains(sampling_key) {
125 return true;
126 }
127
128 dedupe_set.insert(sampling_key.clone());
129 false
130 }
131
132 fn should_sample_based_on_evaluation(
133 &self,
134 extra_info: Option<&ExtraExposureInfo>,
135 ignore_analytical_gate_force_sampling: bool,
136 ) -> bool {
137 let exposure_info = match extra_info {
138 Some(exposure_info) => exposure_info,
139 None => return false,
140 };
141
142 if exposure_info.forward_all_exposures == Some(true) {
143 return true;
144 }
145
146 if !ignore_analytical_gate_force_sampling
147 && exposure_info.has_seen_analytical_gates == Some(true)
148 {
149 return true;
150 }
151
152 false
153 }
154
155 fn should_sample_first_time_exposure(&self, exposure: &ExposureSamplingKey) -> bool {
156 let sampling_key: SpecAndRuleHashTuple = (exposure.spec_name_hash, exposure.rule_id_hash);
157 if self.sample_key_exists(&sampling_key) {
158 return false;
159 }
160
161 match self
162 .spec_sampling_set
163 .try_write_for(std::time::Duration::from_secs(5))
164 {
165 Some(mut sampling_map) => {
166 sampling_map.insert(sampling_key);
167 }
168 None => {
169 log_e!(TAG, "Failed to acquire write lock for spec sampling set");
170 }
171 }
172
173 true
174 }
175
176 fn try_reset_spec_sampling_set(&self) {
177 let ttl_ms = self.global_configs.get_exposure_spec_sampling_ttl_ms();
178 let now = Utc::now().timestamp_millis() as u64;
179 let last_sampling_reset = self.last_spec_sampling_reset.load(Ordering::Relaxed);
180 let mut sampling_map = write_lock_or_noop!(TAG, self.spec_sampling_set);
181
182 let has_expired = now - last_sampling_reset > ttl_ms;
183 let is_full = sampling_map.len() > SAMPLING_MAX_KEYS;
184
185 if has_expired || is_full {
186 log_d!(
187 TAG,
188 "Resetting spec sampling set. has_expired: {:?}, is_full: {:?}",
189 has_expired,
190 is_full
191 );
192 sampling_map.clear();
193 self.last_spec_sampling_reset.store(now, Ordering::Relaxed);
194 }
195 }
196
197 fn try_reset_exposure_dedupe_set(&self) {
198 let ttl_ms = self.global_configs.get_exposure_dedupe_ttl_ms();
199 let now = Utc::now().timestamp_millis() as u64;
200 let last_dedupe_reset = self.last_exposure_dedupe_reset.load(Ordering::Relaxed);
201 let mut dedupe_map = match self
202 .exposure_dedupe_set
203 .try_write_for(std::time::Duration::from_secs(5))
204 {
205 Some(map) => map,
206 None => {
207 log_e!(TAG, "Failed to acquire write lock for exposure dedupe set");
208 return;
209 }
210 };
211
212 let has_expired = now - last_dedupe_reset > ttl_ms;
213 let is_full = dedupe_map.len() > SAMPLING_MAX_KEYS;
214
215 if has_expired || is_full {
216 log_d!(
217 TAG,
218 "Resetting exposure dedupe set. has_expired: {:?}, is_full: {:?}",
219 has_expired,
220 is_full
221 );
222 dedupe_map.clear();
223 self.last_exposure_dedupe_reset
224 .store(now, Ordering::Relaxed);
225 }
226 }
227
228 fn sample_key_exists(&self, key: &SpecAndRuleHashTuple) -> bool {
229 match self
230 .spec_sampling_set
231 .try_read_for(std::time::Duration::from_secs(5))
232 {
233 Some(map) => map.contains(key),
234 None => false,
235 }
236 }
237
238 fn get_special_case_sampling_rate<'a>(
239 &self,
240 exposure: &'a impl QueuedExposure<'a>,
241 ) -> Option<u64> {
242 let rule_id = exposure.get_rule_id_ref();
243 match rule_id {
244 "default" | "disabled" | "" => self.global_configs.get_special_case_sampling_rate(),
245 _ => None,
246 }
247 }
248}
249
250impl GlobalConfigs {
251 fn get_sampling_mode(&self) -> Option<EvtSamplingMode> {
252 fn parse_sampling_mode(sampling_mode: Option<&DynamicValue>) -> Option<EvtSamplingMode> {
253 let v = sampling_mode?.string_value.as_ref()?.value.as_str();
254
255 match v {
256 "on" => Some(EvtSamplingMode::On),
257 "shadow" => Some(EvtSamplingMode::Shadow),
258 _ => None,
259 }
260 }
261
262 self.use_sdk_config_value("sampling_mode", parse_sampling_mode)
263 }
264
265 fn get_special_case_sampling_rate(&self) -> Option<u64> {
266 fn parse_special_case_sampling_rate(value: Option<&DynamicValue>) -> Option<u64> {
267 match value {
268 Some(value) => value.float_value.map(|rate| rate as u64),
269 None => None,
270 }
271 }
272
273 self.use_sdk_config_value(
274 "special_case_sampling_rate",
275 parse_special_case_sampling_rate,
276 )
277 }
278
279 fn get_exposure_spec_sampling_ttl_ms(&self) -> u64 {
280 fn parse_exposure_spec_sampling_ttl_ms(value: Option<&DynamicValue>) -> u64 {
281 match value {
282 Some(value) => value
283 .float_value
284 .map(|ttl_ms| ttl_ms as u64)
285 .unwrap_or(DEFAULT_EXPOSURE_SAMPLING_TTL_MS),
286 None => DEFAULT_EXPOSURE_SAMPLING_TTL_MS,
287 }
288 }
289
290 self.use_sdk_config_value(
291 "exposure_spec_sampling_ttl_ms",
292 parse_exposure_spec_sampling_ttl_ms,
293 )
294 }
295
296 fn get_exposure_dedupe_ttl_ms(&self) -> u64 {
297 fn parse_exposure_dedupe_ttl_ms(value: Option<&DynamicValue>) -> u64 {
298 match value {
299 Some(value) => value
300 .float_value
301 .map(|ttl_ms| ttl_ms as u64)
302 .unwrap_or(DEFAULT_EXPOSURE_SAMPLING_TTL_MS),
303 None => DEFAULT_EXPOSURE_SAMPLING_TTL_MS,
304 }
305 }
306
307 self.use_sdk_config_value("exposure_dedupe_ttl_ms", parse_exposure_dedupe_ttl_ms)
308 }
309}
310
311#[derive(Debug, PartialEq, Eq, Hash, Clone)]
312pub struct ExposureSamplingKey {
313 pub spec_name_hash: u64,
314 pub rule_id_hash: u64,
315 pub user_values_hash: u64,
316 pub additional_hash: u64,
317}
318
319impl ExposureSamplingKey {
320 pub fn new(
321 evaluation: Option<&BaseEvaluation>,
322 user: &UserData,
323 additional_hash: u64,
324 unit_id_type: Option<&str>,
325 ) -> Self {
326 let spec_name_hash = evaluation.as_ref().map_or(0, |e| e.name.hash);
327 let rule_id_hash = evaluation.as_ref().map_or(0, |e| e.rule_id.hash);
328 let user_values_hash = user.create_exposure_dedupe_user_hash(unit_id_type);
329
330 Self {
331 spec_name_hash,
332 rule_id_hash,
333 user_values_hash,
334 additional_hash,
335 }
336 }
337
338 pub fn is_sampled(&self, sampling_rate: Option<u64>) -> bool {
339 let sampling_rate = match sampling_rate {
340 Some(rate) => rate,
341 None => return true, };
343
344 let final_hash =
345 self.spec_name_hash ^ self.rule_id_hash ^ self.user_values_hash ^ self.additional_hash;
346
347 final_hash.is_multiple_of(sampling_rate)
348 }
349}