use std::collections::HashMap;
use std::fmt::Write;
use std::io::Read;
use base64::alphabet::STANDARD;
use base64::engine::general_purpose::PAD;
use base64::engine::GeneralPurpose;
use base64::Engine;
use bytes::Bytes;
use edgee_components_runtime::data_collection as dc_component_runtime;
use html_escape;
use http::response::Parts;
use http::{header, HeaderMap};
use json_comments::StripComments;
use payload::{Consent, EventData, EventType, Payload};
use regex::Regex;
use tracing::{error, info, warn, Instrument};
use crate::proxy::compute::html::Document;
use crate::proxy::context::incoming::RequestHandle;
use crate::tools::edgee_cookie;
use crate::{config, get_components_ctx};
pub mod payload;
#[tracing::instrument(name = "data_collection", skip(document, request, response))]
pub async fn process_from_html(
document: &Document,
request: &RequestHandle,
response: &mut Parts,
) -> Option<String> {
let json_data_layer = document.data_layer.clone();
let mut payload = Payload::default();
if !json_data_layer.is_empty() {
let stripped_data_layer = StripComments::new(json_data_layer.as_bytes());
let payload_result = parse_payload(stripped_data_layer);
if payload_result.is_err() {
warn!("Error parsing json payload: {:?}", payload_result.err());
} else {
payload = payload_result.unwrap();
}
}
payload = prepare_data_collection_payload(payload);
payload = add_session(request, response, payload);
payload = add_more_info_from_html_or_request(request, document, payload);
payload = add_more_info_from_request(request, payload);
payload = add_user_context_from_cookie(request, payload);
payload
.data_collection
.as_mut()
.unwrap()
.populate_event_contexts("edge");
if payload.data_collection.as_ref().unwrap().events.is_none() {
payload.data_collection.as_mut().unwrap().events = Some(vec![payload::Event {
uuid: uuid::Uuid::new_v4().to_string(),
timestamp: chrono::Utc::now(),
event_type: EventType::Page,
data: Some(EventData::Page(
payload
.data_collection
.clone()
.unwrap()
.context
.clone()
.unwrap()
.page
.unwrap(),
)),
context: payload.data_collection.clone().unwrap().context.clone(),
components: payload.data_collection.clone().unwrap().components.clone(),
from: Some("edge".to_string()),
consent: payload.data_collection.clone().unwrap().consent.clone(),
}]);
}
let mut events = payload
.data_collection
.clone()
.unwrap()
.events
.unwrap_or_default();
for e in events.clone().iter() {
if e.is_all_components_disabled() {
events.retain(|evt| evt.uuid != e.uuid);
} else {
if e.event_type == EventType::User {
if let EventData::User(user_data) = e.data.as_ref().unwrap() {
edgee_cookie::set_user_cookie(request, response, user_data);
}
}
}
}
if events.is_empty() {
return Option::from("[]".to_string());
}
let events_json =
serde_json::to_string(&events).expect("Could not encode data collection events into JSON");
let events_json_for_components = events_json.clone();
info!(events = %events_json);
let debug = if request.is_debug_mode() {
"true"
} else {
"false"
};
let api_key = std::env::var("DATA_COLLECTION_API_KEY").unwrap_or_default();
let api_url = std::env::var("DATA_COLLECTION_API_URL").unwrap_or_default();
if !api_key.is_empty() && !api_url.is_empty() {
let events_json_to_send = events_json.clone();
let b64 = GeneralPurpose::new(&STANDARD, PAD).encode(format!("{api_key}:"));
let host = request.get_host().to_string();
tokio::spawn(async move {
let _ = reqwest::Client::new()
.post(api_url)
.header("Content-Type", "application/json")
.header("Authorization", format!("Basic {b64}"))
.header("X-Edgee-Debug", debug)
.header("X-Edgee-Host", host)
.body(events_json_to_send)
.send()
.await;
});
}
tokio::spawn(
async move {
let config = config::get();
if let Err(err) = dc_component_runtime::send_json_events(
get_components_ctx(),
&events_json_for_components,
&config.components,
&config.log.trace_component,
debug == "true",
)
.await
{
error!("Failed to use data collection components. Error: {}", err);
}
}
.in_current_span(),
);
Option::from(events_json)
}
#[tracing::instrument(name = "data_collection", skip(body, request, response))]
pub async fn process_from_json(
body: &Bytes,
request: &RequestHandle,
response: &mut Parts,
from_third_party_sdk: bool,
) -> Option<String> {
let payload_result = parse_payload(body.as_ref());
if payload_result.is_err() {
warn!("Error parsing json payload: {:?}", payload_result.err());
return None;
}
let mut payload = payload_result.unwrap();
payload = prepare_data_collection_payload(payload);
payload = add_session(request, response, payload);
payload = add_more_info_from_request(request, payload);
payload = add_user_context_from_cookie(request, payload);
let from = if from_third_party_sdk {
"third"
} else {
"client"
};
payload
.data_collection
.as_mut()
.unwrap()
.populate_event_contexts(from);
let mut events = payload
.data_collection
.clone()
.unwrap()
.events
.unwrap_or_default();
for e in events.clone().iter() {
if e.is_all_components_disabled() {
events.retain(|evt| evt.uuid != e.uuid);
} else {
if e.event_type == EventType::User {
if let EventData::User(user_data) = e.data.as_ref().unwrap() {
edgee_cookie::set_user_cookie(request, response, user_data);
}
}
}
}
if events.is_empty() {
return Option::from("[]".to_string());
}
let events_json =
serde_json::to_string(&events).expect("Could not encode data collection events into JSON");
let events_json_for_components = events_json.clone();
info!(events = %events_json);
let debug = if request.is_debug_mode() {
"true"
} else {
"false"
};
let api_key = std::env::var("DATA_COLLECTION_API_KEY").unwrap_or_default();
let api_url = std::env::var("DATA_COLLECTION_API_URL").unwrap_or_default();
if !api_key.is_empty() && !api_url.is_empty() {
let events_json_to_send = events_json.clone();
let b64 = GeneralPurpose::new(&STANDARD, PAD).encode(format!("{api_key}:"));
let host = request.get_host().to_string();
tokio::spawn(async move {
let _ = reqwest::Client::new()
.post(api_url)
.header("Content-Type", "application/json")
.header("Authorization", format!("Basic {b64}"))
.header("X-Edgee-Debug", debug)
.header("X-Edgee-Host", host)
.body(events_json_to_send)
.send()
.await;
});
}
tokio::spawn(
async move {
let config = config::get();
if let Err(err) = dc_component_runtime::send_json_events(
get_components_ctx(),
&events_json_for_components,
&config.components,
&config.log.trace_component,
debug == "true",
)
.await
{
error!("Failed to use data collection components. Error: {}", err);
}
}
.in_current_span(),
);
Option::from(events_json)
}
/// Ensures the payload has a `data_collection` section with a `context`, and that the
/// context's `client`, `page` and `user` entries exist, filling any missing one with its
/// default value. Values already present are left untouched.
fn prepare_data_collection_payload(mut payload: Payload) -> Payload {
    let context = payload
        .data_collection
        .get_or_insert_with(Default::default)
        .context
        .get_or_insert_with(Default::default);

    context.client.get_or_insert_with(Default::default);
    context.page.get_or_insert_with(Default::default);
    context.user.get_or_insert_with(Default::default);

    payload
}
/// Copies the information stored in the Edgee cookie into the payload.
///
/// The cookie is read (or created) through `edgee_cookie::get_or_set`, then:
/// * the user's `edgee_id` is set from the cookie id;
/// * the session context is rebuilt from the cookie timestamps and counters, with
///   `session_start` set when the session start equals the last-seen timestamp;
/// * the screen size is filled in when the cookie holds a well-formed
///   `<width>x<height>x<density>` value;
/// * the user-agent hints, OS version and timezone of the client are overwritten with the
///   cookie values (including when the cookie has none);
/// * the consent is set when the cookie carries `pending`, `granted` or `denied`.
///
/// An unrecognised consent value is ignored and leaves the payload's consent untouched.
///
/// # Panics
/// Panics if the payload has not been initialised by `prepare_data_collection_payload`
/// (missing `data_collection`, `context`, `user` or `client`).
fn add_session(request: &RequestHandle, response: &mut Parts, mut payload: Payload) -> Payload {
    let edgee_cookie = edgee_cookie::get_or_set(request, response, &payload);

    let data_collection = payload.data_collection.as_mut().unwrap();
    let context = data_collection.context.as_mut().unwrap();

    context.user.as_mut().unwrap().edgee_id = edgee_cookie.id.to_string();

    context.session = Some(payload::Session {
        session_id: edgee_cookie.ss.timestamp().to_string(),
        previous_session_id: edgee_cookie.ps.map(|ps| ps.timestamp().to_string()),
        session_count: edgee_cookie.sc,
        // The session has just started when its start is also the last time we saw the user.
        session_start: edgee_cookie.ss == edgee_cookie.ls,
        first_seen: edgee_cookie.fs,
        last_seen: edgee_cookie.ls,
    });

    let client = context.client.as_mut().unwrap();

    // Screen size is only applied when all three components parse.
    if let Some(sz) = &edgee_cookie.sz {
        let size_vec: Vec<&str> = sz.split('x').collect();
        if let [width, height, density] = size_vec.as_slice() {
            if let (Ok(width), Ok(height), Ok(density)) = (
                width.parse::<i32>(),
                height.parse::<i32>(),
                density.parse::<f32>(),
            ) {
                client.screen_width = Some(width);
                client.screen_height = Some(height);
                client.screen_density = Some(density);
            }
        }
    }

    client.user_agent_architecture = edgee_cookie.uaa.clone();
    client.user_agent_bitness = edgee_cookie.uab.clone();
    client.user_agent_model = edgee_cookie.uam.clone();
    client.os_version = edgee_cookie.uapv.clone();
    client.user_agent_full_version_list = edgee_cookie.uafvl.clone();
    client.timezone = edgee_cookie.tz.clone();

    if let Some(consent) = edgee_cookie.c {
        // The cookie value is client-influenced: an unknown value must not abort the request.
        let parsed = match consent.as_str() {
            "pending" => Some(Consent::Pending),
            "granted" => Some(Consent::Granted),
            "denied" => Some(Consent::Denied),
            _ => None,
        };
        if parsed.is_some() {
            data_collection.consent = parsed;
        }
    }

    payload
}
/// Completes the page context (`url`, `path`, `search`, `title`, `keywords`).
///
/// For each field the value already present in the payload wins (HTML entities decoded for
/// the textual fields). Otherwise the value comes from the HTML document, and finally from
/// the request:
/// * `url`: payload, then the document's canonical URL (made absolute with the request's
///   protocol and host when it does not start with `http`), then the request URL;
/// * `path`: payload, then the path of the canonical URL, then the request path;
/// * `search`: payload (prefixed with `?` when missing), then the request query string;
///   stored as `None` when empty or reduced to `?`;
/// * `title`: payload, then the document title, then an empty string;
/// * `keywords`: payload, then the document keywords split on commas and trimmed, then an
///   empty list.
///
/// # Panics
/// Panics if the payload has no `data_collection`, `context` or `page`.
fn add_more_info_from_html_or_request(
    request: &RequestHandle,
    document: &Document,
    mut payload: Payload,
) -> Payload {
    // Resolve the canonical URL, making a relative one absolute.
    let mut canonical_url = document.canonical.clone();
    let needs_origin = !canonical_url.is_empty() && !canonical_url.starts_with("http");
    if needs_origin {
        canonical_url = format!(
            "{}://{}{}",
            request.get_proto(),
            request.get_host(),
            canonical_url
        );
    }
    let canonical_path = if canonical_url.is_empty() {
        String::new()
    } else {
        url::Url::parse(canonical_url.as_str())
            .map(|parsed| parsed.path().to_string())
            .unwrap_or_default()
    };

    let decode = |raw: &str| html_escape::decode_html_entities(raw).to_string();

    let page = payload
        .data_collection
        .as_mut()
        .unwrap()
        .context
        .as_mut()
        .unwrap()
        .page
        .as_mut()
        .unwrap();

    // URL
    let resolved_url = page
        .url
        .as_deref()
        .map(decode)
        .or_else(|| (!canonical_url.is_empty()).then(|| canonical_url.clone()))
        .unwrap_or_else(|| {
            format!(
                "{}://{}{}",
                request.get_proto(),
                request.get_host(),
                request.get_path_and_query()
            )
        });
    page.url = Some(resolved_url);

    // Path
    let resolved_path = page
        .path
        .as_deref()
        .map(decode)
        .or_else(|| (!canonical_path.is_empty()).then(|| canonical_path.clone()))
        .unwrap_or_else(|| request.get_path_and_query().path().to_string());
    page.path = Some(resolved_path);

    // Query string, always carried with its leading `?`.
    let resolved_search = page
        .search
        .as_ref()
        .map(|given| {
            if given.starts_with('?') {
                given.clone()
            } else {
                format!("?{given}")
            }
        })
        .map(|prefixed| decode(&prefixed))
        .or_else(|| {
            request
                .get_path_and_query()
                .query()
                .map(|qs| format!("?{qs}"))
        })
        .unwrap_or_default();
    page.search = if resolved_search.is_empty() || resolved_search == "?" {
        None
    } else {
        Some(resolved_search)
    };

    // Title
    let resolved_title = page
        .title
        .as_deref()
        .map(decode)
        .or_else(|| (!document.title.is_empty()).then(|| document.title.clone()))
        .unwrap_or_default();
    page.title = Some(resolved_title);

    // Keywords
    let resolved_keywords = page
        .keywords
        .clone()
        .or_else(|| {
            let raw_keywords = &document.keywords;
            (!raw_keywords.is_empty()).then(|| {
                raw_keywords
                    .split(',')
                    .map(|keyword| keyword.trim().to_string())
                    .collect::<Vec<String>>()
            })
        })
        .unwrap_or_default();
    page.keywords = Some(resolved_keywords);

    payload
}
/// Enriches the payload context with information taken from the incoming request.
///
/// * `page.referrer`: kept when provided, otherwise read from the `Referer` header; an empty
///   value is stored as `None`.
/// * `client`: created when missing, then `user_agent`, `ip` and `locale` are always
///   overwritten; `accept_language` is set when the header is non-empty.
/// * Client hints (`Sec-Ch-Ua*` headers) overwrite the matching client fields when present.
///   `Sec-Ch-Ua` only fills `user_agent_full_version_list` when it is still empty, while
///   `Sec-Ch-Ua-Full-Version-List` always overwrites it.
/// * `campaign`: created when missing and at least one `utm_*` query parameter is present;
///   each parameter present overwrites its campaign field.
///
/// # Panics
/// Panics if the payload has no `data_collection`, `context` or `page`.
fn add_more_info_from_request(request: &RequestHandle, mut payload: Payload) -> Payload {
    let context = payload
        .data_collection
        .as_mut()
        .unwrap()
        .context
        .as_mut()
        .unwrap();

    // Make sure there is a client entry to fill in.
    context.client.get_or_insert_with(Default::default);

    // Referrer: payload value first, header second, never an empty string.
    let page = context.page.as_mut().unwrap();
    page.referrer = page
        .referrer
        .take()
        .or_else(|| request.get_header(header::REFERER))
        .filter(|referrer| !referrer.is_empty());

    let client = context.client.as_mut().unwrap();

    client.user_agent = Some(request.get_header(header::USER_AGENT).unwrap_or_default());
    client.ip = Some(request.get_client_ip().to_string());
    client.locale = Some(get_preferred_language(request.get_headers()));

    if let Some(accept_language) = request
        .get_header("Accept-Language")
        .filter(|value| !value.is_empty())
    {
        client.accept_language = Some(accept_language);
    }

    // User-agent client hints. Quoted values are stored without their double quotes.
    let unquote = |value: String| value.replace("\"", "");

    if let Some(arch) = request.get_header("Sec-Ch-Ua-Arch") {
        client.user_agent_architecture = Some(unquote(arch));
    }
    if let Some(bitness) = request.get_header("Sec-Ch-Ua-Bitness") {
        client.user_agent_bitness = Some(unquote(bitness));
    }
    if let Some(brands) = request.get_header("Sec-Ch-Ua") {
        client.user_agent_version_list = Some(process_sec_ch_ua(brands.as_str(), false));
        if client.user_agent_full_version_list.is_none() {
            client.user_agent_full_version_list = Some(process_sec_ch_ua(brands.as_str(), true));
        }
    }
    if let Some(full_brands) = request.get_header("Sec-Ch-Ua-Full-Version-List") {
        client.user_agent_full_version_list = Some(process_sec_ch_ua(full_brands.as_str(), true));
    }
    if let Some(mobile) = request.get_header("Sec-Ch-Ua-Mobile") {
        client.user_agent_mobile = Some(mobile.replace("?", ""));
    }
    if let Some(model) = request.get_header("Sec-Ch-Ua-Model") {
        client.user_agent_model = Some(unquote(model));
    }
    if let Some(platform) = request.get_header("Sec-Ch-Ua-Platform") {
        client.os_name = Some(unquote(platform));
    }
    if let Some(platform_version) = request.get_header("Sec-Ch-Ua-Platform-Version") {
        client.os_version = Some(unquote(platform_version));
    }

    // Campaign information from the `utm_*` query parameters.
    let query_params: HashMap<String, String> =
        url::form_urlencoded::parse(request.get_query().as_bytes())
            .into_owned()
            .collect();
    let utm_keys = [
        "utm_campaign",
        "utm_source",
        "utm_medium",
        "utm_term",
        "utm_content",
        "utm_creative_format",
        "utm_marketing_tactic",
    ];
    if utm_keys.iter().any(|key| query_params.contains_key(*key)) {
        let campaign = context.campaign.get_or_insert_with(Default::default);

        if let Some(name) = query_params.get("utm_campaign") {
            campaign.name = Some(name.to_string());
        }
        if let Some(source) = query_params.get("utm_source") {
            campaign.source = Some(source.to_string());
        }
        if let Some(medium) = query_params.get("utm_medium") {
            campaign.medium = Some(medium.to_string());
        }
        if let Some(term) = query_params.get("utm_term") {
            campaign.term = Some(term.to_string());
        }
        if let Some(content) = query_params.get("utm_content") {
            campaign.content = Some(content.to_string());
        }
        if let Some(creative_format) = query_params.get("utm_creative_format") {
            campaign.creative_format = Some(creative_format.to_string());
        }
        if let Some(marketing_tactic) = query_params.get("utm_marketing_tactic") {
            campaign.marketing_tactic = Some(marketing_tactic.to_string());
        }
    }

    payload
}
/// Fills the user context from the user cookie, when there is one.
///
/// `user_id`, `anonymous_id` and `properties` are only taken from the cookie when the
/// payload does not already provide them; payload values always win.
///
/// # Panics
/// Panics if a user cookie exists and the payload has no `data_collection`, `context` or
/// `user`.
fn add_user_context_from_cookie(request: &RequestHandle, mut payload: Payload) -> Payload {
    if let Some(stored) = edgee_cookie::get_user_cookie(request) {
        let user = payload
            .data_collection
            .as_mut()
            .unwrap()
            .context
            .as_mut()
            .unwrap()
            .user
            .as_mut()
            .unwrap();

        // `or` keeps the payload value and falls back to the cookie only when it is absent.
        user.user_id = user.user_id.take().or(stored.user_id);
        user.anonymous_id = user.anonymous_id.take().or(stored.anonymous_id);
        user.properties = user.properties.take().or(stored.properties);
    }
    payload
}
/// Flattens a `Sec-CH-UA`-style header into `brand;version` entries joined with `|`.
///
/// Every `"brand";v="version"` pair found in `header` yields one entry. With `full` set the
/// version is padded with `.0` components until it has at least four of them; otherwise only
/// its first component is kept. A header without any pair gives an empty string.
fn process_sec_ch_ua(header: &str, full: bool) -> String {
    lazy_static::lazy_static! {
        static ref VALUE_REGEX: Regex = Regex::new(r#""([^"]+)";v="([^"]+)""#).unwrap();
    }

    let mut output = String::new();
    for (index, caps) in VALUE_REGEX.captures_iter(header).enumerate() {
        // Separate entries, without a trailing separator.
        if index > 0 {
            output.push('|');
        }

        let brand = &caps[1];
        let mut segments: Vec<&str> = caps[2].split('.').collect();
        let rendered = if full {
            if segments.len() < 4 {
                segments.resize(4, "0");
            }
            segments.join(".")
        } else {
            segments[0].to_string()
        };

        write!(output, "{brand};{rendered}").unwrap();
    }
    output
}
/// Returns the first language tag listed in the `accept-language` header, lowercased and
/// stripped of its `;q=` parameters.
///
/// Falls back to `"en-us"` when the header is missing, is not valid text, or lists no
/// non-empty tag.
fn get_preferred_language(request_headers: &HeaderMap) -> String {
    request_headers
        .get("accept-language")
        .and_then(|value| value.to_str().ok())
        .unwrap_or("")
        .split(',')
        .map(|entry| entry.split(';').next().unwrap_or("").trim())
        .find(|tag| !tag.is_empty())
        .map(|tag| tag.to_lowercase())
        .unwrap_or_else(|| "en-us".to_string())
}
fn parse_payload<T: Read>(clean_json: T) -> Result<Payload, serde_json::Error> {
serde_json::from_reader(clean_json)
}