devcycle_bucketing_rs/events/
event_queue.rs

1use crate::config::ConfigBody;
2use crate::config::client_custom_data::get_client_custom_data;
3use crate::config::platform_data::PlatformData;
4use crate::errors::DevCycleError;
5use crate::events::event::*;
6use crate::generate_bucketed_config;
7use crate::user::{PopulatedUser, User};
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicI64, Ordering};
11use std::time::Duration;
12use tokio::sync::{Mutex, mpsc};
13
14#[derive(Clone)]
15pub struct EventQueueOptions {
16    pub flush_events_interval: Duration,
17    pub disable_automatic_event_logging: bool,
18    pub disable_custom_event_logging: bool,
19    pub max_event_queue_size: i32,
20    pub max_user_event_queue_size: i32,
21    pub flush_events_batch_size: i32,
22    pub flush_events_queue_size: i32,
23    pub events_api_base_uri: String,
24}
25
26impl EventQueueOptions {
27    pub fn is_event_logging_disabled(&self, event_type: *const EventType) -> bool {
28        if event_type == &EventType::CustomEvent {
29            return self.disable_custom_event_logging;
30        }
31        return self.disable_automatic_event_logging;
32    }
33}
34
35impl Default for EventQueueOptions {
36    fn default() -> Self {
37        EventQueueOptions {
38            flush_events_interval: Duration::from_secs(60),
39            disable_automatic_event_logging: false,
40            disable_custom_event_logging: false,
41            max_event_queue_size: 10000,
42            max_user_event_queue_size: 1000,
43            flush_events_batch_size: 100,
44            flush_events_queue_size: 1000,
45            events_api_base_uri: "https://events.devcycle.com".to_string(),
46        }
47    }
48}
49
50pub struct EventQueue {
51    pub(crate) sdk_key: String,
52    pub(crate) platform_data: Arc<PlatformData>,
53    pub(crate) agg_event_queue_raw_tx: mpsc::Sender<AggEventQueueRawMessage>,
54    pub(crate) agg_event_queue_raw_rx: mpsc::Receiver<AggEventQueueRawMessage>,
55    pub(crate) user_event_queue_raw_tx: mpsc::Sender<UserEventData>,
56    pub(crate) user_event_queue_raw_rx: mpsc::Receiver<UserEventData>,
57    pub(crate) agg_event_queue: AggregateEventQueue,
58    pub(crate) user_event_queue: Mutex<UserEventQueue>, // changed to Mutex for interior mutability
59    pub(crate) user_event_queue_count: AtomicI64,
60    pub(crate) queue_access_mutex: tokio::sync::Mutex<()>,
61    pub(crate) events_flushed: AtomicI64,
62    pub(crate) events_dropped: AtomicI64,
63    pub(crate) events_reported: AtomicI64,
64    pub(crate) options: EventQueueOptions,
65}
66
67impl EventQueue {
68    pub fn new(
69        sdk_key: String,
70        event_queue_options: EventQueueOptions,
71    ) -> Result<Self, DevCycleError> {
72        let (agg_event_queue_raw_tx, agg_event_queue_raw_rx) = mpsc::channel(10000);
73        let (user_event_queue_raw_tx, user_event_queue_raw_rx) = mpsc::channel(10000);
74        let platform_data = crate::config::platform_data::get_platform_data(&sdk_key)
75            .map_err(|e| DevCycleError::new(&e))?;
76        Ok(Self {
77            sdk_key,
78            platform_data,
79            agg_event_queue_raw_tx,
80            user_event_queue_raw_tx,
81            agg_event_queue_raw_rx,
82            user_event_queue_raw_rx,
83            agg_event_queue: HashMap::new(),
84            user_event_queue: Mutex::new(HashMap::new()), // wrap in Mutex
85            user_event_queue_count: AtomicI64::new(0),
86            queue_access_mutex: tokio::sync::Mutex::new(()),
87            events_flushed: AtomicI64::new(0),
88            events_dropped: AtomicI64::new(0),
89            events_reported: AtomicI64::new(0),
90            options: event_queue_options,
91        })
92    }
93
94    pub async fn queue_variable_evaluated_event(
95        &self,
96        variable_key: &str,
97        feature_id: &str,
98        variation_id: &str,
99        eval_reason: EvaluationReason,
100    ) -> Result<bool, DevCycleError> {
101        return self
102            .queue_aggregate_event_internal(
103                variable_key,
104                feature_id,
105                variation_id,
106                EventType::AggregateVariableEvaluated,
107                eval_reason,
108            )
109            .await;
110    }
111
112    pub async fn queue_variable_defaulted_event(
113        &self,
114        variable_key: &str,
115        feature_id: &str,
116        variation_id: &str,
117    ) -> Result<bool, DevCycleError> {
118        return self
119            .queue_aggregate_event_internal(
120                variable_key,
121                feature_id,
122                variation_id,
123                EventType::AggregateVariableDefaulted,
124                EvaluationReason::Default,
125            )
126            .await;
127    }
128
129    async fn queue_aggregate_event_internal(
130        &self,
131        variable_key: &str,
132        feature_id: &str,
133        variation_id: &str,
134        event_type: EventType,
135        eval_reason: EvaluationReason,
136    ) -> Result<bool, DevCycleError> {
137        if self.options.is_event_logging_disabled(&event_type) {
138            return Ok(false);
139        }
140        if variable_key.is_empty() {
141            return Err(DevCycleError::new(
142                "a variable key is required for aggregate events",
143            ));
144        }
145        let mut eval: EvalReasonAggMap = HashMap::new();
146        if event_type == EventType::AggregateVariableDefaulted {
147            eval.insert(EvaluationReason::Default, 1);
148        } else {
149            eval.insert(eval_reason, 1);
150        }
151
152        let success = self
153            .agg_event_queue_raw_tx
154            .try_send(AggEventQueueRawMessage {
155                event_type,
156                variation_id: variation_id.to_string(),
157                feature_id: feature_id.to_string(),
158                variable_key: variable_key.to_string(),
159                eval_metadata: eval,
160            });
161
162        if success.is_err() {
163            self.events_dropped.fetch_add(1, Ordering::Relaxed);
164            return Err(DevCycleError::new(&format!(
165                "dropping event, queue is full: {}",
166                success.unwrap_err()
167            )));
168        }
169        return Ok(true);
170    }
171
172    pub async fn queue_event(&self, user: User, event: Event) -> Result<bool, DevCycleError> {
173        let success = self
174            .user_event_queue_raw_tx
175            .try_send(UserEventData { user, event });
176
177        if success.is_err() {
178            self.events_dropped.fetch_add(1, Ordering::Relaxed);
179            return Err(DevCycleError::new(&format!(
180                "dropping event, queue is full: {}",
181                success.unwrap_err()
182            )));
183        }
184        return Ok(true);
185    }
186
187    pub(crate) async fn merge_agg_event_queue_keys(&mut self, config_body: &ConfigBody) {
188        let _guard = self.queue_access_mutex.lock().await;
189        for event_type in [
190            EventType::AggregateVariableDefaulted,
191            EventType::AggregateVariableEvaluated,
192        ] {
193            if !self.agg_event_queue.contains_key(&event_type) {
194                self.agg_event_queue
195                    .insert(event_type.clone(), HashMap::new());
196            }
197
198            for variable in config_body.variables.iter() {
199                if !self
200                    .agg_event_queue
201                    .get(&event_type)
202                    .unwrap()
203                    .contains_key(&variable.key)
204                {
205                    self.agg_event_queue
206                        .get_mut(&event_type)
207                        .unwrap()
208                        .insert(variable.key.clone(), HashMap::new());
209                }
210
211                for feature in config_body.features.iter() {
212                    if !self
213                        .agg_event_queue
214                        .get(&event_type)
215                        .unwrap()
216                        .get(&variable.key)
217                        .unwrap()
218                        .contains_key(&feature.key)
219                    {
220                        self.agg_event_queue
221                            .get_mut(&event_type)
222                            .unwrap()
223                            .get_mut(&variable.key)
224                            .unwrap()
225                            .insert(feature._id.clone(), HashMap::new());
226                    }
227
228                    for variation in feature.variations.iter() {
229                        if !self
230                            .agg_event_queue
231                            .get(&event_type)
232                            .unwrap()
233                            .get(&variable.key)
234                            .unwrap()
235                            .get(&feature._id)
236                            .unwrap()
237                            .contains_key(&variation._id)
238                        {
239                            self.agg_event_queue
240                                .get_mut(&event_type)
241                                .unwrap()
242                                .get_mut(&variable.key)
243                                .unwrap()
244                                .get_mut(&feature._id)
245                                .unwrap()
246                                .insert(variation._id.clone(), HashMap::new());
247                        }
248
249                        for reason in [
250                            EvaluationReason::TargetingMatch,
251                            EvaluationReason::Split,
252                            EvaluationReason::Default,
253                            EvaluationReason::Disabled,
254                            EvaluationReason::Error,
255                        ] {
256                            if !self
257                                .agg_event_queue
258                                .get(&event_type)
259                                .unwrap()
260                                .get(&variable.key)
261                                .unwrap()
262                                .get(&feature._id)
263                                .unwrap()
264                                .get(&variation._id)
265                                .unwrap()
266                                .contains_key(&reason)
267                            {
268                                self.agg_event_queue
269                                    .get_mut(&event_type)
270                                    .unwrap()
271                                    .get_mut(&variable.key)
272                                    .unwrap()
273                                    .get_mut(&feature._id)
274                                    .unwrap()
275                                    .get_mut(&variation._id)
276                                    .unwrap()
277                                    .insert(reason.clone(), 0);
278                            }
279                        }
280                    }
281                }
282            }
283        }
284    }
285
286    async unsafe fn process_user_events(
287        &mut self,
288        mut event: UserEventData,
289    ) -> Result<bool, DevCycleError> {
290        let client_custom_data = get_client_custom_data(self.sdk_key.clone());
291
292        let populated_user = PopulatedUser::new(
293            event.user.clone(),
294            self.platform_data.clone(),
295            client_custom_data.clone(),
296        );
297        let bucketed_config =
298            generate_bucketed_config(&self.sdk_key, populated_user.clone(), client_custom_data)
299                .await;
300        if bucketed_config.is_err() {
301            return Err(bucketed_config.err().unwrap());
302        }
303
304        event.event.feature_vars = bucketed_config?.feature_variation_map;
305
306        if event.event.event_type == EventType::CustomEvent {
307            event.event.user_id = event.user.user_id.clone();
308        }
309
310        let _guard = self.queue_access_mutex.lock().await;
311
312        // Lock the user_event_queue mutex for insertion
313        let user_id = event.user.user_id.clone();
314        {
315            let mut user_queue = self.user_event_queue.lock().await;
316            user_queue
317                .entry(user_id)
318                .or_insert_with(|| UserEventsBatchRecord {
319                    user: populated_user,
320                    events: Vec::new(),
321                })
322                .events
323                .push(event.event);
324        }
325
326        self.user_event_queue_count.fetch_add(1, Ordering::Relaxed);
327
328        return Ok(true);
329    }
330
331    pub(crate) async fn process_aggregate_event(
332        &mut self,
333        agg_event_queue_raw_message: AggEventQueueRawMessage,
334    ) {
335        let _guard = self.queue_access_mutex.lock().await;
336
337        let event_type = agg_event_queue_raw_message.event_type.clone();
338        let variable_key = agg_event_queue_raw_message.variable_key;
339        let feature_id = agg_event_queue_raw_message.feature_id;
340        let variation_id = agg_event_queue_raw_message.variation_id;
341        let eval_metadata = agg_event_queue_raw_message.eval_metadata;
342
343        if event_type == EventType::AggregateVariableEvaluated {
344            // Get or clone the nested structure and update counts directly
345            let eval_reasons = self
346                .agg_event_queue
347                .entry(event_type)
348                .or_insert_with(HashMap::new)
349                .entry(variable_key)
350                .or_insert_with(HashMap::new)
351                .entry(feature_id)
352                .or_insert_with(HashMap::new)
353                .entry(variation_id)
354                .or_insert_with(HashMap::new);
355
356            for (reason, count) in eval_metadata {
357                *eval_reasons.entry(reason).or_insert(0) += count;
358            }
359        } else {
360            // For defaulted events, use "default" as both feature_id and variation_id keys
361            let default_key = "default".to_string();
362
363            let default_reasons = self
364                .agg_event_queue
365                .entry(event_type)
366                .or_insert_with(HashMap::new)
367                .entry(variable_key)
368                .or_insert_with(HashMap::new)
369                .entry(default_key.clone())
370                .or_insert_with(HashMap::new)
371                .entry(default_key)
372                .or_insert_with(HashMap::new);
373
374            for (reason, count) in eval_metadata {
375                *default_reasons.entry(reason).or_insert(0) += count;
376            }
377        }
378    }
379
380    pub(crate) async fn process_events(
381        &mut self,
382        mut shutdown: tokio::sync::watch::Receiver<bool>,
383    ) {
384        loop {
385            tokio::select! {
386                _ = shutdown.changed() => {
387                    // Context is cancelled, exit the loop
388                    return;
389                }
390                user_event = self.user_event_queue_raw_rx.recv() => {
391                    match user_event {
392                        Some(event) => unsafe{
393                            let _ = self.process_user_events(event).await;
394                        }
395                        None => {
396                            // Channel closed
397                            return;
398                        }
399                    }
400                }
401                agg_event = self.agg_event_queue_raw_rx.recv() => {
402                    match agg_event {
403                        Some(event) => {
404                            self.process_aggregate_event(event).await;
405                        }
406                        None => {
407                            // Channel closed
408                            return;
409                        }
410                    }
411                }
412            }
413        }
414    }
415}