1mod context;
2mod convert;
3mod debug;
4pub mod logger;
5pub mod payload;
6
// Generate the host-side Rust bindings for the `data-collection` world
// described by the WIT files under `wit/`. `async: true` requests async
// bindings, which is why the component calls in `send_events` are awaited.
wasmtime::component::bindgen!({
    world: "data-collection",
    path: "wit/",
    async: true,
});
12
13use std::str::FromStr;
14use std::time::Duration;
15
16use crate::config::ComponentsConfiguration;
17use context::EventContext;
18use debug::{debug_and_trace_response, trace_disabled_event, trace_request, DebugParams};
19use http::{header, HeaderMap, HeaderName, HeaderValue};
20use tokio::task::JoinHandle;
21use tracing::{error, span, Instrument, Level};
22
23use crate::context::ComponentsContext;
24use crate::{
25 data_collection::exports::edgee::components::data_collection as Component,
26 data_collection::payload::{Consent, Event, EventType},
27};
28use std::collections::HashMap;
29
/// Identifies the component that handled an event and how it was invoked.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ComponentMetadata {
    /// Project-level component identifier (the configuration's
    /// `project_component_id`, stringified).
    pub component_id: String,
    /// Component identifier (the configuration's `id`, stringified).
    pub component: String,
    /// Whether the event was anonymized before being handed to the component.
    pub anonymization: bool,
}
36
/// Outcome of the HTTP request issued on behalf of a component.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
    /// HTTP status code of the upstream response.
    pub status: i32,
    /// Response body as text (empty when it could not be read).
    pub body: String,
    /// Value of the response `content-type` header.
    pub content_type: String,
    /// Transport error description; empty when the request succeeded.
    pub message: String,
    /// Elapsed time of the request, in milliseconds.
    pub duration: u128,
}
45
/// Plain-data snapshot of the HTTP request produced by a component.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Request {
    /// Upper-case HTTP method name (`GET`, `PUT`, `POST` or `DELETE`).
    pub method: String,
    /// Target URL of the request.
    pub url: String,
    /// Request body as text.
    pub body: String,
    /// Request headers, keyed by header name.
    pub headers: HashMap<String, String>,
}
53
54#[derive(Clone)]
55pub struct EventResponse {
56 pub context: EventContext,
57 pub event: Event,
58 pub component_metadata: ComponentMetadata,
59
60 pub response: Response,
61 pub request: Request,
62}
63
64pub async fn send_json_events(
65 component_ctx: &ComponentsContext,
66 events_json: &str,
67 component_config: &ComponentsConfiguration,
68 trace_component: &Option<String>,
69 debug: bool,
70) -> anyhow::Result<Vec<JoinHandle<EventResponse>>> {
71 if events_json.is_empty() {
72 return Ok(vec![]);
73 }
74
75 let mut events: Vec<Event> = serde_json::from_str(events_json)?;
76 send_events(
77 component_ctx,
78 &mut events,
79 component_config,
80 trace_component,
81 debug,
82 "",
83 "",
84 )
85 .await
86}
87
88pub async fn send_events(
89 component_ctx: &ComponentsContext,
90 events: &mut [Event],
91 component_config: &ComponentsConfiguration,
92 trace_component: &Option<String>,
93 debug: bool,
94 project_id: &str,
95 proxy_host: &str,
96) -> anyhow::Result<Vec<JoinHandle<EventResponse>>> {
97 if events.is_empty() {
98 return Ok(vec![]);
99 }
100
101 let ctx = &EventContext::new(events, project_id, proxy_host);
102
103 let mut store = component_ctx.empty_store();
104
105 let mut futures = vec![];
106
107 for event in events.iter_mut() {
109 for cfg in component_config.data_collection.iter() {
110 let span = span!(
111 Level::INFO,
112 "component",
113 name = cfg.id.as_str(),
114 event = ?event.event_type
115 );
116 let _enter = span.enter();
117
118 let mut event = event.clone();
119
120 let trace =
121 trace_component.is_some() && trace_component.as_ref().unwrap() == cfg.id.as_str();
122
123 match event.event_type {
125 EventType::Page => {
126 if !cfg.settings.edgee_page_event_enabled {
127 trace_disabled_event(trace, "page");
128 continue;
129 }
130 }
131 EventType::User => {
132 if !cfg.settings.edgee_user_event_enabled {
133 trace_disabled_event(trace, "user");
134 continue;
135 }
136 }
137 EventType::Track => {
138 if !cfg.settings.edgee_track_event_enabled {
139 trace_disabled_event(trace, "track");
140 continue;
141 }
142 }
143 }
144
145 if !event.is_component_enabled(cfg) {
146 continue;
147 }
148
149 let initial_anonymization = cfg.settings.edgee_anonymization;
150 let default_consent = cfg.settings.edgee_default_consent.clone();
151
152 let (anonymization, outgoing_consent) = handle_consent_and_anonymization(
154 &mut event,
155 &default_consent,
156 initial_anonymization,
157 );
158
159 if anonymization {
160 event.context.client.ip = ctx.get_ip_anonymized().clone();
161 } else {
163 event.context.client.ip = ctx.get_ip().clone();
164 }
165
166 if let Some(ref ids) = event.context.user.native_cookie_ids {
168 if ids.contains_key(&cfg.slug) {
169 event.context.user.edgee_id = ids.get(&cfg.slug).unwrap().clone();
170 } else {
171 event.context.user.edgee_id = ctx.get_edgee_id().clone();
172 }
173 }
174
175 if &event.uuid != ctx.get_uuid() {
177 event.timestamp = *ctx.get_timestamp() + chrono::Duration::seconds(1);
178 event.context.session.session_start = false;
179 }
180
181 let instance = match component_ctx
183 .get_data_collection_instance(&cfg.id, &mut store)
184 .await
185 {
186 Ok(instance) => instance,
187 Err(err) => {
188 error!("Failed to get data collection instance. Error: {}", err);
189 continue;
190 }
191 };
192 let component = instance.edgee_components_data_collection();
193
194 let component_event: Component::Event = event.clone().into();
195 let component_settings: Vec<(String, String)> = cfg
196 .settings
197 .additional_settings
198 .clone()
199 .into_iter()
200 .collect();
201
202 let request = match component_event.event_type {
204 Component::EventType::Page => {
205 component
206 .call_page(&mut store, &component_event, &component_settings)
207 .await
208 }
209 Component::EventType::Track => {
210 component
211 .call_track(&mut store, &component_event, &component_settings)
212 .await
213 }
214 Component::EventType::User => {
215 component
216 .call_user(&mut store, &component_event, &component_settings)
217 .await
218 }
219 };
220 let request = match request {
221 Ok(Ok(request)) => request,
222 Ok(Err(err)) => {
223 error!(
225 step = "request",
226 err = err.to_string(),
227 "failed to handle data collection payload"
228 );
229 continue;
230 }
231 Err(err) => {
232 error!(
234 step = "request",
235 err = err.to_string(),
236 "failed to handle data collection payload"
237 );
238 continue;
239 }
240 };
241
242 let mut headers = HeaderMap::new();
243 for (key, value) in request.headers.iter() {
244 headers.insert(HeaderName::from_str(key)?, HeaderValue::from_str(value)?);
245 }
246
247 if request.forward_client_headers {
248 insert_expected_headers(&mut headers, &event)?;
249 }
250
251 let client = reqwest::Client::builder()
252 .timeout(Duration::from_secs(5))
253 .build()?;
254
255 trace_request(trace, &request, &headers, &outgoing_consent, anonymization);
256
257 let cfg_project_component_id = cfg.project_component_id.to_string();
259 let cfg_id = cfg.id.to_string();
260 let ctx_clone = ctx.clone();
261
262 let headers_map = headers.iter().fold(HashMap::new(), |mut acc, (k, v)| {
263 acc.insert(k.to_string(), v.to_str().unwrap().to_string());
264 acc
265 });
266
267 let future = tokio::spawn(
268 async move {
269 let timer = std::time::Instant::now();
270 let request_clone = request.clone();
271 let res = match request.method {
272 Component::HttpMethod::Get => {
273 client.get(request.url).headers(headers).send().await
274 }
275 Component::HttpMethod::Put => {
276 client
277 .put(request.url)
278 .headers(headers)
279 .body(request.body)
280 .send()
281 .await
282 }
283 Component::HttpMethod::Post => {
284 client
285 .post(request.url)
286 .headers(headers)
287 .body(request.body)
288 .send()
289 .await
290 }
291 Component::HttpMethod::Delete => {
292 client.delete(request.url).headers(headers).send().await
293 }
294 };
295
296 let mut debug_params = DebugParams::new(
297 &ctx_clone,
298 &cfg_project_component_id,
299 &cfg_id,
300 &event,
301 &request_clone,
302 timer,
303 anonymization,
304 );
305
306 let mut message = "".to_string();
307 match res {
308 Ok(res) => {
309 debug_params.response_status =
310 format!("{:?}", res.status()).parse::<i32>().unwrap();
311
312 debug_params.response_content_type = res
313 .headers()
314 .get("content-type")
315 .and_then(|v| v.to_str().ok())
316 .unwrap_or("text/plain")
317 .to_string();
318
319 debug_params.response_body = Some(res.text().await.unwrap_or_default());
320
321 let _r = debug_and_trace_response(
322 debug,
323 trace,
324 debug_params.clone(),
325 "".to_string(),
326 )
327 .await;
328 }
329 Err(err) => {
330 error!(step = "response", status = "500", err = err.to_string());
331 let _r = debug_and_trace_response(
332 debug,
333 trace,
334 debug_params.clone(),
335 err.to_string(),
336 )
337 .await;
338 message = err.to_string();
339 }
340 }
341
342 EventResponse {
343 context: ctx_clone,
344 event,
345 component_metadata: ComponentMetadata {
346 component_id: cfg_project_component_id,
347 component: cfg_id,
348 anonymization,
349 },
350 response: Response {
351 status: debug_params.response_status,
352 body: debug_params.response_body.unwrap_or_default(),
353 content_type: debug_params.response_content_type,
354 message,
355 duration: timer.elapsed().as_millis(),
356 },
357 request: Request {
358 method: match debug_params.request.method {
359 Component::HttpMethod::Get => "GET".to_string(),
360 Component::HttpMethod::Put => "PUT".to_string(),
361 Component::HttpMethod::Post => "POST".to_string(),
362 Component::HttpMethod::Delete => "DELETE".to_string(),
363 },
364 url: debug_params.request.url.to_string(),
365 body: debug_params.request.body,
366 headers: headers_map,
367 },
368 }
369 }
370 .in_current_span(),
371 );
372 futures.push(future);
373 }
374 }
375
376 Ok(futures)
377}
378
379fn handle_consent_and_anonymization(
380 event: &mut Event,
381 default_consent: &str,
382 initial_anonymization: bool,
383) -> (bool, String) {
384 if event.consent.is_none() {
386 event.consent = match default_consent {
387 "granted" => Some(Consent::Granted),
388 "denied" => Some(Consent::Denied),
389 _ => Some(Consent::Pending),
390 };
391 }
392
393 let outgoing_consent = event.consent.clone().unwrap().to_string();
394
395 match event.consent {
397 Some(Consent::Granted) => (false, outgoing_consent),
398 _ => (initial_anonymization, outgoing_consent),
399 }
400}
401
402pub fn insert_expected_headers(headers: &mut HeaderMap, event: &Event) -> anyhow::Result<()> {
403 headers.insert(
405 HeaderName::from_str("x-forwarded-for")?,
406 HeaderValue::from_str(&event.context.client.ip)?,
407 );
408
409 headers.insert(
411 header::USER_AGENT,
412 HeaderValue::from_str(&event.context.client.user_agent)?,
413 );
414
415 if !event.context.page.url.is_empty() {
417 let document_location = format!(
418 "{}{}",
419 event.context.page.url.clone(),
420 event.context.page.search.clone()
421 );
422 headers.insert(
423 header::REFERER,
424 HeaderValue::from_str(document_location.as_str())?,
425 );
426 }
427
428 if !event.context.client.accept_language.is_empty() {
430 headers.insert(
431 header::ACCEPT_LANGUAGE,
432 HeaderValue::from_str(event.context.client.accept_language.as_str())?,
433 );
434 }
435
436 if !event.context.client.user_agent_version_list.is_empty() {
439 let ch_ua_value = format_ch_ua_header(&event.context.client.user_agent_version_list);
440 headers.insert(
441 HeaderName::from_str("sec-ch-ua")?,
442 HeaderValue::from_str(ch_ua_value.as_str())?,
443 );
444 }
445 if !event.context.client.user_agent_mobile.is_empty() {
447 let mobile_value = format!("?{}", event.context.client.user_agent_mobile.clone());
448 headers.insert(
449 HeaderName::from_str("sec-ch-ua-mobile")?,
450 HeaderValue::from_str(mobile_value.as_str())?,
451 );
452 }
453 if !event.context.client.os_name.is_empty() {
455 let platform_value = format!("\"{}\"", event.context.client.os_name.clone());
456 headers.insert(
457 HeaderName::from_str("sec-ch-ua-platform")?,
458 HeaderValue::from_str(platform_value.as_str())?,
459 );
460 }
461
462 Ok(())
463}
464
465fn format_ch_ua_header(string: &str) -> String {
466 if string.is_empty() {
467 return String::new();
468 }
469
470 let mut ch_ua_list = vec![];
471
472 let pairs = if string.contains('|') {
474 string.split('|').collect::<Vec<_>>()
475 } else {
476 vec![string]
477 };
478
479 for pair in pairs {
481 if let Some((brand, version)) = parse_brand_version(pair) {
482 ch_ua_list.push(format!("\"{}\";v=\"{}\"", brand, version));
483 }
484 }
485
486 ch_ua_list.join(", ")
487}
488
/// Splits a `brand;version` entry at its last `;`.
///
/// The brand may itself contain `;` characters: everything before the final
/// separator is the brand and everything after it is the version. Returns
/// `None` when there is no `;`, or when either side is empty.
fn parse_brand_version(pair: &str) -> Option<(String, &str)> {
    let (brand, version) = pair.rsplit_once(';')?;

    if brand.is_empty() || version.is_empty() {
        return None;
    }

    Some((brand.to_string(), version))
}
511
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_format_ch_ua_header() {
        // (raw version list, expected `sec-ch-ua` value)
        let well_formed = [
            ("Chromium;128", "\"Chromium\";v=\"128\""),
            (
                "Chromium;128|Google Chrome;128",
                "\"Chromium\";v=\"128\", \"Google Chrome\";v=\"128\"",
            ),
            // A brand may contain the `;` separator itself.
            ("Not;A=Brand;24", "\"Not;A=Brand\";v=\"24\""),
            (
                "Chromium;128|Google Chrome;128|Not;A=Brand;24",
                "\"Chromium\";v=\"128\", \"Google Chrome\";v=\"128\", \"Not;A=Brand\";v=\"24\"",
            ),
            (
                "Chromium;128|Google Chrome;128|Not_A Brand;24|Opera;128",
                "\"Chromium\";v=\"128\", \"Google Chrome\";v=\"128\", \"Not_A Brand\";v=\"24\", \"Opera\";v=\"128\"",
            ),
        ];
        for &(input, expected) in well_formed.iter() {
            assert_eq!(format_ch_ua_header(input), expected);
        }

        // Empty or malformed inputs produce an empty header value.
        let rejected = ["", "Invalid", "No Version;", ";No Brand"];
        for &input in rejected.iter() {
            assert_eq!(format_ch_ua_header(input), "");
        }
    }
}