Skip to main content

statsig_rust/event_logging/event_queue/
queued_expo.rs

1use std::{collections::HashMap, fmt::Display};
2
3use chrono::Utc;
4
5use crate::{
6    evaluation::{
7        evaluation_types::ExtraExposureInfo,
8        evaluator_result::{result_to_extra_exposure_info, EvaluatorResult},
9    },
10    event_logging::{
11        event_logger::ExposureTrigger,
12        exposure_sampling::{EvtSamplingDecision, ExposureSamplingKey},
13        exposure_utils::get_statsig_metadata_with_sampling_decision,
14        statsig_event::StatsigEvent,
15        statsig_event_internal::{
16            StatsigEventInternal, CONFIG_EXPOSURE_EVENT_NAME, GATE_EXPOSURE_EVENT_NAME,
17            LAYER_EXPOSURE_EVENT_NAME,
18        },
19    },
20    interned_string::InternedString,
21    specs_response::explicit_params::ExplicitParameters,
22    user::{StatsigUserInternal, StatsigUserLoggable},
23    EvaluationDetails, SecondaryExposure,
24};
25
26use super::queued_event::{EnqueueOperation, QueuedEvent, QueuedExposure};
27use crate::event_logging::statsig_event::string_metadata_to_value_metadata;
28
29// Flow:
30// IN(EVAL) |> EnqueueOp [sampling]> QueuedEvent [bg thread]> StatsigEventInternal |> OUT(LOGGED)
31
/// The user attached to a pending exposure.
///
/// Holds either an owned, already-loggable user or a borrowed internal user.
/// The borrowed form is converted with `to_loggable()` only when the
/// operation is turned into a queued event.
pub enum UserLoggableOrInternal<'a> {
    /// An owned user that is already in its loggable form.
    Loggable(StatsigUserLoggable),
    /// A borrowed internal user that has not been converted yet.
    Internal(&'a StatsigUserInternal<'a, 'a>),
}
36
/// A pending exposure: the user it applies to plus the exposure payload.
///
/// Created by the `*_exposure` constructors and consumed by
/// `into_queued_event`, which yields a `QueuedEvent::Exposure`.
pub struct EnqueueExposureOp<'a> {
    // Owned loggable user or borrowed internal user (see `UserLoggableOrInternal`).
    user: UserLoggableOrInternal<'a>,
    // Everything about the exposure that does not depend on the user.
    data: ExposureData,
}
41
/// User-independent payload of an exposure.
///
/// Fields below the "Only" markers are populated just for the matching
/// exposure kind and stay `None` otherwise.
pub struct ExposureData {
    /// Event name; the constructors in this file use the gate, config, or
    /// layer exposure event name constants.
    pub event_name: &'static str,
    /// Name of the evaluated spec.
    pub spec_name: InternedString,
    /// Rule id taken from the evaluation result, when one is available.
    pub rule_id: Option<InternedString>,
    /// Creation time, from `Utc::now().timestamp_millis()`.
    pub exposure_time: u64,
    /// What triggered the exposure; `Manual` adds `isManualExposure` to the metadata.
    pub trigger: ExposureTrigger,
    /// Evaluation details; `reason`, `lcut` and `received_at` are logged as metadata.
    pub evaluation_details: EvaluationDetails,
    /// Secondary exposures attached to the event by default.
    pub secondary_exposures: Option<Vec<SecondaryExposure>>,
    /// Secondary exposures attached instead for layer exposures whose
    /// parameter is not an explicit parameter.
    pub undelegated_secondary_exposures: Option<Vec<SecondaryExposure>>,
    /// Config version, logged as `configVersion` when present.
    pub version: Option<u32>,
    /// Override config name, logged as `overrideConfigName` when present.
    pub override_spec_name: Option<InternedString>,
    /// Extra exposure info derived from the evaluation result.
    pub exposure_info: Option<ExtraExposureInfo>,

    // Gate Only
    /// Gate outcome, logged as `gateValue`.
    pub gate_value: Option<bool>,

    // DynamicConfig
    /// Rule outcome, logged as `rulePassed` when present.
    pub rule_passed: Option<bool>,

    // Experiment Only
    /// Whether the result was flagged as an experiment group; used in the sampling key.
    pub is_user_in_experiment: Option<bool>,

