use base64;
use hyper;
use metric;
use rusoto_core;
use rusoto_core::DefaultCredentialsProvider;
use rusoto_core::default_tls_client;
use rusoto_kinesis::{KinesisClient, PutRecordsError, PutRecordsInput,
PutRecordsOutput, PutRecordsRequestEntry};
use rusoto_kinesis::Kinesis as RusotoKinesis;
use sink::Sink;
use std::sync::atomic::{AtomicUsize, Ordering};
use util::Valve;
/// Number of records dropped from the buffer because a `put_records`
/// response reported them as published.
pub static KINESIS_PUBLISH_SUCCESS_SUM: AtomicUsize = AtomicUsize::new(0);
/// Number of records discarded by `deliver_raw` because their base64-encoded
/// size exceeded the sink's per-batch byte limit.
pub static KINESIS_PUBLISH_DISCARD_SUM: AtomicUsize = AtomicUsize::new(0);
/// Number of per-record failures seen in `put_records` responses, counted
/// once for every result entry that carries no sequence number.
pub static KINESIS_PUBLISH_FAILURE_SUM: AtomicUsize = AtomicUsize::new(0);
/// Number of `put_records` calls that failed with an error other than
/// `ProvisionedThroughputExceeded`.
pub static KINESIS_PUBLISH_FATAL_SUM: AtomicUsize = AtomicUsize::new(0);
/// Configuration for the Kinesis sink.
#[derive(Clone, Debug, Deserialize)]
pub struct KinesisConfig {
    /// NOTE(review): not read anywhere in this file; presumably identifies
    /// this sink within the wider configuration - confirm against callers.
    pub config_path: Option<String>,
    /// AWS region the Kinesis client is created for.
    pub region: rusoto_core::Region,
    /// Name of the Kinesis stream to publish to. `Kinesis::init` panics when
    /// this is `None`.
    pub stream_name: Option<String>,
    /// Value reported by the sink's `flush_interval()`.
    /// NOTE(review): the unit is not visible in this file - confirm.
    pub flush_interval: u64,
}
impl Default for KinesisConfig {
    /// Returns a configuration with no config path, no stream name, the
    /// region given by `rusoto_core::region::default_region()` and a flush
    /// interval of `1`.
    fn default() -> KinesisConfig {
        let default_region = rusoto_core::region::default_region();
        KinesisConfig {
            region: default_region,
            flush_interval: 1,
            config_path: None,
            stream_name: None,
        }
    }
}
/// Builds a boxed Kinesis client for `region` from the default TLS client
/// and the default credentials provider.
///
/// # Panics
///
/// Panics if either the TLS client or the credentials provider cannot be
/// created.
fn connect(
    region: rusoto_core::Region,
) -> Box<KinesisClient<DefaultCredentialsProvider, hyper::client::Client>> {
    let dispatcher = default_tls_client().unwrap();
    let credentials = DefaultCredentialsProvider::new().unwrap();
    let client = KinesisClient::new(dispatcher, credentials, region);
    Box::new(client)
}
/// Sink that buffers records and publishes them to a Kinesis stream in
/// batches via `put_records`.
pub struct Kinesis {
    /// Region the client was created for; reused when the client is rebuilt.
    region: rusoto_core::Region,
    /// Client used to issue `put_records` calls.
    client: Box<KinesisClient<DefaultCredentialsProvider, hyper::client::Client>>,
    /// Value returned from `flush_interval()`.
    flush_interval: u64,
    /// Record count at which `deliver_raw` flushes before buffering more.
    max_records_per_batch: usize,
    /// Byte budget for one batch: a single encoded record above it is
    /// discarded, and a record that would push the buffer past it triggers a
    /// flush first.
    max_bytes_per_batch: usize,
    /// Stream name, kept separately for use in log messages.
    stream_name: String,
    /// Running count of encoded bytes added to the buffer, used to decide
    /// when a flush is needed.
    buffer_size: usize,
    /// Request reused for every `put_records` call; its `records` vector is
    /// the buffer of pending records.
    put_records_input: PutRecordsInput,
}
impl Sink<KinesisConfig> for Kinesis {
    /// Builds a Kinesis sink from `config` and connects a client for the
    /// configured region.
    ///
    /// # Panics
    ///
    /// Panics if `config.stream_name` is `None`, or if `connect` panics.
    fn init(config: KinesisConfig) -> Self {
        let stream_name = match config.stream_name {
            Some(name) => name,
            None => panic!("No Kinesis stream provided!"),
        };
        // The Kinesis PutRecords API accepts at most 500 records per request;
        // a larger batch is rejected as a whole, so cap the buffer there.
        let max_records = 500;
        // 1 MiB of encoded payload per batch.
        let max_bytes = 1 << 20;
        Kinesis {
            client: connect(config.region.clone()),
            region: config.region,
            stream_name: stream_name.clone(),
            flush_interval: config.flush_interval,
            put_records_input: PutRecordsInput {
                stream_name: stream_name,
                records: Vec::new(),
            },
            buffer_size: 0,
            max_records_per_batch: max_records,
            max_bytes_per_batch: max_bytes,
        }
    }

    /// Always reports the valve as open.
    fn valve_state(&self) -> Valve {
        Valve::Open
    }

