use hyper::{Body, Method, Request};
use hyper::client::connect::HttpConnector;
use rmp::encode;
use serde::Serialize;
use tokio::sync::mpsc;
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone)]
pub struct Client {
    env: Option<String>,
    endpoint: String,
    service: String,
    http_client: hyper::Client<HttpConnector>,
    buffer_sender: mpsc::Sender<Trace>,
    buffer_size: usize,
    buffer_flush_min_interval: Duration,
}
#[derive(Debug)]
pub struct Config {
    
    pub service: String,
    
    pub env: Option<String>,
    
    pub host: String,
    
    pub port: String,
    
    
    pub buffer_queue_capacity: u16,
    
    pub buffer_size: u16,
    
    pub buffer_flush_min_interval: Duration,
}
impl Default for Config {
    fn default() -> Self {
        Config {
            env: None,
            host: "localhost".to_string(),
            port: "8126".to_string(),
            service: "".to_string(),
            buffer_queue_capacity: std::u16::MAX,
            buffer_size: 200,
            buffer_flush_min_interval: Duration::from_millis(200),
        }
    }
}
impl Client {
    pub fn new(config: Config) -> Client {
        let (buffer_sender, buffer_receiver) = mpsc::channel(config.buffer_queue_capacity as usize);
        let client = Client {
            env: config.env,
            service: config.service,
            endpoint: format!("http://{}:{}/v0.3/traces", config.host, config.port),
            http_client: hyper::Client::new(),
            buffer_sender: buffer_sender,
            buffer_size: config.buffer_size as usize,
            buffer_flush_min_interval: config.buffer_flush_min_interval,
        };
        spawn_consume_buffer_task(buffer_receiver, client.clone());
        client
    }
    pub fn send_trace(mut self, trace: Trace) {
        match self.buffer_sender.try_send(trace) {
            Ok(_) => trace!("trace enqueued"),
            Err(err) => warn!("could not enqueue trace: {:?}", err),
        };
    }
    async fn send_traces(self, traces: Vec<Trace>) {
        let traces = traces
            .iter()
            .map(|trace| map_to_raw_spans(trace, self.env.clone(), self.service.clone()))
            .collect::<Vec<Vec<RawSpan>>>();
        let trace_count = traces.len();
        let payload = serialize_as_msgpack(traces);
        let req = Request::builder()
            .method(Method::POST)
            .uri(self.endpoint)
            .header("content-type", "application/msgpack")
            .header("content-length", payload.len())
            .header("X-Datadog-Trace-Count", trace_count)
            .body(Body::from(payload))
            .unwrap();
        match self.http_client.request(req).await {
            Ok(resp) => {
                if resp.status().is_success() {
                    trace!("{} traces sent to datadog", trace_count)
                } else {
                    error!("error sending traces to datadog: {:?}", resp)
                }
            }
            Err(err) => error!("error sending traces to datadog: {:?}", err),
        }
    }
}
#[derive(Debug, Clone)]
pub struct Trace {
    pub id: u64,
    pub spans: Vec<Span>,
    pub priority: u32,
}
#[derive(Debug, Clone)]
pub struct Span {
    pub id: u64,
    pub name: String,
    pub resource: String,
    pub parent_id: Option<u64>,
    pub start: SystemTime,
    pub duration: Duration,
    pub error: Option<ErrorInfo>,
    pub http: Option<HttpInfo>,
    pub sql: Option<SqlInfo>,
    pub r#type: String,
    pub tags: HashMap<String, String>,
}
#[derive(Debug, Clone)]
pub struct ErrorInfo {
    pub r#type: String,
    pub msg: String,
    pub stack: String,
}
#[derive(Debug, Clone)]
pub struct HttpInfo {
    pub url: String,
    pub status_code: String,
    pub method: String,
}
#[derive(Debug, Clone)]
pub struct SqlInfo {
    pub query: String,
    pub rows: String,
    pub db: String,
}
#[derive(Debug, Serialize, Clone, PartialEq)]
struct RawSpan {
    service: String,
    name: String,
    resource: String,
    trace_id: u64,
    span_id: u64,
    parent_id: Option<u64>,
    start: u64,
    duration: u64,
    error: i32,
    meta: HashMap<String, String>,
    metrics: HashMap<String, f64>,
    r#type: String,
}
fn spawn_consume_buffer_task(mut buffer_receiver: mpsc::Receiver<Trace>, client: Client) {
    tokio::spawn(async move {
        let mut buffer = Vec::with_capacity(client.buffer_size);
        let mut last_flushed_at = SystemTime::now();
        loop {
            let client = client.clone();
            match buffer_receiver.try_recv() {
                Ok(trace) => {
                    buffer.push(trace);
                }
                Err(_) => {
                    tokio::time::delay_for(Duration::from_secs(1)).await;
                }
            }
            if buffer.len() == client.buffer_size
                || flush_min_interval_has_passed(&buffer, &client, last_flushed_at)
            {
                client.send_traces(buffer.drain(..).collect()).await;
                last_flushed_at = SystemTime::now();
            }
        }
        fn flush_min_interval_has_passed<T>(
            buffer: &Vec<T>,
            client: &Client,
            last_flushed_at: SystemTime,
        ) -> bool {
            buffer.len() > 0
                && SystemTime::now().duration_since(last_flushed_at).unwrap()
                    > client.buffer_flush_min_interval
        }
    });
}
fn serialize_as_msgpack(traces: Vec<Vec<RawSpan>>) -> Vec<u8> {
    
    
    
    let mut buf = Vec::new();
    encode::write_array_len(&mut buf, traces.len() as u32).unwrap();
    for spans in traces {
        encode::write_array_len(&mut buf, spans.len() as u32).unwrap();
        for span in spans {
            let mut se = rmps::Serializer::new(&mut buf).with_struct_map();
            span.serialize(&mut se).unwrap();
        }
    }
    buf
}
fn fill_meta(span: &Span, env: Option<String>) -> HashMap<String, String> {
    let mut meta = HashMap::new();
    if let Some(env) = env {
        meta.insert("env".to_string(), env);
    }
    if let Some(http) = &span.http {
        meta.insert("http.status_code".to_string(), http.status_code.clone());
        meta.insert("http.method".to_string(), http.method.clone());
        meta.insert("http.url".to_string(), http.url.clone());
    }
    if let Some(error) = &span.error {
        meta.insert("error.type".to_string(), error.r#type.clone());
        meta.insert("error.msg".to_string(), error.msg.clone());
        meta.insert("error.stack".to_string(), error.stack.clone());
    }
    if let Some(sql) = &span.sql {
        meta.insert("sql.query".to_string(), sql.query.clone());
        meta.insert("sql.rows".to_string(), sql.rows.clone());
        meta.insert("sql.db".to_string(), sql.db.clone());
    }
    for (key, value) in &span.tags {
        meta.insert(key.to_string(), value.to_string());
    }
    meta
}
fn fill_metrics(priority: u32) -> HashMap<String, f64> {
    let mut metrics = HashMap::new();
    metrics.insert("_sampling_priority_v1".to_string(), f64::from(priority));
    metrics
}
fn map_to_raw_spans(trace: &Trace, env: Option<String>, service: String) -> Vec<RawSpan> {
    let mut traces = Vec::new();
    for span in &trace.spans {
        traces.push(RawSpan {
            service: service.clone(),
            trace_id: trace.id,
            span_id: span.id,
            name: span.name.clone(),
            resource: span.resource.clone(),
            parent_id: span.parent_id,
            start: duration_to_nanos(span.start.duration_since(UNIX_EPOCH).unwrap()),
            duration: duration_to_nanos(span.duration),
            error: if span.error.is_some() { 1 } else { 0 },
            r#type: span.r#type.clone(),
            meta: fill_meta(&span, env.clone()),
            metrics: fill_metrics(trace.priority),
        });
    }
    traces
}
fn duration_to_nanos(duration: Duration) -> u64 {
    duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64
}
#[cfg(test)]
mod tests {
    extern crate rand;
    use super::*;
    use rand::Rng;
    use serde_json::json;
    #[tokio::test]
    #[ignore]
    async fn test_send_trace() {
        let config = Config {
            service: String::from("service_name"),
            ..Default::default()
        };
        let client = Client::new(config);
        let trace = a_trace();
        client.send_trace(trace);
    }
    #[tokio::test]
    async fn test_map_to_raw_spans() {
        let config = Config {
            service: String::from("service_name"),
            env: Some(String::from("staging")),
            ..Default::default()
        };
        let trace = a_trace();
        let mut expected = Vec::new();
        for span in &trace.spans {
            let mut meta: HashMap<String, String> = HashMap::new();
            meta.insert("env".to_string(), config.env.clone().unwrap());
            if let Some(http) = &span.http {
                meta.insert("http.url".to_string(), http.url.clone());
                meta.insert("http.method".to_string(), http.method.clone());
                meta.insert("http.status_code".to_string(), http.status_code.clone());
            }
            let mut metrics = HashMap::new();
            metrics.insert(
                "_sampling_priority_v1".to_string(),
                f64::from(trace.priority),
            );
            expected.push(RawSpan {
                trace_id: trace.id,
                span_id: span.id,
                parent_id: span.parent_id,
                name: span.name.clone(),
                resource: span.resource.clone(),
                service: config.service.clone(),
                r#type: span.r#type.clone(),
                start: duration_to_nanos(span.start.duration_since(UNIX_EPOCH).unwrap()),
                duration: duration_to_nanos(span.duration),
                error: 0,
                meta: meta,
                metrics: metrics,
            });
        }
        let raw_spans = map_to_raw_spans(&trace, config.env, config.service);
        assert_eq!(raw_spans, expected);
    }
    #[tokio::test]
    async fn test_message_pack_serialization() {
        let generate_span = || {
            let mut rng = rand::thread_rng();
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();
            RawSpan {
                trace_id: rng.gen::<u64>(),
                span_id: rng.gen::<u64>(),
                parent_id: None,
                name: String::from("request"),
                resource: String::from("/home"),
                service: String::from("service_name"),
                r#type: String::from("web"),
                start: now * 1_000_000_000,
                duration: 4853472865,
                error: 0,
                meta: std::collections::HashMap::new(),
                metrics: std::collections::HashMap::new(),
            }
        };
        let traces = (0..3).map(|_| vec![generate_span()]).collect::<Vec<_>>();
        let result = serialize_as_msgpack(traces.clone());
        let msgpack_as_json: serde_json::Value = rmp_serde::from_read_ref(&result).unwrap();
        
        
        assert_eq!(msgpack_as_json, json!(traces));
    }
    fn a_trace() -> Trace {
        let mut rng = rand::thread_rng();
        Trace {
            id: rng.gen::<u64>(),
            priority: 1,
            spans: vec![Span {
                id: rng.gen::<u64>(),
                name: String::from("request"),
                resource: String::from("/home/v3"),
                r#type: String::from("web"),
                start: SystemTime::now(),
                duration: Duration::from_secs(2),
                parent_id: None,
                http: Some(HttpInfo {
                    url: String::from("/home/v3/2?trace=true"),
                    method: String::from("GET"),
                    status_code: String::from("200"),
                }),
                error: None,
                sql: None,
                tags: HashMap::new(),
            }],
        }
    }
}