devcycle_bucketing_rs/events/
event_queue.rs1use crate::config::ConfigBody;
2use crate::config::client_custom_data::get_client_custom_data;
3use crate::config::platform_data::PlatformData;
4use crate::errors::DevCycleError;
5use crate::events::event::*;
6use crate::generate_bucketed_config;
7use crate::user::{PopulatedUser, User};
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicI64, Ordering};
11use std::time::Duration;
12use tokio::sync::{Mutex, mpsc};
13
/// Tunable settings for an `EventQueue`.
///
/// Within this module only the two `disable_*` flags are read (by
/// `is_event_logging_disabled`); the remaining values are stored for
/// consumers elsewhere in the crate.
#[derive(Clone)]
pub struct EventQueueOptions {
    /// Interval associated with flushing events.
    // NOTE(review): the flush loop is not in this file; confirm how it is used.
    pub flush_events_interval: Duration,
    /// When `true`, logging is reported as disabled for every event type
    /// other than `EventType::CustomEvent`.
    pub disable_automatic_event_logging: bool,
    /// When `true`, logging is reported as disabled for `EventType::CustomEvent`.
    pub disable_custom_event_logging: bool,
    /// Size limit for the event queue.
    // NOTE(review): not enforced in this file; confirm against the consumer.
    pub max_event_queue_size: i32,
    /// Size limit for the per-user event queue.
    // NOTE(review): not enforced in this file; confirm against the consumer.
    pub max_user_event_queue_size: i32,
    /// Batch size used when flushing events.
    // NOTE(review): consumer not visible here; confirm the unit (events per batch?).
    pub flush_events_batch_size: i32,
    /// Queue size used when flushing events.
    // NOTE(review): consumer not visible here; confirm semantics.
    pub flush_events_queue_size: i32,
    /// Base URI of the events API.
    pub events_api_base_uri: String,
}
25
26impl EventQueueOptions {
27 pub fn is_event_logging_disabled(&self, event_type: *const EventType) -> bool {
28 if event_type == &EventType::CustomEvent {
29 return self.disable_custom_event_logging;
30 }
31 return self.disable_automatic_event_logging;
32 }
33}
34
35impl Default for EventQueueOptions {
36 fn default() -> Self {
37 EventQueueOptions {
38 flush_events_interval: Duration::from_secs(60),
39 disable_automatic_event_logging: false,
40 disable_custom_event_logging: false,
41 max_event_queue_size: 10000,
42 max_user_event_queue_size: 1000,
43 flush_events_batch_size: 100,
44 flush_events_queue_size: 1000,
45 events_api_base_uri: "https://events.devcycle.com".to_string(),
46 }
47 }
48}
49
50pub struct EventQueue {
51 pub(crate) sdk_key: String,
52 pub(crate) platform_data: Arc<PlatformData>,
53 pub(crate) agg_event_queue_raw_tx: mpsc::Sender<AggEventQueueRawMessage>,
54 pub(crate) agg_event_queue_raw_rx: mpsc::Receiver<AggEventQueueRawMessage>,
55 pub(crate) user_event_queue_raw_tx: mpsc::Sender<UserEventData>,
56 pub(crate) user_event_queue_raw_rx: mpsc::Receiver<UserEventData>,
57 pub(crate) agg_event_queue: AggregateEventQueue,
58 pub(crate) user_event_queue: Mutex<UserEventQueue>, pub(crate) user_event_queue_count: AtomicI64,
60 pub(crate) queue_access_mutex: tokio::sync::Mutex<()>,
61 pub(crate) events_flushed: AtomicI64,
62 pub(crate) events_dropped: AtomicI64,
63 pub(crate) events_reported: AtomicI64,
64 pub(crate) options: EventQueueOptions,
65}
66
67impl EventQueue {
68 pub fn new(
69 sdk_key: String,
70 event_queue_options: EventQueueOptions,
71 ) -> Result<Self, DevCycleError> {
72 let (agg_event_queue_raw_tx, agg_event_queue_raw_rx) = mpsc::channel(10000);
73 let (user_event_queue_raw_tx, user_event_queue_raw_rx) = mpsc::channel(10000);
74 let platform_data = crate::config::platform_data::get_platform_data(&sdk_key)
75 .map_err(|e| DevCycleError::new(&e))?;
76 Ok(Self {
77 sdk_key,
78 platform_data,
79 agg_event_queue_raw_tx,
80 user_event_queue_raw_tx,
81 agg_event_queue_raw_rx,
82 user_event_queue_raw_rx,
83 agg_event_queue: HashMap::new(),
84 user_event_queue: Mutex::new(HashMap::new()), user_event_queue_count: AtomicI64::new(0),
86 queue_access_mutex: tokio::sync::Mutex::new(()),
87 events_flushed: AtomicI64::new(0),
88 events_dropped: AtomicI64::new(0),
89 events_reported: AtomicI64::new(0),
90 options: event_queue_options,
91 })
92 }
93
94 pub async fn queue_variable_evaluated_event(
95 &self,
96 variable_key: &str,
97 feature_id: &str,
98 variation_id: &str,
99 eval_reason: EvaluationReason,
100 ) -> Result<bool, DevCycleError> {
101 return self
102 .queue_aggregate_event_internal(
103 variable_key,
104 feature_id,
105 variation_id,
106 EventType::AggregateVariableEvaluated,
107 eval_reason,
108 )
109 .await;
110 }
111
112 pub async fn queue_variable_defaulted_event(
113 &self,
114 variable_key: &str,
115 feature_id: &str,
116 variation_id: &str,
117 ) -> Result<bool, DevCycleError> {
118 return self
119 .queue_aggregate_event_internal(
120 variable_key,
121 feature_id,
122 variation_id,
123 EventType::AggregateVariableDefaulted,
124 EvaluationReason::Default,
125 )
126 .await;
127 }
128
129 async fn queue_aggregate_event_internal(
130 &self,
131 variable_key: &str,
132 feature_id: &str,
133 variation_id: &str,
134 event_type: EventType,
135 eval_reason: EvaluationReason,
136 ) -> Result<bool, DevCycleError> {
137 if self.options.is_event_logging_disabled(&event_type) {
138 return Ok(false);
139 }
140 if variable_key.is_empty() {
141 return Err(DevCycleError::new(
142 "a variable key is required for aggregate events",
143 ));
144 }
145 let mut eval: EvalReasonAggMap = HashMap::new();
146 if event_type == EventType::AggregateVariableDefaulted {
147 eval.insert(EvaluationReason::Default, 1);
148 } else {
149 eval.insert(eval_reason, 1);
150 }
151
152 let success = self
153 .agg_event_queue_raw_tx
154 .try_send(AggEventQueueRawMessage {
155 event_type,
156 variation_id: variation_id.to_string(),
157 feature_id: feature_id.to_string(),
158 variable_key: variable_key.to_string(),
159 eval_metadata: eval,
160 });
161
162 if success.is_err() {
163 self.events_dropped.fetch_add(1, Ordering::Relaxed);
164 return Err(DevCycleError::new(&format!(
165 "dropping event, queue is full: {}",
166 success.unwrap_err()
167 )));
168 }
169 return Ok(true);
170 }
171
172 pub async fn queue_event(&self, user: User, event: Event) -> Result<bool, DevCycleError> {
173 let success = self
174 .user_event_queue_raw_tx
175 .try_send(UserEventData { user, event });
176
177 if success.is_err() {
178 self.events_dropped.fetch_add(1, Ordering::Relaxed);
179 return Err(DevCycleError::new(&format!(
180 "dropping event, queue is full: {}",
181 success.unwrap_err()
182 )));
183 }
184 return Ok(true);
185 }
186
187 pub(crate) async fn merge_agg_event_queue_keys(&mut self, config_body: &ConfigBody) {
188 let _guard = self.queue_access_mutex.lock().await;
189 for event_type in [
190 EventType::AggregateVariableDefaulted,
191 EventType::AggregateVariableEvaluated,
192 ] {
193 if !self.agg_event_queue.contains_key(&event_type) {
194 self.agg_event_queue
195 .insert(event_type.clone(), HashMap::new());
196 }
197
198 for variable in config_body.variables.iter() {
199 if !self
200 .agg_event_queue
201 .get(&event_type)
202 .unwrap()
203 .contains_key(&variable.key)
204 {
205 self.agg_event_queue
206 .get_mut(&event_type)
207 .unwrap()
208 .insert(variable.key.clone(), HashMap::new());
209 }
210
211 for feature in config_body.features.iter() {
212 if !self
213 .agg_event_queue
214 .get(&event_type)
215 .unwrap()
216 .get(&variable.key)
217 .unwrap()
218 .contains_key(&feature.key)
219 {
220 self.agg_event_queue
221 .get_mut(&event_type)
222 .unwrap()
223 .get_mut(&variable.key)
224 .unwrap()
225 .insert(feature._id.clone(), HashMap::new());
226 }
227
228 for variation in feature.variations.iter() {
229 if !self
230 .agg_event_queue
231 .get(&event_type)
232 .unwrap()
233 .get(&variable.key)
234 .unwrap()
235 .get(&feature._id)
236 .unwrap()
237 .contains_key(&variation._id)
238 {
239 self.agg_event_queue
240 .get_mut(&event_type)
241 .unwrap()
242 .get_mut(&variable.key)
243 .unwrap()
244 .get_mut(&feature._id)
245 .unwrap()
246 .insert(variation._id.clone(), HashMap::new());
247 }
248
249 for reason in [
250 EvaluationReason::TargetingMatch,
251 EvaluationReason::Split,
252 EvaluationReason::Default,
253 EvaluationReason::Disabled,
254 EvaluationReason::Error,
255 ] {
256 if !self
257 .agg_event_queue
258 .get(&event_type)
259 .unwrap()
260 .get(&variable.key)
261 .unwrap()
262 .get(&feature._id)
263 .unwrap()
264 .get(&variation._id)
265 .unwrap()
266 .contains_key(&reason)
267 {
268 self.agg_event_queue
269 .get_mut(&event_type)
270 .unwrap()
271 .get_mut(&variable.key)
272 .unwrap()
273 .get_mut(&feature._id)
274 .unwrap()
275 .get_mut(&variation._id)
276 .unwrap()
277 .insert(reason.clone(), 0);
278 }
279 }
280 }
281 }
282 }
283 }
284 }
285
286 async unsafe fn process_user_events(
287 &mut self,
288 mut event: UserEventData,
289 ) -> Result<bool, DevCycleError> {
290 let client_custom_data = get_client_custom_data(self.sdk_key.clone());
291
292 let populated_user = PopulatedUser::new(
293 event.user.clone(),
294 self.platform_data.clone(),
295 client_custom_data.clone(),
296 );
297 let bucketed_config =
298 generate_bucketed_config(&self.sdk_key, populated_user.clone(), client_custom_data)
299 .await;
300 if bucketed_config.is_err() {
301 return Err(bucketed_config.err().unwrap());
302 }
303
304 event.event.feature_vars = bucketed_config?.feature_variation_map;
305
306 if event.event.event_type == EventType::CustomEvent {
307 event.event.user_id = event.user.user_id.clone();
308 }
309
310 let _guard = self.queue_access_mutex.lock().await;
311
312 let user_id = event.user.user_id.clone();
314 {
315 let mut user_queue = self.user_event_queue.lock().await;
316 user_queue
317 .entry(user_id)
318 .or_insert_with(|| UserEventsBatchRecord {
319 user: populated_user,
320 events: Vec::new(),
321 })
322 .events
323 .push(event.event);
324 }
325
326 self.user_event_queue_count.fetch_add(1, Ordering::Relaxed);
327
328 return Ok(true);
329 }
330
331 pub(crate) async fn process_aggregate_event(
332 &mut self,
333 agg_event_queue_raw_message: AggEventQueueRawMessage,
334 ) {
335 let _guard = self.queue_access_mutex.lock().await;
336
337 let event_type = agg_event_queue_raw_message.event_type.clone();
338 let variable_key = agg_event_queue_raw_message.variable_key;
339 let feature_id = agg_event_queue_raw_message.feature_id;
340 let variation_id = agg_event_queue_raw_message.variation_id;
341 let eval_metadata = agg_event_queue_raw_message.eval_metadata;
342
343 if event_type == EventType::AggregateVariableEvaluated {
344 let eval_reasons = self
346 .agg_event_queue
347 .entry(event_type)
348 .or_insert_with(HashMap::new)
349 .entry(variable_key)
350 .or_insert_with(HashMap::new)
351 .entry(feature_id)
352 .or_insert_with(HashMap::new)
353 .entry(variation_id)
354 .or_insert_with(HashMap::new);
355
356 for (reason, count) in eval_metadata {
357 *eval_reasons.entry(reason).or_insert(0) += count;
358 }
359 } else {
360 let default_key = "default".to_string();
362
363 let default_reasons = self
364 .agg_event_queue
365 .entry(event_type)
366 .or_insert_with(HashMap::new)
367 .entry(variable_key)
368 .or_insert_with(HashMap::new)
369 .entry(default_key.clone())
370 .or_insert_with(HashMap::new)
371 .entry(default_key)
372 .or_insert_with(HashMap::new);
373
374 for (reason, count) in eval_metadata {
375 *default_reasons.entry(reason).or_insert(0) += count;
376 }
377 }
378 }
379
380 pub(crate) async fn process_events(
381 &mut self,
382 mut shutdown: tokio::sync::watch::Receiver<bool>,
383 ) {
384 loop {
385 tokio::select! {
386 _ = shutdown.changed() => {
387 return;
389 }
390 user_event = self.user_event_queue_raw_rx.recv() => {
391 match user_event {
392 Some(event) => unsafe{
393 let _ = self.process_user_events(event).await;
394 }
395 None => {
396 return;
398 }
399 }
400 }
401 agg_event = self.agg_event_queue_raw_rx.recv() => {
402 match agg_event {
403 Some(event) => {
404 self.process_aggregate_event(event).await;
405 }
406 None => {
407 return;
409 }
410 }
411 }
412 }
413 }
414 }
415}