hypertune 0.6.2

Hypertune SDK for type-safe configuration
Documentation
use std::collections::HashMap;

use chrono::prelude::*;
use rand::seq::SliceRandom;
use serde::Serialize;
use serde_json::Number;
use uuid::Uuid;

use crate::expression::Event;
use crate::expression::Exposure;
use crate::expression::Logs;
use crate::split::SplitAssignmentEntry;
use crate::types::LogLevel;

/// One batch of buffered telemetry, serialized as JSON with camelCase keys.
///
/// Produced by `BackendLogger::collect_logs` and posted to the `/logs`
/// endpoint by `BackendLogger::create_logs`.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateLogsInput {
    /// Per-commit, per-expression evaluation counts.
    evaluations: Vec<EvaluationCountInput>,
    /// Events buffered since the previous collection.
    events: Vec<EventInput>,
    /// Exposures buffered since the previous collection.
    exposures: Vec<ExposureInput>,
    /// Fresh UUID v4 string generated for each collected batch.
    idempotency_key: String,
    /// SDK log messages; `collect_logs` keeps at most one sampled entry.
    logs: Vec<LogInput>,
    /// Token copied from the `BackendLogger` that built this batch.
    token: String,
}

impl CreateLogsInput {
    /// Returns `true` when the batch carries no evaluations, events,
    /// exposures or logs.
    ///
    /// `token` and `idempotency_key` are not taken into account.
    pub fn is_empty(&self) -> bool {
        let sections_empty = [
            self.evaluations.is_empty(),
            self.events.is_empty(),
            self.exposures.is_empty(),
            self.logs.is_empty(),
        ];

        sections_empty.iter().all(|&empty| empty)
    }
}

/// Category of a `LogInput` entry.
///
/// No serde rename is applied, so the variant name itself is what gets
/// serialized.
#[derive(Serialize, Clone)]
enum LogType {
    /// Message emitted by the SDK itself (used by `BackendLogger::log_error`).
    SDKMessage,
}

/// A single SDK log message, serialized with camelCase keys.
#[derive(Serialize, Clone)]
#[serde(rename_all = "camelCase")]
struct LogInput {
    /// Commit id that was active when the message was buffered.
    pub commit_id: String,
    /// UTC timestamp taken when the message was buffered.
    pub created_at: DateTime<Utc>,
    /// Severity of the message.
    pub level: LogLevel,
    /// Category of the message (`type` is a keyword, hence the raw identifier).
    pub r#type: LogType,
    /// Human-readable message text.
    pub message: String,
    /// Extra metadata as a JSON string; `log_error` always sends `"{}"`.
    pub metadata_json: String,
}

/// Accumulated evaluation count for one expression under one commit,
/// serialized with camelCase keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct EvaluationCountInput {
    /// Commit id the count was recorded under.
    pub commit_id: String,
    /// Sum of all counts buffered for this expression since the last collection.
    pub count: u64,
    /// Identifier of the counted expression.
    pub expression_id: String,
}

/// A buffered event, serialized with camelCase keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct EventInput {
    /// Commit id that was active when the event was buffered.
    pub commit_id: String,
    /// UTC timestamp taken when the event was buffered.
    pub created_at: DateTime<Utc>,
    /// Object type name copied from the source `Event`.
    pub event_object_type_name: String,
    /// Event payload encoded as a JSON string (`"{}"` if encoding failed).
    pub event_payload_json: String,
}

/// Discriminates which value field of an `AssignmentInput` is populated.
///
/// Mirrors the two `SplitAssignmentEntry` variants handled by
/// `BackendLogger::log_exposure`; serialized as the bare variant name.
#[derive(Serialize)]
enum DimensionType {
    /// The assignment carries `continuous_value`.
    Continuous,
    /// The assignment carries `discrete_arm_id`.
    Discrete,
}

/// Assignment of one split dimension within an exposure, serialized with
/// camelCase keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct AssignmentInput {
    /// Set for `DimensionType::Continuous` entries, `None` otherwise.
    pub continuous_value: Option<Number>,
    /// Identifier of the dimension this assignment belongs to.
    pub dimension_id: String,
    /// Set for `DimensionType::Discrete` entries, `None` otherwise.
    pub discrete_arm_id: Option<String>,
    /// Which of the two value fields above is populated.
    pub entry_type: DimensionType,
}

/// A buffered split exposure, serialized with camelCase keys.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct ExposureInput {
    /// One entry per dimension of the source `Exposure`'s assignment.
    pub assignment: Vec<AssignmentInput>,
    /// Commit id that was active when the exposure was buffered.
    pub commit_id: String,
    /// UTC timestamp taken when the exposure was buffered.
    pub created_at: DateTime<Utc>,
    /// Optional object type name copied from the source `Exposure`.
    pub event_object_type_name: Option<String>,
    /// Optional payload encoded as a JSON string (`"{}"` if encoding failed).
    pub event_payload_json: Option<String>,
    /// Identifier of the split, copied from the source `Exposure`.
    pub split_id: String,
    /// Identifier of the unit, copied from the source `Exposure`.
    pub unit_id: String,
}

