statsig_rust/event_logging/
exposure_sampling.rs1use super::event_queue::queued_event::{EnqueueOperation, QueuedExposure};
2use crate::{
3 evaluation::evaluation_types::{BaseEvaluation, ExtraExposureInfo},
4 global_configs::GlobalConfigs,
5 log_d, log_e,
6 user::user_data::UserData,
7 write_lock_or_noop, write_lock_or_return, DynamicValue,
8};
9use ahash::AHashSet;
10use chrono::Utc;
11use std::sync::{
12 atomic::{AtomicU64, Ordering},
13 Arc, RwLock,
14};
15
/// Log tag attached to every message emitted from this module.
const TAG: &str = "ExposureSampling";
/// Age (in milliseconds) after which the dedupe and spec sampling sets are cleared.
const SAMPLING_TTL_MS: u64 = 60 * 1_000;
/// Entry count above which the dedupe and spec sampling sets are cleared regardless of age.
const SAMPLING_MAX_KEYS: usize = 100 * 1_000;

/// How exposure sampling is applied once a `sampling_mode` SDK config value is set.
#[derive(Debug)]
pub enum EvtSamplingMode {
    /// Exposures that fall outside the sample are dropped (`NotSampled`).
    On,
    /// Every exposure is still logged; the sampling outcome is only carried in the decision.
    Shadow,
}

/// Outcome of [`ExposureSampling::get_sampling_decision`] for a single queued event.
#[derive(Debug)]
pub enum EvtSamplingDecision {
    /// The same exposure key was already seen since the last dedupe reset; not logged.
    Deduped,
    /// Dropped by rate-based sampling; not logged.
    NotSampled,
    /// Logged unconditionally, bypassing rate-based sampling.
    ForceSampled,
    /// Logged after rate-based sampling:
    /// `(sampling rate used, sampling mode, whether the key fell inside the sample)`.
    Sampled(Option<u64>, EvtSamplingMode, bool),
}

impl EvtSamplingDecision {
    /// Returns `true` for decisions that should be logged (`ForceSampled` and any `Sampled`),
    /// and `false` for `Deduped` and `NotSampled`.
    pub fn should_log(&self) -> bool {
        match self {
            Self::ForceSampled | Self::Sampled(..) => true,
            Self::Deduped | Self::NotSampled => false,
        }
    }
}

