use config;
use log::{debug, error, info, log, trace, warn};
use serde_derive::Deserialize;
use std::convert::TryFrom;
use std::fmt;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
/// Log target passed to `debug!` by the default
/// `SourceBuilder::parse_config_value` implementation.
// String slices stored in a `static` are implicitly `'static`, so the
// explicit lifetime is omitted.
static TARGET_SOURCE_BUILDER: &str = "tempest::source::SourceBuilder";
8
9pub trait SourceBuilder {
12 type Source;
13 fn build(&self) -> Self::Source;
14
15 fn parse_config_value(&mut self, _cfg: config::Value) {
18 debug!(
19 target: TARGET_SOURCE_BUILDER,
20 "SourceBuilder.parse_config_value not implemented"
21 );
22 }
23}
24
25pub trait Source {
27 fn name(&self) -> &'static str;
29
30 fn validate(&mut self) -> SourceResult<()> {
33 Err(SourceError::new(SourceErrorKind::ValidateError(
34 "Validate isn't configured for Source trait".to_string(),
35 )))
36 }
37
38 fn setup(&mut self) -> SourceResult<()> {
40 Ok(())
41 }
42
43 fn shutdown(&mut self) -> SourceResult<()> {
45 Ok(())
46 }
47
48 fn connect(&mut self) -> SourceResult<()> {
50 Ok(())
51 }
52
53 fn healthy(&mut self) -> bool {
56 true
57 }
58
59 fn poll(&mut self) -> SourcePollResult {
61 Ok(None)
62 }
63
64 fn monitor(&mut self) -> SourceResult<()> {
68 Ok(())
69 }
70
71 fn ack(&mut self, _msg_id: MsgId) -> SourceResult<(i32, i32)> {
73 Ok((1, 0))
74 }
75
76 fn batch_ack(&mut self, msgs: Vec<MsgId>) -> SourceResult<(i32, i32)> {
78 Ok((msgs.len() as i32, 0))
79 }
80
81 fn max_backoff(&self) -> SourceResult<&u64> {
85 Ok(&1000u64)
86 }
87
88 fn poll_interval(&self) -> SourceResult<&SourceInterval> {
90 Ok(&SourceInterval::Millisecond(1))
91 }
92
93 fn monitor_interval(&self) -> SourceResult<&SourceInterval> {
95 Ok(&SourceInterval::Millisecond(0))
96 }
97
98 fn ack_policy(&self) -> SourceResult<&SourceAckPolicy> {
100 Ok(&SourceAckPolicy::Individual)
101 }
102
103 fn ack_interval(&self) -> SourceResult<&SourceInterval> {
105 Ok(&SourceInterval::Millisecond(1000))
106 }
107
108 fn flush_metrics(&mut self) {}
111}
112
113#[derive(Clone, Debug, PartialEq, Deserialize)]
115#[serde(tag = "type", content = "value")]
116pub enum SourceAckPolicy {
117 Batch(u64),
119 Individual,
121 None,
123}
124
125impl Default for SourceAckPolicy {
126 fn default() -> Self {
127 SourceAckPolicy::Individual
128 }
129}
130
/// An interval expressed in a fixed unit.
///
/// The type only wraps a `u64`, so it is `Copy` and fully comparable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SourceInterval {
    /// A number of milliseconds.
    Millisecond(u64),
}

/// The default interval is one millisecond.
impl Default for SourceInterval {
    fn default() -> Self {
        SourceInterval::Millisecond(1)
    }
}

impl SourceInterval {
    /// Converts this interval into a [`Duration`].
    pub fn as_duration(&self) -> Duration {
        match *self {
            SourceInterval::Millisecond(ms) => Duration::from_millis(ms),
        }
    }
}
150
/// Message identifier, stored as raw bytes.
pub type MsgId = Vec<u8>;

/// Message payload, stored as raw bytes.
pub type Msg = Vec<u8>;

/// A message together with its identifier and bookkeeping counters.
///
/// `Default` yields an empty id, an empty payload and zeroed counters.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct SourceMsg {
    /// Identifier of the message.
    pub id: MsgId,
    /// Payload of the message.
    pub msg: Msg,
    /// NOTE(review): presumably a timestamp such as the one returned by
    /// `now_millis`; confirm against the code that fills it in.
    pub ts: usize,
    /// NOTE(review): presumably a delivery counter; confirm.
    pub delivered: usize,
}
173
174pub type SourcePollResult = Result<Option<Vec<SourceMsg>>, SourceError>;
176
177pub type SourceResult<T> = Result<T, SourceError>;
179
/// Category of failure carried by a [`SourceError`].
#[derive(Debug)]
pub enum SourceErrorKind {
    /// An underlying I/O error.
    Io(std::io::Error),
    /// A client error with a message.
    Client(String),
    /// A validation error with a message.
    ValidateError(String),
    /// Any other error with a message.
    Other(String),
}

/// Error type returned by source operations.
///
/// `Debug` is derived so diagnostics show the wrapped [`SourceErrorKind`].
#[derive(Debug)]
pub struct SourceError {
    kind: SourceErrorKind,
}

impl SourceError {
    /// Creates an error of the given kind.
    pub fn new(kind: SourceErrorKind) -> Self {
        SourceError { kind }
    }

    /// Wraps an I/O error as `SourceErrorKind::Io`.
    pub fn from_io_err(err: std::io::Error) -> Self {
        SourceError::new(SourceErrorKind::Io(err))
    }

    /// Returns the kind of this error.
    pub fn kind(&self) -> &SourceErrorKind {
        &self.kind
    }
}

impl fmt::Display for SourceError {
    /// Writes the generic message followed by the kind and its detail.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // The historical prefix is kept so existing log searches still match.
        write!(f, "A Source Error Occurred: ")?;
        match &self.kind {
            SourceErrorKind::Io(err) => write!(f, "io error: {}", err),
            SourceErrorKind::Client(msg) => write!(f, "client error: {}", msg),
            SourceErrorKind::ValidateError(msg) => write!(f, "validation error: {}", msg),
            SourceErrorKind::Other(msg) => write!(f, "{}", msg),
        }
    }
}

impl std::error::Error for SourceError {
    /// Exposes the wrapped I/O error, if any, as the underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match &self.kind {
            SourceErrorKind::Io(err) => Some(err),
            SourceErrorKind::Client(_)
            | SourceErrorKind::ValidateError(_)
            | SourceErrorKind::Other(_) => None,
        }
    }
}
223
/// Returns the number of milliseconds elapsed since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. If the
/// value does not fit in a `usize` (possible on 32-bit targets), it saturates
/// at `usize::MAX` instead of silently wrapping.
pub fn now_millis() -> usize {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `as_millis` yields a `u128`; convert with a checked conversion.
        .map(|elapsed| usize::try_from(elapsed.as_millis()).unwrap_or(usize::MAX))
        .unwrap_or(0)
}