1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
use std::sync::Arc;

use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use hyper::Client as HyperClient;
use hyper::client::HttpConnector;
use hyper_rustls::HttpsConnector;
use rustls::ClientConfig as TlsConfig;
use tokio::runtime::Runtime;

use crate::body::IngestBody;
use crate::error::HttpError;
use crate::request::RequestTemplate;
use crate::response::{IngestResponse, Response};

/// Client for sending IngestRequests to LogDNA
pub struct Client {
    hyper: Arc<HyperClient<HttpsConnector<HttpConnector>>>,
    template: RequestTemplate,
}

impl Client {
    /// Create a new client taking a RequestTemplate and Tokio Runtime
    ///
    /// #  Example
    ///
    /// ```rust
    /// # use logdna_client::client::Client;
    /// # use tokio::runtime::Runtime;
    /// # use logdna_client::params::{Params, Tags};
    /// # use logdna_client::request::RequestTemplate;
    ///
    /// let mut rt = Runtime::new().expect("Runtime::new()");
    /// let params = Params::builder()
    ///     .hostname("rust-client-test")
    ///     .tags(Tags::parse("this,is,a,test"))
    ///     .build()
    ///     .expect("Params::builder()");
    /// let request_template = RequestTemplate::builder()
    ///     .params(params)
    ///     .api_key("<your ingestion key>")
    ///     .build()
    ///     .expect("RequestTemplate::builder()");
    /// let client = Client::new(request_template, &mut rt);
    /// ```
    pub fn new(template: RequestTemplate, runtime: &mut Runtime) -> Self {
        let exec = runtime.executor();
        let reactor = runtime.reactor().clone();

        let http_connector = {
            let mut connector = HttpConnector::new_with_executor(
                exec, Some(reactor),
            );
            connector.enforce_http(false); // this is needed or https:// urls will error
            connector
        };

        let tls_config = {
            let mut cfg = TlsConfig::new();
            cfg.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
            cfg.ct_logs = Some(&ct_logs::LOGS);
            cfg
        };

        let https_connector = HttpsConnector::from((http_connector, tls_config));

        Client {
            hyper: Arc::new(HyperClient::builder().build(https_connector)),
            template,
        }
    }

    /// Send an IngestBody to the LogDNA Ingest API
    ///
    /// Returns an IngestResponse, which is a future that must be run on the Tokio Runtime
    pub fn send(&self, body: IngestBody) -> IngestResponse {
        let hyper = self.hyper.clone();
        let body = Arc::new(body);
        let tmp_body = body.clone();
        Box::new(
            self.template.new_request(body.clone())
                .map_err(HttpError::from)
                .and_then(move |req|
                    hyper.request(req)
                        .map_err(move |e| HttpError::Send(body, e))
                )
                .and_then(|res| {
                    let status = res.status();
                    res.into_body()
                        .map_err(Into::into)
                        .fold(Vec::new(), |mut vec, chunk| {
                            vec.extend_from_slice(&*chunk);
                            future::ok::<_, HttpError>(vec)
                        })
                        .and_then(|body| String::from_utf8(body).map_err(Into::into))
                        .map(move |reason| (status, reason))
                })
                .map(move |(status, reason)| {
                    if status.as_u16() < 200 || status.as_u16() >= 300 {
                        Response::Failed(tmp_body, status, reason)
                    } else {
                        Response::Sent
                    }
                })
        )
    }
}