Skip to main content

statsig_rust/event_logging/event_queue/
queued_expo.rs

1use std::{collections::HashMap, fmt::Display};
2
3use chrono::Utc;
4
5use crate::{
6    evaluation::{evaluation_types::ExtraExposureInfo, evaluator_result::EvaluatorResult},
7    event_logging::{
8        event_logger::ExposureTrigger,
9        exposure_sampling::{EvtSamplingDecision, ExposureSamplingKey},
10        exposure_utils::get_statsig_metadata_with_sampling_decision,
11        statsig_event::StatsigEvent,
12        statsig_event_internal::{
13            StatsigEventInternal, CONFIG_EXPOSURE_EVENT_NAME, GATE_EXPOSURE_EVENT_NAME,
14            LAYER_EXPOSURE_EVENT_NAME,
15        },
16    },
17    interned_string::InternedString,
18    specs_response::explicit_params::ExplicitParameters,
19    user::{StatsigUserInternal, StatsigUserLoggable},
20    EvaluationDetails, SecondaryExposure,
21};
22
23use super::queued_event::{EnqueueOperation, QueuedEvent, QueuedExposure};
24use crate::event_logging::statsig_event::string_metadata_to_value_metadata;
25
26// Flow:
27// IN(EVAL) |> EnqueueOp [sampling]> QueuedEvent [bg thread]> StatsigEventInternal |> OUT(LOGGED)
28
29pub enum UserLoggableOrInternal<'a> {
30    Loggable(StatsigUserLoggable),
31    Internal(&'a StatsigUserInternal<'a, 'a>),
32}
33
34pub struct EnqueueExposureOp<'a> {
35    user: UserLoggableOrInternal<'a>,
36    data: ExposureData,
37}
38
39pub struct ExposureData {
40    pub event_name: &'static str,
41    pub spec_name: InternedString,
42    pub rule_id: Option<InternedString>,
43    pub exposure_time: u64,
44    pub trigger: ExposureTrigger,
45    pub evaluation_details: EvaluationDetails,
46    pub secondary_exposures: Option<Vec<SecondaryExposure>>,
47    pub undelegated_secondary_exposures: Option<Vec<SecondaryExposure>>,
48    pub version: Option<u32>,
49    pub override_spec_name: Option<InternedString>,
50    pub exposure_info: Option<ExtraExposureInfo>,
51
52    // Gate Only
53    pub gate_value: Option<bool>,
54
55    // DynamicConfig
56    pub rule_passed: Option<bool>,
57
58    // Experiment Only
59    pub is_user_in_experiment: Option<bool>,
60
61    // Layer Only
62    pub parameter_name: Option<InternedString>,
63    pub explicit_params: Option<ExplicitParameters>,
64    pub allocated_experiment: Option<InternedString>,
65}
66
67impl<'a> EnqueueExposureOp<'a> {
68    pub fn gate_exposure(
69        user: &'a StatsigUserInternal<'a, 'a>,
70        spec_name: &InternedString,
71        trigger: ExposureTrigger,
72        details: EvaluationDetails,
73        result: Option<EvaluatorResult>,
74    ) -> Self {
75        let gate_value = result.as_ref().is_some_and(|r| r.bool_value);
76        let mut op = Self::new(
77            GATE_EXPOSURE_EVENT_NAME,
78            UserLoggableOrInternal::Internal(user),
79            spec_name,
80            trigger,
81            details,
82            result,
83        );
84
85        op.data.gate_value = Some(gate_value);
86        op
87    }
88
89    pub fn dynamic_config_exposure(
90        user: &'a StatsigUserInternal<'a, 'a>,
91        spec_name: &InternedString,
92        trigger: ExposureTrigger,
93        details: EvaluationDetails,
94        result: Option<EvaluatorResult>,
95    ) -> Self {
96        let rule_passed = result.as_ref().is_some_and(|r| r.bool_value);
97
98        let mut op = Self::new(
99            CONFIG_EXPOSURE_EVENT_NAME,
100            UserLoggableOrInternal::Internal(user),
101            spec_name,
102            trigger,
103            details,
104            result,
105        );
106
107        op.data.rule_passed = Some(rule_passed);
108        op
109    }
110
111    pub fn experiment_exposure(
112        user: &'a StatsigUserInternal<'a, 'a>,
113        spec_name: &InternedString,
114        trigger: ExposureTrigger,
115        details: EvaluationDetails,
116        result: Option<EvaluatorResult>,
117    ) -> Self {
118        let is_user_in_experiment = result.as_ref().is_some_and(|r| r.is_experiment_group);
119        let mut op = Self::new(
120            CONFIG_EXPOSURE_EVENT_NAME,
121            UserLoggableOrInternal::Internal(user),
122            spec_name,
123            trigger,
124            details,
125            result,
126        );
127
128        op.data.is_user_in_experiment = Some(is_user_in_experiment);
129        op
130    }
131
132    pub fn layer_param_exposure(
133        user: &'a StatsigUserInternal<'a, 'a>,
134        spec_name: &InternedString,
135        parameter_name: InternedString,
136        trigger: ExposureTrigger,
137        details: EvaluationDetails,
138        result: Option<EvaluatorResult>,
139    ) -> Self {
140        let mut result = result;
141        let mut allocated_experiment = None;
142        let mut explicit_params = None;
143        let mut undelegated_secondary_exposures = None;
144
145        if let Some(result) = result.as_mut() {
146            allocated_experiment = result.config_delegate.clone();
147            explicit_params = result.explicit_parameters.clone();
148            undelegated_secondary_exposures = result.undelegated_secondary_exposures.take();
149        }
150
151        let mut op = Self::new(
152            LAYER_EXPOSURE_EVENT_NAME,
153            UserLoggableOrInternal::Internal(user),
154            spec_name,
155            trigger,
156            details,
157            result,
158        );
159
160        op.data.parameter_name = Some(parameter_name);
161        op.data.explicit_params = explicit_params;
162        op.data.allocated_experiment = allocated_experiment;
163        op.data.undelegated_secondary_exposures = undelegated_secondary_exposures;
164
165        op
166    }
167
168    fn new(
169        event_name: &'static str,
170        user: UserLoggableOrInternal<'a>,
171        spec_name: &InternedString,
172        trigger: ExposureTrigger,
173        details: EvaluationDetails,
174        result: Option<EvaluatorResult>,
175    ) -> Self {
176        let mut data = ExposureData {
177            event_name,
178            spec_name: spec_name.clone(),
179            rule_id: None,
180            exposure_time: Utc::now().timestamp_millis() as u64,
181            trigger,
182            evaluation_details: details,
183            secondary_exposures: None,
184            undelegated_secondary_exposures: None,
185            version: None,
186            override_spec_name: None,
187            rule_passed: None,
188            exposure_info: None,
189            parameter_name: None,
190            explicit_params: None,
191            allocated_experiment: None,
192            is_user_in_experiment: None,
193            gate_value: None,
194        };
195
196        if let Some(result) = result {
197            data.exposure_info = Some(ExtraExposureInfo {
198                sampling_rate: result.sampling_rate,
199                forward_all_exposures: result.forward_all_exposures,
200                has_seen_analytical_gates: result.has_seen_analytical_gates,
201                override_config_name: result.override_config_name.clone(),
202                version: result.version,
203            });
204
205            data.rule_id = result.rule_id;
206            data.version = result.version;
207            data.override_spec_name = result.override_config_name;
208            data.secondary_exposures = Some(result.secondary_exposures);
209        }
210
211        Self { user, data }
212    }
213}
214
215#[cfg(feature = "ffi-support")]
216impl<'a> EnqueueExposureOp<'a> {
217    pub(crate) fn layer_param_exposure_from_partial_raw(
218        parameter_name: InternedString,
219        trigger: ExposureTrigger,
220        partial_raw: crate::statsig_types_raw::PartialLayerRaw,
221    ) -> Self {
222        let version = partial_raw.details.version;
223        let user = UserLoggableOrInternal::Loggable(partial_raw.user);
224
225        let mut rule_id = partial_raw
226            .parameter_rule_ids
227            .as_ref()
228            .and_then(|ids| ids.get(&parameter_name));
229
230        if rule_id.is_none() {
231            rule_id = partial_raw.rule_id.as_ref();
232        }
233
234        let data = ExposureData {
235            event_name: LAYER_EXPOSURE_EVENT_NAME,
236            spec_name: partial_raw.name,
237            rule_id: rule_id.cloned(),
238            exposure_time: Utc::now().timestamp_millis() as u64,
239            trigger,
240            evaluation_details: partial_raw.details,
241            secondary_exposures: partial_raw.secondary_exposures,
242            undelegated_secondary_exposures: partial_raw.undelegated_secondary_exposures,
243            version,
244            override_spec_name: None,
245            rule_passed: None,
246            exposure_info: None,
247            parameter_name: Some(parameter_name),
248            explicit_params: partial_raw.explicit_parameters,
249            allocated_experiment: partial_raw.allocated_experiment_name,
250            is_user_in_experiment: None,
251            gate_value: None,
252        };
253
254        Self { user, data }
255    }
256}
257
258impl EnqueueOperation for EnqueueExposureOp<'_> {
259    fn as_exposure(&self) -> Option<&impl QueuedExposure<'_>> {
260        Some(self)
261    }
262
263    fn into_queued_event(self, sampling_decision: EvtSamplingDecision) -> QueuedEvent {
264        let loggable_user = match self.user {
265            UserLoggableOrInternal::Loggable(loggable) => loggable,
266            UserLoggableOrInternal::Internal(internal) => internal.to_loggable(),
267        };
268
269        QueuedEvent::Exposure(QueuedExposureEvent {
270            user: loggable_user,
271            sampling_decision,
272            data: self.data,
273        })
274    }
275}
276
277impl<'a> QueuedExposure<'a> for EnqueueExposureOp<'a> {
278    fn create_exposure_sampling_key(&self) -> ExposureSamplingKey {
279        let spec_name_hash = self.data.spec_name.hash;
280        let rule_id_hash = self.data.rule_id.as_ref().map_or(0, |id| id.hash);
281        let user_values_hash = match &self.user {
282            UserLoggableOrInternal::Loggable(loggable) => loggable.data.create_user_values_hash(),
283            UserLoggableOrInternal::Internal(internal) => {
284                internal.user_ref.data.create_user_values_hash()
285            }
286        };
287
288        let mut additional_hash = 0u64;
289        if let Some(gate_value) = self.data.gate_value {
290            additional_hash = gate_value as u64;
291        } else if let Some(rule_passed) = self.data.rule_passed {
292            additional_hash = rule_passed as u64;
293        } else if let Some(is_user_in_experiment) = self.data.is_user_in_experiment {
294            additional_hash = is_user_in_experiment as u64;
295        } else if let Some(parameter_name) = &self.data.parameter_name {
296            additional_hash = parameter_name.hash;
297        }
298
299        ExposureSamplingKey {
300            spec_name_hash,
301            rule_id_hash,
302            user_values_hash,
303            additional_hash,
304        }
305    }
306
307    fn get_rule_id_ref(&'a self) -> &'a str {
308        self.data.rule_id.as_ref().map_or("", |id| id.as_str())
309    }
310
311    fn get_extra_exposure_info_ref(&'a self) -> Option<&'a ExtraExposureInfo> {
312        self.data.exposure_info.as_ref()
313    }
314}
315
316pub struct QueuedExposureEvent {
317    pub user: StatsigUserLoggable,
318    pub sampling_decision: EvtSamplingDecision,
319    pub data: ExposureData,
320}
321
322impl QueuedExposureEvent {
323    pub fn into_statsig_event_internal(self) -> StatsigEventInternal {
324        let mut data = self.data;
325        let mut builder = MetadataBuilder::new();
326
327        let mut should_use_undelegated_secondary_exposures = false;
328
329        builder
330            .try_add_gate_fields(&mut data)
331            .try_add_config_fields(&mut data)
332            .try_add_layer_fields(&mut data, &mut should_use_undelegated_secondary_exposures)
333            .add_eval_details(data.evaluation_details)
334            .add_interned_str("ruleID", data.rule_id.as_ref())
335            .try_add("configVersion", data.version.as_ref());
336
337        if data.trigger == ExposureTrigger::Manual {
338            builder.add_bool("isManualExposure", true);
339        }
340
341        if let Some(name) = data.override_spec_name {
342            builder.add_interned_str("overrideConfigName", Some(&name));
343        }
344
345        let statsig_metadata = get_statsig_metadata_with_sampling_decision(self.sampling_decision);
346
347        let event = StatsigEvent {
348            event_name: data.event_name.into(),
349            value: None,
350            metadata: Some(string_metadata_to_value_metadata(builder.build())),
351            statsig_metadata: Some(statsig_metadata),
352        };
353
354        let secondary_exposures = if should_use_undelegated_secondary_exposures {
355            data.undelegated_secondary_exposures
356        } else {
357            data.secondary_exposures
358        };
359
360        StatsigEventInternal::new(data.exposure_time, self.user, event, secondary_exposures)
361    }
362}
363
364struct MetadataBuilder {
365    metadata: HashMap<String, String>,
366}
367
368impl MetadataBuilder {
369    pub fn new() -> Self {
370        Self {
371            metadata: HashMap::new(),
372        }
373    }
374
375    pub fn add_eval_details(&mut self, evaluation_details: EvaluationDetails) -> &mut Self {
376        self.add_string("reason", evaluation_details.reason);
377        self.try_add("lcut", evaluation_details.lcut.as_ref());
378        self.try_add("receivedAt", evaluation_details.received_at.as_ref());
379        self
380    }
381
382    // This is called on an async thread, so it's somewhat ok to use unperformant_to_string
383    pub fn add_interned_str(&mut self, key: &str, value: Option<&InternedString>) -> &mut Self {
384        self.metadata.insert(
385            key.into(),
386            value
387                .map(|v| v.unperformant_to_string())
388                .unwrap_or_default(),
389        );
390        self
391    }
392
393    pub fn add_string(&mut self, key: &str, value: String) -> &mut Self {
394        self.metadata.insert(key.into(), value);
395        self
396    }
397
398    pub fn add_bool(&mut self, key: &str, value: bool) -> &mut Self {
399        self.metadata.insert(key.into(), value.to_string());
400        self
401    }
402
403    pub fn try_add<T>(&mut self, key: &str, value: Option<&T>) -> &mut Self
404    where
405        T: Display,
406    {
407        if let Some(value) = value {
408            self.metadata.insert(key.into(), value.to_string());
409        }
410        self
411    }
412
413    pub fn try_add_gate_fields(&mut self, data: &mut ExposureData) -> &mut Self {
414        if data.event_name != GATE_EXPOSURE_EVENT_NAME {
415            return self;
416        }
417
418        self.add_interned_str("gate", Some(&data.spec_name));
419        self.add_bool("gateValue", data.gate_value == Some(true));
420        self
421    }
422
423    pub fn try_add_config_fields(&mut self, data: &mut ExposureData) -> &mut Self {
424        if data.event_name != CONFIG_EXPOSURE_EVENT_NAME {
425            return self;
426        }
427
428        self.add_interned_str("config", Some(&data.spec_name));
429        self.try_add("rulePassed", data.rule_passed.as_ref());
430        self
431    }
432
433    pub fn try_add_layer_fields(
434        &mut self,
435        data: &mut ExposureData,
436        should_use_undelegated_secondary_exposures: &mut bool,
437    ) -> &mut Self {
438        if data.event_name != LAYER_EXPOSURE_EVENT_NAME {
439            return self;
440        }
441
442        let param_name = data.parameter_name.take().unwrap_or_default();
443        let is_explicit = data
444            .explicit_params
445            .take()
446            .is_some_and(|params| params.contains(&param_name));
447
448        self.add_interned_str("config", Some(&data.spec_name));
449        self.add_interned_str("parameterName", Some(&param_name));
450        self.add_bool("isExplicitParameter", is_explicit);
451
452        if is_explicit {
453            self.add_interned_str("allocatedExperiment", data.allocated_experiment.as_ref());
454        } else {
455            self.add_interned_str("allocatedExperiment", None);
456            *should_use_undelegated_secondary_exposures = true;
457        }
458
459        self
460    }
461
462    pub fn build(self) -> HashMap<String, String> {
463        self.metadata
464    }
465}