tempest_source/
source.rs

1use config;
2use log::{debug, error, info, log, trace, warn};
3use serde_derive::Deserialize;
4use std::fmt;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
/// Log target attached to the diagnostics emitted by `SourceBuilder`'s
/// default methods.
// References stored in a `static` are always `'static`, so the lifetime is
// left implicit.
static TARGET_SOURCE_BUILDER: &str = "tempest::source::SourceBuilder";
8
9/// `SourceBuilder` trait for defining how to configure
10/// structs that implements the `Source` trait.
11pub trait SourceBuilder {
12    type Source;
13    fn build(&self) -> Self::Source;
14
15    /// Given a Topology.toml for a [source.config] `config::Value` override the options
16    /// for the source.
17    fn parse_config_value(&mut self, _cfg: config::Value) {
18        debug!(
19            target: TARGET_SOURCE_BUILDER,
20            "SourceBuilder.parse_config_value not implemented"
21        );
22    }
23}
24
25/// `Source` trait is used define actions
26pub trait Source {
27    /// return the name of this source
28    fn name(&self) -> &'static str;
29
30    /// Validation logic for determining if a source is properly configured
31    /// at runtime.
32    fn validate(&mut self) -> SourceResult<()> {
33        Err(SourceError::new(SourceErrorKind::ValidateError(
34            "Validate isn't configured for Source trait".to_string(),
35        )))
36    }
37
38    /// Source setup logic should go here
39    fn setup(&mut self) -> SourceResult<()> {
40        Ok(())
41    }
42
43    /// Source shutdown logic for closing connections, performing cleanup logic, etc.
44    fn shutdown(&mut self) -> SourceResult<()> {
45        Ok(())
46    }
47
48    /// Source connection logic for creating client connections
49    fn connect(&mut self) -> SourceResult<()> {
50        Ok(())
51    }
52
53    /// Source health check. Should be used to determine if clients
54    /// are still able to reach upstream brokers
55    fn healthy(&mut self) -> bool {
56        true
57    }
58
59    /// Poll for new message from the source
60    fn poll(&mut self) -> SourcePollResult {
61        Ok(None)
62    }
63
64    /// Monitor is a special method which is the callback for the monitor interval
65    /// It's intended to ack as a special hook for keeping track of your source structure
66    /// or performing other actions.
67    fn monitor(&mut self) -> SourceResult<()> {
68        Ok(())
69    }
70
71    /// Ack a single method with the upstream source
72    fn ack(&mut self, _msg_id: MsgId) -> SourceResult<(i32, i32)> {
73        Ok((1, 0))
74    }
75
76    /// Batch ack a vector of messages with an upstream source
77    fn batch_ack(&mut self, msgs: Vec<MsgId>) -> SourceResult<(i32, i32)> {
78        Ok((msgs.len() as i32, 0))
79    }
80
81    /// The configured maximum amount of time in milliseconds a source should
82    /// backoff. This value is read by the TopologyActor when scheduling
83    /// the next `source.poll` call
84    fn max_backoff(&self) -> SourceResult<&u64> {
85        Ok(&1000u64)
86    }
87
88    /// Poll interval controls how often a Topology should ask for new messages
89    fn poll_interval(&self) -> SourceResult<&SourceInterval> {
90        Ok(&SourceInterval::Millisecond(1))
91    }
92
93    /// Monitor interval controls how often the source `monitor` is called
94    fn monitor_interval(&self) -> SourceResult<&SourceInterval> {
95        Ok(&SourceInterval::Millisecond(0))
96    }
97
98    /// Ack policy configuration
99    fn ack_policy(&self) -> SourceResult<&SourceAckPolicy> {
100        Ok(&SourceAckPolicy::Individual)
101    }
102
103    /// Configures how often the source should check for new messages to ack
104    fn ack_interval(&self) -> SourceResult<&SourceInterval> {
105        Ok(&SourceInterval::Millisecond(1000))
106    }
107
108    /// Sources can implement Metrics. This method is called by TopologyActor
109    /// to internal source flush metrics to configured backend targets.
110    fn flush_metrics(&mut self) {}
111}
112
113/// Ack policy configuration options
114#[derive(Clone, Debug, PartialEq, Deserialize)]
115#[serde(tag = "type", content = "value")]
116pub enum SourceAckPolicy {
117    /// Accumulate messages and ack them in batches
118    Batch(u64),
119    /// Ack a single message at a time
120    Individual,
121    /// Disable message acking.
122    None,
123}
124
125impl Default for SourceAckPolicy {
126    fn default() -> Self {
127        SourceAckPolicy::Individual
128    }
129}
130
/// Time value used for source interval settings.
///
/// Milliseconds are the only unit defined.
#[derive(Clone, Debug, PartialEq)]
pub enum SourceInterval {
    /// An interval given as a whole number of milliseconds.
    Millisecond(u64),
}
136
137impl Default for SourceInterval {
138    fn default() -> Self {
139        SourceInterval::Millisecond(1u64)
140    }
141}
142
143impl SourceInterval {
144    pub fn as_duration(&self) -> Duration {
145        match *self {
146            SourceInterval::Millisecond(v) => Duration::from_millis(v),
147        }
148    }
149}
150
/// Identifier attached to every message; it must be unique, because
/// acks refer to messages by this value.
pub type MsgId = Vec<u8>;
/// Byte buffer type sources use to represent a message.
pub type Msg = Vec<u8>;
156
157/// Data structure for representing a message as it moves through
158/// a topology
159#[derive(Default, Debug, Clone)]
160pub struct SourceMsg {
161    /// MsgId as Vec<u8> used for keeping track of a source msg
162    pub id: MsgId,
163    /// Msg as Vec<u8>
164    pub msg: Msg,
165    /// Source msg read timestamp. Message timeouts are computed from
166    /// this field value
167    pub ts: usize,
168    /// How many times has this message been delivered?
169    /// Used for tracking internal retries if topology is
170    /// configured with `TopologyFailurePolicy::Retry(count)`
171    pub delivered: usize,
172}
173
174/// Return type for polled sources
175pub type SourcePollResult = Result<Option<Vec<SourceMsg>>, SourceError>;
176
177/// Generic return type
178pub type SourceResult<T> = Result<T, SourceError>;
179
/// The categories of failure a source can report.
pub enum SourceErrorKind {
    /// A general `std::io::Error`
    Io(std::io::Error),
    /// A client connection encountered an error
    Client(String),
    /// The source isn't correctly configured
    ValidateError(String),
    /// Catch-all for errors that fit none of the other kinds
    Other(String),
}
191
192#[allow(dead_code)]
193pub struct SourceError {
194    kind: SourceErrorKind,
195}
196
197impl SourceError {
198    pub fn new(kind: SourceErrorKind) -> Self {
199        SourceError { kind: kind }
200    }
201
202    pub fn from_io_err(err: std::io::Error) -> Self {
203        SourceError::new(SourceErrorKind::Io(err))
204    }
205}
206
207impl fmt::Display for SourceError {
208    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
209        write!(f, "A Source Error Occurred")
210    }
211}
212
213impl fmt::Debug for SourceError {
214    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
215        write!(
216            f,
217            "Source error: {{ file: {}, line: {} }}",
218            file!(),
219            line!()
220        )
221    }
222}
223
/// Current system time expressed as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time earlier than the epoch.
pub fn now_millis() -> usize {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as usize)
        .unwrap_or(0)
}