    // Layer Only
    /// Layer parameter that was read, logged as `parameterName`.
    pub parameter_name: Option<InternedString>,
    /// Explicit parameters, used to decide `isExplicitParameter`.
    pub explicit_params: Option<ExplicitParameters>,
    /// Experiment the layer delegated to, logged as `allocatedExperiment`
    /// only for explicit parameters.
    pub allocated_experiment: Option<InternedString>,
}
69
70impl<'a> EnqueueExposureOp<'a> {
71    pub fn gate_exposure(
72        user: &'a StatsigUserInternal<'a, 'a>,
73        spec_name: &InternedString,
74        trigger: ExposureTrigger,
75        details: EvaluationDetails,
76        result: Option<EvaluatorResult>,
77    ) -> Self {
78        let gate_value = result.as_ref().is_some_and(|r| r.bool_value);
79        let mut op = Self::new(
80            GATE_EXPOSURE_EVENT_NAME,
81            UserLoggableOrInternal::Internal(user),
82            spec_name,
83            trigger,
84            details,
85            result,
86        );
87
88        op.data.gate_value = Some(gate_value);
89        op
90    }
91
92    pub fn dynamic_config_exposure(
93        user: &'a StatsigUserInternal<'a, 'a>,
94        spec_name: &InternedString,
95        trigger: ExposureTrigger,
96        details: EvaluationDetails,
97        result: Option<EvaluatorResult>,
98    ) -> Self {
99        let rule_passed = result.as_ref().is_some_and(|r| r.bool_value);
100
101        let mut op = Self::new(
102            CONFIG_EXPOSURE_EVENT_NAME,
103            UserLoggableOrInternal::Internal(user),
104            spec_name,
105            trigger,
106            details,
107            result,
108        );
109
110        op.data.rule_passed = Some(rule_passed);
111        op
112    }
113
114    pub fn experiment_exposure(
115        user: &'a StatsigUserInternal<'a, 'a>,
116        spec_name: &InternedString,
117        trigger: ExposureTrigger,
118        details: EvaluationDetails,
119        result: Option<EvaluatorResult>,
120    ) -> Self {
121        let is_user_in_experiment = result.as_ref().is_some_and(|r| r.is_experiment_group);
122        let mut op = Self::new(
123            CONFIG_EXPOSURE_EVENT_NAME,
124            UserLoggableOrInternal::Internal(user),
125            spec_name,
126            trigger,
127            details,
128            result,
129        );
130
131        op.data.is_user_in_experiment = Some(is_user_in_experiment);
132        op
133    }
134
135    pub fn layer_param_exposure(
136        user: &'a StatsigUserInternal<'a, 'a>,
137        spec_name: &InternedString,
138        parameter_name: InternedString,
139        trigger: ExposureTrigger,
140        details: EvaluationDetails,
141        result: Option<EvaluatorResult>,
142    ) -> Self {
143        let mut result = result;
144        let mut allocated_experiment = None;
145        let mut explicit_params = None;
146        let mut undelegated_secondary_exposures = None;
147
148        if let Some(result) = result.as_mut() {
149            allocated_experiment = result.config_delegate.clone();
150            explicit_params = result.explicit_parameters.clone();
151            undelegated_secondary_exposures = result.undelegated_secondary_exposures.take();
152        }
153
154        let mut op = Self::new(
155            LAYER_EXPOSURE_EVENT_NAME,
156            UserLoggableOrInternal::Internal(user),
157            spec_name,
158            trigger,
159            details,
160            result,
161        );
162
163        op.data.parameter_name = Some(parameter_name);
164        op.data.explicit_params = explicit_params;
165        op.data.allocated_experiment = allocated_experiment;
166        op.data.undelegated_secondary_exposures = undelegated_secondary_exposures;
167
168        op
169    }
170
171    fn new(
172        event_name: &'static str,
173        user: UserLoggableOrInternal<'a>,
174        spec_name: &InternedString,
175        trigger: ExposureTrigger,
176        details: EvaluationDetails,
177        result: Option<EvaluatorResult>,
178    ) -> Self {
179        let mut data = ExposureData {
180            event_name,
181            spec_name: spec_name.clone(),
182            rule_id: None,
183            exposure_time: Utc::now().timestamp_millis() as u64,
184            trigger,
185            evaluation_details: details,
186            secondary_exposures: None,
187            undelegated_secondary_exposures: None,
188            version: None,
189            override_spec_name: None,
190            rule_passed: None,
191            exposure_info: None,
192            parameter_name: None,
193            explicit_params: None,
194            allocated_experiment: None,
195            is_user_in_experiment: None,
196            gate_value: None,
197        };
198
199        if let Some(result) = result {
200            data.exposure_info = Some(result_to_extra_exposure_info(&result));
201
202            data.rule_id = result.rule_id;
203            data.version = result.version;
204            data.override_spec_name = result.override_config_name;
205            data.secondary_exposures = Some(result.secondary_exposures);
206        }
207
208        Self { user, data }
209    }
210}
211
#[cfg(feature = "ffi-support")]
impl<'a> EnqueueExposureOp<'a> {
    /// Builds a layer parameter exposure from a `PartialLayerRaw`.
    ///
    /// The rule id is looked up per parameter in `parameter_rule_ids` first
    /// and falls back to the layer-level `rule_id` when there is no entry.
    pub(crate) fn layer_param_exposure_from_partial_raw(
        parameter_name: InternedString,
        trigger: ExposureTrigger,
        partial_raw: crate::statsig_types_raw::PartialLayerRaw,
    ) -> Self {
        // Copy the version out before `details` is moved into the payload.
        let version = partial_raw.details.version;

        let rule_id = partial_raw
            .parameter_rule_ids
            .as_ref()
            .and_then(|ids| ids.get(&parameter_name))
            .or(partial_raw.rule_id.as_ref())
            .cloned();

        Self {
            user: UserLoggableOrInternal::Loggable(partial_raw.user),
            data: ExposureData {
                event_name: LAYER_EXPOSURE_EVENT_NAME,
                spec_name: partial_raw.name,
                rule_id,
                exposure_time: Utc::now().timestamp_millis() as u64,
                trigger,
                evaluation_details: partial_raw.details,
                secondary_exposures: partial_raw.secondary_exposures,
                undelegated_secondary_exposures: partial_raw.undelegated_secondary_exposures,
                version,
                override_spec_name: None,
                exposure_info: partial_raw.exposure_info,
                gate_value: None,
                rule_passed: None,
                is_user_in_experiment: None,
                parameter_name: Some(parameter_name),
                explicit_params: partial_raw.explicit_parameters,
                allocated_experiment: partial_raw.allocated_experiment_name,
            },
        }
    }
}
254
255impl EnqueueOperation for EnqueueExposureOp<'_> {
256    fn as_exposure(&self) -> Option<&impl QueuedExposure<'_>> {
257        Some(self)
258    }
259
260    fn into_queued_event(self, sampling_decision: EvtSamplingDecision) -> QueuedEvent {
261        let loggable_user = match self.user {
262            UserLoggableOrInternal::Loggable(loggable) => loggable,
263            UserLoggableOrInternal::Internal(internal) => internal.to_loggable(),
264        };
265
266        QueuedEvent::Exposure(QueuedExposureEvent {
267            user: loggable_user,
268            sampling_decision,
269            data: self.data,
270        })
271    }
272}
273
274impl<'a> QueuedExposure<'a> for EnqueueExposureOp<'a> {
275    fn create_exposure_sampling_key(&self) -> ExposureSamplingKey {
276        let mut additional_hash = 0u64;
277        if let Some(gate_value) = self.data.gate_value {
278            additional_hash = gate_value as u64;
279        } else if let Some(rule_passed) = self.data.rule_passed {
280            additional_hash = rule_passed as u64;
281        } else if let Some(is_user_in_experiment) = self.data.is_user_in_experiment {
282            additional_hash = is_user_in_experiment as u64;
283        } else if let Some(parameter_name) = &self.data.parameter_name {
284            additional_hash = parameter_name.hash;
285        }
286
287        let user_data = match &self.user {
288            UserLoggableOrInternal::Loggable(loggable) => &loggable.data,
289            UserLoggableOrInternal::Internal(internal) => &internal.user_ref.data,
290        };
291
292        let user_values_hash = user_data.create_exposure_dedupe_user_hash(None);
293
294        ExposureSamplingKey {
295            spec_name_hash: self.data.spec_name.hash,
296            rule_id_hash: self.data.rule_id.as_ref().map_or(0, |id| id.hash),
297            user_values_hash,
298            additional_hash,
299        }
300    }
301
302    fn get_rule_id_ref(&'a self) -> &'a str {
303        self.data.rule_id.as_ref().map_or("", |id| id.as_str())
304    }
305
306    fn get_extra_exposure_info_ref(&'a self) -> Option<&'a ExtraExposureInfo> {
307        self.data.exposure_info.as_ref()
308    }
309}
310
/// An exposure that has been queued: the loggable user, the sampling
/// decision it was enqueued with, and the exposure payload.
///
/// Converted to a `StatsigEventInternal` by `into_statsig_event_internal`.
pub struct QueuedExposureEvent {
    /// User in loggable form.
    pub user: StatsigUserLoggable,
    /// Sampling decision, forwarded into the event's statsig metadata.
    pub sampling_decision: EvtSamplingDecision,
    /// Exposure payload.
    pub data: ExposureData,
}
316
317impl QueuedExposureEvent {
318    pub fn into_statsig_event_internal(self) -> StatsigEventInternal {
319        let mut data = self.data;
320        let mut builder = MetadataBuilder::new();
321
322        let mut should_use_undelegated_secondary_exposures = false;
323
324        builder
325            .try_add_gate_fields(&mut data)
326            .try_add_config_fields(&mut data)
327            .try_add_layer_fields(&mut data, &mut should_use_undelegated_secondary_exposures)
328            .add_eval_details(data.evaluation_details)
329            .add_interned_str("ruleID", data.rule_id.as_ref())
330            .try_add("configVersion", data.version.as_ref());
331
332        if data.trigger == ExposureTrigger::Manual {
333            builder.add_bool("isManualExposure", true);
334        }
335
336        if let Some(name) = data.override_spec_name {
337            builder.add_interned_str("overrideConfigName", Some(&name));
338        }
339
340        let statsig_metadata = get_statsig_metadata_with_sampling_decision(self.sampling_decision);
341
342        let event = StatsigEvent {
343            event_name: data.event_name.into(),
344            value: None,
345            metadata: Some(string_metadata_to_value_metadata(builder.build())),
346            statsig_metadata: Some(statsig_metadata),
347        };
348
349        let secondary_exposures = if should_use_undelegated_secondary_exposures {
350            data.undelegated_secondary_exposures
351        } else {
352            data.secondary_exposures
353        };
354
355        StatsigEventInternal::new(data.exposure_time, self.user, event, secondary_exposures)
356    }
357}
358
/// Accumulates the string key/value metadata for an exposure event.
struct MetadataBuilder {
    // Entries collected so far; inserting an existing key replaces its value.
    metadata: HashMap<String, String>,
}
362
363impl MetadataBuilder {
364    pub fn new() -> Self {
365        Self {
366            metadata: HashMap::new(),
367        }
368    }
369
370    pub fn add_eval_details(&mut self, evaluation_details: EvaluationDetails) -> &mut Self {
371        self.add_string("reason", evaluation_details.reason);
372        self.try_add("lcut", evaluation_details.lcut.as_ref());
373        self.try_add("receivedAt", evaluation_details.received_at.as_ref());
374        self
375    }
376
377    // This is called on an async thread, so it's somewhat ok to use unperformant_to_string
378    pub fn add_interned_str(&mut self, key: &str, value: Option<&InternedString>) -> &mut Self {
379        self.metadata.insert(
380            key.into(),
381            value
382                .map(|v| v.unperformant_to_string())
383                .unwrap_or_default(),
384        );
385        self
386    }
387
388    pub fn add_string(&mut self, key: &str, value: String) -> &mut Self {
389        self.metadata.insert(key.into(), value);
390        self
391    }
392
393    pub fn add_bool(&mut self, key: &str, value: bool) -> &mut Self {
394        self.metadata.insert(key.into(), value.to_string());
395        self
396    }
397
398    pub fn try_add<T>(&mut self, key: &str, value: Option<&T>) -> &mut Self
399    where
400        T: Display,
401    {
402        if let Some(value) = value {
403            self.metadata.insert(key.into(), value.to_string());
404        }
405        self
406    }
407
408    pub fn try_add_gate_fields(&mut self, data: &mut ExposureData) -> &mut Self {
409        if data.event_name != GATE_EXPOSURE_EVENT_NAME {
410            return self;
411        }
412
413        self.add_interned_str("gate", Some(&data.spec_name));
414        self.add_bool("gateValue", data.gate_value == Some(true));
415        self
416    }
417
418    pub fn try_add_config_fields(&mut self, data: &mut ExposureData) -> &mut Self {
419        if data.event_name != CONFIG_EXPOSURE_EVENT_NAME {
420            return self;
421        }
422
423        self.add_interned_str("config", Some(&data.spec_name));
424        self.try_add("rulePassed", data.rule_passed.as_ref());
425        self
426    }
427
428    pub fn try_add_layer_fields(
429        &mut self,
430        data: &mut ExposureData,
431        should_use_undelegated_secondary_exposures: &mut bool,
432    ) -> &mut Self {
433        if data.event_name != LAYER_EXPOSURE_EVENT_NAME {
434            return self;
435        }
436
437        let param_name = data.parameter_name.take().unwrap_or_default();
438        let is_explicit = data
439            .explicit_params
440            .take()
441            .is_some_and(|params| params.contains(&param_name));
442
443        self.add_interned_str("config", Some(&data.spec_name));
444        self.add_interned_str("parameterName", Some(&param_name));
445        self.add_bool("isExplicitParameter", is_explicit);
446
447        if is_explicit {
448            self.add_interned_str("allocatedExperiment", data.allocated_experiment.as_ref());
449        } else {
450            self.add_interned_str("allocatedExperiment", None);
451            *should_use_undelegated_secondary_exposures = true;
452        }
453
454        self
455    }
456
457    pub fn build(self) -> HashMap<String, String> {
458        self.metadata
459    }
460}