statsig_rust/event_logging/
exposure_sampling.rs1use super::event_queue::queued_event::{EnqueueOperation, QueuedExposure};
2use crate::{
3 evaluation::evaluation_types::{BaseEvaluation, ExtraExposureInfo},
4 global_configs::GlobalConfigs,
5 log_d, log_e,
6 user::user_data::UserData,
7 write_lock_or_noop, write_lock_or_return, DynamicValue,
8};
9use ahash::AHashSet;
10use chrono::Utc;
11use parking_lot::RwLock;
12use std::sync::{
13 atomic::{AtomicU64, Ordering},
14 Arc,
15};
16
/// Log tag used by every message emitted from this module.
const TAG: &str = "ExposureSampling";
/// Fallback TTL (one minute, in milliseconds) for both the spec sampling set and the exposure
/// dedupe set when the SDK config does not provide one.
const DEFAULT_EXPOSURE_SAMPLING_TTL_MS: u64 = 60 * 1_000;
/// A sampling/dedupe set holding more than this many keys is cleared on the next reset attempt.
const SAMPLING_MAX_KEYS: usize = 100 * 1_000;
20
/// How exposure sampling is applied, as parsed from the `sampling_mode` SDK config.
#[derive(Debug)]
pub enum EvtSamplingMode {
    /// Sampling is enforced: exposures that fail the sampling check are dropped.
    On,
    /// Sampling is only simulated: everything is logged, but the would-be decision is recorded.
    Shadow,
}

/// Outcome of evaluating an event against dedupe and sampling rules.
#[derive(Debug)]
pub enum EvtSamplingDecision {
    /// Already seen in the current dedupe window; not logged.
    Deduped,
    /// Rejected by sampling; not logged.
    NotSampled,
    /// Logged unconditionally, bypassing the sampling check.
    ForceSampled,
    /// Went through the sampling check: `(sampling rate used, mode, passed the check)`.
    Sampled(Option<u64>, EvtSamplingMode, bool),
}