    /// Base64-encodes `bytes` and appends the result to the pending batch.
    ///
    /// A record whose encoded size exceeds the per-batch byte limit is
    /// counted in `KINESIS_PUBLISH_DISCARD_SUM` and dropped. Otherwise the
    /// buffer is flushed first if adding the record would exceed the byte
    /// limit or if the buffer already holds the maximum number of records.
    /// `order_by`, formatted as upper-case hexadecimal, becomes the record's
    /// partition key. `_encoding` is ignored.
    fn deliver_raw(
        &mut self,
        order_by: u64,
        _encoding: metric::Encoding,
        bytes: Vec<u8>,
    ) {
        let encoded_bytes = base64::encode(&bytes).into_bytes();
        let encoded_bytes_len = encoded_bytes.len();

        if encoded_bytes_len > self.max_bytes_per_batch {
            KINESIS_PUBLISH_DISCARD_SUM.fetch_add(1, Ordering::Relaxed);
            warn!("Discarding encoded record with size {:?} as it is too large to publish!", encoded_bytes_len);
            return;
        }

        let buffer_too_big =
            (self.buffer_size + encoded_bytes_len) > self.max_bytes_per_batch;
        let buffer_too_long =
            self.put_records_input.records.len() >= self.max_records_per_batch;
        if buffer_too_big || buffer_too_long {
            self.flush();
        }

        let entry = PutRecordsRequestEntry {
            data: encoded_bytes,
            explicit_hash_key: None,
            partition_key: format!("{:X}", order_by),
        };
        self.put_records_input.records.push(entry);
        self.buffer_size += encoded_bytes_len;
    }

    /// Publishes the pending batch.
    fn flush(&mut self) {
        self.publish_buffer();
    }

    /// Returns the configured flush interval.
    fn flush_interval(&self) -> Option<u64> {
        Some(self.flush_interval)
    }

    /// Flushes the pending batch one last time before the sink is dropped.
    fn shutdown(mut self) -> () {
        self.flush();
    }
}
impl Kinesis {
    /// Synchronously publishes the buffered records to Kinesis.
    ///
    /// `put_records` is called repeatedly until the buffer is empty: a
    /// response that reports failed records leaves those records buffered
    /// and they are resent on the next iteration, and a
    /// `ProvisionedThroughputExceeded` error is simply retried. Any other
    /// error is treated as fatal: it is counted in
    /// `KINESIS_PUBLISH_FATAL_SUM`, the client is rebuilt, and this attempt
    /// is abandoned with the unsent records left in the buffer for a later
    /// flush.
    pub fn publish_buffer(&mut self) {
        while !self.put_records_input.records.is_empty() {
            match self.client.put_records(&self.put_records_input) {
                Ok(put_records_output) => {
                    // Removes the accepted records; whatever is left is
                    // resent by the next loop iteration.
                    self.filter_successful(&put_records_output);
                }
                Err(PutRecordsError::ProvisionedThroughputExceeded(_)) => {
                    info!(
                        "Provisioned throughput exceeded on {:?}. Retrying...",
                        self.stream_name
                    );
                }
                Err(err) => {
                    KINESIS_PUBLISH_FATAL_SUM.fetch_add(1, Ordering::Relaxed);
                    self.client = connect(self.region.clone());
                    error!(
                        "Reconnecting due to fatal exception during put_records: {:?}",
                        err
                    );
                    // Retrying a fatal error in place would spin forever if
                    // it persists, so give up on this attempt.
                    break;
                }
            }
        }
        // Keep the byte count in step with what is actually still buffered:
        // zero after a complete publish, the leftover size after a fatal one.
        self.buffer_size = self.put_records_input
            .records
            .iter()
            .map(|record| record.data.len())
            .sum();
    }

    /// Removes from the buffer every record that `put_records_output`
    /// reports as published, leaving the failed ones in place.
    ///
    /// When the response reports no failures the whole buffer is cleared.
    /// Otherwise result entries are matched to buffered records by position;
    /// an entry with a sequence number counts as a success, any other entry
    /// is counted in `KINESIS_PUBLISH_FAILURE_SUM`.
    pub fn filter_successful(&mut self, put_records_output: &PutRecordsOutput) {
        let all_published = match put_records_output.failed_record_count {
            None | Some(0) => true,
            Some(_) => false,
        };
        if all_published {
            KINESIS_PUBLISH_SUCCESS_SUM
                .fetch_add(self.put_records_input.records.len(), Ordering::Relaxed);
            self.put_records_input.records.clear();
            return;
        }

        // Walk backwards so removing an element never shifts the index of an
        // entry that has not been examined yet.
        for (idx, record_result) in put_records_output.records.iter().enumerate().rev()
        {
            if record_result.sequence_number.is_some() {
                self.put_records_input.records.remove(idx);
                KINESIS_PUBLISH_SUCCESS_SUM.fetch_add(1, Ordering::Relaxed);
            } else {
                // Both fields are optional in the response; fall back to a
                // placeholder instead of panicking when one is absent.
                trace!(
                    "Record failed to publish: {:?} - {:?}",
                    record_result
                        .error_code
                        .as_ref()
                        .map(|code| code.as_str())
                        .unwrap_or("unknown"),
                    record_result
                        .error_message
                        .as_ref()
                        .map(|message| message.as_str())
                        .unwrap_or("unknown")
                );
                KINESIS_PUBLISH_FAILURE_SUM.fetch_add(1, Ordering::Relaxed);
            }
        }
    }
}