use std::collections::HashMap;
use chrono::prelude::*;
use rand::seq::SliceRandom;
use serde::Serialize;
use serde_json::Number;
use uuid::Uuid;
use crate::expression::Event;
use crate::expression::Exposure;
use crate::expression::Logs;
use crate::split::SplitAssignmentEntry;
use crate::types::LogLevel;
/// Request payload for the remote `/logs` endpoint, serialized as camelCase
/// JSON. Built by `BackendLogger::collect_logs`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateLogsInput {
// Aggregated evaluation counts, one entry per (commit, expression) pair.
evaluations: Vec<EvaluationCountInput>,
events: Vec<EventInput>,
exposures: Vec<ExposureInput>,
// Fresh UUID per payload so the backend can deduplicate retried uploads.
idempotency_key: String,
// Sampled SDK log lines (at most one per flush; see `collect_logs`).
logs: Vec<LogInput>,
// API token identifying/authenticating the sender.
token: String,
}
impl CreateLogsInput {
    /// Returns `true` when the payload carries no telemetry at all.
    /// The `token` and `idempotency_key` fields are deliberately ignored:
    /// they are always populated and say nothing about content.
    pub fn is_empty(&self) -> bool {
        let has_any_data = !self.evaluations.is_empty()
            || !self.events.is_empty()
            || !self.exposures.is_empty()
            || !self.logs.is_empty();
        !has_any_data
    }
}
/// Discriminator for `LogInput`. Currently the only source is SDK-internal
/// messages; serialized verbatim as `"SDKMessage"` (no serde rename applied).
#[derive(Serialize, Clone)]
enum LogType {
SDKMessage,
}
/// One SDK log line attributed to a commit; camelCase on the wire.
/// `Clone` is required because `collect_logs` samples a buffered entry.
#[derive(Serialize, Clone)]
#[serde(rename_all = "camelCase")]
struct LogInput {
pub commit_id: String,
pub created_at: DateTime<Utc>,
pub level: LogLevel,
// `type` is a Rust keyword, hence the raw identifier.
pub r#type: LogType,
pub message: String,
// JSON-encoded metadata blob; currently always "{}" (see `log_error`).
pub metadata_json: String,
}
/// Aggregated number of evaluations of one expression under one commit.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct EvaluationCountInput {
pub commit_id: String,
pub count: u64,
pub expression_id: String,
}
/// A single custom event, with its payload pre-serialized to a JSON string.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct EventInput {
pub commit_id: String,
pub created_at: DateTime<Utc>,
pub event_object_type_name: String,
pub event_payload_json: String,
}
/// Kind of split dimension an assignment belongs to. Serialized verbatim as
/// `"Continuous"` / `"Discrete"` (no serde rename applied).
#[derive(Serialize)]
enum DimensionType {
Continuous,
Discrete,
}
/// One dimension's assignment within an exposure. Exactly one of
/// `continuous_value` / `discrete_arm_id` is set, matching `entry_type`
/// (see the mapping in `log_exposure`).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct AssignmentInput {
pub continuous_value: Option<Number>,
pub dimension_id: String,
pub discrete_arm_id: Option<String>,
pub entry_type: DimensionType,
}
/// A unit's exposure to a split, with its per-dimension assignments and an
/// optional triggering event.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct ExposureInput {
pub assignment: Vec<AssignmentInput>,
pub commit_id: String,
pub created_at: DateTime<Utc>,
pub event_object_type_name: Option<String>,
// JSON-encoded payload of the triggering event, when present.
pub event_payload_json: Option<String>,
pub split_id: String,
pub unit_id: String,
}
/// In-memory buffer of telemetry (evaluations, events, exposures, log lines)
/// that is periodically drained into a `CreateLogsInput` and shipped to the
/// backend. Entries are only accepted once a commit id has been set.
pub struct BackendLogger {
token: String,
// Commit currently being attributed; `None` until `set_commit_id` is called.
commit_id: Option<String>,
// commit_id -> expression_id -> evaluation count (see `log_evaluation`).
evaluations: HashMap<String, HashMap<String, u64>>,
events: Vec<EventInput>,
exposures: Vec<ExposureInput>,
logs: Vec<LogInput>,
}
impl BackendLogger {
    /// Creates an empty logger; nothing is buffered until a commit id is set.
    pub fn new(token: String) -> Self {
        Self {
            token,
            commit_id: None,
            evaluations: HashMap::new(),
            events: Vec::new(),
            exposures: Vec::new(),
            logs: Vec::new(),
        }
    }

    /// Sets the commit id that subsequent entries are attributed to.
    pub fn set_commit_id(&mut self, commit_id: String) {
        self.commit_id = Some(commit_id);
    }

    /// Buffers an SDK error message; silently dropped when no commit id is set.
    pub fn log_error(&mut self, message: String) {
        if let Some(ref commit_id) = self.commit_id {
            self.logs.push(LogInput {
                r#type: LogType::SDKMessage,
                level: LogLevel::Error,
                commit_id: commit_id.clone(),
                created_at: Utc::now(),
                metadata_json: "{}".to_string(),
                message,
            })
        }
    }

