use config;
use log::{debug, error, info, log, trace, warn};
use serde_derive::Deserialize;
use std::fmt;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
static TARGET_SOURCE_BUILDER: &'static str = "tempest::source::SourceBuilder";
pub trait SourceBuilder {
type Source;
fn build(&self) -> Self::Source;
fn parse_config_value(&mut self, _cfg: config::Value) {
debug!(
target: TARGET_SOURCE_BUILDER,
"SourceBuilder.parse_config_value not implemented"
);
}
}
pub trait Source {
fn name(&self) -> &'static str;
fn validate(&mut self) -> SourceResult<()> {
Err(SourceError::new(SourceErrorKind::ValidateError(
"Validate isn't configured for Source trait".to_string(),
)))
}
fn setup(&mut self) -> SourceResult<()> {
Ok(())
}
fn shutdown(&mut self) -> SourceResult<()> {
Ok(())
}
fn connect(&mut self) -> SourceResult<()> {
Ok(())
}
fn healthy(&mut self) -> bool {
true
}
fn poll(&mut self) -> SourcePollResult {
Ok(None)
}
fn monitor(&mut self) -> SourceResult<()> {
Ok(())
}
fn ack(&mut self, _msg_id: MsgId) -> SourceResult<(i32, i32)> {
Ok((1, 0))
}
fn batch_ack(&mut self, msgs: Vec<MsgId>) -> SourceResult<(i32, i32)> {
Ok((msgs.len() as i32, 0))
}
fn max_backoff(&self) -> SourceResult<&u64> {
Ok(&1000u64)
}
fn poll_interval(&self) -> SourceResult<&SourceInterval> {
Ok(&SourceInterval::Millisecond(1))
}
fn monitor_interval(&self) -> SourceResult<&SourceInterval> {
Ok(&SourceInterval::Millisecond(0))
}
fn ack_policy(&self) -> SourceResult<&SourceAckPolicy> {
Ok(&SourceAckPolicy::Individual)
}
fn ack_interval(&self) -> SourceResult<&SourceInterval> {
Ok(&SourceInterval::Millisecond(1000))
}
fn flush_metrics(&mut self) {}
}
#[derive(Clone, Debug, PartialEq, Deserialize)]
#[serde(tag = "type", content = "value")]
pub enum SourceAckPolicy {
Batch(u64),
Individual,
None,
}
impl Default for SourceAckPolicy {
fn default() -> Self {
SourceAckPolicy::Individual
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum SourceInterval {
Millisecond(u64),
}
impl Default for SourceInterval {
fn default() -> Self {
SourceInterval::Millisecond(1u64)
}
}
impl SourceInterval {
pub fn as_duration(&self) -> Duration {
match *self {
SourceInterval::Millisecond(v) => Duration::from_millis(v),
}
}
}
pub type MsgId = Vec<u8>;
pub type Msg = Vec<u8>;
#[derive(Default, Debug, Clone)]
pub struct SourceMsg {
pub id: MsgId,
pub msg: Msg,
pub ts: usize,
pub delivered: usize,
}
pub type SourcePollResult = Result<Option<Vec<SourceMsg>>, SourceError>;
pub type SourceResult<T> = Result<T, SourceError>;
pub enum SourceErrorKind {
Io(std::io::Error),
Client(String),
ValidateError(String),
Other(String),
}
#[allow(dead_code)]
pub struct SourceError {
kind: SourceErrorKind,
}
impl SourceError {
pub fn new(kind: SourceErrorKind) -> Self {
SourceError { kind: kind }
}
pub fn from_io_err(err: std::io::Error) -> Self {
SourceError::new(SourceErrorKind::Io(err))
}
}
impl fmt::Display for SourceError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "A Source Error Occurred")
}
}
impl fmt::Debug for SourceError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Source error: {{ file: {}, line: {} }}",
file!(),
line!()
)
}
}
pub fn now_millis() -> usize {
match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(n) => n.as_millis() as usize,
Err(_) => 0,
}
}