use hopper;
use metric::{Encoding, Event, LogLine, Telemetry};
use std::marker::PhantomData;
use thread;
use time;
use util::Valve;
mod console;
mod null;
pub mod wavefront;
mod native;
pub mod influxdb;
pub mod prometheus;
pub mod elasticsearch;
pub mod kafka;
pub use self::console::{Console, ConsoleConfig};
pub use self::elasticsearch::{Elasticsearch, ElasticsearchConfig};
pub use self::influxdb::{InfluxDB, InfluxDBConfig};
pub use self::kafka::{Kafka, KafkaConfig};
pub use self::native::{Native, NativeConfig};
pub use self::null::{Null, NullConfig};
pub use self::prometheus::{Prometheus, PrometheusConfig};
pub use self::wavefront::{Wavefront, WavefrontConfig};
/// A sink implementation bundled with the event queue that feeds it.
///
/// `S` is the concrete sink and `SConfig` the configuration type it is
/// initialised from. The configuration value itself is not stored; only its
/// type is remembered through `PhantomData`.
pub struct RunnableSink<S, SConfig>
where
    SConfig: 'static + Clone + Send,
    S: Sink<SConfig> + Send,
{
    /// Receiving end of the queue the run loop pulls events from.
    recv: hopper::Receiver<Event>,
    /// Names of the configured sources; the run loop compares its count of
    /// `Event::Shutdown` against the length of this list.
    sources: Vec<String>,
    /// The sink instance that events are handed to.
    state: S,
    /// Marker tying the configuration type to this struct.
    config: PhantomData<SConfig>,
}
impl<S, SConfig> RunnableSink<S, SConfig>
where
    S: 'static + Send + Sink<SConfig>,
    SConfig: 'static + Clone + Send,
{
    /// Builds a runnable sink.
    ///
    /// * `recv` - queue the run loop reads events from.
    /// * `sources` - names of the configured sources; its length is the
    ///   number of `Event::Shutdown` events required before shutting down.
    /// * `config` - passed to `S::init` to construct the sink state.
    pub fn new(
        recv: hopper::Receiver<Event>,
        sources: Vec<String>,
        config: SConfig,
    ) -> RunnableSink<S, SConfig> {
        RunnableSink {
            recv: recv,
            sources: sources,
            state: S::init(config),
            config: PhantomData,
        }
    }

    /// Hands this sink to the crate's `thread::spawn`, running the event
    /// loop inside the spawned closure, and returns the resulting handle.
    pub fn run(self) -> thread::ThreadHandle {
        thread::spawn(move |_poll| {
            self.consume();
        })
    }

    /// Event loop: pulls events from the queue and dispatches them to the
    /// sink until a `Shutdown` has been counted for every configured source.
    ///
    /// An event is only dispatched while the sink reports `Valve::Open`.
    /// While the valve is `Closed` the sink is flushed and the same event is
    /// retried, so an event is never dropped because the valve was closed.
    fn consume(mut self) -> () {
        let mut attempts = 0;
        let mut recv = self.recv.into_iter();
        let mut last_flush_idx = 0;
        let mut total_shutdowns = 0;
        loop {
            // Nothing queued: back off via `time::delay` with a growing
            // attempt counter, then poll again.
            let event = match recv.next() {
                Some(event) => {
                    attempts = 0;
                    event
                }
                None => {
                    time::delay(attempts);
                    attempts += 1;
                    continue;
                }
            };
            // Retry loop for this single event; every arm that handles the
            // event must leave the loop (break or return) so the event is
            // processed exactly once.
            loop {
                match self.state.valve_state() {
                    Valve::Open => match event {
                        Event::TimerFlush(idx) => {
                            // Only act on flush indexes newer than the last
                            // one seen.
                            if idx > last_flush_idx {
                                if let Some(flush_interval) =
                                    self.state.flush_interval()
                                {
                                    // NOTE(review): a sink returning Some(0)
                                    // makes this modulo panic - confirm that
                                    // implementors never return zero.
                                    if idx % flush_interval == 0 {
                                        self.state.flush();
                                    }
                                }
                                last_flush_idx = idx;
                            }
                            break;
                        }
                        Event::Telemetry(metric) => {
                            self.state.deliver(metric);
                            break;
                        }
                        Event::Log(line) => {
                            self.state.deliver_line(line);
                            break;
                        }
                        Event::Raw {
                            order_by,
                            encoding,
                            bytes,
                        } => {
                            self.state.deliver_raw(order_by, encoding, bytes);
                            break;
                        }
                        Event::Shutdown => {
                            total_shutdowns += 1;
                            if total_shutdowns >= self.sources.len() {
                                trace!("Received shutdown from every configured source: {:?}", self.sources);
                                self.state.shutdown();
                                return;
                            }
                            // Still waiting on other sources. Without this
                            // break the retry loop would re-count the same
                            // Shutdown event until the threshold was hit,
                            // shutting the sink down prematurely.
                            break;
                        }
                    },
                    Valve::Closed => {
                        // Flush so the valve can open, then retry the event.
                        self.state.flush();
                        continue;
                    }
                }
            }
        }
    }
}
/// Behaviour a sink must provide so that `RunnableSink` can drive it.
///
/// `init`, `flush_interval`, `flush` and `shutdown` are required. The
/// delivery methods default to doing nothing and `valve_state` defaults to
/// `Valve::Open`, so an implementor overrides only what it needs.
pub trait Sink<SConfig>
where
    Self: 'static + Send + Sized,
    SConfig: 'static + Send + Clone,
{
    /// Convenience constructor: wraps this sink type in a `RunnableSink`
    /// fed by `recv`, expecting shutdowns from `sources`, built from
    /// `config`.
    fn new(
        recv: hopper::Receiver<Event>,
        sources: Vec<String>,
        config: SConfig,
    ) -> RunnableSink<Self, SConfig> {
        RunnableSink::<Self, SConfig>::new(recv, sources, config)
    }

    /// Constructs the sink from its configuration.
    fn init(config: SConfig) -> Self;

    /// Flush period. `RunnableSink` flushes on a timer event whose index is
    /// a multiple of the returned value; with `None` timer events never
    /// trigger a flush.
    fn flush_interval(&self) -> Option<u64>;

    /// Flushes the sink. `RunnableSink` calls this on matching timer events
    /// and whenever `valve_state` reports `Valve::Closed`.
    fn flush(&mut self);

    /// Whether the sink currently accepts events. Defaults to always open.
    fn valve_state(&self) -> Valve {
        Valve::Open
    }

    /// Receives a telemetry event. The default implementation discards it.
    fn deliver(&mut self, _telem: Telemetry) {}

    /// Receives a log line. The default implementation discards it.
    fn deliver_line(&mut self, _line: LogLine) {}

    /// Receives a raw payload together with its ordering key and encoding.
    /// The default implementation discards it.
    fn deliver_raw(&mut self, _order_by: u64, _encoding: Encoding, _bytes: Vec<u8>) {}

    /// Consumes the sink. `RunnableSink` calls this once its shutdown count
    /// reaches the number of configured sources.
    fn shutdown(self);
}