statsig_rust/event_logging/
exposure_sampling.rs1use super::event_queue::queued_event::{EnqueueOperation, QueuedExposure};
2use crate::{
3 evaluation::evaluation_types::{BaseEvaluation, ExtraExposureInfo},
4 global_configs::GlobalConfigs,
5 log_d, log_e,
6 user::user_data::UserData,
7 write_lock_or_noop, write_lock_or_return, DynamicValue,
8};
9use ahash::AHashSet;
10use chrono::Utc;
11use parking_lot::RwLock;
12use std::sync::{
13 atomic::{AtomicU64, Ordering},
14 Arc,
15};
16
/// Tag passed to the logging macros used in this module.
const TAG: &str = "ExposureSampling";

/// Age, in milliseconds, after which the sampling and dedupe sets are cleared
/// on the next reset attempt.
const SAMPLING_TTL_MS: u64 = 60 * 1_000;

/// Entry count above which the sampling and dedupe sets are cleared on the
/// next reset attempt, regardless of their age.
const SAMPLING_MAX_KEYS: usize = 100_000;
20
/// Sampling mode read from the `sampling_mode` SDK config value.
#[derive(Debug)]
pub enum EvtSamplingMode {
    /// Parsed from `"on"`: exposures that are not selected by their sampling
    /// rate are reported as `EvtSamplingDecision::NotSampled`.
    On,
    /// Parsed from `"shadow"`: the sampling outcome is recorded on the
    /// decision, but the decision is always `EvtSamplingDecision::Sampled`.
    Shadow,
}
26
27#[derive(Debug)]
28pub enum EvtSamplingDecision {
29 Deduped,
30 NotSampled,
31 ForceSampled,
32 Sampled(Option<u64>, EvtSamplingMode, bool),
33}
34
35impl EvtSamplingDecision {
36 pub fn should_log(&self) -> bool {
37 match self {
38 EvtSamplingDecision::Deduped | EvtSamplingDecision::NotSampled => false,
39 EvtSamplingDecision::ForceSampled | EvtSamplingDecision::Sampled(_, _, _) => true,
40 }
41 }
42}
43
44type SpecAndRuleHashTuple = (u64, u64);
45pub struct ExposureSampling {
46 spec_sampling_set: RwLock<AHashSet<SpecAndRuleHashTuple>>,
47 last_spec_sampling_reset: AtomicU64,
48
49 exposure_dedupe_set: RwLock<AHashSet<ExposureSamplingKey>>,
50 last_exposure_dedupe_reset: AtomicU64,
51
52 global_configs: Arc<GlobalConfigs>,
53}
54
55impl ExposureSampling {
56 pub fn new(sdk_key: &str) -> Self {
57 let now = Utc::now().timestamp_millis() as u64;
58
59 Self {
60 spec_sampling_set: RwLock::from(AHashSet::default()),
61 last_spec_sampling_reset: AtomicU64::from(now),
62
63 exposure_dedupe_set: RwLock::from(AHashSet::default()),
64 last_exposure_dedupe_reset: AtomicU64::from(now),
65
66 global_configs: GlobalConfigs::get_instance(sdk_key),
67 }
68 }
69
70 pub fn get_sampling_decision(&self, payload: &impl EnqueueOperation) -> EvtSamplingDecision {
71 let exposure = match payload.as_exposure() {
72 Some(exposure) => exposure,
73 None => return EvtSamplingDecision::ForceSampled,
74 };
75
76 let expo_sampling_key = exposure.create_exposure_sampling_key();
77 if self.should_dedupe_exposure(&expo_sampling_key) {
78 return EvtSamplingDecision::Deduped;
79 }
80
81 let sampling_mode = match self.global_configs.get_sampling_mode() {
82 Some(sampling_mode) => sampling_mode,
83 None => return EvtSamplingDecision::ForceSampled,
84 };
85
86 let extra_info = exposure.get_extra_exposure_info_ref();
87 if self.should_sample_based_on_evaluation(extra_info) {
88 return EvtSamplingDecision::ForceSampled;
89 }
90
91 if self.should_sample_first_time_exposure(&expo_sampling_key) {
92 return EvtSamplingDecision::ForceSampled;
93 }
94
95 let sampling_rate = self
96 .get_special_case_sampling_rate(exposure)
97 .or_else(|| extra_info.and_then(|info| info.sampling_rate));
98
99 let is_sampled = expo_sampling_key.is_sampled(sampling_rate);
100
101 match sampling_mode {
102 EvtSamplingMode::On if is_sampled => {
103 EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::On, true)
104 }
105 EvtSamplingMode::Shadow => {
106 EvtSamplingDecision::Sampled(sampling_rate, EvtSamplingMode::Shadow, is_sampled)
107 }
108 _ => EvtSamplingDecision::NotSampled,
109 }
110 }
111
112 pub fn try_reset_all_sampling(&self) {
113 self.try_reset_exposure_dedupe_set();
114 self.try_reset_spec_sampling_set();
115 }
116
117 fn should_dedupe_exposure(&self, sampling_key: &ExposureSamplingKey) -> bool {
118 let mut dedupe_set = write_lock_or_return!(TAG, self.exposure_dedupe_set, false);
119 if dedupe_set.contains(sampling_key) {
120 return true;
121 }
122
123 dedupe_set.insert(sampling_key.clone());
124 false
125 }
126
127 fn should_sample_based_on_evaluation(&self, extra_info: Option<&ExtraExposureInfo>) -> bool {
128 let exposure_info = match extra_info {
129 Some(exposure_info) => exposure_info,
130 None => return false,
131 };
132
133 if exposure_info.forward_all_exposures == Some(true) {
134 return true;
135 }
136
137 if exposure_info.has_seen_analytical_gates == Some(true) {
138 return true;
139 }
140
141 false
142 }
143
144 fn should_sample_first_time_exposure(&self, exposure: &ExposureSamplingKey) -> bool {
145 let sampling_key: SpecAndRuleHashTuple = (exposure.spec_name_hash, exposure.rule_id_hash);
146 if self.sample_key_exists(&sampling_key) {
147 return false;
148 }
149
150 match self
151 .spec_sampling_set
152 .try_write_for(std::time::Duration::from_secs(5))
153 {
154 Some(mut sampling_map) => {
155 sampling_map.insert(sampling_key);
156 }
157 None => {
158 log_e!(TAG, "Failed to acquire write lock for spec sampling set");
159 }
160 }
161
162 true
163 }
164
165 fn try_reset_spec_sampling_set(&self) {
166 let now = Utc::now().timestamp_millis() as u64;
167 let last_sampling_reset = self.last_spec_sampling_reset.load(Ordering::Relaxed);
168 let mut sampling_map = write_lock_or_noop!(TAG, self.spec_sampling_set);
169
170 let has_expired = now - last_sampling_reset > SAMPLING_TTL_MS;
171 let is_full = sampling_map.len() > SAMPLING_MAX_KEYS;
172
173 if has_expired || is_full {
174 log_d!(
175 TAG,
176 "Resetting spec sampling set. has_expired: {:?}, is_full: {:?}",
177 has_expired,
178 is_full
179 );
180 sampling_map.clear();
181 self.last_spec_sampling_reset.store(now, Ordering::Relaxed);
182 }
183 }
184
185 fn try_reset_exposure_dedupe_set(&self) {
186 let now = Utc::now().timestamp_millis() as u64;
187 let last_dedupe_reset = self.last_exposure_dedupe_reset.load(Ordering::Relaxed);
188 let mut dedupe_map = match self
189 .exposure_dedupe_set
190 .try_write_for(std::time::Duration::from_secs(5))
191 {
192 Some(map) => map,
193 None => {
194 log_e!(TAG, "Failed to acquire write lock for exposure dedupe set");
195 return;
196 }
197 };
198
199 let has_expired = now - last_dedupe_reset > SAMPLING_TTL_MS;
200 let is_full = dedupe_map.len() > SAMPLING_MAX_KEYS;
201
202 if has_expired || is_full {
203 log_d!(
204 TAG,
205 "Resetting exposure dedupe set. has_expired: {:?}, is_full: {:?}",
206 has_expired,
207 is_full
208 );
209 dedupe_map.clear();
210 self.last_exposure_dedupe_reset
211 .store(now, Ordering::Relaxed);
212 }
213 }
214
215 fn sample_key_exists(&self, key: &SpecAndRuleHashTuple) -> bool {
216 match self
217 .spec_sampling_set
218 .try_read_for(std::time::Duration::from_secs(5))
219 {
220 Some(map) => map.contains(key),
221 None => false,
222 }
223 }
224
225 fn get_special_case_sampling_rate<'a>(
226 &self,
227 exposure: &'a impl QueuedExposure<'a>,
228 ) -> Option<u64> {
229 let rule_id = exposure.get_rule_id_ref();
230 match rule_id {
231 "default" | "disabled" | "" => self.global_configs.get_special_case_sampling_rate(),
232 _ => None,
233 }
234 }
235}
236
237impl GlobalConfigs {
238 fn get_sampling_mode(&self) -> Option<EvtSamplingMode> {
239 fn parse_sampling_mode(sampling_mode: Option<&DynamicValue>) -> Option<EvtSamplingMode> {
240 let v = sampling_mode?.string_value.as_ref()?.value.as_str();
241
242 match v {
243 "on" => Some(EvtSamplingMode::On),
244 "shadow" => Some(EvtSamplingMode::Shadow),
245 _ => None,
246 }
247 }
248
249 self.use_sdk_config_value("sampling_mode", parse_sampling_mode)
250 }
251
252 fn get_special_case_sampling_rate(&self) -> Option<u64> {
253 fn parse_special_case_sampling_rate(value: Option<&DynamicValue>) -> Option<u64> {
254 match value {
255 Some(value) => value.float_value.map(|rate| rate as u64),
256 None => None,
257 }
258 }
259
260 self.use_sdk_config_value(
261 "special_case_sampling_rate",
262 parse_special_case_sampling_rate,
263 )
264 }
265}
266
/// Hash-based identity of one exposure, used for deduping and sampling.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExposureSamplingKey {
    /// Hash of the evaluated spec's name (0 when there is no evaluation).
    pub spec_name_hash: u64,
    /// Hash of the evaluation's rule ID (0 when there is no evaluation).
    pub rule_id_hash: u64,
    /// Hash produced by `UserData::create_user_values_hash`.
    pub user_values_hash: u64,
    /// Extra caller-supplied hash that further distinguishes the exposure.
    pub additional_hash: u64,
}
274
275impl ExposureSamplingKey {
276 pub fn new(evaluation: Option<&BaseEvaluation>, user: &UserData, additional_hash: u64) -> Self {
277 let spec_name_hash = evaluation.as_ref().map_or(0, |e| e.name.hash);
278 let rule_id_hash = evaluation.as_ref().map_or(0, |e| e.rule_id.hash);
279
280 let user_values_hash = user.create_user_values_hash();
281
282 Self {
283 spec_name_hash,
284 rule_id_hash,
285 user_values_hash,
286 additional_hash,
287 }
288 }
289
290 pub fn is_sampled(&self, sampling_rate: Option<u64>) -> bool {
291 let sampling_rate = match sampling_rate {
292 Some(rate) => rate,
293 None => return true, };
295
296 let final_hash =
297 self.spec_name_hash ^ self.rule_id_hash ^ self.user_values_hash ^ self.additional_hash;
298
299 final_hash.is_multiple_of(sampling_rate)
300 }
301}