    /// Folds an evaluation's `Logs` payload into the internal buffers.
    /// No-op when either the commit id or `logs` is absent.
    pub fn reduction_logs(&mut self, logs: &Option<Logs>) {
        // Bind both prerequisites up front instead of check-then-unwrap; this
        // also avoids cloning the entire `Logs` payload — only the individual
        // entries that are actually buffered get cloned below.
        let commit_id = match &self.commit_id {
            Some(commit_id) => commit_id.clone(),
            None => return,
        };
        let logs = match logs {
            Some(logs) => logs,
            None => return,
        };
        if let Some(evaluations) = &logs.evaluations {
            for (expression_id, count) in evaluations {
                self.log_evaluation(expression_id, *count, commit_id.clone());
            }
        }
        if let Some(events) = &logs.event_list {
            for event in events {
                self.log_event(event.clone(), commit_id.clone())
            }
        }
        if let Some(exposures) = &logs.exposure_list {
            for exposure in exposures {
                self.log_exposure(exposure.clone(), commit_id.clone())
            }
        }
    }

    /// Drains every buffer into a single request payload.
    ///
    /// Log lines are sampled: at most one randomly chosen entry is shipped per
    /// flush and the rest are discarded, bounding upload volume.
    pub fn collect_logs(&mut self) -> CreateLogsInput {
        let sample_log = self.logs.choose(&mut rand::thread_rng()).cloned();
        // `clear` keeps the buffer's allocation for the next flush cycle.
        self.logs.clear();
        CreateLogsInput {
            token: self.token.clone(),
            // Fresh key per payload so the backend can deduplicate retries.
            idempotency_key: Uuid::new_v4().to_string(),
            // An `Option` iterates as zero-or-one items.
            logs: sample_log.into_iter().collect(),
            evaluations: self
                .evaluations
                .drain()
                .flat_map(|(commit_id, commit_map)| {
                    commit_map
                        .into_iter()
                        .map(move |(expression_id, count)| EvaluationCountInput {
                            commit_id: commit_id.clone(),
                            expression_id,
                            count,
                        })
                })
                .collect(),
            events: self.events.drain(..).collect(),
            exposures: self.exposures.drain(..).collect(),
        }
    }

    /// Adds `count` to the (commit, expression) evaluation tally.
    fn log_evaluation(&mut self, expression_id: &str, count: u64, commit_id: String) {
        // Entry API: one lookup per level, no redundant clone of the owned
        // key, and `or_default` avoids eagerly constructing an empty map.
        *self
            .evaluations
            .entry(commit_id)
            .or_default()
            .entry(expression_id.to_string())
            .or_default() += count;
    }

    /// Buffers a custom event, serializing its payload to JSON.
    fn log_event(&mut self, event: Event, commit_id: String) {
        self.events.push(EventInput {
            commit_id,
            created_at: Utc::now(),
            event_object_type_name: event.event_object_type_name,
            // Fall back to an empty JSON object rather than dropping the event
            // if the payload fails to serialize. Lazy `_else` form so the
            // fallback string is only allocated on failure.
            event_payload_json: serde_json::to_string(&event.event_payload)
                .unwrap_or_else(|_| "{}".to_string()),
        });
    }

    /// Buffers a split exposure together with its per-dimension assignments.
    fn log_exposure(&mut self, exposure: Exposure, commit_id: String) {
        self.exposures.push(ExposureInput {
            commit_id,
            created_at: Utc::now(),
            split_id: exposure.split_id,
            unit_id: exposure.unit_id,
            event_object_type_name: exposure.event_object_type_name,
            // Serialize the optional payload; empty JSON object on failure.
            event_payload_json: exposure
                .event_payload
                .map(|payload| {
                    serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string())
                }),
            // Exactly one of continuous_value / discrete_arm_id is populated,
            // mirrored by `entry_type`.
            assignment: exposure
                .assignment
                .into_iter()
                .map(|(dimension_id, entry)| match entry {
                    SplitAssignmentEntry::Discrete { arm_id } => AssignmentInput {
                        continuous_value: None,
                        dimension_id,
                        discrete_arm_id: Some(arm_id),
                        entry_type: DimensionType::Discrete,
                    },
                    SplitAssignmentEntry::Continuous { value } => AssignmentInput {
                        continuous_value: Some(value),
                        dimension_id,
                        discrete_arm_id: None,
                        entry_type: DimensionType::Continuous,
                    },
                })
                .collect(),
        });
    }

    /// POSTs `create_logs_input` as JSON to `{base_url}/logs`.
    ///
    /// Skips the request entirely when the payload is empty. Returns an error
    /// on transport failure or any non-2xx response status.
    pub async fn create_logs(
        remote_logging_base_url: String,
        create_logs_input: CreateLogsInput,
    ) -> Result<(), reqwest::Error> {
        if create_logs_input.is_empty() {
            return Ok(());
        }
        let client = reqwest::Client::new();
        client
            .post(format!("{}/logs", remote_logging_base_url))
            .json(&create_logs_input)
            .send()
            .await?
            .error_for_status()?;
        Ok(())
    }
}