1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
use std::time::Duration;

pub use hyper::{Client as HyperClient, client::Builder as HyperBuilder};
use hyper::client::connect::dns::TokioThreadpoolGaiResolver;
use hyper::client::HttpConnector;
use hyper_tls::HttpsConnector;
use tokio::timer::Timeout;

use crate::body::IngestBody;
use crate::error::HttpError;
use crate::request::RequestTemplate;
use crate::response::{IngestResponse, Response};
use arc_swap::ArcSwap;
use std::sync::Arc;

/// Client for sending IngestRequests to LogDNA
pub struct Client {
    hyper: HyperClient<HttpsConnector<HttpConnector<TokioThreadpoolGaiResolver>>>,
    template: RequestTemplate,
    timeout: ArcSwap<Duration>,
}

impl Client {
    /// Create a new client taking a RequestTemplate and Tokio Runtime
    ///
    /// #  Example
    ///
    /// ```rust
    /// # use logdna_client::client::Client;
    /// # use tokio::runtime::Runtime;
    /// # use logdna_client::params::{Params, Tags};
    /// # use logdna_client::request::RequestTemplate;
    ///
    /// let mut rt = Runtime::new().expect("Runtime::new()");
    /// let params = Params::builder()
    ///     .hostname("rust-client-test")
    ///     .tags(Tags::parse("this,is,a,test"))
    ///     .build()
    ///     .expect("Params::builder()");
    /// let request_template = RequestTemplate::builder()
    ///     .params(params)
    ///     .api_key("<your ingestion key>")
    ///     .build()
    ///     .expect("RequestTemplate::builder()");
    /// let client = Client::new(request_template);
    /// ```
    pub fn new(template: RequestTemplate) -> Self {
        let http_connector = {
            let mut connector = HttpConnector::new_with_tokio_threadpool_resolver();
            connector.enforce_http(false); // this is needed or https:// urls will error
            connector
        };

        let tls = native_tls::TlsConnector::new().expect("TlsConnector::new()");
        let https_connector = HttpsConnector::from((http_connector, tls.into()));

        Client {
            hyper: HyperClient::builder()
                    .max_idle_per_host(20)
                    .build(https_connector),
            template,
            timeout: ArcSwap::new(Arc::new(Duration::from_secs(5))),
        }
    }
    /// Sets the request timeout
    pub fn set_timeout(&self, timeout: Duration) {
        self.timeout.swap(Arc::new(timeout));
    }
    /// Send an IngestBody to the LogDNA Ingest API
    ///
    /// Returns an IngestResponse, which is a future that must be run on the Tokio Runtime
    pub async fn send<T: AsRef<IngestBody>>(&self, body: T) -> IngestResponse<T> {
        let request = self.template.new_request(body.as_ref())?;
        let timeout = Timeout::new(
            self.hyper.request(request),
            self.timeout.load().as_ref().clone(),
        );

        let result = match timeout.await {
            Ok(result) => result,
            Err(_) => {
                return Err(HttpError::Timeout(body));
            }
        };

        let mut response = match result {
            Ok(response) => response,
            Err(e) => {
                return Err(HttpError::Send(body, e));
            }
        };

        let status = response.status().as_u16();
        if status < 200 || status >= 300 {
            let mut response_body = Vec::new();
            while let Some(chunk) = response.body_mut().next().await {
                if let Ok(chunk) = chunk {
                    response_body.extend_from_slice(&chunk)
                }
            };
            Ok(Response::Failed(body, response.status(), String::from_utf8(response_body)?))
        } else {
            Ok(Response::Sent)
        }
    }
}