edgee_components_runtime/data_collection/
mod.rs

1mod context;
2mod convert;
3mod debug;
4pub mod logger;
5pub mod payload;
6
// Generate the host-side Rust bindings for the `data-collection` WIT world
// found under `wit/`. With `async: true` the generated component calls are
// awaited by the code below (e.g. `call_page(..).await`).
wasmtime::component::bindgen!({
    world: "data-collection",
    path: "wit/",
    async: true,
});
12
13use std::str::FromStr;
14use std::time::Duration;
15
16use crate::config::ComponentsConfiguration;
17use context::EventContext;
18use debug::{debug_and_trace_response, trace_disabled_event, trace_request, DebugParams};
19use http::{header, HeaderMap, HeaderName, HeaderValue};
20use tokio::task::JoinHandle;
21use tracing::{error, span, Instrument, Level};
22
23use crate::context::ComponentsContext;
24use crate::{
25    data_collection::exports::edgee::components::data_collection as Component,
26    data_collection::payload::{Consent, Event, EventType},
27};
28use std::collections::HashMap;
29
/// Identifies the component that handled an event and how the event was sent.
#[derive(Debug, Clone)]
pub struct ComponentMetadata {
    /// Project-level component identifier (`project_component_id` in the component configuration).
    pub component_id: String,
    /// Component identifier (`id` in the component configuration).
    pub component: String,
    /// Whether the event was anonymized before being handed to the component.
    pub anonymization: bool,
}
36
/// Outcome of the HTTP call issued on behalf of a component.
#[derive(Debug, Clone)]
pub struct Response {
    /// HTTP status code reported by the remote endpoint.
    pub status: i32,
    /// Response body as text; empty when no body could be read.
    pub body: String,
    /// Value of the response `content-type` header.
    pub content_type: String,
    /// Error description when the request itself failed; empty on success.
    pub message: String,
    /// Elapsed time of the request, in milliseconds.
    pub duration: u128,
}
45
/// The outgoing HTTP request a component asked the runtime to perform.
#[derive(Debug, Clone)]
pub struct Request {
    /// HTTP method name (`GET`, `PUT`, `POST` or `DELETE`).
    pub method: String,
    /// Target URL.
    pub url: String,
    /// Request body as text.
    pub body: String,
    /// Request headers, keyed by header name.
    pub headers: HashMap<String, String>,
}
53
/// Everything known about one event delivered to one component: the shared
/// event context, the event as it was sent, the component that handled it,
/// and the HTTP request/response pair that resulted.
#[derive(Clone)]
pub struct EventResponse {
    /// Context shared by all events of the batch.
    pub context: EventContext,
    /// The event as sent to the component.
    pub event: Event,
    /// Which component handled the event, and whether it was anonymized.
    pub component_metadata: ComponentMetadata,