43type SpecAndRuleHashTuple = (u64, u64);
44pub struct ExposureSampling {
45 spec_sampling_set: RwLock<AHashSet<SpecAndRuleHashTuple>>,
46 last_spec_sampling_reset: AtomicU64,
47
48 exposure_dedupe_set: RwLock<AHashSet<ExposureSamplingKey>>,
49 last_exposure_dedupe_reset: AtomicU64,
50
51 global_configs: Arc<GlobalConfigs>,
52}
53
54impl ExposureSampling {
55 pub fn new(sdk_key: &str) -> Self {
56 let now = Utc::now().timestamp_millis() as u64;
57
58 Self {
59 spec_sampling_set: RwLock::from(AHashSet::default()),
60 last_spec_sampling_reset: AtomicU64::from(now),
61
62 exposure_dedupe_set: RwLock::from(AHashSet::default()),
63 last_exposure_dedupe_reset: AtomicU64::from(now),
64
65 global_configs: GlobalConfigs::get_instance(sdk_key),
66 }
67 }
68
69 pub fn get_sampling_decision(&self, payload: &impl EnqueueOperation) -> EvtSamplingDecision {
70 let exposure = match payload.as_exposure() {
71 Some(exposure) => exposure,
72 None => return EvtSamplingDecision::ForceSampled,
73 };
74
75 let expo_sampling_key = exposure.create_exposure_sampling_key();
76 if self.should_dedupe_exposure(&expo_sampling_key) {
77 return EvtSamplingDecision::Deduped;
78 }
79
80 let sampling_mode = match self.global_configs.get_sampling_mode() {
81 Some(sampling_mode) => sampling_mode,
82 None => return EvtSamplingDecision::ForceSampled,
83 };
84
85 let extra_info = exposure.get_extra_exposure_info_ref();
86 if self.should_sample_based_on_evaluation(extra_info) {
87 return EvtSamplingDecision::ForceSampled;
88 }
89
90 if self.should_sample_first_time_exposure(&expo_sampling_key) {
91 return EvtSamplingDecision::ForceSampled;
92 }
93
94 let sampling_rate = self
95 .get_special_case_sampling_rate(exposure)
96 .or_else(|| extra_info.and_then(|info| info.sampling_rate));
97
98 let is_sampled = expo_sampling_key.is_sampled(sampling_rate);
99
100 match sampling_mode {
101 EvtSamplingMode::On if is_sampled => {
102 EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::On, true)
103 }
104 EvtSamplingMode::Shadow => {
105 EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::Shadow, is_sampled)
106 }
107 _ => EvtSamplingDecision::NotSampled,
108 }
109 }
110
111 pub fn try_reset_all_sampling(&self) {
112 self.try_reset_exposure_dedupe_set();
113 self.try_reset_spec_sampling_set();
114 }
115
116 fn should_dedupe_exposure(&self, sampling_key: &ExposureSamplingKey) -> bool {
117 let mut dedupe_set = write_lock_or_return!(TAG, self.exposure_dedupe_set, false);
118 if dedupe_set.contains(sampling_key) {
119 return true;
120 }
121
122 dedupe_set.insert(sampling_key.clone());
123 false
124 }
125
126 fn should_sample_based_on_evaluation(&self, extra_info: Option<&ExtraExposureInfo>) -> bool {
127 let exposure_info = match extra_info {
128 Some(exposure_info) => exposure_info,
129 None => return false,
130 };
131
132 if exposure_info.forward_all_exposures == Some(true) {
133 return true;
134 }
135
136 if exposure_info.has_seen_analytical_gates == Some(true) {
137 return true;
138 }
139
140 false
141 }
142
143 fn should_sample_first_time_exposure(&self, exposure: &ExposureSamplingKey) -> bool {
144 let sampling_key: SpecAndRuleHashTuple = (exposure.spec_name_hash, exposure.rule_id_hash);
145 if self.sample_key_exists(&sampling_key) {
146 return false;
147 }
148
149 if let Ok(mut sampling_map) = self.spec_sampling_set.write() {
150 sampling_map.insert(sampling_key);
151 }
152
153 true
154 }
155
156 fn try_reset_spec_sampling_set(&self) {
157 let now = Utc::now().timestamp_millis() as u64;
158 let last_sampling_reset = self.last_spec_sampling_reset.load(Ordering::Relaxed);
159 let mut sampling_map = write_lock_or_noop!(TAG, self.spec_sampling_set);
160
161 let has_expired = now - last_sampling_reset > SAMPLING_TTL_MS;
162 let is_full = sampling_map.len() > SAMPLING_MAX_KEYS;
163
164 if has_expired || is_full {
165 log_d!(
166 TAG,
167 "Resetting spec sampling set. has_expired: {:?}, is_full: {:?}",
168 has_expired,
169 is_full
170 );
171 sampling_map.clear();
172 self.last_spec_sampling_reset.store(now, Ordering::Relaxed);
173 }
174 }
175
176 fn try_reset_exposure_dedupe_set(&self) {
177 let now = Utc::now().timestamp_millis() as u64;
178 let last_dedupe_reset = self.last_exposure_dedupe_reset.load(Ordering::Relaxed);
179 let mut dedupe_map = match self.exposure_dedupe_set.write() {
180 Ok(map) => map,
181 Err(e) => {
182 log_e!(
183 TAG,
184 "Failed to acquire write lock for exposure dedupe set: {}",
185 e
186 );
187 return;
188 }
189 };
190
191 let has_expired = now - last_dedupe_reset > SAMPLING_TTL_MS;
192 let is_full = dedupe_map.len() > SAMPLING_MAX_KEYS;
193
194 if has_expired || is_full {
195 log_d!(
196 TAG,
197 "Resetting exposure dedupe set. has_expired: {:?}, is_full: {:?}",
198 has_expired,
199 is_full
200 );
201 dedupe_map.clear();
202 self.last_exposure_dedupe_reset
203 .store(now, Ordering::Relaxed);
204 }
205 }
206
207 fn sample_key_exists(&self, key: &SpecAndRuleHashTuple) -> bool {
208 match self.spec_sampling_set.read() {
209 Ok(map) => map.contains(key),
210 _ => false,
211 }
212 }
213
214 fn get_special_case_sampling_rate<'a>(
215 &self,
216 exposure: &'a impl QueuedExposure<'a>,
217 ) -> Option<u64> {
218 let rule_id = exposure.get_rule_id_ref();
219 match rule_id {
220 "default" | "disabled" | "" => self.global_configs.get_special_case_sampling_rate(),
221 _ => None,
222 }
223 }
224}
225
226impl GlobalConfigs {
227 fn get_sampling_mode(&self) -> Option<EvtSamplingMode> {
228 fn parse_sampling_mode(sampling_mode: Option<&DynamicValue>) -> Option<EvtSamplingMode> {
229 let v = sampling_mode?.string_value.as_ref()?.value.as_str();
230
231 match v {
232 "on" => Some(EvtSamplingMode::On),
233 "shadow" => Some(EvtSamplingMode::Shadow),
234 _ => None,
235 }
236 }
237
238 self.use_sdk_config_value("sampling_mode", parse_sampling_mode)
239 }
240
241 fn get_special_case_sampling_rate(&self) -> Option<u64> {
242 fn parse_special_case_sampling_rate(value: Option<&DynamicValue>) -> Option<u64> {
243 match value {
244 Some(value) => value.float_value.map(|rate| rate as u64),
245 None => None,
246 }
247 }
248
249 self.use_sdk_config_value(
250 "special_case_sampling_rate",
251 parse_special_case_sampling_rate,
252 )
253 }
254}
255
/// Hash-based identity of a single exposure, used both for deduping and for sampling.
///
/// Exposures with equal keys are treated as duplicates by the dedupe set.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ExposureSamplingKey {
    /// Hash of the evaluated spec's name (`0` when built without an evaluation).
    pub spec_name_hash: u64,
    /// Hash of the matched rule id (`0` when built without an evaluation).
    pub rule_id_hash: u64,
    /// Hash of the user's values, as produced by `UserData::create_user_values_hash`.
    pub user_values_hash: u64,
    /// Extra caller-supplied hash mixed into the key.
    pub additional_hash: u64,
}

264impl ExposureSamplingKey {
265 pub fn new(evaluation: Option<&BaseEvaluation>, user: &UserData, additional_hash: u64) -> Self {
266 let spec_name_hash = evaluation.as_ref().map_or(0, |e| e.name.hash_value);
267 let rule_id_hash = evaluation.as_ref().map_or(0, |e| e.rule_id.hash_value);
268
269 let user_values_hash = user.create_user_values_hash();
270
271 Self {
272 spec_name_hash,
273 rule_id_hash,
274 user_values_hash,
275 additional_hash,
276 }
277 }
278
279 pub fn is_sampled(&self, sampling_rate: Option<u64>) -> bool {
280 let sampling_rate = match sampling_rate {
281 Some(rate) => rate,
282 None => return true, };
284
285 let final_hash =
286 self.spec_name_hash ^ self.rule_id_hash ^ self.user_values_hash ^ self.additional_hash;
287
288 final_hash % sampling_rate == 0
289 }
290}