/// In-memory buffer of telemetry waiting to be sent to the remote logging
/// endpoint.
///
/// Entries are added through `log_error` and `reduction_logs`, and handed
/// over as a `CreateLogsInput` by `collect_logs`.
pub struct BackendLogger {
    /// Token copied into every collected batch.
    token: String,
    /// Commit id stamped on buffered entries; nothing is buffered while `None`.
    commit_id: Option<String>,
    /// Evaluation counts keyed by commit id, then by expression id.
    evaluations: HashMap<String, HashMap<String, u64>>,
    /// Events buffered since the last `collect_logs`.
    events: Vec<EventInput>,
    /// Exposures buffered since the last `collect_logs`.
    exposures: Vec<ExposureInput>,
    /// SDK messages buffered since the last `collect_logs`.
    logs: Vec<LogInput>,
}

impl BackendLogger {
    pub fn new(token: String) -> Self {
        Self {
            token,
            commit_id: None,
            evaluations: HashMap::new(),
            events: Vec::new(),
            exposures: Vec::new(),
            logs: Vec::new(),
        }
    }

    pub fn set_commit_id(&mut self, commit_id: String) {
        self.commit_id = Some(commit_id);
    }

    pub fn log_error(&mut self, message: String) {
        if let Some(ref commit_id) = self.commit_id {
            self.logs.push(LogInput {
                r#type: LogType::SDKMessage,
                level: LogLevel::Error,
                commit_id: commit_id.clone(),
                created_at: Utc::now(),
                metadata_json: "{}".to_string(),
                message,
            })
        }
    }

    pub fn reduction_logs(&mut self, logs: &Option<Logs>) {
        if self.commit_id.is_none() || logs.is_none() {
            return;
        }

        let logs = logs.clone().unwrap();
        let commit_id = self.commit_id.as_ref().unwrap().clone();

        if let Some(evaluations) = logs.evaluations {
            for (expression_id, count) in &evaluations {
                self.log_evaluation(expression_id, *count, commit_id.clone());
            }
        }

        if let Some(events) = logs.event_list {
            for event in events {
                self.log_event(event, commit_id.clone())
            }
        }

        if let Some(exposures) = logs.exposure_list {
            for exposure in exposures {
                self.log_exposure(exposure, commit_id.clone())
            }
        }
    }

    pub fn collect_logs(&mut self) -> CreateLogsInput {
        // We choose a random log sample each time we flush to keep the
        // payload size reasonable.
        let sample_log = self.logs.choose(&mut rand::thread_rng()).cloned();
        self.logs = vec![];

        let logs = match sample_log {
            Some(sample_log) => vec![sample_log],
            None => vec![],
        };
        CreateLogsInput {
            token: self.token.clone(),
            idempotency_key: Uuid::new_v4().to_string(),
            logs,
            evaluations: self
                .evaluations
                .drain()
                .map(|(commit_id, commit_map)| {
                    commit_map
                        .into_iter()
                        .map(move |(expression_id, count)| EvaluationCountInput {
                            commit_id: commit_id.clone(),
                            expression_id,
                            count,
                        })
                })
                .flatten()
                .collect(),
            events: self.events.drain(..).collect(),
            exposures: self.exposures.drain(..).collect(),
        }
    }

    fn log_evaluation(&mut self, expression_id: &str, count: u64, commit_id: String) {
        let commit_map = self
            .evaluations
            .entry(commit_id.clone())
            .or_insert(HashMap::new());

        commit_map
            .entry(expression_id.to_string())
            .and_modify(|cur_count| *cur_count += count)
            .or_insert(count);
    }

    fn log_event(&mut self, event: Event, commit_id: String) {
        self.events.push(EventInput {
            commit_id,
            created_at: Utc::now(),
            event_object_type_name: event.event_object_type_name,
            event_payload_json: serde_json::to_string(&event.event_payload)
                .unwrap_or("{}".to_string()),
        });
    }

    fn log_exposure(&mut self, exposure: Exposure, commit_id: String) {
        self.exposures.push(ExposureInput {
            commit_id,
            created_at: Utc::now(),
            split_id: exposure.split_id,
            unit_id: exposure.unit_id,
            event_object_type_name: exposure.event_object_type_name,
            event_payload_json: match exposure.event_payload {
                None => None,
                Some(payload) => Some(serde_json::to_string(&payload).unwrap_or("{}".to_string())),
            },
            assignment: exposure
                .assignment
                .into_iter()
                .map(|(dimension_id, entry)| match entry {
                    SplitAssignmentEntry::Discrete { arm_id } => AssignmentInput {
                        continuous_value: None,
                        dimension_id,
                        discrete_arm_id: Some(arm_id),
                        entry_type: DimensionType::Discrete,
                    },
                    SplitAssignmentEntry::Continuous { value } => AssignmentInput {
                        continuous_value: Some(value),
                        dimension_id,
                        discrete_arm_id: None,
                        entry_type: DimensionType::Continuous,
                    },
                })
                .collect(),
        });
    }

    pub async fn create_logs(
        remote_logging_base_url: String,
        create_logs_input: CreateLogsInput,
    ) -> Result<(), reqwest::Error> {
        if create_logs_input.is_empty() {
            return Ok(());
        }

        let client = reqwest::Client::new();

        client
            .post(format!("{}/logs", remote_logging_base_url))
            .json(&create_logs_input)
            .send()
            .await?
            .error_for_status()?;

        Ok(())
    }
}