impl EvtSamplingDecision {
    /// Returns `true` unless the decision is `Deduped` or `NotSampled`.
    ///
    /// Note that every `Sampled(..)` decision is logged, including shadow-mode decisions whose
    /// trailing flag is `false`.
    pub fn should_log(&self) -> bool {
        !matches!(
            self,
            EvtSamplingDecision::Deduped | EvtSamplingDecision::NotSampled
        )
    }
}
42}
43
44type SpecAndRuleHashTuple = (u64, u64);
45pub struct ExposureSampling {
46 spec_sampling_set: RwLock<AHashSet<SpecAndRuleHashTuple>>,
47 last_spec_sampling_reset: AtomicU64,
48
49 exposure_dedupe_set: RwLock<AHashSet<ExposureSamplingKey>>,
50 last_exposure_dedupe_reset: AtomicU64,
51
52 global_configs: Arc<GlobalConfigs>,
53}
54
55impl ExposureSampling {
56 pub fn new(sdk_key: &str) -> Self {
57 let now = Utc::now().timestamp_millis() as u64;
58
59 Self {
60 spec_sampling_set: RwLock::from(AHashSet::default()),
61 last_spec_sampling_reset: AtomicU64::from(now),
62
63 exposure_dedupe_set: RwLock::from(AHashSet::default()),
64 last_exposure_dedupe_reset: AtomicU64::from(now),
65
66 global_configs: GlobalConfigs::get_instance(sdk_key),
67 }
68 }
69
70 pub fn get_sampling_decision(&self, payload: &impl EnqueueOperation) -> EvtSamplingDecision {
71 let exposure = match payload.as_exposure() {
72 Some(exposure) => exposure,
73 None => return EvtSamplingDecision::ForceSampled,
74 };
75
76 let expo_sampling_key = exposure.create_exposure_sampling_key();
77 if self.should_dedupe_exposure(&expo_sampling_key) {
78 return EvtSamplingDecision::Deduped;
79 }
80
81 let sampling_mode = match self.global_configs.get_sampling_mode() {
82 Some(sampling_mode) => sampling_mode,
83 None => return EvtSamplingDecision::ForceSampled,
84 };
85
86 let extra_info = exposure.get_extra_exposure_info_ref();
87 if self.should_sample_based_on_evaluation(extra_info) {
88 return EvtSamplingDecision::ForceSampled;
89 }
90
91 if self.should_sample_first_time_exposure(&expo_sampling_key) {
92 return EvtSamplingDecision::ForceSampled;
93 }
94
95 let sampling_rate = self
96 .get_special_case_sampling_rate(exposure)
97 .or_else(|| extra_info.and_then(|info| info.sampling_rate));
98
99 let is_sampled = expo_sampling_key.is_sampled(sampling_rate);
100
101 match sampling_mode {
102 EvtSamplingMode::On if is_sampled => {
103 EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::On, true)
104 }
105 EvtSamplingMode::Shadow => {
106 EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::Shadow, is_sampled)
107 }
108 _ => EvtSamplingDecision::NotSampled,
109 }
110 }
111
112 pub fn try_reset_all_sampling(&self) {
113 self.try_reset_exposure_dedupe_set();
114 self.try_reset_spec_sampling_set();
115 }
116
117 fn should_dedupe_exposure(&self, sampling_key: &ExposureSamplingKey) -> bool {
118 let mut dedupe_set = write_lock_or_return!(TAG, self.exposure_dedupe_set, false);
119 if dedupe_set.contains(sampling_key) {
120 return true;
121 }
122
123 dedupe_set.insert(sampling_key.clone());
124 false
125 }
126
127 fn should_sample_based_on_evaluation(&self, extra_info: Option<&ExtraExposureInfo>) -> bool {
128 let exposure_info = match extra_info {
129 Some(exposure_info) => exposure_info,
130 None => return false,
131 };
132
133 if exposure_info.forward_all_exposures == Some(true) {
134 return true;
135 }
136
137 if exposure_info.has_seen_analytical_gates == Some(true) {
138 return true;
139 }
140
141 false
142 }
143
144 fn should_sample_first_time_exposure(&self, exposure: &ExposureSamplingKey) -> bool {
145 let sampling_key: SpecAndRuleHashTuple = (exposure.spec_name_hash, exposure.rule_id_hash);
146 if self.sample_key_exists(&sampling_key) {
147 return false;
148 }
149
150 match self
151 .spec_sampling_set
152 .try_write_for(std::time::Duration::from_secs(5))
153 {
154 Some(mut sampling_map) => {
155 sampling_map.insert(sampling_key);
156 }
157 None => {
158 log_e!(TAG, "Failed to acquire write lock for spec sampling set");
159 }
160 }
161
162 true
163 }
164
165 fn try_reset_spec_sampling_set(&self) {
166 let ttl_ms = self.global_configs.get_exposure_spec_sampling_ttl_ms();
167 let now = Utc::now().timestamp_millis() as u64;
168 let last_sampling_reset = self.last_spec_sampling_reset.load(Ordering::Relaxed);
169 let mut sampling_map = write_lock_or_noop!(TAG, self.spec_sampling_set);
170
171 let has_expired = now - last_sampling_reset > ttl_ms;
172 let is_full = sampling_map.len() > SAMPLING_MAX_KEYS;
173
174 if has_expired || is_full {
175 log_d!(
176 TAG,
177 "Resetting spec sampling set. has_expired: {:?}, is_full: {:?}",
178 has_expired,
179 is_full
180 );
181 sampling_map.clear();
182 self.last_spec_sampling_reset.store(now, Ordering::Relaxed);
183 }
184 }
185
186 fn try_reset_exposure_dedupe_set(&self) {
187 let ttl_ms = self.global_configs.get_exposure_dedupe_ttl_ms();
188 let now = Utc::now().timestamp_millis() as u64;
189 let last_dedupe_reset = self.last_exposure_dedupe_reset.load(Ordering::Relaxed);
190 let mut dedupe_map = match self
191 .exposure_dedupe_set
192 .try_write_for(std::time::Duration::from_secs(5))
193 {
194 Some(map) => map,
195 None => {
196 log_e!(TAG, "Failed to acquire write lock for exposure dedupe set");
197 return;
198 }
199 };
200
201 let has_expired = now - last_dedupe_reset > ttl_ms;
202 let is_full = dedupe_map.len() > SAMPLING_MAX_KEYS;
203
204 if has_expired || is_full {
205 log_d!(
206 TAG,
207 "Resetting exposure dedupe set. has_expired: {:?}, is_full: {:?}",
208 has_expired,
209 is_full
210 );
211 dedupe_map.clear();
212 self.last_exposure_dedupe_reset
213 .store(now, Ordering::Relaxed);
214 }
215 }
216
217 fn sample_key_exists(&self, key: &SpecAndRuleHashTuple) -> bool {
218 match self
219 .spec_sampling_set
220 .try_read_for(std::time::Duration::from_secs(5))
221 {
222 Some(map) => map.contains(key),
223 None => false,
224 }
225 }
226
227 fn get_special_case_sampling_rate<'a>(
228 &self,
229 exposure: &'a impl QueuedExposure<'a>,
230 ) -> Option<u64> {
231 let rule_id = exposure.get_rule_id_ref();
232 match rule_id {
233 "default" | "disabled" | "" => self.global_configs.get_special_case_sampling_rate(),
234 _ => None,
235 }
236 }
237}
238
239impl GlobalConfigs {
240 fn get_sampling_mode(&self) -> Option<EvtSamplingMode> {
241 fn parse_sampling_mode(sampling_mode: Option<&DynamicValue>) -> Option<EvtSamplingMode> {
242 let v = sampling_mode?.string_value.as_ref()?.value.as_str();
243
244 match v {
245 "on" => Some(EvtSamplingMode::On),
246 "shadow" => Some(EvtSamplingMode::Shadow),
247 _ => None,
248 }
249 }
250
251 self.use_sdk_config_value("sampling_mode", parse_sampling_mode)
252 }
253
254 fn get_special_case_sampling_rate(&self) -> Option<u64> {
255 fn parse_special_case_sampling_rate(value: Option<&DynamicValue>) -> Option<u64> {
256 match value {
257 Some(value) => value.float_value.map(|rate| rate as u64),
258 None => None,
259 }
260 }
261
262 self.use_sdk_config_value(
263 "special_case_sampling_rate",
264 parse_special_case_sampling_rate,
265 )
266 }
267
268 fn get_exposure_spec_sampling_ttl_ms(&self) -> u64 {
269 fn parse_exposure_spec_sampling_ttl_ms(value: Option<&DynamicValue>) -> u64 {
270 match value {
271 Some(value) => value
272 .float_value
273 .map(|ttl_ms| ttl_ms as u64)
274 .unwrap_or(DEFAULT_EXPOSURE_SAMPLING_TTL_MS),
275 None => DEFAULT_EXPOSURE_SAMPLING_TTL_MS,
276 }
277 }
278
279 self.use_sdk_config_value(
280 "exposure_spec_sampling_ttl_ms",
281 parse_exposure_spec_sampling_ttl_ms,
282 )
283 }
284
285 fn get_exposure_dedupe_ttl_ms(&self) -> u64 {
286 fn parse_exposure_dedupe_ttl_ms(value: Option<&DynamicValue>) -> u64 {
287 match value {
288 Some(value) => value
289 .float_value
290 .map(|ttl_ms| ttl_ms as u64)
291 .unwrap_or(DEFAULT_EXPOSURE_SAMPLING_TTL_MS),
292 None => DEFAULT_EXPOSURE_SAMPLING_TTL_MS,
293 }
294 }
295
296 self.use_sdk_config_value("exposure_dedupe_ttl_ms", parse_exposure_dedupe_ttl_ms)
297 }
298}
299
300#[derive(Debug, PartialEq, Eq, Hash, Clone)]
301pub struct ExposureSamplingKey {
302 pub spec_name_hash: u64,
303 pub rule_id_hash: u64,
304 pub user_values_hash: u64,
305 pub additional_hash: u64,
306}
307
308impl ExposureSamplingKey {
309 pub fn new(
310 evaluation: Option<&BaseEvaluation>,
311 user: &UserData,
312 additional_hash: u64,
313 unit_id_type: Option<&str>,
314 ) -> Self {
315 let spec_name_hash = evaluation.as_ref().map_or(0, |e| e.name.hash);
316 let rule_id_hash = evaluation.as_ref().map_or(0, |e| e.rule_id.hash);
317 let user_values_hash = user.create_exposure_dedupe_user_hash(unit_id_type);
318
319 Self {
320 spec_name_hash,
321 rule_id_hash,
322 user_values_hash,
323 additional_hash,
324 }
325 }
326
327 pub fn is_sampled(&self, sampling_rate: Option<u64>) -> bool {
328 let sampling_rate = match sampling_rate {
329 Some(rate) => rate,
330 None => return true, };
332
333 let final_hash =
334 self.spec_name_hash ^ self.rule_id_hash ^ self.user_values_hash ^ self.additional_hash;
335
336 final_hash.is_multiple_of(sampling_rate)
337 }
338}