Skip to main content

statsig_rust/event_logging/event_queue/
queued_expo.rs

1use std::{collections::HashMap, fmt::Display};
2
3use chrono::Utc;
4
5use crate::{
6    evaluation::{
7        evaluation_types::ExtraExposureInfo,
8        evaluator_result::{result_to_extra_exposure_info, EvaluatorResult},
9    },
10    event_logging::{
11        event_logger::ExposureTrigger,
12        exposure_sampling::{EvtSamplingDecision, ExposureSamplingKey},
13        exposure_utils::get_statsig_metadata_with_sampling_decision,
14        statsig_event::StatsigEvent,
15        statsig_event_internal::{
16            StatsigEventInternal, CONFIG_EXPOSURE_EVENT_NAME, GATE_EXPOSURE_EVENT_NAME,
17            LAYER_EXPOSURE_EVENT_NAME,
18        },
19    },
20    interned_string::InternedString,
21    specs_response::explicit_params::ExplicitParameters,
22    user::{StatsigUserInternal, StatsigUserLoggable},
23    EvaluationDetails, SecondaryExposure,
24};
25
26use super::queued_event::{
27    take_non_empty_secondary_exposures, EnqueueOperation, QueuedEvent, QueuedExposure,
28};
29use crate::event_logging::statsig_event::string_metadata_to_value_metadata;
30
31// Flow:
32// IN(EVAL) |> EnqueueOp [sampling]> QueuedEvent [bg thread]> StatsigEventInternal |> OUT(LOGGED)
33
34pub enum UserLoggableOrInternal<'a> {
35    Loggable(StatsigUserLoggable),
36    Internal(&'a StatsigUserInternal<'a, 'a>),
37}
38
39pub struct EnqueueExposureOp<'a> {
40    user: UserLoggableOrInternal<'a>,
41    data: ExposureData,
42}
43
44pub struct ExposureData {
45    pub event_name: &'static str,
46    pub spec_name: InternedString,
47    pub rule_id: Option<InternedString>,
48    pub exposure_time: u64,
49    pub trigger: ExposureTrigger,
50    pub evaluation_details: EvaluationDetails,
51    pub secondary_exposures: Option<Vec<SecondaryExposure>>,
52    pub undelegated_secondary_exposures: Option<Vec<SecondaryExposure>>,
53    pub version: Option<u32>,
54    pub override_spec_name: Option<InternedString>,
55    pub exposure_info: Option<ExtraExposureInfo>,
56
57    // Gate Only
58    pub gate_value: Option<bool>,
59
60    // DynamicConfig
61    pub rule_passed: Option<bool>,
62
63    // Experiment Only
64    pub is_user_in_experiment: Option<bool>,
65
66    // Layer Only
67    pub parameter_name: Option<InternedString>,
68    pub explicit_params: Option<ExplicitParameters>,
69    pub allocated_experiment: Option<InternedString>,
70}
71
72impl<'a> EnqueueExposureOp<'a> {
73    pub fn gate_exposure(
74        user: &'a StatsigUserInternal<'a, 'a>,
75        spec_name: &InternedString,
76        trigger: ExposureTrigger,
77        details: EvaluationDetails,
78        result: Option<EvaluatorResult>,
79    ) -> Self {
80        let gate_value = result.as_ref().is_some_and(|r| r.bool_value);
81        let mut op = Self::new(
82            GATE_EXPOSURE_EVENT_NAME,
83            UserLoggableOrInternal::Internal(user),
84            spec_name,
85            trigger,
86            details,
87            result,
88        );
89
90        op.data.gate_value = Some(gate_value);
91        op
92    }
93
94    pub fn dynamic_config_exposure(
95        user: &'a StatsigUserInternal<'a, 'a>,
96        spec_name: &InternedString,
97        trigger: ExposureTrigger,
98        details: EvaluationDetails,
99        result: Option<EvaluatorResult>,
100    ) -> Self {
101        let rule_passed = result.as_ref().is_some_and(|r| r.bool_value);
102
103        let mut op = Self::new(
104            CONFIG_EXPOSURE_EVENT_NAME,
105            UserLoggableOrInternal::Internal(user),
106            spec_name,
107            trigger,
108            details,
109            result,
110        );
111
112        op.data.rule_passed = Some(rule_passed);
113        op
114    }
115
116    pub fn experiment_exposure(
117        user: &'a StatsigUserInternal<'a, 'a>,
118        spec_name: &InternedString,
119        trigger: ExposureTrigger,
120        details: EvaluationDetails,
121        result: Option<EvaluatorResult>,
122    ) -> Self {
123        let is_user_in_experiment = result.as_ref().is_some_and(|r| r.is_experiment_group);
124        let mut op = Self::new(
125            CONFIG_EXPOSURE_EVENT_NAME,
126            UserLoggableOrInternal::Internal(user),
127            spec_name,
128            trigger,
129            details,
130            result,
131        );
132
133        op.data.is_user_in_experiment = Some(is_user_in_experiment);
134        op
135    }
136
137    pub fn layer_param_exposure(
138        user: &'a StatsigUserInternal<'a, 'a>,
139        spec_name: &InternedString,
140        parameter_name: InternedString,
141        trigger: ExposureTrigger,
142        details: EvaluationDetails,
143        result: Option<EvaluatorResult>,
144    ) -> Self {
145        let mut result = result;
146        let mut allocated_experiment = None;
147        let mut explicit_params = None;
148        let mut undelegated_secondary_exposures = None;
149
150        if let Some(result) = result.as_mut() {
151            allocated_experiment = result.config_delegate.clone();
152            explicit_params = result.explicit_parameters.clone();
153            undelegated_secondary_exposures = result.undelegated_secondary_exposures.take();
154        }
155
156        let mut op = Self::new(
157            LAYER_EXPOSURE_EVENT_NAME,
158            UserLoggableOrInternal::Internal(user),
159            spec_name,
160            trigger,
161            details,
162            result,
163        );
164
165        op.data.parameter_name = Some(parameter_name);
166        op.data.explicit_params = explicit_params;
167        op.data.allocated_experiment = allocated_experiment;
168        op.data.undelegated_secondary_exposures = undelegated_secondary_exposures;
169
170        op
171    }
172
173    fn new(
174        event_name: &'static str,
175        user: UserLoggableOrInternal<'a>,
176        spec_name: &InternedString,
177        trigger: ExposureTrigger,
178        details: EvaluationDetails,
179        result: Option<EvaluatorResult>,
180    ) -> Self {
181        let mut data = ExposureData {
182            event_name,
183            spec_name: spec_name.clone(),
184            rule_id: None,
185            exposure_time: Utc::now().timestamp_millis() as u64,
186            trigger,
187            evaluation_details: details,
188            secondary_exposures: None,
189            undelegated_secondary_exposures: None,
190            version: None,
191            override_spec_name: None,
192            rule_passed: None,
193            exposure_info: None,
194            parameter_name: None,
195            explicit_params: None,
196            allocated_experiment: None,
197            is_user_in_experiment: None,
198            gate_value: None,
199        };
200
201        if let Some(result) = result {
202            data.exposure_info = Some(result_to_extra_exposure_info(&result));
203
204            data.rule_id = result.rule_id;
205            data.version = result.version;
206            data.override_spec_name = result.override_config_name;
207            data.secondary_exposures = Some(result.secondary_exposures);
208        }
209
210        Self { user, data }
211    }
212}
213
214#[cfg(feature = "ffi-support")]
215impl<'a> EnqueueExposureOp<'a> {
216    pub(crate) fn layer_param_exposure_from_partial_raw(
217        parameter_name: InternedString,
218        trigger: ExposureTrigger,
219        partial_raw: crate::statsig_types_raw::PartialLayerRaw,
220    ) -> Self {
221        let version = partial_raw.details.version;
222        let user = UserLoggableOrInternal::Loggable(partial_raw.user);
223
224        let mut rule_id = partial_raw
225            .parameter_rule_ids
226            .as_ref()
227            .and_then(|ids| ids.get(&parameter_name));
228
229        if rule_id.is_none() {
230            rule_id = partial_raw.rule_id.as_ref();
231        }
232
233        let data = ExposureData {
234            event_name: LAYER_EXPOSURE_EVENT_NAME,
235            spec_name: partial_raw.name,
236            rule_id: rule_id.cloned(),
237            exposure_time: Utc::now().timestamp_millis() as u64,
238            trigger,
239            evaluation_details: partial_raw.details,
240            secondary_exposures: partial_raw.secondary_exposures,
241            undelegated_secondary_exposures: partial_raw.undelegated_secondary_exposures,
242            version,
243            override_spec_name: None,
244            rule_passed: None,
245            exposure_info: partial_raw.exposure_info,
246            parameter_name: Some(parameter_name),
247            explicit_params: partial_raw.explicit_parameters,
248            allocated_experiment: partial_raw.allocated_experiment_name,
249            is_user_in_experiment: None,
250            gate_value: None,
251        };
252
253        Self { user, data }
254    }
255}
256
257impl EnqueueOperation for EnqueueExposureOp<'_> {
258    fn as_exposure(&self) -> Option<&impl QueuedExposure<'_>> {
259        Some(self)
260    }
261
262    fn into_queued_event(self, sampling_decision: EvtSamplingDecision) -> QueuedEvent {
263        let loggable_user = match self.user {
264            UserLoggableOrInternal::Loggable(loggable) => loggable,
265            UserLoggableOrInternal::Internal(internal) => internal.to_loggable(),
266        };
267
268        QueuedEvent::Exposure(QueuedExposureEvent {
269            user: loggable_user,
270            sampling_decision,
271            data: self.data,
272        })
273    }
274}
275
276impl<'a> QueuedExposure<'a> for EnqueueExposureOp<'a> {
277    fn create_exposure_sampling_key(&self) -> ExposureSamplingKey {
278        let mut additional_hash = 0u64;
279        if let Some(gate_value) = self.data.gate_value {
280            additional_hash = gate_value as u64;
281        } else if let Some(rule_passed) = self.data.rule_passed {
282            additional_hash = rule_passed as u64;
283        } else if let Some(is_user_in_experiment) = self.data.is_user_in_experiment {
284            additional_hash = is_user_in_experiment as u64;
285        } else if let Some(parameter_name) = &self.data.parameter_name {
286            additional_hash = parameter_name.hash;
287        }
288
289        let user_data = match &self.user {
290            UserLoggableOrInternal::Loggable(loggable) => &loggable.data,
291            UserLoggableOrInternal::Internal(internal) => &internal.user_ref.data,
292        };
293
294        let user_values_hash = user_data.create_exposure_dedupe_user_hash(None);
295
296        ExposureSamplingKey {
297            spec_name_hash: self.data.spec_name.hash,
298            rule_id_hash: self.data.rule_id.as_ref().map_or(0, |id| id.hash),
299            user_values_hash,
300            additional_hash,
301        }
302    }
303
304    fn get_rule_id_ref(&'a self) -> &'a str {
305        self.data.rule_id.as_ref().map_or("", |id| id.as_str())
306    }
307
308    fn get_extra_exposure_info_ref(&'a self) -> Option<&'a ExtraExposureInfo> {
309        self.data.exposure_info.as_ref()
310    }
311}
312
313pub struct QueuedExposureEvent {
314    pub user: StatsigUserLoggable,
315    pub sampling_decision: EvtSamplingDecision,
316    pub data: ExposureData,
317}
318
319impl QueuedExposureEvent {
320    pub fn take_secondary_exposures_for_primary_logging(&mut self) -> Vec<SecondaryExposure> {
321        if self.should_use_undelegated_secondary_exposures() {
322            return take_non_empty_secondary_exposures(
323                &mut self.data.undelegated_secondary_exposures,
324            );
325        }
326
327        take_non_empty_secondary_exposures(&mut self.data.secondary_exposures)
328    }
329
330    fn should_use_undelegated_secondary_exposures(&self) -> bool {
331        if self.data.event_name != LAYER_EXPOSURE_EVENT_NAME {
332            return false;
333        }
334
335        let Some(parameter_name) = self.data.parameter_name.as_ref() else {
336            return false;
337        };
338
339        !self
340            .data
341            .explicit_params
342            .as_ref()
343            .is_some_and(|params| params.contains(parameter_name.as_str()))
344    }
345
346    pub fn into_statsig_event_internal(self) -> StatsigEventInternal {
347        let mut data = self.data;
348        let mut builder = MetadataBuilder::new();
349
350        let mut should_use_undelegated_secondary_exposures = false;
351
352        builder
353            .try_add_gate_fields(&mut data)
354            .try_add_config_fields(&mut data)
355            .try_add_layer_fields(&mut data, &mut should_use_undelegated_secondary_exposures)
356            .add_eval_details(data.evaluation_details)
357            .add_interned_str("ruleID", data.rule_id.as_ref())
358            .try_add("configVersion", data.version.as_ref());
359
360        if data.trigger == ExposureTrigger::Manual {
361            builder.add_bool("isManualExposure", true);
362        }
363
364        if let Some(name) = data.override_spec_name {
365            builder.add_interned_str("overrideConfigName", Some(&name));
366        }
367
368        let statsig_metadata = get_statsig_metadata_with_sampling_decision(self.sampling_decision);
369
370        let event = StatsigEvent {
371            event_name: data.event_name.into(),
372            value: None,
373            metadata: Some(string_metadata_to_value_metadata(builder.build())),
374            statsig_metadata: Some(statsig_metadata),
375        };
376
377        let secondary_exposures = if should_use_undelegated_secondary_exposures {
378            data.undelegated_secondary_exposures
379        } else {
380            data.secondary_exposures
381        };
382
383        StatsigEventInternal::new(data.exposure_time, self.user, event, secondary_exposures)
384    }
385}
386
387struct MetadataBuilder {
388    metadata: HashMap<String, String>,
389}
390
391impl MetadataBuilder {
392    pub fn new() -> Self {
393        Self {
394            metadata: HashMap::new(),
395        }
396    }
397
398    pub fn add_eval_details(&mut self, evaluation_details: EvaluationDetails) -> &mut Self {
399        self.add_string("reason", evaluation_details.reason);
400        self.try_add("lcut", evaluation_details.lcut.as_ref());
401        self.try_add("receivedAt", evaluation_details.received_at.as_ref());
402        self
403    }
404
405    // This is called on an async thread, so it's somewhat ok to use unperformant_to_string
406    pub fn add_interned_str(&mut self, key: &str, value: Option<&InternedString>) -> &mut Self {
407        self.metadata.insert(
408            key.into(),
409            value
410                .map(|v| v.unperformant_to_string())
411                .unwrap_or_default(),
412        );
413        self
414    }
415
416    pub fn add_string(&mut self, key: &str, value: String) -> &mut Self {
417        self.metadata.insert(key.into(), value);
418        self
419    }
420
421    pub fn add_bool(&mut self, key: &str, value: bool) -> &mut Self {
422        self.metadata.insert(key.into(), value.to_string());
423        self
424    }
425
426    pub fn try_add<T>(&mut self, key: &str, value: Option<&T>) -> &mut Self
427    where
428        T: Display,
429    {
430        if let Some(value) = value {
431            self.metadata.insert(key.into(), value.to_string());
432        }
433        self
434    }
435
436    pub fn try_add_gate_fields(&mut self, data: &mut ExposureData) -> &mut Self {
437        if data.event_name != GATE_EXPOSURE_EVENT_NAME {
438            return self;
439        }
440
441        self.add_interned_str("gate", Some(&data.spec_name));
442        self.add_bool("gateValue", data.gate_value == Some(true));
443        self
444    }
445
446    pub fn try_add_config_fields(&mut self, data: &mut ExposureData) -> &mut Self {
447        if data.event_name != CONFIG_EXPOSURE_EVENT_NAME {
448            return self;
449        }
450
451        self.add_interned_str("config", Some(&data.spec_name));
452        self.try_add("rulePassed", data.rule_passed.as_ref());
453        self
454    }
455
456    pub fn try_add_layer_fields(
457        &mut self,
458        data: &mut ExposureData,
459        should_use_undelegated_secondary_exposures: &mut bool,
460    ) -> &mut Self {
461        if data.event_name != LAYER_EXPOSURE_EVENT_NAME {
462            return self;
463        }
464
465        let param_name = data.parameter_name.take().unwrap_or_default();
466        let is_explicit = data
467            .explicit_params
468            .take()
469            .is_some_and(|params| params.contains(&param_name));
470
471        self.add_interned_str("config", Some(&data.spec_name));
472        self.add_interned_str("parameterName", Some(&param_name));
473        self.add_bool("isExplicitParameter", is_explicit);
474
475        if is_explicit {
476            self.add_interned_str("allocatedExperiment", data.allocated_experiment.as_ref());
477        } else {
478            self.add_interned_str("allocatedExperiment", None);
479            *should_use_undelegated_secondary_exposures = true;
480        }
481
482        self
483    }
484
485    pub fn build(self) -> HashMap<String, String> {
486        self.metadata
487    }
488}