    /// Result of the outgoing HTTP call.
    pub response: Response,
    /// The outgoing HTTP call that was made.
    pub request: Request,
}
63
64pub async fn send_json_events(
65    component_ctx: &ComponentsContext,
66    events_json: &str,
67    component_config: &ComponentsConfiguration,
68    trace_component: &Option<String>,
69    debug: bool,
70) -> anyhow::Result<Vec<JoinHandle<EventResponse>>> {
71    if events_json.is_empty() {
72        return Ok(vec![]);
73    }
74
75    let mut events: Vec<Event> = serde_json::from_str(events_json)?;
76    send_events(
77        component_ctx,
78        &mut events,
79        component_config,
80        trace_component,
81        debug,
82        "",
83        "",
84    )
85    .await
86}
87
88pub async fn send_events(
89    component_ctx: &ComponentsContext,
90    events: &mut [Event],
91    component_config: &ComponentsConfiguration,
92    trace_component: &Option<String>,
93    debug: bool,
94    project_id: &str,
95    proxy_host: &str,
96) -> anyhow::Result<Vec<JoinHandle<EventResponse>>> {
97    if events.is_empty() {
98        return Ok(vec![]);
99    }
100
101    let ctx = &EventContext::new(events, project_id, proxy_host);
102
103    let mut store = component_ctx.empty_store();
104
105    let mut futures = vec![];
106
107    // iterate on each event
108    for event in events.iter_mut() {
109        for cfg in component_config.data_collection.iter() {
110            let span = span!(
111                Level::INFO,
112                "component",
113                name = cfg.id.as_str(),
114                event = ?event.event_type
115            );
116            let _enter = span.enter();
117
118            let mut event = event.clone();
119
120            let trace =
121                trace_component.is_some() && trace_component.as_ref().unwrap() == cfg.id.as_str();
122
123            // if event_type is not enabled in config.config.get(component_id).unwrap(), skip the event
124            match event.event_type {
125                EventType::Page => {
126                    if !cfg.settings.edgee_page_event_enabled {
127                        trace_disabled_event(trace, "page");
128                        continue;
129                    }
130                }
131                EventType::User => {
132                    if !cfg.settings.edgee_user_event_enabled {
133                        trace_disabled_event(trace, "user");
134                        continue;
135                    }
136                }
137                EventType::Track => {
138                    if !cfg.settings.edgee_track_event_enabled {
139                        trace_disabled_event(trace, "track");
140                        continue;
141                    }
142                }
143            }
144
145            if !event.is_component_enabled(cfg) {
146                continue;
147            }
148
149            let initial_anonymization = cfg.settings.edgee_anonymization;
150            let default_consent = cfg.settings.edgee_default_consent.clone();
151
152            // Use the helper function to handle consent and determine anonymization
153            let (anonymization, outgoing_consent) = handle_consent_and_anonymization(
154                &mut event,
155                &default_consent,
156                initial_anonymization,
157            );
158
159            if anonymization {
160                event.context.client.ip = ctx.get_ip_anonymized().clone();
161                // todo: anonymize other data, utm, referrer, etc.
162            } else {
163                event.context.client.ip = ctx.get_ip().clone();
164            }
165
166            // Native cookie support
167            if let Some(ref ids) = event.context.user.native_cookie_ids {
168                if ids.contains_key(&cfg.slug) {
169                    event.context.user.edgee_id = ids.get(&cfg.slug).unwrap().clone();
170                } else {
171                    event.context.user.edgee_id = ctx.get_edgee_id().clone();
172                }
173            }
174
175            // Add one second to the timestamp if uuid is not the same than the first event, to prevent duplicate sessions
176            if &event.uuid != ctx.get_uuid() {
177                event.timestamp = *ctx.get_timestamp() + chrono::Duration::seconds(1);
178                event.context.session.session_start = false;
179            }
180
181            // get the instance of the component
182            let instance = match component_ctx
183                .get_data_collection_instance(&cfg.id, &mut store)
184                .await
185            {
186                Ok(instance) => instance,
187                Err(err) => {
188                    error!("Failed to get data collection instance. Error: {}", err);
189                    continue;
190                }
191            };
192            let component = instance.edgee_components_data_collection();
193
194            let component_event: Component::Event = event.clone().into();
195            let component_settings: Vec<(String, String)> = cfg
196                .settings
197                .additional_settings
198                .clone()
199                .into_iter()
200                .collect();
201
202            // call the corresponding method of the component
203            let request = match component_event.event_type {
204                Component::EventType::Page => {
205                    component
206                        .call_page(&mut store, &component_event, &component_settings)
207                        .await
208                }
209                Component::EventType::Track => {
210                    component
211                        .call_track(&mut store, &component_event, &component_settings)
212                        .await
213                }
214                Component::EventType::User => {
215                    component
216                        .call_user(&mut store, &component_event, &component_settings)
217                        .await
218                }
219            };
220            let request = match request {
221                Ok(Ok(request)) => request,
222                Ok(Err(err)) => {
223                    // todo: debug and trace response (error)
224                    error!(
225                        step = "request",
226                        err = err.to_string(),
227                        "failed to handle data collection payload"
228                    );
229                    continue;
230                }
231                Err(err) => {
232                    // todo: debug and trace response (error)
233                    error!(
234                        step = "request",
235                        err = err.to_string(),
236                        "failed to handle data collection payload"
237                    );
238                    continue;
239                }
240            };
241
242            let mut headers = HeaderMap::new();
243            for (key, value) in request.headers.iter() {
244                headers.insert(HeaderName::from_str(key)?, HeaderValue::from_str(value)?);
245            }
246
247            if request.forward_client_headers {
248                insert_expected_headers(&mut headers, &event)?;
249            }
250
251            let client = reqwest::Client::builder()
252                .timeout(Duration::from_secs(5))
253                .build()?;
254
255            trace_request(trace, &request, &headers, &outgoing_consent, anonymization);
256
257            // spawn a separated async thread
258            let cfg_project_component_id = cfg.project_component_id.to_string();
259            let cfg_id = cfg.id.to_string();
260            let ctx_clone = ctx.clone();
261
262            let headers_map = headers.iter().fold(HashMap::new(), |mut acc, (k, v)| {
263                acc.insert(k.to_string(), v.to_str().unwrap().to_string());
264                acc
265            });
266
267            let future = tokio::spawn(
268                async move {
269                    let timer = std::time::Instant::now();
270                    let request_clone = request.clone();
271                    let res = match request.method {
272                        Component::HttpMethod::Get => {
273                            client.get(request.url).headers(headers).send().await
274                        }
275                        Component::HttpMethod::Put => {
276                            client
277                                .put(request.url)
278                                .headers(headers)
279                                .body(request.body)
280                                .send()
281                                .await
282                        }
283                        Component::HttpMethod::Post => {
284                            client
285                                .post(request.url)
286                                .headers(headers)
287                                .body(request.body)
288                                .send()
289                                .await
290                        }
291                        Component::HttpMethod::Delete => {
292                            client.delete(request.url).headers(headers).send().await
293                        }
294                    };
295
296                    let mut debug_params = DebugParams::new(
297                        &ctx_clone,
298                        &cfg_project_component_id,
299                        &cfg_id,
300                        &event,
301                        &request_clone,
302                        timer,
303                        anonymization,
304                    );
305
306                    let mut message = "".to_string();
307                    match res {
308                        Ok(res) => {
309                            debug_params.response_status =
310                                format!("{:?}", res.status()).parse::<i32>().unwrap();
311
312                            debug_params.response_content_type = res
313                                .headers()
314                                .get("content-type")
315                                .and_then(|v| v.to_str().ok())
316                                .unwrap_or("text/plain")
317                                .to_string();
318
319                            debug_params.response_body = Some(res.text().await.unwrap_or_default());
320
321                            let _r = debug_and_trace_response(
322                                debug,
323                                trace,
324                                debug_params.clone(),
325                                "".to_string(),
326                            )
327                            .await;
328                        }
329                        Err(err) => {
330                            error!(step = "response", status = "500", err = err.to_string());
331                            let _r = debug_and_trace_response(
332                                debug,
333                                trace,
334                                debug_params.clone(),
335                                err.to_string(),
336                            )
337                            .await;
338                            message = err.to_string();
339                        }
340                    }
341
342                    EventResponse {
343                        context: ctx_clone,
344                        event,
345                        component_metadata: ComponentMetadata {
346                            component_id: cfg_project_component_id,
347                            component: cfg_id,
348                            anonymization,
349                        },
350                        response: Response {
351                            status: debug_params.response_status,
352                            body: debug_params.response_body.unwrap_or_default(),
353                            content_type: debug_params.response_content_type,
354                            message,
355                            duration: timer.elapsed().as_millis(),
356                        },
357                        request: Request {
358                            method: match debug_params.request.method {
359                                Component::HttpMethod::Get => "GET".to_string(),
360                                Component::HttpMethod::Put => "PUT".to_string(),
361                                Component::HttpMethod::Post => "POST".to_string(),
362                                Component::HttpMethod::Delete => "DELETE".to_string(),
363                            },
364                            url: debug_params.request.url.to_string(),
365                            body: debug_params.request.body,
366                            headers: headers_map,
367                        },
368                    }
369                }
370                .in_current_span(),
371            );
372            futures.push(future);
373        }
374    }
375
376    Ok(futures)
377}
378
379fn handle_consent_and_anonymization(
380    event: &mut Event,
381    default_consent: &str,
382    initial_anonymization: bool,
383) -> (bool, String) {
384    // Handle default consent if not set
385    if event.consent.is_none() {
386        event.consent = match default_consent {
387            "granted" => Some(Consent::Granted),
388            "denied" => Some(Consent::Denied),
389            _ => Some(Consent::Pending),
390        };
391    }
392
393    let outgoing_consent = event.consent.clone().unwrap().to_string();
394
395    // Determine final anonymization state
396    match event.consent {
397        Some(Consent::Granted) => (false, outgoing_consent),
398        _ => (initial_anonymization, outgoing_consent),
399    }
400}
401
402pub fn insert_expected_headers(headers: &mut HeaderMap, event: &Event) -> anyhow::Result<()> {
403    // Insert client ip in the x-forwarded-for header
404    headers.insert(
405        HeaderName::from_str("x-forwarded-for")?,
406        HeaderValue::from_str(&event.context.client.ip)?,
407    );
408
409    // Insert User-Agent in the user-agent header
410    headers.insert(
411        header::USER_AGENT,
412        HeaderValue::from_str(&event.context.client.user_agent)?,
413    );
414
415    // Insert referrer in the referer header like an analytics client-side collect does
416    if !event.context.page.url.is_empty() {
417        let document_location = format!(
418            "{}{}",
419            event.context.page.url.clone(),
420            event.context.page.search.clone()
421        );
422        headers.insert(
423            header::REFERER,
424            HeaderValue::from_str(document_location.as_str())?,
425        );
426    }
427
428    // Insert Accept-Language in the accept-language header
429    if !event.context.client.accept_language.is_empty() {
430        headers.insert(
431            header::ACCEPT_LANGUAGE,
432            HeaderValue::from_str(event.context.client.accept_language.as_str())?,
433        );
434    }
435
436    // Insert sec-ch-ua headers
437    // sec-ch-ua
438    if !event.context.client.user_agent_version_list.is_empty() {
439        let ch_ua_value = format_ch_ua_header(&event.context.client.user_agent_version_list);
440        headers.insert(
441            HeaderName::from_str("sec-ch-ua")?,
442            HeaderValue::from_str(ch_ua_value.as_str())?,
443        );
444    }
445    // sec-ch-ua-mobile
446    if !event.context.client.user_agent_mobile.is_empty() {
447        let mobile_value = format!("?{}", event.context.client.user_agent_mobile.clone());
448        headers.insert(
449            HeaderName::from_str("sec-ch-ua-mobile")?,
450            HeaderValue::from_str(mobile_value.as_str())?,
451        );
452    }
453    // sec-ch-ua-platform
454    if !event.context.client.os_name.is_empty() {
455        let platform_value = format!("\"{}\"", event.context.client.os_name.clone());
456        headers.insert(
457            HeaderName::from_str("sec-ch-ua-platform")?,
458            HeaderValue::from_str(platform_value.as_str())?,
459        );
460    }
461
462    Ok(())
463}
464
465fn format_ch_ua_header(string: &str) -> String {
466    if string.is_empty() {
467        return String::new();
468    }
469
470    let mut ch_ua_list = vec![];
471
472    // Split into individual brand-version pairs
473    let pairs = if string.contains('|') {
474        string.split('|').collect::<Vec<_>>()
475    } else {
476        vec![string]
477    };
478
479    // Process each pair
480    for pair in pairs {
481        if let Some((brand, version)) = parse_brand_version(pair) {
482            ch_ua_list.push(format!("\"{}\";v=\"{}\"", brand, version));
483        }
484    }
485
486    ch_ua_list.join(", ")
487}
488
/// Splits one `brand;version` entry at its last `;`.
///
/// The brand may itself contain `;` (everything before the last one belongs to
/// it). Returns `None` when there is no `;` or when either side is empty.
fn parse_brand_version(pair: &str) -> Option<(String, &str)> {
    let (brand, version) = pair.rsplit_once(';')?;

    if brand.is_empty() || version.is_empty() {
        return None;
    }

    Some((brand.to_string(), version))
}
511
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `format_ch_ua_header` against well-formed lists and malformed entries.
    #[test]
    fn test_format_ch_ua_header() {
        // (input, expected header value)
        let cases = [
            // Valid cases
            ("Chromium;128", "\"Chromium\";v=\"128\""),
            (
                "Chromium;128|Google Chrome;128",
                "\"Chromium\";v=\"128\", \"Google Chrome\";v=\"128\"",
            ),
            ("Not;A=Brand;24", "\"Not;A=Brand\";v=\"24\""),
            (
                "Chromium;128|Google Chrome;128|Not;A=Brand;24",
                "\"Chromium\";v=\"128\", \"Google Chrome\";v=\"128\", \"Not;A=Brand\";v=\"24\"",
            ),
            (
                "Chromium;128|Google Chrome;128|Not_A Brand;24|Opera;128",
                "\"Chromium\";v=\"128\", \"Google Chrome\";v=\"128\", \"Not_A Brand\";v=\"24\", \"Opera\";v=\"128\"",
            ),
            // Edge cases
            ("", ""),
            ("Invalid", ""),
            ("No Version;", ""),
            (";No Brand", ""),
        ];

        for (input, expected) in cases.iter() {
            assert_eq!(format_ch_ua_header(input), *expected);
        }
    }
}